From b3e0f558dcdaa411a194391a27fd2789b7440bcc Mon Sep 17 00:00:00 2001 From: Sam Welborn Date: Fri, 31 Jan 2025 14:03:13 -0500 Subject: [PATCH 1/6] cosmic streams loader --- ptypy/experiment/cosmicstream.py | 510 +++++++++++++++++++++++++++++++ 1 file changed, 510 insertions(+) create mode 100644 ptypy/experiment/cosmicstream.py diff --git a/ptypy/experiment/cosmicstream.py b/ptypy/experiment/cosmicstream.py new file mode 100644 index 000000000..fcfcabe51 --- /dev/null +++ b/ptypy/experiment/cosmicstream.py @@ -0,0 +1,510 @@ +from __future__ import annotations + +import threading +from typing import Any, Callable, Generator, List, Optional + +import numpy as np +from cosmicstreams.PtychocamStream import PtychocamStream as PtychoStream +from numpy.typing import NDArray +from pydantic import BaseModel, ConfigDict, Field, model_validator + +from ptypy import utils as u +from ptypy.core.data import PtyScan +from ptypy.experiment import register +from ptypy.utils.verbose import log + +# Instructions: + +#### +# Install requirements... +#### + +# - Download ptypy +""" +git clone https://github.com/swelborn/ptypy.git +""" + +# - Install ptypy requirements (_full.yml to be safe) +""" +mamba env create -f dependencies_full.yml +conda activate ptypy_full +""" + +# - Copy this into a requirements.txt file. + +""" +pydantic +matplotlib==3.7 +""" + +# - pip install it +""" +pip install -r requirements.txt +""" + +# - install cosmicstreams +""" +git clone https://github.com/silvioachilles/cosmicstreams.git +pip install -e cosmicstreams/ +""" + +#### +# Run... +#### + +""" +❯ python +Python 3.11.9 | packaged by conda-forge | (main, Apr 19 2024, 18:36:13) [GCC 12.3.0] on linux +Type "help", "copyright", "credits" or "license" for more information. +>>> from ptypy.experiment.cosmicstream import CosmicStreamLoader +>>> loader = CosmicStreamLoader() +>>> loader.initialize() +Listening on localhost:37013 for topic: b'start' +Listening on localhost:37013 for topic: b'frame' +Listening on localhost:37013 for topic: b'stop' +Listening on localhost:37013 for topic: b'abort' +Waiting for metadata... +""" + + +class Server(BaseModel): + ip: str + commandPort: int + dataPort: int + dataDir: str + filePrefix: str + + +class ZonePlate(BaseModel): + diameter: float + outerZone: float + A0: float + A1: float + type: str + source: str + + +class Monitor(BaseModel): + dwell: int + + +class Ptychography(BaseModel): + position_jitter: float + + +class Zmq(BaseModel): + connect: bool + address: str + + +class Geometry(BaseModel): + distance: float + psize: float + shape: int + resolution: float + rebin: int + basis_vectors: List[List[float]] + + +class Region(BaseModel): + xStart: float + xStop: float + xPoints: int + yStart: float + yStop: float + yPoints: int + xStep: float + yStep: float + xRange: float + yRange: float + xCenter: float + yCenter: float + zStart: float + zStop: float + zPoints: int + zStep: int + zRange: int + zCenter: float + + +class ScanRegions(BaseModel): + Region1: Region + + +class EnergyRegion(BaseModel): + dwell: float + start: float + stop: float + step: float + nEnergies: int + + +class EnergyRegions(BaseModel): + EnergyRegion1: EnergyRegion + + +class Image(BaseModel): + type: str + proposal: str + experimenters: str + sample: str + x: str + y: str + defocus: bool + mode: str + scanRegions: ScanRegions + energyRegions: EnergyRegions + energy: str + doubleExposure: bool + + +class PtychographyImage(BaseModel): + type: str + proposal: str + experimenters: str + sample: str + x: str + y: str + defocus: bool + mode: str + scanRegions: ScanRegions + energyRegions: EnergyRegions + energy: str + doubleExposure: bool + + +class Focus(BaseModel): + type: str + proposal: str + experimenters: str + sample: str + x: str + y: str + defocus: bool + mode: str + scanRegions: ScanRegions + energyRegions: EnergyRegions + z: str + doubleExposure: bool + + +class LineSpectrum(BaseModel): + type: str + proposal: str + experimenters: str + sample: str + x: str + y: str + defocus: bool + mode: str + scanRegions: ScanRegions + energyRegions: EnergyRegions + energy: str + doubleExposure: bool + + +class LastScan(BaseModel): + Image: Image + Ptychography_Image: PtychographyImage = Field(..., alias="Ptychography Image") + Focus: Focus + Line_Spectrum: LineSpectrum = Field(..., alias="Line Spectrum") + + +class CosmicMeta(BaseModel): + header: str + server: Server + zonePlate: ZonePlate + monitor: Monitor + ptychography: Ptychography + zmq: Zmq + geometry: Geometry + lastScan: LastScan + repetition: int + isDoubleExp: int + pos_x: float + pos_y: float + step_size_x: float + step_size_y: float + num_pixels_x: int + num_pixels_y: int + background_pixels_x: int + background_pixels_y: int + dwell1: float + dwell2: float + energy: float + energyIndex: int + scanRegion: int + dark_num_x: int + dark_num_y: int + exp_num_x: int + exp_num_y: int + exp_step_x: float + exp_step_y: float + double_exposure: bool + exp_num_total: int + translations: List[List[float]] + output_frame_width: int + detector_distance: float + x_pixel_size: float + y_pixel_size: float + identifier: str + illumination_real: List[List[float]] + illumination_imag: List[List[float]] + illumination_mask: List[List[bool]] + dp_fraction_for_illumination_init: float + dtype: Optional[str] = "float32" + ptycho_shape: tuple[int, int, int] = (0, 0, 0) + + # For devtools pprint function. + def __pretty__( + self, fmt: Callable[[Any], Any], **kwargs: Any + ) -> Generator[Any, None, None]: + """Custom pretty print to exclude 2D lists.""" + yield self.__repr_name__() + "(" + yield 1 # indentation level + for name, value in self.__repr_args__(): + if isinstance(value, list) and all(isinstance(sub, list) for sub in value): + if name is not None: + yield name + "=" + # Yield the shape of the 2D list (i.e., number of rows and columns) + yield f"List[List], shape=({len(value)}, {len(value[0]) if len(value) > 0 else 0})" + yield "," + yield 0 + continue + if name is not None: + yield name + "=" + yield fmt(value) + yield "," + yield 0 + yield -1 + yield ")" + + @model_validator(mode="after") + def set_ptycho_shape(self): + total_acquisitions = self.exp_num_x * self.exp_num_y + framewidth = self.output_frame_width + self.ptycho_shape = (total_acquisitions, framewidth, framewidth) + return self + + +class CosmicFrameMeta(BaseModel): + shape_y: int + shape_x: int + dtype: str + byteorder: str + order: str + identifier: str + index: int + posy: float + posx: float + + +class CosmicAbortMeta(BaseModel): + identifier: str + + +class PtyScanDefaults(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + name: str = "PtyScan" + dfile: Optional[str] = None + chunk_format: str = ".chunk%02d" + save: Optional[str] = None + auto_center: Optional[bool] = None + load_parallel: str = "data" + rebin: Optional[int] = None + orientation: Optional[int | tuple | list] = None + min_frames: int = 1 + positions_theory: Optional[NDArray] = None + num_frames: Optional[int] = None + label: Optional[str] = None + experimentID: Optional[str] = None + version: float = 0.1 + shape: int | tuple = 256 + center: list | str | tuple = "fftshift" + psize: float | tuple = 0.000172 + distance: float = 7.19 + energy: float = 7.2 + add_poisson_noise: bool = False + + +class CosmicStreamLoaderParams(PtyScanDefaults): + name: str = "CosmicStreamLoader" + host_start: str = "localhost" + + +class CosmicStreamLoaderMeta(BaseModel): + version: float + num_frames: int + label: Optional[str] + shape: int | tuple + psize: float | tuple + energy: float + center: list | str | tuple + distance: float + + +@register() +class CosmicStreamLoader(PtyScan): + """ + Defaults: + + [name] + default = 'CosmicStreamLoader' + type = str + help = + + [host_start] + default = 'localhost' + type = str + help = Hostname for the publisher that this object will connect to + """ + + def __init__(self, pars: Optional[CosmicStreamLoaderParams] = None, **kwargs): + super().__init__(pars, **kwargs) + self.p = CosmicStreamLoaderParams(**self.info) + self._meta: Optional[CosmicStreamLoaderMeta] = None + self.framecount: int = 0 + self.num_frames: int = 0 + self.thread: Optional[threading.Thread] = None + + def initialize(self): + self.stream = PtychoStream(host_start=self.p.host_start) + self.metadata: Optional[CosmicMeta] = None + print("Waiting for metadata...") + while True: + if self.stream.has_scan_started(): + meta_msg = self.stream.recv_start() + self.metadata = CosmicMeta(**meta_msg) + break + else: + continue + print("Metadata received.") + + def setup_params(meta: CosmicMeta): + # TODO: change this to None, or dfile. for testing we can use uuid and saving + self.p.dfile = None + self.p.save = None + self.p.auto_center = None + self.p.load_parallel = "data" # ? + self.p.rebin = meta.geometry.rebin + + # TODO: do we need to invert/transpose this data? + self.p.orientation = None + self.p.min_frames = 1 + self.p.positions_theory = None + self.p.num_frames = meta.exp_num_total // (meta.double_exposure + 1) + self.p.experimentID = meta.identifier + + # TODO: this is the frame shape + self.p.shape = meta.output_frame_width + + # TODO: i am not sure we have this, leave as default... + # self.p.center = meta.geometry.center + + # TODO: this is 2.99e-11, not sure if it is right... + self.p.psize = meta.geometry.psize + + # TODO: this is correct, but dunno units (0.000121, for ex.) + self.p.distance = meta.geometry.distance + + # TODO: this is currently in J + # self.p.energy = meta.energy / constants.e / 1000 + self.p.energy = meta.energy + + # TODO: not sure about this... + self.p.add_poisson_noise = False + + def setup_meta(params: CosmicStreamLoaderParams): + self._meta = CosmicStreamLoaderMeta(**params.model_dump()) + self.meta = u.Param(self._meta.model_dump()) + + def setup_info(params: CosmicStreamLoaderParams): + self.info.num_frames = params.num_frames + self.info.experimentID = params.experimentID + self.info.shape = params.shape + self.info.distance = params.distance + self.info.energy = params.energy + self.info.dfile = params.dfile + self.info.psize = params.psize + + setup_params(self.metadata) + setup_meta(self.p) + setup_info(self.p) + + self.orientation = self.p.orientation # TODO: fix when we know orientation + self.num_frames = self.p.num_frames if self.p.num_frames is not None else 0 + + self._data = np.empty( + shape=self.metadata.ptycho_shape, dtype=self.metadata.dtype + ) + self._pos = np.empty(shape=(self.metadata.ptycho_shape[0], 2), dtype=float) + + print("Starting receiver thread...") + self.thread = threading.Thread(target=self.receive_messages, daemon=True) + self.thread.start() + + def receive_messages(self): + while True: + if self.stream.has_frame_arrived(): + i, frame, idx, posy, posx, _ = self.stream.recv_frame() + if idx < self._data.shape[0] and i == self.metadata.identifier: # type: ignore + self._data[idx] = frame + self._pos[idx] = [posy, posx] + print(f"Received frame {idx}...") + self.framecount += 1 + else: + raise ValueError( + f"Received frame with wrong index {idx} or identifier {i}" + ) + elif self.stream.has_scan_aborted(): + print("Scan has been aborted") + print("Receiving abort metadata...") + self.stream.recv_abort() + # TODO: boolean for stop condition + elif self.stream.has_scan_stopped(): + print("Scan has stopped") + print("Receiving stop metadata...") + self.stream.recv_stop() + # TODO: boolean for stop condition + + def check(self, frames=None, start=None): + end_of_scan: bool = False # TODO: set stop condition from recv thread + frames_accessible: int = 0 + + if start is None: + start = self.framestart + + if frames is None: + frames = self.min_frames + + # Check how many frames are available + new_frames = self.framecount - start + # not reached expected nr. of frames + if new_frames <= frames: + # but its last chunk of scan so load it anyway + if self.framecount == self.num_frames: + frames_accessible = new_frames + end_of_scan = True + # otherwise, do nothing + else: + frames_accessible = 0 + end_of_scan = False + # reached expected nr. of frames + else: + frames_accessible = frames + end_of_scan = False + + return frames_accessible, end_of_scan + + def load(self, indices): + intensities = {} + positions = {} + weights = {} + log(4, "Loading...") + log(4, f"indices = {indices}") + for ind in indices: + intensities[ind] = self._data[ind] + positions[ind] = self._pos[ind] + weights[ind] = np.ones(len(intensities[ind])) + + return intensities, positions, weights \ No newline at end of file From e519d8cf52cafec114dcd07186ce82fffa7e720b Mon Sep 17 00:00:00 2001 From: Bjoern Enders Date: Mon, 3 Feb 2025 18:27:47 -0800 Subject: [PATCH 2/6] Made compatible with ptypy internals and adjusted geometry variables --- ptypy/experiment/cosmicstream.py | 153 +++++++++---------------------- 1 file changed, 44 insertions(+), 109 deletions(-) diff --git a/ptypy/experiment/cosmicstream.py b/ptypy/experiment/cosmicstream.py index fcfcabe51..683e82107 100644 --- a/ptypy/experiment/cosmicstream.py +++ b/ptypy/experiment/cosmicstream.py @@ -303,46 +303,6 @@ class CosmicAbortMeta(BaseModel): identifier: str -class PtyScanDefaults(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True) - name: str = "PtyScan" - dfile: Optional[str] = None - chunk_format: str = ".chunk%02d" - save: Optional[str] = None - auto_center: Optional[bool] = None - load_parallel: str = "data" - rebin: Optional[int] = None - orientation: Optional[int | tuple | list] = None - min_frames: int = 1 - positions_theory: Optional[NDArray] = None - num_frames: Optional[int] = None - label: Optional[str] = None - experimentID: Optional[str] = None - version: float = 0.1 - shape: int | tuple = 256 - center: list | str | tuple = "fftshift" - psize: float | tuple = 0.000172 - distance: float = 7.19 - energy: float = 7.2 - add_poisson_noise: bool = False - - -class CosmicStreamLoaderParams(PtyScanDefaults): - name: str = "CosmicStreamLoader" - host_start: str = "localhost" - - -class CosmicStreamLoaderMeta(BaseModel): - version: float - num_frames: int - label: Optional[str] - shape: int | tuple - psize: float | tuple - energy: float - center: list | str | tuple - distance: float - - @register() class CosmicStreamLoader(PtyScan): """ @@ -353,22 +313,39 @@ class CosmicStreamLoader(PtyScan): type = str help = - [host_start] + [source] default = 'localhost' type = str help = Hostname for the publisher that this object will connect to + + [orientation] + default = 3 + + [load_parallel] + default = None + + [psize] + default = None + + [energy] + default = None + + [distance] + default = None + + [shape] + default = None + """ - def __init__(self, pars: Optional[CosmicStreamLoaderParams] = None, **kwargs): + def __init__(self, pars, **kwargs): super().__init__(pars, **kwargs) - self.p = CosmicStreamLoaderParams(**self.info) - self._meta: Optional[CosmicStreamLoaderMeta] = None self.framecount: int = 0 - self.num_frames: int = 0 self.thread: Optional[threading.Thread] = None + self._thread_end_of_scan: bool = False def initialize(self): - self.stream = PtychoStream(host_start=self.p.host_start) + self.stream = PtychoStream(host_start=self.info.source) self.metadata: Optional[CosmicMeta] = None print("Waiting for metadata...") while True: @@ -380,70 +357,26 @@ def initialize(self): continue print("Metadata received.") - def setup_params(meta: CosmicMeta): - # TODO: change this to None, or dfile. for testing we can use uuid and saving - self.p.dfile = None - self.p.save = None - self.p.auto_center = None - self.p.load_parallel = "data" # ? - self.p.rebin = meta.geometry.rebin - - # TODO: do we need to invert/transpose this data? - self.p.orientation = None - self.p.min_frames = 1 - self.p.positions_theory = None - self.p.num_frames = meta.exp_num_total // (meta.double_exposure + 1) - self.p.experimentID = meta.identifier - - # TODO: this is the frame shape - self.p.shape = meta.output_frame_width - - # TODO: i am not sure we have this, leave as default... - # self.p.center = meta.geometry.center - - # TODO: this is 2.99e-11, not sure if it is right... - self.p.psize = meta.geometry.psize - - # TODO: this is correct, but dunno units (0.000121, for ex.) - self.p.distance = meta.geometry.distance - - # TODO: this is currently in J - # self.p.energy = meta.energy / constants.e / 1000 - self.p.energy = meta.energy - - # TODO: not sure about this... - self.p.add_poisson_noise = False - - def setup_meta(params: CosmicStreamLoaderParams): - self._meta = CosmicStreamLoaderMeta(**params.model_dump()) - self.meta = u.Param(self._meta.model_dump()) - - def setup_info(params: CosmicStreamLoaderParams): - self.info.num_frames = params.num_frames - self.info.experimentID = params.experimentID - self.info.shape = params.shape - self.info.distance = params.distance - self.info.energy = params.energy - self.info.dfile = params.dfile - self.info.psize = params.psize - - setup_params(self.metadata) - setup_meta(self.p) - setup_info(self.p) - - self.orientation = self.p.orientation # TODO: fix when we know orientation - self.num_frames = self.p.num_frames if self.p.num_frames is not None else 0 + md = self.metadata + self.num_frames = md.exp_num_total // (md.double_exposure + 1) + self.meta.energy = md.energy * 6.2425e15 + self.meta.psize = md.geometry.psize * 1e6 + self.info.psize = md.geometry.psize * 1e6 # need to set info.psize in case there is a rebining in PtyScan + self.meta.distance = md.geometry.distance * 1e3 self._data = np.empty( shape=self.metadata.ptycho_shape, dtype=self.metadata.dtype ) self._pos = np.empty(shape=(self.metadata.ptycho_shape[0], 2), dtype=float) + super().initialize() + print("Starting receiver thread...") self.thread = threading.Thread(target=self.receive_messages, daemon=True) self.thread.start() def receive_messages(self): + # TODO: Thread needs to terminate when scan has stopped. while True: if self.stream.has_frame_arrived(): i, frame, idx, posy, posx, _ = self.stream.recv_frame() @@ -460,15 +393,16 @@ def receive_messages(self): print("Scan has been aborted") print("Receiving abort metadata...") self.stream.recv_abort() - # TODO: boolean for stop condition + self._thread_end_of_scan = True + elif self.stream.has_scan_stopped(): print("Scan has stopped") print("Receiving stop metadata...") self.stream.recv_stop() - # TODO: boolean for stop condition + self._thread_end_of_scan = True def check(self, frames=None, start=None): - end_of_scan: bool = False # TODO: set stop condition from recv thread + end_of_scan: int = 0 # could also be None if there was a condition that this streaming implementation wouldn't know frames_accessible: int = 0 if start is None: @@ -482,17 +416,17 @@ def check(self, frames=None, start=None): # not reached expected nr. of frames if new_frames <= frames: # but its last chunk of scan so load it anyway - if self.framecount == self.num_frames: + if self.framecount == self.num_frames or self._thread_end_of_scan: frames_accessible = new_frames - end_of_scan = True + end_of_scan = 1 # otherwise, do nothing else: - frames_accessible = 0 - end_of_scan = False + frames_accessible = new_frames + end_of_scan = 0 # reached expected nr. of frames else: frames_accessible = frames - end_of_scan = False + end_of_scan = 0 return frames_accessible, end_of_scan @@ -505,6 +439,7 @@ def load(self, indices): for ind in indices: intensities[ind] = self._data[ind] positions[ind] = self._pos[ind] - weights[ind] = np.ones(len(intensities[ind])) + weights[ind] = np.ones_like(self._data[ind]) + + return intensities, positions, weights - return intensities, positions, weights \ No newline at end of file From 6a276cdde4b7610235fcb98b5e0cfbb32749442f Mon Sep 17 00:00:00 2001 From: Bjoern Enders Date: Mon, 3 Feb 2025 18:36:31 -0800 Subject: [PATCH 3/6] Added templates to work with CosmicStreamLoader --- .../als_cosmic/cosmic_stream_to_ptyd.py | 22 ++++++++ .../als_cosmic/ptypy_cosmic_load_ptyd.py | 46 ++++++++++++++++ .../als_cosmic/ptypy_cosmic_stream_test.py | 53 +++++++++++++++++++ 3 files changed, 121 insertions(+) create mode 100644 templates/experiment/als_cosmic/cosmic_stream_to_ptyd.py create mode 100644 templates/experiment/als_cosmic/ptypy_cosmic_load_ptyd.py create mode 100644 templates/experiment/als_cosmic/ptypy_cosmic_stream_test.py diff --git a/templates/experiment/als_cosmic/cosmic_stream_to_ptyd.py b/templates/experiment/als_cosmic/cosmic_stream_to_ptyd.py new file mode 100644 index 000000000..0c5748fa9 --- /dev/null +++ b/templates/experiment/als_cosmic/cosmic_stream_to_ptyd.py @@ -0,0 +1,22 @@ +from ptypy.experiment.cosmicstream import CosmicStreamLoader +import time +from ptypy import utils as u +u.verbose.set_level(4) +p = u.Param(dfile="test.ptyd", + save="append", + min_frames=50, + rebin=2, + shape=512, + center=None, + auto_center=True) +loader = CosmicStreamLoader(p) +loader.initialize() +while True: + msg = loader.auto(100) + if msg == loader.WAIT: + time.sleep(2) + elif msg == loader.EOS: + print("end") + break + else: + print([it['index'] for it in msg['iterable']]) \ No newline at end of file diff --git a/templates/experiment/als_cosmic/ptypy_cosmic_load_ptyd.py b/templates/experiment/als_cosmic/ptypy_cosmic_load_ptyd.py new file mode 100644 index 000000000..9669e9bda --- /dev/null +++ b/templates/experiment/als_cosmic/ptypy_cosmic_load_ptyd.py @@ -0,0 +1,46 @@ +""" +This script is a test for ptychographic reconstruction after an +experiment has been carried out and the data is available in ptypy's +data file format in the current directory as "sample.ptyd". Use together +with `ptypy_make_sample_ptyd.py`. +""" +from ptypy.core import Ptycho +from ptypy import utils as u + +import tempfile +tmpdir = tempfile.gettempdir() + +p = u.Param() +p.verbose_level = "info" +p.io = u.Param() +p.io.home = "/".join([tmpdir, "ptypy"]) + +p.scans = u.Param() +p.scans.MF = u.Param() +p.scans.MF.data= u.Param() +p.scans.MF.name = 'BlockFull' +p.scans.MF.data.name = 'PtydScan' +p.scans.MF.data.source = 'file' +p.scans.MF.data.dfile = 'test.ptyd' + +p.scans.MF.illumination = u.Param() +p.scans.MF.illumination.aperture = u.Param() +p.scans.MF.illumination.aperture.diffuser = None +p.scans.MF.illumination.aperture.form = "circ" +p.scans.MF.illumination.aperture.size = 0.0001 +p.scans.MF.illumination.aperture.central_stop = .2 +p.scans.MF.illumination.propagation = u.Param() +p.scans.MF.illumination.propagation.focussed = 0.005 +p.scans.MF.illumination.propagation.parallel = 0.00005 +p.scans.MF.illumination.propagation.spot_size = None + +p.engines = u.Param() +p.engines.engine00 = u.Param() +p.engines.engine00.name = 'DM' +p.engines.engine00.numiter = 200 +p.engines.engine01 = u.Param() +p.engines.engine01.name = 'ML' +p.engines.engine01.numiter = 50 + +if __name__ == "__main__": + P = Ptycho(p,level=5) diff --git a/templates/experiment/als_cosmic/ptypy_cosmic_stream_test.py b/templates/experiment/als_cosmic/ptypy_cosmic_stream_test.py new file mode 100644 index 000000000..5d66e3208 --- /dev/null +++ b/templates/experiment/als_cosmic/ptypy_cosmic_stream_test.py @@ -0,0 +1,53 @@ +""" +This script is a test for ptychographic reconstruction after an +experiment has been carried out and the data is available in ptypy's +data file format in the current directory as "sample.ptyd". Use together +with `ptypy_make_sample_ptyd.py`. +""" +from ptypy.core import Ptycho +from ptypy import utils as u +from ptypy.experiment.cosmicstream import CosmicStreamLoader + +import tempfile +tmpdir = tempfile.gettempdir() + +p = u.Param() +p.verbose_level = "info" +p.io = u.Param() +p.io.home = "/".join([tmpdir, "ptypy"]) + +p.scans = u.Param() +p.scans.MF = u.Param() +p.scans.MF.data= u.Param() +p.scans.MF.name = 'BlockFull' +p.scans.MF.data.name = 'CosmicStreamLoader' +p.scans.MF.data.source = 'localhost' +p.scans.MF.data.dfile = None +p.scans.MF.data.save = None +p.scans.MF.data.min_frames=50 +p.scans.MF.data.rebin=2 +p.scans.MF.data.shape=512 +p.scans.MF.data.center=None +p.scans.MF.data.auto_center=True + +p.scans.MF.illumination = u.Param() +p.scans.MF.illumination.aperture = u.Param() +p.scans.MF.illumination.aperture.diffuser = None +p.scans.MF.illumination.aperture.form = "circ" +p.scans.MF.illumination.aperture.size = 0.0001 +p.scans.MF.illumination.aperture.central_stop = .2 +p.scans.MF.illumination.propagation = u.Param() +p.scans.MF.illumination.propagation.focussed = 0.005 +p.scans.MF.illumination.propagation.parallel = -0.00005 +p.scans.MF.illumination.propagation.spot_size = None + +p.engines = u.Param() +p.engines.engine00 = u.Param() +p.engines.engine00.name = 'DM' +p.engines.engine00.numiter = 200 +#p.engines.engine01 = u.Param() +#p.engines.engine01.name = 'ML' +#p.engines.engine01.numiter = 20 + +if __name__ == "__main__": + P = Ptycho(p,level=5) From cc0b0ebf922e156470f0889f3037bbd57311f8ac Mon Sep 17 00:00:00 2001 From: Sam Welborn Date: Thu, 6 Feb 2025 10:07:46 -0500 Subject: [PATCH 4/6] adjustments to streaming parameters --- .../experiment/als_cosmic/ptypy_cosmic_stream_test.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/templates/experiment/als_cosmic/ptypy_cosmic_stream_test.py b/templates/experiment/als_cosmic/ptypy_cosmic_stream_test.py index 5d66e3208..76dedac6c 100644 --- a/templates/experiment/als_cosmic/ptypy_cosmic_stream_test.py +++ b/templates/experiment/als_cosmic/ptypy_cosmic_stream_test.py @@ -12,7 +12,7 @@ tmpdir = tempfile.gettempdir() p = u.Param() -p.verbose_level = "info" +p.verbose_level = "error" p.io = u.Param() p.io.home = "/".join([tmpdir, "ptypy"]) @@ -24,7 +24,7 @@ p.scans.MF.data.source = 'localhost' p.scans.MF.data.dfile = None p.scans.MF.data.save = None -p.scans.MF.data.min_frames=50 +p.scans.MF.data.min_frames=10 p.scans.MF.data.rebin=2 p.scans.MF.data.shape=512 p.scans.MF.data.center=None @@ -38,13 +38,14 @@ p.scans.MF.illumination.aperture.central_stop = .2 p.scans.MF.illumination.propagation = u.Param() p.scans.MF.illumination.propagation.focussed = 0.005 -p.scans.MF.illumination.propagation.parallel = -0.00005 +p.scans.MF.illumination.propagation.parallel = 0.00005 p.scans.MF.illumination.propagation.spot_size = None p.engines = u.Param() p.engines.engine00 = u.Param() p.engines.engine00.name = 'DM' p.engines.engine00.numiter = 200 +p.engines.engine00.probe_fourier_support = 0.06 #p.engines.engine01 = u.Param() #p.engines.engine01.name = 'ML' #p.engines.engine01.numiter = 20 From 69313a3b10cf2be50d41055d1b1118c8f23b7f5c Mon Sep 17 00:00:00 2001 From: Sam Welborn Date: Thu, 6 Feb 2025 10:08:20 -0500 Subject: [PATCH 5/6] run stream in master thread, load common --- ptypy/experiment/cosmicstream.py | 62 ++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/ptypy/experiment/cosmicstream.py b/ptypy/experiment/cosmicstream.py index 683e82107..416b85b02 100644 --- a/ptypy/experiment/cosmicstream.py +++ b/ptypy/experiment/cosmicstream.py @@ -11,6 +11,7 @@ from ptypy import utils as u from ptypy.core.data import PtyScan from ptypy.experiment import register +from ptypy.utils import parallel from ptypy.utils.verbose import log # Instructions: @@ -345,35 +346,42 @@ def __init__(self, pars, **kwargs): self._thread_end_of_scan: bool = False def initialize(self): - self.stream = PtychoStream(host_start=self.info.source) self.metadata: Optional[CosmicMeta] = None - print("Waiting for metadata...") - while True: - if self.stream.has_scan_started(): - meta_msg = self.stream.recv_start() - self.metadata = CosmicMeta(**meta_msg) - break - else: - continue - print("Metadata received.") - - md = self.metadata - self.num_frames = md.exp_num_total // (md.double_exposure + 1) - self.meta.energy = md.energy * 6.2425e15 - self.meta.psize = md.geometry.psize * 1e6 - self.info.psize = md.geometry.psize * 1e6 # need to set info.psize in case there is a rebining in PtyScan - self.meta.distance = md.geometry.distance * 1e3 - - self._data = np.empty( - shape=self.metadata.ptycho_shape, dtype=self.metadata.dtype - ) - self._pos = np.empty(shape=(self.metadata.ptycho_shape[0], 2), dtype=float) - + if parallel.master: + self.stream = PtychoStream(host_start=self.info.source) + print("Waiting for metadata...") + while True: + if self.stream.has_scan_started(): + meta_msg = self.stream.recv_start() + self.metadata = CosmicMeta(**meta_msg) + break + else: + continue + print("Metadata received.") + + md = self.metadata + self.num_frames = md.exp_num_total // (md.double_exposure + 1) + self.meta.energy = md.energy * 6.2425e15 + self.meta.psize = md.geometry.psize * 1e6 + self.info.psize = md.geometry.psize * 1e6 # need to set info.psize in case there is a rebining in PtyScan + self.meta.distance = md.geometry.distance * 1e3 + + self._data = np.empty( + shape=self.metadata.ptycho_shape, dtype=self.metadata.dtype + ) + self._pos = np.empty(shape=(self.metadata.ptycho_shape[0], 2), dtype=float) + print("Starting receiver thread...") + self.thread = threading.Thread(target=self.receive_messages, daemon=True) + self.thread.start() + super().initialize() - - print("Starting receiver thread...") - self.thread = threading.Thread(target=self.receive_messages, daemon=True) - self.thread.start() + self.meta.energy = self.common["energy"] + self.meta.psize = self.common["psize"] + self.info.psize = self.common["psize"] + self.meta.distance = self.common["distance"] + + def load_common(self): + return self.meta def receive_messages(self): # TODO: Thread needs to terminate when scan has stopped. From 0ca0fb515cd8a760a570221826544f3671df1e40 Mon Sep 17 00:00:00 2001 From: Sam Welborn Date: Thu, 6 Feb 2025 10:09:00 -0500 Subject: [PATCH 6/6] don't repeat frames_accessible assignment --- ptypy/experiment/cosmicstream.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ptypy/experiment/cosmicstream.py b/ptypy/experiment/cosmicstream.py index 416b85b02..2a550e342 100644 --- a/ptypy/experiment/cosmicstream.py +++ b/ptypy/experiment/cosmicstream.py @@ -423,13 +423,11 @@ def check(self, frames=None, start=None): new_frames = self.framecount - start # not reached expected nr. of frames if new_frames <= frames: + frames_accessible = new_frames # but its last chunk of scan so load it anyway if self.framecount == self.num_frames or self._thread_end_of_scan: - frames_accessible = new_frames end_of_scan = 1 - # otherwise, do nothing - else: - frames_accessible = new_frames + else: # otherwise, do nothing end_of_scan = 0 # reached expected nr. of frames else: