-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DAS-1376 - Update HarmonyAdapter to perform many-to-one operations. #18
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,16 +5,20 @@ | |
|
||
Service adapter for converting NetCDF4 to Zarr | ||
""" | ||
|
||
from os import environ | ||
import shutil | ||
from os.path import join as path_join | ||
from shutil import rmtree | ||
from tempfile import mkdtemp | ||
|
||
from pystac import Asset | ||
from harmony import BaseHarmonyAdapter | ||
from harmony.util import generate_output_filename, HarmonyException | ||
|
||
import harmony | ||
from harmony.util import generate_output_filename, download, HarmonyException | ||
from .convert import netcdf_to_zarr, make_localstack_s3fs, make_s3fs | ||
from .download_utilities import download_granules | ||
from .stac_utilities import get_netcdf_urls, get_output_catalog | ||
|
||
|
||
ZARR_MEDIA_TYPES = ['application/zarr', 'application/x-zarr'] | ||
|
||
|
||
class ZarrException(HarmonyException): | ||
|
@@ -26,7 +30,7 @@ def __init__(self, message=None): | |
super().__init__(message, 'harmonyservices/netcdf-to-zarr') | ||
|
||
|
||
class NetCDFToZarrAdapter(harmony.BaseHarmonyAdapter): | ||
class NetCDFToZarrAdapter(BaseHarmonyAdapter): | ||
""" | ||
Translates NetCDF4 to Zarr | ||
""" | ||
|
@@ -52,68 +56,74 @@ def __init__(self, message, catalog=None, config=None): | |
self.s3 = make_s3fs() | ||
|
||
def invoke(self): | ||
""" | ||
Downloads, translates to Zarr, then re-uploads granules | ||
""" Downloads, translates to Zarr, then re-uploads granules. The | ||
`invoke` class method also validates the request by ensuring that | ||
the requested output format is Zarr, and a STAC catalog is provided | ||
to the service. | ||
|
||
""" | ||
if ( | ||
not self.message.format | ||
or not self.message.format.mime | ||
or self.message.format.mime not in ['application/zarr', 'application/x-zarr'] | ||
or self.message.format.mime not in ZARR_MEDIA_TYPES | ||
): | ||
self.logger.error(f'The Zarr formatter cannot convert to {self.message.format}, skipping') | ||
raise ZarrException('Request failed due to an incorrect service workflow') | ||
self.message.format.process('mime') | ||
return super().invoke() | ||
self.logger.error('The Zarr formatter cannot convert to ' | ||
f'{self.message.format}, skipping') | ||
raise ZarrException('Request failed due to an incorrect service ' | ||
'workflow') | ||
elif not self.catalog: | ||
raise ZarrException('Invoking NetCDF-to-Zarr without STAC catalog ' | ||
'is not supported.') | ||
else: | ||
self.message.format.process('mime') | ||
return (self.message, self.process_items_many_to_one()) | ||
|
||
def process_item(self, item, source): | ||
""" | ||
Converts an input STAC Item's data into Zarr, returning an output STAC item | ||
def process_items_many_to_one(self): | ||
""" Converts an input STAC Item's data into Zarr, returning an output | ||
STAC catalog. This is a many-to-one operation by default. For | ||
one-to-one operations, it is assumed that the `concatenate` query | ||
parameter is False, and Harmony will invoke this backend service | ||
once per input granule. Because of this, each backend invocation is | ||
expected to produce a single Zarr output. | ||
|
||
Parameters | ||
---------- | ||
item : pystac.Item | ||
the item that should be converted | ||
source : harmony.message.Source | ||
the input source defining the variables, if any, to subset from the item | ||
|
||
Returns | ||
------- | ||
pystac.Item | ||
a STAC item containing the Zarr output | ||
""" | ||
result = item.clone() | ||
result.assets = {} | ||
|
||
# Create a temporary dir for processing we may do | ||
workdir = mkdtemp() | ||
try: | ||
# Get the data file | ||
asset = next(v for k, v in item.assets.items() if 'data' in (v.roles or [])) | ||
input_filename = download( | ||
asset.href, | ||
workdir, | ||
logger=self.logger, | ||
access_token=self.message.accessToken, | ||
cfg=self.config | ||
) | ||
|
||
name = generate_output_filename(asset.href, ext='.zarr') | ||
root = self.message.stagingLocation + name | ||
|
||
try: | ||
store = self.s3.get_mapper(root=root, check=False, create=True) | ||
netcdf_to_zarr(input_filename, store) | ||
except Exception as e: | ||
# Print the real error and convert to user-facing error that's more digestible | ||
self.logger.error(e, exc_info=1) | ||
filename = asset.href.split('?')[0].rstrip('/').split('/')[-1] | ||
raise ZarrException(f'Could not convert file to Zarr: {filename}') from e | ||
|
||
# Update the STAC record | ||
result.assets['data'] = Asset(root, title=name, media_type='application/x-zarr', roles=['data']) | ||
|
||
# Return the STAC record | ||
return result | ||
items = list(self.catalog.get_items()) | ||
netcdf_urls = get_netcdf_urls(items) | ||
|
||
local_file_paths = download_granules(netcdf_urls, workdir, | ||
self.message.accessToken, | ||
self.config, self.logger) | ||
|
||
# Enable during DAS-1379: | ||
# dimensions_mapping = DimensionsMapping(local_file_paths) | ||
|
||
if len(local_file_paths) == 1: | ||
output_name = generate_output_filename(netcdf_urls[0], | ||
ext='.zarr') | ||
else: | ||
# Mimicking PO.DAAC Concise: for many-to-one the file name is | ||
# "<collection>_merged.zarr". | ||
collection = self._get_item_source(items[0]).collection | ||
output_name = f'{collection}_merged.zarr' | ||
|
||
zarr_root = path_join(self.message.stagingLocation, output_name) | ||
zarr_store = self.s3.get_mapper(root=zarr_root, check=False, | ||
create=True) | ||
|
||
if len(local_file_paths) == 1: | ||
# Temporarily retain old version of NetCDF-to-Zarr service | ||
netcdf_to_zarr(local_file_paths[0], zarr_store) | ||
else: | ||
# TODO: DAS-1379 - implement many-to-one Zarr output and | ||
# enable concatenation. | ||
raise NotImplementedError('Concatenation is not yet supported.') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm confused - isn't this where we initiate our aggregation processing? Above code addresses many-to-one options, why raise an exception here? Oh - I got it, from some conversation - this is still a mergeable solution but not the final solution (yet to come). Classic case for a ToDo comment, but not strictly necessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah exactly. My aim here was to ensure that we could deploy this and continue service for a single granule input, but we've not yet actually done the bit to write many-to-one input (that's DAS-1379). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added a short-lived TODO comment referring to DAS-1379. |
||
|
||
return get_output_catalog(self.catalog, zarr_root) | ||
except Exception as service_exception: | ||
self.logger.error(service_exception, exc_info=1) | ||
raise ZarrException('Could not create Zarr output: ' | ||
f'{str(service_exception)}') from service_exception | ||
finally: | ||
# Clean up any intermediate resources | ||
shutil.rmtree(workdir) | ||
rmtree(workdir) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
""" A utility module for performing multiple downloads simultaneously. | ||
This is largely derived from the PO.DAAC Concise service: | ||
|
||
https://github.com/podaac/concise/blob/develop/podaac/merger/harmony/download_worker.py | ||
|
||
""" | ||
from copy import deepcopy | ||
from logging import Logger | ||
from multiprocessing import Manager, Process, Queue | ||
from os import cpu_count | ||
from queue import Empty as QueueEmpty | ||
from typing import List | ||
|
||
from harmony.util import Config, download | ||
|
||
|
||
def download_granules(netcdf_urls: List[str], destination_directory: str, | ||
access_token: str, harmony_config: Config, | ||
logger: Logger, process_count: int = None) -> List[str]: | ||
""" A method which scales concurrent downloads to the number of available | ||
CPU cores. For further explanation, see documentation on "multi-track | ||
drifting" | ||
|
||
""" | ||
logger.info('Beginning granule downloads.') | ||
|
||
if process_count is None: | ||
process_count = cpu_count() | ||
else: | ||
process_count = min(process_count, cpu_count()) | ||
|
||
with Manager() as manager: | ||
download_queue = manager.Queue(len(netcdf_urls)) | ||
local_paths = manager.list() | ||
|
||
for netcdf_url in netcdf_urls: | ||
download_queue.put(netcdf_url) | ||
|
||
# Spawn a worker process for each CPU being used | ||
processes = [Process(target=_download_worker, | ||
args=(download_queue, local_paths, | ||
destination_directory, access_token, | ||
harmony_config, logger)) | ||
for _ in range(process_count)] | ||
|
||
for download_process in processes: | ||
download_process.start() | ||
|
||
# Ensure worker processes exit successfully | ||
for download_process in processes: | ||
download_process.join() | ||
if download_process.exitcode != 0: | ||
raise RuntimeError('Download failed - exit code: ' | ||
f'{download_process.exitcode}') | ||
|
||
download_process.close() | ||
|
||
# Copy paths so they persist outside of the Manager context. | ||
download_paths = deepcopy(local_paths) | ||
|
||
logger.info('Finished downloading granules') | ||
|
||
return download_paths | ||
|
||
|
||
def _download_worker(download_queue: Queue, local_paths: List, | ||
destination_dir: str, access_token: str, | ||
harmony_config: Config, logger: Logger): | ||
""" A method to be executed in a separate process. This will check for | ||
items in the queue, which correspond to URLs for NetCDF-4 files to | ||
download. If there is at least one URL left for download, then it is | ||
retrieved from the queue and the `harmony-py.util.download` function | ||
is used to retrieve the granule. Otherwise, the process is ended. All | ||
local paths of downloaded granules are added to a list that is | ||
administered by the process manager instance. | ||
|
||
""" | ||
while not download_queue.empty(): | ||
try: | ||
netcdf_url = download_queue.get_nowait() | ||
except QueueEmpty: | ||
break | ||
|
||
local_path = download(netcdf_url, destination_dir, logger=logger, | ||
access_token=access_token, cfg=harmony_config) | ||
|
||
local_paths.append(local_path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume {collection} is concept_id, not just short_name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to dig around for this in the unit tests for
harmony-service-lib-py
to confirm. Looking at the unit tests there, it looks like you are right; the URL will likely contain the collection concept ID, not the short name. (Here's the class method, where it's essentially looking for the parent item in the STAC catalogue).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Short-Name is preferable for end-user. Is this possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I definitely agree that a short name is more end-user friendly! I'm not sure that the short name is available to us - this is making use of a method on the
BaseHarmonyAdapter
class, so outside of the scope of the service itself.I guess the other thing worth noting is that this is what the PO.DAAC Concise service is doing, so we're being consistent with existing services. I definitely think this is a good question, and think the short name would be preferable if available. This might be a conversation to float up to a higher level, to see if the collection short name can be made available to the services somehow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addendum (sorry, still mulling this over): I think, mainly, this all stems from what Harmony chooses to put in the input STAC catalogue sent to the backend service.
(An alternative might be to delve into the metadata of one of the granules to find a short name in the global attributes, but then we'd be inconsistent with Concise)