Skip to content

Commit

Permalink
Merge pull request #69 from MellowYarker/milan/enable-websocket-compr…
Browse files Browse the repository at this point in the history
…ession

Enable WebSocket Compression in `workerd`
  • Loading branch information
kentonv committed Oct 5, 2022
2 parents eac1795 + 5370f10 commit 41d09c1
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 37 deletions.
10 changes: 5 additions & 5 deletions WORKSPACE
Expand Up @@ -31,14 +31,14 @@ load("@rules_foreign_cc//foreign_cc:repositories.bzl", "rules_foreign_cc_depende
rules_foreign_cc_dependencies()

# ========================================================================================
# Simple dependenciess
# Simple dependencies

http_archive(
name = "capnp-cpp",
sha256 = "03494ebba861cbe6d141417232f5b306731a1317b81a8ff9830bdd52f60ba456",
strip_prefix = "capnproto-capnproto-54ce3da/c++",
sha256 = "0cb62c35736ab4202a3e2f245ef4ac34549b209cb79e070711e42293fc4daf1c",
strip_prefix = "capnproto-capnproto-253c18f/c++",
type = "tgz",
urls = ["https://github.com/capnproto/capnproto/tarball/54ce3daa0ff43146bec861ec28747ee15222f032"],
urls = ["https://github.com/capnproto/capnproto/tarball/253c18fc6d8e21bb1114c720ab778fc397115c41"],
)

http_archive(
Expand Down Expand Up @@ -154,7 +154,7 @@ load("//rust-deps/cxxbridge_crates:crates.bzl", cxxbridge_repositories = "crate_
cxxbridge_repositories()

# ========================================================================================
# V8 and its depnedencies
# V8 and its dependencies
#
# Note that googlesource does not generate tarballs deterministically, so we cannot use
# http_archive: https://github.com/google/gitiles/issues/84
Expand Down
9 changes: 5 additions & 4 deletions src/workerd/api/cache.c++
Expand Up @@ -67,7 +67,8 @@ jsg::Unimplemented Cache::addAll(kj::Array<Request::Info> requests) {
}

jsg::Promise<jsg::Optional<jsg::Ref<Response>>> Cache::match(
jsg::Lock& js, Request::Info requestOrUrl, jsg::Optional<CacheQueryOptions> options) {
jsg::Lock& js, Request::Info requestOrUrl, jsg::Optional<CacheQueryOptions> options,
CompatibilityFlags::Reader flags) {
// TODO(someday): Implement Cache API in preview.
auto& context = IoContext::current();
if (context.isFiddle()) {
Expand All @@ -94,7 +95,7 @@ jsg::Promise<jsg::Optional<jsg::Ref<Response>>> Cache::match(
kj::HttpMethod::GET, validateUrl(jsRequest->getUrl()), requestHeaders, uint64_t(0));

return context.awaitIo(js, kj::mv(nativeRequest.response),
[httpClient = kj::mv(httpClient), &context]
[httpClient = kj::mv(httpClient), &context, flags = kj::mv(flags)]
(jsg::Lock& js, kj::HttpClient::Response&& response)
mutable -> jsg::Optional<jsg::Ref<Response>> {
response.body = response.body.attach(kj::mv(httpClient));
Expand Down Expand Up @@ -132,7 +133,7 @@ jsg::Promise<jsg::Optional<jsg::Ref<Response>>> Cache::match(
return makeHttpResponse(
js, kj::HttpMethod::GET, {},
response.statusCode, response.statusText, *response.headers,
kj::mv(response.body), nullptr);
kj::mv(response.body), nullptr, flags);
});
});
}
Expand Down Expand Up @@ -277,7 +278,7 @@ jsg::Promise<void> Cache::put(
// We need to send the response to our serializer immediately in order to fulfill Cache.put()'s
// contract: the caller should be able to observe that the response body is disturbed as soon
// as put() returns.
auto serializePromise = jsResponse->send(js, serializer, {});
auto serializePromise = jsResponse->send(js, serializer, {}, nullptr);
auto payload = serializer.getPayload();

// TODO(someday): Implement Cache API in preview. This bail-out lives all the way down here,
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/api/cache.h
Expand Up @@ -43,7 +43,8 @@ class Cache: public jsg::Object {
jsg::Unimplemented addAll(kj::Array<Request::Info> requests);

jsg::Promise<jsg::Optional<jsg::Ref<Response>>> match(
jsg::Lock& js, Request::Info request, jsg::Optional<CacheQueryOptions> options);
jsg::Lock& js, Request::Info request, jsg::Optional<CacheQueryOptions> options,
CompatibilityFlags::Reader flags);

jsg::Promise<void> put(
jsg::Lock& js, Request::Info request, jsg::Ref<Response> response);
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/global-scope.c++
Expand Up @@ -243,7 +243,7 @@ kj::Promise<DeferredProxy<void>> ServiceWorkerGlobalScope::request(
return ioContext.awaitJs(promise->then(kj::implicitCast<jsg::Lock&>(lock),
ioContext.addFunctor(
[&response, allowWebSocket = headers.isWebSocket(),
canceled = kj::addRef(*canceled)]
canceled = kj::addRef(*canceled), &headers]
(jsg::Lock& js, jsg::Ref<Response> innerResponse)
-> IoOwn<kj::Promise<DeferredProxy<void>>> {
auto& context = IoContext::current();
Expand All @@ -253,7 +253,7 @@ kj::Promise<DeferredProxy<void>> ServiceWorkerGlobalScope::request(
return context.addObject(kj::heap(addNoopDeferredProxy(kj::READY_NOW)));
} else {
return context.addObject(kj::heap(innerResponse->send(
js, response, { .allowWebSocket = allowWebSocket })));
js, response, { .allowWebSocket = allowWebSocket }, headers)));
}
}))).attach(kj::defer([canceled = kj::mv(canceled)]() mutable { canceled->value = true; }))
.then([ownRequestBody = kj::mv(ownRequestBody), deferredNeuter = kj::mv(deferredNeuter)]
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/html-rewriter.c++
Expand Up @@ -1032,7 +1032,7 @@ jsg::Ref<Response> HTMLRewriter::transform(
auto ts = IdentityTransformStream::constructor(js);
auto bodySource = ts->getReadable()->removeSource(js);
auto body = jsg::alloc<ReadableStream>(ioContext, kj::mv(bodySource));
response = Response::constructor(js, kj::Maybe(kj::mv(body)), kj::mv(response));
response = Response::constructor(js, kj::Maybe(kj::mv(body)), kj::mv(response), featureFlags);

auto outputSink = ts->getWritable()->removeSink(js);

Expand Down
59 changes: 48 additions & 11 deletions src/workerd/api/http.c++
Expand Up @@ -950,7 +950,8 @@ kj::Maybe<kj::String> Request::serializeCfBlobJson(jsg::Lock& js) {
jsg::Ref<Response> Response::constructor(
jsg::Lock& js,
jsg::Optional<kj::Maybe<Body::Initializer>> optionalBodyInit,
jsg::Optional<Initializer> maybeInit) {
jsg::Optional<Initializer> maybeInit,
CompatibilityFlags::Reader flags) {
auto bodyInit = kj::mv(optionalBodyInit).orDefault(nullptr);
Initializer init = kj::mv(maybeInit).orDefault(InitializerDict());

Expand Down Expand Up @@ -1087,11 +1088,12 @@ jsg::Ref<Response> Response::constructor(
}

return jsg::alloc<Response>(statusCode, KJ_ASSERT_NONNULL(kj::mv(statusText)), kj::mv(headers),
kj::mv(cf), kj::mv(body), nullptr, kj::mv(webSocket), bodyEncoding);
kj::mv(cf), kj::mv(body), flags, nullptr, kj::mv(webSocket),
bodyEncoding);
}

jsg::Ref<Response> Response::redirect(
jsg::Lock& js, kj::String url, jsg::Optional<int> status) {
jsg::Lock& js, kj::String url, jsg::Optional<int> status, CompatibilityFlags::Reader flags) {
auto statusCode = status.orDefault(302);
if (!isRedirectStatusCode(statusCode)) {
JSG_FAIL_REQUIRE(RangeError,
Expand Down Expand Up @@ -1119,13 +1121,14 @@ jsg::Ref<Response> Response::redirect(

auto statusText = KJ_ASSERT_NONNULL(defaultStatusText(statusCode));

return jsg::alloc<Response>(statusCode, kj::str(statusText), kj::mv(headers), nullptr, nullptr);
return jsg::alloc<Response>(statusCode, kj::str(statusText), kj::mv(headers), nullptr, nullptr, flags);
}

jsg::Ref<Response> Response::json_(
jsg::Lock& js,
v8::Local<v8::Value> any,
jsg::Optional<Initializer> maybeInit) {
jsg::Optional<Initializer> maybeInit,
CompatibilityFlags::Reader flags) {

const auto maybeSetContentType = [](auto headers) {
if (!headers->hasLowerCase("content-type"_kj)) {
Expand Down Expand Up @@ -1180,10 +1183,10 @@ jsg::Ref<Response> Response::json_(
};
}
kj::String json = js.serializeJson(any);
return constructor(js, kj::Maybe(kj::mv(json)), kj::mv(maybeInit));
return constructor(js, kj::Maybe(kj::mv(json)), kj::mv(maybeInit), flags);
}

jsg::Ref<Response> Response::clone(jsg::Lock& js) {
jsg::Ref<Response> Response::clone(jsg::Lock& js, CompatibilityFlags::Reader flags) {
JSG_REQUIRE(webSocket == nullptr,
TypeError, "Cannot clone a response to a WebSocket handshake.");

Expand All @@ -1199,11 +1202,12 @@ jsg::Ref<Response> Response::clone(jsg::Lock& js) {

return jsg::alloc<Response>(
statusCode, kj::str(statusText), kj::mv(headersClone), kj::mv(cfClone), kj::mv(bodyClone),
kj::mv(urlListClone));
flags, kj::mv(urlListClone));
}

kj::Promise<DeferredProxy<void>> Response::send(
jsg::Lock& js, kj::HttpService::Response& outer, SendOptions options) {
jsg::Lock& js, kj::HttpService::Response& outer, SendOptions options,
kj::Maybe<const kj::HttpHeaders&> maybeReqHeaders) {
JSG_REQUIRE(!getBodyUsed(), TypeError, "Body has already been used. "
"It can only be used once. Use tee() first if you need to read it twice.");

Expand Down Expand Up @@ -1231,6 +1235,32 @@ kj::Promise<DeferredProxy<void>> Response::send(
"Worker tried to return a WebSocket in a response to a request "
"which did not contain the header \"Upgrade: websocket\".");

if (hasEnabledWebSocketCompression &&
outHeaders.get(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS) == nullptr) {
// Since workerd uses `MANUAL_COMPRESSION` mode for websocket compression, we need to
// pass the headers we want to support to `acceptWebSocket()`.
KJ_IF_MAYBE(config, (*ws)->getPreferredExtensions(kj::WebSocket::ExtensionsContext::RESPONSE)) {
// We try to get extensions for use in a response (i.e. for a server side websocket).
// This allows us to `optimizedPumpTo()` `webSocket`.
outHeaders.set(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS, *config);
} else {
// `webSocket` is not a WebSocketImpl, we want to support whatever valid config the client
// requested, so we'll just use the client's requested headers.
KJ_IF_MAYBE(reqHeaders, maybeReqHeaders) {
KJ_IF_MAYBE(value, reqHeaders->get(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS)) {
outHeaders.set(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS, *value);
}
}
}
}

if (!hasEnabledWebSocketCompression) {
// While we guard against an origin server including `Sec-WebSocket-Extensions` in a Response
// (we don't send the extension in an offer, and if the server includes it in a response we
// will reject the connection), a Worker could still explicitly add the header to a Response.
outHeaders.unset(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS);
}

auto clientSocket = outer.acceptWebSocket(outHeaders);
return (*ws)->couple(kj::mv(clientSocket));
} else KJ_IF_MAYBE(jsBody, getBody()) {
Expand Down Expand Up @@ -1436,6 +1466,11 @@ jsg::Promise<jsg::Ref<Response>> fetchImplNoOutputLock(
urlList.back().toString(kj::Url::HTTP_PROXY_REQUEST).asBytes());

if (headers.isWebSocket()) {
if (!featureFlags.getWebSocketCompression()) {
// If we haven't enabled the websocket compression feature flag, strip the header from the
// subrequest.
headers.unset(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS);
}
return ioContext.awaitIo(js,
AbortSignal::maybeCancelWrap(signal, client->openWebSocket(url, headers)),
[fetcher = kj::mv(fetcher), featureFlags, jsRequest = kj::mv(jsRequest),
Expand Down Expand Up @@ -1464,6 +1499,7 @@ jsg::Promise<jsg::Ref<Response>> fetchImplNoOutputLock(
response.statusCode, response.statusText, *response.headers,
kj::heap<NullInputStream>(),
jsg::alloc<WebSocket>(kj::mv(webSocket), WebSocket::REMOTE),
featureFlags,
Response::BodyEncoding::AUTO,
kj::mv(signal)));
}
Expand Down Expand Up @@ -1580,7 +1616,7 @@ jsg::Promise<jsg::Ref<Response>> handleHttpResponse(

auto result = makeHttpResponse(js, jsRequest->getMethodEnum(), kj::mv(urlList),
response.statusCode, response.statusText, *response.headers,
kj::mv(response.body), nullptr, Response::BodyEncoding::AUTO,
kj::mv(response.body), nullptr, featureFlags, Response::BodyEncoding::AUTO,
kj::mv(signal));

return js.resolvedPromise(kj::mv(result));
Expand Down Expand Up @@ -1673,6 +1709,7 @@ jsg::Ref<Response> makeHttpResponse(
jsg::Lock& js, kj::HttpMethod method, kj::Vector<kj::Url> urlListParam,
uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers,
kj::Own<kj::AsyncInputStream> body, kj::Maybe<jsg::Ref<WebSocket>> webSocket,
CompatibilityFlags::Reader flags,
Response::BodyEncoding bodyEncoding,
kj::Maybe<jsg::Ref<AbortSignal>> signal) {
auto responseHeaders = jsg::alloc<Headers>(headers, Headers::Guard::RESPONSE);
Expand Down Expand Up @@ -1701,7 +1738,7 @@ jsg::Ref<Response> makeHttpResponse(
// TODO(someday): Fill response CF blob from somewhere?
return jsg::alloc<Response>(
statusCode, kj::str(statusText), kj::mv(responseHeaders),
nullptr, kj::mv(responseBody), kj::mv(urlList), kj::mv(webSocket));
nullptr, kj::mv(responseBody), flags, kj::mv(urlList), kj::mv(webSocket));
}

jsg::Promise<jsg::Ref<Response>> fetchImplNoOutputLock(
Expand Down
20 changes: 14 additions & 6 deletions src/workerd/api/http.h
Expand Up @@ -652,6 +652,7 @@ class Response: public Body {

Response(int statusCode, kj::String statusText, jsg::Ref<Headers> headers,
kj::Maybe<jsg::V8Ref<v8::Object>> cf, kj::Maybe<Body::ExtractedBody> body,
CompatibilityFlags::Reader reader,
kj::Array<kj::String> urlList = {},
kj::Maybe<jsg::Ref<WebSocket>> webSocket = nullptr,
Response::BodyEncoding bodyEncoding = Response::BodyEncoding::AUTO)
Expand All @@ -662,7 +663,8 @@ class Response: public Body {
cf(kj::mv(cf)),
urlList(kj::mv(urlList)),
webSocket(kj::mv(webSocket)),
bodyEncoding(bodyEncoding) {}
bodyEncoding(bodyEncoding),
hasEnabledWebSocketCompression(reader.getWebSocketCompression()) {}

// ---------------------------------------------------------------------------
// JS API
Expand All @@ -687,7 +689,8 @@ class Response: public Body {
static jsg::Ref<Response> constructor(
jsg::Lock& js,
jsg::Optional<kj::Maybe<Body::Initializer>> bodyInit,
jsg::Optional<Initializer> maybeInit);
jsg::Optional<Initializer> maybeInit,
CompatibilityFlags::Reader flags);
// Response's constructor has two arguments: an optional, nullable body that defaults to null, and
// an optional initializer property bag. Tragically, the only way to express the "optional,
// nullable body that defaults to null" is with an Optional<Maybe<Body::Initializer>>. The reason
Expand All @@ -698,7 +701,7 @@ class Response: public Body {
// an Optional, so we need an inner Maybe to inhibit string coercion to Body::Initializer.

static jsg::Ref<Response> redirect(
jsg::Lock& js, kj::String url, jsg::Optional<int> status);
jsg::Lock& js, kj::String url, jsg::Optional<int> status, CompatibilityFlags::Reader flags);
// Constructs a redirection response. `status` must be a redirect status if given, otherwise it
// defaults to 302 (technically a non-conformity, but both Chrome and Firefox use this default).
//
Expand All @@ -724,18 +727,20 @@ class Response: public Body {
// client. However, we were conserned about possible side-effects and incorrect
// error reporting.

jsg::Ref<Response> clone(jsg::Lock& js);
jsg::Ref<Response> clone(jsg::Lock& js, CompatibilityFlags::Reader flags);

static jsg::Ref<Response> json_(
jsg::Lock& js,
v8::Local<v8::Value> any,
jsg::Optional<Initializer> maybeInit);
jsg::Optional<Initializer> maybeInit,
CompatibilityFlags::Reader flags);

struct SendOptions {
bool allowWebSocket = false;
};
kj::Promise<DeferredProxy<void>> send(
jsg::Lock& js, kj::HttpService::Response& outer, SendOptions options);
jsg::Lock& js, kj::HttpService::Response& outer, SendOptions options,
kj::Maybe<const kj::HttpHeaders&> maybeReqHeaders);
// Helper not exposed to JavaScript.

int getStatus();
Expand Down Expand Up @@ -822,6 +827,8 @@ class Response: public Body {
// If this response is already encoded and the user don't want to encode the
// body twice, they can specify encodeBody: "manual".

bool hasEnabledWebSocketCompression = false;

void visitForGc(jsg::GcVisitor& visitor) {
visitor.visit(headers, webSocket, cf);
}
Expand Down Expand Up @@ -880,6 +887,7 @@ jsg::Ref<Response> makeHttpResponse(
jsg::Lock& js, kj::HttpMethod method, kj::Vector<kj::Url> urlList,
uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers,
kj::Own<kj::AsyncInputStream> body, kj::Maybe<jsg::Ref<WebSocket>> webSocket,
CompatibilityFlags::Reader flags,
Response::BodyEncoding bodyEncoding = Response::BodyEncoding::AUTO,
kj::Maybe<jsg::Ref<AbortSignal>> signal = nullptr);

Expand Down
30 changes: 28 additions & 2 deletions src/workerd/api/web-socket.c++
Expand Up @@ -121,7 +121,8 @@ bool validProtoToken(const kj::StringPtr protocol) {
jsg::Ref<WebSocket> WebSocket::constructor(
jsg::Lock& js,
kj::String url,
jsg::Optional<kj::OneOf<kj::Array<kj::String>, kj::String>> protocols) {
jsg::Optional<kj::OneOf<kj::Array<kj::String>, kj::String>> protocols,
CompatibilityFlags::Reader flags) {

auto& context = IoContext::current();

Expand Down Expand Up @@ -184,6 +185,14 @@ jsg::Ref<WebSocket> WebSocket::constructor(
}

auto connUrl = urlRecord.toString();
auto ws = jsg::alloc<WebSocket>(kj::mv(url), Locality::REMOTE);

if (!flags.getWebSocketCompression()) {
// If we haven't enabled the websocket compression feature flag, strip the header from the
// subrequest.
headers.unset(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS);
}

auto prom = client->openWebSocket(connUrl, headers)
.then([client = kj::mv(client), &context, wsErr = kj::mv(wsErr)]
(kj::HttpClient::WebSocketResponse response) mutable -> kj::Promise<PackedWebSocket> {
Expand Down Expand Up @@ -222,7 +231,6 @@ jsg::Ref<WebSocket> WebSocket::constructor(
KJ_UNREACHABLE
});

auto ws = jsg::alloc<WebSocket>(kj::mv(url), Locality::REMOTE);

ws->initConnection(js, kj::mv(prom));

Expand Down Expand Up @@ -516,6 +524,24 @@ bool WebSocket::isReleased() {
return farNative->state.is<Released>();
}

kj::Maybe<kj::String> WebSocket::getPreferredExtensions(kj::WebSocket::ExtensionsContext ctx) {
KJ_SWITCH_ONEOF(farNative->state) {
KJ_CASE_ONEOF(ws, AwaitingConnection) {
return nullptr;
}
KJ_CASE_ONEOF(container, AwaitingAcceptanceOrCoupling) {
return container.ws->getPreferredExtensions(ctx);
}
KJ_CASE_ONEOF(container, Accepted) {
return container.ws->getPreferredExtensions(ctx);
}
KJ_CASE_ONEOF(container, Released) {
return nullptr;
}
}
return nullptr;
}

kj::Maybe<kj::StringPtr> WebSocket::getUrl() {
return url.map([](kj::StringPtr value){ return value; });
}
Expand Down

0 comments on commit 41d09c1

Please sign in to comment.