Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only use UTC datetimes #115

Merged
merged 6 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
479 changes: 237 additions & 242 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 13 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@ bincode = { version = "1.3.3" }
chrono = { version = "0.4", features = [ "serde" ] }
futures = { version = "0.3.21" }
log = { version = "0.4" }
pyo3 = { version = "0.17.1" }
pyo3-chrono = { version = "0.5.0" }
pyo3-log = { version = "0.7.0" }
# TODO: PyO3 pinned to current latest main commit, which includes pyo3->chrono integration
# Change to version "0.17.2" as soon as it is released
# pyo3 = { version = "0.17.2", features = ["macros", "chrono"] }
pyo3 = { git = "https://github.com/pyo3/pyo3", rev = "86ce4d1a1", version = "0.17", features = ["macros", "chrono"] }
# TODO: pyo3-log too is pinned to a fork, so that it compiles together with the unreleased PyO3 version.
# Change this to the version "0.7" as soon as PyO3 "0.17.2" itself is released.
# The current pyo3-log version should work as it is, since it depends on pyo3 "~0.17".
# pyo3-log = "0.7"
pyo3-log = { git = "https://github.com/psykopear/pyo3-log", rev = "802b6f4", version = "0.7" }
scopeguard = { version = "1.1.0" }
send_wrapper = { version = "0.6.0" }
serde = { version = "1.0.134" }
Expand All @@ -35,7 +41,10 @@ rdkafka = { version = "0.28.0", features = [ "cmake-build", "gssapi-vendored", "
rdkafka = { version = "0.28.0", features = [ "cmake-build", "gssapi", "ssl" ] }

[dev-dependencies]
pyo3 = { version = "0.17.1", default-features = false }
# TODO: PyO3 pinned to current latest main commit, which includes pyo3->chrono integration
# Change to version "0.17.2" as soon as it is released
# pyo3 = { version = "0.17.2", default-features = false, features = ["macros", "chrono"] }
pyo3 = { git = "https://github.com/pyo3/pyo3", rev = "86ce4d1a1", version = "0.17", default-features = false, features = ["macros", "chrono"] }

[features]
extension-module = ["pyo3/extension-module"]
Expand Down
6 changes: 3 additions & 3 deletions pytests/test_operators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
from collections import defaultdict
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from threading import Event

from pytest import fixture, raises
Expand Down Expand Up @@ -342,7 +342,7 @@ def running_count(type_to_count, event):


def test_reduce_window(recovery_config):
start_at = datetime(2022, 1, 1)
start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)
clock = TestingClock(start_at)

flow = Dataflow()
Expand Down Expand Up @@ -406,7 +406,7 @@ def add(acc, x):


def test_fold_window(recovery_config):
start_at = datetime(2022, 1, 1)
start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)
clock = TestingClock(start_at)

flow = Dataflow()
Expand Down
4 changes: 2 additions & 2 deletions pytests/test_window.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import timedelta, datetime, timezone

from bytewax.dataflow import Dataflow
from bytewax.execution import run_main
Expand All @@ -9,7 +9,7 @@


def test_tumbling_window():
start_at = datetime(2022, 1, 1)
start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)
clock = TestingClock(start_at)

