Source code for usansred.reduce_USANS

import csv
import json
import logging
import math
import os
import sys
import traceback
import warnings

import numpy as np
from mantid.simpleapi import (
    ConvertTableToMatrixWorkspace,
    CropWorkspace,
    LoadEventNexus,
    Rebin,
    StepScan,
    SumSpectra,
    mtd,
)
from matplotlib import use

from usansred import reduce
from usansred.io.save import save_ascii, save_summed_spectra
from usansred.summary import generate_report

use("agg")
np.seterr(all="ignore")
warnings.filterwarnings("ignore", module="numpy")
peaks = []


[docs] def update_sequence_info(out_file, info): scan_dict_global = {} if os.path.isfile(out_file): with open(out_file, "r") as fd: try: scan_dict_global = json.loads(fd.read()) except: # noqa E722 logging.error("Could not read json file: creating a new one") scan_dict_global.update(info) with open(out_file, "w") as fd: fd.write(json.dumps(scan_dict_global))
[docs] def get_sequence_info(seq_file): """Get the sequence information from local autoreduce folder json files for autoreduction""" if os.path.exists(seq_file): with open(seq_file) as json_file: seq_dict = json.load(json_file) else: return None return seq_dict
[docs] def main(): # check number of arguments if len(sys.argv) != 3: print("autoreduction code requires a filename and an output directory") sys.exit(1) if not (os.path.isfile(sys.argv[1])): print("data file ", sys.argv[1], " not found") sys.exit() else: filename = sys.argv[1] outdir = sys.argv[2] # Don't load monitors unless we really need them try: LoadEventNexus(filename, LoadMonitors=True, OutputWorkspace="USANS") load_monitors = True except: # noqa E722 LoadEventNexus(filename, LoadMonitors=False, OutputWorkspace="USANS") load_monitors = False # LoadEventNexus(filename, LoadMonitors=False, OutputWorkspace="USANS") # load_monitors = False file_prefix = os.path.split(filename)[1].split(".")[0] # if mtd['USANS'].getRun().hasProperty("BL1A:CS:Scan:USANS:Wavelength"): # main_wl = mtd['USANS'].getRun().getProperty("BL1A:CS:Scan:USANS:Wavelength").value[0] # else: # main_wl = "main_peak" # Get ROI from logs wavelength = [3.6, 1.8, 1.2, 0.9, 0.72, 0.6] roi_min = mtd["USANS"].getRun().getProperty("BL1A:Det:N1:Det1:TOF:ROI:1:Min").value[-1] # roi_step = mtd["USANS"].getRun().getProperty("BL1A:Det:N1:Det1:TOF:ROI:1:Size").value[-1] # Reference to the item in the wavelength array main_index = 1 for i in range(1, 8): lower_bound = mtd["USANS"].getRun().getProperty("BL1A:Det:N1:Det1:TOF:ROI:%s:Min" % i).value[-1] tof_step = mtd["USANS"].getRun().getProperty("BL1A:Det:N1:Det1:TOF:ROI:%s:Size" % i).value[-1] if i > 1 and lower_bound == roi_min: main_index = i - 2 peaks.append([lower_bound * 1000.0, (lower_bound + tof_step) * 1000.0]) main_wl = wavelength[main_index] # Produce ASCII data Rebin(InputWorkspace="USANS", Params="0,10,17000", OutputWorkspace="USANS") file_path = os.path.join(outdir, "%s_detector_trans.txt" % file_prefix) save_summed_spectra("USANS", file_path, header="TOF, COUNTS, ERROR") CropWorkspace( InputWorkspace="USANS", StartWorkspaceIndex=0, EndWorkspaceIndex=1023, OutputWorkspace="USANS_detector", ) file_path = os.path.join(outdir, "%s_detector.txt" % file_prefix) save_summed_spectra("USANS_detector", file_path, header="TOF, COUNTS, ERROR") CropWorkspace( InputWorkspace="USANS", StartWorkspaceIndex=1024, EndWorkspaceIndex=2047, OutputWorkspace="USANS_trans", ) file_path = os.path.join(outdir, "%s_trans.txt" % file_prefix) save_summed_spectra("USANS_trans", file_path, header="TOF, COUNTS, ERROR") if load_monitors: Rebin( InputWorkspace="USANS_monitors", Params="0,10,17000", OutputWorkspace="USANS_monitors", ) file_path = os.path.join(outdir, "%s_monitor.txt" % file_prefix) save_ascii("USANS_monitors", file_path, header="TOF, COUNTS, ERROR") # Find whether we have a motor turning short_name = "" for item in mtd["USANS"].getRun().getProperties(): if item.name.startswith("BL1A:Mot:") and not item.name.endswith(".RBV"): stats = item.getStatistics() if ( abs(stats.mean) > 0 and abs(stats.standard_deviation / item.getStatistics().mean) > 0.01 or abs(stats.mean) == 0 and abs(stats.standard_deviation) > 0.0 ): scan_var = item.name short_name = item.name.replace("BL1A:Mot:", "") y_monitor = None if load_monitors: StepScan( InputWorkspace="USANS_monitors", OutputWorkspace="mon_scan_table", ) ConvertTableToMatrixWorkspace( InputWorkspace="mon_scan_table", ColumnX=scan_var, ColumnY="Counts", ColumnE="Error", OutputWorkspace="USANS_scan_monitor", ) file_path = os.path.join(outdir, "%s_monitor_scan_%s.txt" % (file_prefix, short_name)) save_ascii("USANS_scan_monitor", file_path, header=f"{scan_var}, COUNTS, ERROR") y_monitor = mtd["USANS_scan_monitor"].readY(0) iq_file_path_simple = os.path.join(outdir, "%s_iq_%s_%s.txt" % (file_prefix, short_name, main_wl)) iq_fd_simple = open(iq_file_path_simple, "w") iq_file_path = os.path.join(outdir, "%s_iq_%s.txt" % (file_prefix, short_name)) iq_fd = open(iq_file_path, "w") start_time = mtd["USANS"].getRun().getProperty("start_time").value experiment = mtd["USANS"].getRun().getProperty("experiment_identifier").value run_number = mtd["USANS"].getRun().getProperty("run_number").value run_title = mtd["USANS"].getRun().getProperty("run_title").value sequence_first_run = mtd["USANS"].getRun().getProperty("BL1A:CS:Scan:USANS:FirstRun").value[-1] sequence_index = mtd["USANS"].getRun().getProperty("BL1A:CS:Scan:USANS:Index").value[-1] meta_wavelength = mtd["USANS"].getRun().getProperty("BL1A:CS:Scan:USANS:Wavelength").value[-1] print("Wavelength: %s [%s]" % (wavelength[main_index], meta_wavelength)) iq_fd.write("# Experiment %s Run %s\n" % (experiment, run_number)) iq_fd.write("# Run start time: %s\n" % start_time) iq_fd.write("# Title: %s\n" % run_title) iq_fd.write("# Sequence ID: %s\n" % sequence_first_run) iq_fd.write("# Sequence index: %s\n" % sequence_index) iq_fd.write("# Selected wavelength: %s\n" % wavelength[main_index]) iq_fd.write( "# %-8s %-10s %-10s %-10s %-10s %-10s %-10s %-5s\n" % ("Q", "I(Q)", "dI(Q)", "dQ", "N(Q)", "dN(Q)", "Mon(Q)", "Lambda") ) iq_fd_simple.write("# Experiment %s Run %s\n" % (experiment, run_number)) iq_fd_simple.write("# Run start time: %s\n" % start_time) iq_fd_simple.write("# Title: %s\n" % run_title) iq_fd_simple.write("# Sequence ID: %s\n" % sequence_first_run) iq_fd_simple.write("# Sequence index: %s\n" % sequence_index) iq_fd_simple.write("# Selected wavelength: %s\n" % wavelength[main_index]) iq_fd_simple.write("# %-8s %-10s %-10s\n" % ("Q", "I(Q)", "dI(Q)")) for i in range(len(peaks)): peak = peaks[i] CropWorkspace( InputWorkspace="USANS_detector", OutputWorkspace="peak_detector", XMin=peak[0], XMax=peak[1], ) StepScan(InputWorkspace="peak_detector", OutputWorkspace="scan_table") ConvertTableToMatrixWorkspace( InputWorkspace="scan_table", ColumnX=scan_var, ColumnY="Counts", ColumnE="Error", OutputWorkspace="USANS_scan_detector", ) mtd["USANS_scan_detector"].getAxis(1).getUnit().setLabel("Counts", "Counts") x_data = mtd["USANS_scan_detector"].readX(0) y_data = mtd["USANS_scan_detector"].readY(0) e_data = mtd["USANS_scan_detector"].readE(0) if i == 0: file_path = os.path.join(outdir, "%s_detector_%s.txt" % (file_prefix, main_wl)) save_ascii("USANS_scan_detector", file_path, header=f"{scan_var}, COUNTS, ERROR") # json_file_path = os.path.join(outdir, "%s_plot_data.json" % file_prefix) # SavePlot1DAsJson( # InputWorkspace="USANS_scan_detector", JsonFilename=json_file_path, PlotName="main_output" # ) try: from postprocessing.publish_plot import plot1d except ImportError: try: from finddata.publish_plot import plot1d except: # noqa E722 logging.error("Cannot import postprocessing or finddata.") try: plot1d( run_number, [[x_data, y_data, e_data]], instrument="USANS", x_title=scan_var, y_title="Counts", y_log=True, ) except: # noqa E722 logging.error("Error calling plot1d, no image plotted") traceback.print_exc() # Save scan info to use for stitching later update_sequence_info( os.path.join(outdir, "scan_%s.json" % sequence_first_run), {run_number: {"iq": file_path}}, ) update_sequence_info( os.path.join(outdir, "sample_%s.json" % sequence_first_run), {"title": run_title, "background": 0, "thickness": 0.1}, ) for i_theta in range(len(x_data)): q = ( 2.0 * math.pi * math.sin(x_data[i_theta] * math.pi / 180.0 / 3600.0) / wavelength[main_index] ) # if q<=0: # continue # Write I(q) file i_q = y_data[i_theta] / y_monitor[i_theta] di_q = math.sqrt( (e_data[i_theta] / y_monitor[i_theta]) ** 2 + y_data[i_theta] ** 2 / y_monitor[i_theta] ** 3 ) iq_fd_simple.write("%-10.6g %-10.6g %-10.6g\n" % (q, i_q, di_q)) else: file_path = os.path.join( outdir, "%s_detector_scan_%s_peak_%s.txt" % (file_prefix, short_name, i), ) save_ascii("USANS_scan_detector", file_path, header=f"{scan_var}, COUNTS, ERROR") for i_theta in range(len(x_data)): q = 2.0 * math.pi * math.sin(x_data[i_theta] * math.pi / 180.0 / 3600.0) / wavelength[i - 1] # if q<=0: # continue # Write complete I(q) file i_q = y_data[i_theta] / y_monitor[i_theta] di_q = math.sqrt( (e_data[i_theta] / y_monitor[i_theta]) ** 2 + y_data[i_theta] ** 2 / y_monitor[i_theta] ** 3 ) if i_q > 0: iq_fd.write( "%-10.6g %-10.6g %-10.6g %-10.6g %-10.6g %-10.6g %-10.6g %-5.4g\n" % ( q, i_q, di_q, 0, y_data[i_theta], e_data[i_theta], y_monitor[i_theta], wavelength[i - 1], ) ) CropWorkspace( InputWorkspace="USANS_trans", OutputWorkspace="peak_trans", XMin=peak[0], XMax=peak[1], ) StepScan(InputWorkspace="peak_trans", OutputWorkspace="scan_table") ConvertTableToMatrixWorkspace( InputWorkspace="scan_table", ColumnX=scan_var, ColumnY="Counts", OutputWorkspace="USANS_scan_trans", ) if i == 0: file_path = os.path.join(outdir, "%s_trans_%s.txt" % (file_prefix, main_wl)) else: file_path = os.path.join( outdir, "%s_trans_scan_%s_peak_%s.txt" % (file_prefix, short_name, i), ) save_ascii("USANS_scan_trans", file_path, header=f"{scan_var}, COUNTS, ERROR") iq_fd.close() iq_fd_simple.close() # list all the sequence files json_files = [ pos_json for pos_json in os.listdir(outdir) if pos_json.endswith(".json") and pos_json.startswith("scan") ] json_files.sort() json_files = [os.path.join(outdir, ff) for ff in json_files] sample_json_files = [ pos_json for pos_json in os.listdir(outdir) if pos_json.endswith(".json") and pos_json.startswith("sample") ] sample_json_files.sort() sample_json_files = [os.path.join(outdir, ff) for ff in json_files] # construct the csv file seq_len = [] seq_thickness = [] seq_titles = [] seq_start_runnums = [] seq_background_flag = [] for idx, ff in enumerate(json_files): seq_dict = get_sequence_info(ff) try: sample_dict = get_sequence_info(sample_json_files[idx]) except: # noqa E722 sample_dict = {} seq_len.append(len(seq_dict) - 1) if "title" in sample_dict: seq_titles.append(sample_dict["title"]) else: seq_titles.append("Sample" + str(idx)) if "thickness" in sample_dict: seq_thickness.append(int(sample_dict["thickness"])) else: seq_thickness.append(0.1) if "background_flag" in sample_dict: seq_background_flag.append(int(sample_dict["background"])) else: seq_background_flag.append(0) seq_start_runnums.append(list(seq_dict)[0]) # in case there is no background designated, assign the first sample if sum(seq_background_flag) == 0.0: seq_background_flag[0] = 1 else: pass autocsvfile = os.path.join(outdir, "auto.csv") with open(autocsvfile, "w", newline="") as autocsv: datawriter = csv.writer(autocsv, delimiter=",") for flag, title, first_run, ll, thickness in zip( seq_background_flag, seq_start_runnums, seq_titles, seq_len, seq_thickness ): flag = "b" if flag == 1 else "s" rr = [str(col) for col in [flag, first_run, title, ll, thickness]] datawriter.writerow(rr) sys.path.insert(2, "/SNS/USANS/shared/autoreduce/usans-reduction") autodir = os.path.join(outdir, "auto") if not os.path.exists(autodir): os.makedirs(autodir) exp = reduce.Experiment(config=autocsvfile) exp.reduce(output_dir=autodir) generate_report(config_file_path=autocsvfile, output_dir=exp.outputFolder)
# seq_dict = get_sequence_info(os.path.join(outdir, "scan_%s.json" % sequence_first_run)) if __name__ == "__main__": main()