IO managers for ops that receive DynamicOutput as an Input #20990
-
I'm trying to implement a pattern similar to the one in the example, the difference being that the objects being transformed are Polars/Pandas DataFrames. The op/graph compiles, but at runtime I see the following error:
I was wondering how it might be possible to implement an IO manager for a list of DataFrames? I don't see an
@op(out=DynamicOut(pl.LazyFrame))
def load_pieces() -> Generator[DynamicOutput, None, None]:
    """Yield one ``DynamicOutput`` per entry returned by ``load_filenames()``.

    The generator yields ``DynamicOutput`` events (not ``DynamicOut``, which is
    the output *definition* passed to ``@op``), so the return annotation names
    ``DynamicOutput``.

    Assumes ``load_filenames()`` yields ``(mapping_key, filename)`` pairs --
    TODO confirm against its definition, which is not shown here.
    """
    for idx, fn in load_filenames():
        # Each filename becomes its own dynamic branch, keyed by ``idx``.
        yield DynamicOutput(fn, mapping_key=idx)
@op
def download_and_transform(filename: str) -> pl.LazyFrame:
    """Produce a LazyFrame for ``filename`` via ``some_function_that_does_that``."""
    frame = some_function_that_does_that(filename)
    return frame
@op
def merge(dfs: list[pl.LazyFrame]) -> pl.LazyFrame:
    """Concatenate the fanned-in LazyFrames into a single LazyFrame.

    Args:
        dfs: The frames collected from the mapped ``download_and_transform``
            steps.

    Returns:
        The result of ``pl.concat`` over ``dfs``.
    """
    # Use the ``dfs`` parameter; ``results`` is not defined in this scope.
    return pl.concat(dfs)
@graph_asset
def dynamic_graph_asset():
pieces = load_pieces()
results = pieces.map(download_and_transform)
merge(results.collect())
Additionally, when I try to create my own IO manager to handle this type, I receive an error when I try to access any Custom IO Manager:
class ExtendedPolarsParquetIOManager(ConfigurableIOManager):
def handle_output(self, context: OutputContext, obj: Any) -> None:
pass
def _get_path(self, asset_key: AssetKey) -> str:
return self.root_path + "/".join(asset_key.path)
def load_input(self, context: InputContext) -> Any:
context.log.info(context.asset_key)
context.log.info(context.assay_partition_key)
Updated Dagster code:
In(input_manager_key="extended_polars_parquet_io_manager")
def merge(dfs: list[pl.LazyFrame]) -> pl.LazyFrame:
    """Concatenate the list of LazyFrames received as the ``dfs`` input.

    Args:
        dfs: The LazyFrames loaded for this op's fan-in input.

    Returns:
        A single LazyFrame produced by ``pl.concat``.
    """
    # ``results`` does not exist inside this function; the input is ``dfs``.
    return pl.concat(dfs)
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
I hate doing this - bump. I can't seem to figure this one out myself. |
Beta Was this translation helpful? Give feedback.
-
hi @mihirsamdarshi Apologies for the delayed response.
Repro
The initial code snippet provided in the question didn’t work for me out-of-the-box, so I made some modifications related to types. Here is my attempt using the built-in PolarsParquetIOManager:
from typing import Generator, Tuple
import polars as pl
from dagster import (
Definitions,
DynamicOut,
DynamicOutput,
graph_asset,
op,
Out
)
from dagster_polars import PolarsParquetIOManager
#### dummy functions for repro purposes
def load_filenames() -> list[Tuple[str, str]]:
    """Return ten dummy ``(mapping_key, filename)`` pairs for the repro."""
    return [(key, f"file_{key}.csv") for key in map(str, range(10))]
def some_function_that_does_that(filename: str) -> pl.LazyFrame:
    """Dummy stand-in that ignores ``filename`` and returns a fixed LazyFrame."""
    columns = {"a": [1, 2, 3], "b": [4, 5, 6]}
    return pl.LazyFrame(columns)
@op(out=DynamicOut(str))
def load_pieces() -> Generator[DynamicOutput, None, None]:
    """Fan out: yield one ``DynamicOutput`` per ``(key, filename)`` pair.

    ``DynamicOut`` is the output definition given to ``@op``; the objects this
    generator actually yields are ``DynamicOutput`` instances, which is what
    the return annotation now states.
    """
    for idx, fn in load_filenames():
        # ``idx`` is the string key from load_filenames; ``fn`` is the payload.
        yield DynamicOutput(fn, mapping_key=idx)
@op(out=Out(io_manager_key="polars_parquet_io_manager"))
def download_and_transform(filename: str) -> pl.LazyFrame:
    """Build the LazyFrame for one filename; the output uses the Parquet IO manager key."""
    lazy_frame = some_function_that_does_that(filename)
    return lazy_frame
@op
def merge(dfs: list[pl.LazyFrame]) -> pl.LazyFrame:
    """Concatenate the collected frames, print the result, and return it."""
    combined = pl.concat(dfs)
    print(combined)
    return combined
@graph_asset
def dynamic_graph_asset():
    """Fan out over the pieces, transform each one, then merge the collected results."""
    transformed = load_pieces().map(download_and_transform)
    return merge(transformed.collect())
