Allow sending custom headers and status codes on a SSE controller when an async flow is required #12260

Open
1 task done
notaphplover opened this issue Aug 22, 2023 · 3 comments
Labels
needs triage This issue has not been looked into type: enhancement 🐺


notaphplover commented Aug 22, 2023

Is there an existing issue that is already proposing this?

  • I have searched the existing issues

Is your feature request related to a problem? Please describe it

I've been doing a PoC involving the SSE feature.

The abstraction looks good; working with Observables feels like a nice way to represent an SSE-based flow. Having said that, I really don't know how I should send a non-200 HTTP status code in some scenarios.

After reading the spec, I would say it's perfectly fine to send non-200 status codes; the spec even states that redirections are allowed:

Event stream requests can be redirected using HTTP 301 and 307 redirects as with normal HTTP requests. Clients will reconnect if the connection is closed; a client can be told to stop reconnecting using the HTTP 204 No Content response code.

Looking at the original issue that led to the SSE feature (#4826), it seems this can be done by accessing the response:

@Sse("verify-payment")
verifyPayment(@Res() response: { raw: ServerResponse }, @Query("tx") tx?: string): Observable<{ data: { status: DTOBookingStage | "close", msg?: string } }> {
	response.raw.setHeader('Access-Control-Allow-Origin', BBConfig.envVars.NODE_ENV === "production" ? "https://www.example.com" : "http://localhost:3000"); //FIXME: It won't be good when i'll have staging
	return this.bookService.verifyClientPayment({}, tx);
}

It seems I could probably set the status code the same way I set a header, but this is somewhat tricky: it only allows us to set headers or the status code in a synchronous flow, before the Observable is returned.

Looking at the Nest repository, it seems the headers are sent as soon as the Observable is returned by the controller handler.

From packages/core/router/router-response-controller.ts:

public sse<
    TInput extends Observable<unknown> = any,
    TResponse extends WritableHeaderStream = any,
    TRequest extends IncomingMessage = any,
  >(
    result: TInput,
    response: TResponse,
    request: TRequest,
    options?: { additionalHeaders: AdditionalHeaders },
  ) {
    // It's possible that we sent headers already so don't use a stream
    if (response.writableEnded) {
      return;
    }

    this.assertObservable(result);

    const stream = new SseStream(request);
    stream.pipe(response, options);

    // Code continues handling the observable result.
}

When the SSE stream is piped, the headers are flushed unconditionally.

From packages/core/router/sse-stream.ts:

pipe<T extends WritableHeaderStream>(
    destination: T,
    options?: {
      additionalHeaders?: AdditionalHeaders;
      end?: boolean;
    },
  ): T {
    if (destination.writeHead) {
      destination.writeHead(200, {
        ...options?.additionalHeaders,
        // See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
        'Content-Type': 'text/event-stream',
        Connection: 'keep-alive',
        // Disable cache, even for old browsers and proxies
        'Cache-Control':
          'private, no-cache, no-store, must-revalidate, max-age=0, no-transform',
        Pragma: 'no-cache',
        Expire: '0',
        // NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
        'X-Accel-Buffering': 'no',
      });
      destination.flushHeaders();
    }

    destination.write('\n');
    return super.pipe(destination, options);
  }

So, keeping all of this in mind, I cannot see a way of sending a non-200 HTTP status code or custom headers if I need an async flow to determine them.

A simple example would be an SSE endpoint that subscribes to the events happening on a resource. Let's say I want to implement a GET v1/games/{gameId}/events SSE endpoint. I expect this endpoint to return a 404 status code if the {gameId} game resource does not exist. To accomplish that, I would need to look up the game in my data source, an operation that is very likely to be asynchronous. By the time the data source responds, the headers would already have been flushed, so it would be too late to send the 404 status code if no game is found.
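
The ordering problem in this scenario can be modeled without Nest at all. The sketch below is only an illustration: `FakeResponse`, `trySendStatus` and `simulateCurrentFlow` are hypothetical names, and `FakeResponse` mimics just one property of Node's `ServerResponse`, namely that the status line cannot change once the headers have been sent.

```typescript
// Hypothetical stand-in for the small part of Node's ServerResponse used here.
class FakeResponse {
  headersSent = false;
  sentStatus: number | undefined = undefined;

  // Models writeHead() followed by flushHeaders(): the status line leaves the server.
  flushHead(status: number): void {
    if (this.headersSent) {
      throw new Error('Headers were already sent');
    }
    this.headersSent = true;
    this.sentStatus = status;
  }
}

// Sends a status only if the head has not been sent yet; returns whether it was applied.
function trySendStatus(res: FakeResponse, status: number): boolean {
  if (res.headersSent) {
    return false;
  }
  res.flushHead(status);
  return true;
}

// Simulates the current ordering for GET v1/games/{gameId}/events.
function simulateCurrentFlow(gameExists: boolean): {
  sentStatus: number | undefined;
  notFoundApplied: boolean;
} {
  const res = new FakeResponse();

  // Step 1: the handler returns its Observable while the game lookup is still pending.
  // Step 2: the SSE stream is piped to the response, which sends 200 immediately.
  res.flushHead(200);

  // Step 3: the lookup completes; a missing game should map to 404, but it is too late.
  const notFoundApplied = gameExists ? false : trySendStatus(res, 404);

  return { sentStatus: res.sentStatus, notFoundApplied };
}

const outcome = simulateCurrentFlow(false);
console.log(outcome.sentStatus, outcome.notFoundApplied);
```

For a missing game, the client in this model still receives 200, and the late 404 is never applied.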

Describe the solution you'd like

I would love an abstraction that lets the developer do an async flow in order to determine custom headers / status codes.

Maybe this is a terrible approach, but allowing SSE controller handlers to return a Promise<Observable<MessageEvent>> could be a simple, non-breaking solution. Returning an Observable<MessageEvent> would still be allowed. The idea is to wait for the promise to be fulfilled, so the SSE handler can set custom headers or a custom status code in the meantime. Once the promise is fulfilled, headers would be flushed.

Teachability, documentation, adoption, migration strategy

In case of going this way, the SSE doc page should be updated to reflect the possibility of sending a Promise<Observable<MessageEvent>>, and some examples should be added.

The migration strategy would be trivial IMO because this is not a breaking change. Returning an Observable<MessageEvent> would still be allowed.

What is the motivation / use case for changing the behavior?

There is currently no way to set custom headers or a status code in the scenario described in the previous section of the issue.

@kamilmysliwiec
Member

In case of going this way, the SSE doc page should be updated to reflect the possibility of sending a Promise<Observable>, and some examples should be added.

Why can't you use rxjs defer and then mergeMap operator?


notaphplover commented Aug 22, 2023

Hey @kamilmysliwiec, thank you so much for your time :)

Why can't you use rxjs defer and then mergeMap operator?

I could, but I would venture to say that it would not solve the issue. Even if I use those excellent tools, headers are flushed right after the Observable is returned, before any asynchronous operation finishes. Nest doesn't wait for any asynchronous operation before sending those headers.

Let's suppose we go that way. Then packages/core/router/router-response-controller.ts is going to receive the Observable result:

  public sse<
    TInput extends Observable<unknown> = any,
    TResponse extends WritableHeaderStream = any,
    TRequest extends IncomingMessage = any,
  >(
    result: TInput,
    response: TResponse,
    request: TRequest,
    options?: { additionalHeaders: AdditionalHeaders },
  ) {
    // It's possible that we sent headers already so don't use a stream
    if (response.writableEnded) {
      return;
    }

    this.assertObservable(result);

    const stream = new SseStream(request);
    stream.pipe(response, options);

    // non relevant code continues
}

stream.pipe(response, options); is likely to be called well before any async operation completes, even with defer and mergeMap, because NestJS doesn't look at what's going on inside the Observable to decide when to send HTTP headers. Once stream.pipe(response, options); is called, headers are flushed, and after that it's too late to send headers or a status code.
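
A small model of that ordering, with no rxjs involved. `lazySource` is a hypothetical stand-in for an Observable built with defer, and `fakeSse` mimics only the order of operations in the method quoted above, assuming the subscription happens in the elided part after stream.pipe. It is a sketch of the argument, not Nest's code.

```typescript
type EventLog = string[];

// Hypothetical stand-in for an Observable created with defer():
// nothing runs until subscribe() is called.
interface LazySource {
  subscribe(): void;
}

function lazySource(eventLog: EventLog): LazySource {
  return {
    subscribe(): void {
      // With defer + mergeMap, the async lookup would only start here.
      eventLog.push('lookup started');
    },
  };
}

// Mimics the order of operations in the quoted sse() method, nothing more.
function fakeSse(source: LazySource, eventLog: EventLog): void {
  // Corresponds to stream.pipe(response, options): the head goes out first.
  eventLog.push('headers flushed with 200');
  // The result is subscribed to only afterwards (assumed from the elided code).
  source.subscribe();
}

const eventLog: EventLog = [];
fakeSse(lazySource(eventLog), eventLog);
console.log(eventLog.join(' -> '));
// prints "headers flushed with 200 -> lookup started"
```

However lazy the source is, in this model the head is already on the wire before the deferred work begins.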

The Observable pattern is great but assumes we always want to start a live connection. Promise<Observable<MessageEvent>> is not my cup of tea, but it would mean "I am deciding whether I really want to accept this connection; wait until I finish before starting to send an HTTP response". Any other abstraction able to represent this would be worth considering.

To illustrate the possible solution, consider the following change to packages/core/router/router-response-controller.ts:

public async sse<
    TInput extends Observable<unknown> = any,
    TResponse extends WritableHeaderStream = any,
    TRequest extends IncomingMessage = any,
  >(
    result: TInput | Promise<TInput>,
    response: TResponse,
    request: TRequest,
    options?: { additionalHeaders: AdditionalHeaders },
  ) {
    // It's possible that we sent headers already so don't use a stream
    if (response.writableEnded) {
      return;
    }

    let observableResult: TInput;

    if (isPromise(result)) {
      observableResult = await result;
    } else {
      observableResult = result;
    }

    this.assertObservable(observableResult);

    const stream = new SseStream(request);
    stream.pipe(response, options);

    // Code continues using observableResult instead of result
}

This way, an asynchronous flow could append additional headers to the response or even set a different status code.
Of course, some additional minor changes would be required to integrate this change, but that is the basic idea.
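
From the handler's point of view, the idea might look roughly like the sketch below. Everything in it is a hypothetical stand-in: `SseMessage`, `ResponseLike`, `gameEventsHandler` and `serve` are not Nest or rxjs APIs, an array replaces the Observable to keep the example dependency-free, and `serve` assumes the framework would read the status code the handler left on the response.

```typescript
// Hypothetical stand-ins; none of these types come from Nest or rxjs.
interface SseMessage {
  data: string;
}

interface ResponseLike {
  statusCode: number; // status set by the handler, as on Node's ServerResponse
  sentStatus?: number; // what the head would carry once it is written
}

// Handler in the proposed style: it may await before deciding on a status.
async function gameEventsHandler(
  gameId: string,
  res: ResponseLike,
  games: Map<string, SseMessage[]>,
): Promise<SseMessage[]> {
  // Stands in for an asynchronous data source query.
  const events = await Promise.resolve(games.get(gameId));
  if (events === undefined) {
    res.statusCode = 404; // still possible: nothing has been flushed yet
    return [];
  }
  return events;
}

// Framework side of the proposal: wait for the handler, then send the head.
async function serve(
  result: SseMessage[] | Promise<SseMessage[]>,
  res: ResponseLike,
): Promise<SseMessage[]> {
  const messages = await result; // await also accepts a plain, non-promise value
  res.sentStatus = res.statusCode; // the head is written only after the handler settled
  return messages;
}

async function demo(): Promise<number | undefined> {
  const games = new Map<string, SseMessage[]>([['g1', [{ data: 'started' }]]]);
  const res: ResponseLike = { statusCode: 200 };
  await serve(gameEventsHandler('missing', res, games), res);
  return res.sentStatus;
}

demo().then((sentStatus) => console.log(sentStatus));
```

Because the head is written only after the promise settles, the 404 chosen inside the async flow is the status that goes out.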


notaphplover commented Aug 25, 2023

After successfully implementing an abstraction that lets my NestJS framework layer use a normal NestJS controller to serve an SSE endpoint, let me add some caveats to consider before implementing this feature in the core package:

  1. The Node.js HTTP API is flexible enough to define and redefine headers in implicit mode: developers can safely set headers first, and response.raw.writeHead would then add the SSE headers. This approach doesn't allow setting the status code, though, so it would be great if NestJS tried to read an already-set status code from response.raw and passed it to response.raw.writeHead.
  2. Waiting for an async flow before sending headers means that no MessageEvent may be sent in the meantime (doing so would attempt to send part of the body before the HTTP headers). To accomplish this, a buffer should capture any MessageEvent emitted during that interval, so it can be drained once headers are sent.
  3. Given that an SSE controller can be reduced to a traditional Nest controller, does it make sense to keep this feature in the core package? With the right design, the feature could be extracted to a @nestjs/sse package, so users would install it and use an @Sse decorator that, behind the scenes, uses the @Get decorator to provide the SSE endpoint handler. This way we could remove some complexity from the core package in the next major release.
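
The buffering described in the second caveat could be sketched as follows. `BufferedSseWriter` and its members are hypothetical names, not Nest's SseStream; the `wire` array only records what would be written, the `HTTP <code>` entry is a marker rather than a real status line, and the framing assumes single-line `data` values. The status passed to `sendHead` could come from the response, as suggested in the first caveat.

```typescript
// Hypothetical names throughout; this is not Nest's SseStream.
interface SseMessage {
  data: string;
}

class BufferedSseWriter {
  private pending: SseMessage[] = [];
  private headSent = false;
  // Records what would be written to the socket, in order.
  readonly wire: string[] = [];

  // Called whenever the Observable emits.
  write(message: SseMessage): void {
    if (!this.headSent) {
      // Too early: keep the message until the head is out.
      this.pending.push(message);
      return;
    }
    // SSE framing for a single-line payload: "data: <payload>" plus a blank line.
    this.wire.push(`data: ${message.data}\n\n`);
  }

  // Called once the async flow has settled on a status code.
  sendHead(statusCode: number): void {
    if (this.headSent) {
      throw new Error('Head was already sent');
    }
    this.headSent = true;
    this.wire.push(`HTTP ${statusCode}`);

    // Drain the buffer in emission order.
    const queued = this.pending;
    this.pending = [];
    for (const message of queued) {
      this.write(message);
    }
  }
}

const writer = new BufferedSseWriter();
writer.write({ data: 'early' }); // emitted before the head: buffered
writer.sendHead(200);
writer.write({ data: 'late' }); // emitted after the head: written directly
console.log(writer.wire);
```

In this sketch the early message is not lost and still reaches the wire after the head, ahead of anything emitted later.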

The implementation on my side was a little tedious but pretty straightforward. I hope we find the right abstraction for this feature so no one needs to reinvent the wheel again.
