Skip to content

Commit

Permalink
Implement and enable support for whole-endpoint transactions.
Browse files Browse the repository at this point in the history
Fix transactions test.
  • Loading branch information
Glauber Costa authored and BearLemma committed Jan 28, 2022
1 parent 5498e67 commit ba4bc6b
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 80 deletions.
82 changes: 82 additions & 0 deletions cli/tests/lit/transaction.lit
@@ -0,0 +1,82 @@
# SPDX-FileCopyrightText: © 2021 ChiselStrike <info@chiselstrike.com>

# RUN: sh -e @file

cat << EOF > "$TEMPDIR/models/types.ts"
export class Person extends Chisel.ChiselEntity {
name: string = "";
}
EOF

cat << EOF > "$TEMPDIR/endpoints/store_person.js"
import { Person } from "../types.ts";

export default async function chisel(req: Request) {
const req_json = await req.json();

let p = new Person();
p.name = req_json.name;
await p.save();

if (req_json.command == "die with honor") {
throw "Let's see if transaction gets cancelled";
}

return new Response('Mission acomplished');
}
EOF

cat << EOF > "$TEMPDIR/endpoints/retrieve_all.ts"
import { Person } from "../models/types.ts";

export default async function chisel(req: Request) {
let resp = "[";
for await (let p of Person.cursor()) {
resp += p.name + " ";
}
resp += "]";
return new Response(resp);
}
EOF

cat << EOF > "$TEMPDIR/endpoints/write_and_read.ts"
import { Person } from "../models/types.ts";

export default async function chisel(req: Request) {
let p = new Person();
p.name = "ThisIsTheBestName";
await p.save();

let resp = "[";
for await (let p of Person.cursor()) {
resp += p.name + " ";
}
resp += "]";

return new Response(resp);
}
EOF

cd "$TEMPDIR"
$CHISEL apply

$CURL -X POST --data '{ "name": "Adalbrecht" }' $CHISELD_HOST/dev/store_person
# CHECK: Mission acomplished

$CURL $CHISELD_HOST/dev/retrieve_all
# CHECK: HTTP/1.1 200 OK
# CHECK: [Adalbrecht ]

$CURL -X POST --data '{
"name": "Ruprt",
"command": "die with honor"
}' $CHISELD_HOST/dev/store_person
# CHECK: HTTP/1.1 500 Internal Server Error

$CURL $CHISELD_HOST/dev/retrieve_all
# CHECK: HTTP/1.1 200 OK
# CHECK: [Adalbrecht ]

$CURL -X POST $CHISELD_HOST/dev/write_and_read
# CHECK: HTTP/1.1 200 OK
# CHECK: [Adalbrecht ThisIsTheBestName ]
14 changes: 13 additions & 1 deletion docusaurus/docs/data-access.md
Expand Up @@ -212,7 +212,7 @@ The `findMany()` method is convenient, but also problematic if you have a lot of
entities stored because loading them can take a lot of time and memory. In future
releases of ChiselStrike, the runtime will enforce a maximum number of entities
`findMany()` can return and also enforce timeouts at the data store level. The
runtime will also provide optional pagination for the `findMany()` method.
runtime will also provide optional pagination for the `findMany()` method.
:::

## Cursors
Expand Down Expand Up @@ -270,3 +270,15 @@ The methods provided by `ChiselCursor` are outlined in the following table.
The `ChiselCursor` interface is still work-in-progress. For example, methods such as `skip()`, `map()`, and `reduce()` are planned for future releases.
Also, the current implementation of `filter()` takes a _restriction object_, but future ChiselStrike runtimes will allow you to write filter functions using TypeScript, which are automatically converted to efficient database queries in many cases.
:::

## Transacitons

We currently support implicit transactional evaluation. The transaction is created before ChiselStrike
starts evaluating your endpoint and is automatically committed after your endpoint ends and we generate
the HTTP response. In case your endpoint returns a stream, any database-related operation done within
stream generation code will happen outside of the transaction and can result in a crash.

If your code crashes or explicitly throws exception that is not caught, ChiselStrike rollbacks the
transaction automatically.

Explicit user-controlled transactions are coming soon.
5 changes: 2 additions & 3 deletions docusaurus/docs/known_issues.md
Expand Up @@ -20,9 +20,8 @@ While we do plan to provide you with a better experience in the future, for now
want to use external modules, browser-style should work.

* **Transactions:** ChiselStrike aims to fully support transactions at the endpoint boundary and,
at a later date, with user-defined granularity (for advanced users). There is a known bug with
that, though: endpoints should be serialized so RMW is write, but transactions won't be rolled
back if there is an exception in the endpoint and it doesn't complete.
at a later date, with user-defined granularity (for advanced users). We currently wrap the
whole endpoint in one big transaction.

