
Allow csv reader builder to do schema inference even when reading csv from stdin #1059

Closed
corneliusroemer opened this issue Dec 18, 2021 · 8 comments
Labels
enhancement Any new improvement worthy of an entry in the changelog

Comments

@corneliusroemer

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
I would like to stream 100+ GB of SARS-CoV-2 sequence data into .parquet with zstd compression (which works really well on these sequences).

I would like to do this without having to hard-code the schema, for example through a CLI like https://github.com/domoritz/csv2parquet/blob/main/src/main.rs

However, that CLI requires me to provide a file and does not allow me to read from stdin. Why? Because the reader builder requires input to be seekable, which stdin is not.

Describe the solution you'd like
It'd be good if the reader builder were more flexible and could infer the schema from, say, the first 100 lines, which can still be kept in memory.

Describe alternatives you've considered
I could add a schema option to the CLI tool, but that's annoying and unnecessary because I just want a very simple schema: str/str. I could also do schema inference myself, but that is quite involved and would be better provided by arrow-rs directly.
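For reference, the kind of hard-coded str/str schema meant here would look roughly like this in arrow (the column names are made up, and module paths may differ between arrow versions):

use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // hypothetical two-column all-string schema; the column names are invented
    let schema = Schema::new(vec![
        Field::new("strain", DataType::Utf8, false),
        Field::new("sequence", DataType::Utf8, false),
    ]);
    println!("{:?}", schema);
}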

@domoritz
Member

Is this related to #189?

@jorgecarleitao
Member

I think this may be achieved with the current code via a generic struct over R: Read that wraps a reader: R and:

  1. implements Read and buffers the result
  2. implements Seek to positions smaller than the buffer (and panics otherwise)
  3. when a read spans the buffered result, reads first from the buffer up to the length of the buffered result, and then from the inner reader

@corneliusroemer
Author

@jorgecarleitao I agree that implementing a custom seekable Reader would be necessary to solve this without dropping the seekable requirement. But I'm not sure your suggestion is the way forward since we shouldn't read all of stdin into memory. Memory is even scarcer than storage.

Say you have 1 TB uncompressed that compresses to 1 GB. I can decompress the 1 GB to storage and then read it in with a normal file reader (which is seekable). Problem: that needs 1 TB of space and write time.

What doesn't work is reading the 1 TB into memory. No way.

Alternative: read 1GB into memory, infer schema, then stream.

Your suggestion seems to read it all into memory, doesn't it? When would one be allowed to drop early parts of the buffer?

Couldn't one drop the whole back-buffer once reading has gone beyond max_read_records in builder = builder.infer_schema(opts.max_read_records);? Once that many records have been read, there is no need for seeking anymore, so everything behind the current position can be dropped.

@jorgecarleitao
Member

jorgecarleitao commented Dec 18, 2021

Sorry that I did not express myself very well. I meant something like this:

use std::io::Read;

struct ReaderA<R: Read> {
    pub reader: R,
    position: Option<u64>,
    pub buffer: Vec<u8>,
}

impl<R: Read> Read for ReaderA<R> {
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let length = buf.len();
        if let Some(position) = &mut self.position {
            let start = *position as usize;
            // we have seeked to somewhere in our buffer. Read from the buffer first
            if start + length < self.buffer.len() {
                // our buffer fills `buf` => memcopy all
                buf.copy_from_slice(&self.buffer[start..start + length]);
                *position += length as u64;
                Ok(length)
            } else if start <= self.buffer.len() {
                // edge case where the read covers `self.buffer` and `self.reader`:
                // read from both accordingly
                let buffer_remaining = self.buffer.len() - start;
                buf[..buffer_remaining]
                    .copy_from_slice(&self.buffer[start..start + buffer_remaining]);

                // read the remaining from the inner reader
                let read = self.reader.read(&mut buf[buffer_remaining..])?;
                *position += (buffer_remaining + read) as u64;

                if *position as usize > self.buffer.len() {
                    // release memory
                    self.buffer = Vec::new();
                }

                Ok(buffer_remaining + read)
            } else {
                // we are past `self.buffer`
                self.reader.read(buf)
            }
        } else {
            // no seek was done so far: read from the inner reader and keep a copy
            // in the buffer so the bytes can be replayed after a seek
            let read = self.reader.read(buf)?;
            self.buffer.extend_from_slice(&buf[..read]);
            Ok(read)
        }
    }
}

impl<R: Read> std::io::Seek for ReaderA<R> {
    #[inline]
    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
        match pos {
            std::io::SeekFrom::Start(position) => {
                self.position = Some(position);
                Ok(position)
            }
            std::io::SeekFrom::End(_) => panic!("This reader does not support seeking from end"),
            std::io::SeekFrom::Current(offset) => {
                // relative to the current position; before any seek, the current
                // position is the number of bytes buffered so far
                let current = self.position.unwrap_or(self.buffer.len() as u64);
                let new_position = (current as i64 + offset) as u64;
                self.position = Some(new_position);
                Ok(new_position)
            }
        }
    }
}

fn main() {}

I.e. when we do not have Seek and need to use data from a Read more than once, we can store it in a buffer and use it whenever it is requested (via a seek back plus a read). This idiom uses as little memory as we need in a non-seek environment, namely the data that needs to be used twice (once for inference, once for parsing).

I hope this is a bit more understandable.

Note that this is a struct to be used specifically for the CSV reader - it is quite useless otherwise since it incurs an extra memcopy and memory usage.
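For concreteness, a hypothetical usage sketch (assuming the ReaderA struct above lives in the same module): consume some bytes, seek back to the start, and read the same bytes again from the internal buffer.

use std::io::{Read, Seek, SeekFrom};

fn replay_example() -> std::io::Result<()> {
    // wrap a non-seekable reader (here stdin) so early bytes can be replayed
    let mut reader = ReaderA {
        reader: std::io::stdin(),
        position: None,
        buffer: Vec::new(),
    };

    // first pass: read some bytes (schema inference would do this)
    let mut head = vec![0u8; 1024];
    let n = reader.read(&mut head)?;

    // rewind: the same bytes are now served from the internal buffer
    reader.seek(SeekFrom::Start(0))?;
    let mut replay = vec![0u8; n];
    reader.read_exact(&mut replay)?;
    assert_eq!(&head[..n], &replay[..]);
    Ok(())
}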

@domoritz
Member

Could we use the buffered reader from the standard library if it had a way to set the buffer size?
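For reference: std's BufReader already lets you choose the buffer size via BufReader::with_capacity, but BufReader<R> only implements Seek when the inner R does, so it would not make stdin satisfy a Read + Seek bound on its own. A minimal illustration:

use std::io::BufReader;

fn main() {
    // the buffer size is configurable ...
    let _buffered = BufReader::with_capacity(1 << 20, std::io::stdin());

    // ... but `BufReader<R>: Seek` requires `R: Seek`, and the buffer discards
    // consumed bytes, so earlier data cannot be replayed for a second pass
}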

@corneliusroemer
Author

Thanks @jorgecarleitao. I'm a beginner at Rust so please bear with me. How do you decide how big the buffer will get? Will all of the input be read into this buffer?

Does your code allow releasing the buffer once schema inference has completed? That would be best: read into memory until schema inference is done, then release. The schema inference options would determine how much is read into memory; the user could set this to, say, the first 100 lines to keep buffer memory usage down.

@jorgecarleitao
Member

I'm a beginner at Rust so please bear with me.

No worries, we are all learning :)

How do you decide how big the buffer will get?

I think we can't: a single CSV line can be megabytes long. Part of parsing CSV is deciding how many bytes are needed per "cell" (row, column).

The code above keeps the buffer allocated only for as long as inference requires it (so its size is driven by the number of lines used for inference and the size of the first lines in the specific CSV).

Does your code allow to release the buffer once schema inference has been completed? That's how it would be best: read into memory until schema inference is done, then release.

Not sure I understood: isn't the idea that we want to both infer and parse those lines into arrow, but we can't seek to repeat the operation? If that is the case, I think we can't release the buffer once the inference is done: we can only release it once the data has been inferred and parsed into columns. The code above works under this hypothesis: once we move the position past the buffer size, we release it (see the // release memory comment in the code).

@Jefffrey
Contributor

This issue seems good to close, as the current schema inference logic in arrow-csv takes a generic Read parameter and reads only a specified number of rows for inference:

/// Infer schema of CSV records from the provided `reader`
///
/// If `max_records` is `None`, all records will be read, otherwise up to `max_records`
/// records are read to infer the schema
///
/// Returns inferred schema and number of records read
pub fn infer_schema<R: Read>(
    &self,
    reader: R,
    max_records: Option<usize>,
) -> Result<(Schema, usize), ArrowError> {

cc @tustvold
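
For illustration, a minimal sketch of how that inference path could be driven from stdin (this assumes the method lives on arrow-csv's Format type, as in recent releases; exact module paths and defaults may differ by version):

use std::io::stdin;

use arrow_csv::reader::Format;

fn main() -> Result<(), arrow_schema::ArrowError> {
    // infer the schema from at most the first 100 records read from stdin;
    // note that these bytes are consumed, so they would need to be buffered
    // (or the data re-supplied) before building the actual reader
    let format = Format::default();
    let (schema, records_read) = format.infer_schema(stdin().lock(), Some(100))?;
    println!("inferred {:?} from {} records", schema, records_read);
    Ok(())
}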
