Skip to content

Commit

Permalink
chore: update to fluvio 0.11.8
Browse files Browse the repository at this point in the history
  • Loading branch information
digikata committed May 18, 2024
1 parent 0ef68ff commit 5141126
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "_fluvio_python"
version = "0.16.1"
version = "0.16.2"
edition = "2021"
authors = ["Fluvio Contributors <team@fluvio.io>"]

Expand Down Expand Up @@ -37,7 +37,7 @@ futures = "0.3.30"
async-lock = "3.3.0"

fluvio-future = { version = "0.6.2", features = ["task", "io", "native2_tls", "subscriber"] }
fluvio = { features = ["admin", "rustls"], git = "https://github.com/infinyon/fluvio.git", tag = "v0.11.5" }
fluvio-types = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.11.6" }
fluvio-sc-schema = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.11.5" }
fluvio-controlplane-metadata = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.11.5" }
fluvio = { features = ["admin", "rustls"], git = "https://github.com/infinyon/fluvio.git", tag = "v0.11.8" }
fluvio-types = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.11.8" }
fluvio-sc-schema = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.11.8" }
fluvio-controlplane-metadata = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.11.8" }
11 changes: 11 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl Fluvio {
partition: u32,
py: Python,
) -> PyResult<PartitionConsumer> {
#[allow(deprecated)]
Ok(PartitionConsumer(py.allow_threads(move || {
run_block_on(self.0.partition_consumer(topic, partition)).map_err(error_to_py_err)
})?))
Expand All @@ -140,6 +141,7 @@ impl Fluvio {
strategy: PartitionSelectionStrategy,
py: Python,
) -> PyResult<MultiplePartitionConsumer> {
#[allow(deprecated)]
Ok(MultiplePartitionConsumer(py.allow_threads(move || {
run_block_on(self.0.consumer(strategy.into_inner())).map_err(error_to_py_err)
})?))
Expand Down Expand Up @@ -434,6 +436,7 @@ impl Clone for PartitionConsumer {
#[pymethods]
impl PartitionConsumer {
fn stream(&self, offset: &Offset) -> Result<PartitionConsumerStream, FluvioError> {
#[allow(deprecated)]
Ok(PartitionConsumerStream {
inner: Box::pin(run_block_on(self.0.stream(offset.0.clone()))?),
})
Expand All @@ -442,6 +445,7 @@ impl PartitionConsumer {
let sl = self.clone();
let offset = offset.0.clone();
pyo3_asyncio::async_std::future_into_py(py, async move {
#[allow(deprecated)]
let stream =
sl.0.stream(offset)
.await
Expand All @@ -461,6 +465,7 @@ impl PartitionConsumer {
let config: NativeConsumerConfig = config.build()?.0;

Ok(py.allow_threads(move || {
#[allow(deprecated)]
run_block_on(self.0.stream_with_config(offset.0.clone(), config)).map(|stream| {
PartitionConsumerStream {
inner: Box::pin(stream),
Expand All @@ -478,6 +483,7 @@ impl PartitionConsumer {
let offset = offset.0.clone();
let config: NativeConsumerConfig = config.build()?.0;
pyo3_asyncio::async_std::future_into_py(py, async move {
#[allow(deprecated)]
let stream =
sl.0.stream_with_config(offset, config)
.await
Expand All @@ -501,6 +507,7 @@ impl Clone for MultiplePartitionConsumer {
#[pymethods]
impl MultiplePartitionConsumer {
fn stream(&self, offset: &Offset, py: Python) -> Result<PartitionConsumerStream, FluvioError> {
#[allow(deprecated)]
Ok(PartitionConsumerStream {
inner: Box::pin(
py.allow_threads(move || run_block_on(self.0.stream(offset.0.clone())))?,
Expand All @@ -510,6 +517,7 @@ impl MultiplePartitionConsumer {
fn async_stream<'b>(&'b self, offset: &Offset, py: Python<'b>) -> PyResult<&PyAny> {
let sl = self.clone();
let offset = offset.0.clone();
#[allow(deprecated)]
pyo3_asyncio::async_std::future_into_py(py, async move {
let stream =
sl.0.stream(offset)
Expand All @@ -530,6 +538,7 @@ impl MultiplePartitionConsumer {
let config: NativeConsumerConfig = config.build()?.0;

Ok(py.allow_threads(move || {
#[allow(deprecated)]
run_block_on(self.0.stream_with_config(offset.0.clone(), config)).map(|stream| {
PartitionConsumerStream {
inner: Box::pin(stream),
Expand All @@ -547,6 +556,7 @@ impl MultiplePartitionConsumer {
let offset = offset.0.clone();
let config: NativeConsumerConfig = config.build()?.0;
pyo3_asyncio::async_std::future_into_py(py, async move {
#[allow(deprecated)]
let stream =
sl.0.stream_with_config(offset, config)
.await
Expand Down Expand Up @@ -1121,6 +1131,7 @@ impl PartitionMap {
inner: NativePartitionMap {
id: partition,
replicas,
..Default::default()
},
}
}
Expand Down

0 comments on commit 5141126

Please sign in to comment.