Files
eXParser-PLD/src/main.py
PioApocalypse bb1ea8f1c3 proposed: schemas are placed in src/schema (module)
Separating the schemas from main.py might be a good idea, since the parser
will support more fabrication methods; but since every method has its own
dictionary, is it even possible?
2026-05-08 11:20:10 +02:00

745 lines
33 KiB
Python
Executable File

#!/usr/bin/env python3
import os, json, requests, h5py
import numpy as np
from getpass import getpass
from APIHandler import APIHandler
from classes import *
from PIL import Image
# from schema import pld_deposition
def call_entrypoint_from_elabid(elabid):
    """
    Fetch the eLabFTW item with the given elabid and wrap it in an Entrypoint.

    The request is made through APIHandler using the module-level ``apikey``.

    Raises:
        ValueError: if the entry's category_title is not exactly "Sample".
        ConnectionError: re-raised (wrapped in a new ConnectionError) when
            fetching or wrapping the entry fails with a ConnectionError.
    """
    try:
        entry = APIHandler(apikey).get_entry_from_elabid(elabid, entryType="items")
        # Only resources categorised as samples may act as entrypoints.
        if entry.get("category_title") != "Sample":
            raise ValueError(
                "The resource you selected is not a sample, therefore it can't be used as an entrypoint."
            )
        return Entrypoint(entry)  # Entrypoint-class object
    except ConnectionError as err:
        raise ConnectionError(err)
def call_material_from_elabid(elabid):
    """
    Fetch the eLabFTW item with the given elabid and wrap it in a material object.

    The entry's category_title decides the class: "PLD Target" gives a Target,
    while "Substrate" gives a Substrate. Because of an old typo on eLabFTW the
    misspelt value "Subtrate" (second 's' missing) is accepted as a substrate too.

    Raises:
        ValueError: if category_title is none of the accepted values (the
            offending category is printed first).
        ConnectionError: re-raised (wrapped in a new ConnectionError) when a
            ConnectionError occurs while fetching or wrapping the entry.
    """
    # TO-DO: correct this typo on elabftw: Subtrate → Substrate.
    accepted_categories = ("PLD Target", "Substrate", "Subtrate")
    try:
        entry = APIHandler(apikey).get_entry_from_elabid(elabid, entryType="items")
        category = entry.get("category_title")
        if category not in accepted_categories:
            print(f"Category of the resource: {category}.")
            raise ValueError(
                f"The referenced resource (elabid = {elabid}) is not a material."
            )
        if category == "PLD Target":
            return Target(entry)
        # Both spellings of the substrate category end up here.
        return Substrate(entry)
    except ConnectionError as err:
        raise ConnectionError(err)
def call_layers_from_list(elabid_list):
    """
    Fetch several eLabFTW experiments and return them as Layer-class objects.

    ``elabid_list`` is an iterable of elabids (not a single one). Entries whose
    category_title is not exactly "PLD Deposition" are skipped silently.

    Raises:
        ConnectionError: if fetching any experiment fails with a
            ConnectionError. The layer numbers collected so far are printed
            (sorted) and the new error message names the failing elabid and
            the URL built from the module-level ELABFTW_API_URL.
    """
    collected = []
    for elabid in elabid_list:
        try:
            entry = APIHandler(apikey).get_entry_from_elabid(
                elabid, entryType="experiments"
            )
            if entry.get("category_title") == "PLD Deposition":
                collected.append(Layer(entry))
        except ConnectionError as err:
            # Report what was fetched successfully before giving up.
            processed = sorted(layer.layer_number for layer in collected)
            print("LIST OF THE LAYERS PROCESSED (unordered):\n" + str(processed))
            raise ConnectionError(
                f"An error occurred while fetching the experiment with elabid = {elabid}:\n"
                f"{err}"
                "\nPlease solve the problem before retrying."
                "\n\n"
                f"Last resource attempted to call: {ELABFTW_API_URL}/experiments/{elabid}"
            )
    return collected  # list of Layer-class objects
