Compare commits

12 Commits

Author SHA256 Message Date
ee96100a73 uses dotenv to store api key and other important variables
if a value is not found in .env it will be prompted, but not checked
next step is user docs
2026-05-13 12:31:26 +02:00
686f869d10 documents all the functions/classes/methods (by hand)
no AI used, it took more than I'm willing to admit but it's done
2026-05-13 12:12:32 +02:00
2eea3fc2dd ignores output/attachments 2026-05-13 10:27:40 +02:00
cbf5cdd115 clears comments 2026-05-13 10:26:15 +02:00
a6d4c72f9c adds dependency: dotenv 2026-05-13 09:53:57 +02:00
7e808509cc THIS should solve the naming problem
new class for the Proposals, only outputs their names
if name contains "Proposal ", that gets cropped out
if no proposal is specified the name of the sample shall not include one
2026-05-12 22:59:19 +02:00
2bbab96ca7 rm unnecessary fstring 2026-05-12 16:48:04 +02:00
f84478a7a4 this should solve the filename problem 2026-05-12 16:08:49 +02:00
19a802694f MAJOR: fundamental functions of the parser are ready and tested!
TO-DO:
1. follow the "TO-DO" comments to clean the code
2. filename should be NFFA-DI compliant like:
	nffa-di_NA01_Napoli_Na-26-015.h5
3. rheed data analysis should take two distinct functions
   one for the raw stream and one for the image
4. if time allows: consider moving most of main.py in separate modules
2026-05-12 15:38:06 +02:00
df927b7c0e Layer class methods to list attachments up and tested 2026-05-12 13:51:59 +02:00
ccf74fca26 methods to download experiments attachments up and tested
to-do: clean code
2026-05-12 13:36:52 +02:00
07aac3e6b3 unfinished work 2026-05-12 12:54:16 +02:00
5 changed files with 512 additions and 199 deletions

1
.gitignore vendored
View File

