DAS-1376 - Update HarmonyAdapter to perform many-to-one operations.
owenlittlejohns committed Feb 19, 2022
1 parent 11811f5 commit a3385cf
Showing 13 changed files with 1,065 additions and 152 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -94,6 +94,9 @@ You can now run a workflow in your local Harmony stack and it will execute using

#### Testing & running the Service Independently

This will require credentials for the Harmony Sandbox NGAPShApplicationDeveloper
to be present in your `~/.aws/credentials` file.
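
For reference, a minimal `~/.aws/credentials` entry might look like the sketch below; the profile name and placeholder values are illustrative, not actual sandbox credentials:

    [default]
    aws_access_key_id = <access-key-id>
    aws_secret_access_key = <secret-access-key>
    aws_session_token = <session-token>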

Run tests with coverage reports:

$ make test
128 changes: 68 additions & 60 deletions harmony_netcdf_to_zarr/adapter.py
@@ -5,16 +5,20 @@
Service adapter for converting NetCDF4 to Zarr
"""

from os import environ
import shutil
from os.path import join as path_join
from shutil import rmtree
from tempfile import mkdtemp

from pystac import Asset
from harmony import BaseHarmonyAdapter
from harmony.util import generate_output_filename, HarmonyException

import harmony
from harmony.util import generate_output_filename, download, HarmonyException
from .convert import netcdf_to_zarr, make_localstack_s3fs, make_s3fs
from .download_utilities import download_granules
from .stac_utilities import get_netcdf_urls, get_output_catalog


ZARR_MEDIA_TYPES = ['application/zarr', 'application/x-zarr']


class ZarrException(HarmonyException):
@@ -26,7 +30,7 @@ def __init__(self, message=None):
super().__init__(message, 'harmonyservices/netcdf-to-zarr')


class NetCDFToZarrAdapter(harmony.BaseHarmonyAdapter):
class NetCDFToZarrAdapter(BaseHarmonyAdapter):
"""
Translates NetCDF4 to Zarr
"""
@@ -52,68 +56,72 @@ def __init__(self, message, catalog=None, config=None):
self.s3 = make_s3fs()

def invoke(self):
"""
Downloads, translates to Zarr, then re-uploads granules
""" Downloads, translates to Zarr, then re-uploads granules. The
`invoke` class method also validates the request by ensuring that
the requested output format is Zarr, and a STAC catalog is provided
to the service.
"""
if (
not self.message.format
or not self.message.format.mime
or self.message.format.mime not in ['application/zarr', 'application/x-zarr']
or self.message.format.mime not in ZARR_MEDIA_TYPES
):
self.logger.error(f'The Zarr formatter cannot convert to {self.message.format}, skipping')
raise ZarrException('Request failed due to an incorrect service workflow')
self.message.format.process('mime')
return super().invoke()
self.logger.error('The Zarr formatter cannot convert to '
f'{self.message.format}, skipping')
raise ZarrException('Request failed due to an incorrect service '
'workflow')
elif not self.catalog:
raise ZarrException('Invoking NetCDF-to-Zarr without STAC catalog '
'is not supported.')
else:
self.message.format.process('mime')
return (self.message, self.process_items_many_to_one())

def process_item(self, item, source):
"""
Converts an input STAC Item's data into Zarr, returning an output STAC item
def process_items_many_to_one(self):
""" Converts an input STAC Item's data into Zarr, returning an output
STAC catalog. This is a many-to-one operation by default. For
one-to-one operations, it is assumed that the `concatenate` query
parameter is False, and Harmony will invoke this backend service
once per input granule. Because of this, each backend invocation is
expected to produce a single Zarr output.
Parameters
----------
item : pystac.Item
the item that should be converted
source : harmony.message.Source
the input source defining the variables, if any, to subset from the item
Returns
-------
pystac.Item
a STAC item containing the Zarr output
"""
result = item.clone()
result.assets = {}

# Create a temporary directory for any processing we may do
workdir = mkdtemp()
try:
# Get the data file
asset = next(v for k, v in item.assets.items() if 'data' in (v.roles or []))
input_filename = download(
asset.href,
workdir,
logger=self.logger,
access_token=self.message.accessToken,
cfg=self.config
)

name = generate_output_filename(asset.href, ext='.zarr')
root = self.message.stagingLocation + name

try:
store = self.s3.get_mapper(root=root, check=False, create=True)
netcdf_to_zarr(input_filename, store)
except Exception as e:
# Print the real error and convert to user-facing error that's more digestible
self.logger.error(e, exc_info=1)
filename = asset.href.split('?')[0].rstrip('/').split('/')[-1]
raise ZarrException(f'Could not convert file to Zarr: {filename}') from e

# Update the STAC record
result.assets['data'] = Asset(root, title=name, media_type='application/x-zarr', roles=['data'])

# Return the STAC record
return result
items = list(self.catalog.get_items())
netcdf_urls = get_netcdf_urls(items)

local_file_paths = download_granules(netcdf_urls, workdir,
self.message.accessToken,
self.config, self.logger)

# Enable during DAS-1379:
# dimensions_mapping = DimensionsMapping(local_file_paths)

if len(local_file_paths) == 1:
output_name = generate_output_filename(netcdf_urls[0],
ext='.zarr')
else:
# Mimicking PO.DAAC Concise: for many-to-one the file name is
# "<collection>_merged.zarr".
collection = self._get_item_source(items[0]).collection
output_name = f'{collection}_merged.zarr'

zarr_root = path_join(self.message.stagingLocation, output_name)
zarr_store = self.s3.get_mapper(root=zarr_root, check=False,
create=True)

if len(local_file_paths) == 1:
# Temporarily retain old version of NetCDF-to-Zarr service
netcdf_to_zarr(local_file_paths[0], zarr_store)
else:
raise NotImplementedError('Concatenation is not yet supported.')

return get_output_catalog(self.catalog, zarr_root)
except Exception as service_exception:
self.logger.error(service_exception, exc_info=1)
raise ZarrException('Could not create Zarr output: '
f'{str(service_exception)}') from service_exception
finally:
# Clean up any intermediate resources
shutil.rmtree(workdir)
rmtree(workdir)
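
The `get_netcdf_urls` and `get_output_catalog` helpers imported above come from the new `stac_utilities` module, which is among the 13 changed files but is not shown in this excerpt. A minimal sketch of their likely shape follows; the bodies are assumptions inferred from how `process_items_many_to_one` uses them, not the committed implementation.

    # Assumed sketches of the stac_utilities helpers; illustrative only.
    from datetime import datetime, timezone
    from typing import List

    from pystac import Asset, Catalog, Item


    def get_netcdf_urls(items: List[Item]) -> List[str]:
        # Mirrors the 'data' role check from the removed process_item method.
        return [next(asset.href for asset in item.assets.values()
                     if 'data' in (asset.roles or []))
                for item in items]


    def get_output_catalog(input_catalog: Catalog, zarr_root: str) -> Catalog:
        # Clone the input catalog, drop its items, and attach a single item
        # whose data asset points at the aggregated Zarr store.
        output_catalog = input_catalog.clone()
        output_catalog.clear_items()
        output_item = Item(id='output', geometry=None, bbox=None,
                           datetime=datetime.now(timezone.utc), properties={})
        output_item.assets['data'] = Asset(zarr_root,
                                           title=zarr_root.split('/')[-1],
                                           media_type='application/x-zarr',
                                           roles=['data'])
        output_catalog.add_item(output_item)
        return output_catalog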
9 changes: 5 additions & 4 deletions harmony_netcdf_to_zarr/convert.py
@@ -6,7 +6,7 @@
from typing import Union
import re

import s3fs
from s3fs import S3FileSystem
import numpy as np
import zarr
from netCDF4 import Dataset
@@ -19,17 +19,17 @@

def make_localstack_s3fs():
host = os.environ.get('LOCALSTACK_HOST') or 'host.docker.internal'
return s3fs.S3FileSystem(
return S3FileSystem(
use_ssl=False,
key='ACCESS_KEY',
secret='SECRET_KEY',
client_kwargs=dict(
region_name=region,
endpoint_url='http://%s:4572' % (host)))
endpoint_url=f'http://{host}:4572'))


def make_s3fs():
return s3fs.S3FileSystem(client_kwargs=dict(region_name=region))
return S3FileSystem(client_kwargs=dict(region_name=region))


def netcdf_to_zarr(src, dst):
@@ -207,6 +207,7 @@ def __copy_variable(src, dst_group, name, sema=Semaphore(20)):
s3 = make_localstack_s3fs()
else:
s3 = make_s3fs()

group_name = os.path.join(dst_group.store.root, dst_group.path)
dst = s3.get_mapper(root=group_name, check=False, create=True)
dst_group = zarr.group(dst)
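
Mirroring how `adapter.py` wires these helpers together, end-to-end usage looks roughly like the sketch below; the bucket path and local file name are placeholders.

    from harmony_netcdf_to_zarr.convert import make_s3fs, netcdf_to_zarr

    s3 = make_s3fs()
    zarr_store = s3.get_mapper(root='s3://example-staging-bucket/output.zarr',
                               check=False, create=True)
    netcdf_to_zarr('/tmp/local_granule.nc4', zarr_store)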
87 changes: 87 additions & 0 deletions harmony_netcdf_to_zarr/download_utilities.py
@@ -0,0 +1,87 @@
""" A utility module for performing multiple downloads simultaneously.
This is largely derived from the PO.DAAC Concise service:
https://github.com/podaac/concise/blob/develop/podaac/merger/harmony/download_worker.py
"""
from copy import deepcopy
from logging import Logger
from multiprocessing import Manager, Process, Queue
from os import cpu_count
from queue import Empty as QueueEmpty
from typing import List

from harmony.util import Config, download


def download_granules(netcdf_urls: List[str], destination_directory: str,
access_token: str, harmony_config: Config,
logger: Logger, process_count: int = None) -> List[str]:
""" A method which scales concurrent downloads to the number of available
CPU cores. For further explanation, see documentation on "multi-track
drifting"
"""
logger.info('Beginning granule downloads.')

if process_count is None:
process_count = cpu_count()
else:
process_count = min(process_count, cpu_count())

with Manager() as manager:
download_queue = manager.Queue(len(netcdf_urls))
local_paths = manager.list()

for netcdf_url in netcdf_urls:
download_queue.put(netcdf_url)

# Spawn a worker process for each CPU being used
processes = [Process(target=_download_worker,
args=(download_queue, local_paths,
destination_directory, access_token,
harmony_config, logger))
for _ in range(process_count)]

for download_process in processes:
download_process.start()

# Ensure worker processes exit successfully
for download_process in processes:
download_process.join()
if download_process.exitcode != 0:
raise RuntimeError('Download failed - exit code: '
f'{download_process.exitcode}')

download_process.close()

# Copy paths so they persist outside of the Manager context.
download_paths = deepcopy(local_paths)

logger.info('Finished downloading granules')

return download_paths


def _download_worker(download_queue: Queue, local_paths: List,
destination_dir: str, access_token: str,
harmony_config: Config, logger: Logger):
""" A method to be executed in a separate process. This will check for
items in the queue, which correspond to URLs for NetCDF-4 files to
download. If there is at least one URL left for download, then it is
retrieved from the queue and the `harmony.util.download` function
is used to retrieve the granule. Otherwise, the process is ended. All
local paths of downloaded granules are added to a list that is
administered by the process manager instance.
"""
while not download_queue.empty():
try:
netcdf_url = download_queue.get_nowait()
except QueueEmpty:
break

local_path = download(netcdf_url, destination_dir, logger=logger,
access_token=access_token, cfg=harmony_config)

local_paths.append(local_path)
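
A hedged usage sketch of `download_granules` follows: the URLs, directory, and token are placeholders, and `harmony.util.config` is assumed to be available from the same `harmony-service-lib` package that provides `download`.

    from logging import getLogger

    from harmony.util import config

    from harmony_netcdf_to_zarr.download_utilities import download_granules

    harmony_config = config(validate=False)
    logger = getLogger('harmony_netcdf_to_zarr')

    local_paths = download_granules(
        ['https://example.com/granules/first.nc4',
         'https://example.com/granules/second.nc4'],
        '/tmp/workdir', '<access-token>', harmony_config, logger,
        process_count=2)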
