Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 57 additions & 8 deletions src/murfey/client/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from murfey.client.context import Context
from murfey.client.contexts.atlas import AtlasContext
from murfey.client.contexts.clem import CLEMContext
from murfey.client.contexts.fib import FIBContext
from murfey.client.contexts.spa import SPAModularContext
from murfey.client.contexts.spa_metadata import SPAMetadataContext
from murfey.client.contexts.tomo import TomographyContext
Expand Down Expand Up @@ -125,20 +126,62 @@ def _find_context(self, file_path: Path) -> bool:
"""
logger.debug(f"Finding context using file {str(file_path)!r}")

# -----------------------------------------------------------------------------
# CLEM workflow checks
# Look for LIF and XLIF files
if file_path.suffix in (".lif", ".xlif"):
# -----------------------------------------------------------------------------
if (
# Look for LIF and XLIF files
file_path.suffix in (".lif", ".xlif")
or (
# TIFF files have "--Stage", "--Z", and/or "--C" in their file stem
file_path.suffix in (".tiff", ".tif")
and any(
pattern in file_path.stem for pattern in ("--Stage", "--Z", "--C")
)
)
):
self._context = CLEMContext("leica", self._basepath, self._token)
return True
# Look for TIFF files associated with CLEM workflow
# CLEM TIFF files will have "--Stage", "--Z", and/or "--C" in their file stem
if any(
pattern in file_path.stem for pattern in ("--Stage", "--Z", "--C")
) and file_path.suffix in (".tiff", ".tif"):
self._context = CLEMContext("leica", self._basepath, self._token)

# -----------------------------------------------------------------------------
# FIB workflow checks
# -----------------------------------------------------------------------------
# Determine if it's from AutoTEM
if (
# AutoTEM generates a "ProjectData.dat" file
file_path.name == "ProjectData.dat"
or (
# Images are stored in ".../Sites/Lamella (N)/..."
any(path.startswith("Lamella") for path in file_path.parts)
and "Sites" in file_path.parts
)
):
self._context = FIBContext("autotem", self._basepath, self._token)
return True

# Determine if it's from Maps
if (
# Electron snapshot metadata in "EMproject.emxml"
file_path.name == "EMproject.emxml"
or (
# Key images are stored in ".../LayersData/Layer/..."
all(path in file_path.parts for path in ("LayersData", "Layer"))
)
):
self._context = FIBContext("maps", self._basepath, self._token)
return True

# Determine if it's from Meteor
if (
# Image metadata stored in "features.json" file
file_path.name == "features.json" or ()
):
self._context = FIBContext("meteor", self._basepath, self._token)
return True

# -----------------------------------------------------------------------------
# Tomography and SPA workflow checks
# -----------------------------------------------------------------------------
if "atlas" in file_path.parts:
self._context = AtlasContext(
"serialem" if self._serialem else "epu", self._basepath, self._token
Expand Down Expand Up @@ -321,6 +364,12 @@ def _analyse(self):
)
self.post_transfer(transferred_file)

elif isinstance(self._context, FIBContext):
logger.debug(
f"File {transferred_file.name!r} will be processed as part of the FIB workflow"
)
self.post_transfer(transferred_file)

elif isinstance(self._context, AtlasContext):
logger.debug(f"File {transferred_file.name!r} is part of the atlas")
self.post_transfer(transferred_file)
Expand Down
221 changes: 214 additions & 7 deletions src/murfey/client/contexts/fib.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,174 @@
from __future__ import annotations

import logging
import re
import threading
from datetime import datetime
from pathlib import Path
from typing import Dict, List, NamedTuple, Optional
from typing import NamedTuple
from xml.etree import ElementTree as ET

import xmltodict

from murfey.client.context import Context
from murfey.client.instance_environment import MurfeyInstanceEnvironment
from murfey.util.client import capture_post
from murfey.util.client import capture_post, get_machine_config_client

logger = logging.getLogger("murfey.client.contexts.fib")

lock = threading.Lock()


class Lamella(NamedTuple):
    """A FIB milling site, as named and numbered by the acquisition software."""

    # Site name as reported by the software, e.g. "Lamella (2)"
    name: str
    # Parenthesised number extracted from the name (1 when unnumbered)
    number: int
    # Milling angle once parsed from the metadata; None until known
    # (units are whatever the acquisition software reports — TODO confirm)
    angle: float | None = None


class MillingProgress(NamedTuple):
    """Record of a single milling progress file and the time associated with it."""

    # Path to the transferred milling progress file
    file: Path
    # Time associated with the file (presumably epoch seconds — verify at call site)
    timestamp: float


class ElectronSnapshotMetadata(NamedTuple):
    """Metadata for one electron snapshot dataset parsed from an EMproject.emxml file."""

    slot_num: int | None  # Which slot in the FIB-SEM it is from
    image_num: int  # Number parsed from the dataset name (1 when unnumbered)
    image_dir: str  # Partial path from EMproject.emxml parent to the image
    status: str  # Dataset status text from the XML ("" when absent)
    x_len: float | None  # Box size along x (from BoxSize/SizeX)
    y_len: float | None  # Box size along y (from BoxSize/SizeY)
    z_len: float | None  # Box size along z (from BoxSize/SizeZ)
    x_center: float | None  # Box centre x (from BoxCenter/CenterX)
    y_center: float | None  # Box centre y (from BoxCenter/CenterY)
    z_center: float | None  # Box centre z (from BoxCenter/CenterZ)
    extent: tuple[float, float, float, float] | None  # (x_min, x_max, y_min, y_max)
    rotation_angle: float | None  # From RotationAngle (units not shown here — verify)


def _number_from_name(name: str) -> int:
return int(
name.strip().replace("Lamella", "").replace("(", "").replace(")", "") or 1
"""
In the AutoTEM and Maps workflows for the FIB, the sites and images are
auto-incremented with parenthesised numbers (e.g. "Lamella (2)"), with
the first site/image typically not having a number.

