Provide a way for StreamDeserializer to stream an array of values #404

Open
dtolnay opened this issue Jan 12, 2018 · 13 comments · May be fixed by #1091

Comments

@dtolnay
Member

dtolnay commented Jan 12, 2018

It seems like a common situation to have a large top-level array in a JSON file.

[
  {...},
  {...},
  {...},
  ...
]

StreamDeserializer should expose a way to access the elements of such an array one at a time without holding the whole thing in memory.
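
For context, a minimal sketch of the straightforward non-streaming approach this issue wants an alternative to (Record and large.json are placeholders for illustration):

use serde::Deserialize;
use std::fs::File;
use std::io::BufReader;

#[derive(Deserialize, Debug)]
struct Record {
    id: u64,
}

fn main() -> serde_json::Result<()> {
    // Today the whole top-level array is materialized as one Vec in memory.
    let reader = BufReader::new(File::open("large.json").expect("open large.json"));
    let records: Vec<Record> = serde_json::from_reader(reader)?;
    println!("{} records", records.len());
    Ok(())
}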

@rofrol

rofrol commented Jul 30, 2018

Example serde-rs/serde#1107 (comment)

17dec pushed a commit to 17dec/json that referenced this issue Mar 19, 2019
This mimics the StreamDeserializer API and implements issue serde-rs#404.
17dec pushed a commit to 17dec/json that referenced this issue Mar 25, 2019
This mimics the StreamDeserializer API and implements issue serde-rs#404. Unlike
StreamDeserializer, the ArrayDeserializer struct itself does not keep track of
the type of the array's elements; instead, next() itself is generic, to support
deserialization of arrays with values of different types. Unfortunately, this
means we can't implement the Iterator trait.
@davidOSUL

davidOSUL commented Aug 14, 2020

This isn't a solution to the StreamDeserializer issue, but one workaround for accessing the elements of a large JSON array one at a time is to use something like a sync_channel with a bound of zero, and have the sender do what the visitor in this comment is doing. Here's a minimal example of that; I think you could also make the iterator and visitor generic over any type that implements DeserializeOwned:

use serde::de::{Deserializer, SeqAccess, Visitor};
use serde::Deserialize;
use serde_json;
use std::fs::File;
use std::io::{BufReader, Write};
use std::path::PathBuf;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::{fmt, thread};

type DeserializeResult = Result<MyJson, String>;

#[derive(Deserialize, Debug)]
struct MyJson {
    val1: String,
    val2: Vec<i32>,
}

struct MyJsonIterator {
    receiver: Receiver<DeserializeResult>,
}

struct MyJsonVisitor {
    sender: SyncSender<DeserializeResult>,
}

impl Iterator for MyJsonIterator {
    type Item = DeserializeResult;

    fn next(&mut self) -> Option<Self::Item> {
        self.receiver.recv().ok() //ok() because a RecvError implies we are done
    }
}

impl MyJsonIterator {
    pub fn new(path: PathBuf) -> Self {
        let (sender, receiver) = sync_channel::<DeserializeResult>(0);

        thread::spawn(move || {
            let reader = BufReader::new(File::open(path).unwrap()); // in a real scenario you may want to send the error instead of unwrapping
            let mut deserializer = serde_json::Deserializer::from_reader(reader);
            if let Err(e) = deserializer.deserialize_seq(MyJsonVisitor {sender: sender.clone()}) {
                let _ = sender.send(Err(e.to_string())); // ignore send errors: they just mean the receiver has disconnected
            }
        });

        Self { receiver }
    }
}

impl<'de> Visitor<'de> for MyJsonVisitor {
    type Value = ();

    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("array of MyJson")
    }

    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
        A: SeqAccess<'de>,
    {
        while let Some(val) = seq.next_element::<MyJson>()? {
            if self.sender.send(Ok(val)).is_err() {
                break; //receiver has disconnected.
            }
        }
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    let file = setup_test_file()?;
    let iter = MyJsonIterator::new(file.path().to_owned());
    for my_json in iter {
        println!("{:?}", my_json.unwrap());
    }
    file.close()?;
    Ok(())
}

fn setup_test_file() -> std::io::Result<tempfile::NamedTempFile> {
    let mut file = tempfile::NamedTempFile::new()?;
    let json_str = r#"
[
  {
    "val1": "one",
    "val2": [
      0
    ]
  },
  {
    "val1": "two",
    "val2": [
      1
    ]
  },
  {
    "val1": "three",
    "val2": [
      2
    ]
  }
]
"#;
    file.write_all(json_str.as_bytes())?;
    Ok(file)
}

@leo60228

leo60228 commented Mar 9, 2021

@dtolnay Unlike #653, my PR supports deserializing heterogeneous arrays. It doesn't provide a full streaming deserialization API, but I'm not convinced that that's in scope for serde_json anyway. Streaming top-level arrays seems like a logical extension of existing functionality.

@dtolnay
Member Author

dtolnay commented Mar 9, 2021

I think streaming subobjects is in scope for serde_json. :) I don't see why it wouldn't be.

Once that exists, anything we'd added that is entirely specific to top level arrays would be vestigial tech debt.

@leo60228

leo60228 commented Mar 9, 2021

I don't see why it wouldn't be.

I was thinking of serde_json as "serde support for JSON," in which case I think it would make more sense to have streaming functionality in serde itself. If the intention is more "JSON built on top of serde," then that makes sense.

@hniksic

hniksic commented Aug 4, 2021

For those who find this issue via a search engine, here is another workaround, the one from the accepted answer to the StackOverflow question from the previous comment:

use serde::de::DeserializeOwned;
use serde_json::{self, Deserializer};
use std::io::{self, Read};

fn read_skipping_ws(mut reader: impl Read) -> io::Result<u8> {
    loop {
        let mut byte = 0u8;
        reader.read_exact(std::slice::from_mut(&mut byte))?;
        if !byte.is_ascii_whitespace() {
            return Ok(byte);
        }
    }
}

fn invalid_data(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg)
}

fn deserialize_single<T: DeserializeOwned, R: Read>(reader: R) -> io::Result<T> {
    let next_obj = Deserializer::from_reader(reader).into_iter::<T>().next();
    match next_obj {
        Some(result) => result.map_err(Into::into),
        None => Err(invalid_data("premature EOF")),
    }
}

fn yield_next_obj<T: DeserializeOwned, R: Read>(
    mut reader: R,
    at_start: &mut bool,
) -> io::Result<Option<T>> {
    if !*at_start {
        *at_start = true;
        if read_skipping_ws(&mut reader)? == b'[' {
            // read the next char to see if the array is empty
            let peek = read_skipping_ws(&mut reader)?;
            if peek == b']' {
                Ok(None)
            } else {
                deserialize_single(io::Cursor::new([peek]).chain(reader)).map(Some)
            }
        } else {
            Err(invalid_data("`[` not found"))
        }
    } else {
        match read_skipping_ws(&mut reader)? {
            b',' => deserialize_single(reader).map(Some),
            b']' => Ok(None),
            _ => Err(invalid_data("`,` or `]` not found")),
        }
    }
}

pub fn iter_json_array<T: DeserializeOwned, R: Read>(
    mut reader: R,
) -> impl Iterator<Item = Result<T, io::Error>> {
    let mut at_start = false;
    std::iter::from_fn(move || yield_next_obj(&mut reader, &mut at_start).transpose())
}

This code manually parses the opening [, the separating , and the closing ], but it requires neither a separate thread nor channel communication, which makes it much faster than the previous workaround. In a microbenchmark, it parses 3 million entries in 2s compared to 16s for the channel version (benchmark code: with channels, without channels).
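
For completeness, a minimal usage sketch that combines with the iter_json_array function above; MyJson mirrors the struct from the earlier comment, and data.json is a placeholder name for a file containing a top-level array of such objects:

use serde::Deserialize;
use std::fs::File;
use std::io::BufReader;

#[derive(Deserialize, Debug)]
struct MyJson {
    val1: String,
    val2: Vec<i32>,
}

fn main() -> std::io::Result<()> {
    // "data.json" is assumed to hold a top-level array of MyJson objects.
    let reader = BufReader::new(File::open("data.json")?);
    for item in iter_json_array::<MyJson, _>(reader) {
        println!("{:?}", item?);
    }
    Ok(())
}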

@tedinski

There were some comments on a previous PR to add a limited ArrayDeserializer, and I just wanted to throw in my two cents:

I am concerned that ArrayDeserializer is not good for handling arrays with mixed element type.

We've had good success with parsing mixed-type arrays by just defining an enum over those types and e.g. adding #[serde(untagged)] (or another enum tagging discipline if applicable).
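
For illustration, a minimal sketch of that untagged-enum approach (Entry and its variants are made-up names):

use serde::Deserialize;

#[derive(Deserialize, Debug)]
#[serde(untagged)]
enum Entry {
    // Each variant is tried in order until one matches the element's shape.
    Person { name: String },
    Count(u64),
}

fn main() -> serde_json::Result<()> {
    let entries: Vec<Entry> = serde_json::from_str(r#"[{"name": "alice"}, 7]"#)?;
    println!("{:?}", entries);
    Ok(())
}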

I imagine the overwhelming majority of use cases here are people who already have a perfectly functioning parser for e.g. Vec<MyType>, and just need to swap over to streaming for liveness/memory/performance reasons.

Mainly I wonder how this API would be different if we wanted to support streaming nested arrays inside of arrays, arrays inside of objects, objects inside of objects, etc. Almost like a deserialization version of serde_json::ser::Formatter.

Once that exists, anything we'd added that is entirely specific to top level arrays would be vestigial tech debt.

I'm not sure this would be vestigial tech debt. I think even with the more general API, you'd want a convenience wrapper for a common case.

@brandonros

brandonros commented Sep 29, 2022

I think this works:

use serde::Deserialize;
use serde_json::Value;
use std::fs::File;
use std::io::BufReader;

fn parse_json_rows<T: for<'de> Deserialize<'de>>(filename: &str) -> Vec<T> {
    let file = File::open(filename).unwrap();
    let reader = BufReader::new(file);
    let deserializer = serde_json::Deserializer::from_reader(reader);
    let mut iterator = deserializer.into_iter::<Vec<Value>>();
    // The first (and only) item of the stream is the entire top-level array.
    let top_level_array: Vec<Value> = iterator.next().unwrap().unwrap();
    top_level_array
        .into_iter()
        .map(|row_value| serde_json::from_value::<T>(row_value).unwrap())
        .collect()
}

@Lucretiel
Contributor

I believe that serde_json's Deserializer::into_iter is about deserializing a stream of concatenated JSON objects, resembling:

{"a": 1, "b": 2}{"a": 3, "b": 4}

This is separate from smartly converting a single array into a streaming deserialization.
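
For reference, a minimal example of that existing behavior, using serde_json::Value as the element type:

use serde_json::{Deserializer, Value};

fn main() {
    // Two concatenated top-level values; into_iter yields them one at a time.
    let data = r#"{"a": 1, "b": 2}{"a": 3, "b": 4}"#;
    for value in Deserializer::from_str(data).into_iter::<Value>() {
        println!("{:?}", value.unwrap());
    }
}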

Not quite sure why the expected deserialize_vec is missing/not implemented:

I'm not sure what you mean by this; deserialize_vec isn't a standard or conventional serde method. Generally you just call Vec::deserialize.
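
For example (using serde_json::Value as the element type; note this parses the whole array at once, so it is not streaming):

use serde::Deserialize;
use serde_json::Value;

fn main() -> serde_json::Result<()> {
    let mut de = serde_json::Deserializer::from_str(r#"[1, 2, 3]"#);
    // Vec<T> already implements Deserialize, so it can be driven directly from a Deserializer.
    let values = Vec::<Value>::deserialize(&mut de)?;
    println!("{:?}", values);
    Ok(())
}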

@commonsensesoftware

I have a similar, but slightly more complex, use case. I fundamentally suffer from the same issue: I need to stream over tens of thousands of items in an array, in files that can be hundreds of MB or even GB.

Consider the following:

{
    "id": "42",
    "metadata": ["meta-1"],
    "columns": {
        "key": "0",
        "field-1": "1",
        "field-2": "2"
    },
    "rows": [
        {
            "0": "12345",
            "1": "foo",
            "2": "bar"
        },
        {
            "0": "67890",
            "1": "i pity the foo",
            "2": "bar"
        }
    ]
}

I need to read the first few members, which must also be forwarded to the deserialization of the rows member. At that point, the accumulated data is a monad that could be passed to an iterator implementation in the manner that @hniksic outlined.

I know this is possible; I'm just not sure how to write it. In this particular case, all the values in columns and rows will be string values. They are defined as a map (not my design), but they don't need to be. The numeric value can be parsed and used to provide stable ordering in a simple unordered array (which I've done with custom deserialization before). Every item in a file is homogeneous, which also means I can ideally allocate a single Vec to represent the row and populate it as each item is visited/iterated like a cursor.

I've written a custom deserializer for other formats before, so I have a rough idea of what's needed. Essentially, I believe I need a visitor that understands the specific attributes before rows (these are constant and well-known). Once rows is reached, this information would be provided to the Iterator implementation, which would then follow the approach @hniksic outlined.

Logically, it feels to me that what is needed is a visitor that is also an iterator. Parts of the JSON are visited and retained in the visitor prior to yielding the first value. After that, each visited item just yields the current item (obviously with whatever your custom logic is). I don't know if that contributes to the larger thought process about how to solve a general-purpose StreamDeserializer, but that's how I think about the problem.

If anyone has ideas on how this might be solved with what's possible today, it would be greatly appreciated. If I can determine how to handle visiting the different elements and when to cut over to iteration in the deserialization process, I should be able to take it from there. I'm sure there are others who have a similar requirement.
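
One possible shape for this with what serde offers today, sketched with DeserializeSeed: an outer visitor consumes the small leading members, and when it reaches rows it hands off to a seed that streams each element to a callback instead of collecting them. DocumentVisitor, RowsSeed, and the callback are made-up names for illustration; this pushes rows to a callback rather than yielding them from an Iterator, but the channel trick from earlier in the thread could be layered on top to get one.

use std::collections::BTreeMap;
use std::fmt;

use serde::de::{DeserializeSeed, Deserializer, IgnoredAny, MapAccess, SeqAccess, Visitor};

// One entry of the "rows" array: column index -> string value.
type Row = BTreeMap<String, String>;

// Outer visitor: consumes the small leading members, then hands "rows" off to
// a seed that streams each element to the callback instead of collecting them.
struct DocumentVisitor<F: FnMut(Row)> {
    on_row: F,
}

impl<'de, F: FnMut(Row)> Visitor<'de> for DocumentVisitor<F> {
    type Value = ();

    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("an object whose \"rows\" member is a large array")
    }

    fn visit_map<A: MapAccess<'de>>(mut self, mut map: A) -> Result<(), A::Error> {
        while let Some(key) = map.next_key::<String>()? {
            match key.as_str() {
                "id" => { let _id: String = map.next_value()?; }
                "metadata" => { let _meta: Vec<String> = map.next_value()?; }
                "columns" => { let _cols: BTreeMap<String, String> = map.next_value()?; }
                "rows" => map.next_value_seed(RowsSeed { on_row: &mut self.on_row })?,
                _ => { map.next_value::<IgnoredAny>()?; }
            }
        }
        Ok(())
    }
}

// Streams the elements of "rows" one at a time; each Row is dropped after the callback returns.
struct RowsSeed<'a, F: FnMut(Row)> {
    on_row: &'a mut F,
}

impl<'de, 'a, F: FnMut(Row)> DeserializeSeed<'de> for RowsSeed<'a, F> {
    type Value = ();

    fn deserialize<D: Deserializer<'de>>(self, deserializer: D) -> Result<(), D::Error> {
        deserializer.deserialize_seq(self)
    }
}

impl<'de, 'a, F: FnMut(Row)> Visitor<'de> for RowsSeed<'a, F> {
    type Value = ();

    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("an array of row objects")
    }

    fn visit_seq<A: SeqAccess<'de>>(mut self, mut seq: A) -> Result<(), A::Error> {
        while let Some(row) = seq.next_element::<Row>()? {
            (self.on_row)(row);
        }
        Ok(())
    }
}

fn main() -> serde_json::Result<()> {
    let json = r#"{"id":"42","metadata":["meta-1"],"columns":{"key":"0","field-1":"1"},
                   "rows":[{"0":"12345","1":"foo"},{"0":"67890","1":"i pity the foo"}]}"#;
    let mut de = serde_json::Deserializer::from_str(json);
    de.deserialize_map(DocumentVisitor { on_row: |row: Row| println!("{:?}", row) })?;
    Ok(())
}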

@commonsensesoftware

It took a couple of weeks, but I finally figured out how to stream an arbitrary JSON sequence when it's not the root element. My approach is heavily based on the foundation put forth by @hniksic (so thank you for that). Some key differences are:

  • Stream any sequence within the JSON, not just the root
    • It's probably not a stretch to even have multiple streams
  • Use the super secret Deserialize::deserialize_in_place approach instead of StreamDeserializer
    • This can afford a lot of benefit because only a single item need be allocated to represent each enumerated item
    • It is still possible to use Deserialize::deserialize or StreamDeserializer if you want to

I've extracted the relevant bits from my solution and put them up in the runnable Gist "JSON Streaming in Rust", which can be forked or cloned. I hope someone finds it useful if they land here trying to stream part of a JSON document. Better still, perhaps it will plant a seed or spark an idea that will enable the crate to address this challenge intrinsically.

@jaskij

jaskij commented Sep 8, 2023

As an alternative workaround, if you are preprocessing your files with jq, you can transform a top-level array of objects into a file suitable for streaming by simply doing jq '.[]' file.json.
