Skip to content

Commit

Permalink
catch invalid pipeline_name on sensors and schedules
Browse files Browse the repository at this point in the history
Summary: When we load the repository make sure that things that target pipelines via `pipelines_name` point at pipelines that actually exist in the same repo

Test Plan: added tests

Reviewers: max, dgibson

Reviewed By: max

Differential Revision: https://dagster.phacility.com/D7166
  • Loading branch information
alangenfeld committed Mar 29, 2021
1 parent c119e16 commit be7514f
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 12 deletions.
56 changes: 51 additions & 5 deletions python_modules/dagster/dagster/core/definitions/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@


class _CacheingDefinitionIndex:
def __init__(self, definition_class, definition_class_name, definition_kind, definitions):
def __init__(
self,
definition_class,
definition_class_name,
definition_kind,
definitions,
validation_fn,
):

for key, definition in definitions.items():
check.invariant(
Expand All @@ -34,6 +41,7 @@ def __init__(self, definition_class, definition_class_name, definition_kind, def
self._definition_class = definition_class
self._definition_class_name = definition_class_name
self._definition_kind = definition_kind
self._validation_fn = validation_fn

self._definitions = definitions
self._definition_cache = {}
Expand Down Expand Up @@ -88,7 +96,7 @@ def get_definition(self, definition_name):
definition_source = self._definitions[definition_name]

if isinstance(definition_source, self._definition_class):
self._definition_cache[definition_name] = definition_source
self._definition_cache[definition_name] = self._validation_fn(definition_source)
return definition_source
else:
definition = definition_source()
Expand All @@ -112,7 +120,7 @@ def get_definition(self, definition_name):
definition_def_name=definition.name,
),
)
self._definition_cache[definition_name] = definition
self._definition_cache[definition_name] = self._validation_fn(definition)
return definition


Expand Down Expand Up @@ -154,10 +162,15 @@ def __init__(self, pipelines, partition_sets, schedules, sensors):
check.dict_param(sensors, "sensors", key_type=str)

self._pipelines = _CacheingDefinitionIndex(
PipelineDefinition, "PipelineDefinition", "pipeline", pipelines
PipelineDefinition, "PipelineDefinition", "pipeline", pipelines, self._validate_pipeline
)

self._schedules = _CacheingDefinitionIndex(
ScheduleDefinition, "ScheduleDefinition", "schedule", schedules
ScheduleDefinition,
"ScheduleDefinition",
"schedule",
schedules,
self._validate_schedule,
)
schedule_partition_sets = [
schedule.get_partition_set()
Expand All @@ -172,13 +185,18 @@ def __init__(self, pipelines, partition_sets, schedules, sensors):
{partition_set.name: partition_set for partition_set in schedule_partition_sets},
partition_sets,
),
self._validate_partition_set,
)
self._sensors = _CacheingDefinitionIndex(
SensorDefinition,
"SensorDefinition",
"sensor",
sensors,
self._validate_sensor,
)
# load all sensors to force validation
self._sensors.get_all_definitions()

self._all_pipelines = None
self._solids = None
self._all_solids = None
Expand Down Expand Up @@ -504,6 +522,34 @@ def solid_def_named(self, name):

return self._all_solids[name]

def _validate_pipeline(self, pipeline):
return pipeline

def _validate_schedule(self, schedule):
pipelines = self.get_pipeline_names()

if schedule.pipeline_name not in pipelines:
raise DagsterInvalidDefinitionError(
f'ScheduleDefinition "{schedule.name}" targets pipeline "{schedule.pipeline_name}" '
"which was not found in this repository."
)

return schedule

def _validate_sensor(self, sensor):
pipelines = self.get_pipeline_names()

if sensor.pipeline_name not in pipelines:
raise DagsterInvalidDefinitionError(
f'SensorDefinition "{sensor.name}" targets pipeline "{sensor.pipeline_name}" '
"which was not found in this repository."
)

return sensor

def _validate_partition_set(self, partition_set):
return partition_set


class RepositoryDefinition:
"""Define a repository that contains a collection of definitions.
Expand Down
10 changes: 5 additions & 5 deletions python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,20 @@ def define_bar_schedules():
"foo_schedule": ScheduleDefinition(
"foo_schedule",
cron_schedule="* * * * *",
pipeline_name="test_pipeline",
pipeline_name="foo",
run_config={"fizz": "buzz"},
),
"foo_schedule_never_execute": ScheduleDefinition(
"foo_schedule_never_execute",
cron_schedule="* * * * *",
pipeline_name="test_pipeline",
pipeline_name="foo",
run_config={"fizz": "buzz"},
should_execute=lambda _context: False,
),
"foo_schedule_echo_time": ScheduleDefinition(
"foo_schedule_echo_time",
cron_schedule="* * * * *",
pipeline_name="test_pipeline",
pipeline_name="foo",
run_config_fn=lambda context: {
"passed_in_time": context.scheduled_execution_time.isoformat()
if context.scheduled_execution_time
Expand Down Expand Up @@ -142,13 +142,13 @@ def define_baz_partitions():
}


@sensor(pipeline_name="foo_pipeline")
@sensor(pipeline_name="foo")
def sensor_foo(_):
yield RunRequest(run_key=None, run_config={"foo": "FOO"}, tags={"foo": "foo_tag"})
yield RunRequest(run_key=None, run_config={"foo": "FOO"})


@sensor(pipeline_name="foo_pipeline")
@sensor(pipeline_name="foo")
def sensor_error(_):
raise Exception("womp womp")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def define_bar_schedules():
"foo_schedule": ScheduleDefinition(
"foo_schedule",
cron_schedule="* * * * *",
pipeline_name="test_pipeline",
pipeline_name="foo",
run_config={},
),
"partitioned_schedule": partition_set.create_schedule_definition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
lambda_solid,
repository,
)
from dagster.core.definitions import sensor


def create_single_node_pipeline(name, called):
Expand Down Expand Up @@ -162,3 +163,38 @@ def some_repo():
assert len(some_repo.schedule_defs) == 1
assert len(some_repo.partition_set_defs) == 1
assert some_repo.get_partition_set_def("daily_foo_partitions")


def test_bad_schedule():
@daily_schedule(
pipeline_name="foo",
start_date=datetime.datetime(2020, 1, 1),
)
def daily_foo(_date):
return {}

with pytest.raises(
DagsterInvalidDefinitionError,
match='targets pipeline "foo" which was not found in this repository',
):

@repository
def _some_repo():
return [daily_foo]


def test_bad_sensor():
@sensor(
pipeline_name="foo",
)
def foo_sensor(_):
return {}

with pytest.raises(
DagsterInvalidDefinitionError,
match='targets pipeline "foo" which was not found in this repository',
):

@repository
def _some_repo():
return [foo_sensor]
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def define_bar_schedules():
"foo_schedule": ScheduleDefinition(
"foo_schedule",
cron_schedule="* * * * *",
pipeline_name="test_pipeline",
pipeline_name="foo",
run_config={},
)
}
Expand Down

1 comment on commit be7514f

@vercel
Copy link

@vercel vercel bot commented on be7514f Mar 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.