ENH: append StreamDatums to StreamResource
genematx committed Mar 13, 2024
1 parent 9140a65 commit b479cbe
Showing 1 changed file with 28 additions and 6 deletions.
34 changes: 28 additions & 6 deletions bluesky/callbacks/tiled_writer.py
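
For context, this writer is a document callback: it receives start/descriptor/stream_resource/stream_datum documents from a RunEngine subscription, and the handlers changed below react to the stream documents. A minimal, hypothetical wiring sketch (the Tiled URI and API key are placeholders, and the TiledWriter constructor is assumed to take a Tiled client, which may differ at this commit):

from bluesky import RunEngine
from bluesky.callbacks.tiled_writer import TiledWriter
from tiled.client import from_uri

client = from_uri("http://localhost:8000", api_key="secret")  # placeholder server and key
RE = RunEngine()
RE.subscribe(TiledWriter(client))  # delivers stream_resource/stream_datum documents to the handlers below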
@@ -2,11 +2,11 @@
import pandas as pd
from event_model import DocumentRouter, RunRouter
from tiled.client import from_profile, from_uri
-from tiled.structures.array import ArrayStructure
+from tiled.structures.array import ArrayStructure, BuiltinDtype
from tiled.structures.core import Spec, StructureFamily
from tiled.structures.data_source import Asset, DataSource, Management

-MIME_LOOKUP = {"TEST": "application/x-hdf5"}
+MIMETYPE_LOOKUP = {"hdf5": "application/x-hdf5"}


class TiledWriter:
@@ -79,14 +79,18 @@ def stream_resource(self, doc):
    def stream_datum(self, doc):
        descriptor_node = self._descriptor_nodes[doc["descriptor"]]
        arr_shape = dict(descriptor_node.metadata)["data_keys"]["image"]["shape"]
+        num_rows = (
+            doc["indices"]["stop"] - doc["indices"]["start"]
+        )  # Number of rows added by new StreamDatum

        # Get the Stream Resource node if it already exists or register it from a cached SR document
        print(f"Processing Stream Datum \n {doc}")

        try:
            SR_node = self._SR_nodes[doc["stream_resource"]]
-        except KeyError:
-            # Register new Stream Resource
+        except KeyError:
+            # Register a new (empty) Stream Resource
            SR_doc = self._SR_cache.pop(doc["stream_resource"])

            # POST /api/v1/register/{path}
@@ -105,9 +109,14 @@ def stream_datum(self, doc):
                data_sources=[
                    DataSource(
                        assets=assets,
-                        mimetype=MIME_LOOKUP[SR_doc["spec"]],
+                        mimetype=MIMETYPE_LOOKUP[SR_doc["spec"]],
                        structure_family=StructureFamily.array,
-                        structure=ArrayStructure.from_array(np.ones(arr_shape)),
+                        structure=ArrayStructure(
+                            data_type=BuiltinDtype.from_numpy_dtype(np.dtype("double")),
+                            shape=[0] + arr_shape,
+                            chunks=[[0]] + [[d] for d in arr_shape],
+                        ),
+                        # structure=ArrayStructure.from_array(np.ones(arr_shape)),
                        parameters={"path": ["test"]},
                        management=Management.external,
                    )
@@ -117,3 +126,16 @@ def stream_datum(self, doc):
            )

            self._SR_nodes[SR_doc["uid"]] = SR_node

+        # Append StreamDatum to an existing StreamResource (by overwriting it with the changed shape)
+        url = SR_node.uri.replace("/metadata/", "/data_source/")
+        SR_node.refresh()
+        ds_dict = SR_node.data_sources()[0]
+        # SR_node.include_data_sources() ?
+        ds_dict["structure"]["shape"][0] += num_rows
+        ds_dict["structure"]["chunks"][0] = [min(ds_dict["structure"]["shape"][0], 100)]
+        SR_node.context.http_client.put(
+            url, json={"data_source": ds_dict}, params={"data_source": 1}
+        )
+
+        # breakpoint()
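
To make the shape bookkeeping concrete, here is the same arithmetic the handler performs, run on made-up values (a 10x15 detector frame and a StreamDatum covering indices 0 to 5); only the leading dimension grows as batches are appended:

arr_shape = [10, 15]                           # per-frame shape from the descriptor's data_keys
indices = {"start": 0, "stop": 5}              # from a StreamDatum document
num_rows = indices["stop"] - indices["start"]  # 5 new frames in this batch

shape = [0] + arr_shape                        # initial registration: [0, 10, 15]
chunks = [[0]] + [[d] for d in arr_shape]      # [[0], [10], [15]]

shape[0] += num_rows                           # after the first batch: [5, 10, 15]
chunks[0] = [min(shape[0], 100)]               # leading chunk capped at 100 rows: [[5], [10], [15]]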