@@ -5,6 +5,7 @@
output/*.json output/*.json
output/*.h5 output/*.h5
output/*.nxs output/*.nxs
output/attachments/*.*
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files

View File

@@ -3,3 +3,4 @@ asyncio
h5py h5py
pillow pillow
elabapi_python elabapi_python
dotenv

View File

@@ -1,4 +1,6 @@
import os, requests import os, requests
from dotenv import load_dotenv
from getpass import getpass
import elabapi_python as elabapi import elabapi_python as elabapi
@@ -10,19 +12,22 @@ class APIHandler:
(since the API doesn't support downloading attachments AFAIK). (since the API doesn't support downloading attachments AFAIK).
Args: Args:
api_key: A valid API key for the eLabFTW instance where the data is stored, with permissions to access the relevant entries. api_key: str: A valid API key for the eLabFTW instance where the data is stored, with permissions to access the relevant entries.
eLabFTW's API keys are well documented here: https://doc.elabftw.net/docs/usage/api/. eLabFTW's API keys are well documented here: https://doc.elabftw.net/docs/usage/api/.
If you don't have an API key and are uncapable of creating one, contact your eLabFTW administrator. If you don't have an API key and are uncapable of creating one, contact your eLabFTW administrator.
Or RTFM and create one yourself, it's not that hard. Or RTFM and create one yourself, it's not that hard.
ELABFTW_API_URL: Complete URL of the eLabFTW instance's root for the API endpoints. ELABFTW_API_URL: str: Complete URL of the eLabFTW instance's root for the API endpoints.
In full caps because it won't (shouldn't) be changed much. In full caps because it won't (shouldn't) be changed much.
""" """
# TO-DO: remove static url. # TO-DO: remove static url.
def __init__( def __init__(self, api_key="", ELABFTW_API_URL=None):
self, api_key="", ELABFTW_API_URL="https://elabftw.fisica.unina.it/api/v2" """Init method, api_key suggested but not required (empty by default)."""
): if not ELABFTW_API_URL:
"""Init method, apikey suggested but not required (empty by default).""" load_dotenv()
ELABFTW_API_URL = os.getenv("ELABFTW_API_URL") or input(
"Enter a valid eLabFTW API URL (ends with '/api/v2)': "
)
self.api_key = api_key self.api_key = api_key
self.auth = {"Authorization": api_key} self.auth = {"Authorization": api_key}
self.content = {"Content-Type": "application/json"} self.content = {"Content-Type": "application/json"}
@@ -33,9 +38,9 @@ class APIHandler:
""" """
Returns raw data (as dictionary) from its elabid and entry type. Returns raw data (as dictionary) from its elabid and entry type.
args: Args:
elabid: elabftw internal id of the selected resource. elabid: int: elabftw internal id of the selected resource.
entryType: Resource type. Anything other than "experiments" or "items" WILL raise an error. entryType: str: Resource type. Anything other than "experiments" or "items" WILL raise an error.
""" """
if entryType not in ["experiments", "items"]: if entryType not in ["experiments", "items"]:
raise Exception( raise Exception(
@@ -64,12 +69,12 @@ class APIHandler:
case 404: case 404:
# Lapalissian: # Lapalissian:
raise ConnectionError( raise ConnectionError(
f"404: Not Found. This means there's no resource with this elabid (wrong elabid?) on your eLabFTW (wrong endpoint?)." "404: Not Found. This means there's no resource with this elabid (wrong elabid?) on your eLabFTW (wrong endpoint?)."
) )
case 400: case 400:
# I genuinely have no idea: # I genuinely have no idea:
raise ConnectionError( raise ConnectionError(
f"400: Bad Request. This means the API endpoint you tried to reach is invalid. Did you tamper with the source code? If not, contact the developer." "400: Bad Request. This means the API endpoint you tried to reach is invalid. Did you tamper with the source code? If not, contact the developer."
) )
case _: case _:
# For some fucking reason, this is the only error I actually get from the API... # For some fucking reason, this is the only error I actually get from the API...
@@ -80,17 +85,19 @@ class APIHandler:
entry_data = response.json() entry_data = response.json()
return entry_data return entry_data
def download_attachments_data(self, elabid, entryType="experiments"): def download_attachment_data(self, elabid, upload_id, entryType="experiments"):
""" """
Downloads attachments of a certain eLabFTW experiment (default) or item. Downloads a specific attachment of a certain eLabFTW experiment (default) or item.
Only returns their binary data. Use method download_attachments_to_disk to save to file. Only returns its binary data. Use method download_attachment_to_disk to save to file.
NOTE: Output is a dictionary where: NOTE: Output is a dictionary where:
* The keys are the attachments' filenames; * The key is the attachment's filename;
* The values are the binary data for those attachments. * The value is the attachment's binary data.
Args: Args:
elabid: eLabFTW internal ID of the selected resource. elabid: int: eLabFTW internal ID of the selected resource.
entryType: Resource type. Anything other than "experiments" or "items" WILL raise an error. upload_id: int: eLabFTW internal ID of the selected upload.
entryType: str: Resource type. Anything other than "experiments" or "items" WILL raise an error.
""" """
if entryType not in ["experiments", "items"]: if entryType not in ["experiments", "items"]:
raise Exception( raise Exception(
@@ -98,7 +105,7 @@ class APIHandler:
) )
config = elabapi.Configuration() config = elabapi.Configuration()
config.api_key["api_key"] = api_key config.api_key["api_key"] = self.api_key
config.api_key_prefix["api_key"] = "Authorization" config.api_key_prefix["api_key"] = "Authorization"
config.host = self.elaburl config.host = self.elaburl
config.debug = False config.debug = False
@@ -108,33 +115,37 @@ class APIHandler:
) )
uploads_api = elabapi.UploadsApi(api_client) uploads_api = elabapi.UploadsApi(api_client)
# Actual uploads (dictionary): # Scans through the attachments and selects the one with corresponing ID.
uploads = { attachment = {
upload.real_name: uploads_api.read_upload( upload.real_name: uploads_api.read_upload(
entryType, elabid, upload.id, format="binary", _preload_content=False entryType, elabid, upload_id, format="binary", _preload_content=False
).data ).data
for upload in uploads_api.read_uploads(entryType, elabid) for upload in uploads_api.read_uploads(entryType, elabid)
if upload.id == upload_id
} }
return uploads return attachment
def download_attachments_to_disk( def download_attachment_to_disk(
self, self,
elabid, elabid,
upload_id,
entryType="experiments", entryType="experiments",
dump_dir="output/attachments", dump_dir="output/attachments",
# persistent=True, # persistent=True,
): ):
""" """
Downloads attachments of a certain eLabFTW experiment (default) or item. Downloads a specific attachment of a certain eLabFTW experiment (default) or item.
Downloads their binary data through method download_attachments_data and dumps it to dump_dir. Downloads their binary data through method download_attachments_data and dumps it to dump_dir.
Returns full path of the output file.
Args: Args:
elabid: eLabFTW internal ID of the selected resource. elabid: int: eLabFTW internal ID of the selected resource.
entryType: Resource type. Anything other than "experiments" or "items" WILL raise an error. upload_id: int: eLabFTW internal ID of the selected upload.
dump_dir: Directory to which to save the attachments. Default is "output/attachments". entryType: str: Resource type. Anything other than "experiments" or "items" WILL raise an error.
persistent: [Unused] Decides if the files will stay on disk after all operations are completed. dump_dir: str: Directory to which to save the attachments. Default is "output/attachments".
If set to False, deletes the file upon exiting. persistent: bool: [Unused] Decides if the files will stay on disk after all operations are completed.
If set to False, deletes the file upon exiting. Default = True.
""" """
if entryType not in ["experiments", "items"]: if entryType not in ["experiments", "items"]:
@@ -142,9 +153,17 @@ class APIHandler:
"You can only download attachments from experiments or items." "You can only download attachments from experiments or items."
) )
uploads = download_attachments_data(elabid, entryType=entryType) uploads = self.download_attachment_data(elabid, upload_id, entryType=entryType)
for file in uploads: for file in uploads:
raw_data = uploads["file"] raw_data = uploads[file]
with open(os.path.join(dump_dir, f"exp{elabid}-{file}"), "wb") as f: full_path = os.path.join(dump_dir, f"exp{elabid}-{file}")
with open(full_path, "wb") as f:
f.write(raw_data) f.write(raw_data)
return return full_path
# Testing methods
if __name__ == "__main__":
api_key = getpass("Paste API key here [no echo]: ")
handler = APIHandler(api_key=api_key)
handler.download_attachment_to_disk(elabid=58, upload_id=81)

View File

@@ -1,4 +1,5 @@
import os, json, requests import os, json, requests
from getpass import getpass
from APIHandler import APIHandler from APIHandler import APIHandler
@@ -14,6 +15,10 @@ class Layer:
""" """
def __init__(self, layer_data): def __init__(self, layer_data):
"""
Properties/Attributes:
Too many to list.
"""
try: try:
self.elabid = layer_data["id"] self.elabid = layer_data["id"]
self.operator = layer_data["fullname"] self.operator = layer_data["fullname"]
@@ -130,6 +135,20 @@ class Layer:
self.description = layer_data.get("body") or None self.description = layer_data.get("body") or None
def get_instruments(self, api_key): def get_instruments(self, api_key):
"""
Retruns a dictionary of all the instruments used to create the layer.
The format of the dictionary is:
{
"laser_system": str,
"deposition_chamber": str,
"rheed_system": str
}
Arg: api_key: str: A valid API key for the eLabFTW instance where the data is stored, with permissions to access the relevant entries.
eLabFTW's API keys are well documented here: https://doc.elabftw.net/docs/usage/api/.
If you don't have an API key and are uncapable of creating one, contact your eLabFTW administrator.
Or RTFM and create one yourself, it's not that hard.
"""
raw_lasersys_data = APIHandler(api_key).get_entry_from_elabid( raw_lasersys_data = APIHandler(api_key).get_entry_from_elabid(
self.laser_system_elabid, entryType="items" self.laser_system_elabid, entryType="items"
) )
@@ -149,16 +168,21 @@ class Layer:
def list_attachments(self): def list_attachments(self):
""" """
Returns a dictionary of all the attachments linked to the layer, where: Returns a dictionary of all the attachments linked to the layer, where:
* Each key is the attachment's elabid; * Each key is the attachment's progressive ID (0, 1...);
* Each value is a dictionary containing the attachment's filename, hashname and related experiment elabid (= self.elabid). * Each value is a dictionary containing the attachment's elabid, filename, hashname and related experiment elabid (= self.elabid).
Data is already in layer_data, so the API key is unrequired. Same goes for: Data is already in layer_data, so the API key is unrequired. Same goes for:
* fetch_textual_uploads() - no arguments; * fetch_textual_uploads() - no arguments;
* fetch_images() - no arguments. * fetch_images() - no arguments.
Exception: returns {} (empty dictionary) if no uploads/attachments on Layer.
""" """
# Remember: Layers are experiments, so we only need to look for attachments in the experiment endpoint. # Remember: Layers are experiments, so we only need to look for attachments in the experiment endpoint.
if self.uploads == []:
return {}
attachments = { attachments = {
attachment["id"]: { self.uploads.index(attachment): {
"id": attachment["id"],
"filename": attachment["real_name"], "filename": attachment["real_name"],
"hashname": attachment["long_name"], "hashname": attachment["long_name"],
"related_experiment": attachment["item_id"], "related_experiment": attachment["item_id"],
@@ -180,7 +204,7 @@ class Layer:
textual_uploads = { textual_uploads = {
attachment: attachments[attachment] attachment: attachments[attachment]
for attachment in attachments for attachment in attachments
if attachments[attachments]["filename"][-4:] in (".txt", ".csv", ".tsv") if attachments[attachment]["filename"][-4:] in (".txt", ".csv", ".tsv")
} }
return textual_uploads return textual_uploads
@@ -195,12 +219,12 @@ class Layer:
That's because the API (v5.3.11) doesn't provide MIME Type or similar metadata on the attachments, so the only way to know if an attachment is an image or not is through its filename. That's because the API (v5.3.11) doesn't provide MIME Type or similar metadata on the attachments, so the only way to know if an attachment is an image or not is through its filename.
""" """
attachments = self.list_attachments() attachments = self.list_attachments()
pictures = { images = {
attachment: attachments[attachment] attachment: attachments[attachment]
for attachment in attachments for attachment in attachments
if attachments[attachments]["filename"][-4:] in (".png", ".bmp") if attachments[attachment]["filename"][-4:] in (".png", ".bmp")
} }
return pictures return images
class Entrypoint: class Entrypoint:
@@ -214,10 +238,20 @@ class Entrypoint:
""" """
def __init__(self, sample_data): def __init__(self, sample_data):
"""
Properties/Attributes:
* name: str: Name of the sample. Fairly important, and always present unless someone screws up REALLY bad.
* linked_items: dict: Dictionary generated by eLabFTW containing metadata on the items linked to the entrypoint.
* batch_elabid: int: eLabFTW internal id of the batch of the substrate used as the foundation of the sample.
* proposal: int: eLabFTW internal id of the proposal linked to the sample.
* linked_experiments: dict: Dictionary generated by eLabFTW containing metadata on the experiments linked to the entrypoint.
* linked_experiments_elabid: list: List of eLabFTW internal id's of the experiments linked to the entrypoint.
"""
try: try:
self.extra = sample_data["metadata_decoded"]["extra_fields"] self.extra = sample_data["metadata_decoded"]["extra_fields"]
self.linked_items = sample_data["items_links"] # dict self.linked_items = sample_data["items_links"] # dict
self.batch_elabid = self.extra["Substrate batch"]["value"] # elabid self.batch_elabid = self.extra["Substrate batch"]["value"] # elabid
self.proposal = self.extra["Proposal"].get("value") or None # proposal
self.linked_experiments = sample_data["related_experiments_links"] # dict self.linked_experiments = sample_data["related_experiments_links"] # dict
self.linked_experiments_elabid = [ self.linked_experiments_elabid = [
i["entityid"] for i in self.linked_experiments i["entityid"] for i in self.linked_experiments
@@ -232,6 +266,7 @@ class Entrypoint:
self.name = ( self.name = (
sample_data.get("title") or None sample_data.get("title") or None
) # error prevention is more important than preventing empty fields here ) # error prevention is more important than preventing empty fields here
# although I don't think it's even possible to fuck up this bad...
class Material: class Material:
@@ -247,6 +282,14 @@ class Material:
""" """
def __init__(self, material_data): def __init__(self, material_data):
"""
Properties/Attributes:
* name: str: Name of the material.
* compound_elabid: int: eLabFTW internal id of the compound.
* dimensions: str: Dimensions of the material, in standard format.
The class recognizes the unit of measurement and acts consequently.
* dimensions_unit: str: Unit of measurement - either "mm x mm", "inches" or None.
"""
try: try:
self.name = material_data["title"] # required self.name = material_data["title"] # required
self.extra = material_data["metadata_decoded"]["extra_fields"] self.extra = material_data["metadata_decoded"]["extra_fields"]
@@ -268,6 +311,20 @@ class Material:
) )
def get_compound_data(self, apikey): def get_compound_data(self, apikey):
"""
Returns a dictionary with the relevant data on the compound of which the material is made.
The format of the dictionary is:
{
"name": str,
"chemical_formula": str,
"cas_number": str
}
Arg: api_key: str: A valid API key for the eLabFTW instance where the data is stored, with permissions to access the relevant entries.
eLabFTW's API keys are well documented here: https://doc.elabftw.net/docs/usage/api/.
If you don't have an API key and are uncapable of creating one, contact your eLabFTW administrator.
Or RTFM and create one yourself, it's not that hard.
"""
raw_compound_data = APIHandler(apikey).get_entry_from_elabid( raw_compound_data = APIHandler(apikey).get_entry_from_elabid(
self.compound_elabid, entryType="items" self.compound_elabid, entryType="items"
) )
@@ -288,7 +345,32 @@ class Material:
class Substrate(Material): class Substrate(Material):
"""
Substrate(material_data) - where material_data is a Python dictionary.
Inherits from Material and it's meant to be used exclusively for eLabFTW Resources of the "Substrate" category.
"""
def __init__(self, material_data): def __init__(self, material_data):
"""
Properties/Attributes common to all Materials:
* name: str: Name of the material.
* compound_elabid: int: eLabFTW internal id of the compound.
* dimensions: str: Dimensions of the material, in standard format.
The class recognizes the unit of measurement and acts consequently.
* dimensions_unit: str: Unit of measurement - either "mm x mm", "inches" or None.
Specific properties/attributes:
* orientation: str:
* miscut_angle: str:
* miscut_angle_unit: str:
* miscut_direction: str:
* thickness: str:
* thickness_unit: str:
* surface_treatment: str:
* manufacturer: str:
* batch_id: str:
"""
super().__init__(material_data) super().__init__(material_data)
try: try:
self.orientation = self.extra["Orientation"]["value"] self.orientation = self.extra["Orientation"]["value"]
@@ -308,7 +390,28 @@ class Substrate(Material):
class Target(Material): class Target(Material):
"""
Target(material_data) - where material_data is a Python dictionary.
Inherits from Material and it's meant to be used exclusively for eLabFTW Resources of the "PLD Target" category.
"""
def __init__(self, material_data): def __init__(self, material_data):
"""
Properties/Attributes common to all Materials:
* name: str: Name of the material.
* compound_elabid: int: eLabFTW internal id of the compound.
* dimensions: str: Dimensions of the material, in standard format.
The class recognizes the unit of measurement and acts consequently.
* dimensions_unit: str: Unit of measurement - either "mm x mm", "inches" or None.
Specific properties/attributes:
* thickness: str:
* thickness_unit: str:
* shape: str:
* solid_form: str:
* manufacturer: str:
"""
super().__init__(material_data) super().__init__(material_data)
try: try:
self.thickness = self.extra["Thickness"]["value"] self.thickness = self.extra["Thickness"]["value"]
@@ -324,7 +427,36 @@ class Target(Material):
self.description = material_data.get("body") or "" self.description = material_data.get("body") or ""
class Proposal:
"""
Proposal(proposal_data) - where proposal_data is a Python dictionary.
Recovers only the relevant info on a proposal linked to the entrypoint sample.
Which currently is just its name.
If the name starts with "Proposal " (space included) that gets omitted from the output.
"""
def __init__(self, proposal_data):
"""
Properties/Attributes:
* name: str: Name of the proposal.
If the name starts with "Proposal " (space included) that gets omitted from the output.
"""
if "Proposal " in proposal_data["title"]:
self.name = proposal_data["title"].replace("Proposal ", "")
else:
self.name = proposal_data["title"]
if __name__ == "__main__": if __name__ == "__main__":
head = APIHandler("MyApiKey-123456789abcdef") # head = APIHandler("MyApiKey-123456789abcdef")
print(f"Example header:\n\t{head.header}\n") # print(f"Example header:\n\t{head.header}\n")
print("Warning: you're not supposed to be running this as the main program.") # print("Warning: you're not supposed to be running this as the main program.")
api_key = getpass("Paste API key here [no echo]: ")
handler = APIHandler(api_key=api_key)
exp58 = handler.get_entry_from_elabid(elabid=58, entryType="experiments")
layer58 = Layer(exp58)
print(layer58.list_attachments())
print(layer58.fetch_textual_uploads())
print(layer58.fetch_images())

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os, json, requests, h5py import os, json, requests, h5py
import numpy as np import numpy as np
from dotenv import load_dotenv
from getpass import getpass from getpass import getpass
from APIHandler import APIHandler from APIHandler import APIHandler
from classes import * from classes import *
@@ -10,12 +11,16 @@ from PIL import Image
def call_entrypoint_from_elabid(elabid): def call_entrypoint_from_elabid(elabid):
""" """
Calls an entrypoint sample from eLabFTW using its elabid, then returns an object of the Entrypoint class. Calls a sample from eLabFTW through its elabid, then returns an object of the Entrypoint class.
The Entrypoint serves as the starting point in the construction of the dataset.
If the entry is not a sample (category_title not matching exactly "Sample") returns ValueError. If the entry is not a sample (category_title not matching exactly "Sample") returns ValueError.
It's most likely the first error you might encounter (with a valid API key).
Arg: elabid: int eLabFTW internal id of the selected resource.
""" """
try: try:
sample_data = APIHandler(apikey).get_entry_from_elabid( sample_data = APIHandler(api_key).get_entry_from_elabid(
elabid, entryType="items" elabid, entryType="items"
) )
if not sample_data.get("category_title") == "Sample": if not sample_data.get("category_title") == "Sample":
@@ -32,11 +37,14 @@ def call_material_from_elabid(elabid):
""" """
Calls a material from eLabFTW using its elabid, then returns an object of the Material class. Calls a material from eLabFTW using its elabid, then returns an object of the Material class.
If the entry is neither a PLD Target or a Substrate batch returns ValueError. Such entries always have a category_title key with its value matching exactly "PLD Target" or "Substrate". If the entry is neither a PLD Target or a Substrate batch returns ValueError.
Such entries always have a category_title key with its value matching exactly "PLD Target" or "Substrate".
Because of an old typo, the value "Subtrate" (second 's' is missing) is also accepted. Because of an old typo, the value "Subtrate" (second 's' is missing) is also accepted.
arg: elabid: int eLabFTW internal id of the selected resource.
""" """
try: try:
material_data = APIHandler(apikey).get_entry_from_elabid( material_data = APIHandler(api_key).get_entry_from_elabid(
elabid, entryType="items" elabid, entryType="items"
) )
material_category = material_data.get("category_title") material_category = material_data.get("category_title")
@@ -57,14 +65,17 @@ def call_material_from_elabid(elabid):
def call_layers_from_list(elabid_list): def call_layers_from_list(elabid_list):
""" """
Calls a list of (PLD deposition) experiments from eLabFTW using their elabid - which means the input must be a list of integers instead of a single one - then returns a list of Layer-class objects. Calls a list of (PLD deposition) experiments from eLabFTW through their elabid's, then returns a list of Layer-class objects.
If one of the entries is not related to a deposition layer (category_title not matching exactly "PLD Deposition") that entry is skipped, with no error raised. If one of the entries is not related to a deposition layer (category_title not matching exactly "PLD Deposition")
that entry is skipped, with no error raised.
Arg: elabid_list: list(int): list of eLabFTW experiments.
""" """
list_of_layers = [] list_of_layers = []
for elabid in elabid_list: for elabid in elabid_list:
try: try:
layer_data = APIHandler(apikey).get_entry_from_elabid( layer_data = APIHandler(api_key).get_entry_from_elabid(
elabid, entryType="experiments" elabid, entryType="experiments"
) )
if not layer_data.get("category_title") == "PLD Deposition": if not layer_data.get("category_title") == "PLD Deposition":
@@ -80,15 +91,46 @@ def call_layers_from_list(elabid_list):
+ str(e) + str(e)
+ f"\nPlease solve the problem before retrying." + f"\nPlease solve the problem before retrying."
+ "\n\n" + "\n\n"
+ f"Last resource attempted to call: {ELABFTW_API_URL}/experiments/{elabid}" + f"Last resource attempted to call at base url: /experiments/{elabid}"
) )
return list_of_layers # list of Layer-class objects return list_of_layers # list of Layer-class objects
def call_proposal_from_elabid(elabid):
"""
Calls a proposal item from eLabFTW using their elabid and creates a Proposal-class object.
Returns the proposal's name (method Proposal.name -> str)
If the name starts with "Proposal " (space included) that gets omitted from the output.
Arg: elabid: int eLabFTW internal id of the selected resource.
"""
try:
proposal_data = APIHandler(api_key).get_entry_from_elabid(
elabid, entryType="items"
)
proposal_category = proposal_data.get("category_title")
# TO-DO: correct this typo on elabftw: Subtrate → Substrate.
if (
"Proposal" not in proposal_category
): # to avoid that same old problem with trailing spaces
print(f"Category of the resource: {proposal_category}.")
raise ValueError(
f"The referenced resource (elabid = {elabid}) is not a proposal."
)
else:
proposal = Proposal(proposal_data)
except ConnectionError as e:
raise ConnectionError(e)
return proposal.name # String
def chain_entrypoint_to_batch(sample_object): def chain_entrypoint_to_batch(sample_object):
""" """
Takes an Entrypoint-class object, looks at its .batch_elabid attribute and returns a Material-class object containing data on the substrate batch associated to the starting sample. Takes an Entrypoint-class object, looks at its .batch_elabid attribute and retrieves data on the substrate batch associated to the starting sample.
Returns a Material-class object.
Arg: sample_object: Entrypoint: Entrypoint-class object.
Dependency: call_material_from_elabid. Dependency: call_material_from_elabid.
""" """
material_elabid = sample_object.batch_elabid material_elabid = sample_object.batch_elabid
@@ -98,10 +140,11 @@ def chain_entrypoint_to_batch(sample_object):
def chain_entrypoint_to_layers(sample_object): def chain_entrypoint_to_layers(sample_object):
""" """
Takes an Entrypoint-class object, looks at its .linked_experiments_elabid attribute (list) and returns a list of Layer-class objects containing data on the deposition layers associated to the starting sample - using the function call_layers_from_list. Takes an Entrypoint-class object, looks at its .linked_experiments_elabid attribute (list) and
retrieves data on the deposition layers associated to the starting sample.
The list is sorted by progressive layer number (layer_number attribute). Returns a list of Layer-class objects, sorted by progressive layer number (layer_number attribute).
Arg: sample_object: Entrypoint: Entrypoint-class object.
Dependency: call_layers_from_list. Dependency: call_layers_from_list.
""" """
linked_experiments_elabid = ( linked_experiments_elabid = (
@@ -114,8 +157,10 @@ def chain_entrypoint_to_layers(sample_object):
def chain_layer_to_target(layer_object): def chain_layer_to_target(layer_object):
""" """
Takes a Layer-class object, looks at its .target_elabid attribute and returns a Material-class object containing data on the PLD target used in the deposition of said layer. Takes a Layer-class object, looks at its .target_elabid attribute and retrieves data on the PLD target used in the deposition of said layer.
Returns a Material-class object.
Arg: layer_object: Layer: Layer-class object.
Dependency: call_material_from_elabid. Dependency: call_material_from_elabid.
""" """
target_elabid = layer_object.target_elabid target_elabid = layer_object.target_elabid
@@ -125,16 +170,23 @@ def chain_layer_to_target(layer_object):
def deduplicate_instruments_from_layers(layers): def deduplicate_instruments_from_layers(layers):
""" """
Takes a list of Layer-class objects and for each layer gets the instruments used (laser, depo chamber and RHEED), returns dictionary with one item per category. This means that if more layers share the same instruments it returns a dictionary with just their names as strings (no lists or sub-dictionaries). For each layer gets the instruments used (laser, depo chamber and RHEED).
Creates three sets with all the instruments used regardless of the layer they've been used for.
Turns the sets into three strings (joined with commas), then returns a dictionary in the format:
{
"laser_system": "Laser A, Laser B...",
"deposition_chamber": "DC A, DC B...",
"rheed_system": RHEED A, RHEED B..."
}
If different layers have different instruments (e.g. laser systems) the user is prompted to only select one. Arg: layers: list(Layer): List of Layer-class objects.
""" """
lasers = [] lasers = []
chambers = [] chambers = []
rheeds = [] rheeds = []
elegant_dict = {} elegant_dict = {}
for lyr in layers: for lyr in layers:
instruments = lyr.get_instruments(apikey) instruments = lyr.get_instruments(api_key)
lasers.append(instruments["laser_system"]) lasers.append(instruments["laser_system"])
chambers.append(instruments["deposition_chamber"]) chambers.append(instruments["deposition_chamber"])
rheeds.append(instruments["rheed_system"]) rheeds.append(instruments["rheed_system"])
@@ -152,56 +204,89 @@ def deduplicate_instruments_from_layers(layers):
"deposition_chamber": ", ".join(ded_chambers), "deposition_chamber": ", ".join(ded_chambers),
"rheed_system": ", ".join(ded_rheeds), "rheed_system": ", ".join(ded_rheeds),
} # dictionary's name is a joke } # dictionary's name is a joke
# updated_dict = {} # use this for containing the final dataset
# for ded in elegant_dict:
# if len(elegant_dict[ded]) == 0:
# # if len of list is 0 - empty list - raise error
# raise IndexError(f"Missing data: no Laser System, Chamber and/or RHEED System is specified in any of the Deposition-type experiments related to this sample. Fix this on eLabFTW before retrying. Affected list: {ded}.")
# elif len(elegant_dict[ded]) > 1:
# # if len of list is > 1 - too many values - allow the user to pick one
# print("Warning: different instruments have been used for different layers - which is currently not allowed.")
# # there's a better way to do this but I can't remember now for the life of me...
# i = 0
# while i < len(elegant_dict[ded]):
# print(f"{i} - {elegant_dict[ded][i]}")
# i += 1
# ans = None
# while not type(ans) == int or not ans in range(0, len(elegant_dict[ded])):
# ans = input("Please pick one of the previous (0, 1, ...) [default = 0]: ") or "0"
# if ans.isdigit():
# ans = int(ans)
# continue # unnecessary?
# updated_dict[ded] = elegant_dict[ded][ans]
# elif elegant_dict[ded][0] in ["", 0, None]:
# # if len is 1 BUT value is "", 0 or None raise error
# raise ValueError(f"Missing data: a Laser System, Chamber and/or RHEED System which is specified across all the Deposition-type experiments related to this sample is either empty or invalid. Fix this on eLabFTW before retrying. Affected list: {ded}.")
# else:
# # if none of the previous (only 1 value), that single value is used
# updated_dict[ded] = elegant_dict[ded][0]
# instruments_used_dict = {
# "laser_system": updated_dict["Laser Systems"],
# "deposition_chamber": updated_dict["Deposition Chamber"],
# "rheed_system": updated_dict["RHEED Systems"],
# }
return elegant_dict return elegant_dict
### OLD CODE
# if 0 in [ len(i) for i in elegant_list ]: def select_rheed_data(layer):
# # i.e. if length of one of the lists in elegant_list is zero (missing data): """
# raise IndexError("Missing data: no Laser System, Chamber and/or RHEED System is specified in any of the Deposition-type experiments related to this sample.") Takes a Layer-class object and selects the attachments to use to create the RHEED dataset for the NeXus file.
# if not all([ len(i) == 1 for i in elegant_list ]):
# print("Warning: different instruments have been used for different layers - which is currently not allowed.") There are two categories of attachments considered: text-files and pictures.
# # for every element in elegant list check if len > 1 and if it is The only accepted formats are ".txt", ".csv" and ".tsv" for the first ones, and ".png" or ".bmp" for the others.
# print("Selecting the first occurence for every category...") The function is extension-sensitive, and only one attachment for each category will be downloaded.
###
# lasers = { f"layer_{lyr.layer_number}": lyr.laser_system for lyr in layers } If there are more than one attachment for each category, the user is prompted to select one of them from a list.
# chambers = { f"layer_{lyr.layer_number}": lyr.deposition_chamber for lyr in layers } If there are no attachments for a category the function will return {} (empty dictionary) for that category.
# rheeds = { f"layer_{lyr.layer_number}": lyr.rheed_system for lyr in layers }
# instruments_used_dict = { Returns the set: (rheed_data_file, rheed_image_file). Both variables are dictionaries in the following format:
# "laser_system": lasers, {
# "deposition_chamber": chambers, "fullname": real_name (with extension),
# "rheed_system": rheeds, "hashname": long_name (with extension),
# } "related_experiment": elabid
}
"""
n = layer.layer_number
textual_uploads = layer.fetch_textual_uploads()
images = layer.fetch_images()
# Check for length. Three cases:
# 1. len is 0, no file of this category → return {}
# 2. len is more than 1, user must select
# 3. len is 1, God's in his heaven, all's right with the world
if len(textual_uploads) == 0:
rheed_data_file = {}
elif len(textual_uploads) > 1:
# prompt user to select from list
print(f"Attention: Layer {n} contains multiple TEXTUAL attachments.\n")
print("These are used to populate the 'RHEED intensities' dataset.")
print("=== USER INTERVENTION REQUIRED ===")
for id in textual_uploads:
print(f"{id} - {textual_uploads[id]}")
ans = None
while not type(ans) == int or not ans in range(0, len(textual_uploads)):
ans = (
input(
"Select one of the attachments from the list (0, 1, ...) [default = 0]: "
)
or 0
)
if ans.isdigit():
ans = int(ans)
continue
rheed_data_file = textual_uploads[ans] # still a dictionary
else:
rheed_data_file = textual_uploads[
next(iter(textual_uploads))
] # this prism of pork gets the value of the only key in the dictionary
# it's proof like no other that my code is human-generated, and that I suck at coding. It's hubris manifest.
# As above so below
if len(images) == 0:
rheed_image_file = {}
elif len(images) > 1:
# prompt user to select from list
print(f"Attention: Layer {n} contains multiple PNG/BMP attachments.\n")
print("These are used to create the RHEED heatmap.")
print("=== USER INTERVENTION REQUIRED ===")
for id in images:
print(f"{id} - {images[id]}")
ans = None
while not type(ans) == int or not ans in range(0, len(images)):
ans = (
input(
"Select one of the attachments from the list (0, 1, ...) [default = 0]: "
)
or 0
)
if ans.isdigit():
ans = int(ans)
continue
rheed_image_file = images[ans] # still a dictionary
else:
rheed_image_file = images[next(iter(images))]
return (rheed_data_file, rheed_image_file)
def analyse_rheed_data(data): def analyse_rheed_data(data):
@@ -213,13 +298,15 @@ def analyse_rheed_data(data):
Time Layer1_Int1 Layer1_Int2 Layer1_Int3 Time Layer1_Int1 Layer1_Int2 Layer1_Int3
----- -----
Distinct ValueErrors are raised if: Exceptions:
1. Distinct ValueErrors are raised if:
* The array is not 2-dimensional; * The array is not 2-dimensional;
* The total number of columns does not equate exactly 1+3 (= 4). * The total number of columns does not equate at least 1+3 (= 4).
2. If the file has more than 4 columns the function prints a warning, then ignores the other columns.
3. No exception is made for files where the first column is not the time, or the others are not intensities.
Time is expressed in seconds, intensities are adimensional on 8 bits (min. 0, max. 255). Time is expressed in seconds, intensities are adimensional on 8 bits (min. 0, max. 255).
# TO-DO: complete this description...
Written with help from DeepSeek. Written with help from DeepSeek.
""" """
# Verifying the format of the input file: # Verifying the format of the input file:
@@ -236,7 +323,7 @@ def analyse_rheed_data(data):
raise ValueError( raise ValueError(
f"Insufficient number of columns: expected 4, got n_cols = {n_cols}." f"Insufficient number of columns: expected 4, got n_cols = {n_cols}."
) )
n_time_points = data.shape[0] # n_time_points = data.shape[0]
# Get time (all rows of col 0) as Float64: # Get time (all rows of col 0) as Float64:
time = data[:, 0].astype( time = data[:, 0].astype(
@@ -254,14 +341,22 @@ def analyse_rheed_data(data):
def make_nexus_schema_dictionary(substrate_object, layers): def make_nexus_schema_dictionary(substrate_object, layers):
""" """
Main function, takes all the other functions to reconstruct the full dataset. Takes a Substrate-class object (output of the chain_entrypoint_to_batch() function) and a list of Layer-class objects (output of the chain_entrypoint_to_layers() function), returns dictionary with the same schema as the NeXus standard for PLD fabrications. Main function, takes all the other functions to reconstruct the full dataset.
Takes a Substrate-class object (output of the chain_entrypoint_to_batch() function),
and a list of Layer-class objects (output of the chain_entrypoint_to_layers() function).
Returns dictionary with the same schema as the NeXus standard for PLD fabrications.
Args:
substrate_object: Substrate: Substrate-class object.
layers: list(Layer): List of Layer-class objects.
""" """
instruments = deduplicate_instruments_from_layers(layers) instruments = deduplicate_instruments_from_layers(layers)
pld_fabrication = { pld_fabrication = {
"sample": { "sample": {
"substrate": { "substrate": {
"name": substrate_object.name, "name": substrate_object.name,
"chemical_formula": substrate_object.get_compound_formula(apikey), "chemical_formula": substrate_object.get_compound_formula(api_key),
"orientation": substrate_object.orientation, "orientation": substrate_object.orientation,
"miscut_angle": { "miscut_angle": {
"value": substrate_object.miscut_angle, "value": substrate_object.miscut_angle,
@@ -280,14 +375,16 @@ def make_nexus_schema_dictionary(substrate_object, layers):
"multilayer": {}, "multilayer": {},
}, },
"instruments_used": instruments["multilayer"], "instruments_used": instruments["multilayer"],
"rheed_data": {},
} }
multilayer = pld_fabrication["sample"]["multilayer"] multilayer = pld_fabrication["sample"]["multilayer"]
rheed_data = pld_fabrication["rheed_data"]
for layer in layers: for layer in layers:
name = "layer_" + layer.layer_number name = "layer_" + layer.layer_number
target_object = chain_layer_to_target(layer) target_object = chain_layer_to_target(layer)
target_dict = { target_dict = {
"name": target_object.name, "name": target_object.name,
"chemical_formula": target_object.get_compound_formula(apikey), "chemical_formula": target_object.get_compound_formula(api_key),
"description": target_object.description, "description": target_object.description,
"shape": target_object.shape, "shape": target_object.shape,
"dimensions": target_object.dimensions, "dimensions": target_object.dimensions,
@@ -381,10 +478,27 @@ def make_nexus_schema_dictionary(substrate_object, layers):
}, },
"instruments_used": instruments[name], "instruments_used": instruments[name],
} }
rheed_data[name] = {
"layer_number": layer.layer_number,
"data": select_rheed_data(
layer
), # tuple: (rheed_data_file, rheed_image_file)
}
return pld_fabrication return pld_fabrication
def build_nexus_file(pld_fabrication, output_path, rheed_osc=None, heatmap_matrix=None): def build_nexus_file(pld_fabrication, output_path="output/nffa-di_unnamed.h5"):
"""
The function which actually builds the NeXus file for *PLD DEPOSITIONS*.
Saves the file in the specified directory.
Args:
pld_fabrication: A dictionary with a specific schema, one only the function
make_nexus_schema_dictionary should make.
output_path: The full path to the output file, including filename complete with extension.
It's a string, which should be produced with os.path.
Default value is: "output/nffa-di_unnamed.h5" - which is NOT NFFA-DI compliant.
"""
# NOTE: look at the mail attachment from Emiliano... # NOTE: look at the mail attachment from Emiliano...
with h5py.File(output_path, "w") as f: with h5py.File(output_path, "w") as f:
nx_pld_entry = f.create_group("pld_fabrication") nx_pld_entry = f.create_group("pld_fabrication")
@@ -650,75 +764,46 @@ def build_nexus_file(pld_fabrication, output_path, rheed_osc=None, heatmap_matri
nx_rheed = nx_pld_entry.create_group("rheed_data") nx_rheed = nx_pld_entry.create_group("rheed_data")
nx_rheed.attrs["NX_class"] = "NXdata" nx_rheed.attrs["NX_class"] = "NXdata"
if rheed_osc is not None: rheed_data = pld_fabrication["rheed_data"]
# Asse temporale for layer in rheed_data:
t_ds = nx_rheed.create_dataset("time", data=rheed_osc["time"]) nx_rheed_layer = nx_rheed.create_group(layer)
t_ds.attrs["units"] = "s"
t_ds.attrs["long_name"] = "Time"
# Intensità: shape (n_layers, n_timepoints, 3) layer_dict = rheed_data[layer]
i_ds = nx_rheed.create_dataset("intensity", data=rheed_osc["intensity"]) n = layer_dict["layer_number"]
i_ds.attrs["units"] = "a.u." rheed_data_file = layer_dict["data"][0] # first in the tuple
i_ds.attrs["long_name"] = "RHEED Intensity" rheed_image_file = layer_dict["data"][1] # second in the tuple
handler = APIHandler(api_key)
# Attributi NXdata — notazione NeXus 3.x corretta # TO-DO: maybe make a dedicated function???
nx_rheed.attrs["signal"] = "intensity" data_path = None
nx_rheed.attrs["axes"] = [ image_path = None
".",
"time",
".",
] # solo l'asse 1 (time) è denominato
nx_rheed.attrs["time_indices"] = np.array([1], dtype=np.int32)
# ###########
# nx_rheed = nx_pld_entry.create_group("rheed_data")
# nx_rheed.attrs["NX_class"] = "NXdata"
# nx_rheed.create_dataset("time", data=rheed_osc["time"]) if rheed_data_file != {}:
# nx_rheed["time"].attrs["units"] = "s" try:
elabid = rheed_data_file["related_experiment"]
upload_id = rheed_data_file["id"]
except KeyError as ke:
raise KeyError(
f"Missing key in your file: {rheed_data_file.get('filename') or '<missing name>'}: {ke}"
)
data_path = handler.download_attachment_to_disk(
elabid=elabid, upload_id=upload_id
)
# nx_rheed.create_dataset("intensity", data=rheed_osc["intensity"]) if rheed_image_file != {}:
# #nx_rheed["intensity"].attrs["units"] = "counts" try:
# nx_rheed["intensity"].attrs["long_name"] = "RHEED intensity" upload_id = rheed_image_file["id"]
# nx_rheed.attrs["signal"] = "intensity" elabid = rheed_image_file["related_experiment"]
# nx_rheed.attrs["axes"] = "layer:time:channel" except KeyError as ke:
# nx_rheed.attrs["layer_indices"] = [0] # asse layer raise KeyError(
# nx_rheed.attrs["time_indices"] = [1] # asse tempo f"Missing key in your file: {rheed_data_file.get('filename') or '<missing name>'}: {ke}"
# nx_rheed.attrs["channel_indices"] = [2] )
if heatmap_matrix is not None: image_path = handler.download_attachment_to_disk(
heatmap = nx_rheed.create_dataset("diffraction_image", data=heatmap_matrix) elabid=elabid, upload_id=upload_id
heatmap.attrs["long_name"] = "Diffraction Image" )
heatmap.attrs["units"] = "a.u."
# this is of my own initiative. good???
heatmap.attrs["interpretation"] = "spectrum"
# suggested by DeepSeek, useful? probably not.
# heatmap.attrs["suggested_colormap"] = "inferno"
# heatmap.attrs["scale_min"] = 0.0
# heatmap.attrs["scale_max"] = 1.0
return
if data_path and os.path.isfile(data_path):
if __name__ == "__main__": with open(data_path, "r") as o:
# TO-DO: place the API base URL somewhere else.
ELABFTW_API_URL = "https://elabftw.fisica.unina.it/api/v2"
apikey = getpass("Paste API key here: ")
elabid = input("Enter elabid of your starting sample [default = 1111]: ") or 1111
data = APIHandler(apikey).get_entry_from_elabid(elabid)
sample = Entrypoint(data)
sample_name = sample.name.strip().replace(" ", "_")
substrate_object = chain_entrypoint_to_batch(sample) # Substrate-class object
layers = chain_entrypoint_to_layers(sample) # list of Layer-class objects
n_layers = len(layers) # total number of layers on the sample
result = make_nexus_schema_dictionary(substrate_object, layers)
# print(make_nexus_schema_dictionary(substrate_object, layers)) # debug
with open(f"output/sample-{sample_name}.json", "w") as f:
json.dump(result, f, indent=3)
# TO-DO: remove the hard-coded path of the RWA file
# ideally the script should download a TXT/CSV file from each layer
# (IF PRESENT ←→ also handle missing file error)
# and merge all data in a single file to analyse it
# WARNING: fails if file is missing
with open("tests/Realtime_Window_Analysis.txt", "r") as o:
osc = np.loadtxt(o, delimiter="\t") osc = np.loadtxt(o, delimiter="\t")
try: try:
rheed_osc = ( rheed_osc = (
@@ -728,18 +813,93 @@ if __name__ == "__main__":
raise ValueError( raise ValueError(
f"Error with function analyse_rheed_data. {ve}\nPlease make sure the Realtime Window Analysis file is exactly 4 columns wide - where the first column represents time and the others are RHEED intensities." f"Error with function analyse_rheed_data. {ve}\nPlease make sure the Realtime Window Analysis file is exactly 4 columns wide - where the first column represents time and the others are RHEED intensities."
) )
# This one tries to open a png image. if rheed_osc is not None:
# Emiliano said to keep it to one image per layer tops. # Time axis (needed?)
# In this test I will only consider one image. t_ds = nx_rheed_layer.create_dataset("time", data=rheed_osc["time"])
# TO-DO: make it format-agnostic. If not possible, make it PNG-only. t_ds.attrs["units"] = "s"
if os.path.isfile("tests/LAO_16min50s_736C_STO.bmp"): # if BMP t_ds.attrs["long_name"] = "Time"
# if os.path.isfile("tests/LAO_16min50s_736C_STO.png"): # if PNG
img = Image.open("tests/LAO_16min50s_736C_STO.bmp").convert("L") # Intensity shape (n_layers, n_timepoints, 3)
mx = np.array(img, dtype=np.uint8) i_ds = nx_rheed_layer.create_dataset(
# mx = mx.astype(np.float32) / 255.0 # consider deleting??? "intensity", data=rheed_osc["intensity"]
build_nexus_file(
result,
output_path=f"output/sample-{sample_name}-nexus.h5",
rheed_osc=rheed_osc,
heatmap_matrix=mx,
) )
i_ds.attrs["units"] = "a.u."
i_ds.attrs["long_name"] = "RHEED Intensity"
# NXdata attributes — NeXus 3.x notation
nx_rheed_layer.attrs["signal"] = "intensity"
nx_rheed_layer.attrs["axes"] = [
".",
"time",
".",
] # only time axis (1) is named
nx_rheed_layer.attrs["time_indices"] = np.array([1], dtype=np.int32)
if image_path and os.path.isfile(image_path):
img = Image.open(image_path).convert("L")
heatmap_matrix = np.array(img, dtype=np.uint8) # or None
# heatmap_matrix = heatmap_matrix.astype(np.float32) / 255.0 # toggle to normalize matrix values
if heatmap_matrix is not None:
heatmap = nx_rheed_layer.create_dataset(
"diffraction_image", data=heatmap_matrix
)
heatmap.attrs["long_name"] = "Diffraction Image"
heatmap.attrs["units"] = "a.u."
heatmap.attrs["interpretation"] = "spectrum"
return
# TO-DO: ↓↓↓ comment cleanup ↓↓↓
#
# here's what we gon do: (to be read with the voice of Mike from Breaking Bad)
# 1. rheed_osc and heatmap_matrix are NOT given in input to the function so no need for checking that
# 2. loop through the layers, each with its elabid and metadata
# 2a. read said metadata for each layer, print list of txt and png files (dedicated Layer class methods)
# 2b. prompt the user for file choice (1 text file per layer - in tsv format, 1 picture file - either png [default] or bmp)
# 2c. download the chosen file
# 2d. with chosen file do analysis as before
# 3. the schema should be:
# * /rheed_data
# * /layer_n
# * time (rheed_osc)
# * intensity (rheed_osc)
# * diffraction_image (heatmap_matrix)
# first problem is probably finding out how to recover the following meta from the original Layer object:
# * Layer.elabid - integer
# * Layer.fetch_textual_uploads() - dictionary
# * Layer.fetch_images() - dictionary
if __name__ == "__main__":
load_dotenv()
api_key = os.getenv("api_key") or getpass("Paste API key here: ")
elabid = (
os.getenv("elabid")
or input("Enter elabid of your starting sample [default = 1111]: ")
or 1111
)
handler = APIHandler(api_key)
data = handler.get_entry_from_elabid(elabid)
sample = Entrypoint(data)
sample_name = sample.name.strip().replace(" ", "_")
operative_unit = os.getenv("operative_unit") or None
if sample.proposal:
sample_proposal = call_proposal_from_elabid(sample.proposal)
else:
sample_proposal = None
substrate_object = chain_entrypoint_to_batch(sample) # Substrate-class object
layers = chain_entrypoint_to_layers(sample) # list of Layer-class objects
n_layers = len(layers) # total number of layers on the sample
# print(make_nexus_schema_dictionary(substrate_object, layers)) # debug
fn_base = (
"nffa-di_"
+ (f"{sample_proposal}_" if sample_proposal else "")
+ (f"{operative_unit}_" if operative_unit else "")
+ "_"
+ sample_name
)
result = make_nexus_schema_dictionary(substrate_object, layers)
with open(f"output/{fn_base}.json", "w") as f:
json.dump(result, f, indent=3)
build_nexus_file(result, output_path=f"output/{fn_base}.h5")