Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup deployment-start-time feature branch, add tests #7928

Merged
merged 38 commits into from Dec 28, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
87eb843
Update docstring
serinamarie Dec 15, 2022
b108a7d
Cleanup start-in start-at param
serinamarie Dec 16, 2022
97d41d6
Move start_in, start_at below other params
serinamarie Dec 16, 2022
7ab9503
Move human_readable_dt_diff assigning
serinamarie Dec 16, 2022
637ad4a
Add test template
serinamarie Dec 16, 2022
bd0f1c3
Rename test
serinamarie Dec 16, 2022
ab00c3e
Rename tests
serinamarie Dec 16, 2022
2016455
Start implementing tests for scheduling behavior
zanieb Dec 16, 2022
3a709d3
Add test for invalid input
zanieb Dec 16, 2022
e5c73f0
Add more tests
serinamarie Dec 19, 2022
58c2217
Fix tests
serinamarie Dec 19, 2022
1c1f555
Rename long var name
serinamarie Dec 19, 2022
4ff7d46
Handle unexpected exceptions
serinamarie Dec 19, 2022
259bec9
Add more tests
serinamarie Dec 19, 2022
3eeeb55
Add exception handling when user input includes tz
serinamarie Dec 19, 2022
9370e2a
Update test_start_at_option_schedules_flow_run_in_future
serinamarie Dec 19, 2022
23d36dd
Modify tests
serinamarie Dec 20, 2022
be6ff76
Accommodate timezones
serinamarie Dec 20, 2022
5aa10f3
Some cleanup
serinamarie Dec 20, 2022
180fd45
Remove diff_for_humans because of 1 second lag
serinamarie Dec 20, 2022
8a77b92
Remove 2nd parsing for display, add future pref in dateparsing
serinamarie Dec 21, 2022
6b4f94d
add test, change expected values due to future dateparser setting
serinamarie Dec 21, 2022
1f9f23e
Fix tests, hopefully
serinamarie Dec 21, 2022
927deba
Remove 2nd calculation of local tz
serinamarie Dec 21, 2022
74503df
Cleanup tests
serinamarie Dec 22, 2022
55bfce4
Cleanup
serinamarie Dec 22, 2022
d089ddb
bump dateparser version
serinamarie Dec 22, 2022
3ef506a
Fix tests
serinamarie Dec 22, 2022
104e72b
rename tests, test var
serinamarie Dec 22, 2022
8912f52
Modify print statements, docstrings
serinamarie Dec 27, 2022
4b80352
Put parsing logic for start_time params in a helper function
serinamarie Dec 27, 2022
3320144
Rename helper function
serinamarie Dec 28, 2022
07c7cb7
Update src/prefect/cli/deployment.py
serinamarie Dec 28, 2022
b205827
Update src/prefect/cli/deployment.py
serinamarie Dec 28, 2022
02b9f14
Remove helper function
serinamarie Dec 28, 2022
358adb7
Add test to document start-in behavior
serinamarie Dec 28, 2022
eaa14c5
Correct test
serinamarie Dec 28, 2022
44f03b4
Modify exit error & corresponding test
serinamarie Dec 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 22 additions & 26 deletions src/prefect/cli/deployment.py
Expand Up @@ -384,7 +384,8 @@ async def run(
"""
Create a flow run for the given flow and deployment.

The flow run will be scheduled for now and an agent must execute it.
The flow run will be scheduled for now if start-in or start-at are unspecified;
and an agent must execute it.
serinamarie marked this conversation as resolved.
Show resolved Hide resolved

The flow run will not execute until an agent starts.
"""
Expand All @@ -411,15 +412,19 @@ async def run(
)
parameters = {**multi_params, **cli_params}

# Parse the start time if provided
if start_in:
start_time_raw = "in " + start_in
elif start_at:
start_time_raw = "at " + start_at
else:
start_time_raw = None
if start_in and start_at:
exit_with_error(
"Expected optional start_in field or start_at field but not both."
serinamarie marked this conversation as resolved.
Show resolved Hide resolved
)

if start_time_raw:
elif start_in is None and start_at is None:
scheduled_start_time = now
human_readable_dt_diff = " (now)"
else:
if start_in:
start_time_raw = "in " + start_in
else:
start_time_raw = "at " + start_at
with warnings.catch_warnings():
# PyTZ throws a warning based on dateparser usage of the library
# See https://github.com/scrapinghub/dateparser/issues/1089
Expand All @@ -433,9 +438,11 @@ async def run(
)
if start_time_parsed is None:
exit_with_error(f"Unable to parse scheduled start time {start_time_raw!r}.")

scheduled_start_time = pendulum.instance(start_time_parsed)
else:
scheduled_start_time = now
human_readable_dt_diff = (
" (" + pendulum.format_diff(scheduled_start_time.diff(now)) + ")"
)

async with get_client() as client:
deployment = await get_deployment(client, name, deployment_id)
Expand Down Expand Up @@ -474,21 +481,10 @@ async def run(
else:
run_url = "<no dashboard available>"

if start_in:
scheduled_display = (
scheduled_start_time.in_tz(
pendulum.tz.local_timezone()
).to_datetime_string()
+ " ("
+ pendulum.format_diff(scheduled_start_time.diff(now))
+ ")"
)
elif start_at:
scheduled_display = scheduled_start_time.in_tz(
pendulum.tz.local_timezone()
).to_datetime_string()
else:
scheduled_display = "now"
scheduled_display = (
scheduled_start_time.in_tz(pendulum.tz.local_timezone()).to_datetime_string()
+ human_readable_dt_diff
)

app.console.print(f"Created flow run {flow_run.name!r}.")
app.console.print(
Expand Down
212 changes: 212 additions & 0 deletions tests/cli/deployment/test_deployment_run.py
@@ -0,0 +1,212 @@
import dateparser
import pendulum
import pytest

import prefect
from prefect.testing.cli import invoke_and_assert
from prefect.utilities.asyncutils import run_sync_in_worker_thread


@pytest.fixture
async def deployment_name(deployment, orion_client):
flow = await orion_client.read_flow(deployment.flow_id)
return f"{flow.name}/{deployment.name}"


@pytest.fixture
def frozen_now(monkeypatch):
now = pendulum.now()
monkeypatch.setattr("pendulum.now", lambda *_: now)
yield now


def test_both_start_in_and_start_at_raises():
invoke_and_assert(
command=["deployment", "run", "--start-in", "foo", "--start-at", "bar"],
expected_code=1,
expected_output="Expected optional start_in field or start_at field but not both.",
)


@pytest.mark.parametrize(
"start_at,expected_start_time",
[
("12-1-22 5pm", "2022-12-01 17:00:00"),
("1/1/30", "2030-01-01 00:00:00"),
("13/1/20 13:31", "2020-01-13 13:31:00"),
("9pm December 31st 2022", "2022-12-31 21:00:00"),
("January 2nd 2023", "2023-01-02 00:00:00"),
],
)
async def test_start_at_option_schedules_flow_run_in_future(
deployment_name: str,
orion_client: prefect.OrionClient,
start_at: str,
expected_start_time,
):

await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"deployment",
"run",
deployment_name,
"--start-at",
start_at,
],
expected_output_contains=["Scheduled start time:", expected_start_time],
serinamarie marked this conversation as resolved.
Show resolved Hide resolved
)

flow_runs = await orion_client.read_flow_runs()
assert len(flow_runs) == 1
flow_run = flow_runs[0]

assert flow_run.state.is_scheduled()
serinamarie marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize(
"start_in,expected_duration",
[
("10 minutes", pendulum.duration(minutes=10)),
("5 days", pendulum.duration(days=5)),
("3 seconds", pendulum.duration(seconds=3)),
(None, pendulum.duration(seconds=0)),
("1 year and 3 months", pendulum.duration(years=1, months=3)),
("2 weeks & 1 day", pendulum.duration(weeks=2, days=1)),
("27 hours + 4 mins", pendulum.duration(days=1, hours=3, minutes=4)),
],
)
async def test_start_in_option_schedules_flow_run_in_future(
deployment_name: str,
frozen_now,
orion_client: prefect.OrionClient,
start_in: str,
expected_duration,
):
expected_start_time = frozen_now + expected_duration
expected_display = expected_start_time.in_tz(
pendulum.tz.local_timezone()
).to_datetime_string()

await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"deployment",
"run",
deployment_name,
"--start-in",
start_in,
],
expected_output_contains=f"Scheduled start time: {expected_display}",
)

