MAJOR: fundamental functions of the parser are ready and tested!

TO-DO:
1. follow the "TO-DO" comments to clean up the code
2. filename should be NFFA-DI compliant like:
	nffa-di_NA01_Napoli_Na-26-015.h5
3. RHEED data analysis should be split into two distinct functions:
   one for the raw stream and one for the image
4. if time allows: consider moving most of main.py into separate modules
This commit is contained in:
2026-05-12 15:38:06 +02:00
parent df927b7c0e
commit 19a802694f
7 changed files with 76008 additions and 94 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 301 KiB

File diff suppressed because it is too large Load Diff

Binary file not shown.

After

Width:  |  Height:  |  Size: 301 KiB

File diff suppressed because it is too large Load Diff

Binary file not shown.

View File

@@ -85,6 +85,7 @@ class APIHandler:
"""
Downloads a specific attachment of a certain eLabFTW experiment (default) or item.
Only returns its binary data. Use method download_attachment_to_disk to save to file.
NOTE: Output is a dictionary where:
* The key is the attachment's filename;
* The value is the attachment's binary data.
@@ -100,7 +101,7 @@ class APIHandler:
)
config = elabapi.Configuration()
config.api_key["api_key"] = api_key
config.api_key["api_key"] = self.api_key
config.api_key_prefix["api_key"] = "Authorization"
config.host = self.elaburl
config.debug = False
@@ -132,7 +133,7 @@ class APIHandler:
"""
Downloads a specific attachment of a certain eLabFTW experiment (default) or item.
Downloads their binary data through method download_attachments_data and dumps it to dump_dir.
Returns nothing on success.
Returns full path of the output file.
Args:
elabid: eLabFTW internal ID of the selected resource.
@@ -151,9 +152,10 @@ class APIHandler:
uploads = self.download_attachment_data(elabid, upload_id, entryType=entryType)
for file in uploads:
raw_data = uploads[file]
with open(os.path.join(dump_dir, f"exp{elabid}-{file}"), "wb") as f:
full_path = os.path.join(dump_dir, f"exp{elabid}-{file}")
with open(full_path, "wb") as f:
f.write(raw_data)
return
return full_path
# Testing methods

View File

