unfinished work

This commit is contained in:
2026-05-12 12:54:16 +02:00
parent c5b17bb3f8
commit 07aac3e6b3
3 changed files with 115 additions and 8 deletions

View File

@@ -80,7 +80,7 @@ class APIHandler:
entry_data = response.json() entry_data = response.json()
return entry_data return entry_data
def download_attachments_data(self, elabid, entryType="experiments"): def download_all_attachments_data(self, elabid, entryType="experiments"):
""" """
Downloads attachments of a certain eLabFTW experiment (default) or item. Downloads attachments of a certain eLabFTW experiment (default) or item.
Only returns their binary data. Use method download_attachments_to_disk to save to file. Only returns their binary data. Use method download_attachments_to_disk to save to file.
@@ -142,7 +142,7 @@ class APIHandler:
"You can only download attachments from experiments or items." "You can only download attachments from experiments or items."
) )
uploads = download_attachments_data(elabid, entryType=entryType) uploads = download_all_attachments_data(elabid, entryType=entryType)
for file in uploads: for file in uploads:
raw_data = uploads["file"] raw_data = uploads["file"]
with open(os.path.join(dump_dir, f"exp{elabid}-{file}"), "wb") as f: with open(os.path.join(dump_dir, f"exp{elabid}-{file}"), "wb") as f:

View File