flow_runs = await orion_client.read_flow_runs()
assert len(flow_runs) == 1
flow_run = flow_runs[0]

assert flow_run.state.is_scheduled()
scheduled_time = flow_run.state.state_details.scheduled_time

assert scheduled_time == expected_start_time


@pytest.mark.parametrize(
serinamarie marked this conversation as resolved.
Show resolved Hide resolved
"start_in, expected_display",
[
("10 minutes", "in 10 minutes"),
("5 days", "in 5 days"),
("3 seconds", "in a few seconds"),
(None, "now"),
("1 year and 3 months", "in 1 year"),
("2 weeks & 1 day", "in 2 weeks"),
("27 hours + 4 mins", "in 1 day"),
],
)
async def test_start_in_displays_scheduled_start_time(
deployment_name: str,
orion_client: prefect.OrionClient,
start_in: str,
expected_display: str,
):

await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"deployment",
"run",
deployment_name,
"--start-in",
start_in,
],
expected_output_contains=["Scheduled start time:", expected_display],
)

serinamarie marked this conversation as resolved.
Show resolved Hide resolved
flow_runs = await orion_client.read_flow_runs()
assert len(flow_runs) == 1
flow_run = flow_runs[0]

assert flow_run.state.is_scheduled()


def test_start_at_displays_scheduled_start_time(deployment_name: str, monkeypatch):
monkeypatch.setattr(
"pendulum.now",
lambda *_: pendulum.instance(dateparser.parse("January 1st 2023")),
)
invoke_and_assert(
command=[
"deployment",
"run",
deployment_name,
"--start-at",
"January 1st 2022",
],
expected_output_contains="Scheduled start time: 2022-01-01 00:00:00 (1 year ago)",
)


def test_start_at_invalid_input(deployment_name: str):
invoke_and_assert(
command=[
"deployment",
"run",
deployment_name,
"--start-at",
"foobar",
serinamarie marked this conversation as resolved.
Show resolved Hide resolved
],
expected_code=1,
expected_output="Unable to parse scheduled start time 'at foobar'.",
)


def test_start_in_invalid_input(deployment_name: str):
serinamarie marked this conversation as resolved.
Show resolved Hide resolved
invoke_and_assert(
command=[
"deployment",
"run",
deployment_name,
"--start-in",
"foobar",
],
expected_code=1,
expected_output="Unable to parse scheduled start time 'in foobar'.",
)


async def test_run_deployment_only_creates_one_flow_run(
deployment_name: str, orion_client: prefect.OrionClient, deployment
):
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"deployment",
"run",
deployment_name,
],
)

flow_runs = await orion_client.read_flow_runs()
assert len(flow_runs) == 1
flow_run = flow_runs[0]

assert flow_run.deployment_id == deployment.id