From f1c951b0e18d4e20e4e30b81153917b9d2c21517 Mon Sep 17 00:00:00 2001 From: Sophie Tauchert Date: Sun, 31 Jul 2022 21:36:18 +0500 Subject: [PATCH] Implement tokio-based asynchronous reader --- Cargo.toml | 16 +- Changelog.md | 2 + src/reader/async_tokio.rs | 388 ++++++++++++++++++++++++++++++++++ src/reader/buffered_reader.rs | 3 + src/reader/mod.rs | 33 ++- src/reader/ns_reader.rs | 20 +- src/reader/slice_reader.rs | 8 + tests/async-tokio.rs | 20 ++ 8 files changed, 468 insertions(+), 22 deletions(-) create mode 100644 src/reader/async_tokio.rs create mode 100644 tests/async-tokio.rs diff --git a/Cargo.toml b/Cargo.toml index f41bb994..af05e661 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,13 +8,14 @@ documentation = "https://docs.rs/quick-xml" repository = "https://github.com/tafia/quick-xml" keywords = ["xml", "serde", "parser", "writer", "html"] -categories = ["encoding", "parsing", "parser-implementations"] +categories = ["asynchronous", "encoding", "parsing", "parser-implementations"] license = "MIT" [dependencies] document-features = { version = "0.2", optional = true } encoding_rs = { version = "0.8", optional = true } serde = { version = "1.0", optional = true } +tokio = { version = "1.20", optional = true, default-features = false, features = ["io-util"] } memchr = "2.5" [dev-dependencies] @@ -23,6 +24,8 @@ pretty_assertions = "1.2" regex = "1" serde = { version = "1.0", features = ["derive"] } serde-value = "0.7" +tokio = { version = "1.20", default-features = false, features = ["macros", "rt"] } +tokio-test = "0.4" [lib] bench = false @@ -37,6 +40,13 @@ harness = false [features] default = [] + +## Enables support for asynchronous reading from `tokio`'s IO-Traits by enabling +## [reading events] from types implementing [`tokio::io::AsyncBufRead`]. +## +## [reading events]: crate::reader::Reader::read_event_into_async +async-tokio = ["tokio"] + ## Enables support of non-UTF-8 encoded documents. Encoding will be inferred from ## the XML declaration if it will be found, otherwise UTF-8 is assumed. ## @@ -123,3 +133,7 @@ required-features = ["serialize"] [[test]] name = "serde-migrated" required-features = ["serialize"] + +[[test]] +name = "async-tokio" +required-features = ["async-tokio"] diff --git a/Changelog.md b/Changelog.md index a53fecba..d34e59a7 100644 --- a/Changelog.md +++ b/Changelog.md @@ -39,6 +39,7 @@ |`attribute_namespace` |`resolve_attribute` - [#439]: Added utilities `detect_encoding()`, `decode()`, and `decode_with_bom_removal()` under the `quick-xml::encoding` namespace. +- [#450]: Added support of asynchronous [tokio](https://tokio.rs/) readers ### Bug Fixes @@ -216,6 +217,7 @@ [#437]: https://github.com/tafia/quick-xml/pull/437 [#439]: https://github.com/tafia/quick-xml/pull/439 [#440]: https://github.com/tafia/quick-xml/pull/440 +[#450]: https://github.com/tafia/quick-xml/pull/450 ## 0.23.0 -- 2022-05-08 diff --git a/src/reader/async_tokio.rs b/src/reader/async_tokio.rs new file mode 100644 index 00000000..28784fd9 --- /dev/null +++ b/src/reader/async_tokio.rs @@ -0,0 +1,388 @@ +//! This is an implementation of [`Reader`] for reading from a [`AsyncBufRead`] +//! as underlying byte stream. This reader fully implements async/await so reading +//! can use non-blocking I/O. + +use std::future::Future; +use std::pin::Pin; +use tokio::io::{self, AsyncBufRead, AsyncBufReadExt}; + +use crate::events::Event; +use crate::name::{QName, ResolveResult}; +use crate::reader::buffered_reader::impl_buffered_source; +use crate::reader::{is_whitespace, BangType, NsReader, ParseState, ReadElementState, Reader}; +use crate::{Error, Result}; + +/// A struct for read XML asynchronously from an [`AsyncBufRead`]. +/// +/// Having own struct allows us to implement anything without risk of name conflicts +/// and does not suffer from the impossibility of having `async` in traits. +struct TokioAdapter<'a, R>(&'a mut R); + +impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> { + impl_buffered_source!('b, 0, async, await); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +impl Reader { + /// An asynchronous version of [`read_event_into()`]. Reads the next event into + /// given buffer. + /// + /// > This function should be defined as + /// > ```ignore + /// > pub async fn read_event_into_async<'b>( + /// > &mut self, + /// > buf: &'b mut Vec + /// > ) -> Result>; + /// > ``` + /// > but Rust does not allow to write that for recursive asynchronous function + /// + /// This is the main entry point for reading XML `Event`s when using an async reader. + /// + /// See the documentation of [`read_event_into()`] for more information. + /// + /// # Examples + /// + /// ``` + /// # tokio_test::block_on(async { + /// # use pretty_assertions::assert_eq; + /// use quick_xml::events::Event; + /// use quick_xml::Reader; + /// + /// // This explicitly uses `from_reader("...".as_bytes())` to use a buffered + /// // reader instead of relying on the zero-copy optimizations for reading + /// // from byte slices, which is provides the sync interface anyway. + /// let mut reader = Reader::from_reader(r#" + /// + /// Test + /// Test 2 + /// + /// "#.as_bytes()); + /// reader.trim_text(true); + /// + /// let mut count = 0; + /// let mut buf = Vec::new(); + /// let mut txt = Vec::new(); + /// loop { + /// match reader.read_event_into_async(&mut buf).await { + /// Ok(Event::Start(_)) => count += 1, + /// Ok(Event::Text(e)) => txt.push(e.unescape().unwrap().into_owned()), + /// Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e), + /// Ok(Event::Eof) => break, + /// _ => (), + /// } + /// buf.clear(); + /// } + /// assert_eq!(count, 3); + /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]); + /// # }) // tokio_test::block_on + /// ``` + /// + /// [`read_event_into()`]: Reader::read_event_into + pub fn read_event_into_async<'r, 'b: 'r>( + &'r mut self, + buf: &'b mut Vec, + ) -> Pin>> + 'r>> { + Box::pin(async move { + read_event_impl!(self, buf, read_until_open_async, read_until_close_async, await) + }) + } + + /// An asynchronous version of [`read_to_end_into()`]. + /// Reads asynchronously until end element is found using provided buffer as + /// intermediate storage for events content. This function is supposed to be + /// called after you already read a [`Start`] event. + /// + /// See the documentation of [`read_to_end_into()`] for more information. + /// + /// # Examples + /// + /// This example shows, how you can skip XML content after you read the + /// start event. + /// + /// ``` + /// # tokio_test::block_on(async { + /// # use pretty_assertions::assert_eq; + /// use quick_xml::events::{BytesStart, Event}; + /// use quick_xml::Reader; + /// + /// let mut reader = Reader::from_reader(r#" + /// + /// + /// + /// + /// + /// + /// + /// + /// "#.as_bytes()); + /// reader.trim_text(true); + /// let mut buf = Vec::new(); + /// + /// let start = BytesStart::new("outer"); + /// let end = start.to_end().into_owned(); + /// + /// // First, we read a start event... + /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start)); + /// + /// //...then, we could skip all events to the corresponding end event. + /// // This call will correctly handle nested elements. + /// // Note, however, that this method does not handle namespaces. + /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap(); + /// + /// // At the end we should get an Eof event, because we ate the whole XML + /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof); + /// # }) // tokio_test::block_on + /// ``` + /// + /// [`read_to_end_into()`]: Self::read_to_end_into + /// [`Start`]: Event::Start + pub async fn read_to_end_into_async<'n>( + &mut self, + // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033` + end: QName<'n>, + buf: &mut Vec, + ) -> Result<()> { + read_to_end!(self, end, buf, read_event_into_async, { buf.clear(); }, await) + } + + /// Read until '<' is found and moves reader to an `Opened` state. + /// + /// Return a `StartText` event if `first` is `true` and a `Text` event otherwise + async fn read_until_open_async<'b>( + &mut self, + buf: &'b mut Vec, + first: bool, + ) -> Result> { + read_until_open!(self, buf, first, TokioAdapter(&mut self.reader), read_event_into_async, await) + } + + /// Private function to read until `>` is found. This function expects that + /// it was called just after encounter a `<` symbol. + async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec) -> Result> { + read_until_close!(self, buf, TokioAdapter(&mut self.reader), await) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +impl NsReader { + /// An asynchronous version of [`read_event_into()`]. Reads the next event into + /// given buffer. + /// + /// This method manages namespaces but doesn't resolve them automatically. + /// You should call [`resolve_element()`] if you want to get a namespace. + /// + /// You also can use [`read_resolved_event_into_async()`] instead if you want + /// to resolve namespace as soon as you get an event. + /// + /// # Examples + /// + /// ``` + /// # tokio_test::block_on(async { + /// # use pretty_assertions::assert_eq; + /// use quick_xml::events::Event; + /// use quick_xml::name::{Namespace, ResolveResult::*}; + /// use quick_xml::NsReader; + /// + /// let mut reader = NsReader::from_reader(r#" + /// + /// Test + /// Test 2 + /// + /// "#.as_bytes()); + /// reader.trim_text(true); + /// + /// let mut count = 0; + /// let mut buf = Vec::new(); + /// let mut txt = Vec::new(); + /// loop { + /// match reader.read_event_into_async(&mut buf).await.unwrap() { + /// Event::Start(e) => { + /// count += 1; + /// let (ns, local) = reader.resolve_element(e.name()); + /// match local.as_ref() { + /// b"tag1" => assert_eq!(ns, Bound(Namespace(b"www.xxxx"))), + /// b"tag2" => assert_eq!(ns, Bound(Namespace(b"www.yyyy"))), + /// _ => unreachable!(), + /// } + /// } + /// Event::Text(e) => { + /// txt.push(e.unescape().unwrap().into_owned()) + /// } + /// Event::Eof => break, + /// _ => (), + /// } + /// buf.clear(); + /// } + /// assert_eq!(count, 3); + /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]); + /// # }) // tokio_test::block_on + /// ``` + /// + /// [`read_event_into()`]: NsReader::read_event_into + /// [`resolve_element()`]: Self::resolve_element + /// [`read_resolved_event_into_async()`]: Self::read_resolved_event_into_async + pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec) -> Result> { + self.pop(); + let event = self.reader.read_event_into_async(buf).await; + self.process_event(event) + } + + /// An asynchronous version of [`read_to_end_into()`]. + /// Reads asynchronously until end element is found using provided buffer as + /// intermediate storage for events content. This function is supposed to be + /// called after you already read a [`Start`] event. + /// + /// See the documentation of [`read_to_end_into()`] for more information. + /// + /// # Examples + /// + /// This example shows, how you can skip XML content after you read the + /// start event. + /// + /// ``` + /// # tokio_test::block_on(async { + /// # use pretty_assertions::assert_eq; + /// use quick_xml::name::{Namespace, ResolveResult}; + /// use quick_xml::events::{BytesStart, Event}; + /// use quick_xml::NsReader; + /// + /// let mut reader = NsReader::from_reader(r#" + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// "#.as_bytes()); + /// reader.trim_text(true); + /// let mut buf = Vec::new(); + /// + /// let ns = Namespace(b"namespace 1"); + /// let start = BytesStart::from_content(r#"outer xmlns="namespace 1""#, 5); + /// let end = start.to_end().into_owned(); + /// + /// // First, we read a start event... + /// assert_eq!( + /// reader.read_resolved_event_into_async(&mut buf).await.unwrap(), + /// (ResolveResult::Bound(ns), Event::Start(start)) + /// ); + /// + /// //...then, we could skip all events to the corresponding end event. + /// // This call will correctly handle nested elements. + /// // Note, however, that this method does not handle namespaces. + /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap(); + /// + /// // At the end we should get an Eof event, because we ate the whole XML + /// assert_eq!( + /// reader.read_resolved_event_into_async(&mut buf).await.unwrap(), + /// (ResolveResult::Unbound, Event::Eof) + /// ); + /// # }) // tokio_test::block_on + /// ``` + /// + /// [`read_to_end_into()`]: Self::read_to_end_into + /// [`Start`]: Event::Start + pub async fn read_to_end_into_async<'n>( + &mut self, + // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033` + end: QName<'n>, + buf: &mut Vec, + ) -> Result<()> { + // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should + // match literally the start name. See `Reader::check_end_names` documentation + self.reader.read_to_end_into_async(end, buf).await + } + + /// An asynchronous version of [`read_resolved_event_into()`]. Reads the next + /// event into given buffer asynchronously and resolves its namespace (if applicable). + /// + /// Namespace is resolved only for [`Start`], [`Empty`] and [`End`] events. + /// For all other events the concept of namespace is not defined, so + /// a [`ResolveResult::Unbound`] is returned. + /// + /// If you are not interested in namespaces, you can use [`read_event_into_async()`] + /// which will not automatically resolve namespaces for you. + /// + /// # Examples + /// + /// ``` + /// # tokio_test::block_on(async { + /// # use pretty_assertions::assert_eq; + /// use quick_xml::events::Event; + /// use quick_xml::name::{Namespace, QName, ResolveResult::*}; + /// use quick_xml::NsReader; + /// + /// let mut reader = NsReader::from_reader(r#" + /// + /// Test + /// Test 2 + /// + /// "#.as_bytes()); + /// reader.trim_text(true); + /// + /// let mut count = 0; + /// let mut buf = Vec::new(); + /// let mut txt = Vec::new(); + /// loop { + /// match reader.read_resolved_event_into_async(&mut buf).await.unwrap() { + /// (Bound(Namespace(b"www.xxxx")), Event::Start(e)) => { + /// count += 1; + /// assert_eq!(e.local_name(), QName(b"tag1").into()); + /// } + /// (Bound(Namespace(b"www.yyyy")), Event::Start(e)) => { + /// count += 1; + /// assert_eq!(e.local_name(), QName(b"tag2").into()); + /// } + /// (_, Event::Start(_)) => unreachable!(), + /// + /// (_, Event::Text(e)) => { + /// txt.push(e.unescape().unwrap().into_owned()) + /// } + /// (_, Event::Eof) => break, + /// _ => (), + /// } + /// buf.clear(); + /// } + /// assert_eq!(count, 3); + /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]); + /// # }) // tokio_test::block_on + /// ``` + /// + /// [`read_resolved_event_into()`]: NsReader::read_resolved_event_into + /// [`Start`]: Event::Start + /// [`Empty`]: Event::Empty + /// [`End`]: Event::End + /// [`read_event_into_async()`]: Self::read_event_into_async + pub async fn read_resolved_event_into_async<'ns, 'b>( + // Name 'ns lifetime, because otherwise we get an error + // "implicit elided lifetime not allowed here" on ResolveResult + &'ns mut self, + buf: &'b mut Vec, + ) -> Result<(ResolveResult<'ns>, Event<'b>)> { + let event = self.read_event_into_async(buf).await; + self.resolve_event(event) + } +} + +#[cfg(test)] +mod test { + use super::TokioAdapter; + use crate::reader::test::check; + + check!( + #[tokio::test] + read_event_into_async, + read_until_close_async, + TokioAdapter, + &mut Vec::new(), + async, await + ); +} diff --git a/src/reader/buffered_reader.rs b/src/reader/buffered_reader.rs index ab33020b..f09ba706 100644 --- a/src/reader/buffered_reader.rs +++ b/src/reader/buffered_reader.rs @@ -211,6 +211,9 @@ macro_rules! impl_buffered_source { }; } +// Make it public for use in async implementations +pub(super) use impl_buffered_source; + /// Implementation of `XmlSource` for any `BufRead` reader using a user-given /// `Vec` as buffer that will be borrowed by events. impl<'b, R: BufRead> XmlSource<'b, &'b mut Vec> for R { diff --git a/src/reader/mod.rs b/src/reader/mod.rs index 14ccc5ab..73ff5061 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -152,22 +152,22 @@ macro_rules! read_event_impl { macro_rules! read_until_open { ( $self:ident, $buf:ident, $first:ident, + $reader:expr, $read_event:ident $(, $await:ident)? ) => {{ $self.parser.state = ParseState::OpenedTag; if $self.parser.trim_text_start { - $self.reader.skip_whitespace(&mut $self.parser.offset) $(.$await)? ?; + $reader.skip_whitespace(&mut $self.parser.offset) $(.$await)? ?; } // If we already at the `<` symbol, do not try to return an empty Text event - if $self.reader.skip_one(b'<', &mut $self.parser.offset) $(.$await)? ? { + if $reader.skip_one(b'<', &mut $self.parser.offset) $(.$await)? ? { return $self.$read_event($buf) $(.$await)?; } - match $self - .reader + match $reader .read_bytes_until(b'<', $buf, &mut $self.parser.offset) $(.$await)? { @@ -180,15 +180,15 @@ macro_rules! read_until_open { macro_rules! read_until_close { ( - $self:ident, $buf:ident + $self:ident, $buf:ident, + $reader:expr $(, $await:ident)? ) => {{ $self.parser.state = ParseState::ClosedTag; - match $self.reader.peek_one() $(.$await)? { + match $reader.peek_one() $(.$await)? { // ` match $self - .reader + Ok(Some(b'!')) => match $reader .read_bang_element($buf, &mut $self.parser.offset) $(.$await)? { @@ -197,8 +197,7 @@ macro_rules! read_until_close { Err(e) => Err(e), }, // ` match $self - .reader + Ok(Some(b'/')) => match $reader .read_bytes_until(b'>', $buf, &mut $self.parser.offset) $(.$await)? { @@ -207,8 +206,7 @@ macro_rules! read_until_close { Err(e) => Err(e), }, // ` match $self - .reader + Ok(Some(b'?')) => match $reader .read_bytes_until(b'>', $buf, &mut $self.parser.offset) $(.$await)? { @@ -217,8 +215,7 @@ macro_rules! read_until_close { Err(e) => Err(e), }, // `<...` - opening or self-closed tag - Ok(Some(_)) => match $self - .reader + Ok(Some(_)) => match $reader .read_element($buf, &mut $self.parser.offset) $(.$await)? { @@ -264,6 +261,8 @@ macro_rules! read_to_end { }}; } +#[cfg(feature = "async-tokio")] +mod async_tokio; mod buffered_reader; mod ns_reader; mod parser; @@ -455,7 +454,7 @@ impl Reader { /// let xml = r#" /// Test /// Test 2 - /// "#; + /// "#; /// let mut reader = Reader::from_reader(Cursor::new(xml.as_bytes())); /// let mut buf = Vec::new(); /// @@ -552,7 +551,7 @@ impl Reader { where R: XmlSource<'i, B>, { - read_until_open!(self, buf, first, read_event_impl) + read_until_open!(self, buf, first, self.reader, read_event_impl) } /// Private function to read until `>` is found. This function expects that @@ -561,7 +560,7 @@ impl Reader { where R: XmlSource<'i, B>, { - read_until_close!(self, buf) + read_until_close!(self, buf, self.reader) } } diff --git a/src/reader/ns_reader.rs b/src/reader/ns_reader.rs index a7ecc0a6..3f56d248 100644 --- a/src/reader/ns_reader.rs +++ b/src/reader/ns_reader.rs @@ -19,7 +19,7 @@ use crate::reader::{Reader, XmlSource}; /// Consumes a [`BufRead`] and streams XML `Event`s. pub struct NsReader { /// An XML reader - reader: Reader, + pub(super) reader: Reader, /// Buffer that contains names of namespace prefixes (the part between `xmlns:` /// and an `=`) and namespace values. buffer: Vec, @@ -63,14 +63,14 @@ impl NsReader { self.process_event(event) } - fn pop(&mut self) { + pub(super) fn pop(&mut self) { if self.pending_pop { self.ns_resolver.pop(&mut self.buffer); self.pending_pop = false; } } - fn process_event<'i>(&mut self, event: Result>) -> Result> { + pub(super) fn process_event<'i>(&mut self, event: Result>) -> Result> { match event { Ok(Event::Start(e)) => { self.ns_resolver.push(&e, &mut self.buffer); @@ -93,7 +93,7 @@ impl NsReader { } } - fn resolve_event<'i>( + pub(super) fn resolve_event<'i>( &mut self, event: Result>, ) -> Result<(ResolveResult, Event<'i>)> { @@ -538,6 +538,10 @@ impl<'i> NsReader<&'i [u8]> { /// You also can use [`read_resolved_event()`] instead if you want to resolve namespace /// as soon as you get an event. /// + /// There is no asynchronous `read_event_async()` version of this function, + /// because it is not necessary -- the contents are already in memory and no IO + /// is needed, therefore there is no potential for blocking. + /// /// # Examples /// /// ``` @@ -595,6 +599,10 @@ impl<'i> NsReader<&'i [u8]> { /// If you are not interested in namespaces, you can use [`read_event()`] /// which will not automatically resolve namespaces for you. /// + /// There is no asynchronous `read_resolved_event_async()` version of this function, + /// because it is not necessary -- the contents are already in memory and no IO + /// is needed, therefore there is no potential for blocking. + /// /// # Examples /// /// ``` @@ -661,6 +669,10 @@ impl<'i> NsReader<&'i [u8]> { /// encoding_. It is good practice to always get that parameter using /// [`BytesStart::to_end()`] method. /// + /// There is no asynchronous `read_to_end_async()` version of this function, + /// because it is not necessary -- the contents are already in memory and no IO + /// is needed, therefore there is no potential for blocking. + /// /// # Namespaces /// /// Unlike [`Reader::read_to_end()`], this method resolves namespace diff --git a/src/reader/slice_reader.rs b/src/reader/slice_reader.rs index 9852dcb8..d4bf0d7e 100644 --- a/src/reader/slice_reader.rs +++ b/src/reader/slice_reader.rs @@ -34,6 +34,10 @@ impl<'a> Reader<&'a [u8]> { /// Read an event that borrows from the input rather than a buffer. /// + /// There is no asynchronous `read_event_async()` version of this function, + /// because it is not necessary -- the contents are already in memory and no IO + /// is needed, therefore there is no potential for blocking. + /// /// # Examples /// /// ``` @@ -83,6 +87,10 @@ impl<'a> Reader<&'a [u8]> { /// The correctness of the skipped events does not checked, if you disabled /// the [`check_end_names`] option. /// + /// There is no asynchronous `read_to_end_async()` version of this function, + /// because it is not necessary -- the contents are already in memory and no IO + /// is needed, therefore there is no potential for blocking. + /// /// # Namespaces /// /// While the [`Reader`] does not support namespace resolution, namespaces diff --git a/tests/async-tokio.rs b/tests/async-tokio.rs new file mode 100644 index 00000000..4d811580 --- /dev/null +++ b/tests/async-tokio.rs @@ -0,0 +1,20 @@ +use quick_xml::events::Event::*; +use quick_xml::Reader; + +#[tokio::test] +async fn test_sample() { + let src = include_str!("documents/sample_rss.xml"); + let mut reader = Reader::from_reader(src.as_bytes()); + let mut buf = Vec::new(); + let mut count = 0; + loop { + match reader.read_event_into_async(&mut buf).await.unwrap() { + Start(_) => count += 1, + Decl(e) => println!("{:?}", e.version()), + Eof => break, + _ => (), + } + buf.clear(); + } + println!("{}", count); +}