import copy
import csv
import logging
import math
import os
from collections import defaultdict
from typing import Any
import numpy as np
from pydantic import BaseModel, Field
from scipy.optimize import curve_fit
from usansred.io.read import cast_to_bool, is_csv, read_config
from usansred.model import IQData, MonitorData, XYData
from usansred.summary import generate_report
# separate logging in file and console
logging.basicConfig(filename="file.log", filemode="w", level=logging.INFO)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger("").addHandler(console)
ARCSEC_TO_RADIANS = math.pi / (3600.0 * 180.0)
def _gaussian(x: np.ndarray, background: float, amplitude: float, sigma: float, center: float) -> np.ndarray:
"""Gaussian peak with arbitrary amplitude on a constant background."""
return background + amplitude * np.exp(-0.5 * ((x - center) / sigma) ** 2.0)
[docs]
def horizontal_rocking_width(order: int) -> float:
"""
FWHM (arcs) of the resolution function at the detector for a given reflection order
References
----------
M. Agamalian et al., "Progress on The Time-of-Flight Ultra Small Angle Neutron Scattering Instrument at SNS",
J. Phys.: Conf. Ser. 1021 (2018) 012033.
Parameters
----------
order : int
Positive reflection order (a.k.a. harmonic or bank)
Returns
-------
float
Computed horizontal angular resolution.
"""
assert order > 0, "Order must be positive"
return 5.34 * math.exp(-0.01793 * order**2) / order**2
[docs]
class Scan(BaseModel):
"""A single scan (run) of a sample in an experiment
Attributes
----------
number : int
Scan (run) number
experiment : Experiment
Experiment this scan belongs to
monitor_data : MonitorData
Monitor data associated with this scan
detector_data : list[MonitorData]
Detector data associated with this scan
load_data : bool
Whether to load data files during initialization. Set to False when
creating placeholder scans (e.g. for CombinedSample).
"""
number: int = Field(..., description="Scan (run) number")
experiment: "Experiment" = Field(..., description="Experiment this scan belongs to")
monitor_data: MonitorData = Field(default_factory=MonitorData, description="Monitor data associated with this scan")
detector_data: list[MonitorData] = Field(
default_factory=list, description="Detector data associated with this scan"
)
load_data: bool = Field(True, description="Whether to load data files during initialization")
# NOTE: Not currently used, but may be useful in the future
# sample: "Sample" = Field(..., description="The sample associated with this scan")
[docs]
def model_post_init(self, _context: Any) -> None: # noqa ANN401
"""Post-validation initializer"""
if self.load_data:
self.load()
@property
def size(self) -> int:
"""Number of data points"""
return len(self.monitor_data.iq_data.q)
@property
def num_of_banks(self) -> int:
"""Number of detector banks in the Experiment"""
return self.experiment.num_of_banks
[docs]
def load(self):
"""Load experiment data files"""
self.load_monitor_data()
self.load_detector_data()
[docs]
def load_monitor_data(self):
filename = f"USANS_{self.number}_monitor_scan_ARN.txt"
filepath = os.path.join(self.experiment.folder, filename)
xy_data = self.read_xy_file(filepath)
iq_data = self.convert_xy_to_iq(xy_data)
self.monitor_data = MonitorData(xy_data=xy_data, iq_data=iq_data, filepath=filepath)
[docs]
def load_detector_data(self):
for bank in range(1, self.num_of_banks + 1):
filename = f"USANS_{self.number}_detector_scan_ARN_peak_{bank}.txt"
filepath = os.path.join(self.experiment.folder, filename)
xy_data = self.read_xy_file(filepath)
iq_data = self.convert_xy_to_iq(xy_data)
monitor_data = MonitorData(xy_data=xy_data, iq_data=iq_data, filepath=filepath)
self.detector_data.append(monitor_data)
[docs]
def read_xy_file(self, filepath: str) -> XYData:
"""Read XY data from a file"""
x, y, e, t = [], [], [], []
with open(filepath, "r") as file:
reader = csv.reader(file, delimiter=",")
for row in reader:
if len(row) < 3 or row[0].startswith("#"):
continue
x.append(float(row[0]))
y.append(float(row[1]))
e.append(float(row[2]))
if len(row) == 4:
t.append(float(row[3]))
return XYData(x=x, y=y, e=e, t=t)
[docs]
def convert_xy_to_iq(self, xy_data: XYData) -> IQData:
"""Convert XY data to IQ data
Directly copies x to q, y to i, and t.
For error, calculates based on a Poisson-like statistical model:
err = sqrt(|y - 0.5| + 0.5)
which ensures a minimum value to avoid zero error for low counts.
"""
iq_data = IQData(
q=xy_data.x.copy(),
i=xy_data.y.copy(),
e=[math.sqrt(math.fabs(y - 0.5) + 0.5) for y in xy_data.y],
t=xy_data.t.copy(),
)
return iq_data
[docs]
def normalize_by_monitor(self) -> None:
"""Normalize detector intensities by monitor counts.
Each harmonic in the scan is normalized independently, and within each harmonic,
the counts collected at the detector during the time the analyzer-motor remained at a particular angle
are divided by the counts collected at the monitor during such time.
"""
for harmonic in self.detector_data:
intensity_normalized = []
error_normalized = []
for monitor_i, monitor_e, detector_i, detector_e in zip(
self.monitor_data.iq_data.i,
self.monitor_data.iq_data.e,
harmonic.iq_data.i,
harmonic.iq_data.e,
):
intensity = detector_i / monitor_i
error = np.sqrt(detector_e**2 + (intensity * monitor_e) ** 2) / monitor_i
intensity_normalized.append(intensity)
error_normalized.append(error)
harmonic.iq_data.i = intensity_normalized
harmonic.iq_data.e = error_normalized
[docs]
class Sample(BaseModel):
"""Container for sample information, related scans, and data reduction methods"""
name: str = Field(..., description="Sample name")
experiment: "Experiment" = Field(..., description="Experiment this sample belongs to")
start_scan_num: int = Field(..., description="Starting number for this sample")
num_of_scans: int = Field(..., description="Number of scans for this sample")
scans: list[Scan] = Field(default_factory=list, description="List of scans for this sample")
thickness: float = Field(0.1, description="Sample thickness in cm")
is_background: bool = Field(False, description="Flag to indicate if this is a background sample")
exclude: list[int] = Field(default_factory=list, description="List of scan numbers to exclude")
# Fields that are initialized in model_post_init and not expected from user input
detector_data: list[IQData] = Field(default_factory=list, init=False, description="Original detector data")
data_scaled: list[IQData] = Field(default_factory=list, init=False, description="Data scaled to thickness")
data_log_binned: IQData = Field(default_factory=IQData, init=False, description="Log-binned data")
data_bg_subtracted: IQData = Field(default_factory=IQData, init=False, description="Background subtracted data")
[docs]
def model_post_init(self, _context: Any) -> None: # noqa ANN401
"""Post-validation initializer"""
for i in range(self.num_of_scans):
if i + self.start_scan_num in self.exclude:
continue
scan = Scan(
number=i + self.start_scan_num,
experiment=self.experiment,
)
self.scans.append(scan)
self.num_of_scans = len(self.scans)
# NOTE:
# - detector_data: original data after being stitched with another monitor-normalized scan
# - data_scaled: data after being scaled to thickness
# - data_log_binned: data_scaled after being log-binned
# - data_bg_subtracted: data_log_binned after background subtraction (aliased as self.data_reduced)
self.detector_data = []
self.data_scaled = []
self.data_log_binned = IQData()
self.data_bg_subtracted = IQData()
@property
def data(self):
"""Main detector data, currently an alias for detector_data[0]"""
return self.detector_data[0] if self.detector_data else None
@property
def size(self) -> int:
"""Number of detector data points"""
return len(self.data.q) if self.data else 0
@property
def data_reduced(self):
"""Reduced data, currently an alias for bg_subtracted data"""
return self.data_bg_subtracted
@property
def is_log_binned(self) -> bool:
"""Flag to indicate if the sample has been log-binned"""
return bool(self.data_log_binned.q)
@property
def is_reduced(self) -> bool:
"""Flag to indicate if the sample has been reduced"""
return bool(self.data_bg_subtracted.q)
@property
def size_reduced(self) -> int:
"""Number of reduced data points"""
return len(self.data_reduced.q)
@property
def config(self):
return self.experiment.config
@property
def num_of_banks(self) -> int:
"""Number of detector banks in the Experiment"""
return self.experiment.num_of_banks
@property
def num_log_bins(self) -> int:
"""Size of the log-binned data"""
return len(self.data_log_binned.q)
def __eq__(self, other: object) -> bool:
"""Equality comparison based on sample name and start number."""
if not isinstance(other, type(self)):
return NotImplemented
return other.name == self.name and other.start_scan_num == self.start_scan_num
[docs]
def dump_data_to_csv(self, filepath: str, data: IQData | XYData, title: str | None = None):
"""Dump IQ or XY data to a CSV file."""
output_dir = os.path.dirname(filepath)
if output_dir and not os.path.exists(output_dir):
logging.info(f"Output directory {output_dir} does not exist; creating it.")
os.makedirs(output_dir)
data_dict = data.as_dict()
keys = list(data_dict.keys())
# Longest list determines number of rows
num_rows = max([len(data_dict[key]) for key in keys])
with open(filepath, "w", newline="") as file:
writer = csv.writer(file)
if title:
writer.writerow([title])
for i in range(num_rows):
row = []
for k in keys:
try:
row.append(data_dict[k][i])
except IndexError:
row.append("")
writer.writerow(row)
return
[docs]
def dump_reduced_data_to_csv(
self,
detector_data: bool = True,
scaled_data: bool = True,
bg_subtracted_data: bool = True,
log_binned_data: bool = True,
):
"""Dump reduced data to CSV files based on specified flags."""
if detector_data and self.data:
filepath = os.path.join(self.experiment.output_dir, f"UN_{self.name}_det_1_unscaled.txt")
self.dump_data_to_csv(filepath, self.data)
if self.config.get("save_all_harmonics", False):
for i in range(1, self.num_of_banks):
bank = i + 1 # start with the second order
filepath = os.path.join(
self.experiment.output_dir,
f"bank_{bank}",
f"UN_{self.name}_unscaled.txt",
)
self.dump_data_to_csv(filepath, self.detector_data[i])
if scaled_data:
filepath = os.path.join(self.experiment.output_dir, f"UN_{self.name}_det_1.txt")
self.dump_data_to_csv(filepath, self.data_scaled[0])
if self.config.get("save_all_harmonics", False):
for i in range(1, self.num_of_banks):
bank = i + 1 # start with the second order
filepath = os.path.join(
self.experiment.output_dir,
f"bank_{bank}",
f"UN_{self.name}.txt",
)
self.dump_data_to_csv(filepath, self.data_scaled[i])
if bg_subtracted_data:
filepath = os.path.join(self.experiment.output_dir, f"UN_{self.name}_det_1_lbs.txt")
self.dump_data_to_csv(filepath, self.data_bg_subtracted)
if log_binned_data:
if not self.is_log_binned:
logging.info(f"Sample {self.name} has not been log-binned; skipping log-binned data dump.")
return
filepath = os.path.join(self.experiment.output_dir, f"UN_{self.name}_det_1_lb.txt")
self.dump_data_to_csv(filepath, self.data_log_binned)
return
[docs]
def normalize_by_monitor(self) -> None:
"""Normalize detector intensities by monitor counts for all scans."""
for scan in self.scans:
scan.normalize_by_monitor()
[docs]
def reduce(self):
"""Reduce this sample's scans"""
self.normalize_by_monitor()
self.stitch_scans()
self.rocking_curve_centering()
self.rescale_data()
# Only process first detector bank
data_scaled = self.data_scaled[0]
logging.info(f"Only the first bank data is used for sample {self.name}.")
# Log-binning is optional
if self.experiment.log_binning:
self.data_log_binned = self.log_bin_data(data_scaled)
if self.experiment.background and not self.is_background:
self.subtract_background(self.experiment.background)
logging.info(f"Data reduction finished for sample {self.name}.")
return
# TODO: This function should be re-written from scratch
[docs]
def log_bin_data(self, data: IQData) -> IQData:
"""Log-bin the I(Q) data."""
assert len(data.q) == len(data.i) == len(data.e)
# Sort by momentum transfer
sorted_indices = np.argsort(data.q)
q = np.array(data.q)[sorted_indices]
i = np.array(data.i)[sorted_indices]
e = np.array(data.e)[sorted_indices]
iq_dict = {"I": list(i), "Q": list(q), "E": list(e)}
# The resolution in Q, ΔQ=2πΔθ/λ_1, where Δθ is the FWHM of the resolution function at the detector
fundamentalStep = 2 * math.pi * horizontal_rocking_width(1) * ARCSEC_TO_RADIANS / self.experiment.prim_wave
# Step multiplier
steps_per_decade = self.experiment.config["binning"]["steps_per_decade"]
alpha = math.exp(math.log(10) / steps_per_decade)
# step relative width
kappa = 2.0 * (alpha - 1) / (alpha + 1)
# floor ((ln((MyQ[InLength-1])/Qmin))/(ln(alpha)))
q_min = self.experiment.config["binning"]["q_min"]
numOfBins = math.floor(math.log(max(iq_dict["Q"]) / q_min) / math.log(alpha))
logQ = [q_min * (alpha**n) for n in range(numOfBins)]
logI = [None] * numOfBins
logE = [None] * numOfBins
logW = [1] * numOfBins
origIdx = 0
k2 = None
k3 = None
stepmin = None
stepmax = None
testVal = None
for lIdx, lq in enumerate(logQ):
testVal = kappa * lq
if testVal <= fundamentalStep:
while logI[lIdx] is None:
if origIdx < (len(iq_dict["Q"]) - 1) and iq_dict["Q"][origIdx + 1] > lq:
k2 = iq_dict["Q"][origIdx + 1] - iq_dict["Q"][origIdx]
k3 = lq - iq_dict["Q"][origIdx + 1]
# rtemp[outindex]=((k3/k2)+1)*MyR[inindex+1]-(k3/k2)*MyR[inindex]
logI[lIdx] = ((k3 / k2) + 1) * iq_dict["I"][origIdx + 1] - (k3 / k2) * iq_dict["I"][origIdx]
logE[lIdx] = (((k3 / k2) + 1) ** 2.0) * (iq_dict["E"][origIdx + 1] ** 2.0) + (
(k3 / k2) ** 2.0
) * (iq_dict["E"][origIdx] ** 2.0)
logW[lIdx] = 1
else:
origIdx += 1
else:
stepmin = lq - testVal / 2.0
stepmax = lq + testVal / 2.0
origIdx = 1
while origIdx < len(iq_dict["Q"]):
if (iq_dict["Q"][origIdx] + fundamentalStep / 2.0) >= stepmin:
break
origIdx += 1
while origIdx < len(iq_dict["Q"]):
if (iq_dict["Q"][origIdx] - fundamentalStep / 2.0) <= stepmin:
if logI[lIdx] is None:
# rtemp[outindex]=MyR[Inindex]*((MyQ[inindex]+FunStep/2)-stepmin)/funstep
# wtemp[outindex]=(MyQ[inindex]+FunStep/2-stepmin)/funstep
# stemp[outindex]=(MyS[InIndex]^2)*((MyQ[inindex]+FunStep/2-stepmin)/funstep)^2
logI[lIdx] = (
iq_dict["I"][origIdx]
* ((iq_dict["Q"][origIdx] + fundamentalStep / 2.0) - stepmin)
/ fundamentalStep
)
logW[lIdx] = (iq_dict["Q"][origIdx] + fundamentalStep / 2.0 - stepmin) / fundamentalStep
logE[lIdx] = (iq_dict["E"][origIdx] ** 2.0) * (
(iq_dict["Q"][origIdx] + fundamentalStep / 2.0 - stepmin) / fundamentalStep
) ** 2.0
else:
# rtemp[outindex]+=MyR[Inindex]*((MyQ[inindex]+FunStep/2)-stepmin)/funstep
# wtemp[outindex]+=(MyQ[inindex]+FunStep/2-stepmin)/funstep
# stemp[outindex]+=(MyS[InIndex]^2)*((MyQ[inindex]+FunStep/2-stepmin)/funstep)^2
logI[lIdx] += (
iq_dict["I"][origIdx]
* ((iq_dict["Q"][origIdx] + fundamentalStep / 2.0) - stepmin)
/ fundamentalStep
)
logW[lIdx] += (iq_dict["Q"][origIdx] + fundamentalStep / 2.0 - stepmin) / fundamentalStep
logE[lIdx] += (iq_dict["E"][origIdx] ** 2.0) * (
(iq_dict["Q"][origIdx] + fundamentalStep / 2.0 - stepmin) / fundamentalStep
) ** 2.0
elif (iq_dict["Q"][origIdx] + fundamentalStep / 2.0) > stepmax:
if logI[lIdx] is None:
# rtemp[outindex]=MyR[Inindex]*(stepmax-(MyQ[inindex]-FunStep/2))/funstep
# wtemp[outindex]=(stepmax-(MyQ[inindex]-FunStep/2))/funstep
# stemp[outindex]=(MyS[InIndex]^2)*((stepmax-(MyQ[inindex]-FunStep/2))/funstep)^2
logI[lIdx] = (
iq_dict["I"][origIdx]
* (stepmax - (iq_dict["Q"][origIdx] - fundamentalStep / 2.0))
/ fundamentalStep
)
logW[lIdx] = (stepmax - (iq_dict["Q"][origIdx] - fundamentalStep / 2.0)) / fundamentalStep
logE[lIdx] = (iq_dict["E"][origIdx] ** 2.0) * (
(stepmax - (iq_dict["Q"][origIdx] - fundamentalStep / 2.0)) / fundamentalStep
) ** 2.0
else:
logI[lIdx] += (
iq_dict["I"][origIdx]
* (stepmax - (iq_dict["Q"][origIdx] - fundamentalStep / 2.0))
/ fundamentalStep
)
logW[lIdx] += (stepmax - (iq_dict["Q"][origIdx] - fundamentalStep / 2.0)) / fundamentalStep
logE[lIdx] += (iq_dict["E"][origIdx] ** 2.0) * (
(stepmax - (iq_dict["Q"][origIdx] - fundamentalStep / 2.0)) / fundamentalStep
) ** 2.0
else:
if logI[lIdx] is None:
logI[lIdx] = iq_dict["I"][origIdx]
logW[lIdx] = 1.0
logE[lIdx] = iq_dict["E"][origIdx] ** 2.0
else:
logI[lIdx] += iq_dict["I"][origIdx]
logW[lIdx] += 1.0
logE[lIdx] += iq_dict["E"][origIdx] ** 2.0
origIdx += 1
if origIdx < len(iq_dict["Q"]) and (iq_dict["Q"][origIdx] - fundamentalStep / 2.0) >= stepmax:
break
logI = [logI[ii] / logW[ii] for ii in range(numOfBins)]
logE = [logE[ii] / (logW[ii] ** 2.0) for ii in range(numOfBins)]
logE = [le**0.5 for le in logE]
return IQData(q=logQ, i=logI, e=logE)
@staticmethod
def _combine_duplicate_q_points(
q_scaled: list[float], i_scaled: list[float], e_scaled: list[float]
) -> tuple[list[float], list[float], list[float]]:
"""Sort by Q and average duplicate momentum transfer points.
Duplicate Q values happen when negative and positive analyzer-motor angles
have the same magnitude after conversion to momentum transfer in 1/angstrom.
Intensities are averaged, and uncertainties are propagated for the averaged
values.
"""
# Dictionary to store sums for averaging I and propagating errors for E.
# One dictionary entry per unique Q value, which is itself a dictionary.
sum_dict = defaultdict(lambda: {"I_sum": 0, "I_count": 0, "E_sum_squares": 0})
for q, i, e in zip(q_scaled, i_scaled, e_scaled):
sum_dict[q]["I_sum"] += i
sum_dict[q]["I_count"] += 1
sum_dict[q]["E_sum_squares"] += e**2
q_cleaned = []
i_cleaned = []
e_cleaned = []
for q, values in sorted(sum_dict.items()):
q_cleaned.append(q)
i_cleaned.append(values["I_sum"] / values["I_count"])
e_cleaned.append(math.sqrt(values["E_sum_squares"]) / values["I_count"])
return q_cleaned, i_cleaned, e_cleaned
[docs]
def rescale_data(self) -> None:
"""Rescale reflected data by the analyzer's solid angle acceptance and by sample thickness."""
assert self.size > 0, "No data points to rescale. Please check if the scans have been stitched correctly."
self.data_scaled = []
for harmonic in range(1, 1 + self.num_of_banks):
# angle-to-Q conversion factor: radians_per_arcsecond * (2π / λ_n)
theta_to_q = ARCSEC_TO_RADIANS * (2 * math.pi / (self.experiment.prim_wave / harmonic))
# analyzer solid angle acceptance ΔΩ = vertical angular width * horizontal angular width
analyzer_solid_angle = self.experiment.v_angle * (horizontal_rocking_width(harmonic) * ARCSEC_TO_RADIANS)
# negative theta angles do correspond to positive values of the momentum transfer, hence abs()
iq_data = self.detector_data[harmonic - 1]
q_scaled = [abs(theta) * theta_to_q for theta in iq_data.q]
scaling_factor = 1.0 / (analyzer_solid_angle * self.thickness)
i_scaled = [i * scaling_factor for i in iq_data.i]
e_scaled = [e * scaling_factor for e in iq_data.e]
q_cleaned, i_cleaned, e_cleaned = self._combine_duplicate_q_points(q_scaled, i_scaled, e_scaled)
iq_scaled = IQData(q=q_cleaned, i=i_cleaned, e=e_cleaned, t=[])
self.data_scaled.append(iq_scaled)
q_range = f"{min(self.data_scaled[0].q)} - {max(self.data_scaled[0].q)}"
logging.info(f"Rescaled data for sample {self.name}, Q-range: {q_range} 1/angstrom\n")
return
[docs]
def stitch_scans(self):
"""Stitch scan data from each detector bank into per-bank intensity curves.
For each detector bank (harmonic), combine all scans in ``self.scans`` onto a single
Intensity-versus-Q profile. Detector intensities and errors are expected to
already be normalized by ``Scan.normalize_by_monitor`` before being added
to the stitched output.
Notice that at this stage, the "Q" values are actually analyzer-motor angles,
that is, detector bank ``iq_data.q`` values are analyzer-motor angles (in arcsec units).
If two or more scans contain the same "Q" value, their intensities are combined
into one point using inverse-variance weighting, and the combined uncertainty
is stored as the square root of the inverse summed weights.
After all scans for a bank are processed, the stitched points are sorted by Q
The method also generates log messages for the raw scan theta ranges
and converted Q ranges in ``1/angstrom`` for the first detector bank.
Results are stored on ``self.detector_data``; no value is returned.
"""
# Build one stitched Q/I/E curve for each detector bank(harmonic).
for bank in range(self.num_of_banks):
momentum_transfer = []
intensity = []
error = []
# transmission = [] # omitted for now
for scan in self.scans:
scan_data = zip(
scan.detector_data[bank].iq_data.q,
scan.detector_data[bank].iq_data.i,
scan.detector_data[bank].iq_data.e,
)
for scan_q, detector_intensity, detector_error in scan_data:
if scan_q in momentum_transfer:
# Merge repeated Q values from multiple scans using inverse-variance weights.
matched_q_index = momentum_transfer.index(scan_q)
matched_weight = 1.0 / error[matched_q_index] ** 2
detector_weight = 1.0 / detector_error**2
combined_weight = matched_weight + detector_weight
intensity[matched_q_index] = (
intensity[matched_q_index] * matched_weight + detector_intensity * detector_weight
) / combined_weight
error[matched_q_index] = (1.0 / combined_weight) ** 0.5
else:
# Add Q values that have not appeared in earlier scans.
momentum_transfer.append(scan_q)
intensity.append(detector_intensity)
error.append(detector_error)
# Sort by momentum transfer (outside the scan loop, inside the bank loop)
sorted_indices = np.argsort(momentum_transfer)
momentum_transfer = np.array(momentum_transfer)[sorted_indices]
intensity = np.array(intensity)[sorted_indices]
error = np.array(error)[sorted_indices]
# Store the stitched curve for this detector bank.
self.detector_data.append(
IQData(
q=momentum_transfer.tolist(),
i=intensity.tolist(),
e=error.tolist(),
)
)
logging.info(f"Scans stitched together for sample {self.name}.\n")
theta_to_q = 2 * (math.pi**2.0) * 1.0 / (self.experiment.prim_wave * 3600.0 * 180.0)
theta_range_msg = ""
q_range_msg = ""
# Log raw theta ranges and converted Q ranges for each scan. Remember at this stage in the reduction,
# scan.detector_data[0].iq_data.q stores analyzer-motor angles, not yet converted to Q values
for scan in self.scans:
theta_range = f"{min(scan.detector_data[0].iq_data.q)} - {max(scan.detector_data[0].iq_data.q)}"
theta_range_msg += f"Theta range for scan {scan.number}: {theta_range}\n"
temp_q = [math.fabs(theta * theta_to_q) for theta in scan.detector_data[0].iq_data.q]
q_range_msg += f"Q range for scan {scan.number}: {min(temp_q)} - {max(temp_q)} 1/angstrom\n"
logging.info(theta_range_msg)
logging.info(q_range_msg)
return
[docs]
def rocking_curve_centering(self) -> float:
"""Center the stitched rocking curves by fitting a Gaussian peak to the rocking curve of the first harmonic.
Notice that the ``q`` values of the stitched rocking curves are analyzer motor angles at this
stage of reduction, not yet converted to Q values.
The first harmonic is fit to a Gaussian over the mostly symmetric angle range ``[q_min, -q_min]``,
where ``q_min`` is the minimum analyzier motor angle. It will be a negative value.
The center of the fitted Gaussian represents the value of the analyzer motor angle at which
the analyzer reflects neutrons that have not been scattered by the sample. It should be very close to zero.
Returns
-------
float
Fitted first-harmonic motor-angle center.
"""
assert self.detector_data, "Detector data must be stitched before centering."
first_harmonic_rocking_curve = self.detector_data[0]
if not first_harmonic_rocking_curve.q:
raise ValueError("Cannot center rocking curve because first-harmonic curve is empty.")
q = np.array(first_harmonic_rocking_curve.q)
intensity = np.array(first_harmonic_rocking_curve.i)
error = np.array(first_harmonic_rocking_curve.e)
q_min = float(np.min(q))
if q_min >= 0:
raise ValueError("Can't center rocking curve because angles don't include negative values.")
fit_mask = (q >= q_min) & (q <= -q_min)
q_fit = q[fit_mask]
intensity_fit = intensity[fit_mask]
error_fit = error[fit_mask] if len(error) == len(q) else None
if len(q_fit) < 3:
raise ValueError("Can't center rocking curve because fewer than three points are in the symmetric range.")
# Initial guess for Gaussian parameters: background, amplitude, sigma, center
initial_guess = {
"background": float(np.min(intensity_fit)),
"amplitude": float(np.max(intensity_fit) - np.min(intensity_fit)),
"sigma": float(max(np.std(q_fit), np.finfo(float).eps)),
"center": float(q_fit[np.argmax(intensity_fit)]),
}
best_vals, _sigma = curve_fit(
_gaussian,
q_fit,
intensity_fit,
p0=list(initial_guess.values()),
sigma=error_fit,
maxfev=100000,
)
q_offset = float(best_vals[3]) # the center of the fitted Gaussian
for rocking_curve in self.detector_data:
rocking_curve.q = [float(harmonic_q - q_offset) for harmonic_q in rocking_curve.q]
logging.info(f"Centered rocking curves for sample {self.name} using offset {q_offset}.")
return q_offset
def _match_or_interpolate(
self,
q_data: np.ndarray,
q_bg: np.ndarray,
i_bg: np.ndarray,
e_bg: np.ndarray,
tolerance: float = 1e-5,
) -> tuple[np.ndarray, np.ndarray]:
"""Match q_bg values to q_data directly if close enough, otherwise interpolate.
Used for background subtraction"""
i_bg_matched = np.zeros_like(q_data)
e_bg_matched = np.zeros_like(q_data)
for i, q in enumerate(q_data):
# Find index in q_bg that is closest to q
idx = np.abs(q_bg - q).argmin()
if np.abs(q_bg[idx] - q) <= max(tolerance * q, 1e-6):
# If within tolerance, take the value directly
i_bg_matched[i] = i_bg[idx]
e_bg_matched[i] = e_bg[idx]
else:
# Otherwise, interpolate
i_bg_matched[i] = np.interp(q, q_bg, i_bg)
e_bg_matched[i] = np.interp(q, q_bg, e_bg)
return i_bg_matched, e_bg_matched
[docs]
def subtract_background(self, background: "Sample") -> None:
"""Subtract background data from this sample's data.
Parameters
----------
background : Sample
The background sample to subtract. Must be processed (stitched, scaled, and binned).
"""
if self.experiment.log_binning:
assert self.is_log_binned, f"Sample {self.name} must be log-binned before background subtraction."
assert background.is_log_binned, (
f"Background {background.name} must be log-binned before background subtraction."
)
logging.info(
f"Logbinned data are used for background subtraction. Sample {self.name}, background {background.name}"
)
data = self.data_log_binned
bg_data = background.data_log_binned
sample_num_of_bins = self.num_log_bins
bg_num_of_bins = self.num_log_bins
# TODO: This should instead be background.num_log_bins, but we need to fix log binning first
# bg_num_of_bins = background.num_log_bins
if sample_num_of_bins < bg_num_of_bins:
num_of_bins = sample_num_of_bins
momentum_transfer = data.q.copy()
else:
num_of_bins = bg_num_of_bins
momentum_transfer = bg_data.q.copy()
intensity = [data.i[i] - bg_data.i[i] for i in range(num_of_bins)]
error = [math.sqrt(data.e[i] ** 2.0 + bg_data.e[i] ** 2.0) for i in range(num_of_bins)]
self.data_bg_subtracted.q = momentum_transfer
self.data_bg_subtracted.i = intensity
self.data_bg_subtracted.e = error
# Use interpolation if log-binning is not applied
else:
# Only process the first detector bank for now
data = self.data_scaled[0]
bg_data = background.data_scaled[0]
# Convert to numpy arrays for easier manipulation
q_data = np.array(data.q)
i_data = np.array(data.i)
e_data = np.array(data.e)
q_bg = np.array(bg_data.q)
i_bg = np.array(bg_data.i)
e_bg = np.array(bg_data.e)
# Match/interpolate background data to sample q values
i_bg_matched, e_bg_matched = self._match_or_interpolate(q_data, q_bg, i_bg, e_bg)
# Subtract background
i_subtracted = i_data - i_bg_matched
e_subtracted = np.sqrt(e_data**2 + e_bg_matched**2)
self.data_bg_subtracted.q = q_data.tolist()
self.data_bg_subtracted.i = i_subtracted.tolist()
self.data_bg_subtracted.e = e_subtracted.tolist()
logging.info(f"Subtracted background {background.name} from sample {self.name}")
return
[docs]
class CombinedSample(BaseModel):
"""Combine multiple Sample measurements at the raw (X,Y,E) scan level before converting to (Q,I,E).
Attributes
----------
name : str
Combined sample name.
experiment : Experiment
Experiment this combined sample belongs to.
thickness : float
Sample thickness in cm.
is_background : bool
Whether this combined sample represents a background measurement.
combined_samples : list[Sample]
Individual Sample objects whose scans will be combined.
combined_scans : list[Scan]
Scans produced by the combination.
"""
name: str = Field(..., description="Combined sample name")
experiment: "Experiment" = Field(..., description="Experiment associated with this combined sample")
thickness: float = Field(0.1, description="Sample thickness in cm")
is_background: bool = Field(False, description="Whether this is a background sample")
combined_samples: list[Sample] = Field(default_factory=list, description="Individual samples to combine")
combined_scans: list[Scan] = Field(default_factory=list, description="Combined scans (populated by combine)")
[docs]
def combine(self) -> None:
"""Sum raw XY data from all combined samples scan-by-scan, then generate IQ data.
For each scan index, the monitor and detector XY data from every sample are
accumulated. If a sample has fewer scans than others a warning is logged and
it is skipped for that index. After accumulation the XY to IQ conversion is
performed on the combined data and ready for reduction.
Raises
------
AssertionError
If ``combined_samples`` is empty or none of them contain scans.
"""
assert len(self.combined_samples) > 0, "No samples to combine."
# Reset combined scans in case this method is called multiple times
self.combined_scans: list[Scan] = []
max_scans = max((len(sample.scans) for sample in self.combined_samples), default=0)
assert max_scans > 0, "No scans in any sample to combine."
for scan_idx in range(max_scans):
for sample in self.combined_samples:
if scan_idx >= len(sample.scans):
logging.warning(
f"Sample '{sample.name}' contains fewer scans than others "
f"(has {len(sample.scans)}, expected at least {scan_idx + 1}). Skipping."
)
continue
source_scan = sample.scans[scan_idx]
if scan_idx >= len(self.combined_scans):
# First contribution for this scan index – create a new placeholder scan
new_scan = Scan(number=0, experiment=self.experiment, load_data=False)
# Seed monitor data from first contributor
new_scan.monitor_data = MonitorData(
xy_data=copy.deepcopy(source_scan.monitor_data.xy_data),
iq_data=IQData(),
)
# Seed detector data from first contributor
for bank_id in range(self.experiment.num_of_banks):
new_scan.detector_data.append(
MonitorData(
xy_data=copy.deepcopy(source_scan.detector_data[bank_id].xy_data),
iq_data=IQData(),
)
)
self.combined_scans.append(new_scan)
else:
# Subsequent contributions – accumulate into existing scan
self.combined_scans[scan_idx].monitor_data.xy_data = self._combine_xy_data_pair(
self.combined_scans[scan_idx].monitor_data.xy_data,
source_scan.monitor_data.xy_data,
)
for bank_id in range(self.experiment.num_of_banks):
self.combined_scans[scan_idx].detector_data[bank_id].xy_data = self._combine_xy_data_pair(
self.combined_scans[scan_idx].detector_data[bank_id].xy_data,
source_scan.detector_data[bank_id].xy_data,
)
# After all samples have contributed, convert XY → IQ for this scan
scan = self.combined_scans[scan_idx]
scan.monitor_data.iq_data = scan.convert_xy_to_iq(scan.monitor_data.xy_data)
for bank_id in range(self.experiment.num_of_banks):
scan.detector_data[bank_id].iq_data = scan.convert_xy_to_iq(
scan.detector_data[bank_id].xy_data,
)
logging.info(
f"Combined {len(self.combined_samples)} samples into '{self.name}' ({len(self.combined_scans)} scans)."
)
# ------------------------------------------------------------------
# Static helper – combines two XYData objects by binning close X values
# ------------------------------------------------------------------
@staticmethod
def _combine_xy_data_pair(base: XYData, other: XYData, tolerance: float = 1e-8) -> XYData:
"""Combine two :class:`XYData` objects by summing Y values at matching X bins.
X values are discretised to integer bins of width ``tolerance`` so that
floating-point rounding does not prevent matching. Y values are summed,
errors are propagated in quadrature, and T values are averaged.
Parameters
----------
base : XYData
Accumulated data so far.
other : XYData
New data to add.
tolerance : float
Bin width used to discretise X values (default ``1e-8``).
Returns
-------
XYData
Merged result.
"""
combined: dict[int, dict] = defaultdict(lambda: {"y_sum": 0.0, "e_sq_sum": 0.0, "t_list": [], "count": 0})
for xy_data in (base, other):
x_arr = np.array(xy_data.x)
y_arr = np.array(xy_data.y)
e_arr = np.array(xy_data.e)
t_vals = xy_data.t if xy_data.t and len(xy_data.t) == len(xy_data.x) else [0.0] * len(xy_data.x)
for x, y, e, t in zip(x_arr, y_arr, e_arr, t_vals):
x_key = int(np.round(x / tolerance))
combined[x_key]["y_sum"] += y
combined[x_key]["e_sq_sum"] += e**2
combined[x_key]["t_list"].append(t)
combined[x_key]["count"] += 1
out_x: list[float] = []
out_y: list[float] = []
out_e: list[float] = []
out_t: list[float] = []
for x_key in sorted(combined.keys()):
entry = combined[x_key]
out_x.append(x_key * tolerance)
out_y.append(entry["y_sum"])
out_e.append(np.sqrt(entry["e_sq_sum"]))
out_t.append(float(np.mean(entry["t_list"])) if entry["t_list"] else 0.0)
return XYData(x=out_x, y=out_y, e=out_e, t=out_t)
[docs]
class Experiment(BaseModel):
"""Experiment configuration for USANS data reduction
Attributes
----------
config_file : str
Path to the configuration file
config : dict
Configuration file loaded into a Python dictionary. Populated for both JSON and CSV config files
output_dir : str | None
Output folder for reduced data, default is current folder
prim_wave : float
Primary wavelength in Angstroms, default is 3.6
v_angle : float
Vertical angle, default is 0.042
log_binning : bool
Flag for log-binning, default is False
"""
config_file: str = Field(..., description="Path to the configuration file")
config: dict = Field(default_factory=dict, description="configuration file loaded into a Python dictionary")
output_dir: str = Field("", description="Output folder for reduced data")
prim_wave: float = Field(3.6, description="Primary wavelength in Angstroms")
v_angle: float = Field(0.042, description="Vertical angle")
log_binning: bool = Field(False, description="Flag for log-binning")
num_of_banks: int = Field(default=4, init=False, description="Number of detector banks")
folder: str = Field(default="", init=False, description="Working folder for this experiment")
background: "Sample | None" = Field(default=None, init=False, description="Background sample")
samples: list["Sample"] = Field(default_factory=list, init=False, description="List of samples")
[docs]
def model_post_init(self, _context: Any) -> None: # noqa ANN401
"""Post-validation initializer"""
# The working folder for this experiment, default is current folder
_setupfile = os.path.abspath(self.config_file)
self.folder = os.path.dirname(_setupfile)
if bool(self.output_dir) is False: # in case `output_dir` is an empty string
self.output_dir = os.path.join(self.folder, "reduced")
self.num_of_banks: int = 4
if not os.path.exists(self.config_file):
raise FileNotFoundError(f"The file path: {self.config_file} does not exist")
self.folder = os.path.dirname(self.config_file)
self.config = read_config(self.config_file)
self.log_binning = cast_to_bool(self.config["binning"]["log_binning"])
background = self.config["background"]
if background is None:
logging.info("No background sample defined in the configuration file.")
self.background = None
else:
self.background = Sample(**background, experiment=self)
self.samples = [Sample(**s, experiment=self) for s in self.config["samples"]]
[docs]
def amend_log_binning(self, logbin: bool) -> None:
"""Override the log-binning setting with the command-line --logbin flag.
For backwards compatibility when user enters a CSV file
Parameters
----------
logbin : bool
When True, enables log-binning regardless of what the config file says.
When False, the config-file value set during initialisation is preserved.
"""
if logbin:
self.log_binning = True
self.config["binning"]["log_binning"] = True
[docs]
def reduce(self, output_dir: str | None = None):
"""Reduce the USANS data
Parameters
----------
output_dir: str | None
The result will be dumped to the output folder. If none will just use current folder
"""
if output_dir is not None:
self.output_dir = output_dir
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
if self.background:
try:
self.background.reduce()
except Exception as e: # noqa BLE001
logging.exception(f"Cannot reduce background {self.background.name}: {e}")
for sample in self.samples:
try:
sample.reduce()
except Exception as e: # noqa BLE001
logging.exception(f"Cannot reduce sample {sample.name}: {e}")
self.dump_reduced_data()
return
[docs]
def dump_reduced_data(self):
"""Dump reduced data to txt files"""
for sample in self.samples:
sample.dump_reduced_data_to_csv()
if self.background is not None:
self.background.dump_reduced_data_to_csv()
[docs]
def parse_args():
import argparse
parser = argparse.ArgumentParser(description="USANS Data Reduction")
parser.add_argument("path", help="Path to the configuration file")
parser.add_argument(
"-l",
"--logbin",
action="store_true",
help="Enable log-binning of data during reduction. Option only valid for CSV files",
)
parser.add_argument("-o", "--output", default="", help="Output folder for reduced data (default: current folder)")
args = parser.parse_args()
return args
[docs]
def main():
"""Main function to run USANS data reduction"""
args = parse_args()
experiment = Experiment(config_file=args.path, output_dir=args.output)
if is_csv(args.path): # backwards compatibility for CSV files, which don't have log-binning settings
experiment.amend_log_binning(args.logbin)
experiment.reduce()
generate_report(config_file_path=args.path, output_dir=experiment.output_dir)
logging.info("USANS data reduction completed.")
if __name__ == "__main__":
main()