Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
471 changes: 373 additions & 98 deletions docs/non_tested_examples/example.ipynb

Large diffs are not rendered by default.

151 changes: 107 additions & 44 deletions libra_toolbox/neutron_detection/activation_foils/compass.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,40 @@ class Detector:
channel_nb: Channel number of the detector
live_count_time: Active measurement time excluding dead time (in seconds)
real_count_time: Total elapsed measurement time (in seconds)
spectrum: Cached energy spectrum (accessed via property)
bin_edges: Cached bin edges for the energy spectrum (accessed via property)
"""

events: NDArray[Tuple[float, float]] # type: ignore
channel_nb: int
live_count_time: Union[float, None]
real_count_time: Union[float, None]
_spectrum: Union[NDArray[np.float64], None] = None
_bin_edges: Union[NDArray[np.float64], None] = None

def __init__(self, channel_nb) -> None:
def __init__(self, channel_nb, nb_digitizer_bins=4096) -> None:
"""
Initialize a Detector object.
Args:
channel_nb: channel number of the detector
nb_digitizer_bins: number of digitizer bins for the detector.
"""
self.channel_nb = channel_nb
self.nb_digitizer_bins = nb_digitizer_bins
self.events = np.empty((0, 2)) # Initialize as empty 2D array with 2 columns
self.live_count_time = None
self.real_count_time = None

@property
def spectrum(self) -> Union[NDArray[np.float64], None]:
"""Get the cached energy spectrum. Read-only property."""
return getattr(self, "_spectrum", None)

@property
def bin_edges(self) -> Union[NDArray[np.float64], None]:
"""Get the cached bin edges for the energy spectrum. Read-only property."""
return getattr(self, "_bin_edges", None)

def get_energy_hist(
self, bins: Union[None, NDArray[np.float64], int, str] = None
) -> Tuple[np.ndarray, np.ndarray]:
Expand All @@ -67,6 +83,9 @@ def get_energy_hist(
Returns:
Tuple of histogram values and bin edges
"""
if self._spectrum is not None and self._bin_edges is not None:
# If spectrum and bin edges are already calculated, return them
return self._spectrum, self._bin_edges

