Skip to content

Commit

Permalink
Implement basic PoC demonstrating append-only do put endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Apr 30, 2024
1 parent c29e2b3 commit 71d523c
Show file tree
Hide file tree
Showing 10 changed files with 292 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions clade/Cargo.toml
Expand Up @@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
arrow-flight = { workspace = true }
prost = "0.12"
tonic = "0.11"

Expand Down
2 changes: 1 addition & 1 deletion clade/build.rs
Expand Up @@ -7,7 +7,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.file_descriptor_set_path(out_dir.join("clade_descriptor.bin"))
.build_server(true)
.build_client(true)
.compile(&["proto/schema.proto"], &["proto"])?;
.compile(&["proto/schema.proto", "proto/flight.proto"], &["proto"])?;

Ok(())
}
55 changes: 55 additions & 0 deletions clade/proto/flight.proto
@@ -0,0 +1,55 @@
syntax = "proto3";

package clade.flight;

message DoPutCommand {
enum CommandType {
// Protobuf pattern, not used
UNKNOWN = 0;

// Data: updated or inserted rows. If a column is omitted,
// this assumes it's unchanged.
UPSERT = 1;

// Data: columns forming the primary keys / replica identity
// of the deleted rows.
DELETE = 2;
}

CommandType type = 1;

// TODO: Table ref or URL of the Delta Table?
string table = 2;

// Primary key columns
repeated string pk_column = 3;

// Opaque string identifying PGD origin
optional string origin = 4;

// Monotonically-increasing transaction number (e.g. an LSN)
optional uint64 sequence_number = 5;
}

message DoPutResult {
enum AckType {
UNKNOWN = 0;

// Seafowl is overloaded and can't accept the change (e.g.
// can't flush fast enough). The client should wait and retry.
// TODO: we can block when receiving FlightData instead?
BACKPRESSURE = 1;

// Change acknowledged and is in-memory. Will get lost if
// the Seafowl instance is restarted
MEMORY = 2;

// Change, and all changes from this origin, has been flushed
// to durable storage (i.e. Delta Lake).
DURABLE = 3;
}

AckType type = 1;
}


27 changes: 27 additions & 0 deletions clade/src/lib.rs
Expand Up @@ -3,3 +3,30 @@ pub mod schema {
pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("clade_descriptor");
}

pub mod flight {
use arrow_flight::sql::{Any, ProstMessageExt};
use prost::Message;

tonic::include_proto!("clade.flight");

// We need this since the native arrow-flight `do_put` mechanism relies on
// decoding the command into `Any` prior to running it:
// https://github.com/apache/arrow-rs/blob/a61f1dc8ea132add731e4426e11d341a1de5ca92/arrow-flight/src/sql/server.rs#L705-L706
//
// This in turn allows us to circumvent the default `do_put`, and use our
// custom message/fields in `do_put_fallback` to convey additional information
// about the command.
impl ProstMessageExt for DoPutCommand {
fn type_url() -> &'static str {
"DoPutCommand"
}

fn as_any(&self) -> Any {
Any {
type_url: "DoPutCommand".to_string(),
value: self.encode_to_vec().into(),
}
}
}
}
6 changes: 3 additions & 3 deletions src/context/delta.rs
Expand Up @@ -273,13 +273,13 @@ pub async fn plan_to_object_store(
.collect()
}

