Skip to content

Commit

Permalink
Correct handling of datetimes
Browse files Browse the repository at this point in the history
  • Loading branch information
Psykopear committed Sep 21, 2022
1 parent 86a4fe0 commit bb629e2
Show file tree
Hide file tree
Showing 11 changed files with 323 additions and 344 deletions.
482 changes: 230 additions & 252 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ bincode = { version = "1.3.3" }
chrono = { version = "0.4", features = [ "serde" ] }
futures = { version = "0.3.21" }
log = { version = "0.4" }
pyo3 = { version = "0.17.1" }
pyo3-chrono = { version = "0.5.0" }
pyo3-log = { version = "0.7.0" }
pyo3 = { git = "https://github.com/psykopear/pyo3", branch = "chrono", version = "0.17", features = ["macros", "chrono"] }
# pyo3-log = { version = "0.7" }
scopeguard = { version = "1.1.0" }
send_wrapper = { version = "0.6.0" }
serde = { version = "1.0.134" }
Expand All @@ -35,7 +34,7 @@ rdkafka = { version = "0.28.0", features = [ "cmake-build", "gssapi-vendored", "
rdkafka = { version = "0.28.0", features = [ "cmake-build", "gssapi", "ssl" ] }

[dev-dependencies]
pyo3 = { version = "0.17.1", default-features = false }
pyo3 = { git = "https://github.com/psykopear/pyo3", branch = "chrono", version = "0.17", default-features = false, features = ["macros", "chrono"] }

[features]
extension-module = ["pyo3/extension-module"]
Expand Down
8 changes: 5 additions & 3 deletions pytests/test_window.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections import defaultdict
from datetime import timedelta, datetime
from datetime import timedelta, datetime, timezone

from bytewax.dataflow import Dataflow
from bytewax.execution import run_main
Expand All @@ -17,7 +17,8 @@ def gen():

flow.input("inp", TestingInputConfig(gen()))

start_at = datetime(2022, 1, 1)
# Only utc datetime is accepted as the `start_at` parameter
start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)
# This will result in times for events of +0, +4, +8, +12.
clock_config = TestingClockConfig(item_incr=timedelta(seconds=4), start_at=start_at)
# And since the window is +10, we should get a window with value
Expand Down Expand Up @@ -61,7 +62,8 @@ def count(results, event):
results[event["type"]] += 1
return results

start_at = datetime(2022, 1, 1)
# Only utc datetime is accepted as the `start_at` parameter
start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)
# This will result in times for events of +0, +4, +8, +12.
clock_config = TestingClockConfig(
item_incr=timedelta(seconds=4), start_at=start_at
Expand Down
3 changes: 1 addition & 2 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub(crate) fn default_epoch_config() -> Py<EpochConfig> {
Python::with_gil(|py| {
PyCell::new(
py,
PeriodicEpochConfig::new(pyo3_chrono::Duration(chrono::Duration::seconds(10))),
PeriodicEpochConfig::new(chrono::Duration::seconds(10)),
)
.unwrap()
.extract()
Expand Down Expand Up @@ -173,7 +173,6 @@ where

let epoch_length = periodic_config
.epoch_length
.0
.to_std()
.map_err(|err| format!("Invalid epoch length: {err:?}"))?;

Expand Down
10 changes: 5 additions & 5 deletions src/execution/periodic_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,25 @@ use super::EpochConfig;
#[pyo3(text_signature = "(epoch_length)")]
pub(crate) struct PeriodicEpochConfig {
#[pyo3(get)]
pub(crate) epoch_length: pyo3_chrono::Duration,
pub(crate) epoch_length: chrono::Duration,
}

#[pymethods]
impl PeriodicEpochConfig {
#[new]
#[args(epoch_length)]
pub(crate) fn new(epoch_length: pyo3_chrono::Duration) -> (Self, EpochConfig) {
pub(crate) fn new(epoch_length: chrono::Duration) -> (Self, EpochConfig) {
(Self { epoch_length }, EpochConfig {})
}

/// Pickle as a tuple.
fn __getstate__(&self) -> (&str, pyo3_chrono::Duration) {
fn __getstate__(&self) -> (&str, chrono::Duration) {
("PeriodicEpochConfig", self.epoch_length)
}

/// Egregious hack see [`SqliteRecoveryConfig::__getnewargs__`].
fn __getnewargs__(&self) -> (pyo3_chrono::Duration,) {
(pyo3_chrono::Duration(chrono::Duration::zero()),)
fn __getnewargs__(&self) -> (chrono::Duration,) {
(chrono::Duration::zero(),)
}

/// Unpickle from tuple of arguments.
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn sleep_release_gil(py: Python, secs: u64) {
#[pymodule]
#[pyo3(name = "bytewax")]
fn mod_bytewax(py: Python, m: &PyModule) -> PyResult<()> {
pyo3_log::init();
// pyo3_log::init();

dataflow::register(py, m)?;
execution::register(py, m)?;
Expand Down
1 change: 0 additions & 1 deletion src/pyo3_extensions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Newtypes around PyO3 types which allow easier interfacing with
//! Timely or other Rust libraries we use.

use crate::py_unwrap;
use crate::recovery::StateKey;
use crate::try_unwrap;
Expand Down
23 changes: 11 additions & 12 deletions src/window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::recovery::{EpochData, StateBytes, StateKey};
use crate::recovery::{StateUpdate, StepId};
use crate::recovery::{StatefulLogic, StatefulUnary};
use crate::StringResult;
use chrono::{Duration, NaiveDateTime};
use chrono::{DateTime, Duration, Utc};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -105,8 +105,8 @@ pub(crate) fn build_clock_builder<V: 'static>(
if let Ok(testing_clock_config) = clock_config.downcast::<PyCell<TestingClockConfig>>() {
let testing_clock_config = testing_clock_config.borrow();

let item_incr = testing_clock_config.item_incr.0;
let start_at = testing_clock_config.start_at.0;
let item_incr = testing_clock_config.item_incr;
let start_at = testing_clock_config.start_at;

let builder = TestingClock::builder(item_incr, start_at);
Ok(Box::new(builder))
Expand Down Expand Up @@ -174,11 +174,10 @@ pub(crate) fn build_windower_builder(
if let Ok(tumbling_window_config) = window_config.downcast::<PyCell<TumblingWindowConfig>>() {
let tumbling_window_config = tumbling_window_config.borrow();

let length = tumbling_window_config.length.0;
let length = tumbling_window_config.length;
let start_at = tumbling_window_config
.start_at
.map(|t| t.0)
.unwrap_or_else(|| chrono::offset::Local::now().naive_local());
.unwrap_or_else(Utc::now);

let builder = TumblingWindower::builder(length, start_at);
Ok(Box::new(builder))
Expand All @@ -204,13 +203,13 @@ pub(crate) trait Clock<V> {
///
/// This can mutate the [`ClockState`] on every call to ensure
/// future calls are accurate.
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> NaiveDateTime;
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> DateTime<Utc>;

/// Get the time for an item.
///
/// This can mutate the [`ClockState`] if noting that an item has
/// arrived should advance the clock or something.
fn time_for(&mut self, value: &V) -> NaiveDateTime;
fn time_for(&mut self, value: &V) -> DateTime<Utc>;

/// Snapshot the internal state of this clock.
///
Expand Down Expand Up @@ -249,21 +248,21 @@ pub(crate) trait Windower {
/// This should update any internal state.
fn insert(
&mut self,
watermark: &NaiveDateTime,
item_time: NaiveDateTime,
watermark: &DateTime<Utc>,
item_time: DateTime<Utc>,
) -> Vec<Result<WindowKey, InsertError>>;

/// Look at the current watermark, determine which windows are now
/// closed, return them, and remove them from internal state.
fn drain_closed(&mut self, watermark: &NaiveDateTime) -> Vec<WindowKey>;
fn drain_closed(&mut self, watermark: &DateTime<Utc>) -> Vec<WindowKey>;

/// Return how much system time should pass before the windowing
/// operator should be activated again, even if there is no
/// incoming values.
///
/// Use this to signal to Timely's execution when windows will be
/// closing so we can close them without data.
fn activate_after(&mut self, watermark: &NaiveDateTime) -> Option<Duration>;
fn activate_after(&mut self, watermark: &DateTime<Utc>) -> Option<Duration>;

/// Snapshot the internal state of this windower.
///
Expand Down
10 changes: 5 additions & 5 deletions src/window/system_clock.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::task::Poll;

use chrono::NaiveDateTime;
use chrono::{DateTime, Utc};
use pyo3::prelude::*;
use pyo3::{exceptions::PyValueError, PyResult};

Expand Down Expand Up @@ -55,15 +55,15 @@ impl SystemClock {
}

impl<V> Clock<V> for SystemClock {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> NaiveDateTime {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> DateTime<Utc> {
match next_value {
// If there will be no more values, close out all windows.
Poll::Ready(None) => chrono::naive::MAX_DATETIME,
_ => chrono::offset::Local::now().naive_local(),
Poll::Ready(None) => DateTime::<Utc>::MAX_UTC,
_ => Utc::now(),
}
}

fn time_for(&mut self, item: &V) -> NaiveDateTime {
fn time_for(&mut self, item: &V) -> DateTime<Utc> {
let next_value = Poll::Ready(Some(item));
self.watermark(&next_value)
}
Expand Down
73 changes: 41 additions & 32 deletions src/window/testing_clock.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::task::Poll;

use chrono::{Duration, NaiveDateTime};
use chrono::{DateTime, Duration, Utc};
use pyo3::{exceptions::PyValueError, prelude::*};

use super::{Clock, ClockConfig};
Expand Down Expand Up @@ -29,9 +29,9 @@ use crate::recovery::StateBytes;
#[pyo3(text_signature = "(item_incr)")]
pub(crate) struct TestingClockConfig {
#[pyo3(get)]
pub(crate) item_incr: pyo3_chrono::Duration,
pub(crate) item_incr: chrono::Duration,
#[pyo3(get)]
pub(crate) start_at: pyo3_chrono::NaiveDateTime,
pub(crate) start_at: DateTime<Utc>,
}

#[pymethods]
Expand All @@ -44,10 +44,7 @@ impl TestingClockConfig {

#[new]
#[args(item_incr, start_at)]
fn new(
item_incr: pyo3_chrono::Duration,
start_at: pyo3_chrono::NaiveDateTime,
) -> (Self, ClockConfig) {
fn new(item_incr: chrono::Duration, start_at: DateTime<Utc>) -> (Self, ClockConfig) {
(
Self {
item_incr,
Expand All @@ -58,16 +55,13 @@ impl TestingClockConfig {
}

/// Pickle as a tuple.
fn __getstate__(&self) -> (&str, pyo3_chrono::Duration, pyo3_chrono::NaiveDateTime) {
fn __getstate__(&self) -> (&str, chrono::Duration, DateTime<Utc>) {
("TestingClockConfig", self.item_incr, self.start_at)
}

/// Egregious hack see [`SqliteRecoveryConfig::__getnewargs__`].
fn __getnewargs__(&self) -> (pyo3_chrono::Duration, pyo3_chrono::NaiveDateTime) {
(
pyo3_chrono::Duration(Duration::zero()),
pyo3_chrono::NaiveDateTime(chrono::naive::MAX_DATETIME),
)
fn __getnewargs__(&self) -> (chrono::Duration, DateTime<Utc>) {
(chrono::Duration::zero(), DateTime::<Utc>::MAX_UTC)
}

/// Unpickle from tuple of arguments.
Expand All @@ -87,13 +81,13 @@ impl TestingClockConfig {
/// Simulate system time for tests. Increment "now" after each item.
pub(crate) struct TestingClock {
item_incr: Duration,
current_time: NaiveDateTime,
current_time: DateTime<Utc>,
}

impl TestingClock {
pub(crate) fn builder<V>(
item_incr: Duration,
start_at: NaiveDateTime,
start_at: DateTime<Utc>,
) -> impl Fn(Option<StateBytes>) -> Box<dyn Clock<V>> {
move |resume_state_bytes: Option<StateBytes>| {
let current_time = resume_state_bytes.map(StateBytes::de).unwrap_or(start_at);
Expand All @@ -107,17 +101,20 @@ impl TestingClock {
}

impl<V> Clock<V> for TestingClock {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> NaiveDateTime {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> DateTime<Utc> {
match next_value {
// If there will be no more values, close out all windows.
Poll::Ready(None) => chrono::naive::MAX_DATETIME,
Poll::Ready(None) => DateTime::<Utc>::MAX_UTC,
_ => self.current_time,
}
}

fn time_for(&mut self, _item: &V) -> NaiveDateTime {
fn time_for(&mut self, _item: &V) -> DateTime<Utc> {
let item_time = self.current_time;
self.current_time += self.item_incr;
self.current_time = self
.current_time
.checked_add_signed(self.item_incr)
.unwrap_or(DateTime::<Utc>::MAX_UTC);
item_time
}

Expand All @@ -128,34 +125,46 @@ impl<V> Clock<V> for TestingClock {

#[test]
fn test_testing_clock() {
use chrono::{NaiveDate, NaiveTime};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};

let mut clock = TestingClock {
item_incr: Duration::seconds(1),
current_time: NaiveDateTime::new(
NaiveDate::from_ymd(2022, 1, 1),
NaiveTime::from_hms_milli(0, 0, 0, 0),
current_time: DateTime::from_utc(
NaiveDateTime::new(
NaiveDate::from_ymd(2022, 1, 1),
NaiveTime::from_hms_milli(0, 0, 0, 0),
),
Utc,
),
};
assert_eq!(
clock.time_for(&"x"),
NaiveDateTime::new(
NaiveDate::from_ymd(2022, 1, 1),
NaiveTime::from_hms_milli(0, 0, 0, 0)
DateTime::<Utc>::from_utc(
NaiveDateTime::new(
NaiveDate::from_ymd(2022, 1, 1),
NaiveTime::from_hms_milli(0, 0, 0, 0),
),
Utc
)
);
assert_eq!(
clock.time_for(&"y"),
NaiveDateTime::new(
NaiveDate::from_ymd(2022, 1, 1),
NaiveTime::from_hms_milli(0, 0, 1, 0)
DateTime::<Utc>::from_utc(
NaiveDateTime::new(
NaiveDate::from_ymd(2022, 1, 1),
NaiveTime::from_hms_milli(0, 0, 1, 0),
),
Utc
)
);
assert_eq!(
clock.time_for(&"z"),
NaiveDateTime::new(
NaiveDate::from_ymd(2022, 1, 1),
NaiveTime::from_hms_milli(0, 0, 2, 0)
DateTime::<Utc>::from_utc(
NaiveDateTime::new(
NaiveDate::from_ymd(2022, 1, 1),
NaiveTime::from_hms_milli(0, 0, 2, 0),
),
Utc
)
);
}

0 comments on commit bb629e2

Please sign in to comment.