Skip to content

Commit

Permalink
Merge pull request #115 from bytewax/explore-datetimes-issues
Browse files Browse the repository at this point in the history
Only use UTC datetimes
  • Loading branch information
Psykopear committed Sep 27, 2022
2 parents 7ab9b67 + 07b5edd commit 4defb35
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 318 deletions.
479 changes: 237 additions & 242 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 13 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@ bincode = { version = "1.3.3" }
chrono = { version = "0.4", features = [ "serde" ] }
futures = { version = "0.3.21" }
log = { version = "0.4" }
pyo3 = { version = "0.17.1" }
pyo3-chrono = { version = "0.5.0" }
pyo3-log = { version = "0.7.0" }
# TODO: PyO3 pinned to current latest main commit, which includes pyo3->chrono integration
# Change to version "0.17.2" as soon as it is released
# pyo3 = { version = "0.17.2", features = ["macros", "chrono"] }
pyo3 = { git = "https://github.com/pyo3/pyo3", rev = "86ce4d1a1", version = "0.17", features = ["macros", "chrono"] }
# TODO: pyo3-log too is pinned to a fork, so that it compiles together with the unreleased PyO3 version.
# Change this to the version "0.7" as soon as PyO3 "0.17.2" itself is released.
# The current pyo3-log version should work as it is, since it depends on pyo3 "~0.17".
# pyo3-log = "0.7"
pyo3-log = { git = "https://github.com/psykopear/pyo3-log", rev = "802b6f4", version = "0.7" }
scopeguard = { version = "1.1.0" }
send_wrapper = { version = "0.6.0" }
serde = { version = "1.0.134" }
Expand All @@ -35,7 +41,10 @@ rdkafka = { version = "0.28.0", features = [ "cmake-build", "gssapi-vendored", "
rdkafka = { version = "0.28.0", features = [ "cmake-build", "gssapi", "ssl" ] }

[dev-dependencies]
pyo3 = { version = "0.17.1", default-features = false }
# TODO: PyO3 pinned to current latest main commit, which includes pyo3->chrono integration
# Change to version "0.17.2" as soon as it is released
# pyo3 = { version = "0.17.2", default-features = false, features = ["macros", "chrono"] }
pyo3 = { git = "https://github.com/pyo3/pyo3", rev = "86ce4d1a1", version = "0.17", default-features = false, features = ["macros", "chrono"] }

[features]
extension-module = ["pyo3/extension-module"]
Expand Down
6 changes: 3 additions & 3 deletions pytests/test_operators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
from collections import defaultdict
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from threading import Event

from pytest import fixture, raises
Expand Down Expand Up @@ -342,7 +342,7 @@ def running_count(type_to_count, event):


def test_reduce_window(recovery_config):
start_at = datetime(2022, 1, 1)
start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)
clock = TestingClock(start_at)

flow = Dataflow()
Expand Down Expand Up @@ -406,7 +406,7 @@ def add(acc, x):


def test_fold_window(recovery_config):
start_at = datetime(2022, 1, 1)
start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)
clock = TestingClock(start_at)

flow = Dataflow()
Expand Down
4 changes: 2 additions & 2 deletions pytests/test_window.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import timedelta, datetime, timezone

from bytewax.dataflow import Dataflow
from bytewax.execution import run_main
Expand All @@ -9,7 +9,7 @@


def test_tumbling_window():
start_at = datetime(2022, 1, 1)
start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)
clock = TestingClock(start_at)

