Source code for ewoksdata.tests.test_dataset_writer

import time
import itertools
import multiprocessing

import h5py
import numpy
import pytest
from silx.io import h5py_utils

from ..data.hdf5 import dataset_writer


@pytest.mark.parametrize("npoints", (1, 3, 1000))
@pytest.mark.parametrize("flush_period", (None, 0.1))
@pytest.mark.parametrize("known_npoints", (True, False))
def test_dataset_writer(tmpdir, npoints, flush_period, known_npoints):
    expected = list()
    filename = str(tmpdir / "test.h5")
    if flush_period is None:
        sleep_time = None
    else:
        sleep_time = flush_period + 0.1
    isleep = npoints // 3
    kwargs = {"flush_period": flush_period}
    if known_npoints:
        kwargs["npoints"] = npoints

    with h5py.File(filename, mode="w") as f:
        with dataset_writer.DatasetWriter(f, "data", **kwargs) as writer:
            for ipoint in range(npoints):
                data = numpy.random.random((10, 20))
                writer.add_point(data)
                expected.append(data)
                if sleep_time and ipoint == isleep:
                    time.sleep(sleep_time)

    with h5py.File(filename, mode="r") as f:
        data = f["data"][()]
    numpy.testing.assert_allclose(data, expected)
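

# A minimal usage sketch (not part of the original test suite), assuming only
# the DatasetWriter API exercised above: the writer is used as a context
# manager and every ``add_point`` call appends one frame along the first axis,
# so N frames of shape (10, 20) end up as an (N, 10, 20) HDF5 dataset.
def test_dataset_writer_minimal_sketch(tmpdir):
    filename = str(tmpdir / "sketch.h5")
    points = [numpy.full((10, 20), i, dtype=float) for i in range(3)]
    with h5py.File(filename, mode="w") as f:
        with dataset_writer.DatasetWriter(f, "data", flush_period=None) as writer:
            for point in points:
                writer.add_point(point)
    with h5py.File(filename, mode="r") as f:
        assert f["data"].shape == (3, 10, 20)
        numpy.testing.assert_allclose(f["data"][()], points)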


@pytest.mark.parametrize("nstack", (1, 4))
@pytest.mark.parametrize("npoints", (1, 3, 1000))
@pytest.mark.parametrize("flush_period", (None, 0.1))
@pytest.mark.parametrize("known_npoints", (True, False))
@pytest.mark.parametrize("known_nstack", (True, False))
@pytest.mark.parametrize("append_stacks_in_parallel", (True, False))
def test_stack_dataset_writer(
    tmpdir,
    nstack,
    npoints,
    flush_period,
    known_npoints,
    known_nstack,
    append_stacks_in_parallel,
):
    expected = [list() for _ in range(nstack)]
    filename = str(tmpdir / "test.h5")
    if flush_period is None:
        sleep_time = None
    else:
        sleep_time = flush_period + 0.1
    isleep = (nstack * npoints) // 3
    kwargs = {"flush_period": flush_period}
    if known_npoints:
        kwargs["npoints"] = npoints
    if known_nstack:
        kwargs["nstack"] = nstack
    if append_stacks_in_parallel:
        itpoints = itertools.product(range(npoints), range(nstack))
    else:
        itpoints = itertools.product(range(nstack), range(npoints))

    with h5py.File(filename, mode="w") as f:
        with dataset_writer.StackDatasetWriter(f, "data", **kwargs) as writer:
            for tpl in itpoints:
                if append_stacks_in_parallel:
                    ipoint, istack = tpl
                else:
                    istack, ipoint = tpl
                data = numpy.random.random((10, 20))
                writer.add_point(data, istack)
                expected[istack].append(data)
                if sleep_time and (ipoint * nstack + istack) == isleep:
                    time.sleep(sleep_time)

    with h5py.File(filename, mode="r") as f:
        data = f["data"][()]
    numpy.testing.assert_allclose(data, expected)
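

# A minimal usage sketch (not part of the original test suite), assuming only
# the StackDatasetWriter API exercised above: ``add_point(data, istack)``
# appends a frame to stack ``istack`` and, judging from the layout of
# ``expected``, the resulting dataset is indexed as (stack, point, row, column).
def test_stack_dataset_writer_minimal_sketch(tmpdir):
    filename = str(tmpdir / "sketch.h5")
    nstack, npoints = 2, 3
    with h5py.File(filename, mode="w") as f:
        with dataset_writer.StackDatasetWriter(f, "data", flush_period=None) as writer:
            for istack in range(nstack):
                for ipoint in range(npoints):
                    writer.add_point(numpy.full((10, 20), ipoint, dtype=float), istack)
    with h5py.File(filename, mode="r") as f:
        assert f["data"].shape == (nstack, npoints, 10, 20)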


def test_concurrent_reader(tmpdir):
    npoints = 50
    hdf5_filename = str(tmpdir / "test.h5")
    read_timestamps = str(tmpdir / "read_timestamps.log")
    write_timestamps = str(tmpdir / "write_timestamps.log")
    dataset_name = "data"

    # Run read loop in a sub-process
    start_event = multiprocessing.Event()
    reader_process = multiprocessing.Process(
        target=_read_hdf5_file,
        args=(hdf5_filename, dataset_name, npoints, read_timestamps, start_event),
    )
    reader_process.start()
    assert start_event.wait(timeout=10)

    # Run write loop
    try:
        _write_hdf5_file(hdf5_filename, dataset_name, npoints, write_timestamps)
    except Exception:
        reader_process.terminate()
        raise

    # Wait for read loop to finish
    try:
        reader_process.join(timeout=10)
        if reader_process.exitcode is None:
            raise TimeoutError("Reader process did not terminate within the timeout.")
        elif reader_process.exitcode != 0:
            raise RuntimeError(
                f"Reader process terminated with exit code {reader_process.exitcode}."
            )
    except Exception:
        reader_process.terminate()
        raise

    # Check that reading and writing happened concurrently
    with open(read_timestamps, "r") as eventfile:
        read_events = [int(line.strip()) for line in eventfile.readlines()]
    with open(write_timestamps, "r") as eventfile:
        write_events = [int(line.strip()) for line in eventfile.readlines()]
    assert len(read_events) == npoints
    assert len(write_events) == npoints
    all_events = sorted(read_events + write_events)
    assert (
        read_events != all_events[: len(read_events)]
    ), "reads and writes are not interleaved"
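

# Worked example of the final assertion above (not part of the original test):
# the check fails only when the read timestamps are exactly the earliest
# ``len(read_events)`` timestamps of the sorted merge, i.e. when every read
# happened before every write.
def test_interleaving_assertion_sketch():
    reads, writes = [2, 4, 6], [1, 3, 5]  # interleaved: the check passes
    assert reads != sorted(reads + writes)[: len(reads)]
    reads, writes = [1, 2, 3], [4, 5, 6]  # all reads first: the check would fail
    assert reads == sorted(reads + writes)[: len(reads)]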


class _DatasetWriter(dataset_writer.DatasetWriter):
    """Testing concurrent reading and writing always ends up being flaky,
    so we introduce a sleep after flushing to give the reader plenty of
    time to fetch new data.
    """

    def _flush_hdf5(self) -> None:
        print("flush")
        super()._flush_hdf5()
        time.sleep(0.2)


def _write_hdf5_file(hdf5_filename, dataset_name, npoints, event_filename):
    data = numpy.zeros((500, 500), dtype=numpy.int64)
    with open(event_filename, "a") as eventfile:
        with h5py.File(hdf5_filename, mode="w") as h5file:
            with _DatasetWriter(h5file, dataset_name) as writer:
                for i in range(npoints):
                    writer.add_point(data)
                    eventfile.write(f"{time.perf_counter_ns()}\n")
                    print(f"write {i}")


def _read_hdf5_file(hdf5_filename, dataset_name, npoints, event_filename, start_event):
    i = 0
    with open(event_filename, "a") as eventfile:
        start_event.set()
        while i < npoints:
            for _ in _iter_data(hdf5_filename, dataset_name):
                eventfile.write(f"{time.perf_counter_ns()}\n")
                print(f"read {i}")
                i += 1


@h5py_utils.retry()
def _iter_data(hdf5_filename, dataset_name, start_index=0):
    # Yield once per point currently present in the dataset, starting at
    # ``start_index``. silx requires retried generators to accept this
    # argument: on an HDF5 read error the decorator re-runs the generator
    # and resumes it from the last yielded point.
    with h5py_utils.File(hdf5_filename) as h5file:
        for _ in h5file[dataset_name][start_index:]:
            yield None