energy_values = self.events[:, 1].copy()
time_values = self.events[:, 0].copy()
Expand All @@ -79,39 +98,51 @@ def get_energy_hist(
energy_values = np.nan_to_num(energy_values, nan=0)

if bins is None:
bins = np.arange(
int(np.nanmin(energy_values)), int(np.nanmax(energy_values))
)
if self.nb_digitizer_bins == None:
bins = np.arange(
int(np.nanmin(energy_values)), int(np.nanmax(energy_values)) + 1
)
else:
bins = np.arange(self.nb_digitizer_bins + 1)

return np.histogram(energy_values, bins=bins)

def get_energy_hist_background_substract(
self,
background_detector: "Detector",
bins: Union[NDArray[np.float64], None] = None,
live_or_real: str = "live",
) -> Tuple[np.ndarray, np.ndarray]:
ps_to_seconds = 1e-12
"""Get the energy histogram of the detector events with background subtraction.

Args:
background_detector: _description_
bins: _description_. Defaults to None.
live_or_real: When doing the background sub decide whether the background
histogram is scaled by live or real count time.
"""

assert (
self.channel_nb == background_detector.channel_nb
), f"Channel number mismatch: {self.channel_nb} != {background_detector.channel_nb}"

raw_hist, raw_bin_edges = self.get_energy_hist(bins=bins)
background_times = background_detector.events[:, 0].copy()
background_energies = background_detector.events[:, 1].copy()

if self.real_count_time < background_detector.real_count_time:
# get background counts for the duration of the sample count
end_ind = np.nanargmin(
np.abs(
self.real_count_time / ps_to_seconds
- (background_times - background_times[0])
)
)
b_hist, _ = np.histogram(
background_energies[: end_ind + 1],
bins=raw_bin_edges,
b_hist, _ = background_detector.get_energy_hist(bins=raw_bin_edges)

if live_or_real == "live":
# Scale background histogram by live count time
b_hist = b_hist * (
self.live_count_time / background_detector.live_count_time
)
else:
b_hist, _ = np.histogram(background_energies, bins=raw_bin_edges)
elif live_or_real == "real":
# Scale background histogram by real count time
b_hist = b_hist * (
self.real_count_time / background_detector.real_count_time
)
else:
raise ValueError(
f"Invalid live_or_real value: {live_or_real}. Use 'live' or 'real'."
)

hist_background_substracted = raw_hist - b_hist

Expand Down Expand Up @@ -220,7 +251,7 @@ def from_directory(

return measurement_object

def to_h5(self, filename: str, mode: str = "w") -> None:
def to_h5(self, filename: str, mode: str = "w", spectrum_only=False) -> None:
"""
Save the measurement data to an HDF5 file.
Args:
Expand All @@ -243,37 +274,52 @@ def to_h5(self, filename: str, mode: str = "w") -> None:

# Store detectors
for detector in self.detectors:
detector_group = measurement_group.create_group(f"detector_{detector.channel_nb}")
detector_group.create_dataset("events", data=detector.events)
detector_group = measurement_group.create_group(
f"detector_{detector.channel_nb}"
)
if spectrum_only:
hist, bin_edges = detector.get_energy_hist(bins=None)
detector_group.create_dataset("spectrum", data=hist)
detector_group.create_dataset("bin_edges", data=bin_edges)
detector_group.create_dataset("events", data=[])
else:
detector_group.create_dataset("events", data=detector.events)

detector_group.attrs["live_count_time"] = detector.live_count_time
detector_group.attrs["real_count_time"] = detector.real_count_time

@classmethod
def from_h5(cls, filename: str, measurement_name: str = None) -> Union["Measurement", List["Measurement"]]:
def from_h5(
cls, filename: str, measurement_name: str = None
) -> Union["Measurement", List["Measurement"]]:
"""
Load measurement data from an HDF5 file.
Args:
filename: name of the HDF5 file
measurement_name: specific measurement name to load. If None, loads all measurements.
Returns:
Single Measurement object if measurement_name is specified,
Single Measurement object if measurement_name is specified,
or list of Measurement objects if loading all measurements.
"""
measurements = []

with h5py.File(filename, "r") as f:
# Get all measurement group names
measurement_names = [name for name in f.keys() if isinstance(f[name], h5py.Group)]

measurement_names = [
name for name in f.keys() if isinstance(f[name], h5py.Group)
]

if measurement_name is not None:
if measurement_name not in measurement_names:
raise ValueError(f"Measurement '{measurement_name}' not found in file. Available: {measurement_names}")
raise ValueError(
f"Measurement '{measurement_name}' not found in file. Available: {measurement_names}"
)
measurement_names = [measurement_name]

for name in measurement_names:
measurement = cls(name=name)
measurement_group = f[name]

# Load start and stop time
if "start_time" in measurement_group.attrs:
measurement.start_time = datetime.datetime.fromisoformat(
Expand All @@ -283,28 +329,39 @@ def from_h5(cls, filename: str, measurement_name: str = None) -> Union["Measurem
measurement.stop_time = datetime.datetime.fromisoformat(
measurement_group.attrs["stop_time"]
)

# Load detectors
detectors = []
for detector_name in measurement_group.keys():
if detector_name.startswith("detector_"):
channel_nb = int(detector_name.replace("detector_", ""))
detector = Detector(channel_nb=channel_nb)

detector_group = measurement_group[detector_name]
detector.events = detector_group["events"][:]
detector.live_count_time = detector_group.attrs["live_count_time"]
detector.real_count_time = detector_group.attrs["real_count_time"]

detector.live_count_time = detector_group.attrs[
"live_count_time"
]
detector.real_count_time = detector_group.attrs[
"real_count_time"
]

if "spectrum" in detector_group:
detector._spectrum = detector_group["spectrum"][:]
if "bin_edges" in detector_group:
detector._bin_edges = detector_group["bin_edges"][:]

detectors.append(detector)

measurement.detectors = detectors
measurements.append(measurement)

return measurements[0] if measurement_name is not None else measurements

@classmethod
def write_multiple_to_h5(cls, measurements: List["Measurement"], filename: str) -> None:
def write_multiple_to_h5(
cls, measurements: List["Measurement"], filename: str
) -> None:
"""
Save multiple measurement objects to a single HDF5 file.
Args:
Expand All @@ -318,13 +375,19 @@ def write_multiple_to_h5(cls, measurements: List["Measurement"], filename: str)

# Store start and stop time
if measurement.start_time:
measurement_group.attrs["start_time"] = measurement.start_time.isoformat()
measurement_group.attrs["start_time"] = (
measurement.start_time.isoformat()
)
if measurement.stop_time:
measurement_group.attrs["stop_time"] = measurement.stop_time.isoformat()
measurement_group.attrs["stop_time"] = (
measurement.stop_time.isoformat()
)

# Store detectors
for detector in measurement.detectors:
detector_group = measurement_group.create_group(f"detector_{detector.channel_nb}")
detector_group = measurement_group.create_group(
f"detector_{detector.channel_nb}"
)
detector_group.create_dataset("events", data=detector.events)
detector_group.attrs["live_count_time"] = detector.live_count_time
detector_group.attrs["real_count_time"] = detector.real_count_time
Expand Down Expand Up @@ -653,7 +716,7 @@ def get_calibration_data(
background_detector = [
detector
for detector in background_measurement.detectors
if detector.channel_nb == detector.channel_nb
if detector.channel_nb == channel_nb
][0]

calibration_energies = []
Expand Down
Loading
Loading