Skip to content

Commit

Permalink
feat(angular-query): added support for observables on a query$ property
Browse files Browse the repository at this point in the history
  • Loading branch information
rgolea committed Jan 17, 2024
1 parent c9cd303 commit 53df354
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 25 deletions.
4 changes: 3 additions & 1 deletion packages/angular-query-experimental/package.json
Expand Up @@ -54,11 +54,13 @@
"@angular/platform-browser": "^17.0.8",
"@angular/platform-browser-dynamic": "^17.0.8",
"ng-packagr": "^17.0.3",
"rxjs": "^7.8.1",
"typescript": "5.2.2",
"zone.js": "^0.14.2"
},
"peerDependencies": {
"@angular/core": "^17"
"@angular/core": "^17",
"rxjs": "^7"
},
"module": "build/fesm2022/tanstack-angular-query-experimental.mjs",
"types": "build/index.d.ts",
Expand Down
@@ -1,6 +1,6 @@
import { describe, expectTypeOf } from 'vitest'
import { injectQuery } from '../inject-query'
import { simpleFetcher } from './test-utils'
import { simpleFetcher, simpleObservable } from './test-utils'
import type { Signal } from '@angular/core'

describe('Discriminated union return type', () => {
Expand Down Expand Up @@ -56,4 +56,24 @@ describe('Discriminated union return type', () => {
expectTypeOf(query.error).toEqualTypeOf<Signal<Error>>()
}
})

test('data should be infered from a passed in observable', () => {
const query = injectQuery(() => ({
queryKey: ['key'],
query$: () => simpleObservable(),
}))

expectTypeOf(query.data).toEqualTypeOf<Signal<string | undefined>>()
})

test('data should still be defined when query is successful', () => {
const query = injectQuery(() => ({
queryKey: ['key'],
query$: () => simpleObservable(),
}))

if (query.isSuccess()) {
expectTypeOf(query.data).toEqualTypeOf<Signal<string>>()
}
})
})
Expand Up @@ -2,6 +2,7 @@ import { computed, signal } from '@angular/core'
import { TestBed, fakeAsync, flush, tick } from '@angular/core/testing'
import { QueryClient } from '@tanstack/query-core'
import { expect, vi } from 'vitest'
import { Subject } from 'rxjs'
import { injectQuery } from '../inject-query'
import { provideAngularQuery } from '../providers'
import {
Expand Down Expand Up @@ -215,4 +216,24 @@ describe('injectQuery', () => {

expect(query.status()).toBe('error')
}))

test('should allow an observable to be passed as queryFn', fakeAsync(() => {
const subject$ = new Subject<string>()
const query = TestBed.runInInjectionContext(() => {
return injectQuery(() => ({
queryKey: ['key14'],
query$: () => subject$.asObservable(),
}))
})

expect(query.status()).toBe('pending')

flush()

subject$.next('Some data')

flush()
expect(query.status()).toBe('success')
expect(query.data()).toBe('Some data')
}))
})
@@ -1,4 +1,5 @@
import { isSignal, untracked } from '@angular/core'
import { interval, map, take } from 'rxjs'