def chain_entrypoint_to_batch(sample_object):
    """
    Resolve the substrate batch linked to an Entrypoint-class object.

    Reads ``sample_object.batch_elabid`` and returns whatever
    call_material_from_elabid builds for that elabid.
    Dependency: call_material_from_elabid.
    """
    return call_material_from_elabid(sample_object.batch_elabid)
def chain_entrypoint_to_layers(sample_object):
    """
    Resolve the deposition layers linked to an Entrypoint-class object.

    Reads ``sample_object.linked_experiments_elabid`` (a list of elabids),
    fetches the matching Layer-class objects and returns them as a list
    ordered by ascending ``layer_number``.
    Dependency: call_layers_from_list.
    """
    fetched_layers = call_layers_from_list(sample_object.linked_experiments_elabid)
    return sorted(fetched_layers, key=lambda layer: layer.layer_number)
def chain_layer_to_target(layer_object):
    """
    Resolve the PLD target linked to a Layer-class object.

    Reads ``layer_object.target_elabid`` and returns whatever
    call_material_from_elabid builds for that elabid.
    Dependency: call_material_from_elabid.
    """
    return call_material_from_elabid(layer_object.target_elabid)
def deduplicate_instruments_from_layers(layers):
    """
    Collect the instruments used by every layer and summarise them for the sample.

    For each Layer-class object in ``layers`` the laser system, deposition
    chamber and RHEED system are read through ``get_instruments`` (called with
    the module-level ``apikey``).

    Returns a dictionary holding:
    - one ``layer_<layer_number>`` entry per layer, with that layer's
      "laser_system", "deposition_chamber" and "rheed_system" values;
    - one ``multilayer`` entry with the same three keys, where each value is a
      single string listing the distinct instruments of that category joined
      by ", ". Instruments appear in order of first use, so the result is the
      same on every run.

    No validation is performed and the user is never prompted: if layers used
    different instruments, all of them are listed in the ``multilayer`` entry.
    """
    categories = ("laser_system", "deposition_chamber", "rheed_system")
    seen = {category: [] for category in categories}
    instruments_by_layer = {}
    for lyr in layers:
        instruments = lyr.get_instruments(apikey)
        for category in categories:
            seen[category].append(instruments[category])
        instruments_by_layer[f"layer_{lyr.layer_number}"] = {
            category: instruments[category] for category in categories
        }
    # dict.fromkeys() drops duplicates while keeping first-seen order; a set
    # would make the order of the joined names vary from one run to the next.
    instruments_by_layer["multilayer"] = {
        category: ", ".join(dict.fromkeys(seen[category]))
        for category in categories
    }
    # TO-DO: validate the collected values (raise on missing/empty instruments)
    # and decide what to do when different layers used different instruments.
    return instruments_by_layer
### OLD CODE
# if 0 in [ len(i) for i in elegant_list ]:
# # i.e. if length of one of the lists in elegant_list is zero (missing data):
# raise IndexError("Missing data: no Laser System, Chamber and/or RHEED System is specified in any of the Deposition-type experiments related to this sample.")
# if not all([ len(i) == 1 for i in elegant_list ]):
# print("Warning: different instruments have been used for different layers - which is currently not allowed.")
# # for every element in elegant list check if len > 1 and if it is
# print("Selecting the first occurence for every category...")
###
# lasers = { f"layer_{lyr.layer_number}": lyr.laser_system for lyr in layers }
# chambers = { f"layer_{lyr.layer_number}": lyr.deposition_chamber for lyr in layers }
# rheeds = { f"layer_{lyr.layer_number}": lyr.rheed_system for lyr in layers }
# instruments_used_dict = {
# "laser_system": lasers,
# "deposition_chamber": chambers,
# "rheed_system": rheeds,
# }
def analyse_rheed_data(data):
    """
    Split a RHEED "Realtime Window Analysis" table into time and intensities.

    ``data`` is a 2D array (or array-like) with one row per time point and at
    least 4 columns: the first column is the timestamp and the next three are
    RHEED intensities.
    -----
    Time    Layer1_Int1    Layer1_Int2    Layer1_Int3
    -----
    Columns beyond the fourth are ignored and a warning is printed.

    Returns a dictionary with:
    - "time": 1D float64 array with one entry per row;
    - "intensity": float32 array of shape (3, n_rows), i.e. one row per
      intensity column.

    Raises:
        ValueError: if the array is not 2-dimensional, or if it has fewer
            than 4 columns.

    Written with help from DeepSeek.
    """
    data = np.asarray(data)  # also accept nested lists and other array-likes
    # Verifying the format of the input:
    if data.ndim != 2:
        raise ValueError(
            f"Unexpected trace format: expected 2D array, got ndim = {data.ndim}."
        )
    n_cols = data.shape[1]  # shape[0] = rows, shape[1] = columns
    if n_cols < 4:
        raise ValueError(
            f"Insufficient number of columns: expected 4, got n_cols = {n_cols}."
        )
    if n_cols > 4:
        print(
            f"Warning! The input file (for Realtime Window Analysis) has {n_cols - 4} more column(s) than needed.\nOnly the first 4 columns will be considered - with the first representing time and the others representing RHEED intensities."
        )
    # copy=False avoids duplicating the data when it already has the right dtype.
    time = data[:, 0].astype(np.float64, copy=False)
    intensities = data[:, 1:4].astype(np.float32, copy=False)
    return {
        "time": time,
        "intensity": intensities.T,
    }
