Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(angular-query): added observable support #6726

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/angular-query-experimental/package.json
Expand Up @@ -54,12 +54,14 @@
"@angular/platform-browser": "^17.0.8",
"@angular/platform-browser-dynamic": "^17.0.8",
"ng-packagr": "^17.0.3",
"rxjs": "^7.8.1",
"typescript": "5.2.2",
"zone.js": "^0.14.2"
},
"peerDependencies": {
"@angular/core": "^17",
"@angular/common": "^17"
"@angular/common": "^17",
"rxjs": "^7"
},
"module": "build/fesm2022/tanstack-angular-query-experimental.mjs",
"types": "build/index.d.ts",
Expand Down
@@ -1,8 +1,16 @@
import { describe, expectTypeOf } from 'vitest'
import { interval, map, take } from 'rxjs'
import { injectQuery } from '../inject-query'
import { simpleFetcher } from './test-utils'
import type { Signal } from '@angular/core'

function simpleObservable() {
return interval(1000).pipe(
map((_, i) => `Some data ${i}`),
take(5),
)
}

describe('Discriminated union return type', () => {
test('data should be possibly undefined by default', () => {
const query = injectQuery(() => ({
Expand Down Expand Up @@ -56,4 +64,24 @@ describe('Discriminated union return type', () => {
expectTypeOf(query.error).toEqualTypeOf<Signal<Error>>()
}
})

test('data should be infered from a passed in observable', () => {
const query = injectQuery(() => ({
queryKey: ['key'],
query$: () => simpleObservable(),
}))

expectTypeOf(query.data).toEqualTypeOf<Signal<string | undefined>>()
})

test('data should still be defined when query is successful', () => {
const query = injectQuery(() => ({
queryKey: ['key'],
query$: () => simpleObservable(),
}))

if (query.isSuccess()) {
expectTypeOf(query.data).toEqualTypeOf<Signal<string>>()
}
})
})
Expand Up @@ -2,6 +2,7 @@ import { computed, signal } from '@angular/core'
import { TestBed, fakeAsync, flush, tick } from '@angular/core/testing'
import { QueryClient } from '@tanstack/query-core'
import { expect, vi } from 'vitest'
import { Subject } from 'rxjs'
import { injectQuery } from '../inject-query'
import { provideAngularQuery } from '../providers'
import {
Expand Down Expand Up @@ -215,4 +216,51 @@ describe('injectQuery', () => {

expect(query.status()).toBe('error')
}))

test('should allow an observable to be passed as queryFn', fakeAsync(() => {
const subject$ = new Subject<string>()
const query = TestBed.runInInjectionContext(() => {
return injectQuery(() => ({
queryKey: ['key14'],
query$: () => subject$.asObservable(),
}))
})

expect(query.status()).toBe('pending')

flush()

subject$.next('Some data')

flush()
expect(query.status()).toBe('success')
expect(query.data()).toBe('Some data')
}))

test('should allow to error when the stream is emitting after the first time it emits', fakeAsync(() => {
const subject$ = new Subject<string>()
const query = TestBed.runInInjectionContext(() => {
return injectQuery(() => ({
queryKey: ['key14'],
query$: () => subject$.asObservable(),
}))
})

expect(query.status()).toBe('pending')

flush()

subject$.next('Some data')

flush()
expect(query.status()).toBe('success')
expect(query.data()).toBe('Some data')

subject$.error(new Error('Some error'))

flush()

expect(query.status()).toBe('error')
expect(query.error()).toMatchObject({ message: 'Some error' })
}))
})
106 changes: 101 additions & 5 deletions packages/angular-query-experimental/src/create-base-query.ts
Expand Up @@ -7,9 +7,28 @@ import {
signal,
} from '@angular/core'
import { notifyManager } from '@tanstack/query-core'
import {
EMPTY,
Subject,
catchError,
fromEvent,
lastValueFrom,
shareReplay,
skip,
switchMap,
take,
takeUntil,
} from 'rxjs'
import { takeUntilDestroyed } from '@angular/core/rxjs-interop'
import { signalProxy } from './signal-proxy'
import type { QueryClient, QueryKey, QueryObserver } from '@tanstack/query-core'
import type {
QueryClient,
QueryFunctionContext,
QueryKey,
QueryObserver,
} from '@tanstack/query-core'
import type { CreateBaseQueryOptions, CreateBaseQueryResult } from './types'
import type { Subscription } from 'rxjs'

/**
* Base implementation for `injectQuery` and `injectInfiniteQuery`.
Expand All @@ -20,6 +39,7 @@ export function createBaseQuery<
TData,
TQueryData,
TQueryKey extends QueryKey,
TPageParam = never,
>(
options: (
client: QueryClient,
Expand All @@ -28,24 +48,100 @@ export function createBaseQuery<
TError,
TData,
TQueryData,
TQueryKey
TQueryKey,
TPageParam
>,
Observer: typeof QueryObserver,
queryClient: QueryClient,
): CreateBaseQueryResult<TData, TError> {
assertInInjectionContext(createBaseQuery)
const destroyRef = inject(DestroyRef)

/**
* Subscription to the query$ observable.
*/
let subscription: Subscription | undefined

/**
* Signal that has the default options from query client applied
* computed() is used so signals can be inserted into the options
* making it reactive. Wrapping options in a function ensures embedded expressions
* are preserved and can keep being applied after signal changes
*/
const defaultedOptionsSignal = computed(() => {
const defaultedOptions = queryClient.defaultQueryOptions(
options(queryClient),
)
const { query$, ...opts } = options(queryClient)

// If there is a subscription, unsubscribe from it this is to prevent
// multiple subscriptions on the same computed type
if (subscription) subscription.unsubscribe()

/**
* Subscribe to the query$ observable and set the query data
* when the observable emits a value. This creates a promise
* on the queryFn and on each new emit it will update the client
* side.
*/
if (query$) {
const trigger$ = new Subject<
QueryFunctionContext<TQueryKey, TPageParam>
>()

const obs$ = trigger$.pipe(
switchMap((context) =>
query$(context).pipe(
takeUntil(
// If the signal is aborted, abort the observable
fromEvent(context.signal, 'abort'),
),
),
),
shareReplay(1),
takeUntilDestroyed(destroyRef),
)

subscription = obs$
.pipe(
skip(1),
catchError((error: Error) => {
const query = queryClient
.getQueryCache()
.find({ queryKey: opts.queryKey })
if (query) {
const { state } = query
// Mimic the dispatch code on the error case found in the query-core package
query.setState({
...state,
error,
errorUpdateCount: state.errorUpdateCount + 1,
errorUpdatedAt: Date.now(),
fetchFailureCount: state.fetchFailureCount + 1,
fetchFailureReason: error,
fetchStatus: 'idle',
status: 'error',
})
}
return EMPTY
}),
)
.subscribe({
next: (value) =>
queryClient.setQueryData<TQueryFnData>(opts.queryKey, value),
})

const queryFn = (
context: QueryFunctionContext<TQueryKey, TPageParam>,
) => {
// Trigger the observable with the new context.
const promise = lastValueFrom(obs$.pipe(take(1)))
trigger$.next(context)
return promise
}

opts.queryFn = queryFn
}

const defaultedOptions = queryClient.defaultQueryOptions(opts)

defaultedOptions._optimisticResults = 'optimistic'
return defaultedOptions
})
Expand Down
18 changes: 16 additions & 2 deletions packages/angular-query-experimental/src/types.ts
@@ -1,4 +1,5 @@
import type { Signal } from '@angular/core'
import type { Observable } from 'rxjs'

import type {
DefaultError,
Expand All @@ -8,6 +9,7 @@ import type {
MutateFunction,
MutationObserverOptions,
MutationObserverResult,
QueryFunctionContext,
QueryKey,
QueryObserverOptions,
QueryObserverResult,
Expand All @@ -21,10 +23,22 @@ export interface CreateBaseQueryOptions<
TData = TQueryFnData,
TQueryData = TQueryFnData,
TQueryKey extends QueryKey = QueryKey,
TPageParam = never,
> extends WithRequired<
QueryObserverOptions<TQueryFnData, TError, TData, TQueryData, TQueryKey>,
QueryObserverOptions<
TQueryFnData,
TError,
TData,
TQueryData,
TQueryKey,
TPageParam
>,
'queryKey'
> {}
> {
query$?: (
context: QueryFunctionContext<TQueryKey, TPageParam>,
) => Observable<TQueryFnData>
}

type CreateStatusBasedQueryResult<
TStatus extends QueryObserverResult['status'],
Expand Down