@@ -15,7 +15,7 @@ def call_entrypoint_from_elabid(elabid):
If the entry is not a sample (category_title not matching exactly "Sample") returns ValueError.
"""
try:
sample_data = APIHandler(apikey).get_entry_from_elabid(
sample_data = APIHandler(api_key).get_entry_from_elabid(
elabid, entryType="items"
)
if not sample_data.get("category_title") == "Sample":
@@ -36,7 +36,7 @@ def call_material_from_elabid(elabid):
Because of an old typo, the value "Subtrate" (second 's' is missing) is also accepted.
"""
try:
material_data = APIHandler(apikey).get_entry_from_elabid(
material_data = APIHandler(api_key).get_entry_from_elabid(
elabid, entryType="items"
)
material_category = material_data.get("category_title")
@@ -64,7 +64,7 @@ def call_layers_from_list(elabid_list):
list_of_layers = []
for elabid in elabid_list:
try:
layer_data = APIHandler(apikey).get_entry_from_elabid(
layer_data = APIHandler(api_key).get_entry_from_elabid(
elabid, entryType="experiments"
)
if not layer_data.get("category_title") == "PLD Deposition":
@@ -134,7 +134,7 @@ def deduplicate_instruments_from_layers(layers):
rheeds = []
elegant_dict = {}
for lyr in layers:
instruments = lyr.get_instruments(apikey)
instruments = lyr.get_instruments(api_key)
lasers.append(instruments["laser_system"])
chambers.append(instruments["deposition_chamber"])
rheeds.append(instruments["rheed_system"])
@@ -222,10 +222,15 @@ def select_rheed_data(layer):
"related_experiment": elabid
}
"""
n = layer.layer_number
textual_uploads = layer.fetch_textual_uploads()
images = layer.fetch_images()
# Check for length. Three cases:
# 1. len is 0, no file of this category → return {}
# 2. len is more than 1, user must select
# 3. len is 1, God's in his heaven, all's right with the world
if len(textual_uploads) == 0:
rheed_data_file = {}
elif len(textual_uploads) > 1:
@@ -248,8 +253,12 @@ def select_rheed_data(layer):
continue
rheed_data_file = textual_uploads[ans] # still a dictionary
else:
rheed_data_file = textual_uploads[0]
rheed_data_file = textual_uploads[
next(iter(textual_uploads))
] # this prism of pork gets the value of the only key in the dictionary
# it's proof like no other that my code is human-generated, and that I suck at coding. It's hubris manifest.
# As above so below
if len(images) == 0:
rheed_image_file = {}
elif len(images) > 1:
@@ -272,7 +281,7 @@ def select_rheed_data(layer):
continue
rheed_image_file = images[ans] # still a dictionary
else:
rheed_image_file = images[0]
rheed_image_file = images[next(iter(images))]
return (rheed_data_file, rheed_image_file)
@@ -343,7 +352,7 @@ def make_nexus_schema_dictionary(substrate_object, layers):
"sample": {
"substrate": {
"name": substrate_object.name,
"chemical_formula": substrate_object.get_compound_formula(apikey),
"chemical_formula": substrate_object.get_compound_formula(api_key),
"orientation": substrate_object.orientation,
"miscut_angle": {
"value": substrate_object.miscut_angle,
@@ -371,7 +380,7 @@ def make_nexus_schema_dictionary(substrate_object, layers):
target_object = chain_layer_to_target(layer)
target_dict = {
"name": target_object.name,
"chemical_formula": target_object.get_compound_formula(apikey),
"chemical_formula": target_object.get_compound_formula(api_key),
"description": target_object.description,
"shape": target_object.shape,
"dimensions": target_object.dimensions,
@@ -465,11 +474,16 @@ def make_nexus_schema_dictionary(substrate_object, layers):
},
"instruments_used": instruments[name],
}
rheed_data[name] = {}
rheed_data[name] = {
"layer_number": layer.layer_number,
"data": select_rheed_data(
layer
), # tuple: (rheed_data_file, rheed_image_file)
}
return pld_fabrication
def build_nexus_file(pld_fabrication, output_path, rheed_osc=None, heatmap_matrix=None):
def build_nexus_file(pld_fabrication, output_path):
# NOTE: look at the mail attachment from Emiliano...
with h5py.File(output_path, "w") as f:
nx_pld_entry = f.create_group("pld_fabrication")
@@ -735,77 +749,132 @@ def build_nexus_file(pld_fabrication, output_path, rheed_osc=None, heatmap_matri
nx_rheed = nx_pld_entry.create_group("rheed_data")
nx_rheed.attrs["NX_class"] = "NXdata"
# here's what we gon do: (to be read with the voice of Mike from Breaking Bad)
# 1. rheed_osc and heatmap_matrix are NOT given in input to the function so no need for checking that
# 2. loop through the layers, each with its elabid and metadata
# 2a. read said metadata for each layer, print list of txt and png files (dedicated Layer class methods)
# 2b. prompt the user for file choice (1 text file per layer - in tsv format, 1 picture file - either png [default] or bmp)
# 2c. download the chosen file
# 2d. with chosen file do analysis as before
# 3. the schema should be:
# * /rheed_data
# * /layer_n
# * time (rheed_osc)
# * intensity (rheed_osc)
# * diffraction_image (heatmap_matrix)
# first problem is probably finding out how to recover the following meta from the original Layer object:
# * Layer.elabid - integer
# * Layer.fetch_textual_uploads() - dictionary
# * Layer.fetch_images() - dictionary
rheed_data = pld_fabrication["rheed_data"]
for layer in rheed_data:
nx_rheed_layer = nx_rheed.create_group(layer)
if rheed_osc is not None:
# Asse temporale
t_ds = nx_rheed.create_dataset("time", data=rheed_osc["time"])
t_ds.attrs["units"] = "s"
t_ds.attrs["long_name"] = "Time"
layer_dict = rheed_data[layer]
n = layer_dict["layer_number"]
rheed_data_file = layer_dict["data"][0] # first in the tuple
rheed_image_file = layer_dict["data"][1] # second in the tuple
handler = APIHandler(api_key)
# Intensità: shape (n_layers, n_timepoints, 3)
i_ds = nx_rheed.create_dataset("intensity", data=rheed_osc["intensity"])
i_ds.attrs["units"] = "a.u."
i_ds.attrs["long_name"] = "RHEED Intensity"
# TO-DO: maybe make a dedicated function???
data_path = None
image_path = None
# Attributi NXdata — notazione NeXus 3.x corretta
nx_rheed.attrs["signal"] = "intensity"
nx_rheed.attrs["axes"] = [
".",
"time",
".",
] # solo l'asse 1 (time) è denominato
nx_rheed.attrs["time_indices"] = np.array([1], dtype=np.int32)
# ###########
# nx_rheed = nx_pld_entry.create_group("rheed_data")
# nx_rheed.attrs["NX_class"] = "NXdata"
if rheed_data_file != {}:
try:
elabid = rheed_data_file["related_experiment"]
upload_id = rheed_data_file["id"]
except KeyError as ke:
raise KeyError(
f"Missing key in your file: {rheed_data_file.get('filename') or '<missing name>'}: {ke}"
)
data_path = handler.download_attachment_to_disk(
elabid=elabid, upload_id=upload_id
)
# nx_rheed.create_dataset("time", data=rheed_osc["time"])
# nx_rheed["time"].attrs["units"] = "s"
if rheed_image_file != {}:
try:
upload_id = rheed_image_file["id"]
elabid = rheed_image_file["related_experiment"]
except KeyError as ke:
raise KeyError(
f"Missing key in your file: {rheed_data_file.get('filename') or '<missing name>'}: {ke}"
)
image_path = handler.download_attachment_to_disk(
elabid=elabid, upload_id=upload_id
)
# nx_rheed.create_dataset("intensity", data=rheed_osc["intensity"])
# #nx_rheed["intensity"].attrs["units"] = "counts"
# nx_rheed["intensity"].attrs["long_name"] = "RHEED intensity"
# nx_rheed.attrs["signal"] = "intensity"
# nx_rheed.attrs["axes"] = "layer:time:channel"
# nx_rheed.attrs["layer_indices"] = [0] # asse layer
# nx_rheed.attrs["time_indices"] = [1] # asse tempo
# nx_rheed.attrs["channel_indices"] = [2]
if heatmap_matrix is not None:
heatmap = nx_rheed.create_dataset("diffraction_image", data=heatmap_matrix)
heatmap.attrs["long_name"] = "Diffraction Image"
heatmap.attrs["units"] = "a.u."
# this is of my own initiative. good???
heatmap.attrs["interpretation"] = "spectrum"
# suggested by DeepSeek, useful? probably not.
# heatmap.attrs["suggested_colormap"] = "inferno"
# heatmap.attrs["scale_min"] = 0.0
# heatmap.attrs["scale_max"] = 1.0
if data_path and os.path.isfile(data_path):
with open(data_path, "r") as o:
osc = np.loadtxt(o, delimiter="\t")
try:
rheed_osc = (
analyse_rheed_data(data=osc) or None
) # analyze rheed data first, build the file later
except ValueError as ve:
raise ValueError(
f"Error with function analyse_rheed_data. {ve}\nPlease make sure the Realtime Window Analysis file is exactly 4 columns wide - where the first column represents time and the others are RHEED intensities."
)
if rheed_osc is not None:
# Time axis (needed?)
t_ds = nx_rheed_layer.create_dataset("time", data=rheed_osc["time"])
t_ds.attrs["units"] = "s"
t_ds.attrs["long_name"] = "Time"
# Intensity shape (n_layers, n_timepoints, 3)
i_ds = nx_rheed_layer.create_dataset(
"intensity", data=rheed_osc["intensity"]
)
i_ds.attrs["units"] = "a.u."
i_ds.attrs["long_name"] = "RHEED Intensity"
# NXdata attributes — NeXus 3.x notation
nx_rheed_layer.attrs["signal"] = "intensity"
nx_rheed_layer.attrs["axes"] = [
".",
"time",
".",
] # only time axis (1) is named
nx_rheed_layer.attrs["time_indices"] = np.array([1], dtype=np.int32)
if image_path and os.path.isfile(image_path):
img = Image.open(image_path).convert("L")
heatmap_matrix = np.array(img, dtype=np.uint8) # or None
if heatmap_matrix is not None:
heatmap = nx_rheed_layer.create_dataset(
"diffraction_image", data=heatmap_matrix
)
heatmap.attrs["long_name"] = "Diffraction Image"
heatmap.attrs["units"] = "a.u."
heatmap.attrs["interpretation"] = "spectrum"
return
# TO-DO: ↓↓↓ comment cleanup ↓↓↓
#
# here's what we gon do: (to be read with the voice of Mike from Breaking Bad)
# 1. rheed_osc and heatmap_matrix are NOT given in input to the function so no need for checking that
# 2. loop through the layers, each with its elabid and metadata
# 2a. read said metadata for each layer, print list of txt and png files (dedicated Layer class methods)
# 2b. prompt the user for file choice (1 text file per layer - in tsv format, 1 picture file - either png [default] or bmp)
# 2c. download the chosen file
# 2d. with chosen file do analysis as before
# 3. the schema should be:
# * /rheed_data
# * /layer_n
# * time (rheed_osc)
# * intensity (rheed_osc)
# * diffraction_image (heatmap_matrix)
# first problem is probably finding out how to recover the following meta from the original Layer object:
# * Layer.elabid - integer
# * Layer.fetch_textual_uploads() - dictionary
# * Layer.fetch_images() - dictionary
# nx_rheed = nx_pld_entry.create_group("rheed_data")
# nx_rheed.attrs["NX_class"] = "NXdata"
# nx_rheed.create_dataset("time", data=rheed_osc["time"])
# nx_rheed["time"].attrs["units"] = "s"
# nx_rheed.create_dataset("intensity", data=rheed_osc["intensity"])
# #nx_rheed["intensity"].attrs["units"] = "counts"
# nx_rheed["intensity"].attrs["long_name"] = "RHEED intensity"
# nx_rheed.attrs["signal"] = "intensity"
# nx_rheed.attrs["axes"] = "layer:time:channel"
# nx_rheed.attrs["layer_indices"] = [0] # asse layer
# nx_rheed.attrs["time_indices"] = [1] # asse tempo
# nx_rheed.attrs["channel_indices"] = [2]
if __name__ == "__main__":
# TO-DO: place the API base URL somewhere else.
ELABFTW_API_URL = "https://elabftw.fisica.unina.it/api/v2"
apikey = getpass("Paste API key here: ")
api_key = getpass("Paste API key here: ")
elabid = input("Enter elabid of your starting sample [default = 1111]: ") or 1111
data = APIHandler(apikey).get_entry_from_elabid(elabid)
handler = APIHandler(api_key)
data = handler.get_entry_from_elabid(elabid)
sample = Entrypoint(data)
sample_name = sample.name.strip().replace(" ", "_")
substrate_object = chain_entrypoint_to_batch(sample) # Substrate-class object
@@ -821,28 +890,9 @@ if __name__ == "__main__":
# and merge all data in a single file to analyse it
# WARNING: fails if file is missing
with open("tests/Realtime_Window_Analysis.txt", "r") as o:
osc = np.loadtxt(o, delimiter="\t")
try:
rheed_osc = (
analyse_rheed_data(data=osc) or None
) # analyze rheed data first, build the file later
except ValueError as ve:
raise ValueError(
f"Error with function analyse_rheed_data. {ve}\nPlease make sure the Realtime Window Analysis file is exactly 4 columns wide - where the first column represents time and the others are RHEED intensities."
)
# This one tries to open a png image.
# Emiliano said to keep it to one image per layer tops.
# In this test I will only consider one image.
# TO-DO: make it format-agnostic. If not possible, make it PNG-only.
if os.path.isfile("tests/LAO_16min50s_736C_STO.bmp"): # if BMP
# if os.path.isfile("tests/LAO_16min50s_736C_STO.png"): # if PNG
img = Image.open("tests/LAO_16min50s_736C_STO.bmp").convert("L")
mx = np.array(img, dtype=np.uint8)
# mx = mx.astype(np.float32) / 255.0 # consider deleting???
build_nexus_file(
result,
output_path=f"output/sample-{sample_name}-nexus.h5",
rheed_osc=rheed_osc,
heatmap_matrix=mx,
)
# mx = mx.astype(np.float32) / 255.0 # consider deleting???
build_nexus_file(result, output_path=f"output/sample-{sample_name}-nexus.nx")