flow = Dataflow()
Expand Down
3 changes: 1 addition & 2 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub(crate) fn default_epoch_config() -> Py<EpochConfig> {
Python::with_gil(|py| {
PyCell::new(
py,
PeriodicEpochConfig::new(pyo3_chrono::Duration(chrono::Duration::seconds(10))),
PeriodicEpochConfig::new(chrono::Duration::seconds(10)),
)
.unwrap()
.extract()
Expand Down Expand Up @@ -173,7 +173,6 @@ where

let epoch_length = periodic_config
.epoch_length
.0
.to_std()
.map_err(|err| format!("Invalid epoch length: {err:?}"))?;

Expand Down
10 changes: 5 additions & 5 deletions src/execution/periodic_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,25 @@ use super::EpochConfig;
#[pyo3(text_signature = "(epoch_length)")]
pub(crate) struct PeriodicEpochConfig {
#[pyo3(get)]
pub(crate) epoch_length: pyo3_chrono::Duration,
pub(crate) epoch_length: chrono::Duration,
}

#[pymethods]
impl PeriodicEpochConfig {
#[new]
#[args(epoch_length)]
pub(crate) fn new(epoch_length: pyo3_chrono::Duration) -> (Self, EpochConfig) {
pub(crate) fn new(epoch_length: chrono::Duration) -> (Self, EpochConfig) {
(Self { epoch_length }, EpochConfig {})
}

/// Pickle as a tuple.
fn __getstate__(&self) -> (&str, pyo3_chrono::Duration) {
fn __getstate__(&self) -> (&str, chrono::Duration) {
("PeriodicEpochConfig", self.epoch_length)
}

/// Egregious hack see [`SqliteRecoveryConfig::__getnewargs__`].
fn __getnewargs__(&self) -> (pyo3_chrono::Duration,) {
(pyo3_chrono::Duration(chrono::Duration::zero()),)
fn __getnewargs__(&self) -> (chrono::Duration,) {
(chrono::Duration::zero(),)
}

/// Unpickle from tuple of arguments.
Expand Down
1 change: 0 additions & 1 deletion src/pyo3_extensions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Newtypes around PyO3 types which allow easier interfacing with
//! Timely or other Rust libraries we use.

use crate::py_unwrap;
use crate::recovery::StateKey;
use crate::try_unwrap;
Expand Down
19 changes: 9 additions & 10 deletions src/window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::recovery::{EpochData, StateBytes, StateKey};
use crate::recovery::{StateUpdate, StepId};
use crate::recovery::{StatefulLogic, StatefulUnary};
use crate::StringResult;
use chrono::{Duration, NaiveDateTime};
use chrono::{DateTime, Duration, Utc};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -173,11 +173,10 @@ pub(crate) fn build_windower_builder(
if let Ok(tumbling_window_config) = window_config.downcast::<PyCell<TumblingWindowConfig>>() {
let tumbling_window_config = tumbling_window_config.borrow();

let length = tumbling_window_config.length.0;
let length = tumbling_window_config.length;
let start_at = tumbling_window_config
.start_at
.map(|t| t.0)
.unwrap_or_else(|| chrono::offset::Local::now().naive_local());
.unwrap_or_else(Utc::now);

let builder = TumblingWindower::builder(length, start_at);
Ok(Box::new(builder))
Expand All @@ -203,13 +202,13 @@ pub(crate) trait Clock<V> {
///
/// This can mutate the [`ClockState`] on every call to ensure
/// future calls are accurate.
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> NaiveDateTime;
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> DateTime<Utc>;

/// Get the time for an item.
///
/// This can mutate the [`ClockState`] if noting that an item has
/// arrived should advance the clock or something.
fn time_for(&mut self, value: &V) -> NaiveDateTime;
fn time_for(&mut self, value: &V) -> DateTime<Utc>;

/// Snapshot the internal state of this clock.
///
Expand Down Expand Up @@ -248,21 +247,21 @@ pub(crate) trait Windower {
/// This should update any internal state.
fn insert(
&mut self,
watermark: &NaiveDateTime,
item_time: NaiveDateTime,
watermark: &DateTime<Utc>,
item_time: DateTime<Utc>,
) -> Vec<Result<WindowKey, InsertError>>;

/// Look at the current watermark, determine which windows are now
/// closed, return them, and remove them from internal state.
fn drain_closed(&mut self, watermark: &NaiveDateTime) -> Vec<WindowKey>;
fn drain_closed(&mut self, watermark: &DateTime<Utc>) -> Vec<WindowKey>;

/// Return how much system time should pass before the windowing
/// operator should be activated again, even if there is no
/// incoming values.
///
/// Use this to signal to Timely's execution when windows will be
/// closing so we can close them without data.
fn activate_after(&mut self, watermark: &NaiveDateTime) -> Option<Duration>;
fn activate_after(&mut self, watermark: &DateTime<Utc>) -> Option<Duration>;

/// Snapshot the internal state of this windower.
///
Expand Down
10 changes: 5 additions & 5 deletions src/window/system_clock.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::task::Poll;

use chrono::NaiveDateTime;
use chrono::{DateTime, Utc};
use pyo3::prelude::*;
use pyo3::{exceptions::PyValueError, PyResult};

Expand Down Expand Up @@ -55,15 +55,15 @@ impl SystemClock {
}

impl<V> Clock<V> for SystemClock {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> NaiveDateTime {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> DateTime<Utc> {
match next_value {
// If there will be no more values, close out all windows.
Poll::Ready(None) => chrono::naive::MAX_DATETIME,
_ => chrono::offset::Local::now().naive_local(),
Poll::Ready(None) => DateTime::<Utc>::MAX_UTC,
_ => Utc::now(),
}
}

fn time_for(&mut self, item: &V) -> NaiveDateTime {
fn time_for(&mut self, item: &V) -> DateTime<Utc> {
let next_value = Poll::Ready(Some(item));
self.watermark(&next_value)
}
Expand Down
30 changes: 15 additions & 15 deletions src/window/testing_clock.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::task::Poll;

use chrono::NaiveDateTime;
use chrono::{DateTime, Utc};
use pyo3::{exceptions::PyValueError, prelude::*};

use super::{Clock, ClockConfig};
Expand All @@ -22,7 +22,7 @@ use crate::recovery::StateBytes;
pub(crate) struct PyTestingClock {
/// Modify this to change the current "now".
#[pyo3(get, set)]
now: pyo3_chrono::NaiveDateTime,
now: DateTime<Utc>,
}

impl PyTestingClock {
Expand All @@ -32,7 +32,7 @@ impl PyTestingClock {
PyCell::new(
py,
PyTestingClock {
now: pyo3_chrono::NaiveDateTime(chrono::naive::MIN_DATETIME),
now: DateTime::<Utc>::MIN_UTC,
},
)
.unwrap()
Expand All @@ -50,18 +50,18 @@ impl PyTestingClock {

#[new]
#[args(init_datetime)]
fn new(init_datetime: pyo3_chrono::NaiveDateTime) -> Self {
fn new(init_datetime: DateTime<Utc>) -> Self {
Self { now: init_datetime }
}

/// Pickle as a tuple.
fn __getstate__(&self) -> (&str, pyo3_chrono::NaiveDateTime) {
fn __getstate__(&self) -> (&str, DateTime<Utc>) {
("TestingClock", self.now)
}

/// Egregious hack see [`SqliteRecoveryConfig::__getnewargs__`].
fn __getnewargs__(&self) -> (pyo3_chrono::NaiveDateTime,) {
(pyo3_chrono::NaiveDateTime(chrono::naive::MIN_DATETIME),)
fn __getnewargs__(&self) -> (DateTime<Utc>,) {
(DateTime::<Utc>::MIN_UTC,)
}

/// Unpickle from tuple of arguments.
Expand Down Expand Up @@ -152,9 +152,9 @@ impl TestingClock {
// all windows' times.
let py_clock = py_clock.clone();

if let Some(now) = resume_state_bytes.map(StateBytes::de::<NaiveDateTime>) {
if let Some(now) = resume_state_bytes.map(StateBytes::de::<DateTime<Utc>>) {
Python::with_gil(|py| {
py_clock.borrow_mut(py).now = pyo3_chrono::NaiveDateTime(now);
py_clock.borrow_mut(py).now = now;
})
}

Expand All @@ -164,23 +164,23 @@ impl TestingClock {
}

impl<V> Clock<V> for TestingClock {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> NaiveDateTime {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> DateTime<Utc> {
match next_value {
// If there will be no more values, close out all windows.
Poll::Ready(None) => chrono::naive::MAX_DATETIME,
Poll::Ready(None) => DateTime::<Utc>::MAX_UTC,
_ => Python::with_gil(|py| {
let py_clock = self.py_clock.borrow(py);
py_clock.now.0.clone()
py_clock.now
}),
}
}

fn time_for(&mut self, item: &V) -> NaiveDateTime {
fn time_for(&mut self, item: &V) -> DateTime<Utc> {
self.watermark(&Poll::Ready(Some(item)))
}

fn snapshot(&self) -> StateBytes {
let now = Python::with_gil(|py| self.py_clock.borrow(py).now.0);
StateBytes::ser::<NaiveDateTime>(&now)
let now = Python::with_gil(|py| self.py_clock.borrow(py).now);
StateBytes::ser::<DateTime<Utc>>(&now)
}
}

0 comments on commit 4defb35

Please sign in to comment.