Skip to content

Commit

Permalink
Avoid optimistic updates for explicit readwrite transactions (#1881)
Browse files Browse the repository at this point in the history
* No optimistic updates for explicit rw transactions
When explicit 'rw' transaction commits, make sure to invalidate affected parts of the cache and signal subscribers of those parts to rerun their querier.

Also in this PR (which is being squash-committed):

* Unit test verifying isolations from rw tx to liveQueries.

* Workaround for vscode don't find tsconfig in src

* Compile with optional chaining as it is mature now

* changes needed to compile with stricter ts options
  • Loading branch information
dfahlander committed Feb 7, 2024
1 parent e8ecb84 commit 1eacbc8
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 30 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
"# Build dist/dexie.js, dist/dexie.mjs and dist/dexie.d.ts",
"cd src",
"tsc [--watch 'Watching for file changes']",
"tsc --target es2018 --outdir ../tools/tmp/modern/src/",
"tsc --target es2020 --outdir ../tools/tmp/modern/src/",
"rollup -c ../tools/build-configs/rollup.config.js",
"rollup -c ../tools/build-configs/rollup.umd.config.js",
"rollup -c ../tools/build-configs/rollup.modern.config.js",
Expand Down
2 changes: 2 additions & 0 deletions src/classes/dexie/transaction-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ export function enterTransactionScope(
} else {
try {
trans.create(); // Create the native transaction so that complete() or error() will trigger even if no operation is made upon it.
// @ts-ignore Mark the idbtrans object with "_explicit". DBCore middleware won't have access to Dexie trans but will need to have this info.
trans.idbtrans._explicit = true;
db._state.PR1398_maxLoop = 3;
} catch (ex) {
if (ex.name === errnames.InvalidState && db.isOpen() && --db._state.PR1398_maxLoop > 0) {
Expand Down
6 changes: 3 additions & 3 deletions src/live-query/cache/apply-optimistic-ops.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ export function applyOptimisticOps(
const { multiEntry } = index;
const queryRange = req.query.range;
const primaryKey = table.schema.primaryKey;
const extractPrimKey = primaryKey.extractKey;
const extractIndex = index.extractKey;
const extractLowLevelIndex = (index.lowLevelIndex || index).extractKey;
const extractPrimKey = primaryKey.extractKey!;
const extractIndex = index.extractKey!;
const extractLowLevelIndex = (index.lowLevelIndex || index).extractKey!;

let finalResult = ops.reduce((result, op) => {
let modifedResult = result;
Expand Down
48 changes: 33 additions & 15 deletions src/live-query/cache/cache-middleware.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { LiveQueryContext } from '..';
import type { Transaction } from '../../classes/transaction';
import { getEffectiveKeys } from '../../dbcore/get-effective-keys';
import { deepClone, delArrayItem, setByKeyPath } from '../../functions/utils';
import DexiePromise, { PSD } from '../../helpers/promise';
Expand All @@ -8,6 +9,7 @@ import {
DBCoreQueryResponse
} from '../../public/types/dbcore';
import { Middleware } from '../../public/types/middleware';
import { obsSetsOverlap } from '../obs-sets-overlap';
import { adjustOptimisticFromFailures } from './adjust-optimistic-request-from-failures';
import { applyOptimisticOps } from './apply-optimistic-ops';
import { cache } from './cache';
Expand All @@ -32,6 +34,7 @@ export const cacheMiddleware: Middleware<DBCore> = {
options
) as IDBTransaction & {
mutatedParts?: ObservabilitySet;
_explicit?: boolean;
};
// Maintain TblQueryCache.ops array when transactions commit or abort
if (mode === 'readwrite') {
Expand All @@ -46,13 +49,26 @@ export const cacheMiddleware: Middleware<DBCore> = {
// Go through all tables in transaction and check if they have any optimistic updates
for (const storeName of stores) {
const tblCache = cache[`idb://${dbName}/${storeName}`];
const table = core.table(storeName);
if (tblCache) {
const table = core.table(storeName);
// Pick optimistic ops that are part of this transaction
const ops = tblCache.optimisticOps.filter(
(op) => op.trans === idbtrans
);
if (ops.length > 0) {
// Transaction was marked as _explicit in enterTransactionScope(), transaction-helpers.ts.
if (idbtrans._explicit && wasCommitted && idbtrans.mutatedParts) {
// Invalidate all queries that overlap with the mutated parts and signal their subscribers
for (const entries of Object.values(
tblCache.queries.query
)) {
for (const entry of entries.slice()) {
if (obsSetsOverlap(entry.obsSet, idbtrans.mutatedParts)) {
delArrayItem(entries, entry); // Remove the entry from the cache so it can be refreshed
entry.subscribers.forEach((requery) => affectedSubscribers.add(requery));
}
}
}
} else if (ops.length > 0) {
// Remove them from the optimisticOps array
tblCache.optimisticOps = tblCache.optimisticOps.filter(
(op) => op.trans !== idbtrans
Expand Down Expand Up @@ -129,9 +145,11 @@ export const cacheMiddleware: Middleware<DBCore> = {
const tableMW = {
...downTable,
mutate(req: DBCoreMutateRequest): Promise<DBCoreMutateResponse> {
const trans = PSD.trans as Transaction;
if (
primKey.outbound || // Non-inbound tables are harded to apply optimistic updates on because we can't know primary key of results
PSD.trans.db._options.cache === 'disabled' // User has opted-out from caching
trans.db._options.cache === 'disabled' || // User has opted-out from caching
trans.explicit // It's an explicit write transaction being made. Don't affect cache until transaction commits.
) {
// Just forward the request to the core.
return downTable.mutate(req);
Expand All @@ -149,12 +167,12 @@ export const cacheMiddleware: Middleware<DBCore> = {
const reqWithResolvedKeys = {
...req,
values: req.values.map((value, i) => {
const valueWithKey = primKey.keyPath.includes('.')
const valueWithKey = primKey.keyPath?.includes('.')
? deepClone(value)
: {
...value,
};
setByKeyPath(valueWithKey, primKey.keyPath, res.results[i]);
setByKeyPath(valueWithKey, primKey.keyPath, res.results![i]);
return valueWithKey;
})
};
Expand All @@ -163,13 +181,13 @@ export const cacheMiddleware: Middleware<DBCore> = {
// Signal subscribers after the observability middleware has complemented req.mutatedParts with the new keys.
// We must queue the task so that we get the req.mutatedParts updated by observability middleware first.
// If we refactor the dependency between observability middleware and this middleware we might not need to queue the task.
queueMicrotask(()=>signalSubscribersLazily(req.mutatedParts)); // Reason for double laziness: in user awaits put and then does another put, signal once.
queueMicrotask(()=>req.mutatedParts && signalSubscribersLazily(req.mutatedParts)); // Reason for double laziness: in user awaits put and then does another put, signal once.
});
} else {
// Enque the operation immediately
tblCache.optimisticOps.push(req);
// Signal subscribers that there are mutated parts
signalSubscribersLazily(req.mutatedParts);
req.mutatedParts && signalSubscribersLazily(req.mutatedParts);
promise.then((res) => {
if (res.numFailures > 0) {
// In case the operation failed, we need to remove it from the optimisticOps array.
Expand All @@ -178,26 +196,26 @@ export const cacheMiddleware: Middleware<DBCore> = {
if (adjustedReq) {
tblCache.optimisticOps.push(adjustedReq);
}
signalSubscribersLazily(req.mutatedParts); // Signal the rolling back of the operation.
req.mutatedParts && signalSubscribersLazily(req.mutatedParts); // Signal the rolling back of the operation.
}
});
promise.catch(()=> {
// In case the operation failed, we need to remove it from the optimisticOps array.
delArrayItem(tblCache.optimisticOps, req);
signalSubscribersLazily(req.mutatedParts); // Signal the rolling back of the operation.
req.mutatedParts && signalSubscribersLazily(req.mutatedParts); // Signal the rolling back of the operation.
});
}
return promise;
},
query(req: DBCoreQueryRequest): Promise<DBCoreQueryResponse> {
if (!isCachableContext(PSD, downTable) || !isCachableRequest("query", req)) return downTable.query(req);
const freezeResults =
(PSD as LiveQueryContext).trans.db._options.cache === 'immutable';
(PSD as LiveQueryContext).trans?.db._options.cache === 'immutable';
const { requery, signal } = PSD as LiveQueryContext;
let [cacheEntry, exactMatch, tblCache, container] =
findCompatibleQuery(dbName, tableName, 'query', req);
if (cacheEntry && exactMatch) {
cacheEntry.obsSet = req.obsSet; // So that optimistic result is monitored.
cacheEntry.obsSet = req.obsSet!; // So that optimistic result is monitored.
// How? - because observability-middleware will track result where optimistic
// mutations are applied and record it in the cacheEntry.
// TODO: CHANGE THIS! The difference is resultKeys only.
Expand All @@ -216,7 +234,7 @@ export const cacheMiddleware: Middleware<DBCore> = {
const promise = downTable.query(req).then((res) => {
// Freeze or clone results
const result = res.result;
cacheEntry.res = result;
if (cacheEntry) cacheEntry.res = result;
if (freezeResults) {
// For performance reasons don't deep freeze.
// Only freeze the top-level array and its items.
Expand All @@ -242,7 +260,7 @@ export const cacheMiddleware: Middleware<DBCore> = {
return Promise.reject(error);
});
cacheEntry = {
obsSet: req.obsSet,
obsSet: req.obsSet!,
promise,
subscribers: new Set(),
type: 'query',
Expand All @@ -267,15 +285,15 @@ export const cacheMiddleware: Middleware<DBCore> = {
tblCache.queries.query[req.query.index.name || ''] = container;
}
}
subscribeToCacheEntry(cacheEntry, container, requery, signal);
subscribeToCacheEntry(cacheEntry, container!, requery, signal);
return cacheEntry.promise.then((res: DBCoreQueryResponse) => {
return {
result: applyOptimisticOps(
res.result,
req,
tblCache?.optimisticOps,
downTable,
cacheEntry,
cacheEntry!,
freezeResults
) as any[], // readonly any[]
};
Expand Down
17 changes: 8 additions & 9 deletions src/live-query/cache/signalSubscribers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import { cache } from './cache';
let unsignaledParts: ObservabilitySet = {};
let isTaskEnqueued = false;

export function signalSubscribersLazily(part: ObservabilitySet) {
export function signalSubscribersLazily(part: ObservabilitySet, optimistic = false) {
extendObservabilitySet(unsignaledParts, part);
if (!isTaskEnqueued) {
isTaskEnqueued = true;
setTimeout(() => {
isTaskEnqueued = false;
const parts = unsignaledParts;
unsignaledParts = {};
signalSubscribersNow(parts);
signalSubscribersNow(parts, false);
}, 0);
}
}
Expand All @@ -28,7 +28,7 @@ export function signalSubscribersNow(
if (updatedParts.all) {
// Signal all subscribers to requery.
for (const tblCache of Object.values(cache)) {
signalTableSubscribersNow(
collectTableSubscribers(
tblCache,
updatedParts,
queriesToSignal,
Expand All @@ -42,7 +42,7 @@ export function signalSubscribersNow(
const [, dbName, tableName] = parts;
const tblCache = cache[`idb://${dbName}/${tableName}`];
if (tblCache)
signalTableSubscribersNow(
collectTableSubscribers(
tblCache,
updatedParts,
queriesToSignal,
Expand All @@ -55,18 +55,17 @@ export function signalSubscribersNow(
queriesToSignal.forEach((requery) => requery());
}

function signalTableSubscribersNow(
function collectTableSubscribers(
tblCache: TblQueryCache,
updatedParts: ObservabilitySet,
outQueriesToSignal: Set<() => void>,
deleteAffectedCacheEntries: boolean
) {
const updatedEntryLists: [string, CacheEntry[]][] =
deleteAffectedCacheEntries && [];
const updatedEntryLists: [string, CacheEntry[]][] = [];
for (const [indexName, entries] of Object.entries(tblCache.queries.query)) {
const filteredEntries: CacheEntry[] = deleteAffectedCacheEntries && [];
const filteredEntries: CacheEntry[] = [];
for (const entry of entries) {
if (entry.obsSet && obsSetsOverlap(updatedParts, entry.obsSet)) {
if (obsSetsOverlap(updatedParts, entry.obsSet)) {
// This query is affected by the mutation. Remove it from cache
// and signal all subscribers to requery.
entry.subscribers.forEach((requery) => outQueriesToSignal.add(requery));
Expand Down
2 changes: 1 addition & 1 deletion src/live-query/observability-middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export const observabilityMiddleware: Middleware<DBCore> = {
trans.mutatedParts || {},
mutatedParts
);
return res;
return res;
});
},
};
Expand Down
1 change: 0 additions & 1 deletion src/public/types/cache.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export interface TblQueryCache {
objs: Map<string | number, object>;
optimisticOps: DBCoreMutateRequest[];
unsignaledParts: ObservabilitySet;
signalTimer?: number;
}

interface CacheEntryCommon {
Expand Down
33 changes: 33 additions & 0 deletions test/tests-live-query.js
Original file line number Diff line number Diff line change
Expand Up @@ -692,3 +692,36 @@ promisedTest("RxJS compability", async ()=>{
deepEqual(res2, [1, 2, 3, 4], "We should have get an updated mapped result");
s.unsubscribe();
});

promisedTest("Isolation: Explicit rw transactions do not affect live queries before committed", async ()=> {
let log = [];
let signal = new Signal();
let subscription = liveQuery(()=>db.items.toArray()).subscribe(result => {
log.push({type: "emit", result});
signal.resolve(result);
});
let result = await signal.promise;
deepEqual(result, [{id:1},{id:2},{id:3}], "First callback should give initally populated content");
deepEqual(log, [{type: "emit", result: [{id:1},{id:2},{id:3}]}], "First callback should give initally populated content");
signal = new Signal();
await db.transaction('rw', db.items, async ()=>{
await db.items.add({id: 4});
await db.items.update(4, {name: "A"});
await db.items.toArray(); // Make some additional work in the transaction
await db.items.count(); // Make some additional work in the transaction
equal(log.length, 1, "No new emit should have been made yet");
});
await signal.promise;
deepEqual(log, [
{type: "emit", result: [{id:1},{id:2},{id:3}]},
{type: "emit", result: [{id:1},{id:2},{id:3},{id:4, name: "A"}]}
], "The committed transaction should now have been made");
//signal = new Signal();
await db.transaction('rw', db.items, async (tx)=>{
await db.items.add({id: 5});
equal(log.length, 2, "No new emit should have been made");
tx.abort(); // Aborting the transaction should make no new emit
}).catch(()=>{});
equal(log.length, 2, "No new emit should have been made");
subscription.unsubscribe();
});
3 changes: 3 additions & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"extends": "./src/tsconfig.json",
}

0 comments on commit 1eacbc8

Please sign in to comment.