def _quantity(value, units):
    """Pair a value with its unit in the schema's {"value", "units"} form."""
    return {"value": value, "units": units}


def make_nexus_schema_dictionary(substrate_object, layers):
    """
    Assemble the full dataset of a sample as a nested dictionary.

    Takes a Substrate-class object (output of chain_entrypoint_to_batch()) and
    a list of Layer-class objects (output of chain_entrypoint_to_layers()).
    For every layer the PLD target is fetched through chain_layer_to_target();
    chemical formulas are resolved with the module-level ``apikey``.

    Returns a dictionary with two top-level keys:
    - "sample": holds "substrate" (substrate metadata) and "multilayer" (one
      ``layer_<layer_number>`` entry per layer, with target, deposition
      parameters, annealing steps and the instruments used for that layer);
    - "instruments_used": the sample-wide instrument summary.
    Physical quantities are stored as {"value": ..., "units": ...} pairs.
    """
    instruments = deduplicate_instruments_from_layers(layers)
    substrate_dict = {
        "name": substrate_object.name,
        "chemical_formula": substrate_object.get_compound_formula(apikey),
        "orientation": substrate_object.orientation,
        "miscut_angle": _quantity(
            substrate_object.miscut_angle, substrate_object.miscut_angle_unit
        ),
        "miscut_direction": substrate_object.miscut_direction,
        "thickness": _quantity(
            substrate_object.thickness, substrate_object.thickness_unit
        ),
        "dimensions": substrate_object.dimensions,
        "surface_treatment": substrate_object.surface_treatment,
        "manufacturer": substrate_object.manufacturer,
        "batch_id": substrate_object.batch_id,
    }
    multilayer = {}
    for layer in layers:
        # Built exactly like the keys of deduplicate_instruments_from_layers(),
        # so the instruments[name] lookup below matches whatever type
        # layer_number has (plain "+" concatenation fails for an int).
        name = f"layer_{layer.layer_number}"
        target_object = chain_layer_to_target(layer)
        target_dict = {
            "name": target_object.name,
            "chemical_formula": target_object.get_compound_formula(apikey),
            "description": target_object.description,
            "shape": target_object.shape,
            "dimensions": target_object.dimensions,
            "thickness": _quantity(
                target_object.thickness, target_object.thickness_unit
            ),
            "solid_form": target_object.solid_form,
            "manufacturer": target_object.manufacturer,
            # TO-DO: a real batch id is currently not available for targets,
            # the target name is used as a placeholder.
            "batch_id": target_object.name,
        }
        multilayer[name] = {
            "target": target_dict,
            "start_time": layer.start_time,
            "operator": layer.operator,
            "description": layer.description,
            "number_of_pulses": layer.number_of_pulses,
            "deposition_time": _quantity(
                layer.deposition_time, layer.deposition_time_unit
            ),
            "temperature": _quantity(layer.temperature, layer.temperature_unit),
            "heating_method": layer.heating_method,
            "layer_thickness": _quantity(
                layer.layer_thickness, layer.layer_thickness_unit
            ),
            "buffer_gas": layer.buffer_gas,
            "process_pressure": _quantity(
                layer.process_pressure, layer.process_pressure_unit
            ),
            "heater_target_distance": _quantity(
                layer.heater_target_distance, layer.heater_target_distance_unit
            ),
            "repetition_rate": _quantity(
                layer.repetition_rate, layer.repetition_rate_unit
            ),
            "laser_fluence": _quantity(layer.laser_fluence, layer.laser_fluence_unit),
            "laser_spot_area": _quantity(
                layer.laser_spot_area, layer.laser_spot_area_unit
            ),
            "laser_energy": _quantity(layer.laser_energy, layer.laser_energy_unit),
            "laser_rastering": {
                "geometry": layer.laser_rastering_geometry,
                "positions": layer.laser_rastering_positions,
                "velocities": layer.laser_rastering_velocities,
            },
            "pre_annealing": {
                "ambient_gas": layer.pre_annealing_ambient_gas,
                "pressure": _quantity(
                    layer.pre_annealing_pressure, layer.pre_annealing_pressure_unit
                ),
                "temperature": _quantity(
                    layer.pre_annealing_temperature,
                    layer.pre_annealing_temperature_unit,
                ),
                "duration": _quantity(
                    layer.pre_annealing_duration, layer.pre_annealing_duration_unit
                ),
            },
            "post_annealing": {
                "ambient_gas": layer.post_annealing_ambient_gas,
                "pressure": _quantity(
                    layer.post_annealing_pressure, layer.post_annealing_pressure_unit
                ),
                "temperature": _quantity(
                    layer.post_annealing_temperature,
                    layer.post_annealing_temperature_unit,
                ),
                "duration": _quantity(
                    layer.post_annealing_duration, layer.post_annealing_duration_unit
                ),
            },
            "instruments_used": instruments[name],
        }
    return {
        "sample": {
            "substrate": substrate_dict,
            "multilayer": multilayer,
        },
        "instruments_used": instruments["multilayer"],
    }
