Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Akka.IO / Akka.Streams: ByteString rewrite #7136

Draft
wants to merge 19 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -3653,14 +3653,17 @@ namespace Akka.IO
BigEndian = 0,
LittleEndian = 1,
}
[System.Diagnostics.DebuggerDisplayAttribute("(Count = {_count}, Buffers = {_buffers})")]
public sealed class ByteString : System.Collections.Generic.IEnumerable<byte>, System.Collections.IEnumerable, System.IEquatable<Akka.IO.ByteString>
[System.Diagnostics.DebuggerDisplayAttribute("(Count = {Count}, Buffer = {Memory})")]
public sealed class ByteString : System.IEquatable<Akka.IO.ByteString>
{
public int Count { get; }
public static Akka.IO.ByteString Empty { get; }
[System.ObsoleteAttribute("This property will be removed in future versions of Akka.NET.")]
public bool IsCompact { get; }
public bool IsEmpty { get; }
public byte this[int index] { get; }
public System.ReadOnlyMemory<byte> Memory { get; }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public Akka.IO.ByteString Compact() { }
public Akka.IO.ByteString Concat(Akka.IO.ByteString other) { }
public static Akka.IO.ByteString CopyFrom(byte[] array) { }
Expand All @@ -3672,9 +3675,13 @@ namespace Akka.IO
public static Akka.IO.ByteString CopyFrom(System.Span<byte> span, int offset, int count) { }
public static Akka.IO.ByteString CopyFrom(System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> buffers) { }
public int CopyTo(byte[] buffer, int index, int count) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public int CopyTo(ref System.Memory<byte> buffer) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public int CopyTo(ref System.Memory<byte> buffer, int index, int count) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public int CopyTo(ref System.Span<byte> buffer) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public int CopyTo(ref System.Span<byte> buffer, int index, int count) { }
public override bool Equals(object obj) { }
public bool Equals(Akka.IO.ByteString other) { }
Expand All @@ -3684,7 +3691,6 @@ namespace Akka.IO
public static Akka.IO.ByteString FromBytes(System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> buffers) { }
public static Akka.IO.ByteString FromString(string str) { }
public static Akka.IO.ByteString FromString(string str, System.Text.Encoding encoding) { }
public System.Collections.Generic.IEnumerator<byte> GetEnumerator() { }
public override int GetHashCode() { }
public bool HasSubstring(Akka.IO.ByteString other, int index) { }
public int IndexOf(byte b) { }
Expand All @@ -3694,12 +3700,16 @@ namespace Akka.IO
public byte[] ToArray() { }
public override string ToString() { }
public string ToString(System.Text.Encoding encoding) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public void WriteTo(System.IO.Stream stream) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public System.Threading.Tasks.Task WriteToAsync(System.IO.Stream stream) { }
public static Akka.IO.ByteString +(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
public static bool ==(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
public static Akka.IO.ByteString op_Explicit(byte[] bytes) { }
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
public static Akka.IO.ByteString op_Explicit(System.Memory<byte> memory) { }
public static Akka.IO.ByteString op_Explicit(System.ReadOnlyMemory<byte> memory) { }
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
}
[Akka.Annotations.InternalApiAttribute()]
Expand Down
Expand Up @@ -3643,14 +3643,17 @@ namespace Akka.IO
BigEndian = 0,
LittleEndian = 1,
}
[System.Diagnostics.DebuggerDisplayAttribute("(Count = {_count}, Buffers = {_buffers})")]
public sealed class ByteString : System.Collections.Generic.IEnumerable<byte>, System.Collections.IEnumerable, System.IEquatable<Akka.IO.ByteString>
[System.Diagnostics.DebuggerDisplayAttribute("(Count = {Count}, Buffer = {Memory})")]
public sealed class ByteString : System.IEquatable<Akka.IO.ByteString>
{
public int Count { get; }
public static Akka.IO.ByteString Empty { get; }
[System.ObsoleteAttribute("This property will be removed in future versions of Akka.NET.")]
public bool IsCompact { get; }
public bool IsEmpty { get; }
public byte this[int index] { get; }
public System.ReadOnlyMemory<byte> Memory { get; }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public Akka.IO.ByteString Compact() { }
public Akka.IO.ByteString Concat(Akka.IO.ByteString other) { }
public static Akka.IO.ByteString CopyFrom(byte[] array) { }
Expand All @@ -3662,9 +3665,13 @@ namespace Akka.IO
public static Akka.IO.ByteString CopyFrom(System.Span<byte> span, int offset, int count) { }
public static Akka.IO.ByteString CopyFrom(System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> buffers) { }
public int CopyTo(byte[] buffer, int index, int count) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public int CopyTo(ref System.Memory<byte> buffer) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public int CopyTo(ref System.Memory<byte> buffer, int index, int count) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public int CopyTo(ref System.Span<byte> buffer) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public int CopyTo(ref System.Span<byte> buffer, int index, int count) { }
public override bool Equals(object obj) { }
public bool Equals(Akka.IO.ByteString other) { }
Expand All @@ -3674,7 +3681,6 @@ namespace Akka.IO
public static Akka.IO.ByteString FromBytes(System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> buffers) { }
public static Akka.IO.ByteString FromString(string str) { }
public static Akka.IO.ByteString FromString(string str, System.Text.Encoding encoding) { }
public System.Collections.Generic.IEnumerator<byte> GetEnumerator() { }
public override int GetHashCode() { }
public bool HasSubstring(Akka.IO.ByteString other, int index) { }
public int IndexOf(byte b) { }
Expand All @@ -3684,12 +3690,16 @@ namespace Akka.IO
public byte[] ToArray() { }
public override string ToString() { }
public string ToString(System.Text.Encoding encoding) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public void WriteTo(System.IO.Stream stream) { }
[System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")]
public System.Threading.Tasks.Task WriteToAsync(System.IO.Stream stream) { }
public static Akka.IO.ByteString +(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
public static bool ==(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
public static Akka.IO.ByteString op_Explicit(byte[] bytes) { }
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
public static Akka.IO.ByteString op_Explicit(System.Memory<byte> memory) { }
public static Akka.IO.ByteString op_Explicit(System.ReadOnlyMemory<byte> memory) { }
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
}
[Akka.Annotations.InternalApiAttribute()]
Expand Down
8 changes: 5 additions & 3 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs
Expand Up @@ -424,16 +424,18 @@ public async Task GroupBy_must_work_under_fuzzing_stress_test()
await this.AssertAllStagesStoppedAsync(async () =>
{
var publisherProbe = this.CreateManualPublisherProbe<ByteString>();
var subscriber = this.CreateManualSubscriberProbe<IEnumerable<byte>>();
var subscriber = this.CreateManualSubscriberProbe<ByteString>();

var firstGroup = (Source<IEnumerable<byte>, NotUsed>)Source.FromPublisher(publisherProbe)
.GroupBy(256, element => element[0])
.Select(b => b.ToArray()) // have to convert to an array
.Select(b => b.Reverse())
.MergeSubstreams();
var secondGroup = (Source<IEnumerable<byte>, NotUsed>)firstGroup.GroupBy(256, bytes => bytes.First())
var secondGroup = (Source<ByteString, NotUsed>)firstGroup.GroupBy(256, bytes => bytes.First())
.Select(b => b.Reverse())
.Select(b => ByteString.FromBytes(b.ToArray()))
.MergeSubstreams();
var publisher = secondGroup.RunWith(Sink.AsPublisher<IEnumerable<byte>>(false), Materializer);
var publisher = secondGroup.RunWith(Sink.AsPublisher<ByteString>(false), Materializer);
publisher.Subscribe(subscriber);

var upstreamSubscription = await publisherProbe.ExpectSubscriptionAsync();
Expand Down