* **Nested models:** (Also known as relationships.) With the exception of the special `OAuthUser`
model, it is not possible to embed a model inside another yet. The code to persist nested models
Expand Down
2 changes: 1 addition & 1 deletion server/src/auth.rs
Expand Up @@ -65,7 +65,7 @@ async fn insert_user_into_db(username: &str) -> Result<String> {
}
user.insert("username".into(), json!(username));
query_engine
.add_row(&oauth_user_type, &user)
.add_row(&oauth_user_type, &user, None)
.await?
.get("id")
.ok_or_else(|| anyhow!("Didn't get user ID from storing a user."))?
Expand Down
38 changes: 19 additions & 19 deletions server/src/db.rs
Expand Up @@ -6,6 +6,7 @@ use crate::policies::FieldPolicies;
use crate::query::engine;
use crate::query::engine::new_query_results;
use crate::query::engine::SqlStream;
use crate::query::engine::TransactionStatic;
use crate::runtime;
use crate::types::{Field, ObjectType, Type, TypeSystem, TypeSystemError, OAUTHUSER_TYPE_NAME};
use crate::JsonObject;
Expand All @@ -21,7 +22,6 @@ use itertools::Itertools;
use pin_project::pin_project;
use serde_json::value::Value;
use sqlx::any::AnyRow;
use sqlx::AnyPool;
use std::collections::HashMap;
use std::collections::HashSet;
use std::pin::Pin;
Expand Down Expand Up @@ -286,7 +286,7 @@ fn sql_where_for_match_login(
}

fn sql_backing_store(
pool: &AnyPool,
tr: TransactionStatic,
columns: Vec<(String, Type)>,
limit_str: &str,
ty: Arc<ObjectType>,
Expand All @@ -313,7 +313,7 @@ fn sql_backing_store(
if policies.transforms.is_empty() {
return Query::Sql(query);
}
let inner = new_query_results(query, pool);
let inner = new_query_results(query, tr);
let pstream = Box::pin(PolicyApplyingStream {
inner,
policies,
Expand Down Expand Up @@ -405,14 +405,14 @@ fn join_stream(columns: &[(String, Type)], left: SqlStream, right: SqlStream) ->
}

fn sql_filter(
pool: &AnyPool,
tr: TransactionStatic,
columns: Vec<(String, Type)>,
limit_str: &str,
alias_count: &mut u32,
inner: Relation,
restrictions: Vec<Restriction>,
) -> Query {
let inner_sql = sql_impl(pool, inner, alias_count);
let inner_sql = sql_impl(tr, inner, alias_count);
let inner_sql = match inner_sql {
Query::Sql(s) => s,
Query::Stream(s) => return filter_stream(s, columns, restrictions),
Expand Down Expand Up @@ -450,8 +450,8 @@ pub(crate) fn sql_restrictions(restrictions: Vec<Restriction>) -> String {
})
}

fn to_stream(pool: &AnyPool, s: String, columns: Vec<(String, Type)>) -> SqlStream {
let inner = new_query_results(s, pool);
fn to_stream(tr: TransactionStatic, s: String, columns: Vec<(String, Type)>) -> SqlStream {
let inner = new_query_results(s, tr);
Box::pin(PolicyApplyingStream {
inner,
policies: FieldPolicies::default(),
Expand All @@ -460,7 +460,7 @@ fn to_stream(pool: &AnyPool, s: String, columns: Vec<(String, Type)>) -> SqlStre
}

fn sql_join(
pool: &AnyPool,
tr: TransactionStatic,
columns: &[(String, Type)],
limit_str: &str,
alias_count: &mut u32,
Expand All @@ -487,15 +487,15 @@ fn sql_join(
// FIXME: Optimize the case of table.left or table.right being just
// a BackingStore with all fields. The database probably doesn't
// care, but will make the logs cleaner.
let lsql = sql_impl(pool, left, alias_count);
let rsql = sql_impl(pool, right, alias_count);
let lsql = sql_impl(tr.clone(), left, alias_count);
let rsql = sql_impl(tr.clone(), right, alias_count);
let (lsql, rsql) = match (lsql, rsql) {
(Query::Sql(l), Query::Sql(r)) => (l, r),
(Query::Stream(l), Query::Sql(r)) => {
return join_stream(columns, l, to_stream(pool, r, right_columns))
return join_stream(columns, l, to_stream(tr, r, right_columns))
}
(Query::Sql(l), Query::Stream(r)) => {
return join_stream(columns, to_stream(pool, l, left_columns), r)
return join_stream(columns, to_stream(tr, l, left_columns), r)
}
(Query::Stream(l), Query::Stream(r)) => return join_stream(columns, l, r),
};
Expand All @@ -517,34 +517,34 @@ fn sql_join(
))
}

fn sql_impl(pool: &AnyPool, rel: Relation, alias_count: &mut u32) -> Query {
fn sql_impl(tr: TransactionStatic, rel: Relation, alias_count: &mut u32) -> Query {
let limit_str = rel
.limit
.map(|x| format!("LIMIT {}", x))
.unwrap_or_else(String::new);
match rel.inner {
Inner::BackingStore(ty, policies) => {
sql_backing_store(pool, rel.columns, &limit_str, ty, policies)
sql_backing_store(tr, rel.columns, &limit_str, ty, policies)
}
Inner::Filter(inner, restrictions) => sql_filter(
pool,
tr,
rel.columns,
&limit_str,
alias_count,
*inner,
restrictions,
),
Inner::Join(left, right) => {
sql_join(pool, &rel.columns, &limit_str, alias_count, *left, *right)
sql_join(tr, &rel.columns, &limit_str, alias_count, *left, *right)
}
}
}

pub(crate) fn sql(pool: &AnyPool, rel: Relation) -> SqlStream {
pub(crate) fn sql(tr: TransactionStatic, rel: Relation) -> SqlStream {
let mut v = 0;
let columns = rel.columns.clone();
match sql_impl(pool, rel, &mut v) {
Query::Sql(s) => to_stream(pool, s, columns),
match sql_impl(tr.clone(), rel, &mut v) {
Query::Sql(s) => to_stream(tr, s, columns),
Query::Stream(s) => s,
}
}

0 comments on commit ba4bc6b

Please sign in to comment.