This function extracts the number from the file name, and returns 1 if
no such number is found.
"""
return (
int(match.group(1))
if (match := re.search(r"^[\w\s]+\((\d+)\)$", name)) is not None
else 1
)


def _get_source(file_path: Path, environment: MurfeyInstanceEnvironment) -> Path | None:
"""
Returns the Path of the file on the client PC.
"""
for s in environment.sources:
if file_path.is_relative_to(s):
return s
return None


def _file_transferred_to(
    environment: MurfeyInstanceEnvironment, source: Path, file_path: Path, token: str
) -> Path | None:
    """
    Returns the Path of the transferred file on the DLS file system.
    """
    machine_config = get_machine_config_client(
        str(environment.url.geturl()),
        token,
        instrument_name=environment.instrument_name,
    )

    # Root of the destination: the rsync base path joined with this
    # source's configured default destination
    default_destination = environment.default_destinations[source]
    destination_root = Path(machine_config.get("rsync_basepath", "")) / Path(
        default_destination
    )
    # Append the visit name when the default destination doesn't already include it
    if environment.visit not in default_destination:
        destination_root = destination_root / environment.visit
    # Mirror the file's position relative to its source under the destination root
    return destination_root / file_path.relative_to(source)


def _parse_electron_snapshot_metadata(
    xml_file: Path,
) -> dict[str, ElectronSnapshotMetadata]:
    """
    Parse an EMproject.emxml file and return a dict mapping each dataset's
    name to its ElectronSnapshotMetadata.

    Missing string nodes default to "" and missing numeric nodes to None,
    so partially-populated datasets still produce an entry.
    """
    metadata_dict: dict[str, ElectronSnapshotMetadata] = {}
    root = ET.parse(xml_file).getroot()
    datasets = root.findall(".//Datasets/Dataset")
    for dataset in datasets:
        # Extract all string-based values
        name, image_dir, status = [
            node.text
            if ((node := dataset.find(node_path)) is not None and node.text is not None)
            else ""
            for node_path in (
                ".//Name",
                ".//FinalImages",
                ".//Status",
            )
        ]

        # Extract all float values
        cx, cy, cz, x_len, y_len, z_len, rotation_angle = [
            float(node.text)
            if ((node := dataset.find(node_path)) is not None and node.text is not None)
            else None
            for node_path in (
                ".//BoxCenter/CenterX",
                ".//BoxCenter/CenterY",
                ".//BoxCenter/CenterZ",
                ".//BoxSize/SizeX",
                ".//BoxSize/SizeY",
                ".//BoxSize/SizeZ",
                ".//RotationAngle",
            )
        ]

        # Calculate the extent of the image as (x_min, x_max, y_min, y_max)
        # NOTE(review): centres and lengths look swapped here — extent is
        # conventionally centre ± size/2 (i.e. cx ± x_len/2). Confirm against
        # the EMproject.emxml schema before relying on these values.
        extent = None
        if (
            cx is not None
            and cy is not None
            and x_len is not None
            and y_len is not None
        ):
            extent = (
                x_len - (cx / 2),
                x_len + (cx / 2),
                y_len - (cy / 2),
                # Fixed: previously 'y_len - (cy / 2)' again, which made
                # y_min == y_max and gave the extent zero height
                y_len + (cy / 2),
            )

        # Append metadata for current site to dict
        metadata_dict[name] = ElectronSnapshotMetadata(
            # Negative x centre -> slot 1, otherwise slot 2
            # (per original logic; TODO confirm the slot convention)
            slot_num=None if cx is None else (1 if cx < 0 else 2),
            image_num=_number_from_name(name),
            status=status,
            image_dir=image_dir,
            x_len=x_len,
            y_len=y_len,
            z_len=z_len,
            x_center=cx,
            y_center=cy,
            z_center=cz,
            extent=extent,
            rotation_angle=rotation_angle,
        )
    return metadata_dict


class FIBContext(Context):
def __init__(self, acquisition_software: str, basepath: Path, token: str):
super().__init__("FIB", acquisition_software, token)
self._basepath = basepath
self._milling: Dict[int, List[MillingProgress]] = {}
self._lamellae: Dict[int, Lamella] = {}
self._milling: dict[int, list[MillingProgress]] = {}
self._lamellae: dict[int, Lamella] = {}
self._electron_snapshots: dict[str, Path] = {}
self._electron_snapshot_metadata: dict[str, ElectronSnapshotMetadata] = {}
self._electron_snapshots_submitted: set[str] = set()

def post_transfer(
self,
Expand All @@ -45,6 +177,13 @@ def post_transfer(
**kwargs,
):
super().post_transfer(transferred_file, environment=environment, **kwargs)
if environment is None:
logger.warning("No environment passed in")
return

# -----------------------------------------------------------------------------
# AutoTEM
# -----------------------------------------------------------------------------
if self._acquisition_software == "autotem":
parts = transferred_file.parts
if "DCImages" in parts and transferred_file.suffix == ".png":
Expand Down Expand Up @@ -123,3 +262,71 @@ def post_transfer(
self._lamellae[number]._replace(
angle=float(milling_angle.split(" ")[0])
)
# -----------------------------------------------------------------------------
# Maps
# -----------------------------------------------------------------------------
elif self._acquisition_software == "maps":
# Electron snapshot metadata file
if transferred_file.name == "EMproject.emxml":
# Extract all "Electron Snapshot" metadata and store it
self._electron_snapshot_metadata = _parse_electron_snapshot_metadata(
transferred_file
)
# If dataset hasn't been transferred, register it
for dataset_name in list(self._electron_snapshot_metadata.keys()):
if dataset_name not in self._electron_snapshots_submitted:
if dataset_name in self._electron_snapshots:
logger.info(f"Registering {dataset_name!r}")

## Workflow to trigger goes here

# Clear old entry after triggering workflow
self._electron_snapshots_submitted.add(dataset_name)
with lock:
self._electron_snapshots.pop(dataset_name, None)
self._electron_snapshot_metadata.pop(dataset_name, None)
else:
logger.debug(f"Waiting for image for {dataset_name}")
# Electron snapshot image
elif (
"Electron Snapshot" in transferred_file.name
and transferred_file.suffix in (".tif", ".tiff")
):
# Store file in Context memory
dataset_name = transferred_file.stem
if not (source := _get_source(transferred_file, environment)):
logger.warning(f"No source found for file {transferred_file}")
return
if not (
destination_file := _file_transferred_to(
environment=environment,
source=source,
file_path=transferred_file,
token=self._token,
)
):
logger.warning(
f"File {transferred_file.name!r} not found on storage system"
)
return
self._electron_snapshots[dataset_name] = destination_file

if dataset_name not in self._electron_snapshots_submitted:
# If the metadata and image are both present, register dataset
if dataset_name in list(self._electron_snapshot_metadata.keys()):
logger.info(f"Registering {dataset_name!r}")

## Workflow to trigger goes here

# Clear old entry after triggering workflow
self._electron_snapshots_submitted.add(dataset_name)
with lock:
self._electron_snapshots.pop(dataset_name, None)
self._electron_snapshot_metadata.pop(dataset_name, None)
else:
logger.debug(f"Waiting for metadata for {dataset_name}")
# -----------------------------------------------------------------------------
# Meteor
# -----------------------------------------------------------------------------
elif self._acquisition_software == "meteor":
pass
Loading
Loading