Apply callback once per set of targets
Apply the callbacks once per set of targets sharing a file.
This requires pytroll/satpy#2281
gerritholl committed Nov 17, 2022
1 parent 1d0bbc7 commit b1567d9
Showing 2 changed files with 118 additions and 41 deletions.
153 changes: 115 additions & 38 deletions trollflow2/plugins/__init__.py
@@ -21,6 +21,7 @@

"""Trollflow2 plugins."""

import collections.abc
import os
import pathlib
from contextlib import contextmanager, suppress
@@ -34,6 +35,7 @@
import dpath.util
import rasterio
import dask
import dask.array as da
from dask.delayed import Delayed
from posttroll.message import Message
from posttroll.publisher import Publisher, NoisyPublisher
@@ -43,7 +45,7 @@
from rasterio.enums import Resampling
from satpy import Scene
from satpy.resample import get_area_def
from satpy.writers import compute_writer_results
from satpy.writers import compute_writer_results, group_results_by_output_file
from satpy.version import version as satpy_version
from pyresample.geometry import get_geostationary_bounding_box
from trollflow2.dict_tools import get_config_value, plist_iter
@@ -290,21 +292,25 @@ def save_datasets(job):
    The product list may contain a ``call_on_done`` parameter.
    This parameter has effect if and only if ``eager_writing`` is False
    (which is the default). It should contain a list of references to
    callabales. Upon computation time, each callable will be called
    with three arguments: the result of ``save_dataset``, the full
    job dictionary, and the dictionary describing the format config and
    output filename that was written. The callables must return again the
    ``save_dataset`` return value again. This callback could be used, for
    example, to ship products as soon as they are successfully produced.
    Three callback functions are provided with trollflow2:
    :func:`callback_log`, :func:`callback_move`, and :func:`callback_close`.
    If using the geotiff writer, :func:`callback_close` should be used before
    the others for correct results. When using :func:`callback_move`, the
    user must also set ``early_moving`` to True and use a ``staging_zone``.
    If both are used, :func:`callback_log` must be called AFTER
    :func:`callback_move`, because :func:`callback_log` searches for the final
    destination of the file and reports the size (so it accesses the metadata).
    (which is the default). It should contain a list of references
    to callables. At computation time, each callable will be called
    with five arguments: the result of ``save_dataset``, sources (if
    applicable), targets (if applicable), the full job dictionary, and
    the dictionary describing the format config and output filename
    that was written. The parameters sources and targets are set to
    None if using a writer where :meth:`~satpy.Scene.save_datasets`
    does not return those. The callables must return the
    ``save_dataset`` return value (possibly altered). This callback
    could be used, for example, to ship products as soon as they are
    successfully produced. Three callback functions are provided
    with trollflow2: :func:`callback_log`, :func:`callback_move`, and
    :func:`callback_close`. If using the geotiff or ninjogeotiff writers,
    :func:`callback_close` should be used before the others for correct
    results. When using :func:`callback_move`, the user must also set
    ``early_moving`` to True and use a ``staging_zone``. If both are
    used, :func:`callback_log` must be called AFTER :func:`callback_move`,
    because :func:`callback_log` searches for the final destination of
    the file and reports the size (so it accesses the metadata).

    Other arguments defined in the job list (either directly under
    ``product_list``, or under ``formats``) are passed on to the satpy writer. The
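
As an illustration of the five-argument signature described above, a minimal
custom callback might look like the following sketch. The name
``callback_checksum`` and its checksum behaviour are hypothetical, not part of
trollflow2:

    import hashlib
    import logging

    logger = logging.getLogger(__name__)

    def callback_checksum(obj, srcs, targs, job, fmat_config):
        """Hypothetical call_on_done callback logging a file checksum.

        Runs at computation time, after the file has been written.
        """
        filename = fmat_config["filename"]
        with open(filename, "rb") as fp:
            digest = hashlib.sha256(fp.read()).hexdigest()
        logger.info("Wrote %s, sha256 %s", filename, digest)
        # Callbacks must pass the save_dataset return value through.
        return obj
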
@@ -325,21 +331,97 @@ def save_datasets(job):
        callbacks = None
    with renamed_files(not early_moving) as renames:
        for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config):
            if callbacks:
                obj = callbacks[0](
                    save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing),
                    job, fmat_config)
                for callback in callbacks[1:]:
                    obj = callback(obj, job, fmat_config)
            else:
                obj = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing)
            if obj is not None:
                objs.append(obj)
            late_saver = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing)
            late_saver = _maybe_apply_callbacks(late_saver, callbacks, job, fmat_config)
            if late_saver is not None:
                objs.append(late_saver)
                job['produced_files'].put(fmat_config['filename'])
        if not eager_writing:
            compute_writer_results(objs)


def _maybe_apply_callbacks(late_saver, callbacks, *args):
    """Maybe apply callbacks.

    If we are using callbacks via the ``call_on_done`` parameter, wrap
    ``late_saver`` with those iteratively. If not, return ``late_saver`` as is.
    Here, ``late_saver`` is whatever :meth:`satpy.Scene.save_datasets`
    returns.
    """
    if callbacks is None:
        return late_saver
    if isinstance(late_saver, Delayed):
        return _apply_callbacks_to_delayed(late_saver, callbacks, None, None, *args)
    if isinstance(late_saver, collections.abc.Sequence) and len(late_saver) == 2:
        if isinstance(late_saver[0], collections.abc.Sequence):
            return _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args)
        return _apply_callbacks_to_source_and_target(late_saver, callbacks, *args)
    raise ValueError(
        "Unrecognised return value type from ``save_datasets``, "
        "don't know how to apply wrappers.")


def _apply_callbacks_to_delayed(delayed, callbacks, *args):
    """Iteratively apply the callbacks to the delayed object.

    Args:
        delayed: dask Delayed object to which callbacks are applied
        callbacks: list of dask-delayed callables to apply
        *args: remaining arguments passed to callbacks

    Returns:
        delayed type with callbacks applied
    """
    delayed = callbacks[0](delayed, *args)
    for callback in callbacks[1:]:
        delayed = callback(delayed, *args)
    return delayed
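
Each application above only extends the task graph; nothing runs until
compute. A toy chain, assuming the callbacks arrive here wrapped with
``dask.delayed`` (which is what "dask-delayed callables" refers to) and that
``_apply_callbacks_to_delayed`` is importable:

    import dask
    from dask.delayed import Delayed

    def announce(result, *args):
        # Toy callback: runs only when the task graph is computed.
        print("finished:", result)
        return result

    late_saver = dask.delayed(lambda: "written")()   # stand-in for a writer result
    callbacks = [dask.delayed(announce)]             # call_on_done entries, wrapped
    chained = _apply_callbacks_to_delayed(late_saver, callbacks)
    assert isinstance(chained, Delayed)
    chained.compute()   # prints "finished: written"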


def _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args):
    """Apply callbacks to multiple sources/targets pairs.

    Taking source/target pairs such as returned by
    :meth:`satpy.Scene.save_datasets`, split those by output file and turn
    them all into delayed types by calling :func:`dask.array.store`, then
    apply callbacks.

    Args:
        late_saver: tuple of ``(sources, targets)`` such as may be returned
            by :meth:`satpy.Scene.save_datasets`.
        callbacks: list of dask-delayed callables to apply
        *args: remaining arguments passed to callbacks

    Returns:
        list of delayed types
    """
    delayeds = []
    for (src, targ) in group_results_by_output_file(*late_saver):
        delayed = da.store(src, targ, compute=False)
        delayeds.append(_apply_callbacks_to_delayed(delayed, callbacks, src, targ, *args))
    return delayeds
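
A rough usage sketch of the per-file store-then-callback pattern, where
``zip`` stands in for :func:`~satpy.writers.group_results_by_output_file`
(added in pytroll/satpy#2281, which pairs each output file's sources with its
targets) and numpy arrays stand in for file targets:

    import dask
    import dask.array as da
    import numpy as np

    # Flattened sources/targets for two output files, as a writer may return them:
    sources = [da.ones((4, 4), chunks=2), da.zeros((4, 4), chunks=2)]
    targets = [np.empty((4, 4)), np.empty((4, 4))]

    # One delayed store per output file, so callbacks fire once per file
    # rather than once per dataset:
    delayeds = [da.store([src], [targ], compute=False)
                for (src, targ) in zip(sources, targets)]
    dask.compute(*delayeds)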


def _apply_callbacks_to_source_and_target(late_saver, callbacks, *args):
    """Apply callbacks to a single source/target pair.

    Taking a single source/target pair such as may be returned by
    :meth:`satpy.Scene.save_datasets`, turn this into a delayed type
    by calling :func:`dask.array.store`, then apply callbacks.

    Args:
        late_saver: tuple of ``(source, target)`` such as may be returned
            by :meth:`satpy.Scene.save_datasets`.
        callbacks: list of dask-delayed callables to apply
        *args: remaining arguments passed to callbacks

    Returns:
        delayed type
    """
    (src, targ) = late_saver
    delayed = da.store(src, targ, compute=False)
    return _apply_callbacks_to_delayed(delayed, callbacks, [src], [targ], *args)


def product_missing_from_scene(product, scene):
    """Check if product is missing from the scene."""
    if not isinstance(product, (tuple, list)):
@@ -928,7 +1010,7 @@ def _product_meets_min_valid_data_fraction(
    return True


def callback_log(obj, job, fmat_config):
def callback_log(obj, srcs, targs, job, fmat_config):
"""Logging callback for save_datasets call_on_done.
Callback function that can be used with the :func:`save_datasets`
Expand All @@ -947,7 +1029,7 @@ def callback_log(obj, job, fmat_config):
    return obj


def callback_move(obj, job, fmat_config):
def callback_move(obj, srcs, targs, job, fmat_config):
"""Mover callback for save_datasets call_on_done.
Callback function that can be used with the :func:`save_datasets`
Expand All @@ -962,8 +1044,6 @@ def callback_move(obj, job, fmat_config):
of the file, not the temporary one.
"""

    # due to early moving, I've already changed the filename to the destination
    # reverse logic also
    destfile = pathlib.Path(fmat_config["filename"])
    srcdir = pathlib.Path(job["product_list"]["product_list"]["staging_zone"])
    srcfile = srcdir / destfile.name
@@ -972,7 +1052,7 @@ def callback_move(obj, job, fmat_config):
    return obj


def callback_close(obj, job, fmat_config):
def callback_close(obj, srcs, targs, job, fmat_config):
"""Callback closing files where needed.
When using callbacks with writers that return a ``(src, target)`` pair for
Expand All @@ -985,10 +1065,7 @@ def callback_close(obj, job, fmat_config):
If passed a ``dask.Delayed`` object, this callback does nothing. If passed
a ``(src, targ)`` pair, it closes the target.
"""
    if isinstance(obj, Delayed):
        return
    try:
        obj[1].close()
    except AttributeError:
        for ob in obj[1]:
            ob.close()
    if targs:
        for targ in targs:
            targ.close()
    return obj
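
Per the ordering constraints in the ``save_datasets`` docstring, a
``call_on_done`` list combining the three provided callbacks would run
close-first. A sketch in Python; in actual YAML product lists these entries
would typically be ``!!python/name:`` references:

    # Close targets first, then move, then log, so that callback_log sees
    # the final file at its final destination:
    call_on_done = [callback_close, callback_move, callback_log]
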
6 changes: 3 additions & 3 deletions trollflow2/tests/test_trollflow2.py
Original file line number Diff line number Diff line change
@@ -708,7 +708,7 @@ def test_save_datasets_callback(tmp_path, caplog, fake_scene):

    logger = logging.getLogger("testlogger")

    def testlog(obj, job, fmat_config):
    def testlog(obj, srcs, targs, job, fmat_config):
        """Toy function doing some logging"""
        filename = fmat_config["filename"]
        # ensure computation has indeed completed and file was flushed
@@ -2156,7 +2156,7 @@ def test_callback_log(caplog, tmp_path):
fp.write("x" * 10)
obj = object()
with caplog.at_level(logging.INFO):
res = callback_log(obj, {}, {"filename": os.fspath(srcfile)})
res = callback_log(obj, None, None, {}, {"filename": os.fspath(srcfile)})
assert res is obj
assert f"Wrote {srcfile!s} successfully, total 10 bytes." in caplog.text

@@ -2178,7 +2178,7 @@ def test_callback_move(caplog, tmp_path):
"output_dir": os.fspath(destdir)}}}
fname_config = {"filename": os.fspath(destfile)}
with caplog.at_level(logging.DEBUG):
res = callback_move(obj, job, fname_config)
res = callback_move(obj, None, None, job, fname_config)
assert res is obj
assert not srcfile.exists()
assert destfile.exists()