defs = Definitions(
assets=[dynamic_graph_asset],
resources={
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/tmp/polars_parquet_io_manager")
}
)
Suggested updates
Here's a dummy but functioning code example of how you can customize your own IO manager:
from typing import Generator, Tuple, List
import polars as pl
from dagster import (
Definitions,
DynamicOut,
DynamicOutput,
graph_asset,
op,
Out,
ConfigurableIOManager,InputContext,OutputContext
)
from dagster_polars import PolarsParquetIOManager
from upath import UPath
class ExtendedPolarsParquetIOManager(ConfigurableIOManager):
    """IO manager that stores op outputs as Parquet files under ``base_dir``.

    Paths are derived from ``OutputContext.get_identifier()`` rather than an
    asset key, because the outputs handled here belong to ops inside a graph,
    not to assets.
    """

    # Root directory under which every output file is written.
    base_dir: str

    def _make_directory(self, path: "UPath"):
        """Create a directory at the provided path.

        Override as a no-op if the target backend doesn't use directories.
        """
        path.mkdir(parents=True, exist_ok=True)

    def handle_output(self, context: OutputContext, obj):
        """Collect ``obj`` and write it as a Parquet file.

        Args:
            context: Output context used to derive the target path.
            obj: The value to store; it must support
                ``.collect().write_parquet(path)`` (presumably a polars
                LazyFrame -- confirm against the ops using this manager).
        """
        path = self._get_path(context)
        self._make_directory(path.parent)
        print("output path", path)
        obj.collect().write_parquet(path)

    def load_input(self, context: InputContext):
        """Lazily scan the Parquet file written for the upstream output.

        Args:
            context: Input context; its ``upstream_output`` identifies the
                file to read.

        Returns:
            The result of ``pl.scan_parquet`` on the upstream output's path.

        Raises:
            ValueError: If the input has no upstream output to load from.
        """
        upstream = context.upstream_output  # the producing output's context
        if upstream is None:
            # Without an upstream output there is no identifier to build a
            # path from; fail with a clear message instead of an
            # AttributeError on None.
            raise ValueError(
                "ExtendedPolarsParquetIOManager cannot load an input that has "
                "no upstream output"
            )
        path = self._get_path(upstream)
        print("input path", path)
        return pl.scan_parquet(path)

    def _get_path(self, context: OutputContext, idx=None):
        """Return the storage path for the output described by ``context``.

        ``idx`` is not used; it is kept so existing callers that pass it keep
        working.
        """
        # Instead of context.asset_key, use context.get_identifier() because
        # the output belongs to an op, not an asset.
        return UPath(self.base_dir, *context.get_identifier())
#### dummy functions for repro purposes
def load_filenames() -> list[Tuple[str, str]]:
    """Return two dummy ``(mapping_key, filename)`` pairs for the repro."""
    indices = range(2)
    return [(f"{n}", f"file_{n}.csv") for n in indices]
def some_function_that_does_that(filename: str) -> pl.LazyFrame:
    """Placeholder transform: returns the same small LazyFrame for any ``filename``."""
    sample = dict(a=[1, 2, 3], b=[4, 5, 6])
    return pl.LazyFrame(sample)
@op(out=DynamicOut(str))
def load_pieces() -> Generator[DynamicOutput, None, None]:
    """Emit one dynamic output per filename so downstream ops can be mapped.

    The annotation names ``DynamicOutput`` (the yielded event type) rather
    than ``DynamicOut`` (the output definition used in the decorator).
    """
    for idx, fn in load_filenames():
        # mapping_key distinguishes the dynamic branches; the value is the filename.
        yield DynamicOutput(fn, mapping_key=idx)
# Specify io_manager_key on this output so the same IO manager handles both
# storing the output and loading it as the downstream input. Putting it on
# merge instead would create an "asymmetrical" input manager, which could
# complicate the scenario here.
@op(out=Out(io_manager_key="polars_parquet_io_manager"))
def download_and_transform(filename: str) -> pl.LazyFrame:
    """Return the LazyFrame produced for a single filename."""
    transformed = some_function_that_does_that(filename)
    return transformed
@op
def merge(dfs: List[pl.LazyFrame]) -> pl.LazyFrame:
    """Print the incoming frames, concatenate them, print and return the result."""
    print(dfs)
    merged = pl.concat(dfs)
    print("foo", merged)
    return merged
@graph_asset
def dynamic_graph_asset():
    """Graph-backed asset: map the transform over each piece, then merge the collected outputs."""
    fan_out = load_pieces()
    fan_in = fan_out.map(download_and_transform).collect()
    return merge(fan_in)
# Register the graph-backed asset and bind the custom IO manager to the
# "polars_parquet_io_manager" resource key.
defs = Definitions(
assets=[dynamic_graph_asset],
resources={
"polars_parquet_io_manager": ExtendedPolarsParquetIOManager(base_dir="/tmp/polars_parquet_io_manager")
}
) |
Beta Was this translation helpful? Give feedback.
hi @mihirsamdarshi Apologies for the delayed response.
Repro
The initial code snippet provided in the question didn’t work for me out-of-the-box, so I made some modifications related to types.
Here is my attempt using the built-in PolarsParquetIOManager for code similar to what you provided. This repros the "NotImplementedError: Can't load object for type annotation typing.List[polars.lazyframe.frame.LazyFrame]" error you mentioned. So, it looks like the out-of-the-box PolarsParquetIOManager doesn't support the dynamic out fan-in case. Please note this is still an experimental integration that's built by community members.