Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAS-1376 - Update HarmonyAdapter to perform many-to-one operations. #18

Merged
merged 1 commit into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ You can now run a workflow in your local Harmony stack and it will execute using

#### Testing & running the Service Independently

This will require credentials for the Harmony Sandbox NGAPShApplicationDeveloper
to be present in your `~/.aws/credentials` file.

Run tests with coverage reports:

$ make test
Expand Down
130 changes: 70 additions & 60 deletions harmony_netcdf_to_zarr/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@

Service adapter for converting NetCDF4 to Zarr
"""

from os import environ
import shutil
from os.path import join as path_join
from shutil import rmtree
from tempfile import mkdtemp

from pystac import Asset
from harmony import BaseHarmonyAdapter
from harmony.util import generate_output_filename, HarmonyException

import harmony
from harmony.util import generate_output_filename, download, HarmonyException
from .convert import netcdf_to_zarr, make_localstack_s3fs, make_s3fs
from .download_utilities import download_granules
from .stac_utilities import get_netcdf_urls, get_output_catalog


ZARR_MEDIA_TYPES = ['application/zarr', 'application/x-zarr']


class ZarrException(HarmonyException):
Expand All @@ -26,7 +30,7 @@ def __init__(self, message=None):
super().__init__(message, 'harmonyservices/netcdf-to-zarr')


class NetCDFToZarrAdapter(harmony.BaseHarmonyAdapter):
class NetCDFToZarrAdapter(BaseHarmonyAdapter):
"""
Translates NetCDF4 to Zarr
"""
Expand All @@ -52,68 +56,74 @@ def __init__(self, message, catalog=None, config=None):
self.s3 = make_s3fs()

def invoke(self):
"""
Downloads, translates to Zarr, then re-uploads granules
""" Downloads, translates to Zarr, then re-uploads granules. The
`invoke` class method also validates the request by ensuring that
the requested output format is Zarr, and a STAC catalog is provided
to the service.

"""
if (
not self.message.format
or not self.message.format.mime
or self.message.format.mime not in ['application/zarr', 'application/x-zarr']
or self.message.format.mime not in ZARR_MEDIA_TYPES
):
self.logger.error(f'The Zarr formatter cannot convert to {self.message.format}, skipping')
raise ZarrException('Request failed due to an incorrect service workflow')
self.message.format.process('mime')
return super().invoke()
self.logger.error('The Zarr formatter cannot convert to '
f'{self.message.format}, skipping')
raise ZarrException('Request failed due to an incorrect service '
'workflow')
elif not self.catalog:
raise ZarrException('Invoking NetCDF-to-Zarr without STAC catalog '
'is not supported.')
else:
self.message.format.process('mime')
return (self.message, self.process_items_many_to_one())

def process_item(self, item, source):
"""
Converts an input STAC Item's data into Zarr, returning an output STAC item
def process_items_many_to_one(self):
""" Converts an input STAC Item's data into Zarr, returning an output
STAC catalog. This is a many-to-one operation by default. For
one-to-one operations, it is assumed that the `concatenate` query
parameter is False, and Harmony will invoke this backend service
once per input granule. Because of this, each backend invocation is
expected to produce a single Zarr output.

Parameters
----------
item : pystac.Item
the item that should be converted
source : harmony.message.Source
the input source defining the variables, if any, to subset from the item

