Skip to content

Commit

Permalink
Fix blocking IO in JsonTextWriter.CloseAsync and add IAsyncDisposable J…
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonCropp committed Feb 11, 2023
1 parent cc5d9db commit bdeae53
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 2 deletions.
13 changes: 13 additions & 0 deletions src/Argon/JsonReader.Async.cs
Expand Up @@ -5,7 +5,20 @@
namespace Argon;

public abstract partial class JsonReader
: IAsyncDisposable
{
ValueTask IAsyncDisposable.DisposeAsync()
{
try
{
Dispose(true);
return default;
}
catch (Exception exc)
{
return ValueTask.FromException(exc);
}
}
/// <summary>
/// Asynchronously reads the next JSON token from the source.
/// </summary>
Expand Down
29 changes: 28 additions & 1 deletion src/Argon/JsonTextWriter.Async.cs
Expand Up @@ -111,7 +111,34 @@ async Task DoCloseAsync(CancellationToken cancellation)
await WriteEndAsync(cancellation).ConfigureAwait(false);
}

CloseBufferAndWriter();
await CloseBufferAndWriterAsync().ConfigureAwait(false);
}

private async Task CloseBufferAndWriterAsync()
{
if (writeBuffer != null)
{
BufferUtils.ReturnBuffer(writeBuffer);
writeBuffer = null;
}

if (CloseOutput && writer != null)
{
#if HAVE_ASYNC_DISPOABLE
await _writer.DisposeAsync().ConfigureAwait(false);
#else
// DisposeAsync isn't available. Instead, flush any remaining content with FlushAsync
// to prevent Close/Dispose from making a blocking flush.
//
// No cancellation token on TextWriter.FlushAsync?!
await writer.FlushAsync().ConfigureAwait(false);
#if HAVE_STREAM_READER_WRITER_CLOSE
writer.Close();
#else
writer.Dispose();
#endif
#endif
}
}

/// <summary>
Expand Down
8 changes: 8 additions & 0 deletions src/Argon/JsonWriter.Async.cs
Expand Up @@ -7,7 +7,15 @@
namespace Argon;

public abstract partial class JsonWriter
: IAsyncDisposable
{
async ValueTask IAsyncDisposable.DisposeAsync()
{
if (currentState != State.Closed)
{
await CloseAsync().ConfigureAwait(false);
}
}
internal Task AutoCompleteAsync(JsonToken tokenBeingWritten, CancellationToken cancellation)
{
var oldState = currentState;
Expand Down
2 changes: 1 addition & 1 deletion src/Tests/Issues/Issue2638.cs
Expand Up @@ -34,7 +34,7 @@ public void DeserializeUsesSharedDoubleZeros()

static void Test(double value, bool expectSame)
{
var obj = (JObject) JToken.Parse(@"{""x"": XXX, ""y"": XXX}".Replace("XXX", value.ToString("0.0###", CultureInfo.InvariantCulture)));
var obj = (JObject) JToken.Parse(@"{""x"": XXX, ""y"": XXX}".Replace("XXX", value.ToString("0.0###", InvariantCulture)));
var x = ((JValue) obj["x"]).Value;
var y = ((JValue) obj["y"]).Value;

Expand Down
106 changes: 106 additions & 0 deletions src/Tests/Issues/Issue2694.cs
@@ -0,0 +1,106 @@
// Copyright (c) 2007 James Newton-King. All rights reserved.
// Use of this source code is governed by The MIT License,
// as found in the license.md file.

#if NET7_0
public class Issue2694 : TestFixtureBase
{
[Fact]
public async Task Test_Reader_DisposeAsync()
{
var reader = new JsonTextReader(new StringReader("{}"));
IAsyncDisposable asyncDisposable = reader;
await asyncDisposable.DisposeAsync();

var exception = await Assert.ThrowsAsync<JsonReaderException>(() => reader.ReadAsync());
Assert.Equal("Unexpected state: Closed. Path '', line 1, position 0.", exception.Message);
}

[Fact]
public async Task Test_Writer_DisposeAsync()
{
var ms = new MemoryStream();
Stream s = new AsyncOnlyStream(ms);
var sr = new StreamWriter(s, new UTF8Encoding(encoderShouldEmitUTF8Identifier: false), 2, leaveOpen: true);
await using (var writer = new JsonTextWriter(sr))
{
await writer.WriteStartObjectAsync();
}

string json = Encoding.UTF8.GetString(ms.ToArray());
Assert.Equal("{}", json);
}

[Fact]
public async Task Test_Writer_CloseAsync()
{
var ms = new MemoryStream();
Stream s = new AsyncOnlyStream(ms);
var sr = new StreamWriter(s, new UTF8Encoding(encoderShouldEmitUTF8Identifier: false), 2, leaveOpen: true);
var writer = new JsonTextWriter(sr);
await writer.WriteStartObjectAsync();

await writer.CloseAsync();

var json = Encoding.UTF8.GetString(ms.ToArray());
Assert.Equal("{}", json);
}

public class AsyncOnlyStream : Stream
{
private readonly Stream _innerStream;
private int _unflushedContentLength;

public AsyncOnlyStream(Stream innerStream) =>
_innerStream = innerStream;

public override void Flush()
{
// It's ok to call Flush if the content was already processed with FlushAsync.
if (_unflushedContentLength > 0)
{
throw new($"Flush when there is {_unflushedContentLength} bytes buffered.");
}
}

public override Task FlushAsync(CancellationToken cancellationToken)
{
_unflushedContentLength = 0;
return _innerStream.FlushAsync(cancellationToken);
}

public override long Seek(long offset, SeekOrigin origin) =>
_innerStream.Seek(offset, origin);

public override void SetLength(long value) =>
_innerStream.SetLength(value);

public override int Read(byte[] buffer, int offset, int count) =>
throw new NotSupportedException();

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
_innerStream.ReadAsync(buffer, offset, count, cancellationToken);

public override void Write(byte[] buffer, int offset, int count) =>
throw new NotSupportedException();

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
_unflushedContentLength += count;
return _innerStream.WriteAsync(buffer, offset, count, cancellationToken);
}

public override bool CanRead => _innerStream.CanRead;
public override bool CanSeek => _innerStream.CanSeek;
public override bool CanWrite => _innerStream.CanWrite;
public override long Length => _innerStream.Length;

public override long Position
{
get => _innerStream.Position;
set => _innerStream.Position = value;
}
}
}

#endif

0 comments on commit bdeae53

Please sign in to comment.