Partitioned scan for RegionEngine #3886

Closed
evenyag opened this issue May 8, 2024 · 1 comment
Labels: C-performance (Category Performance)

Comments

evenyag commented May 8, 2024

What type of enhancement is this?

API improvement

What does the enhancement do?

The RegionEngine trait provides a handle_query() method that scans a region and returns a stream of RecordBatch.

async fn handle_query(
    &self,
    region_id: RegionId,
    request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError>;

This method is easy to use but has some limitations:

  • The output concurrency is always 1
  • The engine can't return more information about the query to callers

To maximize parallelism in #2806, the engine should provide a way to return multiple streams to scan different partitions of a region concurrently.

Implementation challenges

This issue proposes adding a new method to the region engine that supports partitioned scans. The method returns a trait object that can create a stream for a given partition index.

pub struct ScannerProperties {
    // Properties of the scanner
    // e.g. number of partitions, range of partitions
}

pub trait RegionScanner {
    fn properties(&self) -> &ScannerProperties;

    fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError>;
}

pub type RegionScannerRef = Arc<dyn RegionScanner>;

pub trait RegionEngine {
    async fn handle_partitioned_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError>;
}
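
As a rough illustration of how a caller might drive such a scanner, here is a minimal, std-only sketch. It is not the engine's implementation: `Batch` and `BatchIter` are synchronous stand-ins for RecordBatch and SendableRecordBatchStream, errors are plain `String`s instead of BoxedError, and the `num_partitions` field, `MemScanner`, and `scan_all` are hypothetical names introduced only for this example.

```rust
use std::sync::Arc;
use std::thread;

/// Stand-in for a RecordBatch: just a vector of row values.
type Batch = Vec<u64>;
/// Stand-in for SendableRecordBatchStream (a synchronous iterator here).
type BatchIter = Box<dyn Iterator<Item = Batch> + Send>;

/// Assumed shape: the issue only says the properties may include the partition count.
pub struct ScannerProperties {
    pub num_partitions: usize,
}

/// Simplified version of the proposed trait, using the stand-in types above.
pub trait RegionScanner: Send + Sync {
    fn properties(&self) -> &ScannerProperties;
    fn scan_partition(&self, partition: usize) -> Result<BatchIter, String>;
}

/// Hypothetical in-memory scanner: each partition owns a list of batches.
struct MemScanner {
    properties: ScannerProperties,
    partitions: Vec<Vec<Batch>>,
}

impl MemScanner {
    fn new(partitions: Vec<Vec<Batch>>) -> Self {
        let properties = ScannerProperties { num_partitions: partitions.len() };
        Self { properties, partitions }
    }
}

impl RegionScanner for MemScanner {
    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn scan_partition(&self, partition: usize) -> Result<BatchIter, String> {
        let batches = self
            .partitions
            .get(partition)
            .ok_or_else(|| format!("partition {partition} out of range"))?
            .clone();
        Ok(Box::new(batches.into_iter()))
    }
}

/// Scans every partition on its own thread and returns per-partition row counts.
fn scan_all(scanner: Arc<dyn RegionScanner>) -> Result<Vec<usize>, String> {
    let handles: Vec<_> = (0..scanner.properties().num_partitions)
        .map(|partition| {
            let scanner = Arc::clone(&scanner);
            thread::spawn(move || -> Result<usize, String> {
                let stream = scanner.scan_partition(partition)?;
                let rows: usize = stream.map(|batch| batch.len()).sum();
                Ok(rows)
            })
        })
        .collect();

    let mut counts = Vec::with_capacity(handles.len());
    for handle in handles {
        // The first `?` handles a panicked thread, the second a scan error.
        let rows = handle
            .join()
            .map_err(|_| "scan thread panicked".to_string())??;
        counts.push(rows);
    }
    Ok(counts)
}
```

With two partitions holding 3 and 4 rows, `scan_all` would return `[3, 4]`. In the real engine the query engine, not a thread per partition, would decide how partitions are scheduled; threads are used here only to keep the sketch free of async dependencies.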

We could then use the scanner to implement a PhysicalPlan and let the query engine process multiple partitions. We might need to refactor the StreamScanAdapter as it assumes there is only one partition.

fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> QueryResult<SendableRecordBatchStream> {
    let tracing_context = TracingContext::from_json(context.session_id().as_str());
    let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter"));
    let mut stream = self.stream.lock().unwrap();
    let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?;
    let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
    Ok(Box::pin(StreamWithMetricWrapper {
        stream,
        metric: mem_usage_metrics,
        span,
    }))
}
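
To make the single-partition assumption concrete, the following std-only sketch contrasts a take-once adapter (modeled on the execute() above) with one possible partition-aware variant. This is an assumption about how the refactoring could look, not the change that was actually merged. `Batch`, `BatchIter`, `SingleStreamAdapter`, `PartitionedAdapter`, `IndexScanner`, and the simplified `RegionScanner` trait with a `num_partitions()` method are all hypothetical stand-ins.

```rust
use std::sync::{Arc, Mutex};

/// Stand-ins for RecordBatch and SendableRecordBatchStream.
type Batch = Vec<u64>;
type BatchIter = Box<dyn Iterator<Item = Batch> + Send>;

/// Mirrors the current adapter: one stream that can be taken exactly once.
struct SingleStreamAdapter {
    stream: Mutex<Option<BatchIter>>,
}

impl SingleStreamAdapter {
    fn execute(&self, _partition: usize) -> Result<BatchIter, String> {
        // The partition index is ignored; a second call finds the slot empty.
        let mut slot = self.stream.lock().unwrap();
        slot.take().ok_or_else(|| "executed repeatedly".to_string())
    }
}

/// Simplified stand-in for the proposed RegionScanner trait.
trait RegionScanner: Send + Sync {
    fn num_partitions(&self) -> usize;
    fn scan_partition(&self, partition: usize) -> Result<BatchIter, String>;
}

/// Partition-aware adapter: each execute() call asks the scanner for a new stream.
struct PartitionedAdapter {
    scanner: Arc<dyn RegionScanner>,
}

impl PartitionedAdapter {
    /// Number of output partitions the plan would report to the query engine.
    fn output_partitions(&self) -> usize {
        self.scanner.num_partitions()
    }

    fn execute(&self, partition: usize) -> Result<BatchIter, String> {
        self.scanner.scan_partition(partition)
    }
}

/// Hypothetical scanner: partition `i` yields a single batch containing `i`.
struct IndexScanner {
    partitions: usize,
}

impl RegionScanner for IndexScanner {
    fn num_partitions(&self) -> usize {
        self.partitions
    }

    fn scan_partition(&self, partition: usize) -> Result<BatchIter, String> {
        if partition >= self.partitions {
            return Err(format!("partition {partition} out of range"));
        }
        Ok(Box::new(std::iter::once(vec![partition as u64])))
    }
}
```

The take-once adapter fails on any second execute() call regardless of the partition index, while the partition-aware variant can serve each index independently. Tracing spans and memory metrics from the original execute() are left out to keep the sketch short.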

evenyag commented May 20, 2024

closed by #3948

evenyag closed this as completed May 20, 2024