Skip to content

Commit

Permalink
feat(angular-query): added support for observables on a query$ property
Browse files Browse the repository at this point in the history
chore(angular-query): fix knip issues

feat(angular-query): throw errors when the observable breaks
  • Loading branch information
rgolea committed Jan 17, 2024
1 parent 96cdb28 commit 26de580
Show file tree
Hide file tree
Showing 6 changed files with 401 additions and 24 deletions.
4 changes: 3 additions & 1 deletion packages/angular-query-experimental/package.json
Expand Up @@ -54,12 +54,14 @@
"@angular/platform-browser": "^17.0.8",
"@angular/platform-browser-dynamic": "^17.0.8",
"ng-packagr": "^17.0.3",
"rxjs": "^7.8.1",
"typescript": "5.2.2",
"zone.js": "^0.14.2"
},
"peerDependencies": {
"@angular/core": "^17",
"@angular/common": "^17"
"@angular/common": "^17",
"rxjs": "^7"
},
"module": "build/fesm2022/tanstack-angular-query-experimental.mjs",
"types": "build/index.d.ts",
Expand Down
@@ -1,8 +1,16 @@
import { describe, expectTypeOf } from 'vitest'
import { interval, map, take } from 'rxjs'
import { injectQuery } from '../inject-query'
import { simpleFetcher } from './test-utils'
import type { Signal } from '@angular/core'

function simpleObservable() {
return interval(1000).pipe(
map((_, i) => `Some data ${i}`),
take(5),
)
}