def _create_nx_group(parent, name, nx_class):
    """Create the sub-group ``name`` under ``parent`` and tag it with ``NX_class``."""
    group = parent.create_group(name)
    group.attrs["NX_class"] = nx_class
    return group


def _write_fields(nx_group, source, names):
    """Store each ``source[name]`` as a dataset called ``name`` in ``nx_group``."""
    for name in names:
        nx_group.create_dataset(name, data=source[name])


def _write_quantities(nx_group, source, names):
    """Store each ``source[name]["value"]`` as a dataset with a ``units`` attribute."""
    for name in names:
        dataset = nx_group.create_dataset(name, data=source[name]["value"])
        dataset.attrs["units"] = source[name]["units"]


def build_nexus_file(pld_fabrication, output_path, rheed_osc=None, heatmap_matrix=None):
    """
    Write the dataset of a PLD-fabricated sample to an HDF5 file.

    Parameters:
        pld_fabrication: nested dictionary with the "sample" (holding
            "substrate" and "multilayer") and "instruments_used" keys, as
            produced by make_nexus_schema_dictionary().
        output_path: path of the file to create; an existing file is
            overwritten (the file is opened in "w" mode).
        rheed_osc: optional dictionary with "time" and "intensity" entries
            (as returned by analyse_rheed_data()). When None, no RHEED
            oscillation data is written.
        heatmap_matrix: optional array stored as the "diffraction_image"
            dataset. When None, no image is written.

    File layout: a "pld_fabrication" group containing "sample" (with
    "substrate" and "multilayer/<layer name>/..."), "instruments_used" and
    "rheed_data". Every group carries an NX_class attribute; quantities given
    as {"value", "units"} pairs become a dataset plus a "units" attribute.

    Raises:
        KeyError: if an expected key is missing from the input dictionaries.
        Errors raised by h5py while storing a value (the original code
        anticipated TypeError here, presumably for missing/None metadata -
        TO-DO confirm and handle) propagate unchanged; a partially written
        file is left on disk in that case.
    """
    # NOTE: look at the mail attachment from Emiliano...
    instrument_fields = ("laser_system", "deposition_chamber", "rheed_system")
    annealing_quantities = ("pressure", "temperature", "duration")

    with h5py.File(output_path, "w") as f:
        nx_pld_entry = _create_nx_group(f, "pld_fabrication", "NXentry")

        # Sample section
        nx_sample = _create_nx_group(nx_pld_entry, "sample", "NXsample")
        sample_dict = pld_fabrication["sample"]

        # Substrate sub-section
        nx_substrate = _create_nx_group(nx_sample, "substrate", "NXsubentry")
        substrate_dict = sample_dict["substrate"]
        _write_fields(
            nx_substrate,
            substrate_dict,
            (
                "name",
                "chemical_formula",
                "orientation",
                "miscut_direction",
                "dimensions",
                "surface_treatment",
                "manufacturer",
                "batch_id",
            ),
        )
        _write_quantities(nx_substrate, substrate_dict, ("miscut_angle", "thickness"))

        # Multilayer sub-section, repeated FOR EACH LAYER
        nx_multilayer = _create_nx_group(nx_sample, "multilayer", "NXsubentry")
        for layer_name, layer_dict in sample_dict["multilayer"].items():
            nx_layer = _create_nx_group(nx_multilayer, layer_name, "NXsubentry")

            ## Target metadata
            nx_target = _create_nx_group(nx_layer, "target", "NXsample")
            target_dict = layer_dict["target"]
            _write_fields(
                nx_target,
                target_dict,
                (
                    "name",
                    "chemical_formula",
                    "description",
                    "shape",
                    "dimensions",
                    "solid_form",
                    "manufacturer",
                    "batch_id",
                ),
            )
            _write_quantities(nx_target, target_dict, ("thickness",))

            ## Other layer-specific metadata
            # NOTE(review): layer_dict["description"] is not written to the
            # file (it never was) - confirm whether that is intentional.
            _write_fields(
                nx_layer,
                layer_dict,
                (
                    "start_time",
                    "operator",
                    "number_of_pulses",
                    "heating_method",
                    "buffer_gas",
                ),
            )
            _write_quantities(
                nx_layer,
                layer_dict,
                (
                    "deposition_time",
                    "repetition_rate",
                    "temperature",
                    "layer_thickness",
                    "process_pressure",
                    "heater_target_distance",
                    "laser_fluence",
                    "laser_spot_area",
                    "laser_energy",
                ),
            )

            ## Rastering metadata
            nx_laser_rastering = _create_nx_group(
                nx_layer, "laser_rastering", "NXprocess"
            )
            _write_fields(
                nx_laser_rastering,
                layer_dict["laser_rastering"],
                ("geometry", "positions", "velocities"),
            )

            ## Annealing metadata (same structure before and after deposition)
            for annealing_name in ("pre_annealing", "post_annealing"):
                nx_annealing = _create_nx_group(nx_layer, annealing_name, "NXprocess")
                annealing_dict = layer_dict[annealing_name]
                _write_fields(nx_annealing, annealing_dict, ("ambient_gas",))
                _write_quantities(nx_annealing, annealing_dict, annealing_quantities)

            ## Instruments used for this layer
            nx_layer_instruments = _create_nx_group(
                nx_layer, "instruments_used", "NXinstrument"
            )
            _write_fields(
                nx_layer_instruments, layer_dict["instruments_used"], instrument_fields
            )

        # Instruments used section (whole sample)
        nx_instruments = _create_nx_group(
            nx_pld_entry, "instruments_used", "NXinstrument"
        )
        _write_fields(
            nx_instruments, pld_fabrication["instruments_used"], instrument_fields
        )

        # RHEED data section (the group is created even when it stays empty)
        nx_rheed = _create_nx_group(nx_pld_entry, "rheed_data", "NXdata")
        if rheed_osc is not None:
            # Time axis
            t_ds = nx_rheed.create_dataset("time", data=rheed_osc["time"])
            t_ds.attrs["units"] = "s"
            t_ds.attrs["long_name"] = "Time"
            # Intensities
            i_ds = nx_rheed.create_dataset("intensity", data=rheed_osc["intensity"])
            i_ds.attrs["units"] = "a.u."
            i_ds.attrs["long_name"] = "RHEED Intensity"
            # NXdata attributes: "axes" needs exactly one entry per dimension
            # of the signal ("." = unnamed dimension). analyse_rheed_data()
            # returns a (3, n_timepoints) array, so time runs along axis 1; a
            # 1D signal falls back to axis 0.
            signal_rank = np.ndim(rheed_osc["intensity"])
            time_axis = 1 if signal_rank > 1 else 0
            axes = ["."] * max(signal_rank, 1)
            axes[time_axis] = "time"
            nx_rheed.attrs["signal"] = "intensity"
            nx_rheed.attrs["axes"] = axes
            nx_rheed.attrs["time_indices"] = np.array([time_axis], dtype=np.int32)
        if heatmap_matrix is not None:
            heatmap = nx_rheed.create_dataset("diffraction_image", data=heatmap_matrix)
            heatmap.attrs["long_name"] = "Diffraction Image"
            heatmap.attrs["units"] = "a.u."
            # NeXus reserves "spectrum" for 1-D data; a 2-D matrix is an "image".
            heatmap.attrs["interpretation"] = "image"