Returns
-------
pystac.Item
a STAC item containing the Zarr output
"""
result = item.clone()
result.assets = {}

# Create a temporary dir for processing we may do
workdir = mkdtemp()
try:
# Get the data file
asset = next(v for k, v in item.assets.items() if 'data' in (v.roles or []))
input_filename = download(
asset.href,
workdir,
logger=self.logger,
access_token=self.message.accessToken,
cfg=self.config
)

name = generate_output_filename(asset.href, ext='.zarr')
root = self.message.stagingLocation + name

try:
store = self.s3.get_mapper(root=root, check=False, create=True)
netcdf_to_zarr(input_filename, store)
except Exception as e:
# Print the real error and convert to user-facing error that's more digestible
self.logger.error(e, exc_info=1)
filename = asset.href.split('?')[0].rstrip('/').split('/')[-1]
raise ZarrException(f'Could not convert file to Zarr: {filename}') from e

# Update the STAC record
result.assets['data'] = Asset(root, title=name, media_type='application/x-zarr', roles=['data'])

# Return the STAC record
return result
items = list(self.catalog.get_items())
netcdf_urls = get_netcdf_urls(items)

local_file_paths = download_granules(netcdf_urls, workdir,
self.message.accessToken,
self.config, self.logger)

# Enable during DAS-1379:
# dimensions_mapping = DimensionsMapping(local_file_paths)

if len(local_file_paths) == 1:
output_name = generate_output_filename(netcdf_urls[0],
ext='.zarr')
else:
# Mimicking PO.DAAC Concise: for many-to-one the file name is
# "<collection>_merged.zarr".
collection = self._get_item_source(items[0]).collection
output_name = f'{collection}_merged.zarr'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume {collection} is concept_id, not just short_name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to dig around for this in the unit tests for harmony-service-lib-py to confirm. Looking at the unit tests there, it looks like you are right; the URL will likely contain the collection concept ID, not the short name. (Here's the class method, where it's essentially looking for the parent item in the STAC catalogue).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Short-Name is preferable for end-user. Is this possible?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I definitely agree that a short name is more end-user friendly! I'm not sure that the short name is available to us - this is making use of a method on the BaseHarmonyAdapter class, so outside of the scope of the service itself.

I guess the other thing worth noting is that this is what the PO.DAAC Concise service is doing, so we're being consistent with existing services. I definitely think this is a good question, and think the short name would be preferable if available. This might be a conversation to float up to a higher level, to see if the collection short name can be made available to the services somehow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addendum (sorry, still mulling this over): I think, mainly, this all stems from what Harmony chooses to put in the input STAC catalogue sent to the backend service.

(An alternative might be to delve into the metadata of one of the granules to find a short name in the global attributes, but then we'd be inconsistent with Concise)


zarr_root = path_join(self.message.stagingLocation, output_name)
zarr_store = self.s3.get_mapper(root=zarr_root, check=False,
create=True)

if len(local_file_paths) == 1:
# Temporarily retain old version of NetCDF-to-Zarr service
netcdf_to_zarr(local_file_paths[0], zarr_store)
else:
# TODO: DAS-1379 - implement many-to-one Zarr output and
# enable concatenation.
raise NotImplementedError('Concatenation is not yet supported.')
Copy link

@D-Auty D-Auty Feb 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused - isn't this where we initiate our aggregation processing? Above code addresses many-to-one options, why raise an exception here? Oh - I got it, from some conversation - this is still a mergeable solution but not the final solution (yet to come). Classic case for a ToDo comment, but not strictly necessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah exactly. My aim here was to ensure that we could deploy this and continue service for a single granule input, but we've not yet actually done the bit to write many-to-one input (that's DAS-1379).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a short-lived TODO comment referring to DAS-1379.


return get_output_catalog(self.catalog, zarr_root)
except Exception as service_exception:
self.logger.error(service_exception, exc_info=1)
raise ZarrException('Could not create Zarr output: '
f'{str(service_exception)}') from service_exception
finally:
# Clean up any intermediate resources
shutil.rmtree(workdir)
rmtree(workdir)
9 changes: 5 additions & 4 deletions harmony_netcdf_to_zarr/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Union
import re

import s3fs
from s3fs import S3FileSystem
import numpy as np
import zarr
from netCDF4 import Dataset
Expand All @@ -19,17 +19,17 @@

def make_localstack_s3fs():
host = os.environ.get('LOCALSTACK_HOST') or 'host.docker.internal'
return s3fs.S3FileSystem(
return S3FileSystem(
use_ssl=False,
key='ACCESS_KEY',
secret='SECRET_KEY',
client_kwargs=dict(
region_name=region,
endpoint_url='http://%s:4572' % (host)))
endpoint_url=f'http://{host}:4572'))


def make_s3fs():
return s3fs.S3FileSystem(client_kwargs=dict(region_name=region))
return S3FileSystem(client_kwargs=dict(region_name=region))


def netcdf_to_zarr(src, dst):
Expand Down Expand Up @@ -207,6 +207,7 @@ def __copy_variable(src, dst_group, name, sema=Semaphore(20)):
s3 = make_localstack_s3fs()
else:
s3 = make_s3fs()

group_name = os.path.join(dst_group.store.root, dst_group.path)
dst = s3.get_mapper(root=group_name, check=False, create=True)
dst_group = zarr.group(dst)
Expand Down
87 changes: 87 additions & 0 deletions harmony_netcdf_to_zarr/download_utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
""" A utility module for performing multiple downloads simultaneously.
This is largely derived from the PO.DAAC Concise service:

https://github.com/podaac/concise/blob/develop/podaac/merger/harmony/download_worker.py

"""
from copy import deepcopy
from logging import Logger
from multiprocessing import Manager, Process, Queue
from os import cpu_count
from queue import Empty as QueueEmpty
from typing import List

from harmony.util import Config, download


def download_granules(netcdf_urls: List[str], destination_directory: str,
access_token: str, harmony_config: Config,
logger: Logger, process_count: int = None) -> List[str]:
""" A method which scales concurrent downloads to the number of available
CPU cores. For further explanation, see documentation on "multi-track
drifting"

"""
logger.info('Beginning granule downloads.')

if process_count is None:
process_count = cpu_count()
else:
process_count = min(process_count, cpu_count())

with Manager() as manager:
download_queue = manager.Queue(len(netcdf_urls))
local_paths = manager.list()

for netcdf_url in netcdf_urls:
download_queue.put(netcdf_url)

# Spawn a worker process for each CPU being used
processes = [Process(target=_download_worker,
args=(download_queue, local_paths,
destination_directory, access_token,
harmony_config, logger))
for _ in range(process_count)]

for download_process in processes:
download_process.start()

# Ensure worker processes exit successfully
for download_process in processes:
download_process.join()
if download_process.exitcode != 0:
raise RuntimeError('Download failed - exit code: '
f'{download_process.exitcode}')

download_process.close()

# Copy paths so they persist outside of the Manager context.
download_paths = deepcopy(local_paths)

logger.info('Finished downloading granules')

return download_paths


def _download_worker(download_queue: Queue, local_paths: List,
destination_dir: str, access_token: str,
harmony_config: Config, logger: Logger):
""" A method to be executed in a separate process. This will check for
items in the queue, which correspond to URLs for NetCDF-4 files to
download. If there is at least one URL left for download, then it is
retrieved from the queue and the `harmony-py.util.download` function
is used to retrieve the granule. Otherwise, the process is ended. All
local paths of downloaded granules are added to a list that is
administered by the process manager instance.

"""
while not download_queue.empty():
try:
netcdf_url = download_queue.get_nowait()
except QueueEmpty:
break

local_path = download(netcdf_url, destination_dir, logger=logger,
access_token=access_token, cfg=harmony_config)

local_paths.append(local_path)