Skip to content

Commit

Permalink
Add webSocketCompression compat flag
Browse files Browse the repository at this point in the history
If this flag is not set, we will strip `Sec-WebSocket-Extensions`
from subrequests (including `new WebSocket()`) and responses.
  • Loading branch information
MellowYarker committed Oct 4, 2022
1 parent 0df9fc6 commit e8d8715
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 24 deletions.
7 changes: 4 additions & 3 deletions src/workerd/api/cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ jsg::Unimplemented Cache::addAll(kj::Array<Request::Info> requests) {
}

jsg::Promise<jsg::Optional<jsg::Ref<Response>>> Cache::match(
jsg::Lock& js, Request::Info requestOrUrl, jsg::Optional<CacheQueryOptions> options) {
jsg::Lock& js, Request::Info requestOrUrl, jsg::Optional<CacheQueryOptions> options,
CompatibilityFlags::Reader flags) {
// TODO(someday): Implement Cache API in preview.
auto& context = IoContext::current();
if (context.isFiddle()) {
Expand All @@ -94,7 +95,7 @@ jsg::Promise<jsg::Optional<jsg::Ref<Response>>> Cache::match(
kj::HttpMethod::GET, validateUrl(jsRequest->getUrl()), requestHeaders, uint64_t(0));

return context.awaitIo(js, kj::mv(nativeRequest.response),
[httpClient = kj::mv(httpClient), &context]
[httpClient = kj::mv(httpClient), &context, flags = kj::mv(flags)]
(jsg::Lock& js, kj::HttpClient::Response&& response)
mutable -> jsg::Optional<jsg::Ref<Response>> {
response.body = response.body.attach(kj::mv(httpClient));
Expand Down Expand Up @@ -132,7 +133,7 @@ jsg::Promise<jsg::Optional<jsg::Ref<Response>>> Cache::match(
return makeHttpResponse(
js, kj::HttpMethod::GET, {},
response.statusCode, response.statusText, *response.headers,
kj::mv(response.body), nullptr);
kj::mv(response.body), nullptr, flags);
});
});
}
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/api/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class Cache: public jsg::Object {
jsg::Unimplemented addAll(kj::Array<Request::Info> requests);

jsg::Promise<jsg::Optional<jsg::Ref<Response>>> match(
jsg::Lock& js, Request::Info request, jsg::Optional<CacheQueryOptions> options);
jsg::Lock& js, Request::Info request, jsg::Optional<CacheQueryOptions> options,
CompatibilityFlags::Reader flags);

