diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..2bc89f310 --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +# Created by https://www.gitignore.io/api/dart +# Edit at https://www.gitignore.io/?templates=dart + +### Dart ### +# See https://www.dartlang.org/guides/libraries/private-files + +# Files and directories created by pub +.dart_tool/ +.packages +build/ +# If you're building an application, you may want to check-in your pubspec.lock +pubspec.lock + +# Directory created by dartdoc +# If you don't generate documentation locally you can remove this line. +doc/api/ + +# Avoid committing generated Javascript files: +*.dart.js +*.info.json # Produced by the --dump-info flag. +*.js # When generated by dart2js. Don't specify *.js if your + # project includes source files written in JavaScript. +*.js_ +*.js.deps +*.js.map + +# End of https://www.gitignore.io/api/dart + +# Generated protocol buffer files. +*.pb*.dart diff --git a/lib/src/util/length_delimited_transformer.dart b/lib/src/util/length_delimited_transformer.dart new file mode 100644 index 000000000..c516fb5fb --- /dev/null +++ b/lib/src/util/length_delimited_transformer.dart @@ -0,0 +1,114 @@ +// Copyright 2019 Google Inc. Use of this source code is governed by an +// MIT-style license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +import 'dart:async'; +import 'dart:math' as math; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:stream_channel/stream_channel.dart'; + +/// A [StreamChannelTransformer] that converts a channel that sends and receives +/// arbitrarily-chunked binary data to one that sends and receives packets of +/// set length using [lengthDelimitedEncoder] and [lengthDelimitedDecoder]. +final StreamChannelTransformer> lengthDelimited = + StreamChannelTransformer>(lengthDelimitedDecoder, + StreamSinkTransformer.fromStreamTransformer(lengthDelimitedEncoder)); + +/// A transformer that converts an arbitrarily-chunked byte stream where each +/// packet is prefixed with a 32-bit little-endian number indicating its length +/// into a stream of packet contents. +final lengthDelimitedDecoder = + StreamTransformer, Uint8List>.fromBind((stream) { + // The buffer into which the four-byte little-endian length of the next packet + // will be written. + var lengthBuffer = Uint8List(4); + + // The index of the next byte to write to [lengthBuffer]. Once this is equal + // to [lengthBuffer.length], the full length is available. + var lengthBufferIndex = 0; + + // The length of the next message, in bytes, read from [lengthBuffer] once + // it's full. + int nextMessageLength; + + // The buffer into which the packet data itself is written. Initialized once + // [nextMessageLength] is known. + Uint8List buffer; + + // The index of the next byte to write to [buffer]. Once this is equal to + // [buffer.length] (or equivalently [nextMessageLength]), the full packet is + // available. + int bufferIndex; + + // It seems a little silly to use a nested [StreamTransformer] here, but we + // need the outer one to establish a closure context so we can share state + // across different input chunks, and the inner one takes care of all the + // boilerplate of creating a new stream based on [stream]. + return stream + .transform(StreamTransformer.fromHandlers(handleData: (chunk, sink) { + // The index of the next byte to read from [chunk]. We have to track this + // because the chunk may contain the length *and* the message, or even + // multiple messages. + var i = 0; + + // Adds bytes from [chunk] to [destination] at [destinationIndex] without + // overflowing the bounds of [destination], and increments [i] for each byte + // written. + // + // Returns the number of bytes written. + int writeFromChunk(Uint8List destination, int destinationIndex) { + var bytesToWrite = + math.min(destination.length - destinationIndex, chunk.length - i); + destination.setRange( + destinationIndex, destinationIndex + bytesToWrite, chunk, i); + i += bytesToWrite; + return bytesToWrite; + } + + while (i < chunk.length) { + // We can be in one of two states here: + // + // * Both [nextMessageLength] and [buffer] are `null`, in which case we're + // waiting until we have four bytes in [lengthBuffer] to know how big of + // a buffer to allocate. + // + // * Neither [nextMessageLength] nor [buffer] are `null`, in which case + // we're waiting for [buffer] to have [nextMessageLength] in it before + // we send it to [queue.local.sink] and start waiting for the next + // message. + if (nextMessageLength == null) { + lengthBufferIndex += writeFromChunk(lengthBuffer, lengthBufferIndex); + if (lengthBufferIndex < 4) return; + + nextMessageLength = + ByteData.view(lengthBuffer.buffer).getUint32(0, Endian.little); + buffer = Uint8List(nextMessageLength); + bufferIndex = 0; + } + + bufferIndex += writeFromChunk(buffer, bufferIndex); + if (bufferIndex < nextMessageLength) return; + + sink.add(Uint8List.view(buffer.buffer, 0, nextMessageLength)); + lengthBufferIndex = 0; + nextMessageLength = null; + buffer = null; + bufferIndex = null; + } + })); +}); + +/// A transformer that adds 32-bit little-endian numbers indicating the length +/// of each packet, so that they can safely be sent over a medium that doesn't +/// preserve packet boundaries. +final lengthDelimitedEncoder = + StreamTransformer>.fromHandlers( + handleData: (message, sink) { + var messageLength = Uint8List(4); + ByteData.view(messageLength.buffer) + .setUint32(0, message.length, Endian.little); + sink.add(messageLength); + sink.add(message); +}); diff --git a/pubspec.yaml b/pubspec.yaml new file mode 100644 index 000000000..1df7cb78e --- /dev/null +++ b/pubspec.yaml @@ -0,0 +1,15 @@ +name: sass_embedded +version: 1.0.0-dev +description: An implementation of the Sass embedded protocol using Dart Sass. +author: Sass Team +homepage: https://github.com/sass/dart-sass-embedded + +environment: + sdk: '>=2.4.0 <3.0.0' + +dependencies: + async: ">=1.13.0 <3.0.0" + stream_channel: ">=1.6.0 <3.0.0" + +dev_dependencies: + test: ^1.0.0 diff --git a/test/length_delimited_test.dart b/test/length_delimited_test.dart new file mode 100644 index 000000000..f309ea7eb --- /dev/null +++ b/test/length_delimited_test.dart @@ -0,0 +1,137 @@ +// Copyright 2019 Google Inc. Use of this source code is governed by an +// MIT-style license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:sass_embedded/src/util/length_delimited_transformer.dart'; + +import 'package:async/async.dart'; +import 'package:test/test.dart'; + +void main() { + group("encoder", () { + Sink> sink; + Stream> stream; + setUp(() { + var controller = StreamController>(); + sink = controller.sink; + stream = controller.stream.transform(lengthDelimitedEncoder); + }); + + test("encodes an empty message", () { + sink.add([]); + sink.close(); + expect(collectBytes(stream), completion(equals([0, 0, 0, 0]))); + }); + + test("encodes a message of length 1", () { + sink.add([123]); + sink.close(); + expect(collectBytes(stream), completion(equals([1, 0, 0, 0, 123]))); + }); + + test("encodes a message of length greater than 256", () { + sink.add(List.filled(300, 1)); + sink.close(); + expect(collectBytes(stream), + completion(equals([44, 1, 0, 0, ...List.filled(300, 1)]))); + }); + + test("encodes multiple messages", () { + sink.add([10]); + sink.add([20, 30]); + sink.add([40, 50, 60]); + sink.close(); + expect( + collectBytes(stream), + completion(equals( + [1, 0, 0, 0, 10, 2, 0, 0, 0, 20, 30, 3, 0, 0, 0, 40, 50, 60]))); + }); + }); + + group("decoder", () { + Sink> sink; + StreamQueue queue; + setUp(() { + var controller = StreamController>(); + sink = controller.sink; + queue = StreamQueue(controller.stream.transform(lengthDelimitedDecoder)); + }); + + group("decodes an empty message", () { + test("from a single chunk", () { + sink.add([0, 0, 0, 0]); + expect(queue, emits(isEmpty)); + }); + + test("from multiple chunks", () { + sink.add([0, 0]); + sink.add([0, 0]); + expect(queue, emits(isEmpty)); + }); + + test("from one chunk per byte", () { + sink..add([0])..add([0])..add([0])..add([0]); + expect(queue, emits(isEmpty)); + }); + + test("from a chunk that contains more data", () { + sink.add([0, 0, 0, 0, 1, 0, 0, 0, 100]); + expect(queue, emits(isEmpty)); + }); + }); + + group("decodes a longer message", () { + test("from a single chunk", () { + sink.add([4, 0, 0, 0, 1, 2, 3, 4]); + expect(queue, emits([1, 2, 3, 4])); + }); + + test("from multiple chunks", () { + sink..add([4, 0])..add([0, 0, 1, 2])..add([3, 4]); + expect(queue, emits([1, 2, 3, 4])); + }); + + test("from one chunk per byte", () { + for (var byte in [4, 0, 0, 0, 1, 2, 3, 4]) { + sink.add([byte]); + } + expect(queue, emits([1, 2, 3, 4])); + }); + + test("from a chunk that contains more data", () { + sink.add([4, 0, 0, 0, 1, 2, 3, 4, 1, 0, 0, 0]); + expect(queue, emits([1, 2, 3, 4])); + }); + + test("of length greater than 256", () { + sink.add([44, 1, 0, 0, ...List.filled(300, 1)]); + expect(queue, emits(List.filled(300, 1))); + }); + }); + + group("decodes multiple messages", () { + test("from single chunk", () { + sink.add([4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]); + expect(queue, emits([1, 2, 3, 4])); + expect(queue, emits([101, 102])); + }); + + test("from multiple chunks", () { + sink..add([4, 0])..add([0, 0, 1, 2, 3, 4, 2, 0])..add([0, 0, 101, 102]); + expect(queue, emits([1, 2, 3, 4])); + expect(queue, emits([101, 102])); + }); + + test("from one chunk per byte", () { + for (var byte in [4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]) { + sink.add([byte]); + } + expect(queue, emits([1, 2, 3, 4])); + expect(queue, emits([101, 102])); + }); + }); + }); +}