if __name__ == "__main__":
    # Script entry point: fetch a sample and everything linked to it from
    # eLabFTW, dump the dataset to JSON and build the matching HDF5 file.
    # NOTE: ELABFTW_API_URL and apikey are read as module-level globals by the
    # functions above, so they have to stay at module scope.
    # TO-DO: place the API base URL somewhere else.
    ELABFTW_API_URL = "https://elabftw.fisica.unina.it/api/v2"
    apikey = getpass("Paste API key here: ")
    elabid = input("Enter elabid of your starting sample [default = 1111]: ") or 1111
    # Goes through the dedicated helper so that resources which are not
    # samples are rejected instead of being parsed as an entrypoint.
    sample = call_entrypoint_from_elabid(elabid)
    sample_name = sample.name.strip().replace(" ", "_")
    substrate_object = chain_entrypoint_to_batch(sample)  # Substrate-class object
    layers = chain_entrypoint_to_layers(sample)  # list of Layer-class objects
    n_layers = len(layers)  # total number of layers on the sample
    result = make_nexus_schema_dictionary(substrate_object, layers)
    # print(result)  # debug
    os.makedirs("output", exist_ok=True)  # open() does not create directories
    with open(f"output/sample-{sample_name}.json", "w") as f:
        json.dump(result, f, indent=3)

    # TO-DO: remove the hard-coded path of the RWA file
    # ideally the script should download a TXT/CSV file from each layer
    # and merge all data in a single file to analyse it
    rwa_path = "tests/Realtime_Window_Analysis.txt"
    rheed_osc = None  # build_nexus_file() skips the RHEED traces when None
    if os.path.isfile(rwa_path):
        osc = np.loadtxt(rwa_path, delimiter="\t")
        try:
            # analyze rheed data first, build the file later
            rheed_osc = analyse_rheed_data(data=osc)
        except ValueError as ve:
            raise ValueError(
                f"Error with function analyse_rheed_data. {ve}\nPlease make sure the Realtime Window Analysis file is exactly 4 columns wide - where the first column represents time and the others are RHEED intensities."
            ) from ve
    else:
        print(
            f"Warning! File {rwa_path} not found: the output will contain no RHEED oscillation data."
        )

    # This one tries to open an image.
    # Emiliano said to keep it to one image per layer tops.
    # In this test I will only consider one image.
    # TO-DO: make it format-agnostic. If not possible, make it PNG-only.
    image_path = "tests/LAO_16min50s_736C_STO.bmp"
    mx = None  # stays None when the image is missing, so no image is written
    if os.path.isfile(image_path):
        with Image.open(image_path) as img:
            # "L" = 8-bit greyscale, one intensity value per pixel
            mx = np.array(img.convert("L"), dtype=np.uint8)

    build_nexus_file(
        result,
        output_path=f"output/sample-{sample_name}-nexus.h5",
        rheed_osc=rheed_osc,
        heatmap_matrix=mx,
    )