Source code for

import os
from datetime import datetime
from pathlib import Path
from typing import Tuple
import numpy
import h5py

__all__ = ["save_bliss_scan"]

[docs] def save_bliss_scan( tmpdir: Path, image: numpy.ndarray, npoints_per_file: int, npoints: int, scannr: int, subscannr: int = 1, sequence: str = "add", lima_names: Tuple[str, ...] = ("p3",), counter_names: Tuple[str, ...] = ("diode1", "diode2"), ) -> Path: dataset_filename = tmpdir / "sample_dataset.h5" lima_dirname = tmpdir / f"scan{scannr:04d}" lima_dirname.mkdir() with h5py.File(str(dataset_filename), mode="w") as root: root.attrs["NX_class"] = "NXroot" root.attrs["creator"] = "bliss" scan = root.create_group(f"{scannr}.{subscannr}") scan.attrs["NX_class"] = "NXentry" instrument = scan.create_group("instrument") instrument.attrs["NX_class"] = "NXinstrument" measurement = scan.create_group("measurement") measurement.attrs["NX_class"] = "NXcollection" for name in counter_names: _save_counter(scan, name, npoints) for name in lima_names: _save_lima( scan, name, image, npoints, npoints_per_file, lima_dirname, scannr, sequence, ) scan["end_time"] = return dataset_filename
def _save_counter(scan: h5py.Group, name: str, npoints: int) -> None: diode = _add_detector(scan, name) diode["data"] = numpy.arange(npoints) def _save_lima( scan: h5py.Group, name: str, image: numpy.ndarray, npoints: int, npoints_per_file: int, lima_dirname: Path, scannr: int, sequence: str, ) -> None: tot_shape = (npoints,) + image.shape lima_shape = (npoints_per_file,) + image.shape nlima = npoints // npoints_per_file + bool(npoints % npoints_per_file) lima_dtype = float layout = h5py.VirtualLayout(tot_shape, dtype=lima_dtype) for i in range(nlima): with h5py.File(str(lima_dirname / f"{name}_{i}.h5"), mode="w") as limaroot: limaroot.attrs["NX_class"] = "NXroot" entry = limaroot.create_group("entry_0000") entry.attrs["NX_class"] = "NXentry" lmeasurement = entry.create_group("measurement") lmeasurement.attrs["NX_class"] = "NXcollection" lmeasurement["data"] = numpy.full(lima_shape, numpy.nan) i0 = i * npoints_per_file i1 = min(i0 + npoints_per_file, npoints) for itot in range(i0, i1): if sequence == "add": lmeasurement["data"][itot - i0, ...] = image + itot else: m = itot / (npoints - 1) + 1 if itot == 0: monitor = _add_detector(scan, "monitor") monitor["data"] = numpy.zeros(npoints) monitor["data"][itot] = m lmeasurement["data"][itot - i0, ...] = image * m spath = os.path.join(f"scan{scannr:04d}", f"{name}_{i}.h5") sdset = "/entry_0000/measurement/data" sshape = (i1 - i0,) + lima_shape[1:] vsource = h5py.VirtualSource(spath, sdset, shape=sshape, dtype=lima_dtype) layout[i0:i1] = vsource detector = _add_detector(scan, name) detector.create_virtual_dataset("data", layout, fillvalue=numpy.nan) def _add_detector(scan: h5py.Group, name: str) -> h5py.Group: grp = scan["instrument"].create_group(name) grp.attrs["NX_class"] = "NXdetector" scan["measurement"][name] = h5py.SoftLink(f"{}/data") return grp