Source code for autoemxsp.core.EM_particle_finder

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Automated Electron Microscopy (EM) Particle Analysis Toolkit

This module provides the EM_Particle_Finder class for particle detection, selection of X-ray spectra (EDS, WDS) acquisition spots,
and particle size/statistics analysis.
It is designed to interface with an EM driver and an EM_Controller object to streamline the workflow from image acquisition to data export.

Currently only supports SEM.

Main Class
----------
EM_Particle_Finder
    Automates particle detection, mask generation, and X-ray spectra spot selection on detected particles.
    Supports both fully automated and manual (user-guided) collection modes.
    Provides methods for collecting particle statistics and managing frame navigation.

Example Usage
-------------
# EDS spot acquisition workflow:
>>> EM_controller = EM_Controller(...)
>>> EM_controller.initialise_SEM()
>>> particle_finder = EM_Particle_Finder(EM_controller, PowderMeasurementConfig, ...)
>>> while particle_finder.go_to_next_particle():
...     xy_spot_list = particle_finder.get_XS_acquisition_spots_coord_list()
...     for (x,y) in xy_spot_list:
...         EM_controller.acquire_XS_spectrum()


# Particle analysis workflow:
>>> EM_controller = EM_Controller(...)
>>> EM_controller.initialise_SEM()
>>> particle_finder = EM_Particle_Finder(EM_controller, PowderMeasurementConfig, results_dir="./results")
>>> particle_finder.get_particle_stats(n_par_target=500)

Notes
-----
- Requires an initialised EM_Controller object, unless used in development mode
- Requires a working EM_driver and appropriate hardware configuration, with the following API functions:
    Microscope Status & Image Acquisition
    -------------------------------------
    - is_at_EM
        Boolean attribute; True if running at the actual electron microscope.
    
    - get_image_data(width, height, channel)
        Acquire an image from the microscope with specified dimensions and channels.
    
    - get_frame_width()
        Get the current field of view/frame width (in mm).
    
    Stage & Navigation Control
    --------------------------
    - move_to(x, y)
        Move the microscope stage to the specified (x, y) position.

Created on Wed Jul 31 09:28:07 2024