pub(super) enum CreateDeltaTableDetails {
pub enum CreateDeltaTableDetails {
EmptyTable(Schema),
FromPath(Path),
}

impl SeafowlContext {
pub(super) async fn create_delta_table<'a>(
pub async fn create_delta_table<'a>(
&'a self,
name: impl Into<TableReference<'a>>,
details: CreateDeltaTableDetails,
Expand Down Expand Up @@ -367,7 +367,7 @@ impl SeafowlContext {
}

/// Generate the Delta table builder and execute the write
pub(super) async fn plan_to_delta_table<'a>(
pub async fn plan_to_delta_table<'a>(
&self,
name: impl Into<TableReference<'a>>,
plan: &Arc<dyn ExecutionPlan>,
Expand Down
52 changes: 51 additions & 1 deletion src/frontend/flight/handler.rs
@@ -1,11 +1,17 @@
use crate::context::delta::CreateDeltaTableDetails;
use crate::context::SeafowlContext;
use arrow::record_batch::RecordBatch;
use arrow_flight::sql::metadata::{SqlInfoData, SqlInfoDataBuilder};
use arrow_flight::sql::SqlInfo;
use arrow_schema::SchemaRef;
use clade::flight::do_put_result::AckType;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use datafusion::common::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion_common::DataFusionError;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::{DataFusionError, TableReference};
use lazy_static::lazy_static;
use std::sync::Arc;
use tokio::sync::Mutex;
Expand All @@ -32,13 +38,15 @@ lazy_static! {
pub(super) struct SeafowlFlightHandler {
pub context: Arc<SeafowlContext>,
pub results: Arc<DashMap<String, Mutex<SendableRecordBatchStream>>>,
pub writes: Arc<DashMap<String, Vec<RecordBatch>>>,
}

impl SeafowlFlightHandler {
pub fn new(context: Arc<SeafowlContext>) -> Self {
Self {
context,
results: Arc::new(Default::default()),
writes: Arc::new(Default::default()),
}
}

Expand Down Expand Up @@ -89,4 +97,46 @@ impl SeafowlFlightHandler {

Ok(batch_stream_mutex.into_inner())
}

pub async fn process_batches(
&self,
table: String,
mut batches: Vec<RecordBatch>,
) -> Result<AckType> {
match self.writes.entry(table.clone()) {
Entry::Occupied(mut entry) => {
let schema = batches.first().unwrap().schema().clone();
entry.get_mut().append(&mut batches);

if entry
.get()
.iter()
.fold(0, |rows, batch| rows + batch.num_rows())
> 3
{
let batches = entry.remove();

let table_ref = TableReference::bare(table.clone());
self.context
.create_delta_table(
table_ref,
CreateDeltaTableDetails::EmptyTable(schema.as_ref().clone()),
)
.await?;

let plan: Arc<dyn ExecutionPlan> =
Arc::new(MemoryExec::try_new(&[batches], schema, None).unwrap());

self.context.plan_to_delta_table(table, &plan).await?;
Ok(AckType::Durable)
} else {
Ok(AckType::Memory)
}
}
Entry::Vacant(entry) => {
entry.insert(batches);
Ok(AckType::Memory)
}
}
}
}
49 changes: 47 additions & 2 deletions src/frontend/flight/sql.rs
@@ -1,17 +1,20 @@
use crate::frontend::flight::handler::{SeafowlFlightHandler, SEAFOWL_SQL_DATA};
use arrow::record_batch::RecordBatch;
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::error::FlightError;
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::sql::server::FlightSqlService;
use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream};
use arrow_flight::sql::{
CommandGetSqlInfo, CommandStatementQuery, ProstMessageExt, SqlInfo,
Any, CommandGetSqlInfo, CommandStatementQuery, ProstMessageExt, SqlInfo,
TicketStatementQuery,
};
use arrow_flight::{
FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse,
Ticket,
};
use async_trait::async_trait;
use clade::flight::{DoPutCommand, DoPutResult};
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
Expand Down Expand Up @@ -145,4 +148,46 @@ impl FlightSqlService for SeafowlFlightHandler {
}

async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}

async fn do_put_fallback(
&self,
request: Request<PeekableFlightDataStream>,
message: Any,
) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
let cmd: DoPutCommand = Message::decode(&*message.value)
.map_err(|err| Status::unknown(format!("Couldn't decode command: {err}")))?;

if !cmd.pk_column.is_empty() {
return Err(Status::unimplemented(
"Append only changes supported for now",
));
}

let table = cmd.table;

let batches: Vec<RecordBatch> = FlightRecordBatchStream::new_from_flight_data(
request.into_inner().map_err(|e| e.into()),
)
.try_collect()
.await?;

let ack_type =
self.process_batches(table.clone(), batches)
.await
.map_err(|err| {
Status::internal(format!(
"Failed processing DoPut for table {table}: {err}"
))
})?;

Ok(Response::new(Box::pin(futures::stream::iter(vec![Ok(
arrow_flight::PutResult {
app_metadata: DoPutResult {
r#type: i32::from(ack_type),
}
.encode_to_vec()
.into(),
},
)]))))
}
}
105 changes: 105 additions & 0 deletions tests/flight/do_put.rs
@@ -0,0 +1,105 @@
use crate::flight::*;
use arrow::array::{Int32Array, StringArray};
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_schema::{DataType, Field, Schema};
use clade::flight::do_put_result::AckType;
use clade::flight::{do_put_command::CommandType, DoPutCommand, DoPutResult};
use futures::StreamExt;

#[tokio::test]
async fn test_basic_upload() -> Result<()> {
let (_context, mut client) = flight_server().await;

let schema = Arc::new(Schema::new(vec![
Field::new("col_1", DataType::Int32, true),
Field::new("col_2", DataType::Utf8, true),
]));

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec![Some("one"), Some("two")])),
],
)
.unwrap();

let cmd = DoPutCommand {
r#type: CommandType::Upsert.into(),
table: "do_put_test".to_string(),
origin: Some("test-origin".to_string()),
pk_column: vec![],
sequence_number: Some(123),
};

let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
let flight_stream_builder =
FlightDataEncoderBuilder::new().with_flight_descriptor(Some(descriptor));
let flight_data = flight_stream_builder.build(futures::stream::iter(vec![Ok(batch)]));
let response = client.do_put(flight_data).await?.next().await.unwrap()?;

// Changes are still in memory
let ack_type = AckType::try_from(
DoPutResult::decode(response.app_metadata)
.expect("DoPutResult")
.r#type,
)
.expect("AckType");
assert_eq!(ack_type, AckType::Memory);

let err = get_flight_batches(&mut client, "SELECT * FROM do_put_test".to_string())
.await
.unwrap_err()
.to_string();
assert!(err.contains("Tonic error: status: Internal, message: \"Error during planning: table 'default.public.do_put_test' not found\""));

// Try to append some more data and pass the flush threshold
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(StringArray::from(vec![Some("three"), Some("four")])),
],
)
.unwrap();

let cmd = DoPutCommand {
r#type: CommandType::Upsert.into(),
table: "do_put_test".to_string(),
origin: Some("test-origin".to_string()),
pk_column: vec![],
sequence_number: Some(456),
};

let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
let flight_stream_builder =
FlightDataEncoderBuilder::new().with_flight_descriptor(Some(descriptor));
let flight_data = flight_stream_builder.build(futures::stream::iter(vec![Ok(batch)]));
let response = client.do_put(flight_data).await?.next().await.unwrap()?;

// Changes are now flushed to storage
let ack_type = AckType::try_from(
DoPutResult::decode(response.app_metadata)
.expect("DoPutResult")
.r#type,
)
.expect("AckType");
assert_eq!(ack_type, AckType::Durable);

let results =
get_flight_batches(&mut client, "SELECT * FROM do_put_test".to_string()).await?;
let expected = [
"+-------+-------+",
"| col_1 | col_2 |",
"+-------+-------+",
"| 1 | one |",
"| 2 | two |",
"| 3 | three |",
"| 4 | four |",
"+-------+-------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}

0 comments on commit 71d523c

Please sign in to comment.