Skip to content

Commit

Permalink
Add a length-delimited StreamChannelTransformer (#1)
Browse files Browse the repository at this point in the history
This implements the packet scheme that the embedded protocol uses when
communicating over stdin and stdout.
  • Loading branch information
nex3 committed Oct 29, 2019
1 parent 0f2c993 commit be35dbb
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 0 deletions.
30 changes: 30 additions & 0 deletions .gitignore
@@ -0,0 +1,30 @@
# Created by https://www.gitignore.io/api/dart
# Edit at https://www.gitignore.io/?templates=dart

### Dart ###
# See https://www.dartlang.org/guides/libraries/private-files

# Files and directories created by pub
.dart_tool/
.packages
build/
# If you're building an application, you may want to check-in your pubspec.lock
pubspec.lock

# Directory created by dartdoc
# If you don't generate documentation locally you can remove this line.
doc/api/

# Avoid committing generated Javascript files:
*.dart.js
*.info.json # Produced by the --dump-info flag.
*.js # When generated by dart2js. Don't specify *.js if your
# project includes source files written in JavaScript.
*.js_
*.js.deps
*.js.map

# End of https://www.gitignore.io/api/dart

# Generated protocol buffer files.
*.pb*.dart
114 changes: 114 additions & 0 deletions lib/src/util/length_delimited_transformer.dart
@@ -0,0 +1,114 @@
// Copyright 2019 Google Inc. Use of this source code is governed by an
// MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

import 'dart:async';
import 'dart:math' as math;
import 'dart:typed_data';

import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart';

/// A [StreamChannelTransformer] that converts a channel that sends and receives
/// arbitrarily-chunked binary data to one that sends and receives packets of
/// set length using [lengthDelimitedEncoder] and [lengthDelimitedDecoder].
final StreamChannelTransformer<Uint8List, List<int>> lengthDelimited =
StreamChannelTransformer<Uint8List, List<int>>(lengthDelimitedDecoder,
StreamSinkTransformer.fromStreamTransformer(lengthDelimitedEncoder));

/// A transformer that converts an arbitrarily-chunked byte stream where each
/// packet is prefixed with a 32-bit little-endian number indicating its length
/// into a stream of packet contents.
final lengthDelimitedDecoder =
StreamTransformer<List<int>, Uint8List>.fromBind((stream) {
// The buffer into which the four-byte little-endian length of the next packet
// will be written.
var lengthBuffer = Uint8List(4);

// The index of the next byte to write to [lengthBuffer]. Once this is equal
// to [lengthBuffer.length], the full length is available.
var lengthBufferIndex = 0;

// The length of the next message, in bytes, read from [lengthBuffer] once
// it's full.
int nextMessageLength;

// The buffer into which the packet data itself is written. Initialized once
// [nextMessageLength] is known.
Uint8List buffer;

// The index of the next byte to write to [buffer]. Once this is equal to
// [buffer.length] (or equivalently [nextMessageLength]), the full packet is
// available.
int bufferIndex;

// It seems a little silly to use a nested [StreamTransformer] here, but we
// need the outer one to establish a closure context so we can share state
// across different input chunks, and the inner one takes care of all the
// boilerplate of creating a new stream based on [stream].
return stream
.transform(StreamTransformer.fromHandlers(handleData: (chunk, sink) {
// The index of the next byte to read from [chunk]. We have to track this
// because the chunk may contain the length *and* the message, or even
// multiple messages.
var i = 0;

// Adds bytes from [chunk] to [destination] at [destinationIndex] without
// overflowing the bounds of [destination], and increments [i] for each byte
// written.
//
// Returns the number of bytes written.
int writeFromChunk(Uint8List destination, int destinationIndex) {
var bytesToWrite =
math.min(destination.length - destinationIndex, chunk.length - i);
destination.setRange(
destinationIndex, destinationIndex + bytesToWrite, chunk, i);
i += bytesToWrite;
return bytesToWrite;
}

while (i < chunk.length) {
// We can be in one of two states here:
//
// * Both [nextMessageLength] and [buffer] are `null`, in which case we're
// waiting until we have four bytes in [lengthBuffer] to know how big of
// a buffer to allocate.
//
// * Neither [nextMessageLength] nor [buffer] are `null`, in which case
// we're waiting for [buffer] to have [nextMessageLength] in it before
// we send it to [queue.local.sink] and start waiting for the next
// message.
if (nextMessageLength == null) {
lengthBufferIndex += writeFromChunk(lengthBuffer, lengthBufferIndex);
if (lengthBufferIndex < 4) return;

nextMessageLength =
ByteData.view(lengthBuffer.buffer).getUint32(0, Endian.little);
buffer = Uint8List(nextMessageLength);
bufferIndex = 0;
}

bufferIndex += writeFromChunk(buffer, bufferIndex);
if (bufferIndex < nextMessageLength) return;

sink.add(Uint8List.view(buffer.buffer, 0, nextMessageLength));
lengthBufferIndex = 0;
nextMessageLength = null;
buffer = null;
bufferIndex = null;
}
}));
});

/// A transformer that adds 32-bit little-endian numbers indicating the length
/// of each packet, so that they can safely be sent over a medium that doesn't
/// preserve packet boundaries.
final lengthDelimitedEncoder =
StreamTransformer<Uint8List, List<int>>.fromHandlers(
handleData: (message, sink) {
var messageLength = Uint8List(4);
ByteData.view(messageLength.buffer)
.setUint32(0, message.length, Endian.little);
sink.add(messageLength);
sink.add(message);
});
15 changes: 15 additions & 0 deletions pubspec.yaml
@@ -0,0 +1,15 @@
name: sass_embedded
version: 1.0.0-dev
description: An implementation of the Sass embedded protocol using Dart Sass.
author: Sass Team
homepage: https://github.com/sass/dart-sass-embedded

environment:
sdk: '>=2.4.0 <3.0.0'

dependencies:
async: ">=1.13.0 <3.0.0"
stream_channel: ">=1.6.0 <3.0.0"

dev_dependencies:
test: ^1.0.0
137 changes: 137 additions & 0 deletions test/length_delimited_test.dart
@@ -0,0 +1,137 @@
// Copyright 2019 Google Inc. Use of this source code is governed by an
// MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

import 'dart:async';
import 'dart:typed_data';

import 'package:sass_embedded/src/util/length_delimited_transformer.dart';

import 'package:async/async.dart';
import 'package:test/test.dart';

void main() {
group("encoder", () {
Sink<List<int>> sink;
Stream<List<int>> stream;
setUp(() {
var controller = StreamController<List<int>>();
sink = controller.sink;
stream = controller.stream.transform(lengthDelimitedEncoder);
});

test("encodes an empty message", () {
sink.add([]);
sink.close();
expect(collectBytes(stream), completion(equals([0, 0, 0, 0])));
});

test("encodes a message of length 1", () {
sink.add([123]);
sink.close();
expect(collectBytes(stream), completion(equals([1, 0, 0, 0, 123])));
});

test("encodes a message of length greater than 256", () {
sink.add(List.filled(300, 1));
sink.close();
expect(collectBytes(stream),
completion(equals([44, 1, 0, 0, ...List.filled(300, 1)])));
});

test("encodes multiple messages", () {
sink.add([10]);
sink.add([20, 30]);
sink.add([40, 50, 60]);
sink.close();
expect(
collectBytes(stream),
completion(equals(
[1, 0, 0, 0, 10, 2, 0, 0, 0, 20, 30, 3, 0, 0, 0, 40, 50, 60])));
});
});

group("decoder", () {
Sink<List<int>> sink;
StreamQueue<Uint8List> queue;
setUp(() {
var controller = StreamController<List<int>>();
sink = controller.sink;
queue = StreamQueue(controller.stream.transform(lengthDelimitedDecoder));
});

group("decodes an empty message", () {
test("from a single chunk", () {
sink.add([0, 0, 0, 0]);
expect(queue, emits(isEmpty));
});

test("from multiple chunks", () {
sink.add([0, 0]);
sink.add([0, 0]);
expect(queue, emits(isEmpty));
});

test("from one chunk per byte", () {
sink..add([0])..add([0])..add([0])..add([0]);
expect(queue, emits(isEmpty));
});

test("from a chunk that contains more data", () {
sink.add([0, 0, 0, 0, 1, 0, 0, 0, 100]);
expect(queue, emits(isEmpty));
});
});

group("decodes a longer message", () {
test("from a single chunk", () {
sink.add([4, 0, 0, 0, 1, 2, 3, 4]);
expect(queue, emits([1, 2, 3, 4]));
});

test("from multiple chunks", () {
sink..add([4, 0])..add([0, 0, 1, 2])..add([3, 4]);
expect(queue, emits([1, 2, 3, 4]));
});

test("from one chunk per byte", () {
for (var byte in [4, 0, 0, 0, 1, 2, 3, 4]) {
sink.add([byte]);
}
expect(queue, emits([1, 2, 3, 4]));
});

test("from a chunk that contains more data", () {
sink.add([4, 0, 0, 0, 1, 2, 3, 4, 1, 0, 0, 0]);
expect(queue, emits([1, 2, 3, 4]));
});

test("of length greater than 256", () {
sink.add([44, 1, 0, 0, ...List.filled(300, 1)]);
expect(queue, emits(List.filled(300, 1)));
});
});

group("decodes multiple messages", () {
test("from single chunk", () {
sink.add([4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]);
expect(queue, emits([1, 2, 3, 4]));
expect(queue, emits([101, 102]));
});

test("from multiple chunks", () {
sink..add([4, 0])..add([0, 0, 1, 2, 3, 4, 2, 0])..add([0, 0, 101, 102]);
expect(queue, emits([1, 2, 3, 4]));
expect(queue, emits([101, 102]));
});

test("from one chunk per byte", () {
for (var byte in [4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]) {
sink.add([byte]);
}
expect(queue, emits([1, 2, 3, 4]));
expect(queue, emits([101, 102]));
});
});
});
}

0 comments on commit be35dbb

Please sign in to comment.