export function simpleFetcher(): Promise<string> {
return new Promise((resolve) => {
Expand All @@ -8,6 +9,13 @@ export function simpleFetcher(): Promise<string> {
})
}

export function simpleObservable() {
return interval(1000).pipe(
map((_, i) => `Some data ${i}`),
take(5),
)
}

export function delayedFetcher(timeout = 0): () => Promise<string> {
return () =>
new Promise((resolve) => {
Expand Down
80 changes: 75 additions & 5 deletions packages/angular-query-experimental/src/create-base-query.ts
Expand Up @@ -7,9 +7,26 @@ import {
signal,
} from '@angular/core'
import { notifyManager } from '@tanstack/query-core'
import {
Subject,
fromEvent,
lastValueFrom,
shareReplay,
skip,
switchMap,
take,
takeUntil,
} from 'rxjs'
import { takeUntilDestroyed } from '@angular/core/rxjs-interop'
import { signalProxy } from './signal-proxy'
import type { QueryClient, QueryKey, QueryObserver } from '@tanstack/query-core'
import type {
QueryClient,
QueryFunctionContext,
QueryKey,
QueryObserver,
} from '@tanstack/query-core'
import type { CreateBaseQueryOptions, CreateBaseQueryResult } from './types'
import type { Subscription } from 'rxjs'

/**
* Base implementation for `injectQuery` and `injectInfiniteQuery`.
Expand All @@ -20,6 +37,7 @@ export function createBaseQuery<
TData,
TQueryData,
TQueryKey extends QueryKey,
TPageParam = never,
>(
options: (
client: QueryClient,
Expand All @@ -28,24 +46,76 @@ export function createBaseQuery<
TError,
TData,
TQueryData,
TQueryKey
TQueryKey,
TPageParam
>,
Observer: typeof QueryObserver,
queryClient: QueryClient,
): CreateBaseQueryResult<TData, TError> {
assertInInjectionContext(createBaseQuery)
const destroyRef = inject(DestroyRef)

/**
* Subscription to the query$ observable.
*/
let subscription: Subscription | undefined

/**
* Signal that has the default options from query client applied
* computed() is used so signals can be inserted into the options
* making it reactive. Wrapping options in a function ensures embedded expressions
* are preserved and can keep being applied after signal changes
*/
const defaultedOptionsSignal = computed(() => {
const defaultedOptions = queryClient.defaultQueryOptions(
options(queryClient),
)
const { query$, ...opts } = options(queryClient)

// If there is a subscription, unsubscribe from it this is to prevent
// multiple subscriptions on the same computed type
if (subscription) subscription.unsubscribe()

/**
* Subscribe to the query$ observable and set the query data
* when the observable emits a value. This creates a promise
* on the queryFn and on each new emit it will update the client
* side.
*/
if (query$) {
const trigger$ = new Subject<
QueryFunctionContext<TQueryKey, TPageParam>
>()

const obs$ = trigger$.pipe(
switchMap((context) =>
query$(context).pipe(
takeUntil(
// If the signal is aborted, abort the observable
fromEvent(context.signal, 'abort'),
),
),
),
shareReplay(1),
takeUntilDestroyed(destroyRef),
)

subscription = obs$.pipe(skip(1)).subscribe({
next: (value) =>
queryClient.setQueryData<TQueryFnData>(opts.queryKey, value),
})

const queryFn = (
context: QueryFunctionContext<TQueryKey, TPageParam>,
) => {
// Trigger the observable with the new context.
const promise = lastValueFrom(obs$.pipe(take(1)))
trigger$.next(context)
return promise
}

opts.queryFn = queryFn
}

const defaultedOptions = queryClient.defaultQueryOptions(opts)

defaultedOptions._optimisticResults = 'optimistic'
return defaultedOptions
})
Expand Down
18 changes: 16 additions & 2 deletions packages/angular-query-experimental/src/types.ts
@@ -1,4 +1,5 @@
import type { Signal } from '@angular/core'
import type { Observable } from 'rxjs'

import type {
DefaultError,
Expand All @@ -8,6 +9,7 @@ import type {
MutateFunction,
MutationObserverOptions,
MutationObserverResult,
QueryFunctionContext,
QueryKey,
QueryObserverOptions,
QueryObserverResult,
Expand All @@ -21,10 +23,22 @@ export interface CreateBaseQueryOptions<
TData = TQueryFnData,
TQueryData = TQueryFnData,
TQueryKey extends QueryKey = QueryKey,
TPageParam = never,
> extends WithRequired<
QueryObserverOptions<TQueryFnData, TError, TData, TQueryData, TQueryKey>,
QueryObserverOptions<
TQueryFnData,
TError,
TData,
TQueryData,
TQueryKey,
TPageParam
>,
'queryKey'
> {}
> {
query$?: (
context: QueryFunctionContext<TQueryKey, TPageParam>,
) => Observable<TQueryFnData>
}

type CreateStatusBasedQueryResult<
TStatus extends QueryObserverResult['status'],
Expand Down

0 comments on commit 53df354

Please sign in to comment.