Source code for usansred.reduce

import copy
import csv
import logging
import math
import os
from collections import defaultdict
from typing import Any

import numpy as np
from pydantic import BaseModel, Field
from scipy.optimize import curve_fit, differential_evolution

from usansred.io.read import read_config
from usansred.model import IQData, MonitorData, XYData
from usansred.summary import generate_report

# separate logging in file and console
logging.basicConfig(filename="file.log", filemode="w", level=logging.INFO)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger("").addHandler(console)

DARWIN_WIDTH = 5.1  # Darwin width, shouldn't really ever change unless instrument is disassembled


[docs] class Scan(BaseModel): """A single scan (run) of a sample in an experiment Attributes ---------- number : int Scan (run) number experiment : Experiment Experiment this scan belongs to monitor_data : MonitorData Monitor data associated with this scan detector_data : list[MonitorData] Detector data associated with this scan load_data : bool Whether to load data files during initialization. Set to False when creating placeholder scans (e.g. for CombinedSample). """ number: int = Field(..., description="Scan (run) number") experiment: "Experiment" = Field(..., description="Experiment this scan belongs to") monitor_data: MonitorData = Field(default_factory=MonitorData, description="Monitor data associated with this scan") detector_data: list[MonitorData] = Field( default_factory=list, description="Detector data associated with this scan" ) load_data: bool = Field(True, description="Whether to load data files during initialization") # NOTE: Not currently used, but may be useful in the future # sample: "Sample" = Field(..., description="The sample associated with this scan")
[docs] def model_post_init(self, _context: Any) -> None: # noqa ANN401 """Post-validation initializer""" if self.load_data: self.load()
@property def size(self) -> int: """Number of data points""" return len(self.monitor_data.iq_data.q) @property def num_of_banks(self) -> int: """Number of detector banks in the Experiment""" return self.experiment.num_of_banks
[docs] def load(self): """Load experiment data files""" self.load_monitor_data() self.load_detector_data()
[docs] def load_monitor_data(self): filename = f"USANS_{self.number}_monitor_scan_ARN.txt" filepath = os.path.join(self.experiment.folder, filename) xy_data = self.read_xy_file(filepath) iq_data = self.convert_xy_to_iq(xy_data) self.monitor_data = MonitorData(xy_data=xy_data, iq_data=iq_data, filepath=filepath)
[docs] def load_detector_data(self): for bank in range(1, self.num_of_banks + 1): filename = f"USANS_{self.number}_detector_scan_ARN_peak_{bank}.txt" filepath = os.path.join(self.experiment.folder, filename) xy_data = self.read_xy_file(filepath) iq_data = self.convert_xy_to_iq(xy_data) monitor_data = MonitorData(xy_data=xy_data, iq_data=iq_data, filepath=filepath) self.detector_data.append(monitor_data)
[docs] def read_xy_file(self, filepath: str) -> XYData: """Read XY data from a file""" x, y, e, t = [], [], [], [] with open(filepath, "r") as file: reader = csv.reader(file, delimiter=",") for row in reader: if len(row) < 3 or row[0].startswith("#"): continue x.append(float(row[0])) y.append(float(row[1])) e.append(float(row[2])) if len(row) == 4: t.append(float(row[3])) return XYData(x=x, y=y, e=e, t=t)
[docs] def convert_xy_to_iq(self, xy_data: XYData) -> IQData: """Convert XY data to IQ data Directly copies x to q, y to i, and t. For error, calculates based on a Poisson-like statistical model: err = sqrt(|y - 0.5| + 0.5) which ensures a minimum value to avoid zero error for low counts. """ iq_data = IQData( q=xy_data.x.copy(), i=xy_data.y.copy(), e=[math.sqrt(math.fabs(y - 0.5) + 0.5) for y in xy_data.y], t=xy_data.t.copy(), ) return iq_data
[docs] class Sample(BaseModel): """Container for sample information, related scans, and data reduction methods""" name: str = Field(..., description="Sample name") experiment: "Experiment" = Field(..., description="Experiment this sample belongs to") start_scan_num: int = Field(..., description="Starting number for this sample") num_of_scans: int = Field(..., description="Number of scans for this sample") scans: list[Scan] = Field(default_factory=list, description="List of scans for this sample") thickness: float = Field(0.1, description="Sample thickness in cm") is_background: bool = Field(False, description="Flag to indicate if this is a background sample") exclude: list[int] = Field(default_factory=list, description="List of scan numbers to exclude") # Fields that are initialized in model_post_init and not expected from user input detector_data: list[IQData] = Field(default_factory=list, init=False, description="Original detector data") data_scaled: list[IQData] = Field(default_factory=list, init=False, description="Data scaled to thickness") data_log_binned: IQData = Field(default_factory=IQData, init=False, description="Log-binned data") data_bg_subtracted: IQData = Field(default_factory=IQData, init=False, description="Background subtracted data")
[docs] def model_post_init(self, _context: Any) -> None: # noqa ANN401 """Post-validation initializer""" for i in range(self.num_of_scans): if i + self.start_scan_num in self.exclude: continue scan = Scan( number=i + self.start_scan_num, experiment=self.experiment, ) self.scans.append(scan) self.num_of_scans = len(self.scans) # NOTE: # - detector_data: original data after being stitched with another scan # - data_scaled: data after being scaled to thickness # - data_log_binned: data_scaled after being log-binned # - data_bg_subtracted: data_log_binned after background subtraction (aliased as self.data_reduced) self.detector_data = [] self.data_scaled = [] self.data_log_binned = IQData() self.data_bg_subtracted = IQData()
@property def data(self): """Main detector data, currently an alias for detector_data[0]""" return self.detector_data[0] if self.detector_data else None @property def size(self) -> int: """Number of detector data points""" return len(self.data.q) if self.data else 0 @property def data_reduced(self): """Reduced data, currently an alias for bg_subtracted data""" return self.data_bg_subtracted @property def is_log_binned(self) -> bool: """Flag to indicate if the sample has been log-binned""" return bool(self.data_log_binned.q) @property def is_reduced(self) -> bool: """Flag to indicate if the sample has been reduced""" return bool(self.data_bg_subtracted.q) @property def size_reduced(self) -> int: """Number of reduced data points""" return len(self.data_reduced.q) @property def num_of_banks(self) -> int: """Number of detector banks in the Experiment""" return self.experiment.num_of_banks @property def num_log_bins(self) -> int: """Size of the log-binned data""" return len(self.data_log_binned.q) def __eq__(self, other: object) -> bool: """Equality comparison based on sample name and start number.""" if not isinstance(other, type(self)): return NotImplemented return other.name == self.name and other.start_scan_num == self.start_scan_num
[docs] def dump_data_to_csv(self, filepath: str, data: IQData | XYData, title: str | None = None): """Dump IQ or XY data to a CSV file.""" data_dict = data.as_dict() keys = list(data_dict.keys()) # Longest list determines number of rows num_rows = max([len(data_dict[key]) for key in keys]) with open(filepath, "w", newline="") as file: writer = csv.writer(file) if title: writer.writerow([title]) for i in range(num_rows): row = [] for k in keys: try: row.append(data_dict[k][i]) except IndexError: row.append("") writer.writerow(row) return
[docs] def dump_reduced_data_to_csv( self, detector_data: bool = True, scaled_data: bool = True, bg_subtracted_data: bool = True, log_binned_data: bool = True, ): """Dump reduced data to CSV files based on specified flags.""" if detector_data and self.data: filepath = os.path.join(self.experiment.output_dir, f"UN_{self.name}_det_1_unscaled.txt") self.dump_data_to_csv(filepath, self.data) if scaled_data: filepath = os.path.join(self.experiment.output_dir, f"UN_{self.name}_det_1.txt") self.dump_data_to_csv(filepath, self.data_scaled[0]) if bg_subtracted_data: filepath = os.path.join(self.experiment.output_dir, f"UN_{self.name}_det_1_lbs.txt") self.dump_data_to_csv(filepath, self.data_bg_subtracted) if log_binned_data: if not self.is_log_binned: logging.info(f"Sample {self.name} has not been log-binned; skipping log-binned data dump.") return filepath = os.path.join(self.experiment.output_dir, f"UN_{self.name}_det_1_lb.txt") self.dump_data_to_csv(filepath, self.data_log_binned) return
[docs] def reduce(self): """Reduce this sample's scans""" self.stitch_data() self.rescale_data() # Only process first detector bank data_scaled = self.data_scaled[0] logging.info(f"Only the first bank data is used for sample {self.name}.") # Log-binning is optional if self.experiment.logbin: self.data_log_binned.q, self.data_log_binned.i, self.data_log_binned.e = self.log_bin_data( data_scaled.q, data_scaled.i, data_scaled.e ) if not self.is_background: self.subtract_background(self.experiment.background) logging.info(f"Data reduction finished for sample {self.name}.") return
# TODO: This function should be re-written from scratch
[docs] def log_bin_data( self, momentum_transfer: list[float], intensity: list[float], error: list[float] ) -> tuple[list[float], list[float], list[float]]: """Log-bin the I(Q) data.""" assert len(momentum_transfer) == len(intensity) == len(error) # Sort by momentum transfer sorted_indices = np.argsort(momentum_transfer) momentum_transfer = np.array(momentum_transfer)[sorted_indices] intensity = np.array(intensity)[sorted_indices] error = np.array(error)[sorted_indices] data = {"I": list(intensity), "Q": list(momentum_transfer), "E": list(error)} # The fundamental Q width of the measurement harNo = 1.0 # Only the first harmonic peak is used fundamentalStep = ( 2 * math.pi**2 * self.experiment.darwin_width * harNo / (self.experiment.prim_wave * 3600.0 * 180.0) ) # Step multiplier alpha = math.exp(math.log(10) / self.experiment.step_per_dec) # step relative width kappa = 2.0 * (alpha - 1) / (alpha + 1) # floor ((ln((MyQ[InLength-1])/Qmin))/(ln(alpha))) numOfBins = math.floor(math.log(max(data["Q"]) / self.experiment.min_q) / math.log(alpha)) logQ = [self.experiment.min_q * (alpha**i) for i in range(numOfBins)] logI = [None] * numOfBins logE = [None] * numOfBins logW = [1] * numOfBins origIdx = 0 k2 = None k3 = None stepmin = None stepmax = None testVal = None for lIdx, lq in enumerate(logQ): testVal = kappa * lq if testVal <= fundamentalStep: while logI[lIdx] is None: if origIdx < (len(data["Q"]) - 1) and data["Q"][origIdx + 1] > lq: k2 = data["Q"][origIdx + 1] - data["Q"][origIdx] k3 = lq - data["Q"][origIdx + 1] # rtemp[outindex]=((k3/k2)+1)*MyR[inindex+1]-(k3/k2)*MyR[inindex] logI[lIdx] = ((k3 / k2) + 1) * data["I"][origIdx + 1] - (k3 / k2) * data["I"][origIdx] logE[lIdx] = (((k3 / k2) + 1) ** 2.0) * (data["E"][origIdx + 1] ** 2.0) + ((k3 / k2) ** 2.0) * ( data["E"][origIdx] ** 2.0 ) logW[lIdx] = 1 else: origIdx += 1 else: stepmin = lq - testVal / 2.0 stepmax = lq + testVal / 2.0 origIdx = 0 while origIdx < len(data["Q"]): origIdx += 1 # emulate the do-while loop if not ((data["Q"][origIdx] + fundamentalStep / 2.0) < stepmin): break while origIdx < len(data["Q"]): if (data["Q"][origIdx] - fundamentalStep / 2.0) <= stepmin: if logI[lIdx] is None: # rtemp[outindex]=MyR[Inindex]*((MyQ[inindex]+FunStep/2)-stepmin)/funstep # wtemp[outindex]=(MyQ[inindex]+FunStep/2-stepmin)/funstep # stemp[outindex]=(MyS[InIndex]^2)*((MyQ[inindex]+FunStep/2-stepmin)/funstep)^2 logI[lIdx] = ( data["I"][origIdx] * ((data["Q"][origIdx] + fundamentalStep / 2.0) - stepmin) / fundamentalStep ) logW[lIdx] = (data["Q"][origIdx] + fundamentalStep / 2.0 - stepmin) / fundamentalStep logE[lIdx] = (data["E"][origIdx] ** 2.0) * ( (data["Q"][origIdx] + fundamentalStep / 2.0 - stepmin) / fundamentalStep ) ** 2.0 else: # rtemp[outindex]+=MyR[Inindex]*((MyQ[inindex]+FunStep/2)-stepmin)/funstep # wtemp[outindex]+=(MyQ[inindex]+FunStep/2-stepmin)/funstep # stemp[outindex]+=(MyS[InIndex]^2)*((MyQ[inindex]+FunStep/2-stepmin)/funstep)^2 logI[lIdx] += ( data["I"][origIdx] * ((data["Q"][origIdx] + fundamentalStep / 2.0) - stepmin) / fundamentalStep ) logW[lIdx] += (data["Q"][origIdx] + fundamentalStep / 2.0 - stepmin) / fundamentalStep logE[lIdx] += (data["E"][origIdx] ** 2.0) * ( (data["Q"][origIdx] + fundamentalStep / 2.0 - stepmin) / fundamentalStep ) ** 2.0 elif (self.data["Q"][origIdx] + fundamentalStep / 2.0) > stepmax: if logI[lIdx] is None: # rtemp[outindex]=MyR[Inindex]*(stepmax-(MyQ[inindex]-FunStep/2))/funstep # wtemp[outindex]=(stepmax-(MyQ[inindex]-FunStep/2))/funstep # stemp[outindex]=(MyS[InIndex]^2)*((stepmax-(MyQ[inindex]-FunStep/2))/funstep)^2 logI[lIdx] = ( data["I"][origIdx] * (stepmax - (data["Q"][origIdx] - fundamentalStep / 2.0)) / fundamentalStep ) logW[lIdx] = (stepmax - (data["Q"][origIdx] - fundamentalStep / 2.0)) / fundamentalStep logE[lIdx] = (data["E"][origIdx] ** 2.0) * ( (stepmax - (data["Q"][origIdx] - fundamentalStep / 2.0)) / fundamentalStep ) ** 2.0 else: logI[lIdx] += ( data["I"][origIdx] * (stepmax - (data["Q"][origIdx] - fundamentalStep / 2.0)) / fundamentalStep ) logW[lIdx] += ( stepmax - (self.data["Q"][origIdx] - fundamentalStep / 2.0) ) / fundamentalStep logE[lIdx] += (data["E"][origIdx] ** 2.0) * ( (stepmax - (data["Q"][origIdx] - fundamentalStep / 2.0)) / fundamentalStep ) ** 2.0 else: if logI[lIdx] is None: logI[lIdx] = data["I"][origIdx] logW[lIdx] = 1.0 logE[lIdx] = data["E"][origIdx] ** 2.0 else: logI[lIdx] += data["I"][origIdx] logW[lIdx] += 1.0 logE[lIdx] += data["E"][origIdx] ** 2.0 origIdx += 1 # emulate the do-while loop if not ((data["Q"][origIdx] - fundamentalStep / 2.0) < stepmax): break logI = [logI[ii] / logW[ii] for ii in range(numOfBins)] logE = [logE[ii] / (logW[ii] ** 2.0) for ii in range(numOfBins)] logE = [le**0.5 for le in logE] return (logQ, logI, logE)
[docs] def rescale_data(self, guess_init: bool = False) -> None: """Rescale data with thickness. Fit the I(Q) data to gaussian and calculate peak area. Parameters ---------- guess_init : bool Whether to guess initial parameters for fitting with differential_evolution. """ def _gaussian(x: np.ndarray, k0: float, k1: float, k2: float) -> np.ndarray: """The Gaussian equation according to Igor definition of gaussian curvefit. https://www.wavemetrics.net/doc/igorman/V-01%20Reference.pdf """ return k0 + 1.0 / (k1 * np.sqrt(2 * math.pi)) * np.exp(-1.0 / 2.0 * ((x - k2) / k1) ** 2.0) def _sum_of_squared_error(params: tuple[float, float, float]) -> float: """Function for genetic algorithm to minimize.""" k0, k1, k2 = params val = _gaussian(np.array(self.data.q), k0, k1, k2) result = np.sum((np.array(self.data.i) - val) ** 2) return result def _generate_initial_parameters(test_x: np.ndarray, test_y: np.ndarray): max_x = max(test_x) max_y = max(test_y) max_xy = max(max_x, max_y) parameter_bounds = [ [-max_xy, max_xy], # k0 [-max_xy, max_xy], # k1 [-max_xy, max_xy], # k2 ] # "seed" the numpy random number generator for repeatable results result = differential_evolution(_sum_of_squared_error, parameter_bounds, seed=3) return result.x def _clean_iq(q_scaled: list[float], i_scaled: list[float], e_scaled: list[float]): """Remove duplicate values in q by: - Taking the average of all i-values with the same q - Taking the standard deviation of all e-values with the same q """ # Dictionary to store sums for averaging I and propagating errors for E sum_dict = defaultdict(lambda: {"I_sum": 0, "I_count": 0, "E_sum_squares": 0}) for q, i, e in zip(q_scaled, i_scaled, e_scaled): sum_dict[q]["I_sum"] += i sum_dict[q]["I_count"] += 1 sum_dict[q]["E_sum_squares"] += e**2 q_cleaned = [] i_cleaned = [] e_cleaned = [] for q, values in sum_dict.items(): q_cleaned.append(q) i_cleaned.append(values["I_sum"] / values["I_count"]) e_cleaned.append(math.sqrt(values["E_sum_squares"])) return q_cleaned, i_cleaned, e_cleaned assert self.size > 0 initial_vals = [3.8e-6, 0.1, 0.8] peak_area = None q_offset = None for b in range(self.num_of_banks): bank = b + 1 # 2*(Pi^2)*HarNo/(PrimWavel*3600*180) h_scale = 2 * (math.pi**2.0) * bank / (self.experiment.prim_wave * 3600.0 * 180.0) # VertScale=VertAngle*(DarWidth/HarNo)*Pi/(3600*180)*Sthick//10 v_scale = ( self.experiment.v_angle * (self.experiment.darwin_width / bank) * math.pi / (3600.0 * 180.0) * self.thickness ) if guess_init: initial_vals = _generate_initial_parameters(np.array(self.data.q), np.array(self.data.i)) best_vals, sigma = curve_fit( _gaussian, np.array(self.data.q), np.array(self.data.i), p0=initial_vals, sigma=self.data.e, maxfev=100000, ) initial_vals = list(best_vals) peak_area = ( 1.0 / (initial_vals[1] * math.sqrt(2 * math.pi)) * initial_vals[1] * math.sqrt(2.0) / math.sqrt(2 * math.pi) ) dar_test = initial_vals[1] * math.sqrt(2.0) * (math.pi) ** 0.5 * bank / self.experiment.darwin_width if dar_test <= 0.13: q_offset = initial_vals[2] else: q_offset = 0.0 q_scaled = [math.fabs((q - q_offset) * h_scale) for q in self.data.q] i_scaled = [i / (v_scale * peak_area) for i in self.data.i] e_scaled = [e / (v_scale * peak_area) for e in self.data.e] q_cleaned, i_cleaned, e_cleaned = _clean_iq(q_scaled, i_scaled, e_scaled) iq_scaled = IQData(q=q_cleaned, i=i_cleaned, e=e_cleaned, t=[]) self.data_scaled.append(iq_scaled) q_range = f"{min(self.data_scaled[0].q)} - {max(self.data_scaled[0].q)}" logging.info(f"Rescaled data for sample {self.name}, Q-range: {q_range} 1/angstrom\n") return
[docs] def stitch_data(self): """Stitch scans together and store in property `self.data`""" for bank in range(self.num_of_banks): momentum_transfer = [] intensity = [] error = [] # transmission = [] # omitted for now for scan in self.scans: it = list( zip( scan.monitor_data.iq_data.q, scan.monitor_data.iq_data.i, scan.monitor_data.iq_data.e, scan.detector_data[bank].iq_data.i, scan.detector_data[bank].iq_data.e, ) ) for mq, mi, me, di, de in it: if mq in momentum_transfer: idx = momentum_transfer.index(mq) var = error[idx] ** 2 + (de / mi) ** 2 intensity[idx] = (intensity[idx] * (error[idx] ** 2) + (di / mi) * (de / mi) ** 2) / var error[idx] = var**0.5 else: momentum_transfer.append(mq) intensity.append(di / mi) error.append(de / mi) # Sort by momentum transfer (outside the scan loop, inside the bank loop) sorted_indices = np.argsort(momentum_transfer) momentum_transfer = np.array(momentum_transfer)[sorted_indices] intensity = np.array(intensity)[sorted_indices] error = np.array(error)[sorted_indices] self.detector_data.append( IQData( q=momentum_transfer.tolist(), i=intensity.tolist(), e=error.tolist(), ) ) logging.info(f"Scans stitched together for sample {self.name}.\n") h_scale = 2 * (math.pi**2.0) * 1.0 / (self.experiment.prim_wave * 3600.0 * 180.0) theta_range_msg = "" q_range_msg = "" for scan in self.scans: q_range = f"{min(scan.detector_data[0].iq_data.q)} - {max(scan.detector_data[0].iq_data.q)}" theta_range_msg += f"Theta range for scan {scan.number}: {q_range}\n" temp_q = [math.fabs(q * h_scale) for q in scan.detector_data[0].iq_data.q] q_range_msg += f"Q range for scan {scan.number}: {min(temp_q)} - {max(temp_q)} 1/angstrom\n" logging.info(theta_range_msg) logging.info(q_range_msg) return
def _match_or_interpolate( self, q_data: np.ndarray, q_bg: np.ndarray, i_bg: np.ndarray, e_bg: np.ndarray, tolerance: float = 1e-5, ) -> tuple[np.ndarray, np.ndarray]: """Match q_bg values to q_data directly if close enough, otherwise interpolate. Used for background subtraction""" i_bg_matched = np.zeros_like(q_data) e_bg_matched = np.zeros_like(q_data) for i, q in enumerate(q_data): # Find index in q_bg that is closest to q idx = np.abs(q_bg - q).argmin() if np.abs(q_bg[idx] - q) <= max(tolerance * q, 1e-6): # If within tolerance, take the value directly i_bg_matched[i] = i_bg[idx] e_bg_matched[i] = e_bg[idx] else: # Otherwise, interpolate i_bg_matched[i] = np.interp(q, q_bg, i_bg) e_bg_matched[i] = np.interp(q, q_bg, e_bg) return i_bg_matched, e_bg_matched
[docs] def subtract_background(self, background: "Sample", v_scale: float = 1.0) -> None: """Subtract background data from this sample's data. Parameters ---------- background : Sample The background sample to subtract. Must be processed (stitched, scaled, and binned). v_scale : float Scaling factor for background intensity before subtraction. """ if self.experiment.logbin: assert self.is_log_binned, f"Sample {self.name} must be log-binned before background subtraction." assert background.is_log_binned, ( f"Background {background.name} must be log-binned before background subtraction." ) logging.info( f"Logbinned data are used for background subtraction. Sample {self.name}, background {background.name}" ) data = self.data_log_binned bg_data = background.data_log_binned scale_f = v_scale * self.thickness / background.thickness sample_num_of_bins = self.num_log_bins bg_num_of_bins = self.num_log_bins # TODO: This should instead be background.num_log_bins, but we need to fix log binning first # bg_num_of_bins = background.num_log_bins if sample_num_of_bins < bg_num_of_bins: num_of_bins = sample_num_of_bins momentum_transfer = data.q.copy() else: num_of_bins = bg_num_of_bins momentum_transfer = bg_data.q.copy() intensity = [data.i[i] - (scale_f * bg_data.i[i]) for i in range(num_of_bins)] error = [math.sqrt(data.e[i] ** 2.0 + (scale_f * bg_data.e[i]) ** 2.0) for i in range(num_of_bins)] self.data_bg_subtracted.q = momentum_transfer self.data_bg_subtracted.i = intensity self.data_bg_subtracted.e = error # Use interpolation if log-binning is not applied else: # Only process the first detector bank for now data = self.data_scaled[0] bg_data = background.data_scaled[0] # Convert to numpy arrays for easier manipulation q_data = np.array(data.q) i_data = np.array(data.i) e_data = np.array(data.e) q_bg = np.array(bg_data.q) i_bg = np.array(bg_data.i) e_bg = np.array(bg_data.e) # Match/interpolate background data to sample q values i_bg_matched, e_bg_matched = self._match_or_interpolate(q_data, q_bg, i_bg, e_bg) # Subtract background i_subtracted = i_data - i_bg_matched e_subtracted = np.sqrt(e_data**2 + e_bg_matched**2) self.data_bg_subtracted.q = q_data.tolist() self.data_bg_subtracted.i = i_subtracted.tolist() self.data_bg_subtracted.e = e_subtracted.tolist() logging.info(f"Subtracted background {background.name} from sample {self.name}") return
[docs] class CombinedSample(BaseModel): """Combine multiple Sample measurements at the raw (X,Y,E) scan level before converting to (Q,I,E). Attributes ---------- name : str Combined sample name. experiment : Experiment Experiment this combined sample belongs to. thickness : float Sample thickness in cm. is_background : bool Whether this combined sample represents a background measurement. combined_samples : list[Sample] Individual Sample objects whose scans will be combined. combined_scans : list[Scan] Scans produced by the combination. """ name: str = Field(..., description="Combined sample name") experiment: "Experiment" = Field(..., description="Experiment associated with this combined sample") thickness: float = Field(0.1, description="Sample thickness in cm") is_background: bool = Field(False, description="Whether this is a background sample") combined_samples: list[Sample] = Field(default_factory=list, description="Individual samples to combine") combined_scans: list[Scan] = Field(default_factory=list, description="Combined scans (populated by combine)")
[docs] def combine(self) -> None: """Sum raw XY data from all combined samples scan-by-scan, then generate IQ data. For each scan index, the monitor and detector XY data from every sample are accumulated. If a sample has fewer scans than others a warning is logged and it is skipped for that index. After accumulation the XY to IQ conversion is performed on the combined data and ready for reduction. Raises ------ AssertionError If ``combined_samples`` is empty or none of them contain scans. """ assert len(self.combined_samples) > 0, "No samples to combine." # Reset combined scans in case this method is called multiple times self.combined_scans: list[Scan] = [] max_scans = max((len(sample.scans) for sample in self.combined_samples), default=0) assert max_scans > 0, "No scans in any sample to combine." for scan_idx in range(max_scans): for sample in self.combined_samples: if scan_idx >= len(sample.scans): logging.warning( f"Sample '{sample.name}' contains fewer scans than others " f"(has {len(sample.scans)}, expected at least {scan_idx + 1}). Skipping." ) continue source_scan = sample.scans[scan_idx] if scan_idx >= len(self.combined_scans): # First contribution for this scan index – create a new placeholder scan new_scan = Scan(number=0, experiment=self.experiment, load_data=False) # Seed monitor data from first contributor new_scan.monitor_data = MonitorData( xy_data=copy.deepcopy(source_scan.monitor_data.xy_data), iq_data=IQData(), ) # Seed detector data from first contributor for bank_id in range(self.experiment.num_of_banks): new_scan.detector_data.append( MonitorData( xy_data=copy.deepcopy(source_scan.detector_data[bank_id].xy_data), iq_data=IQData(), ) ) self.combined_scans.append(new_scan) else: # Subsequent contributions – accumulate into existing scan self.combined_scans[scan_idx].monitor_data.xy_data = self._combine_xy_data_pair( self.combined_scans[scan_idx].monitor_data.xy_data, source_scan.monitor_data.xy_data, ) for bank_id in range(self.experiment.num_of_banks): self.combined_scans[scan_idx].detector_data[bank_id].xy_data = self._combine_xy_data_pair( self.combined_scans[scan_idx].detector_data[bank_id].xy_data, source_scan.detector_data[bank_id].xy_data, ) # After all samples have contributed, convert XY → IQ for this scan scan = self.combined_scans[scan_idx] scan.monitor_data.iq_data = scan.convert_xy_to_iq(scan.monitor_data.xy_data) for bank_id in range(self.experiment.num_of_banks): scan.detector_data[bank_id].iq_data = scan.convert_xy_to_iq( scan.detector_data[bank_id].xy_data, ) logging.info( f"Combined {len(self.combined_samples)} samples into '{self.name}' ({len(self.combined_scans)} scans)." )
# ------------------------------------------------------------------ # Static helper – combines two XYData objects by binning close X values # ------------------------------------------------------------------ @staticmethod def _combine_xy_data_pair(base: XYData, other: XYData, tolerance: float = 1e-8) -> XYData: """Combine two :class:`XYData` objects by summing Y values at matching X bins. X values are discretised to integer bins of width ``tolerance`` so that floating-point rounding does not prevent matching. Y values are summed, errors are propagated in quadrature, and T values are averaged. Parameters ---------- base : XYData Accumulated data so far. other : XYData New data to add. tolerance : float Bin width used to discretise X values (default ``1e-8``). Returns ------- XYData Merged result. """ combined: dict[int, dict] = defaultdict(lambda: {"y_sum": 0.0, "e_sq_sum": 0.0, "t_list": [], "count": 0}) for xy_data in (base, other): x_arr = np.array(xy_data.x) y_arr = np.array(xy_data.y) e_arr = np.array(xy_data.e) t_vals = xy_data.t if xy_data.t and len(xy_data.t) == len(xy_data.x) else [0.0] * len(xy_data.x) for x, y, e, t in zip(x_arr, y_arr, e_arr, t_vals): x_key = int(np.round(x / tolerance)) combined[x_key]["y_sum"] += y combined[x_key]["e_sq_sum"] += e**2 combined[x_key]["t_list"].append(t) combined[x_key]["count"] += 1 out_x: list[float] = [] out_y: list[float] = [] out_e: list[float] = [] out_t: list[float] = [] for x_key in sorted(combined.keys()): entry = combined[x_key] out_x.append(x_key * tolerance) out_y.append(entry["y_sum"]) out_e.append(np.sqrt(entry["e_sq_sum"])) out_t.append(float(np.mean(entry["t_list"])) if entry["t_list"] else 0.0) return XYData(x=out_x, y=out_y, e=out_e, t=out_t)
[docs] class Experiment(BaseModel): """Experiment configuration for USANS data reduction Attributes ---------- config : str Path to the configuration file output_dir : str | None Output folder for reduced data, default is current folder prim_wave : float Primary wavelength in Angstroms, default is 3.6 darwin_width : float Darwin width, default is 5.1 v_angle : float Vertical angle, default is 0.042 min_q : float Minimum Q value for binning output, default is 1e-6 step_per_dec : int Steps per decade in binning, default is 33 logbin : bool Flag for log-binning, default is False """ config: str = Field(..., description="Path to the configuration file") output_dir: str = Field("", description="Output folder for reduced data") prim_wave: float = Field(3.6, description="Primary wavelength in Angstroms") darwin_width: float = Field(DARWIN_WIDTH, description="Darwin width") v_angle: float = Field(0.042, description="Vertical angle") min_q: float = Field(1e-6, description="Minimum Q value for binning output") step_per_dec: int = Field(33, description="Steps per decade in binning") logbin: bool = Field(False, description="Flag for logbinning") # Fields that are initialized in model_post_init and not expected from user input folder: str = Field(default="", init=False, description="Working folder for this experiment") num_of_banks: int = Field(default=4, init=False, description="Number of detector banks") background: "Sample | None" = Field(default=None, init=False, description="Background sample") samples: list["Sample"] = Field(default_factory=list, init=False, description="List of samples")
[docs] def model_post_init(self, _context: Any) -> None: # noqa ANN401 """Post-validation initializer""" # The working folder for this experiment, default is current folder _setupfile = os.path.abspath(self.config) self.folder = os.path.dirname(_setupfile) if bool(self.output_dir) is False: # in case `output_dir` is an empty string self.output_dir = os.path.join(self.folder, "reduced") self.num_of_banks: int = 4 if not os.path.exists(self.config): raise FileNotFoundError(f"The file path: {self.config} does not exist") self.folder = os.path.dirname(self.config) background, samples = read_config(self.config) if background is None: logging.warning("No background sample defined in the configuration file.") self.background = None else: self.background = Sample(**background, experiment=self) self.samples = [Sample(**s, experiment=self) for s in samples]
[docs] def reduce(self, output_dir: str | None = None): """Reduce the USANS data Parameters ---------- output_dir: str | None The result will be dumped to the output folder. If none will just use current folder """ if output_dir is not None: self.output_dir = output_dir if not os.path.exists(self.output_dir): os.makedirs(self.output_dir) try: self.background.reduce() except Exception as e: # noqa BLE001 logging.exception(f"Cannot reduce background {self.background.name}: {e}") for sample in self.samples: try: sample.reduce() except Exception as e: # noqa BLE001 logging.exception(f"Cannot reduce sample {sample.name}: {e}") self.dump_reduced_data() return
[docs] def dump_reduced_data(self): """Dump reduced data to txt files""" for sample in self.samples: sample.dump_reduced_data_to_csv() if self.background is not None: self.background.dump_reduced_data_to_csv()
[docs] def parse_args(): import argparse parser = argparse.ArgumentParser(description="USANS Data Reduction") parser.add_argument("path", help="Path to the configuration file") parser.add_argument("-l", "--logbin", action="store_true", help="Enable log-binning of data during reduction") parser.add_argument("-o", "--output", default="", help="Output folder for reduced data (default: current folder)") args = parser.parse_args() return args
[docs] def main(): """Main function to run USANS data reduction""" args = parse_args() experiment = Experiment(config=args.path, logbin=args.logbin, output_dir=args.output) experiment.reduce() generate_report(config_file_path=args.path, output_dir=experiment.output_dir) logging.info("USANS data reduction completed.")
if __name__ == "__main__": main()