From bdeae53fe02022689c03317154f14f181be0dcbe Mon Sep 17 00:00:00 2001 From: Simon Cropp Date: Sat, 11 Feb 2023 21:51:29 +1100 Subject: [PATCH] Fix blocking IO in JsonTextWriter.CloseAsync and add IAsyncDisposable https://github.com/JamesNK/Newtonsoft.Json/pull/2695 --- src/Argon/JsonReader.Async.cs | 13 ++++ src/Argon/JsonTextWriter.Async.cs | 29 +++++++- src/Argon/JsonWriter.Async.cs | 8 +++ src/Tests/Issues/Issue2638.cs | 2 +- src/Tests/Issues/Issue2694.cs | 106 ++++++++++++++++++++++++++++++ 5 files changed, 156 insertions(+), 2 deletions(-) create mode 100644 src/Tests/Issues/Issue2694.cs diff --git a/src/Argon/JsonReader.Async.cs b/src/Argon/JsonReader.Async.cs index 6e397e92a..881ff7a8e 100644 --- a/src/Argon/JsonReader.Async.cs +++ b/src/Argon/JsonReader.Async.cs @@ -5,7 +5,20 @@ namespace Argon; public abstract partial class JsonReader + : IAsyncDisposable { + ValueTask IAsyncDisposable.DisposeAsync() + { + try + { + Dispose(true); + return default; + } + catch (Exception exc) + { + return ValueTask.FromException(exc); + } + } /// /// Asynchronously reads the next JSON token from the source. /// diff --git a/src/Argon/JsonTextWriter.Async.cs b/src/Argon/JsonTextWriter.Async.cs index 9a5163bb3..136d9b76f 100644 --- a/src/Argon/JsonTextWriter.Async.cs +++ b/src/Argon/JsonTextWriter.Async.cs @@ -111,7 +111,34 @@ async Task DoCloseAsync(CancellationToken cancellation) await WriteEndAsync(cancellation).ConfigureAwait(false); } - CloseBufferAndWriter(); + await CloseBufferAndWriterAsync().ConfigureAwait(false); + } + + private async Task CloseBufferAndWriterAsync() + { + if (writeBuffer != null) + { + BufferUtils.ReturnBuffer(writeBuffer); + writeBuffer = null; + } + + if (CloseOutput && writer != null) + { +#if HAVE_ASYNC_DISPOABLE + await _writer.DisposeAsync().ConfigureAwait(false); +#else + // DisposeAsync isn't available. Instead, flush any remaining content with FlushAsync + // to prevent Close/Dispose from making a blocking flush. + // + // No cancellation token on TextWriter.FlushAsync?! + await writer.FlushAsync().ConfigureAwait(false); +#if HAVE_STREAM_READER_WRITER_CLOSE + writer.Close(); +#else + writer.Dispose(); +#endif +#endif + } } /// diff --git a/src/Argon/JsonWriter.Async.cs b/src/Argon/JsonWriter.Async.cs index 2704da6d7..080c0ba84 100644 --- a/src/Argon/JsonWriter.Async.cs +++ b/src/Argon/JsonWriter.Async.cs @@ -7,7 +7,15 @@ namespace Argon; public abstract partial class JsonWriter + : IAsyncDisposable { + async ValueTask IAsyncDisposable.DisposeAsync() + { + if (currentState != State.Closed) + { + await CloseAsync().ConfigureAwait(false); + } + } internal Task AutoCompleteAsync(JsonToken tokenBeingWritten, CancellationToken cancellation) { var oldState = currentState; diff --git a/src/Tests/Issues/Issue2638.cs b/src/Tests/Issues/Issue2638.cs index f9cbf715b..4f986fc43 100644 --- a/src/Tests/Issues/Issue2638.cs +++ b/src/Tests/Issues/Issue2638.cs @@ -34,7 +34,7 @@ public void DeserializeUsesSharedDoubleZeros() static void Test(double value, bool expectSame) { - var obj = (JObject) JToken.Parse(@"{""x"": XXX, ""y"": XXX}".Replace("XXX", value.ToString("0.0###", CultureInfo.InvariantCulture))); + var obj = (JObject) JToken.Parse(@"{""x"": XXX, ""y"": XXX}".Replace("XXX", value.ToString("0.0###", InvariantCulture))); var x = ((JValue) obj["x"]).Value; var y = ((JValue) obj["y"]).Value; diff --git a/src/Tests/Issues/Issue2694.cs b/src/Tests/Issues/Issue2694.cs new file mode 100644 index 000000000..ebbba179c --- /dev/null +++ b/src/Tests/Issues/Issue2694.cs @@ -0,0 +1,106 @@ +// Copyright (c) 2007 James Newton-King. All rights reserved. +// Use of this source code is governed by The MIT License, +// as found in the license.md file. + +#if NET7_0 +public class Issue2694 : TestFixtureBase +{ + [Fact] + public async Task Test_Reader_DisposeAsync() + { + var reader = new JsonTextReader(new StringReader("{}")); + IAsyncDisposable asyncDisposable = reader; + await asyncDisposable.DisposeAsync(); + + var exception = await Assert.ThrowsAsync(() => reader.ReadAsync()); + Assert.Equal("Unexpected state: Closed. Path '', line 1, position 0.", exception.Message); + } + + [Fact] + public async Task Test_Writer_DisposeAsync() + { + var ms = new MemoryStream(); + Stream s = new AsyncOnlyStream(ms); + var sr = new StreamWriter(s, new UTF8Encoding(encoderShouldEmitUTF8Identifier: false), 2, leaveOpen: true); + await using (var writer = new JsonTextWriter(sr)) + { + await writer.WriteStartObjectAsync(); + } + + string json = Encoding.UTF8.GetString(ms.ToArray()); + Assert.Equal("{}", json); + } + + [Fact] + public async Task Test_Writer_CloseAsync() + { + var ms = new MemoryStream(); + Stream s = new AsyncOnlyStream(ms); + var sr = new StreamWriter(s, new UTF8Encoding(encoderShouldEmitUTF8Identifier: false), 2, leaveOpen: true); + var writer = new JsonTextWriter(sr); + await writer.WriteStartObjectAsync(); + + await writer.CloseAsync(); + + var json = Encoding.UTF8.GetString(ms.ToArray()); + Assert.Equal("{}", json); + } + + public class AsyncOnlyStream : Stream + { + private readonly Stream _innerStream; + private int _unflushedContentLength; + + public AsyncOnlyStream(Stream innerStream) => + _innerStream = innerStream; + + public override void Flush() + { + // It's ok to call Flush if the content was already processed with FlushAsync. + if (_unflushedContentLength > 0) + { + throw new($"Flush when there is {_unflushedContentLength} bytes buffered."); + } + } + + public override Task FlushAsync(CancellationToken cancellationToken) + { + _unflushedContentLength = 0; + return _innerStream.FlushAsync(cancellationToken); + } + + public override long Seek(long offset, SeekOrigin origin) => + _innerStream.Seek(offset, origin); + + public override void SetLength(long value) => + _innerStream.SetLength(value); + + public override int Read(byte[] buffer, int offset, int count) => + throw new NotSupportedException(); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => + _innerStream.ReadAsync(buffer, offset, count, cancellationToken); + + public override void Write(byte[] buffer, int offset, int count) => + throw new NotSupportedException(); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + _unflushedContentLength += count; + return _innerStream.WriteAsync(buffer, offset, count, cancellationToken); + } + + public override bool CanRead => _innerStream.CanRead; + public override bool CanSeek => _innerStream.CanSeek; + public override bool CanWrite => _innerStream.CanWrite; + public override long Length => _innerStream.Length; + + public override long Position + { + get => _innerStream.Position; + set => _innerStream.Position = value; + } + } +} + +#endif \ No newline at end of file