@author: Andrea
"""
# Standard library imports
import os
import time
import random
import warnings

# Third-party imports
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from typing import List, Optional

# Local project imports
from autoemxsp.utils import (
    Prompt_User,
    draw_scalebar,
    print_double_separator,
)
from autoemxsp.config import (
    PowderMeasurementConfig,
)
import autoemxsp.utils.constants as cnst
from autoemxsp import EM_driver
import autoemxsp.core.particle_segmentation_models as par_seg_models

#%% Electron Microscope Particle Finder class    
[docs] class EM_Particle_Finder: """ Automated particle analysis and X-ray spectra (EDS, WDS) acquisition in an electron microscope (EM). This class provides methods for particle detection, selection of X-ray spectra (EDS, WDS) acquisition spots, and particle size/statistics analysis. It is designed to interface with an EM driver and an EM_Controller object to streamline the workflow from image acquisition to data export. Main Methods ------------ initialise_SEM() Wakes up the SEM microscope, sets measurement parameters, and evaluates locations to scan for particles. go_to_next_particle() Locates and moves to the next detected particle. get_XS_acquisition_spots_coord_list() Determines X-ray spectra (EDS, WDS) acquisition spots on the currently detected particle. get_particle_stats() Scans the full sample, collecting particle size distribution statistics. Examples -------- # EDS spot acquisition workflow: >>> EM_controller = EM_Controller(...) >>> EM_controller.initialise_SEM() >>> particle_finder = EM_Particle_Finder(EM_controller, PowderMeasurementConfig, ...) >>> while particle_finder.go_to_next_particle(): ... xy_spot_list = particle_finder.get_XS_acquisition_spots_coord_list() ... for (x,y) in xy_spot_list: ... EM_controller.acquire_XS_spectrum() # Particle analysis workflow: >>> EM_controller = EM_Controller(...) >>> EM_controller.initialise_SEM() >>> particle_finder = EM_Particle_Finder(EM_controller, PowderMeasurementConfig, results_dir="./results") >>> particle_finder.get_particle_stats(n_par_target=500) Attributes ---------- EM : EM_Controller Reference to the parent EM_Controller instance (must be initialised before use). powder_meas_cfg : PowderMeasurementConfig Configuration object for powder measurement (see dataclass for details). is_manual_particle_selection : bool If True, prompts user to center image around next particle to analyse. results_dir : str or None Directory for saving result images and data. verbose : bool If True, print progress and information to the console. development_mode : bool If True, enables extra visualizations and debug image saving. Internal Attributes ------------------- _sample_ID : str Identifier for the sample. Inherited from EM_Controller. _im_width : int Image width in pixels. Inherited from EM_Controller. _im_height : int Image height in pixels. Inherited from EM_Controller. tot_par_cntr : int Counter for the total number of particles analyzed. analyzed_pars : list(tuple(int, str, float)) List storing tuple containing Particle ID, Frame ID and particles areas (in μm²) of each detected particle. Used only when collecting particles size distribution statistics Notes ----- - This class assumes that an EM_Controller object is already initialised and passed as `EM_controller`. - All configuration validation is performed in the respective dataclasses. - Attributes with a leading underscore are for internal use and should not be accessed directly by users. """ def __init__( self, EM_controller, powder_meas_cfg: PowderMeasurementConfig, is_manual_particle_selection: bool = False, results_dir: Optional[str] = None, verbose: bool = True, development_mode: bool = True ): """ Initialize an EM_Particle_Finder object for automated/manual electron microscopy particle analysis. This class requires that an EM_Controller object has already been fully initialised and passed as `EM_controller`. Parameters ---------- EM_controller : EM_Controller Reference to the parent EM_Controller instance (must be initialised before use). powder_meas_cfg : PowderMeasurementConfig, optional Configuration object for powder measurement (see dataclass for details). is_manual_particle_selection : bool, optional If True, enables manual particle selection mode (default: False). results_dir : str, optional Directory to save result images and data (default: None). verbose : bool, optional If True, print progress and information to the console (default: True). development_mode : bool, optional If True, enables extra visualizations and debug image saving (default: True). Raises ------ None Notes ----- - All configuration validation is performed in the respective dataclasses. - This initializer assumes that configuration dataclasses are valid and complete. - Additional internal attributes are initialized for particle tracking and statistics. """ # Electron microscope controller object self.EM = EM_controller self.powder_meas_cfg = powder_meas_cfg ### Inherit attributes if EM_controller is not None: # EM_controller is set to None when simply processing data self._sample_ID = EM_controller.sample_cfg.ID self._im_width = EM_controller.im_width self._im_height = EM_controller.im_height self.is_manual_particle_selection = is_manual_particle_selection # NOTE: self.tot_par_cntr is initialized below, so _select_par_prompt_title will be set on first use if self.is_manual_particle_selection: self._select_par_prompt_title = f"Select position for particle #{self.tot_par_cntr}" self._select_par_prompt_message = ( "Center the image around the particle you want\n" "to analyse, then click OK." ) # --- General options self.results_dir = results_dir self.verbose = verbose self.development_mode = development_mode # Set whether in code development mode. Triggers visualisations and image saving # --- Variable initializations self.tot_par_cntr = 0 # Keeps track of total number of particles analysed self._fr_par_cntr = 0 self._num_par_in_frame = 0 self.analyzed_pars: List[tuple(float, str)] = [] def _check_EM_controller_initialization(self) -> None: """ Check whether the associated EM_Controller instance is initialized. Raises ------ RuntimeError If the EM_Controller is not initialized. Notes ----- This method should be called before performing any operation that requires an initialized microscope. """ if not self.EM.is_initialized: raise RuntimeError( "EM_Controller is not initialized. Please call initialise_SEM() " "(or initialise_STEM(), if supported) before using this method." ) #%% Particle Navigation # =============================================================================
[docs] def go_to_next_particle(self): ''' Moves the microscope to the next particle, centering and zooming on it. It also re-adjusts brightness, contrast and focus. In automated mode, this function finds the next particle in the current frame. If all particles in the frame have been analyzed, it advances to the next frame and searches for particles there. It then moves the stage to center the next particle and zooms in. In manual mode, it prompts the user for input. Returns ------- bool True if it could successfully move to a particle. False if no more particles are present in the sample or if execution is stopped by the user. ''' if not self.is_manual_particle_selection: self._check_EM_controller_initialization() # Automated search of particles # Check if all (or the max allowed amount) particles have been analysed in the current frames. # If yes, re-calculate particle positions in the next frame if self._fr_par_cntr >= self._num_par_in_frame or self._fr_par_cntr >= self.powder_meas_cfg.max_n_par_per_frame: # Loop until a frame with particles is found were_particles_found = False while not were_particles_found: were_particles_found = self._get_particles_coordinates_in_frame() # Check if all frames in sample have been analysed if were_particles_found is None: return False # Move center of image to the centroid of the particle self.EM.move_to_pos(self.par_pos_abs_mm[self._fr_par_cntr]) # Set frame width to zoom around particle frame_width_mm = self.par_fw_mm[self._fr_par_cntr] self.EM.set_frame_width(frame_width_mm) # Adjust EM settings (focus, contrast, brightness) self.EM.adjust_BCF() # Update counter of particles in current frame self._fr_par_cntr += 1 # Update counter of total number of particles analysed self.tot_par_cntr += 1 else: # Manually look for particle prompt = Prompt_User(self._select_par_prompt_title, self._select_par_prompt_message) prompt.run() if prompt.execution_stopped: # Check if execution was stopped after the loop print("Execution stopped by the user.") return False if prompt.ok_pressed: frame_width_mm = EM_driver.get_frame_width() self.EM.pixel_size_um = frame_width_mm / self._im_width * 10**3 # um self.tot_par_cntr += 1 return True
#%% Particle Segmentation Operations # ============================================================================= def _get_particles_coordinates_in_frame(self, frame_image=None, pixel_size=None, results_dir=None): ''' Detects and extracts center coordinates and widths of suitable particles in the current frame. If running at the microscope, moves to the next frame and collects the image. If an image and pixel size are provided, it uses these instead. The function applies a mask to segment particles, filters them by area, and determines their positions and frame sizes. It also saves a visualization image with circles drawn around detected particles for development purposes. Parameters ---------- frame_image : ndarray, optional Grayscale image of the current frame. If not provided and running at the EM, the image is acquired. pixel_size : float, optional Pixel size in micrometers. Required if not running at the EM. results_dir : str, optional Directory to save results. Required for saving images in development mode. Returns ------- bool or None True if at least one suitable particle was found in the frame. False if no particles were found. None if there are no more frames available. Notes ----- - The function saves a visualization image with detected particles for development. - All commented-out imshow and development lines are preserved for debugging. - Particle positions and frame widths are stored as class attributes for later use. Potential Improvements --------------------- - Exclude particles that are close to larger particles in the direction of the EDS (Energy Dispersive Spectroscopy) detector. Large particles in the path can absorb or scatter X-rays emitted from smaller particles, degrading the quality and accuracy of the spectral signal for those particles. Suggested implementation: - For each detected particle, determine if there are larger particles located "upstream" (i.e., between the particle and the EDS detector direction). - To do this, add a control '_is_particle_shadowed()' together with '_is_particle_area_ok()' to select valid particles. ''' if EM_driver.is_at_EM: self._check_EM_controller_initialization() move_to_frame_success = self.EM.go_to_next_frame() if move_to_frame_success: # Collect image frame_image = EM_driver.get_image_data(self._im_width, self._im_height, 1) else: # No more frames are available return None elif frame_image is not None and pixel_size is not None: self._im_height, self._im_width = frame_image.shape self.EM.pixel_size_um = pixel_size if results_dir: self.results_dir = results_dir else: raise ValueError('Function "_get_particles_coordinates_in_frame()" must be run at the microscope, or it needs to be passed both image and its pixel size') ### Select particles on substrate # Apply a Gaussian blur to remove single bright pixels frame_image = cv2.GaussianBlur(frame_image, (5, 5), 0) # Get mask of particles on substrate par_mask, _ = self._get_particles_on_substrate_mask(frame_image) # Find connected components num_labels, labels, stats, centroids = self._get_connected_components_with_stats(par_mask) ### Filter out particles that are too small or too big, and store their centroid + size par_pos_pixels = [] par_fw_pixels = [] par_radius_pixels = [] fw_margin_um = 5 # Pixel size in frame is a few um, so the particle will be shifted # fw_scale_factor = 1.8 # Multiplicative factor to obtain margins from particle for i in range(1, num_labels): # Skip the background component (index 0) if self._is_particle_area_ok(stats[i, cv2.CC_STAT_AREA]): # Append particle centroid par_pos_pixels.append(centroids[i]) # Calculate what would be the frame width in pixels in order to contain the particle fully in its width fw_width = stats[i, cv2.CC_STAT_WIDTH] # Same as above, but to contain the particle in its height fw_height = stats[i, cv2.CC_STAT_HEIGHT] / self._im_height * self._im_width # Select the largest frame width to make sure it contains the particle fully par_fw = max([fw_width, fw_height]) # Append to list of frame_widths par_fw_pixels.append(par_fw) # Save particle radius, for proper circle size in saved image par_radius_pixels.append(max(fw_width, stats[i, cv2.CC_STAT_HEIGHT]) / 2) ### Visualize selected particles. Used for code development # else: # # Blacken pixels corresponding to excluded components # par_mask[labels == i] = 0 # cv2.imshow('Filtered mask', par_mask) # Save frame image annotating it with the identified particles filename = f"{self._sample_ID}_fr{self.EM.current_frame_label}_particles" im_annotations = [{self.EM.an_circle_key : (int(rad), center.astype(int), 2)} for rad, center in zip(par_radius_pixels, par_pos_pixels)] self.EM.save_frame_image(filename, im_annotations = im_annotations) # Return false if no particles were detected in the frame num_par = len(par_pos_pixels) if self.verbose: if num_par == 1: par_string = 'particle was' else: par_string = 'particles were' print(f"{num_par} {par_string} found in current frame") if num_par == 0: return False ### Convert dimensions from pixels to mm # Calculate absolute position of particle within the EM, in mm par_pos_abs_mm = self.EM.convert_pixel_pos_to_mm(par_pos_pixels) # Convert the frame width to mm par_fw_mm = (np.array(par_fw_pixels) * self.EM.pixel_size_um + 2 * fw_margin_um) * 10**-3 # Store particle information self.par_pos_abs_mm = par_pos_abs_mm self.par_fw_mm = par_fw_mm self._num_par_in_frame = num_par # Initialise counter to keep track at how many particles within the frame have been analysed self._fr_par_cntr = 0 # Returns True if at least 1 particle was found in the frame, otherwise returns False return True def _get_particles_on_substrate_mask(self, frame_image, save_image=False): ''' Generates a binary mask of detected particles on the substrate from the input frame image. This function applies a brightness threshold to the input image to segment particles, finds contours, and fills inner contours to ensure particles are fully masked. Optionally, the mask image can be saved to disk. The function returns the mask and the path where the mask image would be saved. Parameters ---------- frame_image : ndarray The grayscale input image of the current frame. save_image : bool, optional If True, saves the binary mask image to disk (default: False). Returns ------- par_mask : ndarray Either a binary mask of detected particles, or a labels array, where the pixels of each particle are identified by a different integer (same as labels returned by cv2.ConnectedComponents) mask_img_path : str File path for where the mask image is (or would be) saved. Note ---- The commented `cv2.imshow` line can be enabled for debugging visualization. ''' if self.powder_meas_cfg.par_segmentation_model not in self.powder_meas_cfg.AVAILABLE_PAR_SEGMENTATION_MODELS: self.powder_meas_cfg.par_segmentation_model = "threshold_bright" warnings.warn( f"Chosen particle segmentation model {self.powder_meas_cfg.par_segmentation_model} not available." "Defaulting to 'threshold_bright'", UserWarning ) if self.powder_meas_cfg.par_segmentation_model == "threshold_bright": # Apply the threshold to get a binary image _, par_mask = cv2.threshold(frame_image, self.powder_meas_cfg.par_brightness_thresh, 255, cv2.THRESH_BINARY) # Find all contours in the image and fill them contours, hierarchy = cv2.findContours(par_mask, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE) for i, contour in enumerate(contours): if hierarchy[0][i][3] != -1: # If contour is inside another contour cv2.drawContours(par_mask, [contour], 0, 255, -1) elif self.powder_meas_cfg.par_segmentation_model in self.powder_meas_cfg.AVAILABLE_PAR_SEGMENTATION_MODELS: model_module = par_seg_models.PAR_SEGMENTATION_MODEL_REGISTRY[self.powder_meas_cfg.par_segmentation_model] par_mask = model_module.segment_particles(frame_image, self.powder_meas_cfg, save_image, self.EM) else: raise ValueError(f"Unknown error with current particle segmentation model {self.powder_meas_cfg.par_segmentation_model}") # cv2.imshow('Segmented Particles Mask', par_mask) mask_img_path = os.path.join(self.results_dir, self._sample_ID + f'_fr{self.EM.current_frame_label}' + '_mask.png') # Mask is always saved when collecting particles. Avoids double saving save_image = save_image and not self.EM.measurement_cfg.type == self.EM.measurement_cfg.PARTICLE_STATS_MEAS_TYPE_KEY if self.development_mode or save_image: draw_scalebar(par_mask, self.EM.pixel_size_um) cv2.imwrite(mask_img_path, par_mask) return par_mask, mask_img_path def _get_connected_components_with_stats(self, par_mask: np.ndarray): """ Compute connected components with statistics. This function accepts either: 1. A binary mask (0/255 or boolean), in which case it directly calls cv2.connectedComponentsWithStats. 2. A pre-labeled image (integer labels, like output of cv2.connectedComponents), in which case stats and centroids are recomputed manually. Parameters ---------- par_mask : np.ndarray Input binary mask (0/255 or bool) or label image (int32). Returns ------- Same as cv2.connectedComponentsWithStats num_labels : int Number of connected components (including background). labels : np.ndarray Labeled image of the same size as input. stats : np.ndarray Statistics for each label. Shape: (num_labels, 5). Columns indexed by: cv2.CC_STAT_LEFT (x) cv2.CC_STAT_TOP (y) cv2.CC_STAT_WIDTH (width) cv2.CC_STAT_HEIGHT (height) cv2.CC_STAT_AREA (area) centroids : np.ndarray Centroids of each component. Shape: (num_labels, 2). """ # --- Case 1: Binary image --- if par_mask.dtype == np.bool_ or np.array_equal(np.unique(par_mask), [0, 255]): num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats( par_mask.astype(np.uint8), connectivity=8, ltype=cv2.CV_32S ) else: # --- Case 2: Already a label image --- labels = par_mask.astype(np.int32, copy=False) num_labels = labels.max() + 1 stats = np.zeros((num_labels, 5), dtype=np.int32) centroids = np.zeros((num_labels, 2), dtype=np.float64) for label in range(num_labels): mask = labels == label if not np.any(mask): continue ys, xs = np.where(mask) x_min, x_max = xs.min(), xs.max() y_min, y_max = ys.min(), ys.max() w = x_max - x_min + 1 h = y_max - y_min + 1 area = mask.sum() stats[label, cv2.CC_STAT_LEFT] = x_min stats[label, cv2.CC_STAT_TOP] = y_min stats[label, cv2.CC_STAT_WIDTH] = w stats[label, cv2.CC_STAT_HEIGHT] = h stats[label, cv2.CC_STAT_AREA] = area centroids[label] = [xs.mean(), ys.mean()] return num_labels, labels, stats, centroids
[docs] def is_particle_at_frame_edge(self, stats, i): ''' Determines whether a particle is located at or near the edge of the image frame. A margin is used to account for detection tolerances. If any part of the particle's bounding box is within `pixel_margin` pixels of the image border, it is considered to be at the edge. Parameters ---------- stats : ndarray Connected components statistics array (as returned by OpenCV), where each row corresponds to a detected particle and columns to bounding box info (LEFT, TOP, WIDTH, HEIGHT, etc.). i : int Index of the particle in the stats array. Returns ------- bool True if the particle is at or near the edge of the image frame, False otherwise. ''' # Apply margin to account for detection tolerances pixel_margin = 3 # Check if the particle's bounding box touches or is near any image edge is_particle_at_frame_edge = any([ stats[i, cv2.CC_STAT_LEFT] <= pixel_margin, stats[i, cv2.CC_STAT_TOP] <= pixel_margin, stats[i, cv2.CC_STAT_LEFT] + stats[i, cv2.CC_STAT_WIDTH] >= self._im_width - pixel_margin, stats[i, cv2.CC_STAT_TOP] + stats[i, cv2.CC_STAT_HEIGHT] >= self._im_height - pixel_margin ]) return is_particle_at_frame_edge
def _is_particle_area_ok(self, area): ''' Checks whether the given particle area is within acceptable limits. The function converts the minimum and maximum allowed particle areas from µm² to pixel units using the current pixel size. It then determines if the provided area is within this range. If in manual collection mode and the area is out of bounds, it prints a message to the user. Parameters ---------- area : float The area of the particle in pixels. Returns ------- bool True if the particle area is within the allowed range, False otherwise. ''' # Calculate acceptable particle area dimensions with current pixel size min_area_pixels = self.powder_meas_cfg.min_area_par / self.EM.pixel_size_um**2 # e.g., 4um^2 for a 2um x 2um particle max_area_pixels = self.powder_meas_cfg.max_area_par / self.EM.pixel_size_um**2 # e.g., 100um^2 for a 10um x 10um particle is_par_large_enough = area >= min_area_pixels if not is_par_large_enough and self.is_manual_particle_selection: print(f"Selected particle is too small ({area*self.EM.pixel_size_um**2:.2f} um^2), select another one") is_par_small_enough = area <= max_area_pixels if not is_par_small_enough and self.is_manual_particle_selection: print(f"Selected particle is too large ({area*self.EM.pixel_size_um**2:.2f} um^2), select another one") is_par_area_ok = is_par_large_enough and is_par_small_enough return is_par_area_ok def _get_particle_mask(self, par_image=None, pixel_size_um=None, results_dir=None, centering=False): ''' Returns a binary mask of the particle at the center of the image, if the particle is large enough. If no particle is present at the center, the function searches for and centers on the next closest sufficiently large particle. This approach makes the function robust against drift or movement of the particle during analysis. If no valid particle is found, the function returns None. Parameters ---------- par_image : ndarray, optional Grayscale image of the current frame. If not provided and running at the EM, the image is acquired. pixel_size_um : float, optional Pixel size in micrometers. Required if not running at the EM. results_dir : str, optional Directory to save results. Required for saving the mask in development mode. centering : bool, optional If True, indicates this is a second attempt at centering on a particle (prevents infinite recursion). Returns ------- tuple or None (par_mask, par_image): Binary mask of the selected particle and the corresponding image. None: If no suitable particle is found. Notes ----- - The function is robust to small misalignments: if the center particle is not valid, it finds the next closest. - In development mode, it saves the resulting mask with a scalebar overlay. - The commented `cv2.imshow` lines can be enabled for debugging visualization. ''' # Get particle mask if EM_driver.is_at_EM: self._check_EM_controller_initialization() par_image = EM_driver.get_image_data(self._im_width, self._im_height, 1) elif par_image is not None and pixel_size_um is not None: self._im_height, self._im_width = par_image.shape self.EM.pixel_size_um = pixel_size_um if results_dir: self.results_dir = results_dir else: raise ValueError('This function must be run at the microscope, or it needs to be passed both image and its pixel size') # Apply the threshold to get a binary image _, par_mask = cv2.threshold(par_image, self.powder_meas_cfg.par_brightness_thresh, 255, cv2.THRESH_BINARY) # cv2.imshow('Initial mask of particles', par_mask) # Find connected components num_labels, labels, stats, centroids = self._get_connected_components_with_stats(par_mask) # Make sure particles are present if num_labels == 1: return None # Identify component that contains the center of the image (our particle of interest) center_x = int(self._im_width / 2) center_y = int(self._im_height / 2) par_label = labels[center_y, center_x] # Check if the center particle is valid if par_label > 0 and self._is_particle_area_ok(stats[par_label, cv2.CC_STAT_AREA]): # Set all pixels outside the particle to 0 par_mask[labels != par_label] = 0 elif not centering: # Select the next closest valid particle distances = np.linalg.norm(centroids[1:] - np.array([center_x, center_y]), axis=1) sorted_indices = np.argsort(distances) + 1 # Skip background (index 0) for label in sorted_indices: if not EM_driver.is_at_EM: par_label = label break elif self._is_particle_area_ok(stats[label, cv2.CC_STAT_AREA]): new_center = self.EM.convert_pixel_pos_to_mm(centroids[label]) self.EM.move_to_pos(new_center) par_mask_return = self._get_particle_mask(centering=True) if par_mask_return is not None: par_mask, par_image = par_mask_return else: return None par_label = label break else: par_label = None else: # Already attempted once at centering the particle. Did not work return None # Check if there is at least one particle of sufficient size if par_label is None: return None # cv2.imshow('Center Particle', par_mask) if self.development_mode and self.results_dir: par_mask = draw_scalebar(par_mask, self.EM.pixel_size_um) # Save mask, only for development cv2.imwrite(os.path.join( self.results_dir, self._sample_ID + f'_par{self.tot_par_cntr}_fr{self.EM.current_frame_label}_mask.png' ), par_mask) return (par_mask, par_image) def _erode_particle_mask(self, par_mask, margin, erode_inner=False): """ Erode a binary particle mask by a specified margin. This function erodes the mask either only at the outer boundary or at both the outer boundary and inner holes, depending on the `erode_inner` flag. Parameters ---------- par_mask : np.ndarray Binary mask of the particle (dtype uint8, values 0 and 255). margin : int The erosion margin (in pixels). The structuring element will be of size (2 * margin + 1). erode_inner : bool, optional If False (default), only the outer boundary of the mask is eroded, leaving holes inside the particle unaffected. If True, both the outer boundary and any internal holes are eroded. Returns ------- final_mask : np.ndarray The eroded binary mask (same dtype and shape as `par_mask`). Notes ----- - This method assumes the mask is a binary 8-bit image (0 for background, 255 for foreground). - The erosion uses an elliptical structuring element. Examples -------- >>> eroded = self._erode_particle_mask(par_mask, margin=3, erode_inner=False) """ kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * margin + 1, 2 * margin + 1)) if erode_inner: # Erode everywhere (outer and inner edges) final_mask = cv2.erode(par_mask, kernel) else: # Erode only the outer contour, not holes contours, hierarchy = cv2.findContours(par_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) outer_contour_mask = np.zeros_like(par_mask) cv2.drawContours(outer_contour_mask, contours, -1, 255, -1) # Fill outer boundaries eroded_outer_contour_mask = cv2.erode(outer_contour_mask, kernel) final_mask = cv2.bitwise_and(par_mask, eroded_outer_contour_mask) return final_mask def _find_particle_bright_regions(self, final_mask, par_image): """ Detect bright regions within a particle mask in the input image, in order to acquire X-ray spectra from thickest regions of particle and maximize X-ray intensity. This function applies a mask to the input image, blurs it, normalizes the intensity, thresholds to find bright spots, and returns the thresholded image and the minimum area (in pixels) for spot detection. Parameters ---------- final_mask : np.ndarray Binary mask (uint8, values 0 and 255) specifying the region of interest (the particle). par_image : np.ndarray Grayscale image (uint8) of the particle region. Returns ------- thresholded_image : np.ndarray Binary image (uint8, values 0 and 255) showing detected bright spots within the particle mask. min_area_pixels : int Minimum spot area in pixels, calculated as corresponding to 0.1 μm². Notes ----- - The function applies a Gaussian blur before normalization and thresholding. - The threshold value is taken from `self.powder_meas_cfg.par_xy_spots_thresh`. - The minimum area is computed from `self.EM.pixel_size_um`. - If `self.development_mode` is True and `self.results_dir` is set, a debug image is saved with a scalebar. Examples -------- >>> spots_mask, min_area = self._find_particle_bright_regions(final_mask, par_image) """ masked_image = cv2.bitwise_and(par_image, par_image, mask=final_mask) # cv2.imshow('CV Image', masked_image) if np.all(masked_image == 0): return np.zeros_like(masked_image), 0 masked_im_blurred = cv2.GaussianBlur(masked_image, (5, 5), 0) # cv2.imshow('Blurred Image', masked_im_blurred) # Normalise particle intensity to select thickest regions, regardless of intensity of neighbouring features norm_masked_image = (masked_im_blurred / np.max(masked_im_blurred) * 255).astype(np.uint8) _, thresholded_image = cv2.threshold(norm_masked_image, self.powder_meas_cfg.par_xy_spots_thresh, 255, cv2.THRESH_BINARY) # cv2.imshow('CV Image', thresholded_image) # cv2.imshow('CV Image 2', cv2.bitwise_and(par_image, thresholded_image)) min_area_pixels = int(0.1 / self.EM.pixel_size_um ** 2) if self.development_mode and self.results_dir: eroded_par_mask = draw_scalebar(thresholded_image, self.EM.pixel_size_um) cv2.imwrite(os.path.join(self.results_dir, self._sample_ID + f'_par{self.tot_par_cntr}_fr{self.EM.current_frame_label}_maskXY.png'), eroded_par_mask) return thresholded_image, min_area_pixels
[docs] def prepare_mask_for_visualization(self, mask: np.ndarray) -> np.ndarray: """ Prepare a segmentation mask for visualization. Behavior: - Binary masks with values in {0, 1} are scaled to [0, 255]. - Binary masks with values in {0, 255} are returned unchanged. - Integer label masks have brightness reversed so higher labels are brighter, with a minimum brightness of 30 for positive values. Background (0) remains black. - Floating-point masks are normalized to [0, 255] with the same rules for positive values and background. """ mask = np.asarray(mask) unique_vals = np.unique(mask) # --- Binary mask [0, 1] --- if np.array_equal(unique_vals, [0, 1]): return (mask * 255).astype(np.uint8) # --- Binary mask [0, 255] --- elif np.array_equal(unique_vals, [0, 255]): return mask.astype(np.uint8) # --- Integer label masks --- elif np.issubdtype(mask.dtype, np.integer): max_val = mask.max() if max_val > 0: scaled = mask.astype(np.float32) pos_mask = scaled > 0 # Reverse intensity so higher labels → brighter scaled[pos_mask] = (max_val - scaled[pos_mask]) * (255.0 / max_val) # Minimum brightness for positive labels scaled[pos_mask] = np.clip(scaled[pos_mask], 30, 255) scaled[~pos_mask] = 0 return scaled.astype(np.uint8) else: return mask.astype(np.uint8) # --- Floating-point masks --- else: min_val = mask.min() max_val = mask.max() if max_val > min_val: scaled = (mask - min_val) / (max_val - min_val) * 255.0 scaled = scaled.astype(np.float32) pos_mask = mask > 0 scaled[pos_mask] = np.clip(scaled[pos_mask], 30, 255) scaled[~pos_mask] = 0 return scaled.astype(np.uint8) else: return np.zeros_like(mask, dtype=np.uint8)
#%% Selection of spots for X-Ray spectra acquisition # =============================================================================
[docs] def get_XS_acquisition_spots_coord_list( self, n_tot_sp_collected, par_image=None, pixel_size_um=None, results_dir=None): ''' Returns a list of coordinates (relative, in the current image) for X-ray spectrum spot collection on a particle. The function finds a suitable particle mask, erodes the mask to avoid particle edges, finds bright regions (or peak spots), and selects up to `powder_meas_cfg.max_spectra_per_par` spots per particle with a minimum distance between them (determined through 'powder_meas_cfg.par_mask_margin'). The selection strategy for features and spacing can be controlled via powder_meas_cfg.par_feature_selection and powder_meas_cfg.par_spot_spacing. Parameters ---------- n_tot_sp_collected : int Counter for the total number of spectra collected (used for labeling). par_image : ndarray, optional Grayscale image of the current frame. If not provided and running at the SEM, the image is acquired. pixel_size_um : float, optional Pixel size in micrometers. Required if not running at the SEM. results_dir : str, optional Directory to save result images. Returns ------- pts_rel_coords : ndarray Array of selected (x, y) spot coordinates in relative units (centered at 0). Coordinate System ---------------- The coordinates are expressed in a normalized, aspect-ratio-correct system centered at the image center: - The origin (0, 0) is at the image center. - The x-axis is horizontal, increasing to the right, ranging from -0.5 (left) to +0.5 (right). - The y-axis is vertical, increasing downward, and scaled by the aspect ratio (height/width): * Top edge: y = -0.5 × (height / width) * Bottom edge: y = +0.5 × (height / width) | (-0.5, -0.5*height/width) (0.5, -0.5*height/width) | +-------------------------+ | | | | | | | | +(0,0) |-----> +x | | | | | | v +y +-------------------------+ (-0.5, 0.5*height/width) (0.5, 0.5*height/width) This ensures the coordinate system is always centered and aspect-ratio-correct, regardless of image size. Potential Improvements --------------------- - Select points to ensure all phases are individuated, biasing spot selection to ensure representation of phases with different bright/dark contrast. Currently, only the brightest spots are picked, which may miss some phases. - Add detection of both peaks and valleys to ensure both bright and dark spots are tested. - Exclude points that are close to larger particles along the X-ray emission path to the EDS detector, as these may absorb emitted X-rays and degrade spectral signal. ''' # --- 1. Acquire or prepare the particle mask and image --- if EM_driver.is_at_EM: par_mask_return = self._get_particle_mask() elif par_image is not None and pixel_size_um is not None: self._im_height, self._im_width = par_image.shape self.EM.pixel_size_um = pixel_size_um par_mask_return = self._get_particle_mask(par_image, pixel_size_um) if results_dir: self.results_dir = results_dir else: raise ValueError('This function must be run at the microscope, or it needs to be passed both image and its pixel size') # Check if a particle was detected. If not, return empty list if par_mask_return is None: return [] else: par_mask, par_image = par_mask_return # --- 2. Erode the particle mask to avoid edge effects --- margin = max(10, int(self.powder_meas_cfg.par_mask_margin / self.EM.pixel_size_um)) final_mask = self._erode_particle_mask(par_mask, margin) # --- 3. Find bright points in image, which indicate highest regions on particle--- thresholded_image, min_area_pixels = self._find_particle_bright_regions(final_mask, par_image) # --- 4. Collect candidate points from thresholded image --- all_points = self._collect_candidate_points( thresholded_image, par_image, self.powder_meas_cfg.par_feature_selection, min_area_pixels ) if len(all_points) == 0: return [] # --- 5. Select points with minimum distance and maximum count --- min_distance_xsp_spots = (self.powder_meas_cfg.xsp_spots_distance_um / self.EM.pixel_size_um) if self.powder_meas_cfg.par_spot_spacing == 'random': selected_points = EM_Particle_Finder._select_XSspots_randomly( all_points, max_points=self.powder_meas_cfg.max_spectra_per_par, min_distance=min_distance_xsp_spots ) elif self.powder_meas_cfg.par_spot_spacing == 'maximized': selected_points = EM_Particle_Finder._select_evenly_spaced_XSspots( all_points, max_points=self.powder_meas_cfg.max_spectra_per_par, min_distance=min_distance_xsp_spots ) # --- 6. Convert pixel coordinates to relative image coordinates --- pts_rel_coords = EM_driver.frame_pixel_to_rel_coords( selected_points, img_width=self._im_width, img_height=self._im_height ) # --- 7. Annotate image and save --- if self.development_mode and self.results_dir: color_image = cv2.cvtColor(par_image, cv2.COLOR_GRAY2BGR) for center in selected_points: # Add circle indicating where X-ray spectrum was collected cv2.circle(color_image, center, 10, (0, 0, 255), -1) label_pos = (center[0] - 30, center[1] - 15) cv2.putText(color_image, str(n_tot_sp_collected), label_pos, cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA) n_tot_sp_collected += 1 color_image = draw_scalebar(color_image, self.EM.pixel_size_um) # cv2.imshow('Selected XS spots', color_image) cv2.imwrite(os.path.join(self.results_dir, self._sample_ID + f'_par{self.tot_par_cntr}_fr{self.EM.current_frame_label}_xyspots.png'), color_image) return pts_rel_coords
def _collect_candidate_points(self, thresholded_image, par_image, feature_selection, min_area_pixels): """ Collect candidate EDS/WDS spot coordinates from thresholded regions. This function identifies connected components (regions) in the thresholded image, filters them by minimum area, and collects candidate points for X-ray spot acquisition. The method of selection depends on the `feature_selection` argument: - 'random': all pixel coordinates within the component are collected. - 'peaks': only the brightest pixel within each component is selected. Parameters ---------- thresholded_image : np.ndarray Binary image (uint8, values 0 and 255) indicating candidate regions for spot selection. par_image : np.ndarray Grayscale image (uint8) of the particle, used for peak detection. feature_selection : str 'random' to select all pixels in the region, 'peaks' to select local maxima (bright spots). min_area_pixels : int Minimum area (in pixels) for a component to be considered. Returns ------- all_points : list of tuple List of (x, y) coordinates of candidate points (in pixel units). Notes ----- - For 'random', all pixels in each sufficiently large component are returned. - For 'peaks', local maxima are found by repeatedly masking out regions around each peak. - The margin for masking out peaks is determined by `self.powder_meas_cfg.par_mask_margin` and `self.EM.pixel_size_um`. - The maximum number of iterations for peak finding is `100 * self.powder_meas_cfg.max_spectra_per_par`. - Only components with area >= `min_area_pixels` are considered. """ all_points = [] num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(thresholded_image, connectivity=8) margin = max(10, int(self.powder_meas_cfg.par_mask_margin / self.EM.pixel_size_um)) for i in range(1, num_labels): # Skip background if stats[i, cv2.CC_STAT_AREA] >= min_area_pixels: if feature_selection == 'random': components_coords = np.where(labels == i) all_points += list(zip(components_coords[1], components_coords[0])) elif feature_selection == 'peaks': component_mask = (labels == i).astype(np.uint8) * 255 component_image = cv2.bitwise_and(par_image, par_image, mask=component_mask) component_image = cv2.GaussianBlur(component_image, (5, 5), 0) max_iter = 100 * self.powder_meas_cfg.max_spectra_per_par iter_cntr = 0 while iter_cntr < max_iter: iter_cntr += 1 min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(component_image) if max_val < self.powder_meas_cfg.par_xy_spots_thresh: break _, thresh = cv2.threshold(component_image, int(max_val * 0.97), 255, cv2.THRESH_BINARY) contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) for contour in contours: x, y, w, h = cv2.boundingRect(contour) center_x = x + w // 2 center_y = y + h // 2 center_pnt = (center_x, center_y) all_points.append(center_pnt) cv2.circle(component_image, center_pnt, margin, (0, 0, 0), -1) return all_points @staticmethod def _select_XSspots_randomly(points, max_points, min_distance): ''' Randomly selects up to `max_points` from a list of acquisition spots, ensuring that each selected point is at least `min_distance` away from all others already selected. Parameters ---------- points : list of tuple or ndarray List of (x, y) coordinates to choose from. max_points : int Maximum number of points to select. min_distance : float Minimum allowed distance between any two selected points. Returns ------- selected_points : list List of selected (x, y) points. ''' # Shuffle points to unbias the choice of position random.shuffle(points) # Check if some selected points must be cut out if len(points) <= max_points: return points selected_points = [points[0]] for point in points: # Check if enough points have been selected if len(selected_points) >= max_points: break # Check if point is distant enough from all selected points if point not in selected_points and all(np.linalg.norm(np.array(point) - np.array(p)) > min_distance for p in selected_points): selected_points.append(point) return selected_points @staticmethod def _select_evenly_spaced_XSspots(points, max_points, min_distance): ''' Selects up to `max_points` from a list of acquisition spots, maximizing the minimum distance between any two selected points. Points are chosen iteratively: each new point is the one farthest (in minimum distance) from those already selected. Parameters ---------- points : list of tuple or ndarray List of (x, y) coordinates to choose from. max_points : int Maximum number of points to select. min_distance : float Minimum allowed distance between any two selected points (not strictly enforced, but likely achieved). Returns ------- selected_points : list List of selected (x, y) points. ''' # Shuffle points to unbias the choice of position random.shuffle(points) # Check if some selected points must be cut out if len(points) <= max_points: return points selected_points = [points[0]] while len(selected_points) < max_points: max_min_distance = -1 best_point = None for point in points: if point not in selected_points: min_dist_to_selected = min(np.linalg.norm(np.array(point) - np.array(p)) for p in selected_points) if min_dist_to_selected > max_min_distance: max_min_distance = min_dist_to_selected best_point = point if best_point is not None: selected_points.append(best_point) else: break return selected_points #%% Particle Statistics # =============================================================================
[docs] def get_particle_stats(self, n_par_target): """ Analyze frames to collect and save statistics on particle sizes until a desired number of particles is reached. This function is to be used on its own after microscope initialization. It iteratively analyzes frames, detects particles, filters and records their areas, and continues until `n_par_target` particles have been analyzed or no more frames are available. It saves both individual particle data and summary statistics, and generates a histogram of the size distribution. Parameters ---------- n_par_target : int The desired number of particles to analyze. Returns ------- par_size_distr_df : pandas.DataFrame A single-row DataFrame containing summary statistics of the analyzed particle size distribution. The columns include: - 'measurement' : str Description of the measurement (e.g., 'equivalent particle diameter in μm'). - 'n_par_analysed' : int Number of particles analyzed. - 'mean' : float Mean equivalent particle diameter (μm). - 'stdev' : float Standard deviation of the equivalent particle diameter (μm). - 'median' : float Median equivalent particle diameter (μm). - 'max' : float Maximum equivalent particle diameter (μm). - 'min' : float Minimum equivalent particle diameter (μm). - 'D10' : float 10th percentile of the equivalent particle diameter (μm). - 'D25' : float 25th percentile of the equivalent particle diameter (μm). - 'D75' : float 75th percentile of the equivalent particle diameter (μm). - 'D90' : float 90th percentile of the equivalent particle diameter (μm). This DataFrame is also saved as a CSV file in the results directory. Side Effects ------------ - Updates `self.analyzed_pars` with new particle areas (in μm²), and corresponding frame_label. - Saves a CSV file with all particle areas and equivalent diameters. - Saves a CSV file with summary statistics (mean, stdev, median, percentiles). - Saves a histogram plot of the equivalent diameters as a PNG file. - Prints information and warnings to the console if `self.verbose` is True. Notes ----- - The function relies on `_move_and_get_particles_stats_in_frame()` to process each frame and find new particles. - If no particles are found or frames are exhausted, the function will print a warning and exit early. - Equivalent diameters are computed assuming each particle is a circle of the same area. - The function sorts particle areas to facilitate percentile calculations. - The function handles cases where particles are very small or indistinguishable in area. Potential Improvements / TODO ---------------------------- - Ideally the software should autonomously measure different particle sizes at different frame widths. At the moment, it requires a specific range to be selected, and filters out particles outside this range. - Particles at frame edges are currently ignored. Ideally, they should be included by centering the stage around them, and recording their area - Add robust error handling for file I/O and plotting. - Optionally return summary statistics as a dictionary or DataFrame for further programmatic use. """ # Analyse frames and store particle areas until n = n_par_target particles have been analysed par_not_found_cntr = 0 # To check if particles were not found too many times while len(self.analyzed_pars) < n_par_target: previous_n_par = len(self.analyzed_pars) par_were_found = self._move_and_get_particles_stats_in_frame() if par_were_found is None: print(f"Could not find {n_par_target} particles. Completed statistics using {len(self.analyzed_pars)} particles.") break elif par_were_found is False: if self.verbose: print("No particle was found in this frame") par_not_found_cntr +=1 elif par_were_found: if self.verbose: tot_n_par_found = len(self.analyzed_pars) n_par_found_frame = tot_n_par_found - previous_n_par plural_frame = n_par_found_frame != 1 plural_total = tot_n_par_found != 1 print( f"{n_par_found_frame} particle{'s' if plural_frame else ''} " f"{'were' if plural_frame else 'was'} found in this frame." ) print( f"A total of {tot_n_par_found} particle{'s' if plural_total else ''} " f"{'have' if plural_total else 'has'} now been analyzed." f"{n_par_target - tot_n_par_found} more to go." ) par_not_found_cntr = 0 # Number of analysed particles n_par_analysed = len(self.analyzed_pars) if n_par_analysed == 0: print('Could not find any particle. Please check your sample, or change the constrast/brightness values.') return None par_size_distr_df = self.save_particle_statistics() return par_size_distr_df
[docs] def save_particle_statistics(self, output_file_suffix = ''): """ Process particle area data, compute summary statistics, export results, and produce a particle size histogram. This method: 1. Extracts particle areas (μm²) and associated frame labels. 2. Optionally warns if two smallest particles have identical area (potential imaging resolution issue). 3. Calculates equivalent diameters assuming circular particles. 4. Saves raw particle data to CSV. 5. Computes descriptive statistics and saves them to CSV. 6. Generates and saves a histogram plot of particle sizes. Parameters ---------- output_file_suffix : str, optional String added to output file name Returns ------- pandas.DataFrame DataFrame containing the calculated particle size statistics. """ if os.path.basename(self.results_dir) == cnst.IMAGES_DIR: output_dir = os.path.dirname(self.results_dir) else: output_dir = self.results_dir # ---- Extract numeric areas and labels from stored tuples ---- par_IDs, frame_labels, areas_um = zip(*self.analyzed_pars) par_IDs = np.array(par_IDs, dtype = int) frame_labels = np.array(frame_labels, dtype=str) areas_um = np.array(areas_um, dtype=float) n_par_analysed = len(areas_um) # ---- Warn if smallest particles have same size (pixel limit indication) ---- if n_par_analysed > 1 and np.isclose(areas_um.min(), np.partition(areas_um, 1)[1]): print( '⚠ The 2 smallest particles have identical area.\n' ' This may indicate they are only 1–2 pixels in size.\n' ' Frame width is set based on maximum acceptable particle size.\n' ' Consider reducing the maximum particle size so that the minimum\n' ' acceptable size is within ~1 order of magnitude.' ) # ---- Calculate equivalent diameters (circle assumption) ---- eq_diam_um = np.sqrt(areas_um / np.pi) * 2 # ---- Save raw particle sizes to CSV ---- particle_data = pd.DataFrame({ cnst.PAR_ID_DF_KEY : par_IDs, cnst.FRAME_ID_DF_KEY : frame_labels, cnst.PAR_AREA_UM_KEY: areas_um, cnst.PAR_EQ_D_KEY: eq_diam_um }) particle_data.to_csv( os.path.join(output_dir, f"{self._sample_ID}_{cnst.PARTICLE_SIZES_FILENAME}{output_file_suffix}.csv"), header=True, index=False ) # ---- Compute descriptive statistics ---- par_size_distr = { 'Measure': 'Equivalent particle diameter in μm', 'n_par_analysed': n_par_analysed, 'mean': np.mean(eq_diam_um), 'stdev': np.std(eq_diam_um), 'median': np.median(eq_diam_um), 'max': np.max(eq_diam_um), 'min': np.min(eq_diam_um), 'D10': np.percentile(eq_diam_um, 10), 'D25': np.percentile(eq_diam_um, 25), 'D75': np.percentile(eq_diam_um, 75), 'D90': np.percentile(eq_diam_um, 90) } # ---- Save statistics to CSV ---- stats_df = pd.DataFrame(par_size_distr, index=[0]) stats_df.to_csv( os.path.join(output_dir, f"{self._sample_ID}_{cnst.PARTICLE_STATS_FILENAME}{output_file_suffix}.csv"), header=True, index=False ) # ---- Optional verbose output ---- if self.verbose: print_double_separator() print("Computed statistics:\n") print(stats_df.T.to_string(header=False)) # ---- Generate and save particle size histogram ---- self._save_particle_size_histogram( areas_um, results_dir=output_dir, _sample_ID=self._sample_ID, verbose=self.verbose, output_file_suffix = output_file_suffix ) return stats_df
def _save_particle_size_histogram(self, diameters_um, results_dir=None, _sample_ID=None, verbose=False, output_file_suffix = '', bins=20): """ Save a histogram plot of particle equivalent diameters. This function generates and saves a histogram of the particle size distribution. The plot is saved as a PNG file in the specified results directory, with the file name based on the sample ID. Optionally, the plot can be displayed interactively. Parameters ---------- diameters_um : array-like Array of equivalent particle diameters in micrometers (μm). results_dir : str, optional Directory where the histogram PNG file will be saved. If None, uses `self.results_dir`. _sample_ID : str, optional Identifier for the sample, used in the output file name. If None, uses `self._sample_ID`. verbose : bool, optional If True, displays the plot interactively. Default is False. output_file_suffix : str, optional String added to output file name bins : int, optional Number of bins to use in the histogram. Default is 20. Returns ------- None Notes ----- - The histogram is always saved as a PNG file with the name '{_sample_ID}_Par_size_distribution_hist.png'. - The function uses matplotlib for plotting. Potential Improvements / TODO ----------------------------- - Allow user to specify output file format (e.g., SVG, PDF). - Add option to overlay statistics (mean, median, percentiles) on the plot. - Enable saving both linear and logarithmic scale histograms. - Allow customization of plot colors and style. - Return the matplotlib Figure object for further manipulation if desired. - Add error handling for file I/O issues. """ if results_dir is None: results_dir = self.results_dir if _sample_ID is None: _sample_ID = self._sample_ID plt.figure() plt.hist(diameters_um, bins=bins, edgecolor='black') plt.xlabel('Equivalent Diameter (μm)') plt.ylabel('Counts') plt.title('Particle size distribution') out_path = os.path.join(results_dir, f'{_sample_ID}_{cnst.PARTICLE_STAT_HIST_FILENAME}{output_file_suffix}.png') plt.savefig(out_path) plt.close() def _move_and_get_particles_stats_in_frame(self, frame_image=None, pixel_size=None, results_dir=None): """ Move to the next frame and analyze it to detect and characterize particles on the substrate. This function either acquires a new frame from the electron microscope (if running at the EM) or processes a provided image. It detects particles, filters them by size and edge proximity, calculates their areas in pixels and micrometers squared, and optionally saves annotated images with detected particles and their indices. Parameters ---------- frame_image : np.ndarray, optional Grayscale image of the current frame. If not provided and running at the EM, the image is acquired. pixel_size : float, optional Pixel size in micrometers. Required if not running at the microscope. results_dir : str, optional Directory to save result images and masks. Returns ------- bool or None Returns True if at least one valid particle is found in the frame, False if no valid particles are detected, or None if there are no more frames available (when running at the EM). Notes ----- - If running at the EM, a new frame is acquired and saved if development_mode=True. - If not at the EM, both `frame_image` and `pixel_size` must be provided. - The function applies a Gaussian blur to suppress noise before particle detection. - Particles are filtered by area and by proximity to the frame edge. - The area of each accepted particle is appended to `self.analyzed_pars`. - An annotated image with detected particles and their indices is saved for visualization. - If no valid particles are found, the mask image is deleted and the function returns False. - If at least one particle is found, the microscope focus/contrast/brightness is refreshed if needed. """ if EM_driver.is_at_EM: self._check_EM_controller_initialization() move_to_frame_success = self.EM.go_to_next_frame() if move_to_frame_success: # Collect image frame_image = EM_driver.get_image_data(self._im_width, self._im_height, 1) if self.development_mode: cv2.imwrite(os.path.join(self.results_dir, self._sample_ID + f'_fr_{self.EM.current_frame_label}.png'), frame_image) else: # No more frames are available return None elif frame_image is not None and pixel_size is not None: self._im_height, self._im_width = frame_image.shape self.EM.pixel_size_um = pixel_size if results_dir: self.results_dir = results_dir else: raise ValueError('This function must be run at the microscope, or it needs to be passed both image and its pixel size') # Select particles on substrate blurred_image = cv2.GaussianBlur(frame_image, (5, 5), 0) # Get mask of particles on substrate save_mask_img = True par_mask, mask_img_path = self._get_particles_on_substrate_mask(blurred_image, save_image=save_mask_img) # Find connected components num_labels, labels, stats, centroids = self._get_connected_components_with_stats(par_mask) # Store particle area par_centroids = [] # Only used for saving image par_areas = [] par_cntr = 0 for i in range(1, num_labels): # Skip the background component (index 0) par_area_pixels = stats[i, cv2.CC_STAT_AREA] if self._is_particle_area_ok(par_area_pixels) and not self.is_particle_at_frame_edge(stats, i): # If particle is within size limits and is not at the edge, consider it in the statistics par_area_um = par_area_pixels * self.EM.pixel_size_um**2 self.analyzed_pars.append((par_cntr, self.EM.current_frame_label, par_area_um)) # Store stats locally to draw circles par_areas.append(par_area_pixels) par_centroids.append(centroids[i]) par_cntr += 1 # Save image to visualize selected particles text_margin = 30 # pixel margin to determine where to annotate image with particle numbers text_pos_scale = 0.9 im_annotations = [] if par_cntr > 0: first_par_n = len(self.analyzed_pars) - par_cntr for i, (center, area) in enumerate(zip(par_centroids, par_areas)): ann_dict = {} radius_pixel = int(np.sqrt(area / np.pi) * 1.1) # Equivalent radius for particle (as a circle) ann_dict[self.EM.an_circle_key] = (radius_pixel, center.astype(int), 2) x_pos_text = int(center[0] + radius_pixel * text_pos_scale) # Number on the top-right of the circle y_pos_text = int(center[1] - radius_pixel * text_pos_scale) if x_pos_text > (self._im_width - text_margin) or y_pos_text < text_margin: # Move number to center x_pos_text = int(center[0]) y_pos_text = int(center[1]) ann_dict[self.EM.an_text_key] = (str(first_par_n + i), (x_pos_text, y_pos_text)) im_annotations.append(ann_dict) filename = f"{self._sample_ID}_fr{self.EM.current_frame_label}" # Save annotated particle image self.EM.save_frame_image(filename + '_particles', im_annotations = im_annotations, frame_image = frame_image) # Save mask image par_mask_to_save = self.prepare_mask_for_visualization(par_mask) self.EM.save_frame_image(filename + '_par_mask', im_annotations = im_annotations, frame_image = par_mask_to_save) # Return True if at least 1 particle was found if par_cntr == 0: if save_mask_img and os.path.exists(mask_img_path): os.remove(mask_img_path) return False else: if not self.EM.measurement_cfg.is_manual_navigation and (time.time() - self.EM._last_EM_adjustment_time > self.EM.refresh_time): # Adjust EM focus, contrast and brightness, but only if a particle is actually present self.EM.adjust_BCF() return True