[Ballista] Support to access remote object store, like HDFS, S3, etc #6

Open
yahoNanJing opened this issue Jan 29, 2022 · 16 comments
Labels
enhancement New feature or request


@yahoNanJing
Contributor

yahoNanJing commented Jan 29, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

After the introduction of the object store API, there are still some gaps in supporting remote object store access for Ballista executors. For example, as mentioned in #22 and #10, Ballista is not yet able to use a remote object store.

Describe the solution you'd like

Our workaround is to make the file path self-describing. For example, a local file path should be file://tmp/..., and an HDFS file path should be hdfs://localhost:xxx:/tmp/...
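
A minimal sketch of how a self-describing path could be mapped to a store type, using only the Rust standard library. The `StoreKind` enum and the `detect_store` function are hypothetical names for illustration and are not part of Ballista or DataFusion; treating a path without a scheme as a local file is also an assumption.

```rust
/// Which kind of store a self-describing path points at.
/// (Hypothetical type for illustration; not a Ballista or DataFusion API.)
#[derive(Debug, PartialEq, Eq)]
enum StoreKind {
    LocalFile,
    Hdfs,
    S3,
    Unknown(String),
}

/// Split a path into its store kind and the remainder after "://".
/// A path with no scheme is assumed to be a local file.
fn detect_store(path: &str) -> (StoreKind, &str) {
    match path.split_once("://") {
        None => (StoreKind::LocalFile, path),
        Some((scheme, rest)) => {
            // Schemes are compared case-insensitively.
            let kind = match scheme.to_ascii_lowercase().as_str() {
                "file" => StoreKind::LocalFile,
                "hdfs" => StoreKind::Hdfs,
                "s3" => StoreKind::S3,
                other => StoreKind::Unknown(other.to_string()),
            };
            (kind, rest)
        }
    }
}
```

For example, `detect_store("hdfs://namenode:9000/tmp/data.parquet")` yields `StoreKind::Hdfs` together with the remainder `namenode:9000/tmp/data.parquet`.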

@yahoNanJing yahoNanJing added the enhancement New feature or request label Jan 29, 2022
@matthewmturner
Contributor

@yahoNanJing fyi @seddonm1 and I have been working on https://github.com/datafusion-contrib/datafusion-objectstore-s3. Still early stages but would be great to have more use cases that we could steer development with.

@thinkharderdev
Contributor

This PR would support this use case: apache/datafusion#1677

@yahoNanJing
Contributor Author

Hi @matthewmturner, thanks for the info. It's a really helpful example for creating another independent crate, like datafusion-objectstore-hdfs. However, it would be better to avoid requiring a new object store registration in cases such as a SQL service, where users have no place to register a store manually. Registering per query or per session is not a good approach either, since the set of data sources a service provides is quite stable.

@yahoNanJing
Contributor Author

yahoNanJing commented Feb 7, 2022

> This PR would support this use case: apache/datafusion#1677

Thanks @thinkharderdev. I think it's better for the path to be self-described, like s3://hostname:port/xxx/... or hdfs://hostname:port/xxx/.... Then the object store management service can detect which remote object store will be used.

If possible, it's better to provide a way to avoid remote object store registration.

@matthewmturner
Contributor

@yahoNanJing there actually already is a hdfs extension here https://github.com/datafusion-contrib/datafusion-hdfs-native.

Understood on your point - in that case what would be your ideal API for accessing S3? I would have thought the service could register the ObjectStore to share with clients and clients could access after registration at service level had been completed.

@thinkharderdev
Contributor

> > This PR would support this use case: apache/datafusion#1677
>
> Thanks @thinkharderdev. I think it's better for the path to be self-described, like s3://hostname:port/xxx/... or hdfs://hostname:port/xxx/.... Then the object store management service can detect which remote object store will be used.
>
> If possible, it's better to provide a way to avoid remote object store registration.

Not sure I follow. I agree that the object store should be resolved from the file URI, but we would still have to register an ObjectStore in the ExecutionContext in order to use it, right?

@yahoNanJing
Contributor Author

Hi @matthewmturner and @thinkharderdev, what I'm thinking is to introduce some remote object store extensions as DataFusion features. These extensions would be inclusive crates that are managed independently, so this solution would not introduce additional maintenance effort for DataFusion.

However, this solution is blocked by apache/datafusion#1772. Once the datasource module is split into an independent crate, the remote object store extensions will be able to depend on just that crate. Elsewhere, we can configure whether the extension features are enabled. This way, we can avoid a cyclic dependency.

Furthermore, it may be better to make the default object stores in the ObjectStoreRegistry configurable, so that we can avoid registering remote object stores manually.
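
One way to picture a configurable set of default stores is a registry that is populated once from configuration and then resolves a store from the URI scheme, with no per-query registration. This is only a sketch: the `Store` trait, `NamedStore`, and `Registry` are hypothetical stand-ins, not the DataFusion `ObjectStoreRegistry`, and a real implementation would construct actual object store clients instead of named placeholders.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Stand-in for an object store implementation (hypothetical trait).
trait Store: Send + Sync {
    fn name(&self) -> &str;
}

/// Placeholder store that only carries a name.
struct NamedStore(String);

impl Store for NamedStore {
    fn name(&self) -> &str {
        &self.0
    }
}

/// Registry keyed by URI scheme (hypothetical, for illustration only).
#[derive(Default)]
struct Registry {
    stores: HashMap<String, Arc<dyn Store>>,
}

impl Registry {
    /// Populate the defaults from a list of configured schemes,
    /// e.g. one read from a configuration file at service start-up.
    fn from_config(schemes: &[&str]) -> Self {
        let mut registry = Registry::default();
        for scheme in schemes {
            let store = NamedStore(format!("{}-store", scheme));
            registry.stores.insert(scheme.to_string(), Arc::new(store));
        }
        registry
    }

    /// Resolve a store from the URI scheme; a path without a scheme
    /// is assumed to use the "file" store.
    fn resolve(&self, uri: &str) -> Option<Arc<dyn Store>> {
        let scheme = uri.split_once("://").map_or("file", |(s, _)| s);
        self.stores.get(scheme).cloned()
    }
}
```

With `Registry::from_config(&["file", "s3"])`, a path such as `s3://bucket/key` resolves to the preconfigured S3 placeholder, while `hdfs://...` resolves to nothing because no HDFS store was configured.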

@milenkovicm

I had a quick look into this issue, and from what I can see, nothing is missing on the DataFusion side to have this functionality (apart from some hard work :)).

The team did a great job adding object store support to DataFusion:

use std::sync::Arc;

use datafusion::{
    datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl},
    prelude::SessionContext,
};
use object_store::aws::AmazonS3Builder;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();

    // Build an S3 client pointed at a local MinIO instance.
    let s3 = AmazonS3Builder::new()
        .with_region("us-east-1")
        .with_bucket_name("testbucket")
        .with_access_key_id("MINIO")
        .with_secret_access_key("MINIO/MINIO")
        .with_endpoint("http://localhost:9000")
        .with_allow_http(true)
        .build()
        .unwrap();

    let s3 = Arc::new(s3);

    // Register the store for the "s3" scheme and the "localhost:9000" host.
    ctx.runtime_env()
        .register_object_store("s3", "localhost:9000", s3);

    let url = ListingTableUrl::parse("s3://localhost:9000/testpath/").unwrap();

    // Infer the file format and schema from the files under the URL.
    let config = ListingTableConfig::new(url)
        .infer(&ctx.state())
        .await
        .unwrap();

    let table = ListingTable::try_new(config).unwrap();
    ctx.register_table("test", Arc::new(table)).unwrap();

    // Query the registered table and print the result.
    ctx.sql("SELECT * FROM test")
        .await
        .unwrap()
        .show()
        .await
        .unwrap();
}

I gave it a quick try with Ballista standalone, changing the code a bit to expose RuntimeEnv on the client, scheduler, and executor, and registering the store on each of them manually. In the end, it did produce the correct result. Currently, getting to a RuntimeEnv is not a "walk in the park"; a few hacks here and there were needed, but it would not be hard to make it easier. It would then be possible to load object store providers from configuration files.

Alternatively, register_object_store could be provided directly on the BallistaContext, and the object store configuration could then somehow be serialized and handled by the other actors in the system. AmazonS3Builder would probably need to be modified so that it can be serialized.
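
To illustrate the serialization idea, here is a rough sketch of a plain settings struct that can be flattened into string key/value pairs, shipped to another process, and rebuilt there before being fed into a builder. `S3Settings` and its key names are hypothetical and not part of `object_store` or Ballista. Credentials are deliberately left out of the sketch, on the assumption that each process obtains them from its own environment.

```rust
use std::collections::BTreeMap;

/// Serializable subset of S3 settings (hypothetical type, for illustration).
#[derive(Debug, Clone, PartialEq, Eq)]
struct S3Settings {
    region: String,
    bucket: String,
    endpoint: Option<String>,
    allow_http: bool,
}

impl S3Settings {
    /// Flatten into string pairs that any wire format can carry.
    fn to_map(&self) -> BTreeMap<String, String> {
        let mut map = BTreeMap::new();
        map.insert("region".to_string(), self.region.clone());
        map.insert("bucket".to_string(), self.bucket.clone());
        if let Some(endpoint) = &self.endpoint {
            map.insert("endpoint".to_string(), endpoint.clone());
        }
        map.insert("allow_http".to_string(), self.allow_http.to_string());
        map
    }

    /// Rebuild the settings on the receiving side.
    /// Returns None when a mandatory key (region or bucket) is missing.
    fn from_map(map: &BTreeMap<String, String>) -> Option<Self> {
        Some(S3Settings {
            region: map.get("region")?.clone(),
            bucket: map.get("bucket")?.clone(),
            endpoint: map.get("endpoint").cloned(),
            allow_http: map.get("allow_http").map_or(false, |v| v.as_str() == "true"),
        })
    }
}
```

On the receiving side, the rebuilt settings could then be passed to the builder calls shown in the example above (`with_region`, `with_bucket_name`, `with_endpoint`, `with_allow_http`).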

@avantgardnerio
Contributor

avantgardnerio commented Sep 1, 2022

@yahoNanJing this issue seems related to apache/datafusion#3311 where we are working towards allowing users to register delta-rs tables dynamically through SQL at runtime in Ballista.

@milenkovicm

@avantgardnerio SQL would be a perfect fit

@avantgardnerio
Contributor

avantgardnerio commented Sep 1, 2022

> ObjectStore in the ExecutionContext in order to use it right?

I think the problem is that this must happen dynamically in the case of a DataFusion executor in Ballista. The solution I am proposing is a TableProviderFactory in apache/datafusion#3311

Edit: by dynamically, I mean the name of the table and the path to it will not be known at compile time.

@saikrishna1-bidgely
Contributor

saikrishna1-bidgely commented Jan 4, 2023

Is loading data from S3 possible with a distributed setup right now? If so, can you provide a small example? I tried with a path on S3 and it failed with a "no object store available for s3" error. This happened after I added the s3 feature to my project. It looks like the s3 feature hasn't been added to the scheduler and executor.

@ahmedriza

ahmedriza commented Feb 10, 2023

@saikrishna1-bidgely, here's an example of what I tried and this worked. I'm not 100% sure that this is how it is supposed to be :-)

  • Build the scheduler with the s3 feature enabled, i.e. modify its Cargo.toml:
ballista-core = { path = "../core", version = "0.10.0" , features = ["s3"] }
  • Define the following environment variables (I tested with a MinIO instance), and ensure that they are available to the scheduler and executor processes.
AWS_DEFAULT_REGION
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
AWS_ENDPOINT

You may need to define additional AWS environment variables depending on your S3 service.

  • Start scheduler and executor
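
Since a missing variable on either process is easy to overlook, a small pre-flight check along these lines could be run before starting the scheduler and executor. This is a sketch using only the standard library; `REQUIRED_VARS`, `missing_vars`, and `missing_from_process_env` are hypothetical helper names, and the variable list simply mirrors the four names above.

```rust
/// The variables named in the steps above.
const REQUIRED_VARS: [&str; 4] = [
    "AWS_DEFAULT_REGION",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_ENDPOINT",
];

/// Return the names of required variables that are unset or empty.
/// The lookup is passed in so the check can run against any source.
fn missing_vars<F>(required: &[&str], lookup: F) -> Vec<String>
where
    F: Fn(&str) -> Option<String>,
{
    let mut missing = Vec::new();
    for name in required {
        let value = lookup(*name);
        if value.map_or(true, |v| v.is_empty()) {
            missing.push(name.to_string());
        }
    }
    missing
}

/// Run the check against the current process environment.
fn missing_from_process_env() -> Vec<String> {
    missing_vars(&REQUIRED_VARS, |name| std::env::var(name).ok())
}
```

Calling `missing_from_process_env()` in the same shell that launches the scheduler or executor returns an empty list when all four variables are set and non-empty.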

I tested with the following sample code:

use ballista::prelude::BallistaContext;
use ballista_core::config::BallistaConfig;
use datafusion::prelude::ParquetReadOptions;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = BallistaConfig::builder().build().unwrap();
    // Connect to the running scheduler.
    let ctx = BallistaContext::remote("localhost", 50050, &config).await.unwrap();
    let filename = "s3://foo/test.parquet";
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?;
    let rows = df.count().await?;
    println!("rows: {}", rows);
    Ok(())
}

The code correctly returns the number of rows in the Parquet file:

rows: 15309

Last bit of logging from the scheduler process:

2023-02-10T23:16:11.345868Z  INFO tokio-runtime-worker ThreadId(05) ballista_scheduler::display: === [pRdAqhp/2] Stage finished, physical plan with metrics ===
ShuffleWriterExec: None, metrics=[output_rows=1, input_rows=1, repart_time=1ns, write_time=721.748µs]
  AggregateExec: mode=Final, gby=[], aggr=[COUNT(NULL)], metrics=[output_rows=1, elapsed_compute=28.791µs, spill_count=0, spilled_bytes=0, mem_used=0]
    CoalescePartitionsExec, metrics=[]
      ShuffleReaderExec: partitions=1, metrics=[]


2023-02-10T23:16:11.346163Z  INFO tokio-runtime-worker ThreadId(05) ballista_scheduler::state::execution_graph: Job pRdAqhp is success, finalizing output partitions
2023-02-10T23:16:11.346354Z  INFO tokio-runtime-worker ThreadId(05) ballista_scheduler::scheduler_server::query_stage_scheduler: Job pRdAqhp success

From the executor process:

2023-02-10T23:16:11.241689Z  INFO          task_runner ThreadId(22) ballista_executor::metrics: === [pRdAqhp/2/0] Physical plan with metrics ===
ShuffleWriterExec: None, metrics=[output_rows=1, input_rows=1, write_time=721.748µs, repart_time=1ns]
  AggregateExec: mode=Final, gby=[], aggr=[COUNT(NULL)], metrics=[output_rows=1, elapsed_compute=28.791µs, spill_count=0, spilled_bytes=0, mem_used=0]
    CoalescePartitionsExec, metrics=[]
      ShuffleReaderExec: partitions=1, metrics=[]

Hope this helps.

@saikrishna1-bidgely
Contributor

@ahmedriza could you turn this into a PR, please? That would be very helpful.

@saikrishna1-bidgely
Contributor

@ahmedriza I tried what you suggested. I built the scheduler and executor with the s3 feature added to the ballista-core dependency in Cargo.toml, but I'm getting the same error: Error: DataFusionError(Execution("No object store available for s3://ballista-test-bucket/temp.csv")). I'm running this on Windows 10. Also, I'm running the code in another repo with this as its Cargo.toml:

[package]
name = "ballista-test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ballista = "0.11.0"
datafusion = "18.0.0"
tokio = "1.0"
parquet = "29.0.0"

@YuriyGavrilov

YuriyGavrilov commented Oct 6, 2023

It would also be nice to have uplink support for the Storj network https://github.com/storj/uplink (Rust bindings for libuplink: https://github.com/storj-thirdparty/uplink-rust). It provides direct access to the Storj network, avoiding the S3 gateway bottleneck (https://github.com/storj/storj/wiki/Libuplink-Walkthrough).


9 participants