Skip to content

Commit

Permalink
Adapt the POC to newer version of the do put extended protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed May 13, 2024
1 parent 65b83b7 commit 29708ef
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 175 deletions.
61 changes: 22 additions & 39 deletions clade/proto/flight.proto
Expand Up @@ -2,54 +2,37 @@ syntax = "proto3";

package clade.flight;

message DoPutCommand {
enum CommandType {
// Protobuf pattern, not used
UNKNOWN = 0;
import "schema.proto";

// Data: updated or inserted rows. If a column is omitted,
// this assumes it's unchanged.
UPSERT = 1;
message DataPutCommand {
// Path to the Delta Table, relative to the root URL of the store below.
// Will be created based on the schema of the RecordBatch if doesn't exist.
// e.g. path/to/table
string path = 1;

// Data: columns forming the primary keys / replica identity
// of the deleted rows.
DELETE = 2;
}
// Root URL (e.g. s3://my-bucket) and connection options for the table's path
schema.StorageLocation store = 2;

CommandType type = 1;

// TODO: Table ref or URL of the Delta Table?
string table = 2;

// Primary key columns
// Primary key columns. Must not be empty: the table must always
// have a replica identity / PK.
repeated string pk_column = 3;

// Opaque string identifying PGD origin
optional string origin = 4;
string origin = 4;

// Monotonically-increasing transaction number (e.g. an LSN)
optional uint64 sequence_number = 5;
// Monotonically-increasing transaction number (e.g. the LSN)
uint64 sequence_number = 5;
}

message DoPutResult {
enum AckType {
UNKNOWN = 0;

// Seafowl is overloaded and can't accept the change (e.g.
// can't flush fast enough). The client should wait and retry.
// TODO: we can block when receiving FlightData instead?
BACKPRESSURE = 1;
message DataPutResult {
// If false, Seafowl is overloaded and can't accept the change (e.g.
// can't flush fast enough). The client should wait and retry.
// TODO: also schema mismatch errors
bool accepted = 1;

// Change acknowledged and is in-memory. Will get lost if
// the Seafowl instance is restarted
MEMORY = 2;
// Sequence number up to which the changes are in Seafowl's memory.
uint64 memory_sequence_number = 2;

// Change, and all changes from this origin, has been flushed
// to durable storage (i.e. Delta Lake).
DURABLE = 3;
}

AckType type = 1;
// Sequence number up to which the changes are in Delta Lake.
uint64 durable_sequence_number = 3;
}


6 changes: 3 additions & 3 deletions clade/src/lib.rs
Expand Up @@ -17,14 +17,14 @@ pub mod flight {
// This in turn allows us to circumvent the default `do_put`, and use our
// custom message/fields in `do_put_fallback` to convey additional information
// about the command.
impl ProstMessageExt for DoPutCommand {
impl ProstMessageExt for DataPutCommand {
fn type_url() -> &'static str {
"DoPutCommand"
"DataPutCommand"
}

fn as_any(&self) -> Any {
Any {
type_url: "DoPutCommand".to_string(),
type_url: "DataPutCommand".to_string(),
value: self.encode_to_vec().into(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/metastore.rs
Expand Up @@ -34,7 +34,7 @@ pub struct Metastore {
pub tables: Arc<dyn TableStore>,
pub functions: Arc<dyn FunctionStore>,
staging_schema: Arc<MemorySchemaProvider>,
object_stores: Arc<ObjectStoreFactory>,
pub object_stores: Arc<ObjectStoreFactory>,
}

impl Metastore {
Expand Down
18 changes: 18 additions & 0 deletions src/config/schema.rs
Expand Up @@ -232,13 +232,31 @@ pub struct Frontend {
pub struct FlightFrontend {
pub bind_host: String,
pub bind_port: u16,
pub put_config: PutDataConfig,
}

impl Default for FlightFrontend {
fn default() -> Self {
Self {
bind_host: "127.0.0.1".to_string(),
bind_port: 47470,
put_config: Default::default(),
}
}
}

#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
#[serde(default)]
pub struct PutDataConfig {
pub max_in_memory_bytes_total: u64,
pub max_in_memory_bytes_table: u64,
}

impl Default for PutDataConfig {
fn default() -> Self {
Self {
max_in_memory_bytes_total: 3 * 1024 * 1024 * 1024,
max_in_memory_bytes_table: 1024 * 1024 * 1024,
}
}
}
Expand Down
155 changes: 115 additions & 40 deletions src/frontend/flight/handler.rs
@@ -1,23 +1,29 @@
use crate::context::delta::CreateDeltaTableDetails;
use crate::context::SeafowlContext;
use arrow::record_batch::RecordBatch;
use arrow_flight::sql::metadata::{SqlInfoData, SqlInfoDataBuilder};
use arrow_flight::sql::SqlInfo;
use arrow_schema::SchemaRef;
use clade::flight::do_put_result::AckType;
use clade::flight::{DataPutCommand, DataPutResult};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use datafusion::common::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::{DataFusionError, TableReference};
use datafusion_common::DataFusionError;
use deltalake::kernel::Schema as DeltaSchema;
use deltalake::operations::create::CreateBuilder;
use deltalake::DeltaTable;
use lazy_static::lazy_static;
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::metadata::MetadataMap;
use tonic::Status;
use tracing::{error, info};
use tracing::{debug, error, info};
use url::Url;

pub const SEAFOWL_PUT_DATA_UD_FLAG: &str = "__seafowl_ud";
const SEAFOWL_PUT_DATA_ORIGIN: &str = "origin";
const SEAFOWL_PUT_DATA_SEQUENCE_NUMBER: &str = "sequence";

lazy_static! {
pub static ref SEAFOWL_SQL_DATA: SqlInfoData = {
Expand All @@ -38,7 +44,7 @@ lazy_static! {
pub(super) struct SeafowlFlightHandler {
pub context: Arc<SeafowlContext>,
pub results: Arc<DashMap<String, Mutex<SendableRecordBatchStream>>>,
pub writes: Arc<DashMap<String, Vec<RecordBatch>>>,
pub writes: Arc<DashMap<String, (u64, Vec<RecordBatch>)>>,
}

impl SeafowlFlightHandler {
Expand Down Expand Up @@ -98,44 +104,113 @@ impl SeafowlFlightHandler {
Ok(batch_stream_mutex.into_inner())
}

pub async fn process_batches(
pub async fn process_put_cmd(
&self,
table: String,
cmd: DataPutCommand,
mut batches: Vec<RecordBatch>,
) -> Result<AckType> {
match self.writes.entry(table.clone()) {
) -> Result<DataPutResult> {
let store_loc = cmd.store.unwrap().clone();
let log_store = self
.context
.metastore
.object_stores
.get_log_store_for_table(
Url::parse(&store_loc.location).map_err(|e| {
DataFusionError::Execution(format!(
"Couldn't parse put location: {e}"
))
})?,
store_loc.options,
cmd.path,
)?;

let url = log_store.root_uri();
// Check if a table exists yet in the provided location, and if not create one
let dur_seq = if !log_store.is_delta_table_location().await? {
let schema = batches.first().unwrap().schema();
let delta_schema = DeltaSchema::try_from(schema.as_ref())?;

debug!(
"Creating new Delta table at location: {}",
log_store.root_uri()
);
CreateBuilder::new()
.with_log_store(log_store)
.with_columns(delta_schema.fields().clone())
.with_comment(format!("Created by Seafowl {}", env!("CARGO_PKG_VERSION")))
.with_metadata([
(
SEAFOWL_PUT_DATA_ORIGIN.to_string(),
Value::String(cmd.origin),
),
(
SEAFOWL_PUT_DATA_SEQUENCE_NUMBER.to_string(),
Value::Number(cmd.sequence_number.into()),
),
])
.await?;
None
} else {
let mut table = DeltaTable::new(log_store, Default::default());
table.load().await?;

// TODO: handle all edge cases with missing/un-parsable sequence numbers
let commit_infos = table.history(Some(1)).await?;
match commit_infos
.last()
.expect("Table has non-zero comits")
.info
.get(SEAFOWL_PUT_DATA_SEQUENCE_NUMBER)
{
Some(Value::Number(seq)) => seq.as_u64(),
_ => None,
}
};

let num_rows = batches
.iter()
.fold(0, |rows, batch| rows + batch.num_rows());
if num_rows == 0 {
debug!("Received empty batches, returning current sequence numbers");

let dur_seq = dur_seq.unwrap_or(0);
let mem_seq = self
.writes
.get(&url)
.map(|r| r.value().0)
.unwrap_or(dur_seq);

let result = DataPutResult {
accepted: false,
memory_sequence_number: mem_seq,
durable_sequence_number: dur_seq,
};
return Ok(result);
}

debug!("Processing data change with {num_rows} rows for url {url}");
let _size = batches
.iter()
.fold(0, |bytes, batch| bytes + batch.get_array_memory_size());

match self.writes.entry(url) {
Entry::Occupied(mut entry) => {
let schema = batches.first().unwrap().schema().clone();
entry.get_mut().append(&mut batches);

if entry
.get()
.iter()
.fold(0, |rows, batch| rows + batch.num_rows())
> 3
{
let batches = entry.remove();

let table_ref = TableReference::bare(table.clone());
self.context
.create_delta_table(
table_ref,
CreateDeltaTableDetails::EmptyTable(schema.as_ref().clone()),
)
.await?;

let plan: Arc<dyn ExecutionPlan> =
Arc::new(MemoryExec::try_new(&[batches], schema, None).unwrap());

self.context.plan_to_delta_table(table, &plan).await?;
Ok(AckType::Durable)
} else {
Ok(AckType::Memory)
}
entry.get_mut().1.append(&mut batches);
let result = DataPutResult {
accepted: true,
memory_sequence_number: cmd.sequence_number,
durable_sequence_number: 0,
};
Ok(result)
}
Entry::Vacant(entry) => {
entry.insert(batches);
Ok(AckType::Memory)
entry.insert((cmd.sequence_number, batches));
let result = DataPutResult {
accepted: true,
memory_sequence_number: cmd.sequence_number,
durable_sequence_number: 0,
};
Ok(result)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/flight/mod.rs
Expand Up @@ -14,6 +14,8 @@ use std::sync::Arc;

use tonic::transport::Server;

pub use handler::SEAFOWL_PUT_DATA_UD_FLAG;

pub async fn run_flight_server(
context: Arc<SeafowlContext>,
config: FlightFrontend,
Expand Down

0 comments on commit 29708ef

Please sign in to comment.