flow = Dataflow()
Expand Down
3 changes: 1 addition & 2 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub(crate) fn default_epoch_config() -> Py<EpochConfig> {
Python::with_gil(|py| {
PyCell::new(
py,
PeriodicEpochConfig::new(pyo3_chrono::Duration(chrono::Duration::seconds(10))),
PeriodicEpochConfig::new(chrono::Duration::seconds(10)),
)
.unwrap()
.extract()
Expand Down Expand Up @@ -173,7 +173,6 @@ where

let epoch_length = periodic_config
.epoch_length
.0
.to_std()
.map_err(|err| format!("Invalid epoch length: {err:?}"))?;

Expand Down
10 changes: 5 additions & 5 deletions src/execution/periodic_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,25 @@ use super::EpochConfig;
#[pyo3(text_signature = "(epoch_length)")]
pub(crate) struct PeriodicEpochConfig {
#[pyo3(get)]
pub(crate) epoch_length: pyo3_chrono::Duration,
pub(crate) epoch_length: chrono::Duration,
}

#[pymethods]
impl PeriodicEpochConfig {
#[new]
#[args(epoch_length)]
pub(crate) fn new(epoch_length: pyo3_chrono::Duration) -> (Self, EpochConfig) {
pub(crate) fn new(epoch_length: chrono::Duration) -> (Self, EpochConfig) {
(Self { epoch_length }, EpochConfig {})
}

/// Pickle as a tuple.
fn __getstate__(&self) -> (&str, pyo3_chrono::Duration) {
fn __getstate__(&self) -> (&str, chrono::Duration) {
("PeriodicEpochConfig", self.epoch_length)
}

/// Egregious hack see [`SqliteRecoveryConfig::__getnewargs__`].
fn __getnewargs__(&self) -> (pyo3_chrono::Duration,) {
(pyo3_chrono::Duration(chrono::Duration::zero()),)
fn __getnewargs__(&self) -> (chrono::Duration,) {
(chrono::Duration::zero(),)
}

/// Unpickle from tuple of arguments.
Expand Down
1 change: 0 additions & 1 deletion src/pyo3_extensions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Newtypes around PyO3 types which allow easier interfacing with
//! Timely or other Rust libraries we use.

use crate::py_unwrap;
use crate::recovery::StateKey;
use crate::try_unwrap;
Expand Down
19 changes: 9 additions & 10 deletions src/window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::recovery::{EpochData, StateBytes, StateKey};
use crate::recovery::{StateUpdate, StepId};
use crate::recovery::{StatefulLogic, StatefulUnary};
use crate::StringResult;
use chrono::{Duration, NaiveDateTime};
use chrono::{DateTime, Duration, Utc};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -173,11 +173,10 @@ pub(crate) fn build_windower_builder(
if let Ok(tumbling_window_config) = window_config.downcast::<PyCell<TumblingWindowConfig>>() {
let tumbling_window_config = tumbling_window_config.borrow();

let length = tumbling_window_config.length.0;
let length = tumbling_window_config.length;
let start_at = tumbling_window_config
.start_at
.map(|t| t.0)
.unwrap_or_else(|| chrono::offset::Local::now().naive_local());
.unwrap_or_else(Utc::now);

let builder = TumblingWindower::builder(length, start_at);
Ok(Box::new(builder))
Expand All @@ -203,13 +202,13 @@ pub(crate) trait Clock<V> {
///
/// This can mutate the [`ClockState`] on every call to ensure
/// future calls are accurate.
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> NaiveDateTime;
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> DateTime<Utc>;

/// Get the time for an item.
///
/// This can mutate the [`ClockState`] if noting that an item has
/// arrived should advance the clock or something.
fn time_for(&mut self, value: &V) -> NaiveDateTime;
fn time_for(&mut self, value: &V) -> DateTime<Utc>;

/// Snapshot the internal state of this clock.
///
Expand Down Expand Up @@ -248,21 +247,21 @@ pub(crate) trait Windower {
/// This should update any internal state.
fn insert(
&mut self,
watermark: &NaiveDateTime,
item_time: NaiveDateTime,
watermark: &DateTime<Utc>,
item_time: DateTime<Utc>,
) -> Vec<Result<WindowKey, InsertError>>;

/// Look at the current watermark, determine which windows are now
/// closed, return them, and remove them from internal state.
fn drain_closed(&mut self, watermark: &NaiveDateTime) -> Vec<WindowKey>;
fn drain_closed(&mut self, watermark: &DateTime<Utc>) -> Vec<WindowKey>;

/// Return how much system time should pass before the windowing
/// operator should be activated again, even if there is no
/// incoming values.
///
/// Use this to signal to Timely's execution when windows will be
/// closing so we can close them without data.
fn activate_after(&mut self, watermark: &NaiveDateTime) -> Option<Duration>;
fn activate_after(&mut self, watermark: &DateTime<Utc>) -> Option<Duration>;

/// Snapshot the internal state of this windower.
///
Expand Down
10 changes: 5 additions & 5 deletions src/window/system_clock.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::task::Poll;

use chrono::NaiveDateTime;
use chrono::{DateTime, Utc};
use pyo3::prelude::*;
use pyo3::{exceptions::PyValueError, PyResult};

Expand Down Expand Up @@ -55,15 +55,15 @@ impl SystemClock {
}

impl<V> Clock<V> for SystemClock {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> NaiveDateTime {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> DateTime<Utc> {
match next_value {
// If there will be no more values, close out all windows.
Poll::Ready(None) => chrono::naive::MAX_DATETIME,
_ => chrono::offset::Local::now().naive_local(),
Poll::Ready(None) => DateTime::<Utc>::MAX_UTC,
_ => Utc::now(),
}
}

fn time_for(&mut self, item: &V) -> NaiveDateTime {
fn time_for(&mut self, item: &V) -> DateTime<Utc> {
let next_value = Poll::Ready(Some(item));
self.watermark(&next_value)
}
Expand Down
30 changes: 15 additions & 15 deletions src/window/testing_clock.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::task::Poll;

use chrono::NaiveDateTime;
use chrono::{DateTime, Utc};
use pyo3::{exceptions::PyValueError, prelude::*};

use super::{Clock, ClockConfig};
Expand All @@ -22,7 +22,7 @@ use crate::recovery::StateBytes;
pub(crate) struct PyTestingClock {
/// Modify this to change the current "now".
#[pyo3(get, set)]
now: pyo3_chrono::NaiveDateTime,
now: DateTime<Utc>,
}

impl PyTestingClock {
Expand All @@ -32,7 +32,7 @@ impl PyTestingClock {
PyCell::new(
py,
PyTestingClock {
now: pyo3_chrono::NaiveDateTime(chrono::naive::MIN_DATETIME),
now: DateTime::<Utc>::MIN_UTC,
},
)
.unwrap()
Expand All @@ -50,18 +50,18 @@ impl PyTestingClock {

#[new]
#[args(init_datetime)]
fn new(init_datetime: pyo3_chrono::NaiveDateTime) -> Self {
fn new(init_datetime: DateTime<Utc>) -> Self {
Self { now: init_datetime }
}

/// Pickle as a tuple.
fn __getstate__(&self) -> (&str, pyo3_chrono::NaiveDateTime) {
fn __getstate__(&self) -> (&str, DateTime<Utc>) {
("TestingClock", self.now)
}

/// Egregious hack see [`SqliteRecoveryConfig::__getnewargs__`].
fn __getnewargs__(&self) -> (pyo3_chrono::NaiveDateTime,) {
(pyo3_chrono::NaiveDateTime(chrono::naive::MIN_DATETIME),)
fn __getnewargs__(&self) -> (DateTime<Utc>,) {
(DateTime::<Utc>::MIN_UTC,)
}

/// Unpickle from tuple of arguments.
Expand Down Expand Up @@ -152,9 +152,9 @@ impl TestingClock {
// all windows' times.
let py_clock = py_clock.clone();

if let Some(now) = resume_state_bytes.map(StateBytes::de::<NaiveDateTime>) {
if let Some(now) = resume_state_bytes.map(StateBytes::de::<DateTime<Utc>>) {
Python::with_gil(|py| {
py_clock.borrow_mut(py).now = pyo3_chrono::NaiveDateTime(now);
py_clock.borrow_mut(py).now = now;
})
}

Expand All @@ -164,23 +164,23 @@ impl TestingClock {
}

impl<V> Clock<V> for TestingClock {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> NaiveDateTime {
fn watermark(&mut self, next_value: &Poll<Option<V>>) -> DateTime<Utc> {
match next_value {
// If there will be no more values, close out all windows.
Poll::Ready(None) => chrono::naive::MAX_DATETIME,
Poll::Ready(None) => DateTime::<Utc>::MAX_UTC,
_ => Python::with_gil(|py| {
let py_clock = self.py_clock.borrow(py);
py_clock.now.0.clone()
py_clock.now
}),
}
}

fn time_for(&mut self, item: &V) -> NaiveDateTime {
fn time_for(&mut self, item: &V) -> DateTime<Utc> {
self.watermark(&Poll::Ready(Some(item)))
}

fn snapshot(&self) -> StateBytes {
let now = Python::with_gil(|py| self.py_clock.borrow(py).now.0);
StateBytes::ser::<NaiveDateTime>(&now)
let now = Python::with_gil(|py| self.py_clock.borrow(py).now);
StateBytes::ser::<DateTime<Utc>>(&now)
}
}