Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Render async timeouts in the listener, as Jetty 9 requires #221

Merged
merged 1 commit into from Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 25 additions & 15 deletions servlet/src/main/scala/org/http4s/servlet/AsyncHttp4sServlet.scala
Expand Up @@ -62,17 +62,13 @@ class AsyncHttp4sServlet[F[_]] @deprecated("Use AsyncHttp4sServlet.builder", "0.
ctx.setTimeout(asyncTimeoutMillis)
// Must be done on the container thread for Tomcat's sake when using async I/O.
val bodyWriter = servletIo.initWriter(servletResponse)
val result = F
.attempt(
toRequest(servletRequest).fold(
onParseFailure(_, servletResponse, bodyWriter),
val result =
toRequest(servletRequest)
.fold(
onParseFailure(_, servletResponse),
handleRequest(ctx, _, bodyWriter),
)
)
.flatMap {
case Right(()) => F.delay(ctx.complete)
case Left(t) => errorHandler(servletRequest, servletResponse)(t)
}
.recoverWith(errorHandler(servletRequest, servletResponse))
dispatcher.unsafeRunAndForget(result)
} catch errorHandler(servletRequest, servletResponse).andThen(dispatcher.unsafeRunSync _)

Expand All @@ -87,17 +83,23 @@ class AsyncHttp4sServlet[F[_]] @deprecated("Use AsyncHttp4sServlet.builder", "0.
// It is an error to add a listener to an async context that is
// already completed, so we must take care to add the listener
// before the response can complete.

val timeout =
F.async[Response[F]](cb =>
F.async[Unit](cb =>
gate.complete(ctx.addListener(new AsyncTimeoutHandler(cb))).as(noopCancelToken)
)
val response =
gate.get *>
F.defer(serviceFn(request))
.recoverWith(serviceErrorHandler(request))
val servletResponse = ctx.getResponse.asInstanceOf[HttpServletResponse]
F.race(timeout, response).flatMap(r => renderResponse(r.merge, servletResponse, bodyWriter))
F.race(timeout, response).flatMap {
case Left(_) =>
// In Jetty, if onTimeout is called, we need to complete on the
// listener's own thread.
F.unit
case Right(resp) =>
val servletResponse = ctx.getResponse.asInstanceOf[HttpServletResponse]
renderResponse(resp, servletResponse, bodyWriter) *> F.delay(ctx.complete())
}
}

private def errorHandler(
Expand All @@ -124,11 +126,19 @@ class AsyncHttp4sServlet[F[_]] @deprecated("Use AsyncHttp4sServlet.builder", "0.
}
}

private class AsyncTimeoutHandler(cb: Callback[Response[F]]) extends AbstractAsyncListener {
private class AsyncTimeoutHandler(cb: Callback[Unit]) extends AbstractAsyncListener {
override def onTimeout(event: AsyncEvent): Unit = {
// In Jetty, we must complete on the same thread as the timeout
// handler. This triggers a cancellation of the service so we
// can take over.
cb(Right(()))

val ctx = event.getAsyncContext
val req = event.getAsyncContext.getRequest.asInstanceOf[HttpServletRequest]
logger.info(s"Request timed out: ${req.getMethod} ${req.getServletPath}${req.getPathInfo}")
cb(Right(Response.timeout[F]))
val resp = event.getAsyncContext.getResponse.asInstanceOf[HttpServletResponse]
resp.sendError(Response.timeout.status.code, "Response timed out")
ctx.complete()
}
}
}
Expand Down
Expand Up @@ -61,7 +61,7 @@ class BlockingHttp4sServlet[F[_]] private (
val bodyWriter = servletIo.initWriter(servletResponse)

val render = toRequest(servletRequest).fold(
onParseFailure(_, servletResponse, bodyWriter),
onParseFailure(_, servletResponse),
handleRequest(_, servletResponse, bodyWriter),
)

Expand Down
13 changes: 9 additions & 4 deletions servlet/src/main/scala/org/http4s/servlet/Http4sServlet.scala
Expand Up @@ -77,14 +77,19 @@ abstract class Http4sServlet[F[_]](
serverSoftware = ServerSoftware(servletContext.getServerInfo)
}

@deprecated("Use the overload without bodyWriter.", "0.23.15")
protected def onParseFailure(
parseFailure: ParseFailure,
servletResponse: HttpServletResponse,
bodyWriter: BodyWriter[F],
): F[Unit] = {
val response = Response[F](Status.BadRequest).withEntity(parseFailure.sanitized)
renderResponse(response, servletResponse, bodyWriter)
}
): F[Unit] =
onParseFailure(parseFailure, servletResponse)

protected def onParseFailure(
parseFailure: ParseFailure,
servletResponse: HttpServletResponse,
): F[Unit] =
F.delay(servletResponse.sendError(Status.BadRequest.code, parseFailure.sanitized))

protected def renderResponse(
response: Response[F],
Expand Down