# Source code for ewoksdata.data.blissdata.blissdatav1
from numbers import Number
from collections import Counter
from typing import List, Optional
from numpy.typing import ArrayLike
from .exceptions import VersionError
try:
from blissdata.beacon.data import BeaconData
from blissdata.redis_engine.store import DataStore
from blissdata.redis_engine.scan import Scan
from blissdata.redis_engine.scan import ScanState
from blissdata.redis_engine.stream import StreamingClient
from blissdata.redis_engine.exceptions import EndOfStream
from blissdata.redis_engine.exceptions import IndexNoMoreThereError
from blissdata.lima.client import lima_client_factory
except ImportError as e:
raise VersionError(str(e)) from e
[docs]
def iter_bliss_scan_data_from_memory(
    db_name: str,
    lima_names: List[str],
    counter_names: List[str],
    retry_timeout: Optional[Number] = None,
    retry_period: Optional[Number] = None,
):
    """Iterate over the points of a Bliss scan while reading it from memory.

    The scan is loaded from the data store under the key ``db_name`` and
    polled with ``scan.update()`` until its state is at least
    ``ScanState.PREPARED``. Its streams are then filtered:

    * a stream with JSON encoding whose ``info["format"]`` contains
      ``"lima"`` is kept when the second-to-last ``":"``-separated part of
      its name is in ``lima_names``;
    * any other stream is kept when the last ``":"``-separated part of its
      name is in ``counter_names``.

    :param db_name: key of the scan in the data store
    :param lima_names: names matched against Lima stream names
    :param counter_names: names matched against the other stream names
    :param retry_timeout: accepted but not used by this implementation
    :param retry_period: accepted but not used by this implementation
    :returns: generator of dictionaries mapping every requested name to one
        data point. A dictionary is only produced once *all* requested names
        have a buffered value; values left over when the streams end are
        not yielded.
    """
    data_store = _get_data_store()
    scan = data_store.load_scan(db_name, scan_cls=Scan)

    # One pending-values list per requested name.
    pending = {requested: [] for requested in lima_names + counter_names}

    # Stream descriptions are needed below, so wait for the scan to reach
    # at least the PREPARED state.
    while scan.state < ScanState.PREPARED:
        scan.update()

    image_streams = {}
    image_clients = {}
    point_streams = {}
    for stream_name, stream in scan.streams.items():
        is_lima = (
            stream.encoding["type"] == "json" and "lima" in stream.info["format"]
        )
        if not is_lima:
            if stream_name.split(":")[-1] in counter_names:
                point_streams[stream_name] = stream
            continue
        if stream_name.split(":")[-2] in lima_names:
            image_streams[stream_name] = stream
            image_clients[stream_name] = lima_client_factory(data_store, stream.info)

    reader = StreamingClient({**image_streams, **point_streams})

    # Number of images already fetched, per Lima name.
    images_read = Counter()

    while True:
        try:
            batch = reader.read()
        except EndOfStream:
            return

        for stream, (_, payload) in batch.items():
            if stream.name not in image_streams:
                # payload is a sequence of data points (0D, 1D, 2D)
                pending[stream.name.split(":")[-1]].extend(payload)
                continue

            # payload is a sequence of JSON statuses: only the most recent
            # one is applied to the Lima client.
            detector = stream.name.split(":")[-2]
            client = image_clients[stream.name]
            client.update(**payload[-1])

            offset = images_read[detector]
            try:
                frames = client[offset:]
            except IndexNoMoreThereError:
                # Nothing is buffered for this stream on this round.
                continue
            pending[detector].extend(frames)
            images_read[detector] += len(frames)

        # Only complete rows (one value for every requested name) are emitted.
        ready = min(len(values) for values in pending.values())
        if not ready:
            continue
        columns = list(pending.items())
        for row_index in range(ready):
            yield {key: column[row_index] for key, column in columns}
        pending = {key: column[ready:] for key, column in columns}
[docs]
def last_lima_image(channel_info: dict) -> ArrayLike:
    """Return the array of the last live Lima image held in memory.

    :param channel_info: passed as-is to ``lima_client_factory`` together
        with the data store
    :returns: the ``array`` attribute of the client's last live image
    """
    client = lima_client_factory(_get_data_store(), channel_info)
    live_image = client.get_last_live_image()
    return live_image.array
def _get_data_store() -> "DataStore":
    """Create a ``DataStore`` for the Redis data database known to Beacon.

    The URL is obtained from ``BeaconData().get_redis_data_db()`` on every
    call; no instance is cached.

    :returns: a new ``DataStore`` built from that URL
    """
    redis_url = BeaconData().get_redis_data_db()
    return DataStore(redis_url)