adds new function to analyze rheed data, doesn't really work atm

thanks DeepSeek
This commit is contained in:
2026-03-16 12:51:05 +01:00
parent 10c68bf260
commit 839799a13f

View File

@@ -179,6 +179,57 @@ def deduplicate_instruments_from_layers(layers):
# "rheed_system": rheeds, # "rheed_system": rheeds,
# } # }
def analyse_rheed_data(data, n_layers: int):
'''
Takes the content of a tsv file and returns a dictionary with timestamps and intensities.
The file should contain a 2D array composed of 3N+1 columns - where N is the total number of layers in a given sample - and an unspecified number of rows.
-----
Time Layer1_Int1 Layer1_Int2 Layer1_Int3 (repeat...)
-----
Distinct ValueErrors are raised if:
- The array is not 2-dimensional;
- The number of (intensity) columns is not a multiple of 3;
- The total number of columns does not equate exactly 3N+1.
# TO-DO: complete this description...
Written with help from DeepSeek.
'''
# Verifying the format of the input file:
if data.ndim != 2:
raise ValueError(f"Unexpected trace format: expected 2D array, got ndim = {data.ndim}.")
n_cols = data.shape[1] # 0 = rows, 1 = columns
if (n_cols - 1) % 3 != 0:
raise ValueError(f"Unexpected number of columns: expected 3N+1 columns, got {n_cols}.")
if (n_cols - 1) // 3 != n_layers:
exp = n_layers * 3 + 1
raise ValueError(f"Unexpected volume of data: found {n_layers} layers, expected {exp} (3N+1) columns, got {n_cols}.")
n_time_points = data.shape[0]
# Get time (all rows of col 0) as Float64:
time = data[:, 0].astype(np.float64, copy=False) # copy=False suggested by LLM for mem. eff.
# Empty 3D array for intensities:
intensities = np.zeros(
(n_layers, n_time_points, 3)
)
# Loop through layers:
for layer_index in range(n_layers):
layer_name = f"layer_{layer_index + 1}"
# Columns for this layer are from 3i+1 to 3i+3 incl. (= 3i+4 excl.)
start_col = 1 + layer_index * 3
end_col = start_col + 3 # remember this gets excluded!
# Get layer-specific intensities (all rows of columns start_col:end_col) as Float32:
intensities[layer_index, :, :] = data[:, start_col:end_col].astype(np.float32, copy=False)
return {
"time": time,
"intensity": intensities,
}
def make_nexus_schema_dictionary(substrate_object, layers): def make_nexus_schema_dictionary(substrate_object, layers):
''' '''
Main function, takes all the other functions to reconstruct the full dataset. Takes a Substrate-class object (output of the chain_entrypoint_to_batch() function) and a list of Layer-class objects (output of the chain_entrypoint_to_layers() function), returns dictionary with the same schema as the NeXus standard for PLD fabrications. Main function, takes all the other functions to reconstruct the full dataset. Takes a Substrate-class object (output of the chain_entrypoint_to_batch() function) and a list of Layer-class objects (output of the chain_entrypoint_to_layers() function), returns dictionary with the same schema as the NeXus standard for PLD fabrications.
@@ -457,9 +508,23 @@ def build_nexus_file(pld_fabrication, output_path, rheed_osc=None):
nx_instruments.create_dataset("rheed_system", data = instruments_dict["rheed_system"]) nx_instruments.create_dataset("rheed_system", data = instruments_dict["rheed_system"])
except TypeError as te: except TypeError as te:
raise TypeError(te) raise TypeError(te)
nx_rheed = nx_pld_entry.create_group("rheed_data")
nx_rheed.attrs["NX_class"] = "NXdata" # RHEED data section
nx_rheed.create_dataset("intensity", data=rheed_osc) if rheed_osc is not None:
nx_rheed = nx_pld_entry.create_group("rheed_data")
nx_rheed.attrs["NX_class"] = "NXdata"
nx_rheed.create_dataset("time", data=rheed_osc["time"])
nx_rheed["time"].attrs["units"] = "s"
nx_rheed.create_dataset("intensity", data=rheed_osc["intensity"])
#nx_rheed["intensity"].attrs["units"] = "counts"
nx_rheed["intensity"].attrs["long_name"] = "RHEED intensity"
nx_rheed.attrs["signal"] = "intensity"
nx_rheed.attrs["axes"] = "layer:time:channel"
nx_rheed.attrs["layer_indices"] = [0] # asse layer
nx_rheed.attrs["time_indices"] = [1] # asse tempo
nx_rheed.attrs["channel_indices"] = [2]
return return
if __name__=="__main__": if __name__=="__main__":
@@ -472,10 +537,16 @@ if __name__=="__main__":
sample_name = sample.name.strip().replace(" ","_") sample_name = sample.name.strip().replace(" ","_")
substrate_object = chain_entrypoint_to_batch(sample) # Substrate-class object substrate_object = chain_entrypoint_to_batch(sample) # Substrate-class object
layers = chain_entrypoint_to_layers(sample) # list of Layer-class objects layers = chain_entrypoint_to_layers(sample) # list of Layer-class objects
n_layers = len(layers) # total number of layers on the sample
result = make_nexus_schema_dictionary(substrate_object, layers) result = make_nexus_schema_dictionary(substrate_object, layers)
# print(make_nexus_schema_dictionary(substrate_object, layers)) # debug # print(make_nexus_schema_dictionary(substrate_object, layers)) # debug
with open (f"output/sample-{sample_name}.json", "w") as f: with open (f"output/sample-{sample_name}.json", "w") as f:
json.dump(result, f, indent=3) json.dump(result, f, indent=3)
# TO-DO: remove the hard-coded path of the RWA file
# ideally the script should download a TXT/CSV file from each layer
# (IF PRESENT ←→ also handle missing file error)
# and merge all data in a single file to analyse it
with open(f"tests/Realtime_Window_Analysis.txt", "r") as o: with open(f"tests/Realtime_Window_Analysis.txt", "r") as o:
osc = np.loadtxt(o) osc = np.loadtxt(o, delimiter="\t")
build_nexus_file(result, output_path=f"output/sample-{sample_name}-nexus.h5", rheed_osc=osc) rheed_osc = analyse_rheed_data(data=osc, n_layers=n_layers) or None # analyze rheed data first, build the file later
build_nexus_file(result, output_path=f"output/sample-{sample_name}-nexus.h5", rheed_osc=rheed_osc)