Skip to content

Commit

Permalink
Merge pull request #14 from cloudflare/kenton/sync-with-internal
Browse files Browse the repository at this point in the history
Synchronize changes with upstream repo.
  • Loading branch information
kentonv committed Sep 23, 2022
2 parents da70917 + a6390be commit 47bbfb0
Show file tree
Hide file tree
Showing 15 changed files with 579 additions and 38 deletions.
5 changes: 3 additions & 2 deletions src/workerd/api/global-scope.c++
Expand Up @@ -385,8 +385,9 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
persistent.cancelDeferredAlarmDeletion();

if (!jsg::isDoNotLogException(e.getDescription())) {
KJ_LOG(ERROR, e);
if (auto desc = e.getDescription();
!jsg::isTunneledException(desc) && !jsg::isDoNotLogException(desc)) {
LOG_EXCEPTION("alarmRun"_kj, e);
}
EventOutcome outcome = EventOutcome::EXCEPTION;
KJ_IF_MAYBE(status, context.getLimitEnforcer().getLimitsExceeded()) {
Expand Down
6 changes: 5 additions & 1 deletion src/workerd/api/r2-rpc.c++
Expand Up @@ -81,7 +81,11 @@ kj::Promise<R2Result> doR2HTTPGetRequest(kj::Own<kj::HttpClient> client,
response.body.attach(kj::mv(client)), getContentEncoding(context, *response.headers),
context);
auto metadataSize = atoi((metadata).cStr());
KJ_REQUIRE(metadataSize <= 256 * 1024, "R2 metadata size seems way too large");
// R2 itself will try to stick to a cap of 256 KiB of response here. However for listing
// sometimes our heuristics have corner cases. This way we're more lenient in case someone
// finds a corner case for the heuristic so that we don't fail the GET with an opaque
// internal error.
KJ_REQUIRE(metadataSize <= 1024 * 1024, "R2 metadata size seems way too large");
auto metadataBuffer = kj::heapArray<char>(metadataSize);
auto promise = stream->tryRead((void*)metadataBuffer.begin(),
metadataBuffer.size(), metadataBuffer.size());
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/api/r2-rpc.h
Expand Up @@ -8,6 +8,10 @@
#include <workerd/jsg/jsg.h>

namespace workerd::api {
// NOTE: We don't currently actually use this as a structured object (hence the `kj::Own<R2Error>`
// that we see pop up).
// TODO(soon): Switch to structured objects and use jsg::Ref<R2Error> instead of kj::Own<R2Error>
// to maintain ownership.
class R2Error: public jsg::Object {
public:
R2Error(uint v4Code, kj::String message): v4Code(v4Code), message(kj::mv(message)) {}
Expand Down
193 changes: 186 additions & 7 deletions src/workerd/api/streams/queue.c++
Expand Up @@ -188,6 +188,16 @@ void ValueQueue::handleRead(
}
}

// Hook called while a consumer is closing with entries still buffered.
// The value queue makes no attempt to drain those entries here; they have to
// be picked up by subsequent reads, so this always reports that closing
// cannot proceed yet.
//
// Returns: always false ("cannot close yet"). None of the parameters are used.
bool ValueQueue::handleMaybeClose(
    jsg::Lock& js,
    ConsumerImpl::Ready& state,
    ConsumerImpl& consumer,
    QueueImpl& queue) {
  constexpr bool canCloseNow = false;
  return canCloseNow;
}

// Returns the consumer count reported by the underlying queue implementation.
size_t ValueQueue::getConsumerCount() { return impl.getConsumerCount(); }

#pragma endregion ValueQueue
Expand All @@ -208,11 +218,23 @@ void maybeInvalidateByobRequest(kj::Maybe<ByteQueue::ByobRequest&>& req) {
} // namespace

void ByteQueue::ReadRequest::resolveAsDone(jsg::Lock& js) {
pullInto.store.trim(pullInto.store.size() - pullInto.filled);
resolver.resolve(ReadResult {
.value = js.v8Ref(pullInto.store.createHandle(js)),
.done = true
});
if (pullInto.filled > 0) {
// There's been at least some data written, we need to respond but not
// set done to true since that's what the streams spec requires.
pullInto.store.trim(pullInto.store.size() - pullInto.filled);
resolver.resolve(ReadResult {
.value = js.v8Ref(pullInto.store.createHandle(js)),
.done = false
});
} else {
// Otherwise, we set the length to zero
pullInto.store.trim(pullInto.store.size());
KJ_ASSERT(pullInto.store.size() == 0);
resolver.resolve(ReadResult {
.value = js.v8Ref(pullInto.store.createHandle(js)),
.done = true
});
}
maybeInvalidateByobRequest(byobReadRequest);
}

Expand Down Expand Up @@ -427,7 +449,9 @@ v8::Local<v8::Uint8Array> ByteQueue::ByobRequest::getView(jsg::Lock& js) {

// Constructs the byte queue, passing highWaterMark straight through to the
// underlying queue implementation.
ByteQueue::ByteQueue(size_t highWaterMark) : impl(highWaterMark) {}

void ByteQueue::close(jsg::Lock& js) { impl.close(js); }
// Closes the queue by delegating to the underlying queue implementation.
void ByteQueue::close(jsg::Lock& js) {
impl.close(js);
}

// Returns the desired size reported by the underlying queue implementation.
ssize_t ByteQueue::desiredSize() const { return impl.desiredSize(); }

Expand Down Expand Up @@ -688,7 +712,6 @@ void ByteQueue::handleRead(
if (state.queueTotalSize > 0 && consume(state.queueTotalSize)) {
return request.resolveAsDone(js);
}

return pendingRead();
}

Expand Down Expand Up @@ -720,6 +743,162 @@ void ByteQueue::handleRead(
}
}

// Attempts to drain the data remaining in a closing consumer's buffer into its
// pending read requests.
//
// Parameters:
//   js       - lock passed through to each read request's resolve().
//   state    - the consumer's ready state; its buffer, readRequests and
//              queueTotalSize are all mutated here.
//   consumer - only consulted to assert that it is closing.
//   queue    - not used by this function.
//
// Returns true if every buffered byte was copied into pending reads
// (state.queueTotalSize reached zero), meaning the caller may finish closing.
// Returns false if the pending reads ran out while data still remains, meaning
// the caller must stay open and wait for more reads.
bool ByteQueue::handleMaybeClose(
jsg::Lock&js,
ConsumerImpl::Ready& state,
ConsumerImpl& consumer,
QueueImpl& queue) {
// This is called when we know that we are closing and we still have data in
// the queue. We want to see if we can drain as much of it into pending reads
// as possible. If we're able to drain all of it, then yay! We can go ahead and
// close. Otherwise we stay open and wait for more reads to consume the rest.

// We should only be here if there is data remaining in the queue.
KJ_ASSERT(state.queueTotalSize > 0);

// We should also only be here if the consumer is closing.
KJ_ASSERT(consumer.isClosing());

const auto consume = [&] {
// Consume will copy as much of the remaining data in the buffer as possible
// to the next pending read. If the remaining data can fit into the remaining
// space in the read, awesome, we've consumed everything and we will return
// true. If the remaining data cannot fit into the remaining space in the read,
// then we'll return false to indicate that there's more data to consume. In
// either case, the pending read is popped off the pending queue and resolved.

KJ_ASSERT(!state.readRequests.empty());
auto& pending = state.readRequests.front();

while (!state.buffer.empty()) {
auto& next = state.buffer.front();
KJ_SWITCH_ONEOF(next) {
KJ_CASE_ONEOF(c, ConsumerImpl::Close) {
// We've reached the end! queueTotalSize should be zero. We need to
// resolve and pop the current read and return true to indicate that
// we're all done.
//
// NOTE(review): this looks reachable when the preceding entry was fully
// consumed without filling the pending read (the `continue` path below)
// and the Close is the next item in the buffer.
KJ_ASSERT(state.queueTotalSize == 0);
pending.resolve(js);
state.readRequests.pop_front();
return true;
}
KJ_CASE_ONEOF(entry, QueueEntry) {
auto sourcePtr = entry.entry->toArrayPtr();
auto sourceSize = sourcePtr.size() - entry.offset;

auto destPtr = pending.pullInto.store.asArrayPtr().begin() + pending.pullInto.filled;
auto destAmount = pending.pullInto.store.size() - pending.pullInto.filled;

// There should be space available to copy into and data to copy from, or
// something else went wrong.
KJ_ASSERT(destAmount > 0);
KJ_ASSERT(sourceSize > 0);

// sourceSize is the amount of data remaining in the current entry to copy.
// destAmount is the amount of space remaining to be filled in the pending read.
auto amountToCopy = kj::min(sourceSize, destAmount);

auto sourceStart = sourcePtr.begin() + entry.offset;
auto sourceEnd = sourceStart + amountToCopy;

// It shouldn't be possible for sourceEnd to extend past the sourcePtr.end()
// but let's make sure just to be safe.
KJ_ASSERT(sourceEnd <= sourcePtr.end());

// Safely copy amountToCopy bytes from the source into the destination.
std::copy(sourceStart, sourceEnd, destPtr);

// Advance the bookkeeping on all three sides: the read's fill level, the
// queue's remaining byte count, and the read offset into this entry.
pending.pullInto.filled += amountToCopy;
state.queueTotalSize -= amountToCopy;
entry.offset += amountToCopy;

KJ_ASSERT(entry.offset <= sourcePtr.size());

if (sourceEnd == sourcePtr.end()) {
// If sourceEnd is equal to sourcePtr.end(), we've consumed the entire entry
// and we can free it.
// NOTE(review): `released` is not read again; presumably it just holds the
// moved-out entry until this scope exits -- confirm intent. `next` and
// `entry` refer to the popped buffer slot and must not be used after this.
auto released = kj::mv(next);
state.buffer.pop_front();

if (amountToCopy == destAmount) {
// If the amountToCopy is equal to destAmount, then we've completely filled
// this read request with the data remaining. Resolve the read request. If
// state.queueTotalSize happens to be zero, we can safely indicate that we
// have read the remaining data as this may have been the last actual value
// entry in the buffer.
pending.resolve(js);
state.readRequests.pop_front();

if (state.queueTotalSize == 0) {
// If the queueTotalSize is zero at this point, the next item in the queue
// must be a close and we can return true. All of the data has been consumed.
KJ_ASSERT(state.buffer.front().is<ConsumerImpl::Close>());
return true;
}

// Otherwise, there's still data to consume, return false here to move on
// to the next pending read (if any).
return false;
}

// We know that amountToCopy cannot be greater than destAmount because
// of the kj::min above.

// Continuing here means that our pending read still has space to fill
// and we might still have value entries to fill it. We'll iterate around
// and see where we get.
continue;
}

// This read did not consume everything in this entry but doesn't have
// any more space to fill. We will resolve this read and return false
// to indicate that the outer loop should continue with the next read
// request if there is one.

// At this point, it should be impossible for state.queueTotalSize to
// be zero because there is still data remaining to be consumed in this
// buffer.
KJ_ASSERT(state.queueTotalSize > 0);

pending.resolve(js);
state.readRequests.pop_front();
return false;
}
}
// Every case above either returns or continues the loop.
KJ_UNREACHABLE;
}

// Only reached if the buffer became empty without encountering a Close entry.
return state.queueTotalSize == 0;
};

// We can only consume here if there are pending reads!
while (!state.readRequests.empty()) {
// We ignore the read request atLeast here since we are closing. Our goal is to
// consume as much of the data as possible.

if (consume()) {
// If consume returns true, we reached the end and have no more data to
// consume. That's a good thing! It means we can go ahead and close down.
return true;
}

// If consume() returns false, there is still data left to consume in the queue.
// We will loop around and try again so long as there are still read requests
// pending.
}

// At this point, we shouldn't have any read requests and there should be data
// left in the queue. We have to keep waiting for more reads to consume the
// remaining data.
KJ_ASSERT(state.queueTotalSize > 0);
KJ_ASSERT(state.readRequests.empty());

return false;
}

kj::Maybe<kj::Own<ByteQueue::ByobRequest>> ByteQueue::nextPendingByobReadRequest() {
KJ_IF_MAYBE(state, impl.getState()) {
while (!state->pendingByobReadRequests.empty()) {
Expand Down
24 changes: 20 additions & 4 deletions src/workerd/api/streams/queue.h
Expand Up @@ -549,10 +549,18 @@ class ConsumerImpl final {
// released it.
}
} else {
// Otherwise, if the buffer is empty isClosing() is true, resolve the
// remaining read promises with close indicators and update the state
// to closed. If the buffer is not empty, do nothing.
if (empty() && isClosing()) {
// Otherwise, if isClosing() is true...
if (isClosing()) {
if (!empty() && !Self::handleMaybeClose(js, *ready, *this, queue)) {
// If the queue is not empty, we'll have the implementation see
// if it can drain the remaining data into pending reads. If handleMaybeClose
// returns false, then it could not and we can't yet close. If it returns true,
// yay! Our queue is empty and we can continue closing down.
KJ_ASSERT(!empty()); // We're still not empty
return;
}

KJ_ASSERT(empty());
KJ_REQUIRE(ready->buffer.size() == 1); // The close should be the only item remaining.
for (auto& request : ready->readRequests) {
request.resolveAsDone(js);
Expand Down Expand Up @@ -698,6 +706,10 @@ class ValueQueue final {
ConsumerImpl& consumer,
QueueImpl& queue,
ReadRequest request);
static bool handleMaybeClose(jsg::Lock& js,
ConsumerImpl::Ready& state,
ConsumerImpl& consumer,
QueueImpl& queue);

friend ConsumerImpl;
};
Expand Down Expand Up @@ -892,6 +904,10 @@ class ByteQueue final {
ConsumerImpl& consumer,
QueueImpl& queue,
ReadRequest request);
static bool handleMaybeClose(jsg::Lock& js,
ConsumerImpl::Ready& state,
ConsumerImpl& consumer,
QueueImpl& queue);

friend ConsumerImpl;
friend class Consumer;
Expand Down
33 changes: 18 additions & 15 deletions src/workerd/api/urlpattern.c++
Expand Up @@ -72,8 +72,8 @@ namespace {

using RegexAndNameList = std::pair<jsg::V8Ref<v8::RegExp>, kj::Array<jsg::UsvString>>;

constexpr const char* SYNTAX_ERROR = "Syntax error in URLPattern.";
constexpr const char* BASEURL_ERROR = "A baseURL is not allowed when input is an object";
constexpr const char* SYNTAX_ERROR = "Syntax error in URLPattern";
constexpr const char* BASEURL_ERROR = "A baseURL is not allowed when input is an object.";

struct Common {
jsg::UsvString DUMMY_PROTOCOL = jsg::usv('d', 'u', 'm', 'm', 'y');
Expand Down Expand Up @@ -387,19 +387,22 @@ Part::Modifier maybeTokenToModifier(kj::Maybe<Token&> modifierToken) {
// ordered to make it so the *most likely* matches will be checked first.
// TODO (later): Investigate whether there is a more efficient way to handle this.
bool protocolComponentMatchesSpecialScheme(jsg::Lock& js, URLPatternComponent& component) {
#define V(name) \
do { \
auto handle = component.regex.getHandle(js); \
auto result = \
jsg::check(handle->Exec(js.v8Isolate->GetCurrentContext(), \
jsg::v8Str(js.v8Isolate, #name))); \
if (!result->IsNullOrUndefined()) { \
return true; \
} \
} while (false);
SPECIAL_SCHEME(V)
auto handle = component.regex.getHandle(js);
auto context = js.v8Isolate->GetCurrentContext();

const auto checkIt = [&handle, &js, &context](const char* name) {
return !jsg::check(handle->Exec(context, jsg::v8Str(js.v8Isolate, name)))->IsNullOrUndefined();
};

return js.tryCatch([&] {
#define V(name) if (checkIt(#name)) return true;
SPECIAL_SCHEME(V)
#undef V
return false;
return false;
}, [&](auto&& exception) {
// We ignore the exception here and just return false;
return false;
});
}
#undef SPECIAL_SCHEME

Expand Down Expand Up @@ -436,7 +439,7 @@ jsg::UsvString canonicalizeProtocol(jsg::UsvStringPtr input, kj::Maybe<jsg::UsvS
auto result = JSG_REQUIRE_NONNULL(
url::URL::parse(str, nullptr, dummyUrl),
TypeError,
"Invalid protocol scheme");
"Invalid protocol scheme.");

return kj::mv(result.scheme);
}
Expand Down
6 changes: 5 additions & 1 deletion src/workerd/io/worker-entrypoint.c++
Expand Up @@ -228,7 +228,11 @@ kj::Promise<void> WorkerEntrypoint::request(
// `jsg::isTunneledException(...)`. It would be lovely if we could simply store some type
// instead of `loggedExceptionEarlier`. It would save us some work.
auto description = jsg::stripRemoteExceptionPrefix(exception.getDescription());
exception.setDescription(kj::str("remote.", description));
if (!description.startsWith("remote.")) {
// If we already were annotated as remote from some other worker entrypoint, no point
// adding an additional prefix.
exception.setDescription(kj::str("remote.", description));
}
return kj::mv(exception);
}

Expand Down

0 comments on commit 47bbfb0

Please sign in to comment.