Replies: 13 comments 56 replies
-
Thank you for this RFC, I see that your team made practical research, not only theoretical :)
Runtime errors are the worst. It is better to prevent an error at the compilation stage (one of the reasons why I love Rust so much). Please consider creating different functions (or overloads). For example:
// initialValue is not optional
function fromAsyncObservable<T = unknown>(obs: Observable<T>, initialValue: T): Signal<T>;
// throw an exception on creation if an observable is not actually sync (would be great if we could catch it at the "build" stage)
function fromSyncObservable<T = unknown>(obs: Observable<T>): Signal<T>;
// if we have no information, we should assume that the given observable can be async
function fromObservable<T = unknown>(obs: Observable<T>, initialValue: T): Signal<T>;
For:
I guess it's planned to be implemented (or already implemented) using
What kind of Observable
I'm thankful for having a chance to participate in this discussion! Good luck with this challenging task :)
-
It has to be run in an injection context (or given an injector) because the inner
-
4a - I think you should use function arity and detect if fromObservable is called with a default value - so undefined would not be a default value unless explicitly called. Alternatively you could use a fluent interface to allow a default option to be specified. I feel like throwing errors by default only leads to pain and I know our team will always be using anything that avoids errors being thrown. If we had to pick between 2 and 3, we'd pick three.
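A minimal sketch of the arity idea, assuming a rest parameter is used to see whether a second argument was passed at all. The names `readInitialValue` and `InitialValue` are hypothetical and are not part of any Angular or RxJS API.

```typescript
// Hypothetical helper, for illustration only: it is not part of Angular or RxJS.
type InitialValue<T> =
  | { provided: false }
  | { provided: true; value: T };

// A rest parameter exposes the call's arity, so an explicit `undefined`
// can be told apart from an omitted argument.
function readInitialValue<T>(...args: [] | [T]): InitialValue<T> {
  if (args.length === 1) {
    const [value] = args;
    return { provided: true, value };
  }
  return { provided: false };
}

const omitted = readInitialValue<number | undefined>();
const explicit = readInitialValue<number | undefined>(undefined);
```

With this shape, `omitted.provided` is false while `explicit.provided` is true, even though both calls carry no usable number.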
-
First off, I found some potentially buggy code in your
And I have a couple of suggestions:
In either case, I think
if (typeof Symbol.observable === 'symbol') {
signal[Symbol.observable] = () => ({
subscribe(observer) {
let active = true
const watcher = effect(() => {
let value;
try {
// IMPORTANT: When dealing with any reactive function call you don't control
// like `observer.next()` (or anything similar), you want to catch the error
// in the producer call, in this case `signal()`. You don't want to catch errors
// in the `observer.next` call itself. (this is a common mistake and it's not
// unique to RxJS, you'll see it in the next example as well)
value = signal()
} catch (err) {
observer.error(err);
return;
}
observer.next(value);
}, {injector, manualCleanup: true});
return {
unsubscribe() {
if (active) {
active = false
watcher.destroy()
}
}
}
}
})
}
However... option 3 may be more palatable, which is to implement
signal[Symbol.asyncIterator] = () => {
const deferreds = []
let dirty = true
const handleError = (reason) => {
// Called during errors
while (deferreds.length) {
deferreds.shift()!.reject(reason)
}
}
const watcher = effect(() => {
let value;
try {
value = signal();
dirty = true
} catch (err) {
handleError(err)
return
}
if (deferreds.length) {
dirty = false
deferreds.shift()!.resolve({ done: false, value })
}
}, {injector, manualCleanup: true});
return {
next() {
return new Promise((resolve, reject) => {
if (dirty) {
dirty = false;
resolve({ done: false, value: signal() })
} else {
deferreds.push({ resolve, reject })
}
})
},
throw(reason) {
handleError(reason)
watcher.destroy();
},
return() {
// Called as a finalizer.
while (deferreds.length) {
deferreds.shift()!.resolve({ done: true, value: undefined })
}
watcher.destroy();
return Promise.resolve({ done: true, value: undefined })
}
}
}
And then your signal can be consumed in the following ways:
import { from, of, concatMap } from 'rxjs';
// These two work with Symbol.observable or Symbol.asyncIterator
const source$ = from(signal);
const source2$ = of('test').pipe(concatMap(() => signal))
// This one only works with Symbol.asyncIterator
async function test() {
for await (const value of signal) {
console.log(value)
if (value === 3) {
// it'll even stop watching here.
break;
}
}
}
Why should you consider a better interop change than
const observableToSignal = fromObservable; // fromObservable is a bad name
import { r, concatMap, map, filter } from 'rxjs';
const newSignal: Signal = r(
signal, // <-- A signal!
concatMap(fn), // <-- familiar RxJS operators!
map(fn2),
filter(fn3),
observableToSignal // <-- back to a signal!
)
Final suggestion: Maybe spend more time talking to your old friend, Ben. The RxJS team hasn't heard from y'all in years. I still love you, despite leaving in 2019. :)
-
I thought we already discussed adding alternative names to
A typical example is
-
From my point of view, if we must have runtime errors, they should happen as early as possible.
-
My two cents on 4a: I'd say probably throw an error as soon as possible. On read is fine. Otherwise it could get weird when people use this compositionally. Once types are corrected on RxJS 8, this will work (and it technically works now, just TypeScript will complain):
const tickerSignal = timer(0, 1000).pipe(
startWith(0),
toSignal
)
Alternatively, you just make the default value mandatory. Then I'd recommend providing an operator at some point (or people can just roll their own):
const asSignal = <T>(defaultValue: T) => (source: Observable<T>) => toSignal(source, defaultValue);
const signal = someObservable$.pipe(
asSignal('default value')
)
-
Is this supported? It converts an input directly into an observable.
class MyCmp {
count$ = fromSignal(input(1))
}
-
Maybe signals could implement the InteropObservable type to build the bridge between RxJS and signals, which would make translating between the two easier. By implementing the InteropObservable interface, you can use the signal directly in your RxJS pipes if you need to. For instance, let's say that Angular changes @Input to wrap values into a signal: you could still use it with RxJS's reactive utilities if needed, and the code would look cleaner:
Also, there is no need to worry about the signals implementation depending directly on RxJS. The implementation doesn't even need to import it! All it has to do is to provide a
-
if
-
Could someone figure out the issue? It seems clear here, or did I miss anything? Version: 16.0.0-rc.0
-
Would it be possible to provide a
Similar to RxJS
Or is there another good way to detect Signals?
-
I could not say it better than @e-oz:
But there is one thing with signals in angular that feels not right to me. And it has also implications for the Observable and Signal Interoperability. It is the scheduling of effects:
I think it is a reasonable default to have
It would be really nice to be able to configure the scheduler for the effect in toObservable. Something like:
/**
 * Options passed to the `effect` function.
*
* @developerPreview
*/
export declare interface ToObservableOptions {
/**
*
synchronousScheduler | Triggers the effect immediately.
queueScheduler | Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations.
asapScheduler | Schedules on the micro task queue, which is the same queue used for promises. Basically after the current job, but before the next job. Use this for asynchronous conversions.
asyncScheduler | Schedules work with setInterval. Use this for time-based operations.
animationFrameScheduler | Schedules task that will happen just before next browser content repaint. Can be used to create smooth browser animations.
*
*/
scheduler?: SCHEDULER
}
This could be used like:
toObservable(mySignal, { scheduler: synchronousScheduler });
-
Sub-RFC 4: Observable and Signal Interoperability
Changelog
April 10, 2023
fromSignal and fromObservable renamed to toObservable and toSignal, respectively.
toSignal will be Signal<T|undefined> where T is the return type of the Observable.
InteropObservable.
Introduction
Many existing applications and libraries use RxJS to manage state. Observables should be able to interoperate with these applications. To solve this problem, we propose two new APIs: toSignal and toObservable, both exported from the @angular/core/rxjs-interop package. The goal with these functions is to provide a lightweight and sensible default bridge between Observable and Signal concepts. This allows interop with existing RxJS code and optionally enables every Signal-based API in Angular to be used with RxJS, for much more complete support (for example, unlocks inputs as observables).
RxJS is a powerful and feature-rich library and there will certainly be cases that are not covered in these default behaviors. Rather than trying to include every possible feature, we want to make sure what we're including covers most use-cases without bloating the API in order to support uncommon scenarios.
toSignal
You can convert an Observable to an Angular Signal by using the toSignal function.
The two different signatures support two different ways of dealing with the initial value (see the "Asynchronicity" section below).
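The initial-value behaviors described in the sections below can be modeled with a small self-contained sketch. Everything here is a stand-in: `ObservableLike`, `SignalLike`, `toSignalSketch`, `syncOf`, `manualSource`, and the option names `initialValue` and `requireSync` are assumptions made for illustration, not Angular's or RxJS's actual types or implementation.

```typescript
// Stand-in types for illustration only; NOT Angular's Signal or RxJS's Observable.
interface ObserverLike<T> { next(value: T): void; }
interface SubscriptionLike { unsubscribe(): void; }
interface ObservableLike<T> { subscribe(observer: ObserverLike<T>): SubscriptionLike; }
type SignalLike<T> = () => T;

// Hypothetical options, named after the behaviors described in this RFC.
interface ToSignalSketchOptions<T> { initialValue?: T; requireSync?: boolean; }

function toSignalSketch<T>(
  source: ObservableLike<T>,
  options: ToSignalSketchOptions<T> = {},
): SignalLike<T | undefined> {
  let current: T | undefined = options.initialValue;
  let emitted = false;
  // Subscribe right away rather than on first read, so reads stay free of side effects.
  source.subscribe({
    next(value) {
      current = value;
      emitted = true;
    },
  });
  if (options.requireSync && !emitted) {
    throw new Error('Expected the source to emit synchronously on subscribe.');
  }
  return () => current;
}

// A source that emits one value during subscribe (synchronous).
function syncOf<T>(value: T): ObservableLike<T> {
  return {
    subscribe(observer) {
      observer.next(value);
      return { unsubscribe() {} };
    },
  };
}

// A source that only emits when `emit` is called later.
function manualSource<T>() {
  const observers = new Set<ObserverLike<T>>();
  return {
    emit(value: T): void {
      observers.forEach((observer) => observer.next(value));
    },
    subscribe(observer: ObserverLike<T>): SubscriptionLike {
      observers.add(observer);
      return { unsubscribe: () => { observers.delete(observer); } };
    },
  };
}

const pending = manualSource<number>();
const withoutInitial = toSignalSketch(pending); // reads undefined until the first emit
const withInitial = toSignalSketch(pending, { initialValue: 0 });
const immediate = toSignalSketch(syncOf(42), { requireSync: true });
```

In this model, asking for a synchronous emit from `manualSource` throws at creation time, which mirrors the trade-off discussed under "Requiring synchronous emit".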
Managing the subscription
The toSignal function internally subscribes to the given Observable and updates the returned Signal any time the Observable emits a value.
This subscription will be created immediately as opposed to waiting until the Signal is read. We do this intentionally because we do not want the signal read to have side effects (e.g. opening a network connection on subscription). If the subscription were created lazily, then the act of reading the signal for the first time would have the side-effect of creating the Observable subscription and this may have its own side effects. This is also how the async pipe works.
Angular will automatically unsubscribe from the Observable when the context in which it was created is destroyed. For example, if toSignal is called in a component or directive, then the subscription is cleaned up when the component or directive is destroyed. If it is instead created in a service, it will be cleaned up when that service's injector is destroyed.
Asynchronicity
Observables can be used to model both synchronous and asynchronous data flow. However, they don't distinguish these two cases in their API - any Observable might be synchronous, or it might be asynchronous. Signals, on the other hand, are always synchronous. The signature of the toSignal function supports both synchronous and asynchronous Observables.
Initial Values
Before the Observable emits, the signal returned by toSignal must have an "initial" value.
If not provided explicitly, this initial value is undefined (similarly to how the async pipe returns null initially).
There are many cases where undefined is not the best choice of initial value. For these cases, toSignal allows the initial value to be configured directly:
Requiring synchronous emit
Some Observables are known to emit synchronously. In those cases, you can have toSignal verify that the Observable produces a value immediately, and forgo providing or dealing with an initial value.
This is a trade-off: requiring a synchronous emit avoids any need for handling of undefined values (or manually configuring initial values). However, if balance$ is ever made asynchronous (such as by adding a debounce operation, for example), toSignal will throw an error.
Error and completion states
A signal is a wrapper around a value, which is capable of notifying interested consumers when that value changes. An Observable has three types of notifications when an Observer subscribes to it: next, error, and complete.
A signal's value is directly linked to the values coming from the next notification of the Observable.
When the Observer created by toSignal is notified of an error, it will throw this error the next time the signal is read. This error can be handled the same way any other error coming from a signal would be:
If handling the error where the signal is used is not desirable, the error can be transformed either on the Observable side (via catchError) or on the signal side (via computed):
Signals do not have a concept of being "complete," because they are just wrappers around a value. When an Observable is complete, the Observer created by toSignal will simply not receive new values from the stream. We acknowledge that the completion state does have a meaning for an Observable and sometimes you might care about this. Even with it not being surfaced directly in the returned signal, this state can be represented in a different signal, for example:
There’s also a materialize operator that “represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.” This pipe could be used to surface both the complete and error notifications.
toObservable
toObservable is a function that takes an Angular Signal and returns an Observable. It does this by creating an effect when the Observable is subscribed to, which takes values from the signal and streams them to subscribers.
Lifecycle and Cleanup
Observables are designed to manage the lifecycle of the side effects they create. When a toObservable Observable is subscribed, it creates an effect to monitor the signal, which exists until that subscriber unsubscribes.
This differs from toSignal, which automatically cleans up its subscription when the component or service which invoked it is destroyed.
We believe it would be incorrect for toObservable to tie the resulting Observable to the lifecycle of the enclosing component, for two reasons:
switchMap. Moreover, it could be passed into a service and persisted. Finally, observable unsubscriptions might have side-effects, and the user should control when those run, as with other Observables.
If desired, it's straightforward to tie the resulting Observable to the component's lifecycle manually:
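As a rough, self-contained model of that manual wiring, the sketch below unsubscribes when an owner's destroy hook fires. `DestroyHookSketch`, `untilOwnerDestroyed`, and `ObservableLike` are hypothetical stand-ins written for this example; they are not Angular's DestroyRef or RxJS's Observable, and real code would use the framework's own destruction hook.

```typescript
// Stand-in types for illustration only; not Angular's DestroyRef or RxJS's Observable.
interface ObserverLike<T> { next(value: T): void; }
interface SubscriptionLike { unsubscribe(): void; }
interface ObservableLike<T> { subscribe(observer: ObserverLike<T>): SubscriptionLike; }

// Hypothetical stand-in for a component's destruction hook.
class DestroyHookSketch {
  private callbacks: Array<() => void> = [];
  onDestroy(callback: () => void): void {
    this.callbacks.push(callback);
  }
  destroy(): void {
    // Run and clear every registered callback, so a second destroy() is a no-op.
    for (const callback of this.callbacks.splice(0)) callback();
  }
}

// Wraps a source so each subscription is torn down when the owner is destroyed.
function untilOwnerDestroyed<T>(
  source: ObservableLike<T>,
  owner: DestroyHookSketch,
): ObservableLike<T> {
  return {
    subscribe(observer) {
      const subscription = source.subscribe(observer);
      owner.onDestroy(() => subscription.unsubscribe());
      return subscription;
    },
  };
}

// A source that counts live subscriptions; unsubscribe is idempotent.
let activeSubscriptions = 0;
const liveSubscriptions = (): number => activeSubscriptions;
const source: ObservableLike<number> = {
  subscribe() {
    activeSubscriptions++;
    let closed = false;
    return {
      unsubscribe() {
        if (closed) return;
        closed = true;
        activeSubscriptions--;
      },
    };
  },
};

const componentLifetime = new DestroyHookSketch();
untilOwnerDestroyed(source, componentLifetime).subscribe({ next() {} });
```

Calling `componentLifetime.destroy()` drops the live subscription count back to zero, while the subscriber remains free to unsubscribe earlier on its own.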
All values are asynchronous
The Observable produced by toObservable uses an effect to send the next value. However, because effects are by nature asynchronous, the Observable emissions will be asynchronous, even for the first value.
Therefore, all values emitted by the toObservable Observable are delivered asynchronously.
This has some interesting consequences. Consider setting a signal several times in a row, which is a synchronous operation:
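A minimal model of this behavior, under stated assumptions: `signalSketch` and `toObservableSketch` are hypothetical stand-ins, and the effect is approximated by a microtask that reads the latest value when it runs. Angular's real effect scheduling differs in detail; the sketch only illustrates why synchronous writes coalesce.

```typescript
// Stand-in types for illustration only; NOT Angular's signal/effect or RxJS's Observable.
interface ObserverLike<T> { next(value: T): void; }
interface SubscriptionLike { unsubscribe(): void; }
interface ObservableLike<T> { subscribe(observer: ObserverLike<T>): SubscriptionLike; }

interface WritableSignalSketch<T> {
  get(): T;
  set(value: T): void;
  // Registers a change listener and returns a function that removes it.
  watch(listener: () => void): () => void;
}

function signalSketch<T>(initial: T): WritableSignalSketch<T> {
  let current = initial;
  const listeners = new Set<() => void>();
  return {
    get: () => current,
    set(value) {
      current = value;
      listeners.forEach((listener) => listener());
    },
    watch(listener) {
      listeners.add(listener);
      return () => { listeners.delete(listener); };
    },
  };
}

// Models an effect as a microtask that reads the latest value when it finally runs.
function toObservableSketch<T>(source: WritableSignalSketch<T>): ObservableLike<T> {
  return {
    subscribe(observer) {
      let active = true;
      let scheduled = false;
      const schedule = () => {
        if (scheduled) return; // coalesce writes made before the flush
        scheduled = true;
        Promise.resolve().then(() => {
          scheduled = false;
          if (active) observer.next(source.get());
        });
      };
      const stopWatching = source.watch(schedule);
      schedule(); // even the first value is delivered asynchronously
      return {
        unsubscribe() {
          active = false;
          stopWatching();
        },
      };
    },
  };
}

const mySignal = signalSketch(0);
const obs = toObservableSketch(mySignal);
const received: number[] = [];
obs.subscribe({ next: (value) => received.push(value) });

mySignal.set(1);
mySignal.set(2);
mySignal.set(3);
// Once the microtask queue drains, `received` holds only the last value written.
```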
The observable obs will only ever produce the value 3, because the signal is set synchronously, but the observable is scheduled to update asynchronously. In this way, a Signal is subtly different from a BehaviorSubject. In other words, unlike Observables, signals are not streams.
We do not emit the first value synchronously. To do so, we'd have to skip the first emission in the effect -- but what if the signal updates before the effect is scheduled for the first time? We'd need to check the signal's value equality, which might not be reliable (e.g. a mutated object).
If you want to get the first value synchronously, you can straightforwardly implement this behavior as so:
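One possible user-land shape, sketched with hypothetical stand-ins (`signalSketch`, `toObservableSketch`, `withSyncFirstValue`) rather than Angular or RxJS APIs: send the current value during subscribe, then drop the first asynchronous emission only if it repeats that value. The equality check is exactly the caveat raised above, so this is an assumption-laden illustration, not a recommended implementation.

```typescript
// Stand-in types for illustration only; NOT Angular's signal/effect or RxJS's Observable.
interface ObserverLike<T> { next(value: T): void; }
interface SubscriptionLike { unsubscribe(): void; }
interface ObservableLike<T> { subscribe(observer: ObserverLike<T>): SubscriptionLike; }
interface WritableSignalSketch<T> {
  get(): T;
  set(value: T): void;
  watch(listener: () => void): () => void;
}

function signalSketch<T>(initial: T): WritableSignalSketch<T> {
  let current = initial;
  const listeners = new Set<() => void>();
  return {
    get: () => current,
    set(value) {
      current = value;
      listeners.forEach((listener) => listener());
    },
    watch(listener) {
      listeners.add(listener);
      return () => { listeners.delete(listener); };
    },
  };
}

// Models an effect as a microtask, so every emission is asynchronous.
function toObservableSketch<T>(source: WritableSignalSketch<T>): ObservableLike<T> {
  return {
    subscribe(observer) {
      let active = true;
      let scheduled = false;
      const schedule = () => {
        if (scheduled) return;
        scheduled = true;
        Promise.resolve().then(() => {
          scheduled = false;
          if (active) observer.next(source.get());
        });
      };
      const stopWatching = source.watch(schedule);
      schedule();
      return { unsubscribe() { active = false; stopWatching(); } };
    },
  };
}

function withSyncFirstValue<T>(source: WritableSignalSketch<T>): ObservableLike<T> {
  return {
    subscribe(observer) {
      const first = source.get();
      observer.next(first); // delivered synchronously, before subscribe() returns
      let checkedFirst = false;
      return toObservableSketch(source).subscribe({
        next(value) {
          // Drop only the first async emission, and only if it repeats the value already sent.
          // Object.is cannot detect an object mutated in place.
          const duplicate = !checkedFirst && Object.is(value, first);
          checkedFirst = true;
          if (!duplicate) observer.next(value);
        },
      });
    },
  };
}

const count = signalSketch(0);
const seen: number[] = [];
withSyncFirstValue(count).subscribe({ next: (value) => seen.push(value) });
```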
One Effect for All Observers (via shareReplay)
If you expect to have many subscribers to an observable produced from a signal, you might want to use the shareReplay operator. This helps you avoid creating a new effect for each subscriber.
This would look something like the following:
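To make the sharing behavior concrete without depending on RxJS, here is a small stand-in that shares one upstream subscription, replays the latest value, and disconnects when the last subscriber leaves. `shareLatestSketch`, `ObservableLike`, and `upstreamSource` are hypothetical names for this sketch; it is not RxJS's shareReplay, and the upstream emits synchronously only to keep the example short.

```typescript
// Stand-in types for illustration only; not RxJS's Observable or its shareReplay operator.
interface ObserverLike<T> { next(value: T): void; }
interface SubscriptionLike { unsubscribe(): void; }
interface ObservableLike<T> { subscribe(observer: ObserverLike<T>): SubscriptionLike; }

// Shares one upstream subscription, replays the latest value to late subscribers,
// and disconnects when the subscriber count drops to zero.
function shareLatestSketch<T>(source: ObservableLike<T>): ObservableLike<T> {
  const observers = new Set<ObserverLike<T>>();
  let upstream: SubscriptionLike | undefined;
  let latest: { value: T } | undefined; // buffer of size one

  return {
    subscribe(observer) {
      if (latest) observer.next(latest.value);
      observers.add(observer);
      if (!upstream) {
        upstream = source.subscribe({
          next(value) {
            latest = { value };
            observers.forEach((o) => o.next(value));
          },
        });
      }
      let closed = false;
      return {
        unsubscribe() {
          if (closed) return;
          closed = true;
          observers.delete(observer);
          if (observers.size === 0 && upstream) {
            upstream.unsubscribe(); // reference count reached zero
            upstream = undefined;
            latest = undefined;
          }
        },
      };
    },
  };
}

// Stands in for an Observable produced from a signal; it counts live "effects".
let activeEffects = 0;
const liveEffects = (): number => activeEffects;
const upstreamSource: ObservableLike<number> = {
  subscribe(observer) {
    activeEffects++;
    observer.next(1);
    return { unsubscribe() { activeEffects--; } };
  },
};

const shared = shareLatestSketch(upstreamSource);
const first: number[] = [];
const second: number[] = [];
const subA = shared.subscribe({ next: (value) => first.push(value) });
const subB = shared.subscribe({ next: (value) => second.push(value) });
```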
refCount is set so that the underlying effect is cleaned up after all users unsubscribe. The bufferSize is one because we only care about caching the latest value.
Why not do this automatically inside toObservable? There are a few potential reasons:
shareReplay for everyone using RxJS interop, when some applications may not want it.
shareReplay yourself if you want this behavior. It's not nearly as easy to get rid of it, if you care about the sync/async discrepancy.
shareReplay observable.
InteropObservable
An idea that came up during the discussion was to make signals implement InteropObservable. While this can work in some situations, we ultimately decided against it for two principal reasons:
This is because effects fundamentally need access to the effect scheduler for the application they're running in, so we'd need to use inject to get ahold of it.
This would be a very surprising and burdensome requirement for anyone trying to put signal reads inside e.g. switchMap, since Observable pipelines don't typically run with injection contexts. You'd have to wrap the operators in runInInjectionContext, which would be painful and boilerplate-y. An explicit conversion from signal to observable outside of the observable pipeline is actually more ergonomic.
For example, the following pattern:
may function correctly if the initial emit is synchronous, but error later on when the switchMap operator tries to call the InteropObservable API on dataSignal internally the next time something$ emits, because it'll no longer be in an injection context.
Additionally, there is no straightforward way to fix this issue, because the injection context comes from the event source (whatever produced the next value to something$), not the Observable chain creation. So it's not possible to wrap switchMap in runInInjectionContext - a custom, injection-context-aware operator would have to be used instead.
Overall, this adds a level of friction that we felt undermines any potential convenience of InteropObservable.
.This part of the argument is more philosophical. Just like how subscribing to an Observable has side effects and manual subscriptions should be done with care, choosing to use the signal graph to manually drive additional data flow should also be done with consideration for the impact on the application's architecture. In some cases it can be necessary, but often there are better patterns such as driving Observable chains from the original application events, rather than as side effects of signal updates.
takeUntilDestroyed
Angular users often want to complete a stream when a related subject completes. The following illustrative pattern is quite common:
We are introducing a new RxJS operator called takeUntilDestroyed, which simplifies this example into the following:
By default, this operator will inject the current cleanup context. For example, used in a component, it will use the component's lifetime.
takeUntilDestroyed is especially useful when you want to tie the lifecycle of your toObservable Observable to a particular component's lifecycle.
However, if you want to override the default behavior, you can manually provide a DestroyRef. This lets you call takeUntilDestroyed outside of a context where inject is available.