Skip to content

Commit

Permalink
Merge branch 'series/0.23'
Browse files Browse the repository at this point in the history
  • Loading branch information
rossabaker committed Jul 29, 2021
2 parents 530bf82 + 60f9be1 commit 453221b
Show file tree
Hide file tree
Showing 45 changed files with 1,635 additions and 618 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest]
scala: [2.13.6, 2.12.13, 3.0.0]
scala: [2.13.6, 2.12.14, 3.0.0]
java: [adopt@1.8, adopt@1.11, adopt@1.16]
runs-on: ${{ matrix.os }}
steps:
Expand Down Expand Up @@ -82,7 +82,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [2.12.13]
scala: [2.12.14]
java: [adopt@1.8]
runs-on: ${{ matrix.os }}
steps:
Expand Down Expand Up @@ -134,7 +134,7 @@ jobs:
echo "$SSH_PRIVATE_KEY" | ssh-add -
git config --global user.name "GitHub Actions CI"
git config --global user.email "ghactions@invalid"
sbt ++2.12.13 website/makeSite website/ghpagesPushSite
sbt ++2.12.14 website/makeSite website/ghpagesPushSite
Expand All @@ -143,7 +143,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [2.12.13]
scala: [2.12.14]
java: [adopt@1.8]
runs-on: ${{ matrix.os }}
steps:
Expand Down Expand Up @@ -172,7 +172,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [2.12.13]
scala: [2.12.14]
java: [adopt@1.8]
runs-on: ${{ matrix.os }}
steps:
Expand Down Expand Up @@ -201,7 +201,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [2.13.6, 2.12.13, 3.0.0]
scala: [2.13.6, 2.12.14, 3.0.0]
java: [adopt@1.8]
runs-on: ${{ matrix.os }}
steps:
Expand Down
100 changes: 38 additions & 62 deletions blaze-client/src/main/scala/org/http4s/blaze/client/BlazeClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package org.http4s
package blaze
package client