jsg::Promise<void> put(
jsg::Lock& js, Request::Info request, jsg::Ref<Response> response);
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/html-rewriter.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ jsg::Ref<Response> HTMLRewriter::transform(
auto ts = IdentityTransformStream::constructor(js);
auto bodySource = ts->getReadable()->removeSource(js);
auto body = jsg::alloc<ReadableStream>(ioContext, kj::mv(bodySource));
response = Response::constructor(js, kj::Maybe(kj::mv(body)), kj::mv(response));
response = Response::constructor(js, kj::Maybe(kj::mv(body)), kj::mv(response), featureFlags);

auto outputSink = ts->getWritable()->removeSink(js);

Expand Down
41 changes: 30 additions & 11 deletions src/workerd/api/http.c++
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,8 @@ kj::Maybe<kj::String> Request::serializeCfBlobJson(jsg::Lock& js) {
jsg::Ref<Response> Response::constructor(
jsg::Lock& js,
jsg::Optional<kj::Maybe<Body::Initializer>> optionalBodyInit,
jsg::Optional<Initializer> maybeInit) {
jsg::Optional<Initializer> maybeInit,
CompatibilityFlags::Reader flags) {
auto bodyInit = kj::mv(optionalBodyInit).orDefault(nullptr);
Initializer init = kj::mv(maybeInit).orDefault(InitializerDict());

Expand Down Expand Up @@ -1087,11 +1088,12 @@ jsg::Ref<Response> Response::constructor(
}

return jsg::alloc<Response>(statusCode, KJ_ASSERT_NONNULL(kj::mv(statusText)), kj::mv(headers),
kj::mv(cf), kj::mv(body), nullptr, kj::mv(webSocket), bodyEncoding);
kj::mv(cf), kj::mv(body), flags, nullptr, kj::mv(webSocket),
bodyEncoding);
}

jsg::Ref<Response> Response::redirect(
jsg::Lock& js, kj::String url, jsg::Optional<int> status) {
jsg::Lock& js, kj::String url, jsg::Optional<int> status, CompatibilityFlags::Reader flags) {
auto statusCode = status.orDefault(302);
if (!isRedirectStatusCode(statusCode)) {
JSG_FAIL_REQUIRE(RangeError,
Expand Down Expand Up @@ -1119,13 +1121,14 @@ jsg::Ref<Response> Response::redirect(

auto statusText = KJ_ASSERT_NONNULL(defaultStatusText(statusCode));

return jsg::alloc<Response>(statusCode, kj::str(statusText), kj::mv(headers), nullptr, nullptr);
return jsg::alloc<Response>(statusCode, kj::str(statusText), kj::mv(headers), nullptr, nullptr, flags);
}

jsg::Ref<Response> Response::json_(
jsg::Lock& js,
v8::Local<v8::Value> any,
jsg::Optional<Initializer> maybeInit) {
jsg::Optional<Initializer> maybeInit,
CompatibilityFlags::Reader flags) {

const auto maybeSetContentType = [](auto headers) {
if (!headers->hasLowerCase("content-type"_kj)) {
Expand Down Expand Up @@ -1180,10 +1183,10 @@ jsg::Ref<Response> Response::json_(
};
}
kj::String json = js.serializeJson(any);
return constructor(js, kj::Maybe(kj::mv(json)), kj::mv(maybeInit));
return constructor(js, kj::Maybe(kj::mv(json)), kj::mv(maybeInit), flags);
}

jsg::Ref<Response> Response::clone(jsg::Lock& js) {
jsg::Ref<Response> Response::clone(jsg::Lock& js, CompatibilityFlags::Reader flags) {
JSG_REQUIRE(webSocket == nullptr,
TypeError, "Cannot clone a response to a WebSocket handshake.");

Expand All @@ -1199,7 +1202,7 @@ jsg::Ref<Response> Response::clone(jsg::Lock& js) {

return jsg::alloc<Response>(
statusCode, kj::str(statusText), kj::mv(headersClone), kj::mv(cfClone), kj::mv(bodyClone),
kj::mv(urlListClone));
flags, kj::mv(urlListClone));
}

kj::Promise<DeferredProxy<void>> Response::send(
Expand Down Expand Up @@ -1232,7 +1235,8 @@ kj::Promise<DeferredProxy<void>> Response::send(
"Worker tried to return a WebSocket in a response to a request "
"which did not contain the header \"Upgrade: websocket\".");

if (outHeaders.get(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS) == nullptr) {
if (hasEnabledWebSocketCompression &&
outHeaders.get(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS) == nullptr) {
// Since workerd uses `MANUAL_COMPRESSION` mode for websocket compression, we need to
// pass the headers we want to support to `acceptWebSocket()`.
KJ_IF_MAYBE(config, (*ws)->getPreferredExtensions(kj::WebSocket::ExtensionsContext::RESPONSE)) {
Expand All @@ -1249,6 +1253,14 @@ kj::Promise<DeferredProxy<void>> Response::send(
}
}
}

if (!hasEnabledWebSocketCompression) {
// While we guard against an origin server including `Sec-WebSocket-Extensions` in a Response
// (we don't send the extension in an offer, and if the server includes it in a response we
// will reject the connection), a Worker could still explicitly add the header to a Response.
outHeaders.unset(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS);
}

auto clientSocket = outer.acceptWebSocket(outHeaders);
return (*ws)->couple(kj::mv(clientSocket));
} else KJ_IF_MAYBE(jsBody, getBody()) {
Expand Down Expand Up @@ -1454,6 +1466,11 @@ jsg::Promise<jsg::Ref<Response>> fetchImplNoOutputLock(
urlList.back().toString(kj::Url::HTTP_PROXY_REQUEST).asBytes());

if (headers.isWebSocket()) {
if (!featureFlags.getWebSocketCompression()) {
// If we haven't enabled the websocket compression feature flag, strip the header from the
// subrequest.
headers.unset(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS);
}
return ioContext.awaitIo(js,
AbortSignal::maybeCancelWrap(signal, client->openWebSocket(url, headers)),
[fetcher = kj::mv(fetcher), featureFlags, jsRequest = kj::mv(jsRequest),
Expand Down Expand Up @@ -1482,6 +1499,7 @@ jsg::Promise<jsg::Ref<Response>> fetchImplNoOutputLock(
response.statusCode, response.statusText, *response.headers,
kj::heap<NullInputStream>(),
jsg::alloc<WebSocket>(kj::mv(webSocket), WebSocket::REMOTE),
featureFlags,
Response::BodyEncoding::AUTO,
kj::mv(signal)));
}
Expand Down Expand Up @@ -1598,7 +1616,7 @@ jsg::Promise<jsg::Ref<Response>> handleHttpResponse(

auto result = makeHttpResponse(js, jsRequest->getMethodEnum(), kj::mv(urlList),
response.statusCode, response.statusText, *response.headers,
kj::mv(response.body), nullptr, Response::BodyEncoding::AUTO,
kj::mv(response.body), nullptr, featureFlags, Response::BodyEncoding::AUTO,
kj::mv(signal));

return js.resolvedPromise(kj::mv(result));
Expand Down Expand Up @@ -1691,6 +1709,7 @@ jsg::Ref<Response> makeHttpResponse(
jsg::Lock& js, kj::HttpMethod method, kj::Vector<kj::Url> urlListParam,
uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers,
kj::Own<kj::AsyncInputStream> body, kj::Maybe<jsg::Ref<WebSocket>> webSocket,
CompatibilityFlags::Reader flags,
Response::BodyEncoding bodyEncoding,
kj::Maybe<jsg::Ref<AbortSignal>> signal) {
auto responseHeaders = jsg::alloc<Headers>(headers, Headers::Guard::RESPONSE);
Expand Down Expand Up @@ -1719,7 +1738,7 @@ jsg::Ref<Response> makeHttpResponse(
// TODO(someday): Fill response CF blob from somewhere?
return jsg::alloc<Response>(
statusCode, kj::str(statusText), kj::mv(responseHeaders),
nullptr, kj::mv(responseBody), kj::mv(urlList), kj::mv(webSocket));
nullptr, kj::mv(responseBody), flags, kj::mv(urlList), kj::mv(webSocket));
}

jsg::Promise<jsg::Ref<Response>> fetchImplNoOutputLock(
Expand Down
17 changes: 12 additions & 5 deletions src/workerd/api/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ class Response: public Body {

Response(int statusCode, kj::String statusText, jsg::Ref<Headers> headers,
kj::Maybe<jsg::V8Ref<v8::Object>> cf, kj::Maybe<Body::ExtractedBody> body,
CompatibilityFlags::Reader reader,
kj::Array<kj::String> urlList = {},
kj::Maybe<jsg::Ref<WebSocket>> webSocket = nullptr,
Response::BodyEncoding bodyEncoding = Response::BodyEncoding::AUTO)
Expand All @@ -662,7 +663,8 @@ class Response: public Body {
cf(kj::mv(cf)),
urlList(kj::mv(urlList)),
webSocket(kj::mv(webSocket)),
bodyEncoding(bodyEncoding) {}
bodyEncoding(bodyEncoding),
hasEnabledWebSocketCompression(reader.getWebSocketCompression()) {}

// ---------------------------------------------------------------------------
// JS API
Expand All @@ -687,7 +689,8 @@ class Response: public Body {
static jsg::Ref<Response> constructor(
jsg::Lock& js,
jsg::Optional<kj::Maybe<Body::Initializer>> bodyInit,
jsg::Optional<Initializer> maybeInit);
jsg::Optional<Initializer> maybeInit,
CompatibilityFlags::Reader flags);
// Response's constructor has two arguments: an optional, nullable body that defaults to null, and
// an optional initializer property bag. Tragically, the only way to express the "optional,
// nullable body that defaults to null" is with an Optional<Maybe<Body::Initializer>>. The reason
Expand All @@ -698,7 +701,7 @@ class Response: public Body {
// an Optional, so we need an inner Maybe to inhibit string coercion to Body::Initializer.

static jsg::Ref<Response> redirect(
jsg::Lock& js, kj::String url, jsg::Optional<int> status);
jsg::Lock& js, kj::String url, jsg::Optional<int> status, CompatibilityFlags::Reader flags);
// Constructs a redirection response. `status` must be a redirect status if given, otherwise it
// defaults to 302 (technically a non-conformity, but both Chrome and Firefox use this default).
//
Expand All @@ -724,12 +727,13 @@ class Response: public Body {
// client. However, we were conserned about possible side-effects and incorrect
// error reporting.

jsg::Ref<Response> clone(jsg::Lock& js);
jsg::Ref<Response> clone(jsg::Lock& js, CompatibilityFlags::Reader flags);

static jsg::Ref<Response> json_(
jsg::Lock& js,
v8::Local<v8::Value> any,
jsg::Optional<Initializer> maybeInit);
jsg::Optional<Initializer> maybeInit,
CompatibilityFlags::Reader flags);

struct SendOptions {
bool allowWebSocket = false;
Expand Down Expand Up @@ -823,6 +827,8 @@ class Response: public Body {
// If this response is already encoded and the user don't want to encode the
// body twice, they can specify encodeBody: "manual".

bool hasEnabledWebSocketCompression = false;

void visitForGc(jsg::GcVisitor& visitor) {
visitor.visit(headers, webSocket, cf);
}
Expand Down Expand Up @@ -881,6 +887,7 @@ jsg::Ref<Response> makeHttpResponse(
jsg::Lock& js, kj::HttpMethod method, kj::Vector<kj::Url> urlList,
uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers,
kj::Own<kj::AsyncInputStream> body, kj::Maybe<jsg::Ref<WebSocket>> webSocket,
CompatibilityFlags::Reader flags,
Response::BodyEncoding bodyEncoding = Response::BodyEncoding::AUTO,
kj::Maybe<jsg::Ref<AbortSignal>> signal = nullptr);

Expand Down
12 changes: 10 additions & 2 deletions src/workerd/api/web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ bool validProtoToken(const kj::StringPtr protocol) {
jsg::Ref<WebSocket> WebSocket::constructor(
jsg::Lock& js,
kj::String url,
jsg::Optional<kj::OneOf<kj::Array<kj::String>, kj::String>> protocols) {
jsg::Optional<kj::OneOf<kj::Array<kj::String>, kj::String>> protocols,
CompatibilityFlags::Reader flags) {

auto& context = IoContext::current();

Expand Down Expand Up @@ -184,6 +185,14 @@ jsg::Ref<WebSocket> WebSocket::constructor(
}

auto connUrl = urlRecord.toString();
auto ws = jsg::alloc<WebSocket>(kj::mv(url), Locality::REMOTE);

if (!flags.getWebSocketCompression()) {
// If we haven't enabled the websocket compression feature flag, strip the header from the
// subrequest.
headers.unset(kj::HttpHeaderId::SEC_WEBSOCKET_EXTENSIONS);
}

auto prom = client->openWebSocket(connUrl, headers)
.then([client = kj::mv(client), &context, wsErr = kj::mv(wsErr)]
(kj::HttpClient::WebSocketResponse response) mutable -> kj::Promise<PackedWebSocket> {
Expand Down Expand Up @@ -222,7 +231,6 @@ jsg::Ref<WebSocket> WebSocket::constructor(
KJ_UNREACHABLE
});

auto ws = jsg::alloc<WebSocket>(kj::mv(url), Locality::REMOTE);

ws->initConnection(js, kj::mv(prom));

Expand Down
3 changes: 2 additions & 1 deletion src/workerd/api/web-socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ class WebSocket: public EventTarget {
// JS API.

static jsg::Ref<WebSocket> constructor(jsg::Lock& js, kj::String url,
jsg::Optional<kj::OneOf<kj::Array<kj::String>, kj::String>> protocols);
jsg::Optional<kj::OneOf<kj::Array<kj::String>, kj::String>> protocols,
CompatibilityFlags::Reader flags);
// Creates a new outbound WebSocket.

void accept(jsg::Lock& js);
Expand Down
9 changes: 9 additions & 0 deletions src/workerd/io/compatibility-date.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,13 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
obsolete19 @19 :Bool
$compatEnableFlag("durable_object_rename");
# Obsolete flag. Has no effect.

webSocketCompression @20 :Bool
$compatEnableFlag("web_socket_compression");
# Enables WebSocket compression. Without this flag, all attempts to negotiate compression will
# be refused, so WebSockets will never use compression. With this flag, the system will
# automatically negotiate the use of the permesssage-deflate extension where appropriate.
# The Worker can also request specific compression settings by specifying a valid
# Sec-WebSocket-Extensions header, or setting the header to the empty string to explicitly
# request that no compression be used.
}

0 comments on commit e8d8715

Please sign in to comment.