Skip to content

Commit

Permalink
Sketches and notes
Browse files Browse the repository at this point in the history
  • Loading branch information
danielballan committed Apr 9, 2024
1 parent e973c0b commit 955093c
Showing 1 changed file with 118 additions and 14 deletions.
132 changes: 118 additions & 14 deletions src/bluesky/callbacks/tiled_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,118 @@
}


# TODO (probably in a future PR?):
# Move these into external repo (probably area-detector-handlers)
# and use the existing handler discovery mechanism.

"""
A handler consumes documents from Bluesky. It composes details
(DataSource and its Assets) that will go into the Tiled database.
Tiled Adapters will later use this to read the data, with good
random access and bulk access support.
We put this code into handlers so that additional, possibly very
unusual, formats can be supported by users without getting a PR
merged into Bluesky or Tiled.
The STREAM_HANDLER_REGISTRY below (which is just a sketch)
and the Tiled catalog paramter adapters_by_mimetype
can together be used to support:
- ingesting a new mimetype from Bluesky documents and generating
DataSource and Asset with appropriate parameters (the handler's job)
- Interpreting those DataSource and Asset parameters to do I/O
(the adapter's job)
"""
# The above seems solid (to Dan).

# The below is confusing (to Dan).
# We have a use for StreamDatum(s) in, data out.
# No dependency on Tiled, its database, HTTP, etc.
# We just want fast incremental access to streaming data.

# In common:
# - knowledge of format details (meaning of parameters like swmr)
# - instance state to track things like num_total_rows.

# Different:
# - The "at rest" use case has a conceptual dependency on
# DataSource Asset.
# - The streaming data use case has an import dependency o
# whatever the I/O library is (e.g. h5py).


class Thing:
def __init__(self, data_uri, swmr): # TODO Add reshape, skips.
...

def read_stream_datum(self, *stream_datum_docs):
import h5py

array = ...
return array


class HDF5StreamHandler:
# This will be instantiated per StreamResource.

def __init__(self, data_uri, swmr): # TODO Add reshape, skips.
self.data_uri = data_uri
self._total_rows = 0

def consume_stream_datum(self, stream_datum_docs):
# This will be called for every new StreamDatum received.
# Consume new StreamDatum, update chunk.
...
self._total_rows += ...

# Does this belong here??

# def consume_and_read_stream_datum(self, *stream_datum_docs):
# self.consume_stream_datum(*stream_datum_docs)
# import h5py
#
# array = ...
# return array

def get_data_source(self):
# This will be called when we want to update Tiled.
# If the rate of StreamDatum consumed is high, we may not update
# Tiled _every_ time we consume a new StreamDatum, but do some
# batching of update. This is why this is its own method.
return DataSource(
assets=[Asset(data_uri=data_uri, is_directory=False, parameter="data_uri")],
mimetype=mimetype,
structure_family=StructureFamily.array,
structure=ArrayStructure(
data_type=BuiltinDtype.from_numpy_dtype(data_type),
shape=[0, *data_shape],
chunks=[[]] + [[d] for d in data_shape],
),
parameters={"path": data_path.split("/")},
management=Management.external,
)


class TIFFStreamHandler:
def __init__(self, data_uri):
self.data_uri = data_uri

def consume_stream_datum(self, doc):
...

def get_data_source_and_assets(self):
return DataSource(...)


STREAM_HANDLER_REGISTRY = {
"application/x-hdf5": HDF5StreamHandler,
"image/tiff": TIFFStreamHandler,
}


class TiledWriter:
"Write metadata and data from Bluesky documents into Tiled."

Expand Down Expand Up @@ -152,23 +264,15 @@ def stream_datum(self, doc: StreamDatum):
file_path = sr_doc["root"].strip("/") + "/" + sr_doc["resource_path"].strip("/")
data_uri = "file://localhost/" + file_path

handler_class = STREAM_HANDLER_REGISTRY[mimetype]
handler = handler_class(data_uri, **parameters)
self._handlers[sr_uid] = handler
data_source = handler.get_data_source()

sr_node = parent_node.new(
key=data_key,
structure_family=StructureFamily.array,
data_sources=[
DataSource(
assets=[Asset(data_uri=data_uri, is_directory=False, parameter="data_uri")],
mimetype=mimetype,
structure_family=StructureFamily.array,
structure=ArrayStructure(
data_type=BuiltinDtype.from_numpy_dtype(data_type),
shape=[0, *data_shape],
chunks=[[]] + [[d] for d in data_shape],
),
parameters={"path": data_path.split("/")},
management=Management.external,
)
],
data_sources=[data_source],
metadata=sr_doc,
specs=[],
)
Expand Down

0 comments on commit 955093c

Please sign in to comment.