import cats.effect.kernel.{Async, Resource}
import cats.effect.kernel.{Async, Deferred, Resource}
import cats.effect.implicits._
import cats.syntax.all._
import java.nio.ByteBuffer
import java.util.concurrent.TimeoutException
import java.util.concurrent.{CancellationException, TimeoutException}
import org.http4s.blaze.util.TickWheelExecutor
import org.http4s.blazecore.{IdleTimeoutStage, ResponseHeaderTimeoutStage}
import org.http4s.blazecore.ResponseHeaderTimeoutStage
import org.http4s.client.{Client, RequestKey}
import org.log4s.getLogger
import scala.concurrent.ExecutionContext
Expand All @@ -39,7 +39,6 @@ object BlazeClient {
private[blaze] def makeClient[F[_], A <: BlazeConnection[F]](
manager: ConnectionManager[F, A],
responseHeaderTimeout: Duration,
idleTimeout: Duration,
requestTimeout: Duration,
scheduler: TickWheelExecutor,
ec: ExecutionContext
Expand All @@ -63,69 +62,46 @@ object BlazeClient {
invalidate(next.connection)
}

def idleTimeoutStage(conn: A): Resource[F, Option[IdleTimeoutStage[ByteBuffer]]] =
Resource.makeCase {
idleTimeout match {
case d: FiniteDuration =>
val stage = new IdleTimeoutStage[ByteBuffer](d, scheduler, ec)
F.delay(conn.spliceBefore(stage)).as(Some(stage))
case _ =>
F.pure(None)
}
} {
case (_, ExitCase.Succeeded) => F.unit
case (stageOpt, _) => F.delay(stageOpt.foreach(_.removeStage()))
}

def loop: F[Resource[F, Response[F]]] =
borrow.use { next =>
idleTimeoutStage(next.connection).use { stageOpt =>
val idleTimeoutF = stageOpt match {
case Some(stage) =>
F.async[TimeoutException] { cb =>
F.delay(stage.init(cb)).as(None)
}
case None => F.never[TimeoutException]
val res: F[Resource[F, Response[F]]] = next.connection
.runRequest(req)
.map { response: Resource[F, Response[F]] =>
response.flatMap(r =>
Resource.make(F.pure(r))(_ => manager.release(next.connection)))
}
val res = next.connection
.runRequest(req, idleTimeoutF)
.map { r =>
Resource.makeCase(F.pure(r)) {
case (_, ExitCase.Succeeded) =>
F.delay(stageOpt.foreach(_.removeStage()))
.guarantee(manager.release(next.connection))
case _ =>
F.delay(stageOpt.foreach(_.removeStage()))
.guarantee(manager.invalidate(next.connection))
}
}

responseHeaderTimeout match {
case responseHeaderTimeout: FiniteDuration =>
F.deferred[Unit].flatMap { gate =>
val responseHeaderTimeoutF: F[TimeoutException] =
F.delay {
val stage =
new ResponseHeaderTimeoutStage[ByteBuffer](
responseHeaderTimeout,
scheduler,
ec)
next.connection.spliceBefore(stage)
stage
}.bracket { stage =>
F.async[TimeoutException] { cb =>
F.delay(stage.init(cb)) >> gate.complete(()).as(None)
}
}(stage => F.delay(stage.removeStage()))
responseHeaderTimeout match {
case responseHeaderTimeout: FiniteDuration =>
Deferred[F, Unit].flatMap { gate =>
val responseHeaderTimeoutF: F[TimeoutException] =
F.delay {
val stage =
new ResponseHeaderTimeoutStage[ByteBuffer](
responseHeaderTimeout,
scheduler,
ec)
next.connection.spliceBefore(stage)
stage
}.bracket(stage =>
F.async[TimeoutException] { cb =>
F.delay(stage.init(cb)) >> gate.complete(()).as(None)
})(stage => F.delay(stage.removeStage()))

F.race(gate.get *> res, responseHeaderTimeoutF)
.flatMap[Resource[F, Response[F]]] {
case Left(r) => F.pure(r)
case Right(t) => F.raiseError(t)
}
}
case _ => res
}
F.racePair(gate.get *> res, responseHeaderTimeoutF)
.flatMap[Resource[F, Response[F]]] {
case Left((outcome, fiber)) =>
fiber.cancel >> outcome.embed(
F.raiseError(new CancellationException("Response canceled")))
case Right((fiber, outcome)) =>
fiber.cancel >> outcome.fold(
F.raiseError(new TimeoutException("Response timeout also timed out")),
F.raiseError,
_.flatMap(F.raiseError)
)
}
}
case _ => res
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,28 @@ import org.log4s.getLogger
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/** @param sslContext Some custom `SSLContext`, or `None` if the
* default SSL context is to be lazily instantiated.
/** Configure and obtain a BlazeClient
* @param responseHeaderTimeout duration between the submission of a request and the completion of the response header. Does not include time to read the response body.
* @param idleTimeout duration that a connection can wait without traffic being read or written before timeout
* @param requestTimeout maximum duration from the submission of a request through reading the body before a timeout.
* @param connectTimeout Duration a connection attempt times out after
* @param userAgent optional custom user agent header
* @param maxTotalConnections maximum connections the client will have at any specific time
* @param maxWaitQueueLimit maximum number requests waiting for a connection at any specific time
* @param maxConnectionsPerRequestKey Map of RequestKey to number of max connections
* @param sslContext Some custom `SSLContext`, or `None` if the default SSL context is to be lazily instantiated.
* @param checkEndpointIdentification require endpoint identification for secure requests according to RFC 2818, Section 3.1. If the certificate presented does not match the hostname of the request, the request fails with a CertificateException. This setting does not affect checking the validity of the cert via the sslContext's trust managers.
* @param maxResponseLineSize maximum length of the request line
* @param maxHeaderLength maximum length of headers
* @param maxChunkSize maximum size of chunked content chunks
* @param chunkBufferMaxSize Size of the buffer that is used when Content-Length header is not specified.
* @param parserMode lenient or strict parsing mode. The lenient mode will accept illegal chars but replaces them with � (0xFFFD)
* @param bufferSize internal buffer size of the blaze client
* @param executionContext custom executionContext to run async computations.
* @param scheduler execution scheduler
* @param asynchronousChannelGroup custom AsynchronousChannelGroup to use other than the system default
* @param channelOptions custom socket options
* @param customDnsResolver customDnsResolver to use other than the system default
*/
sealed abstract class BlazeClientBuilder[F[_]] private (
val responseHeaderTimeout: Duration,
Expand Down Expand Up @@ -219,7 +239,6 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
} yield BlazeClient.makeClient(
manager = manager,
responseHeaderTimeout = responseHeaderTimeout,
idleTimeout = idleTimeout,
requestTimeout = requestTimeout,
scheduler = scheduler,
ec = executionContext
Expand Down Expand Up @@ -282,6 +301,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
channelOptions = channelOptions,
connectTimeout = connectTimeout,
dispatcher = dispatcher,
idleTimeout = idleTimeout,
getAddress = customDnsResolver.getOrElse(BlazeClientBuilder.getAddress(_))
).makeClient
Resource.make(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package org.http4s
package blaze
package client

import cats.effect.Resource

import java.nio.ByteBuffer
import java.util.concurrent.TimeoutException
import org.http4s.blaze.pipeline.TailStage
import org.http4s.client.Connection

private trait BlazeConnection[F[_]] extends TailStage[ByteBuffer] with Connection[F] {
def runRequest(req: Request[F], idleTimeout: F[TimeoutException]): F[Response[F]]
def runRequest(req: Request[F]): F[Resource[F, Response[F]]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@ package org.http4s
package blaze
package client

import cats.effect.kernel.{Async, Resource}
import cats.effect.kernel.{Async, Outcome, Resource}
import cats.effect.std.Dispatcher
import cats.effect.implicits._
import cats.syntax.all._
import fs2._

import java.nio.ByteBuffer
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference
import org.http4s.Uri.{Authority, RegName}
import org.http4s.blaze.pipeline.Command.EOF
import org.http4s.blazecore.Http1Stage
import org.http4s.blazecore.{Http1Stage, IdleTimeoutStage}
import org.http4s.blazecore.util.Http1Writer
import org.http4s.client.RequestKey
import org.http4s.headers.{Connection, Host, `Content-Length`, `User-Agent`}
Expand All @@ -47,6 +48,7 @@ private final class Http1Connection[F[_]](
override val chunkBufferMaxSize: Int,
parserMode: ParserMode,
userAgent: Option[`User-Agent`],
idleTimeoutStage: Option[IdleTimeoutStage[ByteBuffer]],
override val dispatcher: Dispatcher[F]
)(implicit protected val F: Async[F])
extends Http1Stage[F]
Expand Down Expand Up @@ -114,7 +116,10 @@ private final class Http1Connection[F[_]](
val state = stageState.get()
val nextState = state match {
case ReadWrite => Some(Write)
case Read => Some(Idle(Some(startIdleRead())))
case Read =>
// idleTimeout is activated when entering ReadWrite state, remains active throughout Read and Write and is deactivated when entering the Idle state
idleTimeoutStage.foreach(_.cancelTimeout())
Some(Idle(Some(startIdleRead())))
case _ => None
}

Expand All @@ -129,7 +134,10 @@ private final class Http1Connection[F[_]](
val state = stageState.get()
val nextState = state match {
case ReadWrite => Some(Read)
case Write => Some(Idle(Some(startIdleRead())))
case Write =>
// idleTimeout is activated when entering ReadWrite state, remains active throughout Read and Write and is deactivated when entering the Idle state
idleTimeoutStage.foreach(_.cancelTimeout())
Some(Idle(Some(startIdleRead())))
case _ => None
}

Expand All @@ -149,16 +157,16 @@ private final class Http1Connection[F[_]](
f
}

def runRequest(req: Request[F], idleTimeoutF: F[TimeoutException]): F[Response[F]] =
F.defer[Response[F]] {
def runRequest(req: Request[F]): F[Resource[F, Response[F]]] =
F.defer[Resource[F, Response[F]]] {
stageState.get match {
case i @ Idle(idleRead) =>
if (stageState.compareAndSet(i, ReadWrite)) {
logger.debug(s"Connection was idle. Running.")
executeRequest(req, idleTimeoutF, idleRead)
executeRequest(req, idleRead)
} else {
logger.debug(s"Connection changed state since checking it was idle. Looping.")
runRequest(req, idleTimeoutF)
runRequest(req)
}
case ReadWrite | Read | Write =>
logger.error(s"Tried to run a request already in running state.")
Expand All @@ -176,8 +184,7 @@ private final class Http1Connection[F[_]](

private def executeRequest(
req: Request[F],
idleTimeoutF: F[TimeoutException],
idleRead: Option[Future[ByteBuffer]]): F[Response[F]] = {
idleRead: Option[Future[ByteBuffer]]): F[Resource[F, Response[F]]] = {
logger.debug(s"Beginning request: ${req.method} ${req.uri}")
validateRequest(req) match {
case Left(e) =>
Expand All @@ -199,6 +206,11 @@ private final class Http1Connection[F[_]](
case None => getHttpMinor(req) == 0
}

val idleTimeoutF = idleTimeoutStage match {
case Some(stage) => F.async_[TimeoutException](stage.setTimeout)
case None => F.never[TimeoutException]
}

idleTimeoutF.start.flatMap { timeoutFiber =>
val idleTimeoutS = timeoutFiber.joinWithNever.attempt.map {
case Right(t) => Left(t): Either[Throwable, Unit]
Expand All @@ -213,18 +225,24 @@ private final class Http1Connection[F[_]](
case t => F.delay(logger.error(t)("Error rendering request"))
}

val response: F[Response[F]] = for {
writeFiber <- writeRequest.start
response <- receiveResponse(
mustClose,
doesntHaveBody = req.method == Method.HEAD,
idleTimeoutS,
idleRead)
_ <- writeFiber.join
} yield response
val response: F[Resource[F, Response[F]]] =
F.bracketCase(
writeRequest.start
)(writeFiber =>
receiveResponse(
mustClose,
doesntHaveBody = req.method == Method.HEAD,
idleTimeoutS,
idleRead
// We need to wait for the write to complete so that by the time we attempt to recycle the connection it is fully idle.
).map(response =>
Resource.make(F.pure(writeFiber))(_.join.attempt.void).as(response))) {
case (_, Outcome.Succeeded(_)) => F.unit
case (writeFiber, Outcome.Canceled() | Outcome.Errored(_)) => writeFiber.cancel
}

F.race(response, timeoutFiber.joinWithNever)
.flatMap[Response[F]] {
.flatMap {
case Left(r) =>
F.pure(r)
case Right(t) =>
Expand Down

0 comments on commit 453221b

Please sign in to comment.