Compare commits
5 Commits
v0.1.0
...
f84478a7a4
| Author | SHA256 | Date | |
|---|---|---|---|
| f84478a7a4 | |||
| 19a802694f | |||
| df927b7c0e | |||
| ccf74fca26 | |||
| 07aac3e6b3 |
BIN
output/attachments/exp56-LAO_16min50s_736C_STO.bmp
Normal file
BIN
output/attachments/exp56-LAO_16min50s_736C_STO.bmp
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 301 KiB |
37931
output/attachments/exp56-Real-time Window Analysis ( Peak Int. ).txt
Normal file
37931
output/attachments/exp56-Real-time Window Analysis ( Peak Int. ).txt
Normal file
File diff suppressed because it is too large
Load Diff
BIN
output/attachments/exp58-Image10.bmp
Normal file
BIN
output/attachments/exp58-Image10.bmp
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 301 KiB |
37931
output/attachments/exp58-Realtime_Window_Analysis_Noise.txt
Normal file
37931
output/attachments/exp58-Realtime_Window_Analysis_Noise.txt
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,4 +1,5 @@
|
|||||||
import os, requests
|
import os, requests
|
||||||
|
from getpass import getpass
|
||||||
import elabapi_python as elabapi
|
import elabapi_python as elabapi
|
||||||
|
|
||||||
|
|
||||||
@@ -80,16 +81,18 @@ class APIHandler:
|
|||||||
entry_data = response.json()
|
entry_data = response.json()
|
||||||
return entry_data
|
return entry_data
|
||||||
|
|
||||||
def download_attachments_data(self, elabid, entryType="experiments"):
|
def download_attachment_data(self, elabid, upload_id, entryType="experiments"):
|
||||||
"""
|
"""
|
||||||
Downloads attachments of a certain eLabFTW experiment (default) or item.
|
Downloads a specific attachment of a certain eLabFTW experiment (default) or item.
|
||||||
Only returns their binary data. Use method download_attachments_to_disk to save to file.
|
Only returns its binary data. Use method download_attachment_to_disk to save to file.
|
||||||
|
|
||||||
NOTE: Output is a dictionary where:
|
NOTE: Output is a dictionary where:
|
||||||
* The keys are the attachments' filenames;
|
* The key is the attachment's filename;
|
||||||
* The values are the binary data for those attachments.
|
* The value is the attachment's binary data.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
elabid: eLabFTW internal ID of the selected resource.
|
elabid: eLabFTW internal ID of the selected resource.
|
||||||
|
upload_id: eLabFTW internal ID of the selected upload.
|
||||||
entryType: Resource type. Anything other than "experiments" or "items" WILL raise an error.
|
entryType: Resource type. Anything other than "experiments" or "items" WILL raise an error.
|
||||||
"""
|
"""
|
||||||
if entryType not in ["experiments", "items"]:
|
if entryType not in ["experiments", "items"]:
|
||||||
@@ -98,7 +101,7 @@ class APIHandler:
|
|||||||
)
|
)
|
||||||
|
|
||||||
config = elabapi.Configuration()
|
config = elabapi.Configuration()
|
||||||
config.api_key["api_key"] = api_key
|
config.api_key["api_key"] = self.api_key
|
||||||
config.api_key_prefix["api_key"] = "Authorization"
|
config.api_key_prefix["api_key"] = "Authorization"
|
||||||
config.host = self.elaburl
|
config.host = self.elaburl
|
||||||
config.debug = False
|
config.debug = False
|
||||||
@@ -108,29 +111,33 @@ class APIHandler:
|
|||||||
)
|
)
|
||||||
uploads_api = elabapi.UploadsApi(api_client)
|
uploads_api = elabapi.UploadsApi(api_client)
|
||||||
|
|
||||||
# Actual uploads (dictionary):
|
# Scans through the attachments and selects the one with corresponing ID.
|
||||||
uploads = {
|
attachment = {
|
||||||
upload.real_name: uploads_api.read_upload(
|
upload.real_name: uploads_api.read_upload(
|
||||||
entryType, elabid, upload.id, format="binary", _preload_content=False
|
entryType, elabid, upload_id, format="binary", _preload_content=False
|
||||||
).data
|
).data
|
||||||
for upload in uploads_api.read_uploads(entryType, elabid)
|
for upload in uploads_api.read_uploads(entryType, elabid)
|
||||||
|
if upload.id == upload_id
|
||||||
}
|
}
|
||||||
|
|
||||||
return uploads
|
return attachment
|
||||||
|
|
||||||
def download_attachments_to_disk(
|
def download_attachment_to_disk(
|
||||||
self,
|
self,
|
||||||
elabid,
|
elabid,
|
||||||
|
upload_id,
|
||||||
entryType="experiments",
|
entryType="experiments",
|
||||||
dump_dir="output/attachments",
|
dump_dir="output/attachments",
|
||||||
# persistent=True,
|
# persistent=True,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Downloads attachments of a certain eLabFTW experiment (default) or item.
|
Downloads a specific attachment of a certain eLabFTW experiment (default) or item.
|
||||||
Downloads their binary data through method download_attachments_data and dumps it to dump_dir.
|
Downloads their binary data through method download_attachments_data and dumps it to dump_dir.
|
||||||
|
Returns full path of the output file.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
elabid: eLabFTW internal ID of the selected resource.
|
elabid: eLabFTW internal ID of the selected resource.
|
||||||
|
upload_id: eLabFTW internal ID of the selected upload.
|
||||||
entryType: Resource type. Anything other than "experiments" or "items" WILL raise an error.
|
entryType: Resource type. Anything other than "experiments" or "items" WILL raise an error.
|
||||||
dump_dir: Directory to which to save the attachments. Default is "output/attachments".
|
dump_dir: Directory to which to save the attachments. Default is "output/attachments".
|
||||||
persistent: [Unused] Decides if the files will stay on disk after all operations are completed.
|
persistent: [Unused] Decides if the files will stay on disk after all operations are completed.
|
||||||
@@ -142,9 +149,17 @@ class APIHandler:
|
|||||||
"You can only download attachments from experiments or items."
|
"You can only download attachments from experiments or items."
|
||||||
)
|
)
|
||||||
|
|
||||||
uploads = download_attachments_data(elabid, entryType=entryType)
|
uploads = self.download_attachment_data(elabid, upload_id, entryType=entryType)
|
||||||
for file in uploads:
|
for file in uploads:
|
||||||
raw_data = uploads["file"]
|
raw_data = uploads[file]
|
||||||
with open(os.path.join(dump_dir, f"exp{elabid}-{file}"), "wb") as f:
|
full_path = os.path.join(dump_dir, f"exp{elabid}-{file}")
|
||||||
|
with open(full_path, "wb") as f:
|
||||||
f.write(raw_data)
|
f.write(raw_data)
|
||||||
return
|
return full_path
|
||||||
|
|
||||||
|
|
||||||
|
# Testing methods
|
||||||
|
if __name__ == "__main__":
|
||||||
|
api_key = getpass("Paste API key here [no echo]: ")
|
||||||
|
handler = APIHandler(api_key=api_key)
|
||||||
|
handler.download_attachment_to_disk(elabid=58, upload_id=81)
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import os, json, requests
|
import os, json, requests
|
||||||
|
from getpass import getpass
|
||||||
from APIHandler import APIHandler
|
from APIHandler import APIHandler
|
||||||
|
|
||||||
|
|
||||||
@@ -149,16 +150,21 @@ class Layer:
|
|||||||
def list_attachments(self):
|
def list_attachments(self):
|
||||||
"""
|
"""
|
||||||
Returns a dictionary of all the attachments linked to the layer, where:
|
Returns a dictionary of all the attachments linked to the layer, where:
|
||||||
* Each key is the attachment's elabid;
|
* Each key is the attachment's progressive ID (0, 1...);
|
||||||
* Each value is a dictionary containing the attachment's filename, hashname and related experiment elabid (= self.elabid).
|
* Each value is a dictionary containing the attachment's elabid, filename, hashname and related experiment elabid (= self.elabid).
|
||||||
|
|
||||||
Data is already in layer_data, so the API key is unrequired. Same goes for:
|
Data is already in layer_data, so the API key is unrequired. Same goes for:
|
||||||
* fetch_textual_uploads() - no arguments;
|
* fetch_textual_uploads() - no arguments;
|
||||||
* fetch_images() - no arguments.
|
* fetch_images() - no arguments.
|
||||||
|
|
||||||
|
Exception: returns {} (empty dictionary) if no uploads/attachments on Layer.
|
||||||
"""
|
"""
|
||||||
# Remember: Layers are experiments, so we only need to look for attachments in the experiment endpoint.
|
# Remember: Layers are experiments, so we only need to look for attachments in the experiment endpoint.
|
||||||
|
if self.uploads == []:
|
||||||
|
return {}
|
||||||
attachments = {
|
attachments = {
|
||||||
attachment["id"]: {
|
self.uploads.index(attachment): {
|
||||||
|
"id": attachment["id"],
|
||||||
"filename": attachment["real_name"],
|
"filename": attachment["real_name"],
|
||||||
"hashname": attachment["long_name"],
|
"hashname": attachment["long_name"],
|
||||||
"related_experiment": attachment["item_id"],
|
"related_experiment": attachment["item_id"],
|
||||||
@@ -180,7 +186,7 @@ class Layer:
|
|||||||
textual_uploads = {
|
textual_uploads = {
|
||||||
attachment: attachments[attachment]
|
attachment: attachments[attachment]
|
||||||
for attachment in attachments
|
for attachment in attachments
|
||||||
if attachments[attachments]["filename"][-4:] in (".txt", ".csv", ".tsv")
|
if attachments[attachment]["filename"][-4:] in (".txt", ".csv", ".tsv")
|
||||||
}
|
}
|
||||||
return textual_uploads
|
return textual_uploads
|
||||||
|
|
||||||
@@ -195,12 +201,12 @@ class Layer:
|
|||||||
That's because the API (v5.3.11) doesn't provide MIME Type or similar metadata on the attachments, so the only way to know if an attachment is an image or not is through its filename.
|
That's because the API (v5.3.11) doesn't provide MIME Type or similar metadata on the attachments, so the only way to know if an attachment is an image or not is through its filename.
|
||||||
"""
|
"""
|
||||||
attachments = self.list_attachments()
|
attachments = self.list_attachments()
|
||||||
pictures = {
|
images = {
|
||||||
attachment: attachments[attachment]
|
attachment: attachments[attachment]
|
||||||
for attachment in attachments
|
for attachment in attachments
|
||||||
if attachments[attachments]["filename"][-4:] in (".png", ".bmp")
|
if attachments[attachment]["filename"][-4:] in (".png", ".bmp")
|
||||||
}
|
}
|
||||||
return pictures
|
return images
|
||||||
|
|
||||||
|
|
||||||
class Entrypoint:
|
class Entrypoint:
|
||||||
@@ -218,6 +224,7 @@ class Entrypoint:
|
|||||||
self.extra = sample_data["metadata_decoded"]["extra_fields"]
|
self.extra = sample_data["metadata_decoded"]["extra_fields"]
|
||||||
self.linked_items = sample_data["items_links"] # dict
|
self.linked_items = sample_data["items_links"] # dict
|
||||||
self.batch_elabid = self.extra["Substrate batch"]["value"] # elabid
|
self.batch_elabid = self.extra["Substrate batch"]["value"] # elabid
|
||||||
|
self.proposal = self.extra["Proposal"].get("value") or None # proposal
|
||||||
self.linked_experiments = sample_data["related_experiments_links"] # dict
|
self.linked_experiments = sample_data["related_experiments_links"] # dict
|
||||||
self.linked_experiments_elabid = [
|
self.linked_experiments_elabid = [
|
||||||
i["entityid"] for i in self.linked_experiments
|
i["entityid"] for i in self.linked_experiments
|
||||||
@@ -325,6 +332,13 @@ class Target(Material):
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
head = APIHandler("MyApiKey-123456789abcdef")
|
# head = APIHandler("MyApiKey-123456789abcdef")
|
||||||
print(f"Example header:\n\t{head.header}\n")
|
# print(f"Example header:\n\t{head.header}\n")
|
||||||
print("Warning: you're not supposed to be running this as the main program.")
|
# print("Warning: you're not supposed to be running this as the main program.")
|
||||||
|
api_key = getpass("Paste API key here [no echo]: ")
|
||||||
|
handler = APIHandler(api_key=api_key)
|
||||||
|
exp58 = handler.get_entry_from_elabid(elabid=58, entryType="experiments")
|
||||||
|
layer58 = Layer(exp58)
|
||||||
|
print(layer58.list_attachments())
|
||||||
|
print(layer58.fetch_textual_uploads())
|
||||||
|
print(layer58.fetch_images())
|
||||||
|
|||||||
309
src/main.py
309
src/main.py
@@ -15,7 +15,7 @@ def call_entrypoint_from_elabid(elabid):
|
|||||||
If the entry is not a sample (category_title not matching exactly "Sample") returns ValueError.
|
If the entry is not a sample (category_title not matching exactly "Sample") returns ValueError.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
sample_data = APIHandler(apikey).get_entry_from_elabid(
|
sample_data = APIHandler(api_key).get_entry_from_elabid(
|
||||||
elabid, entryType="items"
|
elabid, entryType="items"
|
||||||
)
|
)
|
||||||
if not sample_data.get("category_title") == "Sample":
|
if not sample_data.get("category_title") == "Sample":
|
||||||
@@ -36,7 +36,7 @@ def call_material_from_elabid(elabid):
|
|||||||
Because of an old typo, the value "Subtrate" (second 's' is missing) is also accepted.
|
Because of an old typo, the value "Subtrate" (second 's' is missing) is also accepted.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
material_data = APIHandler(apikey).get_entry_from_elabid(
|
material_data = APIHandler(api_key).get_entry_from_elabid(
|
||||||
elabid, entryType="items"
|
elabid, entryType="items"
|
||||||
)
|
)
|
||||||
material_category = material_data.get("category_title")
|
material_category = material_data.get("category_title")
|
||||||
@@ -64,7 +64,7 @@ def call_layers_from_list(elabid_list):
|
|||||||
list_of_layers = []
|
list_of_layers = []
|
||||||
for elabid in elabid_list:
|
for elabid in elabid_list:
|
||||||
try:
|
try:
|
||||||
layer_data = APIHandler(apikey).get_entry_from_elabid(
|
layer_data = APIHandler(api_key).get_entry_from_elabid(
|
||||||
elabid, entryType="experiments"
|
elabid, entryType="experiments"
|
||||||
)
|
)
|
||||||
if not layer_data.get("category_title") == "PLD Deposition":
|
if not layer_data.get("category_title") == "PLD Deposition":
|
||||||
@@ -134,7 +134,7 @@ def deduplicate_instruments_from_layers(layers):
|
|||||||
rheeds = []
|
rheeds = []
|
||||||
elegant_dict = {}
|
elegant_dict = {}
|
||||||
for lyr in layers:
|
for lyr in layers:
|
||||||
instruments = lyr.get_instruments(apikey)
|
instruments = lyr.get_instruments(api_key)
|
||||||
lasers.append(instruments["laser_system"])
|
lasers.append(instruments["laser_system"])
|
||||||
chambers.append(instruments["deposition_chamber"])
|
chambers.append(instruments["deposition_chamber"])
|
||||||
rheeds.append(instruments["rheed_system"])
|
rheeds.append(instruments["rheed_system"])
|
||||||
@@ -204,6 +204,93 @@ def deduplicate_instruments_from_layers(layers):
|
|||||||
# }
|
# }
|
||||||
|
|
||||||
|
|
||||||
|
def select_rheed_data(layer):
|
||||||
|
"""
|
||||||
|
Takes a Layer-class object and selects the attachments to use to create the RHEED dataset for the NeXus file.
|
||||||
|
|
||||||
|
There are two categories of attachments considered: text-files and pictures.
|
||||||
|
The only accepted formats are ".txt", ".csv" and ".tsv" for the first ones, and ".png" or ".bmp" for the others.
|
||||||
|
The function is extension-sensitive, and only one attachment for each category will be downloaded.
|
||||||
|
|
||||||
|
If there are more than one attachment for each category, the user is prompted to select one of them from a list.
|
||||||
|
If there are no attachments for a category the function will return {} (empty dictionary) for that category.
|
||||||
|
|
||||||
|
Returns the set: (rheed_data_file, rheed_image_file). Both variables are dictionaries in the following format:
|
||||||
|
{
|
||||||
|
"fullname": real_name (with extension),
|
||||||
|
"hashname": long_name (with extension),
|
||||||
|
"related_experiment": elabid
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
n = layer.layer_number
|
||||||
|
textual_uploads = layer.fetch_textual_uploads()
|
||||||
|
images = layer.fetch_images()
|
||||||
|
|
||||||
|
# Check for length. Three cases:
|
||||||
|
# 1. len is 0, no file of this category → return {}
|
||||||
|
# 2. len is more than 1, user must select
|
||||||
|
# 3. len is 1, God's in his heaven, all's right with the world
|
||||||
|
if len(textual_uploads) == 0:
|
||||||
|
rheed_data_file = {}
|
||||||
|
elif len(textual_uploads) > 1:
|
||||||
|
# prompt user to select from list
|
||||||
|
print(f"Attention: Layer {n} contains multiple TEXTUAL attachments.\n")
|
||||||
|
print("These are used to populate the 'RHEED intensities' dataset.")
|
||||||
|
print("=== USER INTERVENTION REQUIRED ===")
|
||||||
|
for id in textual_uploads:
|
||||||
|
print(f"{id} - {textual_uploads[id]}")
|
||||||
|
ans = None
|
||||||
|
while not type(ans) == int or not ans in range(0, len(textual_uploads)):
|
||||||
|
ans = (
|
||||||
|
input(
|
||||||
|
"Select one of the attachments from the list (0, 1, ...) [default = 0]: "
|
||||||
|
)
|
||||||
|
or 0
|
||||||
|
)
|
||||||
|
if ans.isdigit():
|
||||||
|
ans = int(ans)
|
||||||
|
continue
|
||||||
|
rheed_data_file = textual_uploads[ans] # still a dictionary
|
||||||
|
else:
|
||||||
|
rheed_data_file = textual_uploads[
|
||||||
|
next(iter(textual_uploads))
|
||||||
|
] # this prism of pork gets the value of the only key in the dictionary
|
||||||
|
# it's proof like no other that my code is human-generated, and that I suck at coding. It's hubris manifest.
|
||||||
|
|
||||||
|
# As above so below
|
||||||
|
if len(images) == 0:
|
||||||
|
rheed_image_file = {}
|
||||||
|
elif len(images) > 1:
|
||||||
|
# prompt user to select from list
|
||||||
|
print(f"Attention: Layer {n} contains multiple PNG/BMP attachments.\n")
|
||||||
|
print("These are used to create the RHEED heatmap.")
|
||||||
|
print("=== USER INTERVENTION REQUIRED ===")
|
||||||
|
for id in images:
|
||||||
|
print(f"{id} - {images[id]}")
|
||||||
|
ans = None
|
||||||
|
while not type(ans) == int or not ans in range(0, len(images)):
|
||||||
|
ans = (
|
||||||
|
input(
|
||||||
|
"Select one of the attachments from the list (0, 1, ...) [default = 0]: "
|
||||||
|
)
|
||||||
|
or 0
|
||||||
|
)
|
||||||
|
if ans.isdigit():
|
||||||
|
ans = int(ans)
|
||||||
|
continue
|
||||||
|
rheed_image_file = images[ans] # still a dictionary
|
||||||
|
else:
|
||||||
|
rheed_image_file = images[next(iter(images))]
|
||||||
|
|
||||||
|
return (rheed_data_file, rheed_image_file)
|
||||||
|
|
||||||
|
|
||||||
|
def download_rheed_data():
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
def analyse_rheed_data(data):
|
def analyse_rheed_data(data):
|
||||||
"""
|
"""
|
||||||
Takes the content of a tsv file and returns a dictionary with timestamps and intensities.
|
Takes the content of a tsv file and returns a dictionary with timestamps and intensities.
|
||||||
@@ -236,7 +323,7 @@ def analyse_rheed_data(data):
|
|||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Insufficient number of columns: expected 4, got n_cols = {n_cols}."
|
f"Insufficient number of columns: expected 4, got n_cols = {n_cols}."
|
||||||
)
|
)
|
||||||
n_time_points = data.shape[0]
|
# n_time_points = data.shape[0]
|
||||||
|
|
||||||
# Get time (all rows of col 0) as Float64:
|
# Get time (all rows of col 0) as Float64:
|
||||||
time = data[:, 0].astype(
|
time = data[:, 0].astype(
|
||||||
@@ -254,14 +341,18 @@ def analyse_rheed_data(data):
|
|||||||
|
|
||||||
def make_nexus_schema_dictionary(substrate_object, layers):
|
def make_nexus_schema_dictionary(substrate_object, layers):
|
||||||
"""
|
"""
|
||||||
Main function, takes all the other functions to reconstruct the full dataset. Takes a Substrate-class object (output of the chain_entrypoint_to_batch() function) and a list of Layer-class objects (output of the chain_entrypoint_to_layers() function), returns dictionary with the same schema as the NeXus standard for PLD fabrications.
|
Main function, takes all the other functions to reconstruct the full dataset.
|
||||||
|
Takes a Substrate-class object (output of the chain_entrypoint_to_batch() function),
|
||||||
|
and a list of Layer-class objects (output of the chain_entrypoint_to_layers() function).
|
||||||
|
|
||||||
|
Returns dictionary with the same schema as the NeXus standard for PLD fabrications.
|
||||||
"""
|
"""
|
||||||
instruments = deduplicate_instruments_from_layers(layers)
|
instruments = deduplicate_instruments_from_layers(layers)
|
||||||
pld_fabrication = {
|
pld_fabrication = {
|
||||||
"sample": {
|
"sample": {
|
||||||
"substrate": {
|
"substrate": {
|
||||||
"name": substrate_object.name,
|
"name": substrate_object.name,
|
||||||
"chemical_formula": substrate_object.get_compound_formula(apikey),
|
"chemical_formula": substrate_object.get_compound_formula(api_key),
|
||||||
"orientation": substrate_object.orientation,
|
"orientation": substrate_object.orientation,
|
||||||
"miscut_angle": {
|
"miscut_angle": {
|
||||||
"value": substrate_object.miscut_angle,
|
"value": substrate_object.miscut_angle,
|
||||||
@@ -280,14 +371,16 @@ def make_nexus_schema_dictionary(substrate_object, layers):
|
|||||||
"multilayer": {},
|
"multilayer": {},
|
||||||
},
|
},
|
||||||
"instruments_used": instruments["multilayer"],
|
"instruments_used": instruments["multilayer"],
|
||||||
|
"rheed_data": {},
|
||||||
}
|
}
|
||||||
multilayer = pld_fabrication["sample"]["multilayer"]
|
multilayer = pld_fabrication["sample"]["multilayer"]
|
||||||
|
rheed_data = pld_fabrication["rheed_data"]
|
||||||
for layer in layers:
|
for layer in layers:
|
||||||
name = "layer_" + layer.layer_number
|
name = "layer_" + layer.layer_number
|
||||||
target_object = chain_layer_to_target(layer)
|
target_object = chain_layer_to_target(layer)
|
||||||
target_dict = {
|
target_dict = {
|
||||||
"name": target_object.name,
|
"name": target_object.name,
|
||||||
"chemical_formula": target_object.get_compound_formula(apikey),
|
"chemical_formula": target_object.get_compound_formula(api_key),
|
||||||
"description": target_object.description,
|
"description": target_object.description,
|
||||||
"shape": target_object.shape,
|
"shape": target_object.shape,
|
||||||
"dimensions": target_object.dimensions,
|
"dimensions": target_object.dimensions,
|
||||||
@@ -381,10 +474,16 @@ def make_nexus_schema_dictionary(substrate_object, layers):
|
|||||||
},
|
},
|
||||||
"instruments_used": instruments[name],
|
"instruments_used": instruments[name],
|
||||||
}
|
}
|
||||||
|
rheed_data[name] = {
|
||||||
|
"layer_number": layer.layer_number,
|
||||||
|
"data": select_rheed_data(
|
||||||
|
layer
|
||||||
|
), # tuple: (rheed_data_file, rheed_image_file)
|
||||||
|
}
|
||||||
return pld_fabrication
|
return pld_fabrication
|
||||||
|
|
||||||
|
|
||||||
def build_nexus_file(pld_fabrication, output_path, rheed_osc=None, heatmap_matrix=None):
|
def build_nexus_file(pld_fabrication, output_path):
|
||||||
# NOTE: look at the mail attachment from Emiliano...
|
# NOTE: look at the mail attachment from Emiliano...
|
||||||
with h5py.File(output_path, "w") as f:
|
with h5py.File(output_path, "w") as f:
|
||||||
nx_pld_entry = f.create_group("pld_fabrication")
|
nx_pld_entry = f.create_group("pld_fabrication")
|
||||||
@@ -650,67 +749,150 @@ def build_nexus_file(pld_fabrication, output_path, rheed_osc=None, heatmap_matri
|
|||||||
nx_rheed = nx_pld_entry.create_group("rheed_data")
|
nx_rheed = nx_pld_entry.create_group("rheed_data")
|
||||||
nx_rheed.attrs["NX_class"] = "NXdata"
|
nx_rheed.attrs["NX_class"] = "NXdata"
|
||||||
|
|
||||||
if rheed_osc is not None:
|
rheed_data = pld_fabrication["rheed_data"]
|
||||||
# Asse temporale
|
for layer in rheed_data:
|
||||||
t_ds = nx_rheed.create_dataset("time", data=rheed_osc["time"])
|
nx_rheed_layer = nx_rheed.create_group(layer)
|
||||||
t_ds.attrs["units"] = "s"
|
|
||||||
t_ds.attrs["long_name"] = "Time"
|
|
||||||
|
|
||||||
# Intensità: shape (n_layers, n_timepoints, 3)
|
layer_dict = rheed_data[layer]
|
||||||
i_ds = nx_rheed.create_dataset("intensity", data=rheed_osc["intensity"])
|
n = layer_dict["layer_number"]
|
||||||
i_ds.attrs["units"] = "a.u."
|
rheed_data_file = layer_dict["data"][0] # first in the tuple
|
||||||
i_ds.attrs["long_name"] = "RHEED Intensity"
|
rheed_image_file = layer_dict["data"][1] # second in the tuple
|
||||||
|
handler = APIHandler(api_key)
|
||||||
|
|
||||||
# Attributi NXdata — notazione NeXus 3.x corretta
|
# TO-DO: maybe make a dedicated function???
|
||||||
nx_rheed.attrs["signal"] = "intensity"
|
data_path = None
|
||||||
nx_rheed.attrs["axes"] = [
|
image_path = None
|
||||||
".",
|
|
||||||
"time",
|
|
||||||
".",
|
|
||||||
] # solo l'asse 1 (time) è denominato
|
|
||||||
nx_rheed.attrs["time_indices"] = np.array([1], dtype=np.int32)
|
|
||||||
# ###########
|
|
||||||
# nx_rheed = nx_pld_entry.create_group("rheed_data")
|
|
||||||
# nx_rheed.attrs["NX_class"] = "NXdata"
|
|
||||||
|
|
||||||
# nx_rheed.create_dataset("time", data=rheed_osc["time"])
|
if rheed_data_file != {}:
|
||||||
# nx_rheed["time"].attrs["units"] = "s"
|
try:
|
||||||
|
elabid = rheed_data_file["related_experiment"]
|
||||||
|
upload_id = rheed_data_file["id"]
|
||||||
|
except KeyError as ke:
|
||||||
|
raise KeyError(
|
||||||
|
f"Missing key in your file: {rheed_data_file.get('filename') or '<missing name>'}: {ke}"
|
||||||
|
)
|
||||||
|
data_path = handler.download_attachment_to_disk(
|
||||||
|
elabid=elabid, upload_id=upload_id
|
||||||
|
)
|
||||||
|
|
||||||
# nx_rheed.create_dataset("intensity", data=rheed_osc["intensity"])
|
if rheed_image_file != {}:
|
||||||
# #nx_rheed["intensity"].attrs["units"] = "counts"
|
try:
|
||||||
# nx_rheed["intensity"].attrs["long_name"] = "RHEED intensity"
|
upload_id = rheed_image_file["id"]
|
||||||
# nx_rheed.attrs["signal"] = "intensity"
|
elabid = rheed_image_file["related_experiment"]
|
||||||
# nx_rheed.attrs["axes"] = "layer:time:channel"
|
except KeyError as ke:
|
||||||
# nx_rheed.attrs["layer_indices"] = [0] # asse layer
|
raise KeyError(
|
||||||
# nx_rheed.attrs["time_indices"] = [1] # asse tempo
|
f"Missing key in your file: {rheed_data_file.get('filename') or '<missing name>'}: {ke}"
|
||||||
# nx_rheed.attrs["channel_indices"] = [2]
|
)
|
||||||
if heatmap_matrix is not None:
|
image_path = handler.download_attachment_to_disk(
|
||||||
heatmap = nx_rheed.create_dataset("diffraction_image", data=heatmap_matrix)
|
elabid=elabid, upload_id=upload_id
|
||||||
heatmap.attrs["long_name"] = "Diffraction Image"
|
)
|
||||||
heatmap.attrs["units"] = "a.u."
|
|
||||||
# this is of my own initiative. good???
|
if data_path and os.path.isfile(data_path):
|
||||||
heatmap.attrs["interpretation"] = "spectrum"
|
with open(data_path, "r") as o:
|
||||||
# suggested by DeepSeek, useful? probably not.
|
osc = np.loadtxt(o, delimiter="\t")
|
||||||
# heatmap.attrs["suggested_colormap"] = "inferno"
|
try:
|
||||||
# heatmap.attrs["scale_min"] = 0.0
|
rheed_osc = (
|
||||||
# heatmap.attrs["scale_max"] = 1.0
|
analyse_rheed_data(data=osc) or None
|
||||||
|
) # analyze rheed data first, build the file later
|
||||||
|
except ValueError as ve:
|
||||||
|
raise ValueError(
|
||||||
|
f"Error with function analyse_rheed_data. {ve}\nPlease make sure the Realtime Window Analysis file is exactly 4 columns wide - where the first column represents time and the others are RHEED intensities."
|
||||||
|
)
|
||||||
|
if rheed_osc is not None:
|
||||||
|
# Time axis (needed?)
|
||||||
|
t_ds = nx_rheed_layer.create_dataset("time", data=rheed_osc["time"])
|
||||||
|
t_ds.attrs["units"] = "s"
|
||||||
|
t_ds.attrs["long_name"] = "Time"
|
||||||
|
|
||||||
|
# Intensity shape (n_layers, n_timepoints, 3)
|
||||||
|
i_ds = nx_rheed_layer.create_dataset(
|
||||||
|
"intensity", data=rheed_osc["intensity"]
|
||||||
|
)
|
||||||
|
i_ds.attrs["units"] = "a.u."
|
||||||
|
i_ds.attrs["long_name"] = "RHEED Intensity"
|
||||||
|
|
||||||
|
# NXdata attributes — NeXus 3.x notation
|
||||||
|
nx_rheed_layer.attrs["signal"] = "intensity"
|
||||||
|
nx_rheed_layer.attrs["axes"] = [
|
||||||
|
".",
|
||||||
|
"time",
|
||||||
|
".",
|
||||||
|
] # only time axis (1) is named
|
||||||
|
nx_rheed_layer.attrs["time_indices"] = np.array([1], dtype=np.int32)
|
||||||
|
|
||||||
|
if image_path and os.path.isfile(image_path):
|
||||||
|
img = Image.open(image_path).convert("L")
|
||||||
|
heatmap_matrix = np.array(img, dtype=np.uint8) # or None
|
||||||
|
|
||||||
|
if heatmap_matrix is not None:
|
||||||
|
heatmap = nx_rheed_layer.create_dataset(
|
||||||
|
"diffraction_image", data=heatmap_matrix
|
||||||
|
)
|
||||||
|
heatmap.attrs["long_name"] = "Diffraction Image"
|
||||||
|
heatmap.attrs["units"] = "a.u."
|
||||||
|
heatmap.attrs["interpretation"] = "spectrum"
|
||||||
return
|
return
|
||||||
|
# TO-DO: ↓↓↓ comment cleanup ↓↓↓
|
||||||
|
#
|
||||||
|
# here's what we gon do: (to be read with the voice of Mike from Breaking Bad)
|
||||||
|
# 1. rheed_osc and heatmap_matrix are NOT given in input to the function so no need for checking that
|
||||||
|
# 2. loop through the layers, each with its elabid and metadata
|
||||||
|
# 2a. read said metadata for each layer, print list of txt and png files (dedicated Layer class methods)
|
||||||
|
# 2b. prompt the user for file choice (1 text file per layer - in tsv format, 1 picture file - either png [default] or bmp)
|
||||||
|
# 2c. download the chosen file
|
||||||
|
# 2d. with chosen file do analysis as before
|
||||||
|
# 3. the schema should be:
|
||||||
|
# * /rheed_data
|
||||||
|
# * /layer_n
|
||||||
|
# * time (rheed_osc)
|
||||||
|
# * intensity (rheed_osc)
|
||||||
|
# * diffraction_image (heatmap_matrix)
|
||||||
|
# first problem is probably finding out how to recover the following meta from the original Layer object:
|
||||||
|
# * Layer.elabid - integer
|
||||||
|
# * Layer.fetch_textual_uploads() - dictionary
|
||||||
|
# * Layer.fetch_images() - dictionary
|
||||||
|
|
||||||
|
# nx_rheed = nx_pld_entry.create_group("rheed_data")
|
||||||
|
# nx_rheed.attrs["NX_class"] = "NXdata"
|
||||||
|
|
||||||
|
# nx_rheed.create_dataset("time", data=rheed_osc["time"])
|
||||||
|
# nx_rheed["time"].attrs["units"] = "s"
|
||||||
|
|
||||||
|
# nx_rheed.create_dataset("intensity", data=rheed_osc["intensity"])
|
||||||
|
# #nx_rheed["intensity"].attrs["units"] = "counts"
|
||||||
|
# nx_rheed["intensity"].attrs["long_name"] = "RHEED intensity"
|
||||||
|
# nx_rheed.attrs["signal"] = "intensity"
|
||||||
|
# nx_rheed.attrs["axes"] = "layer:time:channel"
|
||||||
|
# nx_rheed.attrs["layer_indices"] = [0] # asse layer
|
||||||
|
# nx_rheed.attrs["time_indices"] = [1] # asse tempo
|
||||||
|
# nx_rheed.attrs["channel_indices"] = [2]
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# TO-DO: place the API base URL somewhere else.
|
# TO-DO: place the API base URL somewhere else.
|
||||||
ELABFTW_API_URL = "https://elabftw.fisica.unina.it/api/v2"
|
ELABFTW_API_URL = "https://elabftw.fisica.unina.it/api/v2"
|
||||||
apikey = getpass("Paste API key here: ")
|
api_key = getpass("Paste API key here: ")
|
||||||
elabid = input("Enter elabid of your starting sample [default = 1111]: ") or 1111
|
elabid = input("Enter elabid of your starting sample [default = 1111]: ") or 1111
|
||||||
data = APIHandler(apikey).get_entry_from_elabid(elabid)
|
handler = APIHandler(api_key)
|
||||||
|
data = handler.get_entry_from_elabid(elabid)
|
||||||
sample = Entrypoint(data)
|
sample = Entrypoint(data)
|
||||||
sample_name = sample.name.strip().replace(" ", "_")
|
sample_name = sample.name.strip().replace(" ", "_")
|
||||||
|
if sample.proposal:
|
||||||
|
sample_proposal = sample.proposal.strip().replace(" ", "_")
|
||||||
|
else:
|
||||||
|
sample_proposal = None
|
||||||
substrate_object = chain_entrypoint_to_batch(sample) # Substrate-class object
|
substrate_object = chain_entrypoint_to_batch(sample) # Substrate-class object
|
||||||
layers = chain_entrypoint_to_layers(sample) # list of Layer-class objects
|
layers = chain_entrypoint_to_layers(sample) # list of Layer-class objects
|
||||||
n_layers = len(layers) # total number of layers on the sample
|
n_layers = len(layers) # total number of layers on the sample
|
||||||
result = make_nexus_schema_dictionary(substrate_object, layers)
|
result = make_nexus_schema_dictionary(substrate_object, layers)
|
||||||
# print(make_nexus_schema_dictionary(substrate_object, layers)) # debug
|
# print(make_nexus_schema_dictionary(substrate_object, layers)) # debug
|
||||||
with open(f"output/sample-{sample_name}.json", "w") as f:
|
fn_base = (
|
||||||
|
"nffa-di_"
|
||||||
|
+ (f"{sample_proposal}_" if sample_proposal else "")
|
||||||
|
+ "Napoli_"
|
||||||
|
+ sample_name
|
||||||
|
)
|
||||||
|
with open(f"output/{fn_base}.json", "w") as f:
|
||||||
json.dump(result, f, indent=3)
|
json.dump(result, f, indent=3)
|
||||||
# TO-DO: remove the hard-coded path of the RWA file
|
# TO-DO: remove the hard-coded path of the RWA file
|
||||||
# ideally the script should download a TXT/CSV file from each layer
|
# ideally the script should download a TXT/CSV file from each layer
|
||||||
@@ -718,28 +900,9 @@ if __name__ == "__main__":
|
|||||||
# and merge all data in a single file to analyse it
|
# and merge all data in a single file to analyse it
|
||||||
# WARNING: fails if file is missing
|
# WARNING: fails if file is missing
|
||||||
|
|
||||||
with open("tests/Realtime_Window_Analysis.txt", "r") as o:
|
|
||||||
osc = np.loadtxt(o, delimiter="\t")
|
|
||||||
try:
|
|
||||||
rheed_osc = (
|
|
||||||
analyse_rheed_data(data=osc) or None
|
|
||||||
) # analyze rheed data first, build the file later
|
|
||||||
except ValueError as ve:
|
|
||||||
raise ValueError(
|
|
||||||
f"Error with function analyse_rheed_data. {ve}\nPlease make sure the Realtime Window Analysis file is exactly 4 columns wide - where the first column represents time and the others are RHEED intensities."
|
|
||||||
)
|
|
||||||
# This one tries to open a png image.
|
# This one tries to open a png image.
|
||||||
# Emiliano said to keep it to one image per layer tops.
|
# Emiliano said to keep it to one image per layer tops.
|
||||||
# In this test I will only consider one image.
|
# In this test I will only consider one image.
|
||||||
# TO-DO: make it format-agnostic. If not possible, make it PNG-only.
|
# TO-DO: make it format-agnostic. If not possible, make it PNG-only.
|
||||||
if os.path.isfile("tests/LAO_16min50s_736C_STO.bmp"): # if BMP
|
# mx = mx.astype(np.float32) / 255.0 # consider deleting???
|
||||||
# if os.path.isfile("tests/LAO_16min50s_736C_STO.png"): # if PNG
|
build_nexus_file(result, output_path=f"output/{fn_base}.h5")
|
||||||
img = Image.open("tests/LAO_16min50s_736C_STO.bmp").convert("L")
|
|
||||||
mx = np.array(img, dtype=np.uint8)
|
|
||||||
# mx = mx.astype(np.float32) / 255.0 # consider deleting???
|
|
||||||
build_nexus_file(
|
|
||||||
result,
|
|
||||||
output_path=f"output/sample-{sample_name}-nexus.h5",
|
|
||||||
rheed_osc=rheed_osc,
|
|
||||||
heatmap_matrix=mx,
|
|
||||||
)
|
|
||||||
|
|||||||
Reference in New Issue
Block a user