@@ -155,8 +155,12 @@ class Layer:
Data is already in layer_data, so the API key is unrequired. Same goes for: Data is already in layer_data, so the API key is unrequired. Same goes for:
* fetch_textual_uploads() - no arguments; * fetch_textual_uploads() - no arguments;
* fetch_images() - no arguments. * fetch_images() - no arguments.
Exception: returns {} (empty dictionary) if no uploads/attachments on Layer.
""" """
# Remember: Layers are experiments, so we only need to look for attachments in the experiment endpoint. # Remember: Layers are experiments, so we only need to look for attachments in the experiment endpoint.
if self.uploads == []:
return {}
attachments = { attachments = {
attachment["id"]: { attachment["id"]: {
"filename": attachment["real_name"], "filename": attachment["real_name"],
@@ -180,7 +184,7 @@ class Layer:
textual_uploads = { textual_uploads = {
attachment: attachments[attachment] attachment: attachments[attachment]
for attachment in attachments for attachment in attachments
if attachments[attachments]["filename"][-4:] in (".txt", ".csv", ".tsv") if attachments[attachment]["filename"][-4:] in (".txt", ".csv", ".tsv")
} }
return textual_uploads return textual_uploads
@@ -195,12 +199,12 @@ class Layer:
That's because the API (v5.3.11) doesn't provide MIME Type or similar metadata on the attachments, so the only way to know if an attachment is an image or not is through its filename. That's because the API (v5.3.11) doesn't provide MIME Type or similar metadata on the attachments, so the only way to know if an attachment is an image or not is through its filename.
""" """
attachments = self.list_attachments() attachments = self.list_attachments()
pictures = { images = {
attachment: attachments[attachment] attachment: attachments[attachment]
for attachment in attachments for attachment in attachments
if attachments[attachments]["filename"][-4:] in (".png", ".bmp") if attachments[attachment]["filename"][-4:] in (".png", ".bmp")
} }
return pictures return images
class Entrypoint: class Entrypoint:

View File

@@ -204,6 +204,84 @@ def deduplicate_instruments_from_layers(layers):
# } # }
def select_rheed_data(layer):
    """
    Select the attachments used to build the RHEED dataset of one layer.

    Two categories of attachments are considered, as returned by the layer
    itself: textual uploads (layer.fetch_textual_uploads()) and images
    (layer.fetch_images()). At most one attachment per category is selected:

    * no attachment in a category  -> {} (empty dictionary) for that category;
    * exactly one attachment       -> that attachment, without prompting;
    * more than one attachment     -> the user is shown a numbered list
      (0, 1, ...) and prompted until a valid position is entered; an empty
      answer selects position 0.

    Args:
        layer: Layer-class object exposing layer_number,
            fetch_textual_uploads() and fetch_images(). Both methods are
            expected to return a dictionary mapping an attachment id to a
            dictionary describing that attachment.

    Returns:
        The tuple (rheed_data_file, rheed_image_file). Each element is either
        {} or the attachment dictionary taken unchanged from the mapping.
        NOTE(review): an earlier version of this docstring listed the keys
        "fullname", "hashname" and "related_experiment"; the visible part of
        Layer.list_attachments() uses "filename" instead - confirm the schema.
    """
    n = layer.layer_number
    textual_uploads = layer.fetch_textual_uploads()
    images = layer.fetch_images()

    rheed_data_file = _pick_attachment(
        textual_uploads,
        f"Attention: Layer {n} contains multiple TEXTUAL attachments.\n",
        "These are used to populate the 'RHEED intensities' dataset.",
    )
    rheed_image_file = _pick_attachment(
        images,
        f"Attention: Layer {n} contains multiple PNG/BMP attachments.\n",
        "These are used to create the RHEED heatmap.",
    )
    return (rheed_data_file, rheed_image_file)


def _pick_attachment(attachments, warning, purpose):
    """
    Return a single attachment from a mapping, asking the user if ambiguous.

    Args:
        attachments: dictionary whose values are the candidate attachments.
        warning: first line printed when the user has to choose.
        purpose: second line printed when the user has to choose.

    Returns:
        {} when the mapping is empty, the only value when there is one,
        otherwise the value at the position chosen by the user.
    """
    # The mapping is keyed by attachment id, not by position, so the values
    # are put in a list before being addressed with 0, 1, ...
    candidates = list(attachments.values())
    if not candidates:
        return {}
    if len(candidates) == 1:
        return candidates[0]

    print(warning)
    print(purpose)
    print("=== USER INTERVENTION REQUIRED ===")
    for position, candidate in enumerate(candidates):
        print(f"{position} - {candidate}")

    while True:
        # An empty answer falls back to the advertised default; it is kept as
        # a string so that the validation below handles every answer alike.
        answer = (
            input(
                "Select one of the attachments from the list (0, 1, ...) [default = 0]: "
            ).strip()
            or "0"
        )
        if answer.isdecimal() and int(answer) < len(candidates):
            return candidates[int(answer)]
def download_rheed_data():
    """Placeholder without an implementation yet: takes nothing, returns None."""
    return None
def analyse_rheed_data(data): def analyse_rheed_data(data):
""" """
Takes the content of a tsv file and returns a dictionary with timestamps and intensities. Takes the content of a tsv file and returns a dictionary with timestamps and intensities.
@@ -236,7 +314,7 @@ def analyse_rheed_data(data):
raise ValueError( raise ValueError(
f"Insufficient number of columns: expected 4, got n_cols = {n_cols}." f"Insufficient number of columns: expected 4, got n_cols = {n_cols}."
) )
n_time_points = data.shape[0] # n_time_points = data.shape[0]
# Get time (all rows of col 0) as Float64: # Get time (all rows of col 0) as Float64:
time = data[:, 0].astype( time = data[:, 0].astype(
@@ -254,7 +332,11 @@ def analyse_rheed_data(data):
def make_nexus_schema_dictionary(substrate_object, layers): def make_nexus_schema_dictionary(substrate_object, layers):
""" """
Main function, takes all the other functions to reconstruct the full dataset. Takes a Substrate-class object (output of the chain_entrypoint_to_batch() function) and a list of Layer-class objects (output of the chain_entrypoint_to_layers() function), returns dictionary with the same schema as the NeXus standard for PLD fabrications. Main function, takes all the other functions to reconstruct the full dataset.
Takes a Substrate-class object (output of the chain_entrypoint_to_batch() function),
and a list of Layer-class objects (output of the chain_entrypoint_to_layers() function).
Returns dictionary with the same schema as the NeXus standard for PLD fabrications.
""" """
instruments = deduplicate_instruments_from_layers(layers) instruments = deduplicate_instruments_from_layers(layers)
pld_fabrication = { pld_fabrication = {
@@ -280,8 +362,10 @@ def make_nexus_schema_dictionary(substrate_object, layers):
"multilayer": {}, "multilayer": {},
}, },
"instruments_used": instruments["multilayer"], "instruments_used": instruments["multilayer"],
"rheed_data": {},
} }
multilayer = pld_fabrication["sample"]["multilayer"] multilayer = pld_fabrication["sample"]["multilayer"]
rheed_data = pld_fabrication["rheed_data"]
for layer in layers: for layer in layers:
name = "layer_" + layer.layer_number name = "layer_" + layer.layer_number
target_object = chain_layer_to_target(layer) target_object = chain_layer_to_target(layer)
@@ -381,6 +465,7 @@ def make_nexus_schema_dictionary(substrate_object, layers):
}, },
"instruments_used": instruments[name], "instruments_used": instruments[name],
} }
rheed_data[name] = {}
return pld_fabrication return pld_fabrication
@@ -650,6 +735,24 @@ def build_nexus_file(pld_fabrication, output_path, rheed_osc=None, heatmap_matri
nx_rheed = nx_pld_entry.create_group("rheed_data") nx_rheed = nx_pld_entry.create_group("rheed_data")
nx_rheed.attrs["NX_class"] = "NXdata" nx_rheed.attrs["NX_class"] = "NXdata"
# here's what we gon do: (to be read with the voice of Mike from Breaking Bad)
# 1. rheed_osc and heatmap_matrix are NOT given in input to the function so no need for checking that
# 2. loop through the layers, each with its elabid and metadata
# 2a. read said metadata for each layer, print list of txt and png files (dedicated Layer class methods)
# 2b. prompt the user for file choice (1 text file per layer - in tsv format, 1 picture file - either png [default] or bmp)
# 2c. download the chosen file
# 2d. with chosen file do analysis as before
# 3. the schema should be:
# * /rheed_data
# * /layer_n
# * time (rheed_osc)
# * intensity (rheed_osc)
# * diffraction_image (heatmap_matrix)
# first problem is probably finding out how to recover the following meta from the original Layer object:
# * Layer.elabid - integer
# * Layer.fetch_textual_uploads() - dictionary
# * Layer.fetch_images() - dictionary
if rheed_osc is not None: if rheed_osc is not None:
# Asse temporale # Asse temporale
t_ds = nx_rheed.create_dataset("time", data=rheed_osc["time"]) t_ds = nx_rheed.create_dataset("time", data=rheed_osc["time"])