describe('Discriminated union return type', () => {
test('data should be possibly undefined by default', () => {
const query = injectQuery(() => ({
Expand Down Expand Up @@ -56,4 +64,24 @@ describe('Discriminated union return type', () => {
expectTypeOf(query.error).toEqualTypeOf<Signal<Error>>()
}
})

test('data should be infered from a passed in observable', () => {
const query = injectQuery(() => ({
queryKey: ['key'],
query$: () => simpleObservable(),
}))

expectTypeOf(query.data).toEqualTypeOf<Signal<string | undefined>>()
})

test('data should still be defined when query is successful', () => {
const query = injectQuery(() => ({
queryKey: ['key'],
query$: () => simpleObservable(),
}))

if (query.isSuccess()) {
expectTypeOf(query.data).toEqualTypeOf<Signal<string>>()
}
})
})
Expand Up @@ -2,6 +2,7 @@ import { computed, signal } from '@angular/core'
import { TestBed, fakeAsync, flush, tick } from '@angular/core/testing'
import { QueryClient } from '@tanstack/query-core'
import { expect, vi } from 'vitest'
import { Subject } from 'rxjs'
import { injectQuery } from '../inject-query'
import { provideAngularQuery } from '../providers'
import {
Expand Down Expand Up @@ -215,4 +216,51 @@ describe('injectQuery', () => {

expect(query.status()).toBe('error')
}))

test('should allow an observable to be passed as queryFn', fakeAsync(() => {
const subject$ = new Subject<string>()
const query = TestBed.runInInjectionContext(() => {
return injectQuery(() => ({
queryKey: ['key14'],
query$: () => subject$.asObservable(),
}))
})

expect(query.status()).toBe('pending')

flush()

subject$.next('Some data')

flush()
expect(query.status()).toBe('success')
expect(query.data()).toBe('Some data')
}))

test('should allow to error when the stream is emitting after the first time it emits', fakeAsync(() => {
const subject$ = new Subject<string>()
const query = TestBed.runInInjectionContext(() => {
return injectQuery(() => ({
queryKey: ['key14'],
query$: () => subject$.asObservable(),
}))
})

expect(query.status()).toBe('pending')

flush()

subject$.next('Some data')

flush()
expect(query.status()).toBe('success')
expect(query.data()).toBe('Some data')

subject$.error(new Error('Some error'))

flush()

expect(query.status()).toBe('error')
expect(query.error()).toMatchObject({ message: 'Some error' })
}))
})
106 changes: 101 additions & 5 deletions packages/angular-query-experimental/src/create-base-query.ts
Expand Up @@ -7,9 +7,28 @@ import {
signal,
} from '@angular/core'
import { notifyManager } from '@tanstack/query-core'
import {
EMPTY,
Subject,
catchError,
fromEvent,
lastValueFrom,
shareReplay,
skip,
switchMap,
take,
takeUntil,
} from 'rxjs'
import { takeUntilDestroyed } from '@angular/core/rxjs-interop'
import { signalProxy } from './signal-proxy'
import type { QueryClient, QueryKey, QueryObserver } from '@tanstack/query-core'
import type {
QueryClient,
QueryFunctionContext,
QueryKey,
QueryObserver,
} from '@tanstack/query-core'
import type { CreateBaseQueryOptions, CreateBaseQueryResult } from './types'
import type { Subscription } from 'rxjs'

/**
* Base implementation for `injectQuery` and `injectInfiniteQuery`.
Expand All @@ -20,6 +39,7 @@ export function createBaseQuery<
TData,
TQueryData,
TQueryKey extends QueryKey,
TPageParam = never,
>(
options: (
client: QueryClient,
Expand All @@ -28,24 +48,100 @@ export function createBaseQuery<
TError,
TData,
TQueryData,
TQueryKey
TQueryKey,
TPageParam
>,
Observer: typeof QueryObserver,
queryClient: QueryClient,
): CreateBaseQueryResult<TData, TError> {
assertInInjectionContext(createBaseQuery)
const destroyRef = inject(DestroyRef)

/**
* Subscription to the query$ observable.
*/
let subscription: Subscription | undefined

/**
* Signal that has the default options from query client applied
* computed() is used so signals can be inserted into the options
* making it reactive. Wrapping options in a function ensures embedded expressions
* are preserved and can keep being applied after signal changes
*/
const defaultedOptionsSignal = computed(() => {
const defaultedOptions = queryClient.defaultQueryOptions(
options(queryClient),
)
const { query$, ...opts } = options(queryClient)

// If there is a subscription, unsubscribe from it this is to prevent
// multiple subscriptions on the same computed type
if (subscription) subscription.unsubscribe()

/**
* Subscribe to the query$ observable and set the query data
* when the observable emits a value. This creates a promise
* on the queryFn and on each new emit it will update the client
* side.
*/
if (query$) {
const trigger$ = new Subject<
QueryFunctionContext<TQueryKey, TPageParam>
>()

const obs$ = trigger$.pipe(
switchMap((context) =>
query$(context).pipe(
takeUntil(
// If the signal is aborted, abort the observable
fromEvent(context.signal, 'abort'),
),
),
),
shareReplay(1),
takeUntilDestroyed(destroyRef),
)

subscription = obs$
.pipe(
skip(1),
catchError((error: Error) => {
const query = queryClient
.getQueryCache()
.find({ queryKey: opts.queryKey })
if (query) {
const { state } = query
// Mimic the dispatch code on the error case found in the query-core package
query.setState({
...state,
error,
errorUpdateCount: state.errorUpdateCount + 1,
errorUpdatedAt: Date.now(),
fetchFailureCount: state.fetchFailureCount + 1,
fetchFailureReason: error,
fetchStatus: 'idle',
status: 'error',
})
}
return EMPTY
}),
)
.subscribe({
next: (value) =>
queryClient.setQueryData<TQueryFnData>(opts.queryKey, value),
})

const queryFn = (
context: QueryFunctionContext<TQueryKey, TPageParam>,
) => {
// Trigger the observable with the new context.
const promise = lastValueFrom(obs$.pipe(take(1)))
trigger$.next(context)
return promise
}

opts.queryFn = queryFn
}

const defaultedOptions = queryClient.defaultQueryOptions(opts)

defaultedOptions._optimisticResults = 'optimistic'
return defaultedOptions
})
Expand Down
18 changes: 16 additions & 2 deletions packages/angular-query-experimental/src/types.ts
@@ -1,4 +1,5 @@
import type { Signal } from '@angular/core'
import type { Observable } from 'rxjs'

import type {
DefaultError,
Expand All @@ -8,6 +9,7 @@ import type {
MutateFunction,
MutationObserverOptions,
MutationObserverResult,
QueryFunctionContext,
QueryKey,
QueryObserverOptions,
QueryObserverResult,
Expand All @@ -21,10 +23,22 @@ export interface CreateBaseQueryOptions<
TData = TQueryFnData,
TQueryData = TQueryFnData,
TQueryKey extends QueryKey = QueryKey,
TPageParam = never,
> extends WithRequired<
QueryObserverOptions<TQueryFnData, TError, TData, TQueryData, TQueryKey>,
QueryObserverOptions<
TQueryFnData,
TError,
TData,
TQueryData,
TQueryKey,
TPageParam
>,
'queryKey'
> {}
> {
query$?: (
context: QueryFunctionContext<TQueryKey, TPageParam>,
) => Observable<TQueryFnData>
}

type CreateStatusBasedQueryResult<
TStatus extends QueryObserverResult['status'],
Expand Down

0 comments on commit 26de580

Please sign in to comment.