# Source code for valis.registration

"""
Classes and functions to register a collection of images
"""

import traceback
import re
import os
import numpy as np
import pathlib
from skimage import transform, exposure, filters
from time import time
import tqdm
import pandas as pd
import pickle
import colour
import pyvips
from scipy import ndimage
import shapely
from copy import deepcopy
from pprint import pformat
import json
from colorama import Fore
from itertools import chain

from . import feature_matcher
from . import serial_rigid
from . import feature_detectors
from . import non_rigid_registrars
from . import valtils
from . import preprocessing
from . import slide_tools
from . import slide_io
from . import viz
from . import warp_tools
from . import serial_non_rigid

# Disable the pyvips operation cache; slides are large and caching
# intermediate images would consume too much memory.
pyvips.cache_set_max(0)

# Destination directories #
CONVERTED_IMG_DIR = "images"
PROCESSED_IMG_DIR = "processed"
RIGID_REG_IMG_DIR = "rigid_registration"
NON_RIGID_REG_IMG_DIR = "non_rigid_registration"
DEFORMATION_FIELD_IMG_DIR = "deformation_fields"
OVERLAP_IMG_DIR = "overlaps"
REG_RESULTS_DATA_DIR = "data"
MICRO_REG_DIR = "micro_registration"
DISPLACEMENT_DIRS = os.path.join(REG_RESULTS_DATA_DIR, "displacements")
MASK_DIR = "masks"

# Default image processing #
DEFAULT_BRIGHTFIELD_CLASS = preprocessing.ColorfulStandardizer
DEFAULT_BRIGHTFIELD_PROCESSING_ARGS = {'c': preprocessing.DEFAULT_COLOR_STD_C, "h": 0}
DEFAULT_FLOURESCENCE_CLASS = preprocessing.ChannelGetter
DEFAULT_FLOURESCENCE_PROCESSING_ARGS = {"channel": "dapi", "adaptive_eq": True}
DEFAULT_NORM_METHOD = "img_stats"

# Default rigid registration parameters #
DEFAULT_FD = feature_detectors.VggFD
DEFAULT_TRANSFORM_CLASS = transform.SimilarityTransform
DEFAULT_MATCH_FILTER = feature_matcher.Matcher(match_filter_method=feature_matcher.RANSAC_NAME)
DEFAULT_SIMILARITY_METRIC = "n_matches"
DEFAULT_AFFINE_OPTIMIZER_CLASS = None
DEFAULT_MAX_PROCESSED_IMG_SIZE = 850
DEFAULT_MAX_IMG_DIM = 850
DEFAULT_THUMBNAIL_SIZE = 500
DEFAULT_MAX_NON_RIGID_REG_SIZE = 3000

# Tiled non-rigid registration arguments
TILER_THRESH_GB = 10
DEFAULT_NR_TILE_WH = 512

# Rigid registration kwarg keys #
AFFINE_OPTIMIZER_KEY = "affine_optimizer"
TRANSFORMER_KEY = "transformer"
SIM_METRIC_KEY = "similarity_metric"
FD_KEY = "feature_detector"
MATCHER_KEY = "matcher"
NAME_KEY = "name"
IMAGES_ORDERD_KEY = "imgs_ordered"
REF_IMG_KEY = "reference_img_f"
QT_EMMITER_KEY = "qt_emitter"
TFORM_SRC_SHAPE_KEY = "transformation_src_shape_rc"
TFORM_DST_SHAPE_KEY = "transformation_dst_shape_rc"
TFORM_MAT_KEY = "M"
CHECK_REFLECT_KEY = "check_for_reflections"

# Non-rigid registration kwarg keys #
NON_RIGID_REG_CLASS_KEY = "non_rigid_reg_class"
NON_RIGID_REG_PARAMS_KEY = "non_rigid_reg_params"
NON_RIGID_USE_XY_KEY = "moving_to_fixed_xy"
NON_RIGID_COMPOSE_KEY = "compose_transforms"

# Default non-rigid registration parameters #
DEFAULT_NON_RIGID_CLASS = non_rigid_registrars.OpticalFlowWarper
DEFAULT_NON_RIGID_KWARGS = {}

# Cropping options
CROP_OVERLAP = "overlap"
CROP_REF = "reference"
CROP_NONE = "all"

# Progress messages shown during the registration pipeline
WARP_ANNO_MSG = "Warping annotations"
CONVERT_MSG = "Converting images"
DENOISE_MSG = "Denoising images"
PROCESS_IMG_MSG = "Processing images"
NORM_IMG_MSG = "Normalizing images"
TRANSFORM_MSG = "Finding rigid transforms"
PREP_NON_RIGID_MSG = "Preparing images for non-rigid registration"
MEASURE_MSG = "Measuring error"
SAVING_IMG_MSG = "Saving images"

# Pad these messages to the same width so progress bars line up
PROCESS_IMG_MSG, NORM_IMG_MSG, DENOISE_MSG = valtils.pad_strings([PROCESS_IMG_MSG, NORM_IMG_MSG, DENOISE_MSG])

def init_jvm(jar=None, mem_gb=10):
    """Initialize the JVM used by BioFormats.

    Parameters
    ----------
    jar : str, optional
        Path to a BioFormats jar. If None, `slide_io` locates a default.
    mem_gb : int, optional
        Maximum amount of memory (in GB) the JVM may use.

    """
    # Bug fix: the call previously hard-coded jar=None, mem_gb=10,
    # silently ignoring whatever the caller passed in.
    slide_io.init_jvm(jar=jar, mem_gb=mem_gb)
def kill_jvm():
    """Shut down the JVM used by BioFormats."""
    slide_io.kill_jvm()
def load_registrar(src_f):
    """Load a pickled Valis registrar.

    Parameters
    ----------
    src_f : string
        Path to pickled Valis object

    Returns
    -------
    registrar : Valis
        Valis object used for registration

    """
    # Use a context manager so the file handle is always closed.
    # The original `pickle.load(open(src_f, 'rb'))` leaked the handle.
    with open(src_f, "rb") as f:
        registrar = pickle.load(f)

    data_dir = registrar.data_dir
    read_data_dir = os.path.split(src_f)[0]

    # If the registrar has moved on disk, update the paths to the
    # results and displacement fields to match its new location.
    if data_dir != read_data_dir:
        new_dst_dir = os.path.split(read_data_dir)[0]
        registrar.dst_dir = new_dst_dir
        registrar.set_dst_paths()

        for slide_obj in registrar.slide_dict.values():
            slide_obj.update_results_img_paths()

    return registrar
class Slide(object):
    """Stores registration info and warps slides/points

    `Slide` stores registration parameters and other metadata about a
    slide. Once registration has been completed, `Slide` is also able to
    warp the slide and/or points using the same registration parameters.
    Warped slides can be saved as ome.tiff images with valid ome-xml.

    Attributes
    ----------
    src_f : str
        Path to slide.
    image : ndarray
        Image to be registered. Taken from a level in the image pyramid,
        possibly resized to fit within the `max_image_dim_px` argument
        specified when creating a `Valis` object.
    val_obj : Valis
        The "parent" object that registers all of the slides.
    reader : SlideReader
        Object that can read slides and collect metadata.
    original_xml : str
        Xml string created by bio-formats.
    img_type : str
        Whether the image is "brightfield" or "fluorescence".
    is_rgb : bool
        Whether or not the slide is RGB.
    slide_shape_rc : tuple of int
        Dimensions of the largest resolution in the slide, as (row, col).
    series : int
        Slide series to be read.
    slide_dimensions_wh : ndarray
        Dimensions of all images in the pyramid (width, height).
    resolution : float
        Physical size of each pixel.
    units : str
        Physical unit of each pixel.
    name : str
        Name of the image; usually `src_f` with the extension removed.
    processed_img : ndarray
        Image used to perform registration.
    rigid_reg_mask : ndarray
        Mask of convex hulls covering tissue in the unregistered image.
        Could be used to mask `processed_img` before rigid registration.
    non_rigid_reg_mask : ndarray
        Created by combining the rigidly warped `rigid_reg_mask` of all
        other slides.
    stack_idx : int
        Position of image in the sorted Z-stack.
    processed_img_f : str
        Path to thumbnail of the processed `image`.
    rigid_reg_img_f : str
        Path to thumbnail of rigidly aligned `image`.
    non_rigid_reg_img_f : str
        Path to thumbnail of non-rigidly aligned `image`.
    processed_img_shape_rc : tuple of int
        Shape (row, col) of the processed image used to find the
        transformation parameters. Maximum dimension will be less than or
        equal to `max_processed_image_dim_px`, so this may be smaller
        than the image's shape.
    aligned_slide_shape_rc : tuple of int
        Shape (row, col) of the aligned slide, based on the dimensions of
        level 0 of the pyramid.
    reg_img_shape_rc : tuple of int
        Shape (row, col) of the registered image.
    M : ndarray
        Rigid transformation matrix that aligns `image` to the previous
        image in the stack. Found using the processed copy of `image`.
    bk_dxdy : ndarray
        (2, N, M) array of pixel displacements in x and y
        (dx = bk_dxdy[0], dy = bk_dxdy[1]). Used to warp images. Found
        using the rigidly aligned version of the processed image.
    fwd_dxdy : ndarray
        Inverse of `bk_dxdy`. Used to warp points.
    _bk_dxdy_f : str
        Path to file containing bk_dxdy, if saved.
    _fwd_dxdy_f : str
        Path to file containing fwd_dxdy, if saved.
    _bk_dxdy_np : ndarray
        `bk_dxdy` as a numpy array. Only not None if `bk_dxdy` becomes
        associated with a file.
    _fwd_dxdy_np : ndarray
        `fwd_dxdy` as a numpy array. Only not None if `fwd_dxdy` becomes
        associated with a file.
    stored_dxdy : bool
        Whether the non-rigid displacements are saved in a file. Should
        only occur if the image is very large.
    fixed_slide : Slide
        Slide object to which this one was aligned.
    xy_matched_to_prev : ndarray
        Coordinates (x, y) of features in `image` that had matches in the
        previous image. Shape (N, 2).
    xy_in_prev : ndarray
        Coordinates (x, y) of features in the previous image that had
        matches to those in `image`. Shape (N, 2).
    xy_matched_to_prev_in_bbox : ndarray
        Subset of `xy_matched_to_prev` within `overlap_mask_bbox_xywh`.
        Shape (N, 2) or (M, 2), with M < N.
    xy_in_prev_in_bbox : ndarray
        Subset of `xy_in_prev` within `overlap_mask_bbox_xywh`.
        Shape (N, 2) or (M, 2), with M < N.
    crop : str
        Crop method.
    bg_px_pos_rc : tuple
        Position of the pixel that has the background color.
    bg_color : list, optional
        Color of background pixels.
    is_empty : bool
        True if the image is empty (i.e. contains only 1 value).

    """
    def __init__(self, src_f, image, val_obj, reader, name=None):
        """
        Parameters
        ----------
        src_f : str
            Path to slide.
        image : ndarray
            Image to be registered. Taken from a level in the image
            pyramid, possibly resized to fit within the `max_image_dim_px`
            argument specified when creating a `Valis` object.
        val_obj : Valis
            The "parent" object that registers all of the slides.
        reader : SlideReader
            Object that can read slides and collect metadata.
        name : str, optional
            Name of slide. If None, it will be `src_f` with the
            extension removed.

        """
        self.src_f = src_f
        self.image = image
        self.val_obj = val_obj
        self.reader = reader

        # Metadata #
        self.is_rgb = reader.metadata.is_rgb
        self.img_type = reader.guess_image_type()
        # slide_dimensions are (w, h); reverse to get (row, col)
        self.slide_shape_rc = reader.metadata.slide_dimensions[0][::-1]
        self.series = reader.series
        self.slide_dimensions_wh = reader.metadata.slide_dimensions
        # Average the x and y physical pixel sizes into a single value
        self.resolution = np.mean(reader.metadata.pixel_physical_size_xyu[0:2])
        self.units = reader.metadata.pixel_physical_size_xyu[2]
        self.original_xml = reader.metadata.original_xml

        if name is None:
            name = valtils.get_name(src_f)
        self.name = name

        # To be filled in during registration #
        self.processed_img = None
        self.rigid_reg_mask = None
        self.non_rigid_reg_mask = None
        self.stack_idx = None
        self.aligned_slide_shape_rc = None
        self.processed_img_shape_rc = None
        self.reg_img_shape_rc = None
        self.M = None
        self.bk_dxdy = None
        self.fwd_dxdy = None
        self.stored_dxdy = False
        self._bk_dxdy_f = None
        self._fwd_dxdy_f = None
        self._bk_dxdy_np = None
        self._fwd_dxdy_np = None
        self.processed_img_f = None
        self.rigid_reg_img_f = None
        self.non_rigid_reg_img_f = None
        self.fixed_slide = None
        self.xy_matched_to_prev = None
        self.xy_in_prev = None
        self.xy_matched_to_prev_in_bbox = None
        self.xy_in_prev_in_bbox = None
        self.crop = None
        self.bg_px_pos_rc = (0, 0)
        self.bg_color = None
        self.is_empty = self.check_if_empty(image)
def check_if_empty(self, img): """Check if the image is empty Return ------ is_empty : bool Whether or not the image is empty """ is_empty = img.min() == img.max() return is_empty
[docs] def slide2image(self, level, series=None, xywh=None): """Convert slide to image Parameters ----------- level : int Pyramid level series : int, optional Series number. Defaults to 0 xywh : tuple of int, optional The region to be sliced from the slide. If None, then the entire slide will be converted. Otherwise xywh is the (top left x, top left y, width, height) of the region to be sliced. Returns ------- img : ndarray An image of the slide or the region defined by xywh """ img = self.reader.slide2image(level=level, series=series, xywh=xywh) return img
[docs] def slide2vips(self, level, series=None, xywh=None): """Convert slide to pyvips.Image Parameters ----------- level : int Pyramid level series : int, optional Series number. Defaults to 0 xywh : tuple of int, optional The region to be sliced from the slide. If None, then the entire slide will be converted. Otherwise xywh is the (top left x, top left y, width, height) of the region to be sliced. Returns ------- vips_slide : pyvips.Image An of the slide or the region defined by xywh """ vips_img = self.reader.slide2vips(level=level, series=series, xywh=xywh) return vips_img
    def get_aligned_to_ref_slide_crop_xywh(self, ref_img_shape_rc, ref_M, scaled_ref_img_shape_rc=None):
        """Get bounding box used to crop slide to fit in reference image

        Parameters
        ----------
        ref_img_shape_rc : tuple of int
            Shape of the reference image used to find registration
            parameters (i.e. the processed image).
        ref_M : ndarray
            Transformation matrix for the reference image.
        scaled_ref_img_shape_rc : tuple of int, optional
            Shape of the scaled image, e.g. the slide corresponding to
            the image used to find the registration parameters.

        Returns
        -------
        crop_xywh : tuple of int
            Bounding box of crop area (XYWH).
        mask : ndarray
            Mask covering the reference image.

        """
        mask, _ = self.val_obj.get_crop_mask(CROP_REF)
        if scaled_ref_img_shape_rc is not None:
            # Scale factor (x, y) between slide-level and processed-level shapes
            sxy = np.array([*scaled_ref_img_shape_rc[::-1]]) / np.array([*ref_img_shape_rc[::-1]])
        else:
            scaled_ref_img_shape_rc = ref_img_shape_rc
            sxy = np.ones(2)

        # The reference image's rigid translation gives the crop origin
        reg_txy = -ref_M[0:2, 2]
        slide_xywh = (*reg_txy*sxy, *scaled_ref_img_shape_rc[::-1])

        return slide_xywh, mask

    def get_overlap_crop_xywh(self, warped_img_shape_rc, scaled_warped_img_shape_rc=None):
        """Get bounding box used to crop slide to where all slides overlap

        Parameters
        ----------
        warped_img_shape_rc : tuple of int
            Shape of the registered image.
        scaled_warped_img_shape_rc : tuple of int, optional
            Shape of the scaled registered image (i.e. the registered
            slide).

        Returns
        -------
        crop_xywh : tuple of int
            Bounding box of crop area (XYWH).
        mask : ndarray
            Overlap mask, before crop.

        """
        mask, mask_bbox_xywh = self.val_obj.get_crop_mask(CROP_OVERLAP)
        if scaled_warped_img_shape_rc is not None:
            sxy = np.array([*scaled_warped_img_shape_rc[::-1]]) / np.array([*warped_img_shape_rc[::-1]])
        else:
            sxy = np.ones(2)

        # Scale the overlap bounding box up to the output image's size
        to_slide_transformer = transform.SimilarityTransform(scale=sxy)
        overlap_bbox = warp_tools.bbox2xy(mask_bbox_xywh)
        scaled_overlap_bbox = to_slide_transformer(overlap_bbox)
        scaled_overlap_xywh = warp_tools.xy2bbox(scaled_overlap_bbox)
        # Round width/height up so the crop never loses a partial pixel
        scaled_overlap_xywh[2:] = np.ceil(scaled_overlap_xywh[2:])
        scaled_overlap_xywh = tuple(scaled_overlap_xywh.astype(int))

        return scaled_overlap_xywh, mask

    def get_crop_xywh(self, crop, out_shape_rc=None):
        """Get bounding box used to crop aligned slide

        Parameters
        ----------
        crop : str
            Either CROP_REF ("reference") or CROP_OVERLAP ("overlap").
        out_shape_rc : tuple of int, optional
            If crop is "reference", the shape of the scaled reference
            image (e.g. the unwarped slide corresponding to the unwarped
            processed reference image). If crop is "overlap", the shape
            of the registered slides.

        Returns
        -------
        crop_xywh : tuple of int
            Bounding box of crop area (XYWH).
        mask : ndarray
            Mask, before crop.

        """
        ref_slide = self.val_obj.get_ref_slide()
        if crop == CROP_REF:
            transformation_shape_rc = np.array(ref_slide.processed_img_shape_rc)
            crop_xywh, mask = self.get_aligned_to_ref_slide_crop_xywh(ref_img_shape_rc=transformation_shape_rc,
                                                                      ref_M=ref_slide.M,
                                                                      scaled_ref_img_shape_rc=out_shape_rc)
        elif crop == CROP_OVERLAP:
            transformation_shape_rc = np.array(ref_slide.reg_img_shape_rc)
            crop_xywh, mask = self.get_overlap_crop_xywh(warped_img_shape_rc=transformation_shape_rc,
                                                         scaled_warped_img_shape_rc=out_shape_rc)
        # NOTE(review): if `crop` is neither CROP_REF nor CROP_OVERLAP this
        # raises UnboundLocalError; callers appear to pre-validate via
        # get_crop_method — confirm.
        return crop_xywh, mask

    def get_crop_method(self, crop):
        """Resolve `crop` into a crop-method string or False.

        If `crop` is True, use the crop method stored on this Slide;
        otherwise use `crop` itself. Returns the method name only when it
        is a recognized cropping option, else False.
        """
        if crop is True:
            crop_method = self.crop
        else:
            crop_method = crop
        do_crop = crop_method in [CROP_REF, CROP_OVERLAP]
        if do_crop:
            return crop_method
        else:
            return False

    def get_bg_color_px_pos(self):
        """Get position of pixel that has color used for background.

        For brightfield (IHC) images the brightest pixel (by CAM16-UCS
        lightness) is treated as background; for fluorescence images the
        darkest pixel is. Sets `bg_px_pos_rc` and `bg_color`.
        """
        if self.img_type == slide_tools.IHC_NAME:
            # RGB. Get brightest pixel
            eps = np.finfo("float").eps
            with colour.utilities.suppress_warnings(colour_usage_warnings=True):
                # Normalize 8-bit integer images to [0, 1] before conversion
                if 1 < self.image.max() <= 255 and np.issubdtype(self.image.dtype, np.integer):
                    cam = colour.convert(self.image/255 + eps, 'sRGB', 'CAM16UCS')
                else:
                    cam = colour.convert(self.image + eps, 'sRGB', 'CAM16UCS')
            lum = cam[..., 0]
            bg_px = np.unravel_index(np.argmax(lum, axis=None), lum.shape)
        else:
            # IF. Get darkest pixel
            sum_img = self.image.sum(axis=2)
            bg_px = np.unravel_index(np.argmin(sum_img, axis=None), sum_img.shape)

        self.bg_px_pos_rc = bg_px
        self.bg_color = list(self.image[bg_px])

    def update_results_img_paths(self):
        """Rebuild thumbnail/displacement paths from the parent Valis's
        current destination directories (used after the registrar moves).
        """
        # Zero-pad the stack index so file names sort in stack order
        n_digits = len(str(self.val_obj.size))
        stack_id = str.zfill(str(self.stack_idx), n_digits)
        self.processed_img_f = os.path.join(self.val_obj.processed_dir, self.name + ".png")
        self.rigid_reg_img_f = os.path.join(self.val_obj.reg_dst_dir, f"{stack_id}_f{self.name}.png")
        self.non_rigid_reg_img_f = os.path.join(self.val_obj.non_rigid_dst_dir, f"{stack_id}_f{self.name}.png")
        if self.stored_dxdy:
            bk_dxdy_f, fwd_dxdy_f = self.get_displacement_f()
            self._bk_dxdy_f = bk_dxdy_f
            self._fwd_dxdy_f = fwd_dxdy_f

    def get_displacement_f(self):
        """Return the file paths for this slide's saved backward and
        forward displacement fields.
        """
        bk_dxdy_f = os.path.join(self.val_obj.displacements_dir, f"{self.name}_bk_dxdy.tiff")
        fwd_dxdy_f = os.path.join(self.val_obj.displacements_dir, f"{self.name}_fwd_dxdy.tiff")
        return bk_dxdy_f, fwd_dxdy_f

    def get_bk_dxdy(self):
        """Return backward displacements, reading (and re-padding) from
        disk when they were stored in a file.
        """
        if self.stored_dxdy:
            bk_dxdy_f, _ = self.get_displacement_f()
            cropped_bk_dxdy = pyvips.Image.new_from_file(bk_dxdy_f)
            # Stored displacements are cropped to the non-rigid bbox;
            # pad back out to the full displacement field shape
            full_bk_dxdy = self.val_obj.pad_displacement(cropped_bk_dxdy,
                                                         self.val_obj._full_displacement_shape_rc,
                                                         self.val_obj._non_rigid_bbox)
            return full_bk_dxdy
        else:
            return self._bk_dxdy_np

    def set_bk_dxdy(self, bk_dxdy):
        """
        Only set if an array
        """
        if not isinstance(bk_dxdy, pyvips.Image):
            self._bk_dxdy_np = bk_dxdy
        else:
            print(f"Cannot set bk_dxdy when data is type {type(bk_dxdy)}")

    bk_dxdy = property(fget=get_bk_dxdy, fset=set_bk_dxdy, doc="Get and set backwards displacements")

    def get_fwd_dxdy(self):
        """Return forward displacements, reading (and re-padding) from
        disk when they were stored in a file.
        """
        if self.stored_dxdy:
            _, fwd_dxdy_f = self.get_displacement_f()
            cropped_fwd_dxdy = pyvips.Image.new_from_file(fwd_dxdy_f)
            full_fwd_dxdy = self.val_obj.pad_displacement(cropped_fwd_dxdy,
                                                          self.val_obj._full_displacement_shape_rc,
                                                          self.val_obj._non_rigid_bbox)
            return full_fwd_dxdy
        else:
            return self._fwd_dxdy_np

    def set_fwd_dxdy(self, fwd_dxdy):
        # Only set if an array; file-backed displacements are read-only
        if not isinstance(fwd_dxdy, pyvips.Image):
            self._fwd_dxdy_np = fwd_dxdy
        else:
            print(f"Cannot set fwd_dxdy when data is type {type(fwd_dxdy)}")

    fwd_dxdy = property(fget=get_fwd_dxdy, fset=set_fwd_dxdy, doc="Get forward displacements")
    def warp_img(self, img=None, non_rigid=True, crop=True, interp_method="bicubic"):
        """Warp an image using the registration parameters

        Parameters
        ----------
        img : ndarray, optional
            The image to be warped. If None, then Slide.image will be
            warped.
        non_rigid : bool
            Whether or not to conduct non-rigid warping. If False, then
            only a rigid transformation will be applied.
        crop : bool, str
            How to crop the registered images. If `True`, then the same
            crop used when initializing the `Valis` object will be used.
            If `False`, the image will not be cropped. If "overlap", the
            warped slide will be cropped to include only areas where all
            images overlapped. "reference" crops to the area that
            overlaps with the reference image, defined by
            `reference_img_f` when initializing the `Valis` object.
        interp_method : str
            Interpolation method used when warping. Default is "bicubic".

        Returns
        -------
        warped_img : ndarray
            Warped copy of `img`.

        """
        if img is None:
            img = self.image

        if non_rigid:
            dxdy = self.bk_dxdy
        else:
            dxdy = None

        if isinstance(img, pyvips.Image):
            # NOTE(review): this is (width, height), while the ndarray
            # branch yields (row, col) — confirm intended; the value is
            # only compared/scaled against processed_img_shape_rc below.
            img_shape_rc = (img.width, img.height)
            img_dim = img.bands
        else:
            img_shape_rc = img.shape[0:2]
            img_dim = img.ndim

        if not np.all(img_shape_rc == self.processed_img_shape_rc):
            msg = ("scaling transformation for image with different shape. "
                   "However, without knowing all of other image's shapes, "
                   "the scaling may not be the same for all images, and so "
                   "may not overlap."
                   )
            valtils.print_warning(msg)
            same_shape = False
            # Scale output shape by how much larger this image is than
            # the processed image used to find the transforms
            img_scale_rc = np.array(img_shape_rc)/(np.array(self.processed_img_shape_rc))
            out_shape_rc = self.val_obj.get_aligned_slide_shape(img_scale_rc)
        else:
            same_shape = True
            out_shape_rc = self.reg_img_shape_rc

        if isinstance(crop, bool) or isinstance(crop, str):
            crop_method = self.get_crop_method(crop)
            if crop_method is not False:
                if crop_method == CROP_REF:
                    ref_slide = self.val_obj.get_ref_slide()
                    if not same_shape:
                        scaled_shape_rc = np.array(ref_slide.processed_img_shape_rc)*img_scale_rc
                    else:
                        scaled_shape_rc = ref_slide.processed_img_shape_rc
                elif crop_method == CROP_OVERLAP:
                    scaled_shape_rc = out_shape_rc
                bbox_xywh, _ = self.get_crop_xywh(crop_method, scaled_shape_rc)
            else:
                bbox_xywh = None
        elif isinstance(crop[0], (int, float)) and len(crop) == 4:
            # Caller supplied an explicit (x, y, w, h) crop box
            bbox_xywh = crop
        else:
            bbox_xywh = None

        # Only use the stored background color when `img` has the same
        # number of dimensions/channels as the registration image
        if img_dim == self.image.ndim:
            bg_color = self.bg_color
        else:
            bg_color = None

        warped_img = \
            warp_tools.warp_img(img, M=self.M, bk_dxdy=dxdy,
                                out_shape_rc=out_shape_rc,
                                transformation_src_shape_rc=self.processed_img_shape_rc,
                                transformation_dst_shape_rc=self.reg_img_shape_rc,
                                bbox_xywh=bbox_xywh,
                                bg_color=bg_color,
                                interp_method=interp_method)

        return warped_img
    def warp_img_from_to(self, img, to_slide_obj, dst_slide_level=0,
                         non_rigid=True, interp_method="bicubic", bg_color=None):
        """Warp an image from this slide onto another unwarped slide

        Note that if `img` is a labeled image then it is recommended to
        set `interp_method` to "nearest".

        Parameters
        ----------
        img : ndarray, pyvips.Image
            Image to warp. Should be a scaled version of the same one
            used for registration.
        to_slide_obj : Slide
            Slide onto which `img` will be warped, i.e. its position in
            the unwarped slide associated with `to_slide_obj`.
        dst_slide_level : int, tuple, optional
            Pyramid level of the slide/image that `img` will be warped
            on to. May also be an image shape (row, col).
        non_rigid : bool, optional
            Whether or not to conduct non-rigid warping. If False, then
            only a rigid transformation will be applied.
        interp_method : str
            Interpolation method used when warping.
        bg_color : optional
            Background color for the warped image.

        """
        if np.issubdtype(type(dst_slide_level), np.integer):
            # An actual pyramid level; look up its (row, col) shape
            to_slide_src_shape_rc = to_slide_obj.slide_dimensions_wh[dst_slide_level][::-1]
            aligned_slide_shape = self.val_obj.get_aligned_slide_shape(dst_slide_level)
        else:
            # A shape was given; scale the registered shape accordingly
            to_slide_src_shape_rc = np.array(dst_slide_level)
            dst_scale_rc = (to_slide_src_shape_rc/np.array(to_slide_obj.processed_img_shape_rc))
            aligned_slide_shape = np.round(dst_scale_rc*np.array(to_slide_obj.reg_img_shape_rc)).astype(int)

        if non_rigid:
            from_bk_dxdy = self.bk_dxdy
            to_fwd_dxdy = to_slide_obj.fwd_dxdy
        else:
            from_bk_dxdy = None
            to_fwd_dxdy = None

        warped_img = \
            warp_tools.warp_img_from_to(img,
                                        from_M=self.M,
                                        from_transformation_src_shape_rc=self.processed_img_shape_rc,
                                        from_transformation_dst_shape_rc=self.reg_img_shape_rc,
                                        from_dst_shape_rc=aligned_slide_shape,
                                        from_bk_dxdy=from_bk_dxdy,
                                        to_M=to_slide_obj.M,
                                        to_transformation_src_shape_rc=to_slide_obj.processed_img_shape_rc,
                                        to_transformation_dst_shape_rc=to_slide_obj.reg_img_shape_rc,
                                        to_src_shape_rc=to_slide_src_shape_rc,
                                        to_fwd_dxdy=to_fwd_dxdy,
                                        bg_color=bg_color,
                                        interp_method=interp_method
                                        )

        return warped_img
    @valtils.deprecated_args(crop_to_overlap="crop")
    def warp_slide(self, level, non_rigid=True, crop=True,
                   src_f=None, interp_method="bicubic", reader=None):
        """Warp a slide using registration parameters

        Parameters
        ----------
        level : int
            Pyramid level to be warped.
        non_rigid : bool, optional
            Whether or not to conduct non-rigid warping. If False, then
            only a rigid transformation will be applied. Default is True.
        crop : bool, str
            How to crop the registered images. If `True`, then the same
            crop used when initializing the `Valis` object will be used.
            If `False`, the image will not be cropped. If "overlap", the
            warped slide will be cropped to include only areas where all
            images overlapped. "reference" crops to the area that
            overlaps with the reference image, defined by
            `reference_img_f` when initializing the `Valis` object.
        src_f : str, optional
            Path of slide to be warped. If None (the default),
            Slide.src_f will be used. Otherwise, the file to which
            `src_f` points should be an alternative copy of the slide,
            such as one that has undergone processing (e.g. stain
            segmentation), has a mask applied, etc.
        interp_method : str
            Interpolation method used when warping slide. Default is
            "bicubic".
        reader : SlideReader, optional
            Reader to use for `src_f`. Defaults to this slide's reader.

        """
        if src_f is None:
            src_f = self.src_f

        if non_rigid:
            bk_dxdy = self.bk_dxdy
        else:
            bk_dxdy = None

        if level != 0:
            if not np.issubdtype(type(level), np.integer):
                msg = "Need slide level to be an integer indicating pyramid level"
                valtils.print_warning(msg)
            aligned_slide_shape = self.val_obj.get_aligned_slide_shape(level)
        else:
            aligned_slide_shape = self.aligned_slide_shape_rc

        if isinstance(crop, bool) or isinstance(crop, str):
            crop_method = self.get_crop_method(crop)
            if crop_method is not False:
                if crop_method == CROP_REF:
                    ref_slide = self.val_obj.get_ref_slide()
                    scaled_aligned_shape_rc = ref_slide.slide_dimensions_wh[level][::-1]
                elif crop_method == CROP_OVERLAP:
                    scaled_aligned_shape_rc = aligned_slide_shape
                slide_bbox_xywh, _ = self.get_crop_xywh(crop=crop_method,
                                                        out_shape_rc=scaled_aligned_shape_rc)
                if crop_method == CROP_REF:
                    assert np.all(slide_bbox_xywh[2:] == scaled_aligned_shape_rc[::-1])
                    if src_f == self.src_f and self == ref_slide:
                        # Shouldn't need to warp, but do checks just in case
                        no_rigid = True
                        no_non_rigid = True
                        if self.M is not None:
                            # Identity rotation/scale and a translation that the
                            # crop cancels out means the rigid warp is a no-op
                            sxy = (scaled_aligned_shape_rc/self.processed_img_shape_rc)[::-1]
                            scaled_txy = sxy*self.M[:2, 2]
                            no_transforms = all(self.M[:2, :2].reshape(-1) == [1, 0, 0, 1])
                            crop_to_origin = np.all(np.abs(slide_bbox_xywh[0:2] + scaled_txy) < 1)
                            no_rigid = no_transforms and crop_to_origin
                        if self.bk_dxdy is not None:
                            no_non_rigid = self.bk_dxdy.min() == 0 and self.bk_dxdy.max() == 0
                        if no_rigid and no_non_rigid:
                            # Don't need to warp, so return original reference image
                            ref_img = self.reader.slide2vips(level=level)
                            return ref_img
                        # else:
                        #     print("unexpectedly have to warp reference image. This may be due to an error")
            else:
                slide_bbox_xywh = None
        elif isinstance(crop[0], (int, float)) and len(crop) == 4:
            # Caller supplied an explicit (x, y, w, h) crop box
            slide_bbox_xywh = crop
        else:
            slide_bbox_xywh = None

        # Background color only applies when warping this slide's own file
        if src_f == self.src_f:
            bg_color = self.bg_color
        else:
            bg_color = None

        if reader is None:
            reader = self.reader

        warped_slide = slide_tools.warp_slide(src_f, M=self.M,
                                              transformation_src_shape_rc=self.processed_img_shape_rc,
                                              transformation_dst_shape_rc=self.reg_img_shape_rc,
                                              aligned_slide_shape_rc=aligned_slide_shape,
                                              dxdy=bk_dxdy, level=level, series=self.series,
                                              interp_method=interp_method,
                                              bbox_xywh=slide_bbox_xywh,
                                              bg_color=bg_color,
                                              reader=reader)
        return warped_slide
    def warp_and_save_slide(self, dst_f, level=0, non_rigid=True,
                            crop=True, src_f=None, channel_names=None,
                            colormap=slide_io.CMAP_AUTO,
                            interp_method="bicubic", tile_wh=None,
                            compression="lzw", Q=100, pyramid=True,
                            reader=None):
        """Warp and save a slide

        Slides will be saved in the ome.tiff format.

        Parameters
        ----------
        dst_f : str
            Path to where the warped slide will be saved.
        level : int
            Pyramid level to be warped.
        non_rigid : bool, optional
            Whether or not to conduct non-rigid warping. If False, then
            only a rigid transformation will be applied. Default is True.
        crop : bool, str
            How to crop the registered images. If `True`, then the same
            crop used when initializing the `Valis` object will be used.
            If `False`, the image will not be cropped. If "overlap", the
            warped slide will be cropped to include only areas where all
            images overlapped. "reference" crops to the area that
            overlaps with the reference image, defined by
            `reference_img_f` when initializing the `Valis` object.
        src_f : str, optional
            Path of slide to be warped. If None (the default),
            Slide.src_f will be used. Otherwise, the file to which
            `src_f` points should be an alternative copy of the slide,
            such as one that has undergone processing (e.g. stain
            segmentation), has a mask applied, etc.
        channel_names : list, optional
            List of channel names. If None, then Slide.reader will
            attempt to find the channel names associated with `src_f`.
        colormap : dict, optional
            Dictionary of channel colors, where the key is the channel
            name, and the value the color as rgb255.
        interp_method : str
            Interpolation method used when warping slide. Default is
            "bicubic".
        tile_wh : int, optional
            Tile width and height used to save the image.
            NOTE(review): this value is currently overwritten by
            `slide_io.get_tile_wh` below and so has no effect — confirm
            whether it should be honored.
        compression : str
            Compression method used to save the ome.tiff. Default is
            lzw, but can also be jpeg or jp2k. See pyvips for details.
        Q : int
            Q factor for lossy compression.
        pyramid : bool
            Whether or not to save an image pyramid.
        reader : SlideReader, optional
            Reader to use for `src_f`.

        """
        if src_f is None:
            src_f = self.src_f

        if reader is None:
            if src_f != self.src_f:
                # Warping an alternative file; find a reader that can open it
                slide_reader_cls = slide_io.get_slide_reader(src_f)
                reader = slide_reader_cls(src_f)
            else:
                reader = self.reader

        warped_slide = self.warp_slide(level=level, non_rigid=non_rigid,
                                       crop=crop, interp_method=interp_method,
                                       src_f=src_f, reader=reader)

        # Get ome-xml #
        ome_xml_obj = slide_io.update_xml_for_new_img(img=warped_slide,
                                                      reader=reader,
                                                      level=level,
                                                      channel_names=channel_names,
                                                      colormap=colormap)
        ome_xml = ome_xml_obj.to_xml()
        out_shape_wh = warp_tools.get_shape(warped_slide)[0:2][::-1]
        tile_wh = slide_io.get_tile_wh(reader=reader, level=level,
                                       out_shape_wh=out_shape_wh)

        slide_io.save_ome_tiff(warped_slide, dst_f=dst_f, ome_xml=ome_xml,
                               tile_wh=tile_wh, compression=compression,
                               Q=Q, pyramid=pyramid)
    def warp_xy(self, xy, M=None, slide_level=0, pt_level=0,
                non_rigid=True, crop=True):
        """Warp points using registration parameters

        Warps `xy` to their location in the registered slide/image.

        Parameters
        ----------
        xy : ndarray
            (N, 2) array of points to be warped. Must be x,y coordinates.
        M : ndarray, optional
            Transformation matrix. Defaults to this slide's `M`.
        slide_level : int, tuple, optional
            Pyramid level of the slide. Used to scale transformation
            matrices. Can also be the shape of the warped image
            (row, col) into which the points should be warped. Default
            is 0.
        pt_level : int, tuple, optional
            Pyramid level from which the points originated. For example,
            if `xy` are from the centroids of cell segmentation
            performed on the full resolution image, this should be 0.
            Alternatively, the value can be a tuple of the image's shape
            (row, col) from which the points came. Default is 0.
        non_rigid : bool, optional
            Whether or not to conduct non-rigid warping. If False, then
            only a rigid transformation will be applied. Default is True.
        crop : bool, str
            Apply crop to warped points by shifting points to the mask's
            origin. Note that this can result in negative coordinates,
            but might be useful if wanting to draw the coordinates on
            the registered slide. If `True`, the same crop used when
            initializing the `Valis` object is used; `False` applies no
            crop; "overlap"/"reference" behave as in `warp_img`.

        """
        if M is None:
            M = self.M

        if np.issubdtype(type(pt_level), np.integer):
            pt_dim_rc = self.slide_dimensions_wh[pt_level][::-1]
        else:
            pt_dim_rc = np.array(pt_level)

        if np.issubdtype(type(slide_level), np.integer):
            if slide_level != 0:
                # NOTE(review): this inner issubdtype check is redundant —
                # the outer branch already established slide_level is an
                # integer, so the `else` here is unreachable.
                if np.issubdtype(type(slide_level), np.integer):
                    aligned_slide_shape = self.val_obj.get_aligned_slide_shape(slide_level)
                else:
                    aligned_slide_shape = np.array(slide_level)
            else:
                aligned_slide_shape = self.aligned_slide_shape_rc
        else:
            aligned_slide_shape = np.array(slide_level)

        if non_rigid:
            fwd_dxdy = self.fwd_dxdy
        else:
            fwd_dxdy = None

        warped_xy = warp_tools.warp_xy(xy, M=M,
                                       transformation_src_shape_rc=self.processed_img_shape_rc,
                                       transformation_dst_shape_rc=self.reg_img_shape_rc,
                                       src_shape_rc=pt_dim_rc,
                                       dst_shape_rc=aligned_slide_shape,
                                       fwd_dxdy=fwd_dxdy)

        crop_method = self.get_crop_method(crop)
        if crop_method is not False:
            if crop_method == CROP_REF:
                ref_slide = self.val_obj.get_ref_slide()
                if isinstance(slide_level, int):
                    scaled_aligned_shape_rc = ref_slide.slide_dimensions_wh[slide_level][::-1]
                else:
                    if len(slide_level) == 2:
                        scaled_aligned_shape_rc = slide_level
            elif crop_method == CROP_OVERLAP:
                scaled_aligned_shape_rc = aligned_slide_shape

            crop_bbox_xywh, _ = self.get_crop_xywh(crop_method, scaled_aligned_shape_rc)
            # Shift points so the crop box origin becomes (0, 0)
            warped_xy -= crop_bbox_xywh[0:2]

        return warped_xy
    def warp_xy_from_to(self, xy, to_slide_obj, src_slide_level=0,
                        src_pt_level=0, dst_slide_level=0, non_rigid=True):
        """Warp points from this slide to another unwarped slide

        Takes a set of points found in this unwarped slide, and warps
        them to their position in the unwarped "to" slide.

        Parameters
        ----------
        xy : ndarray
            (N, 2) array of points to be warped. Must be x,y coordinates.
        to_slide_obj : Slide
            Slide to which the points will be warped, i.e. `xy` will be
            warped from this Slide to their position in the unwarped
            slide associated with `to_slide_obj`.
        src_slide_level : int, tuple, optional
            Pyramid level (or shape (row, col)) used to scale the aligned
            slide shape for this slide.
        src_pt_level : int, tuple, optional
            Pyramid level of the slide/image in which `xy` originated.
            Alternatively, the value can be the shape (row, col) of the
            image from which the points came.
        dst_slide_level : int, tuple, optional
            Pyramid level of the slide/image into which `xy` will be
            warped, or the "to" image's shape (row, col).
        non_rigid : bool, optional
            Whether or not to conduct non-rigid warping. If False, then
            only a rigid transformation will be applied.

        """
        if np.issubdtype(type(src_pt_level), np.integer):
            src_pt_dim_rc = self.slide_dimensions_wh[src_pt_level][::-1]
        else:
            src_pt_dim_rc = np.array(src_pt_level)

        if np.issubdtype(type(dst_slide_level), np.integer):
            to_slide_src_shape_rc = to_slide_obj.slide_dimensions_wh[dst_slide_level][::-1]
        else:
            to_slide_src_shape_rc = np.array(dst_slide_level)

        if src_slide_level != 0:
            if np.issubdtype(type(src_slide_level), np.integer):
                aligned_slide_shape = self.val_obj.get_aligned_slide_shape(src_slide_level)
            else:
                aligned_slide_shape = np.array(src_slide_level)
        else:
            aligned_slide_shape = self.aligned_slide_shape_rc

        if non_rigid:
            src_fwd_dxdy = self.fwd_dxdy
            dst_bk_dxdy = to_slide_obj.bk_dxdy
        else:
            src_fwd_dxdy = None
            dst_bk_dxdy = None

        xy_in_unwarped_to_img = \
            warp_tools.warp_xy_from_to(xy=xy,
                                       from_M=self.M,
                                       from_transformation_dst_shape_rc=self.reg_img_shape_rc,
                                       from_transformation_src_shape_rc=self.processed_img_shape_rc,
                                       from_dst_shape_rc=aligned_slide_shape,
                                       from_src_shape_rc=src_pt_dim_rc,
                                       from_fwd_dxdy=src_fwd_dxdy,
                                       to_M=to_slide_obj.M,
                                       to_transformation_src_shape_rc=to_slide_obj.processed_img_shape_rc,
                                       to_transformation_dst_shape_rc=to_slide_obj.reg_img_shape_rc,
                                       to_src_shape_rc=to_slide_src_shape_rc,
                                       to_dst_shape_rc=aligned_slide_shape,
                                       to_bk_dxdy=dst_bk_dxdy
                                       )

        return xy_in_unwarped_to_img
[docs] def warp_geojson(self, geojson_f, M=None, slide_level=0, pt_level=0, non_rigid=True, crop=True): """Warp geometry using registration parameters Warps geometries to their location in the registered slide/image Parameters ---------- geojson_f : str Path to geojson file containing the annotation geometries. Assumes coordinates are in pixels. slide_level: int, tuple, optional Pyramid level of the slide. Used to scale transformation matrices. Can also be the shape of the warped image (row, col) into which the points should be warped. Default is 0. pt_level: int, tuple, optional Pyramid level from which the points origingated. For example, if `xy` are from the centroids of cell segmentation performed on the full resolution image, this should be 0. Alternatively, the value can be a tuple of the image's shape (row, col) from which the points came. For example, if `xy` are bounding box coordinates from an analysis on a lower resolution image, then pt_level is that lower resolution image's shape (row, col). Default is 0. non_rigid : bool, optional Whether or not to conduct non-rigid warping. If False, then only a rigid transformation will be applied. Default is True. crop: bool, str Apply crop to warped points by shifting points to the mask's origin. Note that this can result in negative coordinates, but might be useful if wanting to draw the coordinates on the registered slide, such as annotation coordinates. If `True`, then the same crop used when initializing the `Valis` object will be used. If `False`, the image will not be cropped. If "overlap", the warped slide will be cropped to include only areas where all images overlapped. "reference" crops to the area that overlaps with the reference image, defined by `reference_img_f` when initialzing the `Valis object`. 
""" if M is None: M = self.M if np.issubdtype(type(pt_level), np.integer): pt_dim_rc = self.slide_dimensions_wh[pt_level][::-1] else: pt_dim_rc = np.array(pt_level) if np.issubdtype(type(slide_level), np.integer): if slide_level != 0: if np.issubdtype(type(slide_level), np.integer): aligned_slide_shape = self.val_obj.get_aligned_slide_shape(slide_level) else: aligned_slide_shape = np.array(slide_level) else: aligned_slide_shape = self.aligned_slide_shape_rc else: aligned_slide_shape = np.array(slide_level) if non_rigid: fwd_dxdy = self.fwd_dxdy else: fwd_dxdy = None with open(geojson_f) as f: annotation_geojson = json.load(f) crop_method = self.get_crop_method(crop) if crop_method is not False: if crop_method == CROP_REF: ref_slide = self.val_obj.get_ref_slide() if isinstance(slide_level, int): scaled_aligned_shape_rc = ref_slide.slide_dimensions_wh[slide_level][::-1] else: if len(slide_level) == 2: scaled_aligned_shape_rc = slide_level elif crop_method == CROP_OVERLAP: scaled_aligned_shape_rc = aligned_slide_shape crop_bbox_xywh, _ = self.get_crop_xywh(crop_method, scaled_aligned_shape_rc) shift_xy = crop_bbox_xywh[0:2] else: shift_xy = None warped_features = [None]*len(annotation_geojson["features"]) for i, ft in tqdm.tqdm(enumerate(annotation_geojson["features"]), desc=WARP_ANNO_MSG, unit="annotation"): geom = shapely.geometry.shape(ft["geometry"]) warped_geom = warp_tools.warp_shapely_geom(geom, M=M, transformation_src_shape_rc=self.processed_img_shape_rc, transformation_dst_shape_rc=self.reg_img_shape_rc, src_shape_rc=pt_dim_rc, dst_shape_rc=aligned_slide_shape, fwd_dxdy=fwd_dxdy, shift_xy=shift_xy) warped_ft = deepcopy(ft) warped_ft["geometry"] = shapely.geometry.mapping(warped_geom) warped_features[i] = warped_ft warped_geojson = {"type":annotation_geojson["type"], "features":warped_features} return warped_geojson
[docs] def warp_geojson_from_to(self, geojson_f, to_slide_obj, src_slide_level=0, src_pt_level=0, dst_slide_level=0, non_rigid=True): """Warp geoms in geojson file from annotation slide to another unwarped slide Takes a set of geometries found in this annotation slide, and warps them to their position in the unwarped "to" slide. Parameters ---------- geojson_f : str Path to geojson file containing the annotation geometries. Assumes coordinates are in pixels. to_slide_obj : Slide Slide to which the points will be warped. I.e. `xy` will be warped from this Slide to their position in the unwarped slide associated with `to_slide_obj`. src_pt_level: int, tuple, optional Pyramid level of the slide/image in which `xy` originated. For example, if `xy` are from the centroids of cell segmentation performed on the unwarped full resolution image, this should be 0. Alternatively, the value can be a tuple of the image's shape (row, col) from which the points came. For example, if `xy` are bounding box coordinates from an analysis on a lower resolution image, then pt_level is that lower resolution image's shape (row, col). dst_slide_level: int, tuple, optional Pyramid level of the slide/image in to `xy` will be warped. Similar to `src_pt_level`, if `dst_slide_level` is an int then the points will be warped to that pyramid level. If `dst_slide_level` is the "to" image's shape (row, col), then the points will be warped to their location in an image with that same shape. non_rigid : bool, optional Whether or not to conduct non-rigid warping. If False, then only a rigid transformation will be applied. 
Returns ------- warped_geojson : dict Dictionry of warped geojson geometries """ if np.issubdtype(type(src_pt_level), np.integer): src_pt_dim_rc = self.slide_dimensions_wh[src_pt_level][::-1] else: src_pt_dim_rc = np.array(src_pt_level) if np.issubdtype(type(dst_slide_level), np.integer): to_slide_src_shape_rc = to_slide_obj.slide_dimensions_wh[dst_slide_level][::-1] else: to_slide_src_shape_rc = np.array(dst_slide_level) if src_slide_level != 0: if np.issubdtype(type(src_slide_level), np.integer): aligned_slide_shape = self.val_obj.get_aligned_slide_shape(src_slide_level) else: aligned_slide_shape = np.array(src_slide_level) else: aligned_slide_shape = self.aligned_slide_shape_rc if non_rigid: src_fwd_dxdy = self.fwd_dxdy dst_bk_dxdy = to_slide_obj.bk_dxdy else: src_fwd_dxdy = None dst_bk_dxdy = None with open(geojson_f) as f: annotation_geojson = json.load(f) warped_features = [None]*len(annotation_geojson["features"]) for i, ft in tqdm.tqdm(enumerate(annotation_geojson["features"]), desc=WARP_ANNO_MSG, unit="annotation"): geom = shapely.geometry.shape(ft["geometry"]) warped_geom = warp_tools.warp_shapely_geom_from_to(geom=geom, from_M=self.M, from_transformation_dst_shape_rc=self.reg_img_shape_rc, from_transformation_src_shape_rc=self.processed_img_shape_rc, from_dst_shape_rc=aligned_slide_shape, from_src_shape_rc=src_pt_dim_rc, from_fwd_dxdy=src_fwd_dxdy, to_M=to_slide_obj.M, to_transformation_src_shape_rc=to_slide_obj.processed_img_shape_rc, to_transformation_dst_shape_rc=to_slide_obj.reg_img_shape_rc, to_src_shape_rc=to_slide_src_shape_rc, to_dst_shape_rc=aligned_slide_shape, to_bk_dxdy=dst_bk_dxdy ) warped_ft = deepcopy(ft) warped_ft["geometry"] = shapely.geometry.mapping(warped_geom) warped_features[i] = warped_ft warped_geojson = {"type":annotation_geojson["type"], "features":warped_features} return warped_geojson
class Valis(object):
    """Reads, registers, and saves a series of slides/images

    Implements the registration pipeline described in
    "VALIS: Virtual Alignment of pathoLogy Image Series" by Gatenbee et al.
    This pipeline will read images and whole slide images (WSI) using
    pyvips, bioformats, or openslide, and so should work with a wide variety
    of formats. VALIS can perform both rigid and non-rigid registration. The
    registered slides can be saved as ome.tiff slides that can be used in
    downstream analyses. The ome.tiff format is opensource and widely
    supported, being readable in several different programming languages
    (Python, Java, Matlab, etc...) and software, such as QuPath or HALO.

    The pipeline is fully automated and goes as follows:

    1. Images/slides are converted to numpy arrays. As WSI are often
       too large to fit into memory, these images are usually lower
       resolution images from different pyramid levels.

    2. Images are processed to single channel images. They are then
       normalized to make them look as similar as possible.

    3. Image features are detected and then matched between all pairs
       of images.

    4. If the order of images is unknown, they will be optimally ordered
       based on their feature similarity.

    5. Rigid registration is performed serially, with each image being
       rigidly aligned to the previous image in the stack.

    6. Non-rigid registration is then performed either by 1) aligning
       each image towards the center of the stack, composing the
       deformation fields along the way, or 2) using groupwise
       registration that non-rigidly aligns the images to a common frame
       of reference.

    7. Error is measured by calculating the distance between registered
       matched features.

    The transformations found by VALIS can then be used to warp the full
    resolution slides. It is also possible to merge non-RGB registered
    slides to create a highly multiplexed image. These aligned and/or merged
    slides can then be saved as ome.tiff images using pyvips.

    In addition to warping images and slides, VALIS can also warp point
    data, such as cell centroids or ROI coordinates.

    Attributes
    ----------
    name : str
        Descriptive name of registrar, such as the sample's name.
    src_dir : str
        Path to directory containing the slides that will be registered.
    dst_dir : str
        Path to where the results should be saved.
    original_img_list : list of ndarray
        List of images converted from the slides in `src_dir`.
    name_dict : dictionary
        Key=full path to image, value=name used to look up `Slide` in
        `Valis.slide_dict`.
    slide_dims_dict_wh :
        Dictionary of slide dimensions. Only needed if dimensions not
        available in the slide/image's metadata.
    resolution_xyu : tuple
        Physical size per pixel and the unit.
    image_type : str
        Type of image, i.e. "brightfield" or "fluorescence".
    series : int
        Slide series that was read.
    size : int
        Number of images to align.
    aligned_img_shape_rc : tuple of int
        Shape (row, col) of aligned images.
    aligned_slide_shape_rc : tuple of int
        Shape (row, col) of the aligned slides.
    slide_dict : dict of Slide
        Dictionary of Slide objects, each of which contains information
        about a slide, and methods to warp it.
    brightfield_procsseing_fxn_str : str
        Name of function used to process brightfield images.
    if_procsseing_fxn_str : str
        Name of function used to process fluorescence images.
    max_image_dim_px : int
        Maximum width or height of images that will be saved. This limit
        is mostly to keep memory in check.
    max_processed_image_dim_px : int
        Maximum width or height of processed images. An important
        parameter, as it determines the size of the image in which
        features will be detected and displacement fields computed.
    reference_img_f : str
        Filename of image that will be treated as the center of the stack.
        If None, the index of the middle image will be the reference.
    reference_img_idx : int
        Index of slide that corresponds to `reference_img_f`, after the
        `img_obj_list` has been sorted during rigid registration.
    align_to_reference : bool
        Whether or not images should be aligned to a reference image
        specified by `reference_img_f`. Will be set to True if
        `reference_img_f` is provided.
    crop : str, optional
        How to crop the registered images.
    rigid_registrar : SerialRigidRegistrar
        SerialRigidRegistrar object that performs the rigid registration.
    rigid_reg_kwargs : dict
        Dictionary of keyword arguments passed to
        `serial_rigid.register_images`.
    feature_descriptor_str : str
        Name of feature descriptor.
    feature_detector_str : str
        Name of feature detector.
    transform_str : str
        Name of rigid transform.
    similarity_metric : str
        Name of similarity metric used to order slides.
    match_filter_method : str
        Name of method used to filter out poor feature matches.
    non_rigid_registrar : SerialNonRigidRegistrar
        SerialNonRigidRegistrar object that performs serial non-rigid
        registration.
    non_rigid_reg_kwargs : dict
        Dictionary of keyword arguments passed to
        `serial_non_rigid.register_images`.
    non_rigid_registrar_cls : NonRigidRegistrar
        Uninstantiated NonRigidRegistrar class that will be used by
        `non_rigid_registrar` to calculate the deformation fields between
        images.
    non_rigid_reg_class_str : str
        Name of the class `non_rigid_registrar_cls` belongs to.
    thumbnail_size : int
        Maximum width or height of thumbnails that show results.
    original_overlap_img : ndarray
        Image showing how original images overlap before registration.
        Created by coloring inverted greyscale copies of each image and
        then merging those images.
    rigid_overlap_img : ndarray
        Image showing how images overlap after rigid registration.
    non_rigid_overlap_img : ndarray
        Image showing how images overlap after rigid + non-rigid
        registration.
    has_rounds : bool
        Whether or not the contents of `src_dir` contain subdirectories
        that have single images spread across multiple files. An example
        would be .ndpis images.
    norm_method : str
        Name of method used to normalize the processed images.
    target_processing_stats : ndarray
        Array of processed images' stats used to normalize all images.
    summary_df : pd.Dataframe
        Pandas dataframe containing information about the results, such
        as the error, shape of aligned slides, time to completion, etc...
    start_time : float
        The time at which registration was initiated.
    end_rigid_time : float
        The time at which rigid registration was completed.
    end_non_rigid_time : float
        The time at which non-rigid registration was completed.
    qt_emitter : PySide2.QtCore.Signal
        Used to emit signals that update the GUI's progress bars.
    _non_rigid_bbox : list
        Bounding box of area in which non-rigid registration was
        conducted.
    _full_displacement_shape_rc : tuple
        Shape of full displacement field. Would be larger than
        `_non_rigid_bbox` if non-rigid registration only performed in a
        masked region.
    _dup_names_dict : dictionary
        Dictionary describing which images would have been assigned
        duplicate names. Key=duplicated name, value=list of paths to
        images which would have been assigned the same name.
    _empty_slides : dictionary
        Dictionary of `Slide` objects that have empty images. Ignored
        during registration but added back at the end.

    Examples
    --------
    Basic example using default parameters

    >>> from valis import registration, data
    >>> slide_src_dir = data.dcis_src_dir
    >>> results_dst_dir = "./slide_registration_example"
    >>> registered_slide_dst_dir = "./slide_registration_example/registered_slides"

    Perform registration

    >>> registrar = registration.Valis(slide_src_dir, results_dst_dir)
    >>> rigid_registrar, non_rigid_registrar, error_df = registrar.register()

    View results in "./slide_registration_example".

    If they look good, warp and save the slides as ome.tiff

    >>> registrar.warp_and_save_slides(registered_slide_dst_dir)

    This example shows how to register CyCIF images and then merge to
    create a high dimensional ome.tiff slide

    >>> registrar = registration.Valis(slide_src_dir, results_dst_dir)
    >>> rigid_registrar, non_rigid_registrar, error_df = registrar.register()

    Create function to get marker names from each slides' filename

    >>> def cnames_from_filename(src_f):
    ...     f = valtils.get_name(src_f)
    ...     return ["DAPI"] + f.split(" ")[1:4]
    ...
    >>> channel_name_dict = {f:cnames_from_filename(f) for f in registrar.original_img_list}
    >>> merged_img, channel_names, ome_xml = registrar.warp_and_merge_slides(merged_slide_dst_f, channel_name_dict=channel_name_dict)

    View ome.tiff, located at merged_slide_dst_f

    """
[docs] @valtils.deprecated_args(max_non_rigid_registartion_dim_px="max_non_rigid_registration_dim_px", img_type="image_type") def __init__(self, src_dir, dst_dir, series=None, name=None, image_type=None, feature_detector_cls=DEFAULT_FD, transformer_cls=DEFAULT_TRANSFORM_CLASS, affine_optimizer_cls=DEFAULT_AFFINE_OPTIMIZER_CLASS, similarity_metric=DEFAULT_SIMILARITY_METRIC, matcher=DEFAULT_MATCH_FILTER, imgs_ordered=False, non_rigid_registrar_cls=DEFAULT_NON_RIGID_CLASS, non_rigid_reg_params=DEFAULT_NON_RIGID_KWARGS, compose_non_rigid=False, img_list=None, reference_img_f=None, align_to_reference=False, do_rigid=True, crop=None, create_masks=True, denoise_rigid=True, check_for_reflections=False, resolution_xyu=None, slide_dims_dict_wh=None, max_image_dim_px=DEFAULT_MAX_IMG_DIM, max_processed_image_dim_px=DEFAULT_MAX_PROCESSED_IMG_SIZE, max_non_rigid_registration_dim_px=DEFAULT_MAX_PROCESSED_IMG_SIZE, thumbnail_size=DEFAULT_THUMBNAIL_SIZE, norm_method=DEFAULT_NORM_METHOD, micro_rigid_registrar_cls=None, micro_rigid_registrar_params={}, qt_emitter=None): """ src_dir: str Path to directory containing the slides that will be registered. dst_dir : str Path to where the results should be saved. name : str, optional Descriptive name of registrar, such as the sample's name series : int, optional Slide series to that was read. If None, series will be set to 0. image_type : str, optional The type of image, either "brightfield", "fluorescence", or "multi". If None, VALIS will guess `image_type` of each image, based on the number of channels and datatype. Will assume that RGB = "brightfield", otherwise `image_type` will be set to "fluorescence". feature_detector_cls : FeatureDD, optional Uninstantiated FeatureDD object that detects and computes image features. Default is VggFD. The available feature_detectors are found in the `feature_detectors` module. If a desired feature detector is not available, one can be created by subclassing `feature_detectors.FeatureDD`. 
transformer_cls : scikit-image Transform class, optional Uninstantiated scikit-image transformer used to find transformation matrix that will warp each image to the target image. Default is SimilarityTransform affine_optimizer_cls : AffineOptimzer class, optional Uninstantiated AffineOptimzer that will minimize a cost function to find the optimal affine transformations. If a desired affine optimization is not available, one can be created by subclassing `affine_optimizer.AffineOptimizer`. similarity_metric : str, optional Metric used to calculate similarity between images, which is in turn used to build the distance matrix used to sort the images. Can be "n_matches", or a string to used as distance in spatial.distance.cdist. "n_matches" is the number of matching features between image pairs. match_filter_method: str, optional "GMS" will use filter_matches_gms() to remove poor matches. This uses the Grid-based Motion Statistics (GMS) or RANSAC. imgs_ordered : bool, optional Boolean defining whether or not the order of images in img_dir are already in the correct order. If True, then each filename should begin with the number that indicates its position in the z-stack. If False, then the images will be sorted by ordering a feature distance matix. Default is False. reference_img_f : str, optional Filename of image that will be treated as the center of the stack. If None, the index of the middle image will be the reference. align_to_reference : bool, optional If `False`, images will be non-rigidly aligned serially towards the reference image. If `True`, images will be non-rigidly aligned directly to the reference image. If `reference_img_f` is None, then the reference image will be the one in the middle of the stack. non_rigid_registrar_cls : NonRigidRegistrar, optional Uninstantiated NonRigidRegistrar class that will be used to calculate the deformation fields between images. See the `non_rigid_registrars` module for a desciption of available methods. 
If a desired non-rigid registration method is not available, one can be implemented by subclassing.NonRigidRegistrar. If None, then only rigid registration will be performed non_rigid_reg_params: dictionary, optional Dictionary containing key, value pairs to be used to initialize `non_rigid_registrar_cls`. In the case where simple ITK is used by the, params should be a SimpleITK.ParameterMap. Note that numeric values nedd to be converted to strings. See the NonRigidRegistrar classes in `non_rigid_registrars` for the available non-rigid registration methods and arguments. compose_non_rigid : bool, optional Whether or not to compose non-rigid transformations. If `True`, then an image is non-rigidly warped before aligning to the adjacent non-rigidly aligned image. This allows the transformations to accumulate, which may bring distant features together but could also result in un-wanted deformations, particularly around the edges. If `False`, the image not warped before being aaligned to the adjacent non-rigidly aligned image. This can reduce unwanted deformations, but may not bring distant features together. img_list : list, dictionary, optional List of images to be registered. However, it can also be a dictionary, in which case the key: value pairs are full_path_to_image: name_of_image, where name_of_image is the key that can be used to access the image from Valis.slide_dict. do_rigid: bool, dictionary, optional Whether or not to perform rigid registration. If `False`, rigid registration will be skipped. If `do_rigid` is a dictionary, it should contain inverse transformation matrices to rigidly align images to the specificed by `reference_img_f`. M will be estimated for images that are not in the dictionary. Each key is the filename of the image associated with the transformation matrix, and value is a dictionary containing the following values: `M` : (required) a 3x3 inverse transformation matrix as a numpy array. Found by determining how to align fixed to moving. 
If `M` was found by determining how to align moving to fixed, then `M` will need to be inverted first. `transformation_src_shape_rc` : (optional) shape (row, col) of image used to find the rigid transformation. If not provided, then it is assumed to be the shape of the level 0 slide `transformation_dst_shape_rc` : (optional) shape of registered image. If not provided, this is assumed to be the shape of the level 0 reference slide. crop: str, optional How to crop the registered images. "overlap" will crop to include only areas where all images overlapped. "reference" crops to the area that overlaps with a reference image, defined by `reference_img_f`. This option can be used even if `reference_img_f` is `None` because the reference image will be set as the one at the center of the stack. If both `crop` and `reference_img_f` are `None`, `crop` will be set to "overlap". If `crop` is None, but `reference_img_f` is defined, then `crop` will be set to "reference". create_masks : bool, optional Whether or not to create and apply masks for registration. Can help focus alignment on the tissue, but can sometimes mask too much if there is a lot of variation in the image. denoise_rigid : bool, optional Whether or not to denoise processed images before rigid registion. Note that un-denoised images are used in the non-rigid registration check_for_reflections : bool, optional Determine if alignments are improved by relfecting/mirroring/flipping images. Optional because it requires re-detecting features in each version of the images and then re-matching features, and so can be time consuming and not always necessary. resolution_xyu: tuple, optional Physical size per pixel and the unit. If None (the default), these values will be determined for each slide using the slides' metadata. If provided, this physical pixel sizes will be used for all of the slides. 
This option is available in case one cannot easily access to the original slides, but does have the information on pixel's physical units. slide_dims_dict_wh : dict, optional Key= slide/image file name, value= dimensions = [(width, height), (width, height), ...] for each level. If None (the default), the slide dimensions will be pulled from the slides' metadata. If provided, those values will be overwritten. This option is available in case one cannot easily access to the original slides, but does have the information on the slide dimensions. max_image_dim_px : int, optional Maximum width or height of images that will be saved. This limit is mostly to keep memory in check. max_processed_image_dim_px : int, optional Maximum width or height of processed images. An important parameter, as it determines the size of of the image in which features will be detected and displacement fields computed. max_non_rigid_registration_dim_px : int, optional Maximum width or height of images used for non-rigid registration. Larger values may yeild more accurate results, at the expense of speed and memory. There is also a practical limit, as the specified size may be too large to fit in memory. mask_dict : dictionary Dictionary where key = overlap type (all, overlap, or reference), and value = (mask, mask_bbox_xywh) thumbnail_size : int, optional Maximum width or height of thumbnails that show results norm_method : str Name of method used to normalize the processed images. Options are None when normalization is not desired, "histo_match" for histogram matching and "img_stats" for normalizing by image statistics. See preprocessing.match_histograms and preprocessing.norm_khan for details. iter_order : list of tuples Each element of `iter_order` contains a tuple of stack indices. The first value is the index of the moving/current/from image, while the second value is the index of the moving/next/to image. 
micro_rigid_registrar_cls : MicroRigidRegistrar, optional Class used to perform higher resolution rigid registration. If `None`, this step is skipped. micro_rigid_registrar_params : dictionary Dictionary of keyword arguments used intialize the `MicroRigidRegistrar` qt_emitter : PySide2.QtCore.Signal, optional Used to emit signals that update the GUI's progress bars """ # Get name, based on src directory if name is None: if src_dir.endswith(os.path.sep): name = os.path.split(src_dir[:-1])[1] else: name = os.path.split(src_dir)[1] self.name = name.replace(" ", "_") # Set paths # self.src_dir = src_dir self.dst_dir = os.path.join(dst_dir, self.name) self.name_dict = None if img_list is not None: if isinstance(img_list, dict): # Key=original file name, value=name self.original_img_list = list(img_list.keys()) self.name_dict = img_list elif hasattr(img_list, "__iter__"): self.original_img_list = list(img_list) else: msg = (f"Cannot upack `img_list`, which is type {type(img_list).__name__}. " "Please provide an iterable object (list, tuple, array, etc...) that has the location of the images") valtils.print_warning(msg, rgb=Fore.RED) else: self.get_imgs_in_dir() if self.name_dict is None: self.name_dict = self.get_img_names(self.original_img_list) self.check_for_duplicated_names(self.original_img_list) self.set_dst_paths() # Some information may already be provided # self.slide_dims_dict_wh = slide_dims_dict_wh self.resolution_xyu = resolution_xyu self.image_type = image_type # Results fields # self.series = series self.size = 0 self.aligned_img_shape_rc = None self.aligned_slide_shape_rc = None self.slide_dict = {} # Fields related to image pre-processing # self.brightfield_procsseing_fxn_str = None self.if_procsseing_fxn_str = None if max_image_dim_px < max_processed_image_dim_px: msg = f"max_image_dim_px is {max_image_dim_px} but needs to be less or equal to {max_processed_image_dim_px}. 
Setting max_image_dim_px to {max_processed_image_dim_px}" valtils.print_warning(msg) max_image_dim_px = max_processed_image_dim_px self.max_image_dim_px = max_image_dim_px self.max_processed_image_dim_px = max_processed_image_dim_px self.max_non_rigid_registration_dim_px = max_non_rigid_registration_dim_px # Setup rigid registration # self.reference_img_idx = None self.reference_img_f = reference_img_f self.align_to_reference = align_to_reference self.iter_order = None self.do_rigid = do_rigid self.rigid_registrar = None self.micro_rigid_registrar_cls = micro_rigid_registrar_cls self.micro_rigid_registrar_params = micro_rigid_registrar_params self.denoise_rigid = denoise_rigid self._set_rigid_reg_kwargs(name=name, feature_detector=feature_detector_cls, similarity_metric=similarity_metric, matcher=matcher, transformer=transformer_cls, affine_optimizer=affine_optimizer_cls, imgs_ordered=imgs_ordered, reference_img_f=reference_img_f, check_for_reflections=check_for_reflections, qt_emitter=qt_emitter) # Setup non-rigid registration # self.non_rigid_registrar = None self.non_rigid_registrar_cls = non_rigid_registrar_cls if crop is None: if reference_img_f is None: self.crop = CROP_OVERLAP else: self.crop = CROP_REF else: self.crop = crop self.compose_non_rigid = compose_non_rigid if non_rigid_registrar_cls is not None: self._set_non_rigid_reg_kwargs(name=name, non_rigid_reg_class=non_rigid_registrar_cls, non_rigid_reg_params=non_rigid_reg_params, reference_img_f=reference_img_f, compose_non_rigid=compose_non_rigid, qt_emitter=qt_emitter) # Info realted to saving images to view results # self.mask_dict = None self.create_masks = create_masks self.thumbnail_size = thumbnail_size self.original_overlap_img = None self.rigid_overlap_img = None self.non_rigid_overlap_img = None self.micro_reg_overlap_img = None self.has_rounds = False self.norm_method = norm_method self.summary_df = None self.start_time = None self.end_rigid_time = None self.end_non_rigid_time = None 
self._empty_slides = {}
def _set_rigid_reg_kwargs(self, name, feature_detector, similarity_metric, matcher, transformer, affine_optimizer, imgs_ordered, reference_img_f, check_for_reflections, qt_emitter): """Set rigid registration kwargs Keyword arguments will be passed to `serial_rigid.register_images` """ if affine_optimizer is not None: afo = affine_optimizer(transform=transformer.__name__) else: afo = affine_optimizer self.rigid_reg_kwargs = {NAME_KEY: name, FD_KEY: feature_detector(), SIM_METRIC_KEY: similarity_metric, TRANSFORMER_KEY: transformer(), MATCHER_KEY: matcher, AFFINE_OPTIMIZER_KEY: afo, REF_IMG_KEY: reference_img_f, IMAGES_ORDERD_KEY: imgs_ordered, CHECK_REFLECT_KEY: check_for_reflections, QT_EMMITER_KEY: qt_emitter } # Save methods as strings since some objects cannot be pickled # self.feature_descriptor_str = self.rigid_reg_kwargs[FD_KEY].kp_descriptor_name self.feature_detector_str = self.rigid_reg_kwargs[FD_KEY].kp_detector_name self.transform_str = self.rigid_reg_kwargs[TRANSFORMER_KEY].__class__.__name__ self.similarity_metric = self.rigid_reg_kwargs[SIM_METRIC_KEY] self.match_filter_method = matcher.__class__.__name__ self.imgs_ordered = imgs_ordered def _set_non_rigid_reg_kwargs(self, name, non_rigid_reg_class, non_rigid_reg_params, reference_img_f, compose_non_rigid, qt_emitter): """Set non-rigid registration kwargs Keyword arguments will be passed to `serial_non_rigid.register_images` """ self.non_rigid_reg_kwargs = {NAME_KEY: name, NON_RIGID_REG_CLASS_KEY: non_rigid_reg_class, NON_RIGID_REG_PARAMS_KEY: non_rigid_reg_params, REF_IMG_KEY: reference_img_f, QT_EMMITER_KEY: qt_emitter, NON_RIGID_COMPOSE_KEY: compose_non_rigid } self.non_rigid_reg_class_str = self.non_rigid_reg_kwargs[NON_RIGID_REG_CLASS_KEY].__name__ def _add_empty_slides(self): # Fill in missing attributes for slide_name, slide_obj in self._empty_slides.items(): slide_obj.processed_img_shape_rc = slide_obj.image.shape[0:2] slide_obj.aligned_slide_shape_rc = self.aligned_slide_shape_rc 
slide_obj.reg_img_shape_rc = self.aligned_img_shape_rc slide_obj.processed_img = np.zeros(slide_obj.processed_img_shape_rc) slide_obj.rigid_reg_mask = np.full(slide_obj.processed_img_shape_rc, 255) slide_obj.non_rigid_reg_mask = np.full(slide_obj.reg_img_shape_rc, 255) slide_obj.M = np.eye(3) slide_obj.stack_idx = self.size self.size += 1 self.slide_dict[slide_name] = slide_obj def get_imgs_in_dir(self): """Get all images in Valis.src_dir """ full_path_list = [os.path.join(self.src_dir, f) for f in os.listdir(self.src_dir)] self.original_img_list = [] img_names = [] for f in full_path_list: if os.path.isfile(f): if slide_tools.get_img_type(f) is not None: self.original_img_list.append(f) img_names.append(valtils.get_name(f)) for f in full_path_list: if os.path.isdir(f): dir_name = os.path.split(f)[1] is_round, master_slide = slide_tools.determine_if_staining_round(f) if is_round: self.original_img_list.append(master_slide) else: # Some formats, like .mrxs have the main file but # data in a subdirectory with the same name matching_f = [ff for ff in full_path_list if re.search(dir_name, ff) is not None and os.path.split(ff)[1] != dir_name] if len(matching_f) == 1: if not matching_f[0] in self.original_img_list: # Make sure that file not already in list self.original_img_list.extend(matching_f) img_names.append(dir_name) elif len(matching_f) > 1: msg = f"found {len(matching_f)} matches for {dir_name}: {', '.join(matching_f)}" valtils.print_warning(msg, rgb=Fore.RED) elif len(matching_f) == 0: msg = f"Can't find slide file associated with {dir_name}" valtils.print_warning(msg, rgb=Fore.RED) def set_dst_paths(self): """Set paths to where the results will be saved. 
""" self.img_dir = os.path.join(self.dst_dir, CONVERTED_IMG_DIR) self.processed_dir = os.path.join(self.dst_dir, PROCESSED_IMG_DIR) self.reg_dst_dir = os.path.join(self.dst_dir, RIGID_REG_IMG_DIR) self.non_rigid_dst_dir = os.path.join(self.dst_dir, NON_RIGID_REG_IMG_DIR) self.deformation_field_dir = os.path.join(self.dst_dir, DEFORMATION_FIELD_IMG_DIR) self.overlap_dir = os.path.join(self.dst_dir, OVERLAP_IMG_DIR) self.data_dir = os.path.join(self.dst_dir, REG_RESULTS_DATA_DIR) self.displacements_dir = os.path.join(self.dst_dir, DISPLACEMENT_DIRS) self.micro_reg_dir = os.path.join(self.dst_dir, MICRO_REG_DIR) self.mask_dir = os.path.join(self.dst_dir, MASK_DIR)
[docs] def get_slide(self, src_f): """Get Slide Get the Slide associated with `src_f`. Slide store registration parameters and other metadata about the slide associated with `src_f`. Slide can also: * Convert the slide to a numpy array (Slide.slide2image) * Convert the slide to a pyvips.Image (Slide.slide2vips) * Warp the slide (Slide.warp_slide) * Save the warped slide as an ome.tiff (Slide.warp_and_save_slide) * Warp an image of the slide (Slide.warp_img) * Warp points (Slide.warp_xy) * Warp points in one slide to their position in another unwarped slide (Slide.warp_xy_from_to) * Access slide ome-xml (Slide.original_xml) See Slide for more details. Parameters ---------- src_f : str Path to the slide, or name assigned to slide (see Valis.name_dict) Returns ------- slide_obj : Slide Slide associated with src_f """ default_name = valtils.get_name(src_f) if src_f in self.name_dict.keys(): # src_f is full path to image assigned_name = self.name_dict[src_f] elif src_f in self.name_dict.values(): # src_f is name of image assigned_name = src_f else: # src_f isn't in name_dict assigned_name = None if default_name in self.slide_dict: # src_f is the image name or file name slide_obj = self.slide_dict[default_name] elif assigned_name in self.slide_dict: # src_f is full path and name was looked up slide_obj = self.slide_dict[assigned_name] elif src_f in self.slide_dict: # src_f is the name of the slide slide_obj = self.slide_dict[src_f] elif default_name in self._dup_names_dict: # default name has multiple matches n_matching = len(self._dup_names_dict[default_name]) possible_names_dict = {f: self.name_dict[f] for f in self._dup_names_dict[default_name]} msg = (f"\n{src_f} matches {n_matching} images in this dataset:\n" f"{pformat(self._dup_names_dict[default_name])}" f"\n\nPlease see `Valis.name_dict` to find correct name in " f"the dictionary. 
Either key (filenmae) or value (assigned name) will work:\n" f"{pformat(possible_names_dict)}") valtils.print_warning(msg, rgb=Fore.RED) slide_obj = None return slide_obj
def get_ref_slide(self): ref_slide = self.get_slide(self.reference_img_f) return ref_slide def get_img_names(self, img_list): """ Check that each image will have a unique name, which is based on the file name. Images that would otherwise have the same name are assigned extra ids, starting at 0. For example, if there were three images named "HE.tiff", they would be named "HE_0", "HE_1", and "HE_2". Parameters ---------- img_list : list List of image names Returns ------- name_dict : dict Dictionary, where key= full path to image, value = image name used as key in Valis.slide_dict """ img_df = pd.DataFrame({"img_f": img_list, "name": [valtils.get_name(f) for f in img_list]}) names_dict = {f: valtils.get_name(f) for f in img_list} count_df = img_df["name"].value_counts() dup_idx = np.where(count_df.values > 1)[0] if len(dup_idx) > 0: for i in dup_idx: dup_name = count_df.index[i] dup_paths = img_df["img_f"][img_df["name"] == dup_name] z = len(str(len(dup_paths))) msg = f"Detected {len(dup_paths)} images that would be named {dup_name}" valtils.print_warning(msg, rgb=Fore.RED) for j, p in enumerate(dup_paths): new_name = f"{names_dict[p]}_{str(j).zfill(z)}" msg = f"Renmaing {p} to {new_name} in Valis.slide_dict)" valtils.print_warning(msg) names_dict[p] = new_name return names_dict def check_for_duplicated_names(self, img_list): """ Create dictionary that tracks which files might be assigned the same name, which can happen if the filenames (minus the rest of the path) are the same """ default_names_dict = {} for f in img_list: default_name = valtils.get_name(f) if default_name not in default_names_dict: default_names_dict[default_name] = [f] else: default_names_dict[default_name].append(f) self._dup_names_dict = {k: v for k, v in default_names_dict.items() if len(v) > 1} def create_img_reader_dict(self, reader_dict=None, default_reader=None, series=None): if reader_dict is None: named_reader_dict = {} else: named_reader_dict = {valtils.get_name(f): reader_dict[f] for f 
in reader_dict.keys()} for i, slide_f in enumerate(self.original_img_list): slide_name = valtils.get_name(slide_f) if slide_name not in named_reader_dict: if default_reader is None: try: slide_reader_cls = slide_io.get_slide_reader(slide_f, series=series) except Exception as e: traceback_msg = traceback.format_exc() msg = f"Attempting to get reader for {slide_f} created the following error:\n{e}" valtils.print_warning(msg, rgb=Fore.RED, traceback_msg=traceback_msg) else: slide_reader_cls = default_reader slide_reader_kwargs = {"series": series} else: slide_reader_info = named_reader_dict[slide_name] if isinstance(slide_reader_info, list) or isinstance(slide_reader_info, tuple): if len(slide_reader_info) == 2: slide_reader_cls, slide_reader_kwargs = slide_reader_info elif len(slide_reader_info) == 1: # Provided processor, but no kwargs slide_reader_cls = slide_reader_info[0] slide_reader_kwargs = {} else: # Provided processor, but no kwargs slide_reader_kwargs = {} try: slide_reader = slide_reader_cls(src_f=slide_f, **slide_reader_kwargs) except Exception as e: traceback_msg = traceback.format_exc() msg = f"Attempting to read {slide_f} created the following error:\n{e}" valtils.print_warning(msg, rgb=Fore.RED, traceback_msg=traceback_msg) named_reader_dict[slide_name] = slide_reader return named_reader_dict def convert_imgs(self, series=None, reader_dict=None, reader_cls=None): """Convert slides to images and create dictionary of Slides. series : int, optional Slide series to be read. If None, the series with largest image will be read reader_cls : SlideReader, optional Uninstantiated SlideReader class that will convert the slide to an image, and also collect metadata. reader_dict: dict, optional Dictionary specifying which readers to use for individual images. The keys, value pairs are image filename and instantiated `slide_io.SlideReader` to use to read that file. 
Valis will try to find an appropritate reader for any omitted files, or will use `reader_cls` as the default. """ named_reader_dict = self.create_img_reader_dict(reader_dict=reader_dict, default_reader=reader_cls, series=series) img_types = [] self.size = 0 for f in tqdm.tqdm(self.original_img_list, desc=CONVERT_MSG, unit="image"): slide_name = valtils.get_name(f) reader = named_reader_dict[slide_name] slide_dims = reader.metadata.slide_dimensions levels_in_range = np.where(slide_dims.max(axis=1) < self.max_image_dim_px)[0] if len(levels_in_range) > 0: level = levels_in_range[0] else: level = len(slide_dims) - 1 vips_img = reader.slide2vips(level=level) scaling = np.min(self.max_image_dim_px/np.array([vips_img.width, vips_img.height])) if scaling < 1: vips_img = warp_tools.rescale_img(vips_img, scaling) img = warp_tools.vips2numpy(vips_img) slide_name = self.name_dict[f] slide_obj = Slide(f, img, self, reader, name=slide_name) slide_obj.crop = self.crop # Will overwrite data if provided. 
Can occur if reading images, not the actual slides # if self.slide_dims_dict_wh is not None: matching_slide = [k for k in self.slide_dims_dict_wh.keys() if valtils.get_name(k) == slide_obj.name][0] slide_dims = self.slide_dims_dict_wh[matching_slide] if slide_dims.ndim == 1: slide_dims = np.array([[slide_dims]]) slide_obj.slide_shape_rc = slide_dims[0][::-1] if self.resolution_xyu is not None: slide_obj.resolution = np.mean(self.resolution_xyu[0:2]) slide_obj.units = self.resolution_xyu[2] if slide_obj.is_empty: msg = f"{slide_obj.name} appears to be empty and will be skipped during registration" valtils.print_warning(msg) self._empty_slides[slide_obj.name] = slide_obj continue img_types.append(slide_obj.img_type) self.slide_dict[slide_obj.name] = slide_obj self.size += 1 if self.image_type is None: unique_img_types = list(set(img_types)) if len(unique_img_types) > 1: self.image_type = slide_tools.MULTI_MODAL_NAME else: self.image_type = unique_img_types[0] self.check_img_max_dims() def check_img_max_dims(self): """Ensure that all images have similar sizes. `max_image_dim_px` will be set to the maximum dimension of the smallest image if that value is less than max_image_dim_px """ og_img_sizes_wh = np.array([slide_obj.image.shape[0:2][::-1] for slide_obj in self.slide_dict.values()]) img_max_dims = og_img_sizes_wh.max(axis=1) min_max_wh = img_max_dims.min() scaling_for_og_imgs = min_max_wh/img_max_dims if np.any(scaling_for_og_imgs < 1): msg = f"Smallest image is less than max_image_dim_px. 
parameter max_image_dim_px is being set to {min_max_wh}" valtils.print_warning(msg) self.max_image_dim_px = min_max_wh for slide_obj in self.slide_dict.values(): # Rescale images scaling = self.max_image_dim_px/max(slide_obj.image.shape[0:2]) assert scaling <= self.max_image_dim_px if scaling < 1: slide_obj.image = warp_tools.rescale_img(slide_obj.image, scaling) if self.max_processed_image_dim_px > self.max_image_dim_px: msg = f"parameter max_processed_image_dim_px also being updated to {self.max_image_dim_px}" valtils.print_warning(msg) self.max_processed_image_dim_px = self.max_image_dim_px def create_original_composite_img(self, rigid_registrar): """Create imaage showing how images overlap before registration """ min_r = np.inf max_r = 0 min_c = np.inf max_c = 0 composite_img_list = [None] * self.size for i, img_obj in enumerate(rigid_registrar.img_obj_list): img = img_obj.image padded_img = transform.warp(img, img_obj.T, preserve_range=True, output_shape=img_obj.padded_shape_rc) composite_img_list[i] = padded_img img_corners_rc = warp_tools.get_corners_of_image(img.shape[0:2]) warped_corners_xy = warp_tools.warp_xy(img_corners_rc[:, ::-1], img_obj.T) min_r = min(warped_corners_xy[:, 1].min(), min_r) max_r = max(warped_corners_xy[:, 1].max(), max_r) min_c = min(warped_corners_xy[:, 0].min(), min_c) max_c = max(warped_corners_xy[:, 0].max(), max_c) composite_img = np.dstack(composite_img_list) cmap = viz.jzazbz_cmap() channel_colors = viz.get_n_colors(cmap, composite_img.shape[2]) overlap_img = viz.color_multichannel(composite_img, channel_colors, rescale_channels=True, normalize_by="channel", cspace="CAM16UCS") min_r = int(min_r) max_r = int(np.ceil(max_r)) min_c = int(min_c) max_c = int(np.ceil(max_c)) overlap_img = overlap_img[min_r:max_r, min_c:max_c] overlap_img = (255*overlap_img).astype(np.uint8) return overlap_img def measure_original_mmi(self, img1, img2): """Measure Mattes mutation inormation between 2 unregistered images. 
""" dst_rc = np.max([img1.shape, img2.shape], axis=1) padded_img_list = [None] * self.size for i, img in enumerate([img1, img2]): T = warp_tools.get_padding_matrix(img.shape, dst_rc) padded_img = transform.warp(img, T, preserve_range=True, output_shape=dst_rc) padded_img_list[i] = padded_img og_mmi = warp_tools.mattes_mi(padded_img_list[0], padded_img_list[1]) return og_mmi def create_img_processor_dict(self, brightfield_processing_cls=DEFAULT_BRIGHTFIELD_CLASS, brightfield_processing_kwargs=DEFAULT_BRIGHTFIELD_PROCESSING_ARGS, if_processing_cls=DEFAULT_FLOURESCENCE_CLASS, if_processing_kwargs=DEFAULT_FLOURESCENCE_PROCESSING_ARGS, processor_dict=None): """Create dictionary to get processors for each image Create dictionary to get processors for each image. If an image is not in `processing_dict`, this function will try to guess the modality and then assign a default processor. Parameters ---------- brightfield_processing_cls : ImageProcesser ImageProcesser to pre-process brightfield images to make them look as similar as possible. Should return a single channel uint8 image. brightfield_processing_kwargs : dict Dictionary of keyward arguments to be passed to `brightfield_processing_cls` if_processing_cls : ImageProcesser ImageProcesser to pre-process immunofluorescent images to make them look as similar as possible. Should return a single channel uint8 image. if_processing_kwargs : dict Dictionary of keyward arguments to be passed to `if_processing_cls` processor_dict : dict Each key should be the filename of the image, and the value either a subclassed preprocessing.ImageProcessor, or a list, where the 1st element is the processor, and the second element a dictionary of keyword arguments passed to ImageProcesser.process_img. If `None`, then this function will assign a processor to each image. 
Returns ------- named_processing_dict : Dict Each key is the name of the slide, and the value is a list, where the 1st element is the processor, and the second element a dictionary of keyword arguments passed to ImageProcesser.process_img """ if processor_dict is None: named_processing_dict = {} else: named_processing_dict = {self.get_slide(f).name: processor_dict[f] for f in processor_dict.keys()} for i, slide_obj in enumerate(self.slide_dict.values()): if slide_obj.name in named_processing_dict: slide_p = named_processing_dict[slide_obj.name] if isinstance(slide_p, list): if len(slide_p) == 2: slide_p, slide_kwargs = slide_p elif len(slide_p) == 1: # Provided processor, but no kwargs slide_kwargs = {} else: # Provided processor, but no kwargs slide_kwargs = {} named_processing_dict[slide_obj.name] = [slide_p, slide_kwargs] else: # Processor not provided, so assign one based on inferred modality is_ihc = slide_obj.img_type == slide_tools.IHC_NAME if is_ihc: processing_cls = brightfield_processing_cls processing_kwargs = brightfield_processing_kwargs else: processing_cls = if_processing_cls processing_kwargs = if_processing_kwargs named_processing_dict[slide_obj.name] = [processing_cls, processing_kwargs] return named_processing_dict def process_imgs(self, processor_dict): """Process images to make them look as similar as possible Images will also be normalized after images are processed Parameters ---------- processor_dict : dict Each key should be the filename of the image, and the value either a subclassed preprocessing.ImageProcessor, or a list, where the 1st element is the processor, and the second element a dictionary of keyword arguments passed to the processor. If `None`, then a default processor will be used for each image based on the inferred modality. 
""" pathlib.Path(self.processed_dir).mkdir(exist_ok=True, parents=True) if self.norm_method is not None: if self.norm_method == "histo_match": ref_histogram = np.zeros(256, dtype=np.int) else: all_v = [None]*self.size for i, slide_obj in enumerate(tqdm.tqdm(self.slide_dict.values(), desc=PROCESS_IMG_MSG, unit="image")): levels_in_range = np.where(slide_obj.slide_dimensions_wh.max(axis=1) < self.max_processed_image_dim_px)[0] if len(levels_in_range) > 0: level = levels_in_range[0] else: level = len(slide_obj.slide_dimensions_wh) - 1 processing_cls, processing_kwargs = processor_dict[slide_obj.name] processor = processing_cls(image=slide_obj.image, src_f=slide_obj.src_f, level=level, series=slide_obj.series, reader=slide_obj.reader) try: processed_img = processor.process_image(**processing_kwargs) except TypeError: # processor.process_image doesn't take kwargs processed_img = processor.process_image() processed_img = exposure.rescale_intensity(processed_img, out_range=(0, 255)).astype(np.uint8) scaling = np.min(self.max_processed_image_dim_px/np.array(processed_img.shape[0:2])) if scaling < 1: processed_img = warp_tools.rescale_img(processed_img, scaling) if self.create_masks: # Get masks # pathlib.Path(self.mask_dir).mkdir(exist_ok=True, parents=True) # Slice region from slide and process too mask = processor.create_mask() if not np.all(mask.shape == processed_img.shape[0:2]): mask = warp_tools.resize_img(mask, processed_img.shape[0:2], interp_method="nearest") slide_obj.rigid_reg_mask = mask # Save image with mask drawn on top of it thumbnail_mask = self.create_thumbnail(mask) if slide_obj.img_type == slide_tools.IHC_NAME: thumbnail_img = self.create_thumbnail(slide_obj.image) else: thumbnail_img = self.create_thumbnail(processed_img) thumbnail_mask_outline = viz.draw_outline(thumbnail_img, thumbnail_mask) outline_f_out = os.path.join(self.mask_dir, f'{slide_obj.name}.png') warp_tools.save_img(outline_f_out, thumbnail_mask_outline) else: mask = 
np.full(processed_img.shape, 255, dtype=np.uint8) slide_obj.rigid_reg_mask = mask slide_obj.processed_img = processed_img processed_f_out = os.path.join(self.processed_dir, slide_obj.name + ".png") slide_obj.processed_img_f = processed_f_out slide_obj.processed_img_shape_rc = np.array(processed_img.shape[0:2]) warp_tools.save_img(processed_f_out, processed_img) img_for_stats = processed_img.reshape(-1) if self.norm_method is not None: if self.norm_method == "histo_match": img_hist, _ = np.histogram(img_for_stats, bins=256) ref_histogram += img_hist else: all_v[i] = img_for_stats.reshape(-1) if self.norm_method is not None: if self.norm_method == "histo_match": target_stats = ref_histogram else: all_v = np.hstack(all_v) target_stats = all_v self.normalize_images(target_stats) def denoise_images(self): for i, slide_obj in enumerate(tqdm.tqdm(self.slide_dict.values(), desc=DENOISE_MSG, unit="image")): if slide_obj.rigid_reg_mask is None: is_ihc = slide_obj.img_type == slide_tools.IHC_NAME _, tissue_mask = preprocessing.create_tissue_mask(slide_obj.image, is_ihc) mask_bbox = warp_tools.xy2bbox(warp_tools.mask2xy(tissue_mask)) c0, r0 = mask_bbox[:2] c1, r1 = mask_bbox[:2] + mask_bbox[2:] denoise_mask = np.zeros_like(tissue_mask) denoise_mask[r0:r1, c0:c1] = 255 else: denoise_mask = slide_obj.rigid_reg_mask denoised = preprocessing.denoise_img(slide_obj.processed_img, mask=denoise_mask) warp_tools.save_img(slide_obj.processed_img_f, denoised) def normalize_images(self, target): """Normalize intensity values in images Parameters ---------- target : ndarray Target statistics used to normalize images """ # print("\n==== Normalizing images\n") for i, slide_obj in enumerate(tqdm.tqdm(self.slide_dict.values(), desc=NORM_IMG_MSG, unit="image")): vips_img = pyvips.Image.new_from_file(slide_obj.processed_img_f) img = warp_tools.vips2numpy(vips_img) if self.norm_method == "histo_match": self.target_processing_stats = target normed_img = preprocessing.match_histograms(img, 
self.target_processing_stats) elif self.norm_method == "img_stats": self.target_processing_stats = preprocessing.get_channel_stats(target) normed_img = preprocessing.norm_img_stats(img, self.target_processing_stats) normed_img = exposure.rescale_intensity(normed_img, out_range=(0, 255)).astype(np.uint8) slide_obj.processed_img = normed_img slide_obj.processed_img_shape_rc = np.array(normed_img.shape[0:2]) warp_tools.save_img(slide_obj.processed_img_f, normed_img) def create_thumbnail(self, img, rescale_color=False): """Create thumbnail image to view results """ is_vips = isinstance(img, pyvips.Image) img_shape = warp_tools.get_shape(img) scaling = np.min(self.thumbnail_size/np.array(img_shape[:2])) if scaling < 1: thumbnail = warp_tools.rescale_img(img, scaling) else: thumbnail = img if rescale_color is True: if is_vips: # Convert to numpy to rescale thumbnail = warp_tools.vips2numpy(img) thumbnail = exposure.rescale_intensity(thumbnail, out_range=(0, 255)).astype(np.uint8) if is_vips: # Convert back to pyvips thumbnail = warp_tools.numpy2vips(thumbnail) return thumbnail def draw_overlap_img(self, img_list): """Create image showing the overlap of registered images """ composite_img = np.dstack(img_list) cmap = viz.jzazbz_cmap() channel_colors = viz.get_n_colors(cmap, composite_img.shape[2]) overlap_img = viz.color_multichannel(composite_img, channel_colors, rescale_channels=True, normalize_by="channel", cspace="CAM16UCS") overlap_img = exposure.equalize_adapthist(overlap_img) overlap_img = exposure.rescale_intensity(overlap_img, out_range=(0, 255)).astype(np.uint8) return overlap_img def get_ref_img_mask(self, rigid_registrar): """Create mask that covers reference image Returns ------- mask : ndarray Mask that covers reference image in registered images mask_bbox_xywh : tuple of int XYWH of mask in reference image """ ref_name = self.name_dict[self.reference_img_f] ref_slide = rigid_registrar.img_obj_dict[ref_name] ref_shape_wh = ref_slide.image.shape[0:2][::-1] 
uw_mask = np.full(ref_shape_wh[::-1], 255, dtype=np.uint8) mask = warp_tools.warp_img(uw_mask, ref_slide.M, out_shape_rc=ref_slide.registered_shape_rc) reg_txy = -ref_slide.M[0:2, 2] mask_bbox_xywh = np.array([*reg_txy, *ref_shape_wh]) return mask, mask_bbox_xywh def get_all_overlap_mask(self, rigid_registrar): """Create mask that covers all tissue Returns ------- mask : ndarray Mask that covers reference image in registered images mask_bbox_xywh : tuple of int XYWH of mask in reference image """ ref_name = self.name_dict[self.reference_img_f] ref_slide = rigid_registrar.img_obj_dict[ref_name] combo_mask = np.zeros(ref_slide.registered_shape_rc, dtype=int) for img_obj in rigid_registrar.img_obj_list: img_mask = self.slide_dict[img_obj.name].rigid_reg_mask warped_img_mask = warp_tools.warp_img(img_mask, M=img_obj.M, out_shape_rc=img_obj.registered_shape_rc, interp_method="nearest") combo_mask[warped_img_mask > 0] += 1 temp_mask = 255*filters.apply_hysteresis_threshold(combo_mask, 0.5, self.size-0.5).astype(np.uint8) mask = 255*ndimage.binary_fill_holes(temp_mask).astype(np.uint8) mask = preprocessing.mask2contours(mask) mask_bbox_xywh = warp_tools.xy2bbox(warp_tools.mask2xy(mask)) return mask, mask_bbox_xywh def get_null_overlap_mask(self, rigid_registrar): """Create mask that covers all of the image. 
Not really a mask Returns ------- mask : ndarray Mask that covers reference image in registered images mask_bbox_xywh : tuple of int XYWH of mask in reference image """ reg_shape = rigid_registrar.img_obj_list[0].registered_shape_rc mask = np.full(reg_shape, 255, dtype=np.uint8) mask_bbox_xywh = np.array([0, 0, reg_shape[1], reg_shape[0]]) return mask, mask_bbox_xywh def create_crop_masks(self, rigid_registrar): """Create masks based on rigid registration """ mask_dict = {} mask_dict[CROP_REF] = self.get_ref_img_mask(rigid_registrar) mask_dict[CROP_OVERLAP] = self.get_all_overlap_mask(rigid_registrar) mask_dict[CROP_NONE] = self.get_null_overlap_mask(rigid_registrar) self.mask_dict = mask_dict def get_crop_mask(self, overlap_type): """Get overlap mask and bounding box Returns ------- mask : ndarray Mask mask_xywh : tuple XYWH for bounding box around mask """ if overlap_type is None: overlap_type = CROP_NONE return self.mask_dict[overlap_type] def rigid_register_partial(self, tform_dict=None): """Perform rigid registration using provided parameters Still sorts images by similarity for use with non-rigid registration. tform_dict : dictionary Dictionary with rigid registration parameters. Each key is the image's file name, and the values are another dictionary with transformation parameters: M: 3x3 inverse transformation matrix. Found by determining how to align fixed to moving. If M was found by determining how to align moving to fixed, then it will need to be inverted transformation_src_shape_rc: shape (row, col) of image used to find the rigid transformation. If not provided, then it is assumed to be the shape of the level 0 slide transformation_dst_shape_rc: shape of registered image. 
If not presesnt, but a reference was provided and `transformation_src_shape_rc` was not provided, this is assumed to be the shape of the reference slide If None, then all rigid M will be the identity matrix """ # Still need to sort images # rigid_registrar = serial_rigid.SerialRigidRegistrar(self.processed_dir, imgs_ordered=self.imgs_ordered, reference_img_f=self.reference_img_f, name=self.name, align_to_reference=self.align_to_reference) feature_detector = self.rigid_reg_kwargs[FD_KEY] matcher = self.rigid_reg_kwargs[MATCHER_KEY] similarity_metric = self.rigid_reg_kwargs[SIM_METRIC_KEY] transformer = self.rigid_reg_kwargs[TRANSFORMER_KEY] # print("\n======== Detecting features\n") rigid_registrar.generate_img_obj_list(feature_detector) if self.create_masks: # Remove feature points outside of mask for img_obj in rigid_registrar.img_obj_dict.values(): slide_obj = self.get_slide(img_obj.name) features_in_mask_idx = warp_tools.get_xy_inside_mask(xy=img_obj.kp_pos_xy, mask=slide_obj.rigid_reg_mask) if len(features_in_mask_idx) > 0: img_obj.kp_pos_xy = img_obj.kp_pos_xy[features_in_mask_idx, :] img_obj.desc = img_obj.desc[features_in_mask_idx, :] # print("\n======== Matching images\n") if rigid_registrar.aleady_sorted: rigid_registrar.match_sorted_imgs(matcher, keep_unfiltered=False) for i, img_obj in enumerate(rigid_registrar.img_obj_list): img_obj.stack_idx = i else: rigid_registrar.match_imgs(matcher, keep_unfiltered=False) # print("\n======== Sorting images\n") rigid_registrar.build_metric_matrix(metric=similarity_metric) rigid_registrar.sort() rigid_registrar.distance_metric_name = matcher.metric_name rigid_registrar.distance_metric_type = matcher.metric_type rigid_registrar.get_iter_order() if rigid_registrar.size > 2: rigid_registrar.update_match_dicts_with_neighbor_filter(transformer, matcher) if self.reference_img_f is not None: ref_name = self.name_dict[self.reference_img_f] else: ref_name = valtils.get_name(rigid_registrar.reference_img_f) if self.do_rigid is 
        not False:
            # Warn that a reference image should be named when transforms are supplied manually
            msg = " ".join([f"Best to specify `{REF_IMG_KEY}` when manually providing `{TFORM_MAT_KEY}`.",
                            f"Setting this image to be {ref_name}"])
            valtils.print_warning(msg)

        # Get output shapes #
        # Build a name -> transform dict; identity matrices when no transforms were provided
        if tform_dict is None:
            named_tform_dict = {o.name: {"M": np.eye(3)} for o in rigid_registrar.img_obj_list}
        else:
            named_tform_dict = {valtils.get_name(k): v for k, v in tform_dict.items()}

        # Get output shapes #
        rigid_ref_obj = rigid_registrar.img_obj_dict[ref_name]
        ref_slide_obj = self.get_ref_slide()
        if ref_name in named_tform_dict.keys():
            ref_tforms = named_tform_dict[ref_name]
            if TFORM_SRC_SHAPE_KEY in ref_tforms:
                ref_tform_src_shape_rc = ref_tforms[TFORM_SRC_SHAPE_KEY]
            else:
                # No source shape given; fall back to the level-0 slide shape (wh reversed to rc)
                ref_tform_src_shape_rc = ref_slide_obj.slide_dimensions_wh[0][::-1]
            if TFORM_DST_SHAPE_KEY in ref_tforms:
                temp_out_shape_rc = ref_tforms[TFORM_DST_SHAPE_KEY]
            else:
                # Assume M was found by aligning to level 0 reference
                temp_out_shape_rc = ref_slide_obj.slide_dimensions_wh[0][::-1]
            # Scale the provided output shape down to the registration image resolution
            ref_to_reg_sxy = (np.array(rigid_ref_obj.image.shape)/np.array(ref_tform_src_shape_rc))[::-1]
            out_rc = np.round(temp_out_shape_rc*ref_to_reg_sxy).astype(int)
        else:
            out_rc = rigid_ref_obj.image.shape

        # Rescale each user-provided M so it applies to the registration-sized images:
        # warp the image corners with the original M (defined between the original
        # src/dst shapes), then re-estimate a projective transform at registration scale.
        scaled_M_dict = {}
        for img_name, img_tforms in named_tform_dict.items():
            matching_rigid_obj = rigid_registrar.img_obj_dict[img_name]
            matching_slide_obj = self.slide_dict[img_name]
            if TFORM_SRC_SHAPE_KEY in img_tforms:
                og_src_shape_rc = img_tforms[TFORM_SRC_SHAPE_KEY]
            else:
                og_src_shape_rc = matching_slide_obj.slide_dimensions_wh[0][::-1]
            temp_M = img_tforms[TFORM_MAT_KEY]
            if temp_M.shape[0] == 2:
                # Promote a 2x3 affine matrix to a full 3x3 homogeneous matrix
                temp_M = np.vstack([temp_M, [0, 0, 1]])
            if TFORM_DST_SHAPE_KEY in img_tforms:
                og_dst_shape_rc = img_tforms[TFORM_DST_SHAPE_KEY]
            else:
                og_dst_shape_rc = ref_slide_obj.slide_dimensions_wh[0][::-1]
            img_corners_xy = warp_tools.get_corners_of_image(matching_rigid_obj.image.shape)[::-1]
            warped_corners = warp_tools.warp_xy(img_corners_xy, M=temp_M,
                                                transformation_src_shape_rc=og_src_shape_rc,
                                                transformation_dst_shape_rc=og_dst_shape_rc,
                                                src_shape_rc=matching_rigid_obj.image.shape,
                                                dst_shape_rc=out_rc)
            M_tform = transform.ProjectiveTransform()
            M_tform.estimate(warped_corners, img_corners_xy)
            for_reg_M = M_tform.params
            scaled_M_dict[matching_rigid_obj.name] = for_reg_M
            matching_rigid_obj.M = for_reg_M

        # Find M if not provided
        for moving_idx, fixed_idx in tqdm.tqdm(rigid_registrar.iter_order, desc=TRANSFORM_MSG, unit="image"):
            img_obj = rigid_registrar.img_obj_list[moving_idx]
            if img_obj.name in scaled_M_dict:
                # A transform for this image was provided by the user; nothing to estimate
                continue
            prev_img_obj = rigid_registrar.img_obj_list[fixed_idx]
            img_obj.fixed_obj = prev_img_obj
            print(f"finding M for {img_obj.name}, which is being aligned to {prev_img_obj.name}")
            if fixed_idx == rigid_registrar.reference_img_idx:
                prev_M = np.eye(3)
            # NOTE(review): `prev_M` is only reset when aligning directly to the reference;
            # otherwise it carries over from the previous iteration. Confirm `iter_order`
            # guarantees `prev_M` is always bound/valid on the first non-reference pair.
            to_prev_match_info = img_obj.match_dict[prev_img_obj]
            src_xy = to_prev_match_info.matched_kp1_xy
            # Move the fixed image's keypoints into the registered space before estimating
            dst_xy = warp_tools.warp_xy(to_prev_match_info.matched_kp2_xy, prev_M)
            transformer.estimate(dst_xy, src_xy)
            img_obj.M = transformer.params
            prev_M = img_obj.M

        # Add registered image
        for img_obj in rigid_registrar.img_obj_list:
            img_obj.M_inv = np.linalg.inv(img_obj.M)
            img_obj.registered_img = warp_tools.warp_img(img=img_obj.image, M=img_obj.M, out_shape_rc=out_rc)
            img_obj.registered_shape_rc = img_obj.registered_img.shape[0:2]

        return rigid_registrar

    def rigid_register(self):
        """Rigidly register slides

        Also saves thumbnails of rigidly registered images.

        Returns
        -------
        rigid_registrar : SerialRigidRegistrar
            SerialRigidRegistrar object that performed the rigid registration.

        """
        if self.denoise_rigid:
            self.denoise_images()
        print("\n==== Rigid registration\n")
        if self.do_rigid is True:
            # Full rigid registration from the processed images
            rigid_registrar = serial_rigid.register_images(self.processed_dir,
                                                           align_to_reference=self.align_to_reference,
                                                           valis_obj=self,
                                                           **self.rigid_reg_kwargs)
        else:
            if isinstance(self.do_rigid, dict):
                # User provided transforms
                rigid_tforms = self.do_rigid
            elif self.do_rigid is False:
                # Skip rigid registration
                rigid_tforms = None
            # NOTE(review): if `self.do_rigid` is neither True, False, nor a dict,
            # `rigid_tforms` is unbound here — confirm upstream validation prevents that.
            rigid_registrar = self.rigid_register_partial(tform_dict=rigid_tforms)
        self.end_rigid_time = time()
        self.rigid_registrar = rigid_registrar
        if rigid_registrar is False:
            msg = "Rigid registration failed"
            valtils.print_warning(msg, rgb=Fore.RED)
            return False

        # Draw and save overlap image #
        self.aligned_img_shape_rc = rigid_registrar.img_obj_list[0].registered_shape_rc
        self.reference_img_idx = rigid_registrar.reference_img_idx
        ref_slide = self.slide_dict[valtils.get_name(rigid_registrar.reference_img_f)]
        self.reference_img_f = ref_slide.src_f
        self.create_crop_masks(rigid_registrar)
        overlap_mask, overlap_mask_bbox_xywh = self.get_crop_mask(self.crop)
        overlap_mask_bbox_xywh = overlap_mask_bbox_xywh.astype(int)

        # Create original overlap image #
        self.original_overlap_img = self.create_original_composite_img(rigid_registrar)
        pathlib.Path(self.overlap_dir).mkdir(exist_ok=True, parents=True)
        original_overlap_img_fout = os.path.join(self.overlap_dir, self.name + "_original_overlap.png")
        warp_tools.save_img(original_overlap_img_fout, self.original_overlap_img, thumbnail_size=self.thumbnail_size)
        pathlib.Path(self.reg_dst_dir).mkdir(exist_ok=True, parents=True)

        # Update attributes in slide_obj #
        n_digits = len(str(rigid_registrar.size))
        for slide_reg_obj in rigid_registrar.img_obj_list:
            slide_obj = self.slide_dict[slide_reg_obj.name]
            slide_obj.M = slide_reg_obj.M
            slide_obj.stack_idx = slide_reg_obj.stack_idx
            slide_obj.reg_img_shape_rc = slide_reg_obj.registered_img.shape
            slide_obj.rigid_reg_img_f = os.path.join(self.reg_dst_dir,
                                                     str.zfill(str(slide_obj.stack_idx), n_digits) + "_" + slide_obj.name + ".png")
            if slide_obj.image.ndim > 2:
                # Won't know if single channel image is processed RGB (bight bg) or IF channel (dark bg)
                slide_obj.get_bg_color_px_pos()
            if slide_reg_obj.stack_idx == self.reference_img_idx:
                # The reference image has no "fixed" image it was aligned to
                continue
            fixed_slide = self.slide_dict[slide_reg_obj.fixed_obj.name]
            slide_obj.fixed_slide = fixed_slide
            match_dict = slide_reg_obj.match_dict[slide_reg_obj.fixed_obj]
            slide_obj.xy_matched_to_prev = match_dict.matched_kp1_xy
            slide_obj.xy_in_prev = match_dict.matched_kp2_xy

            # Get points in overlap box #
            prev_kp_warped_for_bbox_test = warp_tools.warp_xy(slide_obj.xy_in_prev, M=slide_obj.M)
            _, prev_kp_in_bbox_idx = \
                warp_tools.get_pts_in_bbox(prev_kp_warped_for_bbox_test, overlap_mask_bbox_xywh)
            current_kp_warped_for_bbox_test = \
                warp_tools.warp_xy(slide_obj.xy_matched_to_prev, M=slide_obj.M)
            _, current_kp_in_bbox_idx = \
                warp_tools.get_pts_in_bbox(current_kp_warped_for_bbox_test, overlap_mask_bbox_xywh)
            # Keep only keypoint pairs where both points fall inside the overlap bbox
            matched_kp_in_bbox = np.intersect1d(prev_kp_in_bbox_idx, current_kp_in_bbox_idx)
            slide_obj.xy_matched_to_prev_in_bbox = slide_obj.xy_matched_to_prev[matched_kp_in_bbox]
            slide_obj.xy_in_prev_in_bbox = slide_obj.xy_in_prev[matched_kp_in_bbox]

        if self.denoise_rigid:
            # Processed image may have been denoised for rigid registration.
            # Replace with unblurred image
            for img_obj in rigid_registrar.img_obj_list:
                matching_slide = self.slide_dict[img_obj.name]
                reg_img = matching_slide.warp_img(matching_slide.processed_img, non_rigid=False, crop=False)
                img_obj.registered_img = reg_img
                img_obj.image = matching_slide.processed_img

        rigid_img_list = [img_obj.registered_img for img_obj in rigid_registrar.img_obj_list]
        self.rigid_overlap_img = self.draw_overlap_img(rigid_img_list)
        self.rigid_overlap_img = warp_tools.crop_img(self.rigid_overlap_img, overlap_mask_bbox_xywh)
        rigid_overlap_img_fout = os.path.join(self.overlap_dir, self.name + "_rigid_overlap.png")
        warp_tools.save_img(rigid_overlap_img_fout, self.rigid_overlap_img, thumbnail_size=self.thumbnail_size)

        # Overwrite black and white processed images #
        for slide_name, slide_obj in self.slide_dict.items():
            slide_reg_obj = rigid_registrar.img_obj_dict[slide_name]
            if not slide_obj.is_rgb:
                img_to_warp = slide_reg_obj.image
            else:
                img_to_warp = slide_obj.image
            # NOTE(review): reconstruction assumes the resize applies to both branches;
            # for the non-RGB branch the image should already be at processed size, so
            # this is expected to be a no-op there — confirm against upstream source.
            img_to_warp = warp_tools.resize_img(img_to_warp, slide_obj.processed_img_shape_rc)
            warped_img = slide_obj.warp_img(img_to_warp, non_rigid=False, crop=self.crop)
            warp_tools.save_img(slide_obj.rigid_reg_img_f, warped_img.astype(np.uint8), thumbnail_size=self.thumbnail_size)
            # Replace processed image with a thumbnail #
            warp_tools.save_img(slide_obj.processed_img_f, slide_reg_obj.image, thumbnail_size=self.thumbnail_size)

        return rigid_registrar

    def micro_rigid_register(self):
        """Refine the rigid registration with the micro-rigid registrar.

        Runs `self.micro_rigid_registrar_cls`, redraws the rigid overlap image,
        re-saves each slide's rigid registration thumbnail, and pushes the updated
        transforms and keypoint matches back into `self.rigid_registrar`.
        """
        micro_rigid_registar = self.micro_rigid_registrar_cls(val_obj=self, **self.micro_rigid_registrar_params)
        micro_rigid_registar.register()
        # Draw in same order as regular rigid registration
        draw_list = [self.slide_dict[img_obj.name] for img_obj in self.rigid_registrar.img_obj_list]
        rigid_img_list = [slide_obj.warp_img(slide_obj.processed_img, non_rigid=False) for slide_obj in draw_list]
        self.micro_rigid_overlap_img = self.draw_overlap_img(rigid_img_list)
        micro_rigid_overlap_img_fout = os.path.join(self.overlap_dir, self.name +
"_micro_rigid_overlap.png") warp_tools.save_img(micro_rigid_overlap_img_fout, self.micro_rigid_overlap_img, thumbnail_size=self.thumbnail_size) # Overwrite rigid registration results and update rigid registrar for slide_name, slide_obj in self.slide_dict.items(): if not slide_obj.is_rgb: img_to_warp = slide_obj.processed_img else: img_to_warp = slide_obj.image img_to_warp = warp_tools.resize_img(img_to_warp, slide_obj.processed_img_shape_rc) warped_img = slide_obj.warp_img(img_to_warp, non_rigid=False, crop=self.crop) warp_tools.save_img(slide_obj.rigid_reg_img_f, warped_img.astype(np.uint8), thumbnail_size=self.thumbnail_size) if slide_obj.fixed_slide is None: continue fixed_slide = slide_obj.fixed_slide fixed_rigid_obj = self.rigid_registrar.img_obj_dict[fixed_slide.name] rigid_img_obj = self.rigid_registrar.img_obj_dict[slide_obj.name] rigid_img_obj.M = slide_obj.M rigid_img_obj.M_inv = np.linalg.inv(slide_obj.M) rigid_img_obj.registered_img = slide_obj.warp_img(img_to_warp, non_rigid=False, crop=False) rigid_img_obj.match_dict[fixed_rigid_obj].matched_kp1_xy = slide_obj.xy_matched_to_prev rigid_img_obj.match_dict[fixed_rigid_obj].matched_kp2_xy = slide_obj.xy_in_prev rigid_img_obj.match_dict[fixed_rigid_obj].n_matches = slide_obj.xy_in_prev.shape[0] fixed_rigid_obj.match_dict[rigid_img_obj].matched_kp1_xy = slide_obj.xy_in_prev fixed_rigid_obj.match_dict[rigid_img_obj].matched_kp2_xy = slide_obj.xy_matched_to_prev fixed_rigid_obj.match_dict[rigid_img_obj].n_matches = slide_obj.xy_in_prev.shape[0] def draw_matches(self, dst_dir): """Draw and save images of matching features Parameters ---------- dst_dir : str Where to save the images of the matched features """ dst_dir = str(dst_dir) pathlib.Path(dst_dir).mkdir(exist_ok=True, parents=True) slide_idx, slide_names = list(zip(*[[slide_obj.stack_idx, slide_obj.name] for slide_obj in self.slide_dict.values()])) slide_order = np.argsort(slide_idx) # sorts ascending slide_list = [self.slide_dict[slide_names[i]] for i 
in slide_order] for moving_idx, fixed_idx in self.iter_order: moving_slide = slide_list[moving_idx] fixed_slide = slide_list[fixed_idx] # RGB draw images if moving_slide.image.ndim == 3 and moving_slide.is_rgb: moving_draw_img = warp_tools.resize_img(moving_slide.image, moving_slide.processed_img.shape[0:2]) else: moving_draw_img = moving_slide.processed_img if fixed_slide.image.ndim == 3 and fixed_slide.is_rgb: fixed_draw_img = warp_tools.resize_img(fixed_slide.image, fixed_slide.processed_img.shape[0:2]) else: fixed_draw_img = fixed_slide.processed_img all_matches_img = viz.draw_matches(src_img=moving_draw_img, kp1_xy=moving_slide.xy_matched_to_prev, dst_img=fixed_draw_img, kp2_xy=moving_slide.xy_in_prev, rad=3, alignment='horizontal') matches_f_out = os.path.join(dst_dir, f"{self.name}_{moving_slide.name}_to_{fixed_slide.name}_matches.png") warp_tools.save_img(matches_f_out, all_matches_img) def create_non_rigid_reg_mask(self): """ Get mask for non-rigid registration """ if self.create_masks: non_rigid_mask = self._create_mask_from_processed() else: non_rigid_mask = self._create_non_rigid_reg_mask_from_bbox() for slide_obj in self.slide_dict.values(): slide_obj.non_rigid_reg_mask = non_rigid_mask # Save thumbnail of mask ref_slide = self.get_ref_slide() if ref_slide.img_type == slide_tools.IHC_NAME: ref_img = warp_tools.resize_img(ref_slide.image, ref_slide.processed_img_shape_rc) warped_ref_img = ref_slide.warp_img(ref_img, non_rigid=False, crop=CROP_REF) else: warped_ref_img = ref_slide.warp_img(ref_slide.processed_img, non_rigid=False, crop=CROP_REF) pathlib.Path(self.mask_dir).mkdir(exist_ok=True, parents=True) thumbnail_img = self.create_thumbnail(warped_ref_img) draw_mask = warp_tools.resize_img(non_rigid_mask, ref_slide.reg_img_shape_rc, interp_method="nearest") _, overlap_mask_bbox_xywh = self.get_crop_mask(CROP_REF) draw_mask = warp_tools.crop_img(draw_mask, overlap_mask_bbox_xywh.astype(int)) thumbnail_mask = self.create_thumbnail(draw_mask) 
        thumbnail_mask_outline = viz.draw_outline(thumbnail_img, thumbnail_mask)
        outline_f_out = os.path.join(self.mask_dir, f'{self.name}_non_rigid_mask.png')
        warp_tools.save_img(outline_f_out, thumbnail_mask_outline)

    def _create_non_rigid_reg_mask_from_bbox(self, slide_list=None):
        """Mask will be bounding box of image overlaps

        """
        ref_slide = self.get_ref_slide()
        combo_mask = np.zeros(ref_slide.reg_img_shape_rc, dtype=int)
        if slide_list is None:
            slide_list = list(self.slide_dict.values())
        # Count, per pixel, how many rigidly-warped slides cover it
        for slide_obj in slide_list:
            img_bbox = np.full(slide_obj.processed_img_shape_rc, 255, dtype=np.uint8)
            rigid_mask = slide_obj.warp_img(img_bbox, non_rigid=False, crop=False, interp_method="nearest")
            combo_mask[rigid_mask > 0] += 1
        n = len(slide_list)
        # Pixels covered by every slide form the overlap region
        overlap_mask = (combo_mask == n).astype(np.uint8)
        overlap_bbox = warp_tools.xy2bbox(warp_tools.mask2xy(overlap_mask))
        c0, r0 = overlap_bbox[:2]
        c1, r1 = overlap_bbox[:2] + overlap_bbox[2:]
        non_rigid_mask = np.zeros_like(overlap_mask)
        non_rigid_mask[r0:r1, c0:c1] = 255
        return non_rigid_mask

    def _create_mask_from_processed(self, slide_list=None):
        """Build a non-rigid registration mask from the processed images.

        Combines the rigidly-warped tissue masks, takes the median of the
        rescaled processed images inside the combined mask, thresholds that
        summary image (multi-Otsu + hysteresis), and returns the bounding box
        of the resulting foreground as a 255/0 uint8 mask.
        """
        combo_mask = np.zeros(self.aligned_img_shape_rc, dtype=int)
        if slide_list is None:
            slide_list = list(self.slide_dict.values())
        # NOTE(review): this loop iterates self.slide_dict.values() rather than
        # `slide_list`, while the summary loop below uses `slide_list` — confirm
        # whether a provided subset should also restrict this accumulation.
        for i, slide_obj in enumerate(self.slide_dict.values()):
            rigid_mask = slide_obj.warp_img(slide_obj.rigid_reg_mask, non_rigid=False, crop=False, interp_method="nearest")
            combo_mask[rigid_mask > 0] += 1
        # Keep regions supported by more than one image (hysteresis between 0.5 and size-0.5)
        temp_non_rigid_mask = 255*filters.apply_hysteresis_threshold(combo_mask, 0.5, self.size-0.5).astype(np.uint8)
        overlap_mask = 255*ndimage.binary_fill_holes(temp_non_rigid_mask).astype(np.uint8)
        to_combine_list = [None] * len(slide_list)
        for i, slide_obj in enumerate(slide_list):
            for_summary = exposure.rescale_intensity(slide_obj.warp_img(slide_obj.processed_img, non_rigid=False, crop=False), out_range=(0, 1))
            to_combine_list[i] = for_summary
        combo_img = np.dstack(to_combine_list)
        # Median across slides gives a summary of shared tissue intensity
        summary_img = np.median(combo_img, axis=2)
        summary_img[overlap_mask == 0] = 0
        low_t, high_t = filters.threshold_multiotsu(summary_img[overlap_mask > 0])
        fg = 255*filters.apply_hysteresis_threshold(summary_img, low_t, high_t).astype(np.uint8)
        fg_bbox_mask = np.zeros_like(overlap_mask)
        fg_bbox = warp_tools.xy2bbox(warp_tools.mask2xy(fg))
        c0, r0 = fg_bbox[0:2]
        c1, r1 = fg_bbox[0:2] + fg_bbox[2:]
        fg_bbox_mask[r0:r1, c0:c1] = 255
        return fg_bbox_mask

    def _create_non_rigid_reg_mask_from_rigid_masks(self, slide_list=None):
        """
        Get mask that will cover all tissue. Use hysteresis thresholding to ignore masked regions found in only 1 image.
        """
        if slide_list is None:
            slide_list = list(self.slide_dict.values())
        combo_mask = np.zeros(self.aligned_img_shape_rc, dtype=int)
        for i, slide_obj in enumerate(slide_list):
            rigid_mask = slide_obj.warp_img(slide_obj.rigid_reg_mask, non_rigid=False, crop=False, interp_method="nearest")
            combo_mask[rigid_mask > 0] += 1
        temp_mask = 255*filters.apply_hysteresis_threshold(combo_mask, 0.5, self.size-0.5).astype(np.uint8)
        # Draw convex hull around each region
        final_mask = 255*ndimage.binary_fill_holes(temp_mask).astype(np.uint8)
        final_mask = preprocessing.mask2contours(final_mask)
        return final_mask

    def pad_displacement(self, dxdy, out_shape_rc, bbox_xywh):
        """Insert a (possibly cropped) displacement field into a full-size field.

        Places `dxdy` into a zero-filled 2-band field of shape `out_shape_rc`
        at the x, y offset given by `bbox_xywh[0:2]`. Accepts and returns either
        a numpy displacement pair or a pyvips.Image, matching the input type.
        """
        is_array = not isinstance(dxdy, pyvips.Image)
        if is_array:
            vips_dxdy = warp_tools.numpy2vips(np.dstack(dxdy))
        else:
            vips_dxdy = dxdy
        full_dxdy = pyvips.Image.black(out_shape_rc[1], out_shape_rc[0], bands=2).cast("float")
        full_dxdy = full_dxdy.insert(vips_dxdy, *bbox_xywh[0:2])
        if is_array:
            # Convert back to the (2, rows, cols) numpy layout used for displacements
            full_dxdy = warp_tools.vips2numpy(full_dxdy)
            full_dxdy = np.array([full_dxdy[..., 0], full_dxdy[..., 1]])
        return full_dxdy

    def get_nr_tiling_params(self, non_rigid_registrar_cls, processor_dict, img_specific_args, tile_wh):
        """Get extra parameters need for tiled non-rigid registration

        processor_dict : dict
            Each key should be the filename of the image, and the value either a
            subclassed preprocessing.ImageProcessor, or a list, where the 1st
            element is the processor, and the second element a dictionary of
            keyword arguments passed to the processor. If `None`, then a default
            processor will be used for each image based on the inferred modality.

        """
        if img_specific_args is None:
            img_specific_args = {}
        for slide_obj in self.slide_dict.values():
            processing_cls, processing_kwargs = processor_dict[slide_obj.name]

            # Add registration parameters
            tiled_non_rigid_reg_params = {}
            tiled_non_rigid_reg_params[non_rigid_registrars.NR_CLS_KEY] = non_rigid_registrar_cls
            if self.norm_method is not None:
                tiled_non_rigid_reg_params[non_rigid_registrars.NR_STATS_KEY] = self.target_processing_stats
            tiled_non_rigid_reg_params[non_rigid_registrars.NR_TILE_WH_KEY] = tile_wh
            tiled_non_rigid_reg_params[non_rigid_registrars.NR_PROCESSING_CLASS_KEY] = processing_cls
            tiled_non_rigid_reg_params[non_rigid_registrars.NR_PROCESSING_KW_KEY] = processing_kwargs
            # Reader is deep-copied so each tile worker gets an independent reader instance
            tiled_non_rigid_reg_params[non_rigid_registrars.NR_PROCESSING_INIT_KW_KEY] = {"src_f": slide_obj.src_f,
                                                                                          "series": slide_obj.series,
                                                                                          "reader": deepcopy(slide_obj.reader)
                                                                                          }
            img_specific_args[slide_obj.name] = tiled_non_rigid_reg_params
        # The tiled registrar wraps the originally requested non-rigid registrar class
        non_rigid_registrar_cls = non_rigid_registrars.NonRigidTileRegistrar
        return non_rigid_registrar_cls, img_specific_args

    def prep_images_for_large_non_rigid_registration(self, max_img_dim, processor_dict, updating_non_rigid=False, mask=None):
        """Scale and process images for non-rigid registration using larger images

        Parameters
        ----------
        max_img_dim : int, optional
            Maximum size of image to be used for non-rigid registration.
            If None, the whole image will be used for non-rigid registration

        processor_dict : dict
            Each key should be the filename of the image, and the value either a
            subclassed preprocessing.ImageProcessor, or a list, where the 1st
            element is the processor, and the second element a dictionary of
            keyword arguments passed to the processor. If `None`, then a default
            processor will be used for each image based on the inferred modality.

        updating_non_rigid : bool, optional
            If `True`, the slide's current non-rigid registration will be applied
            The new displacements found using these larger images can therefore
            be used to update existing dxdy. If `False`, only the rigid transform
            will be applied, so this will be the first non-rigid transformation.

        mask : ndarray, optional
            Binary image indicating where to perform the non-rigid registration.
            Should be based off an already registered image.

        Returns
        -------
        img_dict : dictionary
            Dictionary that can be passed to a non-rigid registrar

        max_img_dim : int
            Maximum size of image to do non-rigid registration on.
            May be different if the requested size was too big

        scaled_non_rigid_mask : ndarray
            Scaled mask to use for non-rigid registration

        full_out_shape : ndarray of int
            Shape (row, col) of the warped images, without cropping

        mask_bbox_xywh : list
            Bounding box of `mask`. If `mask` is None, then so will `mask_bbox_xywh`

        """
        warp_full_img = max_img_dim is None
        if not warp_full_img:
            # Check that every slide actually has a level at least as large as requested
            all_max_dims = [np.any(np.max(slide_obj.slide_dimensions_wh, axis=1) >= max_img_dim) for slide_obj in self.slide_dict.values()]
            if not np.all(all_max_dims):
                img_maxes = [np.max(slide_obj.slide_dimensions_wh, axis=1)[0] for slide_obj in self.slide_dict.values()]
                smallest_img_max = np.min(img_maxes)
                msg = (f"Requested size of images for non-rigid registration was {max_img_dim}. "
                       f"However, not all images are this large. Setting `max_non_rigid_registration_dim_px` to "
                       f"{smallest_img_max}, which is the largest dimension of the smallest image")
                valtils.print_warning(msg)
                max_img_dim = smallest_img_max
        ref_slide = self.get_ref_slide()
        # Largest scaling factor available relative to the processed image size
        max_s = np.min(ref_slide.slide_dimensions_wh[0]/np.array(ref_slide.processed_img_shape_rc[::-1]))
        if mask is None:
            if warp_full_img:
                s = max_s
            else:
                s = np.min(max_img_dim/np.array(ref_slide.processed_img_shape_rc))
        else:
            # Determine how big image would have to be to get mask with maxmimum dimension = max_img_dim
            if isinstance(mask, pyvips.Image):
                mask_shape_rc = np.array((mask.height, mask.width))
            else:
                mask_shape_rc = np.array(mask.shape[0:2])
            to_reg_mask_sxy = (mask_shape_rc/np.array(ref_slide.reg_img_shape_rc))[::-1]
            if not np.all(to_reg_mask_sxy == 1):
                # Resize just in case it's huge. Only need bounding box
                reg_size_mask = warp_tools.resize_img(mask, ref_slide.reg_img_shape_rc, interp_method="nearest")
            else:
                reg_size_mask = mask
            reg_size_mask_xy = warp_tools.mask2xy(reg_size_mask)
            to_reg_mask_bbox_xywh = list(warp_tools.xy2bbox(reg_size_mask_xy))
            to_reg_mask_wh = np.round(to_reg_mask_bbox_xywh[2:]).astype(int)
            if warp_full_img:
                s = max_s
            else:
                s = np.min(max_img_dim/np.array(to_reg_mask_wh))
        if s < max_s:
            full_out_shape = self.get_aligned_slide_shape(s)
        else:
            # Requested scale meets/exceeds what's available; use level 0 shape
            full_out_shape = self.get_aligned_slide_shape(0)
        if mask is None:
            out_shape = full_out_shape
            mask_bbox_xywh = None
        else:
            # If masking, the area will be smaller.
Get bounding box mask_sxy = (full_out_shape/mask_shape_rc)[::-1] mask_bbox_xywh = list(warp_tools.xy2bbox(mask_sxy*reg_size_mask_xy)) mask_bbox_xywh[2:] = np.round(mask_bbox_xywh[2:]).astype(int) out_shape = mask_bbox_xywh[2:][::-1] if not isinstance(mask, pyvips.Image): vips_micro_reg_mask = warp_tools.numpy2vips(mask) else: vips_micro_reg_mask = mask vips_micro_reg_mask = warp_tools.resize_img(vips_micro_reg_mask, full_out_shape, interp_method="nearest") vips_micro_reg_mask = warp_tools.crop_img(img=vips_micro_reg_mask, xywh=mask_bbox_xywh) if ref_slide.reader.metadata.bf_datatype is not None: np_dtype = slide_tools.BF_FORMAT_NUMPY_DTYPE[ref_slide.reader.metadata.bf_datatype] else: # Assuming images not read by bio-formats are RGB read using from openslide or png, jpeg, etc... np_dtype = "uint8" displacement_gb = self.size*warp_tools.calc_memory_size_gb(full_out_shape, 2, "float32") processed_img_gb = self.size*warp_tools.calc_memory_size_gb(out_shape, 1, "uint8") img_gb = self.size*warp_tools.calc_memory_size_gb(out_shape, ref_slide.reader.metadata.n_channels, np_dtype) # Size of full displacement fields, all larger processed images, and an image that will be processed estimated_gb = img_gb + displacement_gb + processed_img_gb use_tiler = False if estimated_gb > TILER_THRESH_GB: # Avoid having huge displacement fields saved in registrar. use_tiler = True scaled_warped_img_list = [None] * self.size scaled_mask_list = [None] * self.size img_names_list = [None] * self.size img_f_list = [None] * self.size # print("\n======== Preparing images for non-rigid registration\n") for slide_obj in tqdm.tqdm(self.slide_dict.values(), desc=PREP_NON_RIGID_MSG, unit="image"): # Get image to warp. 
            # Likely a larger image scaled down to specified shape #
            src_img_shape_rc, src_M = warp_tools.get_src_img_shape_and_M(transformation_src_shape_rc=slide_obj.processed_img_shape_rc,
                                                                         transformation_dst_shape_rc=slide_obj.reg_img_shape_rc,
                                                                         dst_shape_rc=full_out_shape,
                                                                         M=slide_obj.M)
            if max_img_dim is not None:
                # Pick the smallest pyramid level that is still >= the needed source size
                closest_img_levels = np.where(np.max(slide_obj.slide_dimensions_wh, axis=1) < np.max(src_img_shape_rc))[0]
                if len(closest_img_levels) > 0:
                    closest_img_level = closest_img_levels[0] - 1
                else:
                    closest_img_level = len(slide_obj.slide_dimensions_wh) - 1
            else:
                closest_img_level = 0
            vips_level_img = slide_obj.slide2vips(closest_img_level)
            img_to_warp = warp_tools.resize_img(vips_level_img, src_img_shape_rc)
            if updating_non_rigid:
                # Apply current displacements so new ones refine the existing registration
                dxdy = slide_obj.bk_dxdy
            else:
                dxdy = None

            # Get mask covering tissue
            temp_slide_mask = slide_obj.warp_img(slide_obj.rigid_reg_mask, non_rigid=dxdy is not None, crop=False, interp_method="nearest")
            temp_slide_mask = warp_tools.numpy2vips(temp_slide_mask)
            slide_mask = warp_tools.resize_img(temp_slide_mask, full_out_shape, interp_method="nearest")
            if mask_bbox_xywh is not None:
                slide_mask = warp_tools.crop_img(slide_mask, mask_bbox_xywh)

            # Get mask that covers image
            temp_processing_mask = pyvips.Image.black(img_to_warp.width, img_to_warp.height).invert()
            processing_mask = warp_tools.warp_img(img=temp_processing_mask, M=slide_obj.M, bk_dxdy=dxdy,
                                                  transformation_src_shape_rc=slide_obj.processed_img_shape_rc,
                                                  transformation_dst_shape_rc=slide_obj.reg_img_shape_rc,
                                                  out_shape_rc=full_out_shape,
                                                  bbox_xywh=mask_bbox_xywh,
                                                  interp_method="nearest")
            if not use_tiler:
                # Process image using same method for rigid registration #
                unprocessed_warped_img = warp_tools.warp_img(img=img_to_warp, M=slide_obj.M, bk_dxdy=dxdy,
                                                             transformation_src_shape_rc=slide_obj.processed_img_shape_rc,
                                                             transformation_dst_shape_rc=slide_obj.reg_img_shape_rc,
                                                             out_shape_rc=full_out_shape,
                                                             bbox_xywh=mask_bbox_xywh,
                                                             bg_color=slide_obj.bg_color)
                unprocessed_warped_img = warp_tools.vips2numpy(unprocessed_warped_img)
                processing_cls, processing_kwargs = processor_dict[slide_obj.name]
                processor = processing_cls(image=unprocessed_warped_img, src_f=slide_obj.src_f, level=closest_img_level, series=slide_obj.series, reader=slide_obj.reader)
                try:
                    processed_img = processor.process_image(**processing_kwargs)
                except TypeError:
                    # processor.process_image doesn't take kwargs
                    processed_img = processor.process_image()
                processed_img = exposure.rescale_intensity(processed_img, out_range=(0, 255)).astype(np.uint8)
                np_mask = warp_tools.vips2numpy(slide_mask)
                processed_img[np_mask == 0] = 0

                # Normalize images using stats collected for rigid registration #
                if self.norm_method is not None:
                    processed_img = preprocessing.norm_img_stats(img=processed_img, target_stats=self.target_processing_stats, mask=np_mask)
                warped_img = exposure.rescale_intensity(processed_img, out_range=(0, 255)).astype(np.uint8)
            else:
                # Tiled registration processes per-tile, so only warp here
                if not warp_full_img:
                    warped_img = warp_tools.warp_img(img=img_to_warp, M=slide_obj.M, bk_dxdy=dxdy,
                                                     transformation_src_shape_rc=slide_obj.processed_img_shape_rc,
                                                     transformation_dst_shape_rc=slide_obj.reg_img_shape_rc,
                                                     out_shape_rc=full_out_shape,
                                                     bbox_xywh=mask_bbox_xywh)
                else:
                    warped_img = slide_obj.warp_slide(0, non_rigid=updating_non_rigid, crop=mask_bbox_xywh)

            # Get mask #
            if mask is not None:
                slide_mask = (vips_micro_reg_mask == 0).ifthenelse(0, slide_mask)

            # Update lists
            img_f_list[slide_obj.stack_idx] = slide_obj.src_f
            img_names_list[slide_obj.stack_idx] = slide_obj.name
            scaled_warped_img_list[slide_obj.stack_idx] = warped_img
            scaled_mask_list[slide_obj.stack_idx] = processing_mask

        img_dict = {serial_non_rigid.IMG_LIST_KEY: scaled_warped_img_list,
                    serial_non_rigid.IMG_F_LIST_KEY: img_f_list,
                    serial_non_rigid.MASK_LIST_KEY: scaled_mask_list,
                    serial_non_rigid.IMG_NAME_KEY: img_names_list
                    }

        if ref_slide.non_rigid_reg_mask is not None:
            vips_nr_mask = warp_tools.numpy2vips(ref_slide.non_rigid_reg_mask)
            scaled_non_rigid_mask = warp_tools.resize_img(vips_nr_mask, full_out_shape, interp_method="nearest")
            if mask is not None:
                scaled_non_rigid_mask = scaled_non_rigid_mask.extract_area(*mask_bbox_xywh)
                scaled_non_rigid_mask = (vips_micro_reg_mask == 0).ifthenelse(0, scaled_non_rigid_mask)
            if not use_tiler:
                scaled_non_rigid_mask = warp_tools.vips2numpy(scaled_non_rigid_mask)
        else:
            scaled_non_rigid_mask = None

        if mask is not None:
            final_max_img_dim = np.max(mask_bbox_xywh[2:])
        else:
            final_max_img_dim = max_img_dim

        return img_dict, final_max_img_dim, scaled_non_rigid_mask, full_out_shape, mask_bbox_xywh, use_tiler

    def non_rigid_register(self, rigid_registrar, processor_dict):
        """Non-rigidly register slides

        Non-rigidly register slides after performing rigid registration.
        Also saves thumbnails of non-rigidly registered images and
        deformation fields.

        Parameters
        ----------
        rigid_registrar : SerialRigidRegistrar
            SerialRigidRegistrar object that performed the rigid registration.

        processor_dict : dict
            Each key should be the filename of the image, and the value either a
            subclassed preprocessing.ImageProcessor, or a list, where the 1st
            element is the processor, and the second element a dictionary of
            keyword arguments passed to the processor. If `None`, then a default
            processor will be used for each image based on the inferred modality.

        Returns
        -------
        non_rigid_registrar : SerialNonRigidRegistrar
            SerialNonRigidRegistrar object that performed serial non-rigid registration.

        """
        ref_slide = self.get_ref_slide()
        self.create_non_rigid_reg_mask()
        non_rigid_reg_mask = ref_slide.non_rigid_reg_mask
        cropped_mask_shape_rc = warp_tools.xy2bbox(warp_tools.mask2xy(non_rigid_reg_mask))[2:][::-1]
        # Use higher-resolution / ROI images when a different size or a cropping mask was requested
        nr_on_scaled_img = self.max_processed_image_dim_px != self.max_non_rigid_registration_dim_px or \
            (non_rigid_reg_mask is not None and np.any(cropped_mask_shape_rc != ref_slide.reg_img_shape_rc))
        using_tiler = False
        img_specific_args = {}
        if nr_on_scaled_img:
            # Use higher resolution and/or roi for non-rigid
            nr_reg_src, max_img_dim, non_rigid_reg_mask, full_out_shape_rc, mask_bbox_xywh, using_tiler = \
                self.prep_images_for_large_non_rigid_registration(max_img_dim=self.max_non_rigid_registration_dim_px,
                                                                  processor_dict=processor_dict,
                                                                  mask=non_rigid_reg_mask)
            self._non_rigid_bbox = mask_bbox_xywh
            self.max_non_rigid_registration_dim_px = max_img_dim
            if using_tiler:
                non_rigid_registrar_cls, img_specific_args = self.get_nr_tiling_params(self.non_rigid_reg_kwargs[NON_RIGID_REG_CLASS_KEY],
                                                                                       processor_dict=processor_dict,
                                                                                       img_specific_args=None,
                                                                                       tile_wh=DEFAULT_NR_TILE_WH)
                # Update args to use tiled non-rigid registrar
                self.non_rigid_reg_kwargs[NON_RIGID_REG_CLASS_KEY] = non_rigid_registrar_cls
        else:
            nr_reg_src = rigid_registrar
            full_out_shape_rc = ref_slide.reg_img_shape_rc
        self._full_displacement_shape_rc = full_out_shape_rc
        non_rigid_registrar = serial_non_rigid.register_images(src=nr_reg_src,
                                                               align_to_reference=self.align_to_reference,
                                                               img_params=img_specific_args,
                                                               **self.non_rigid_reg_kwargs)
        self.end_non_rigid_time = time()
        for d in [self.non_rigid_dst_dir, self.deformation_field_dir]:
            pathlib.Path(d).mkdir(exist_ok=True, parents=True)
        self.non_rigid_registrar = non_rigid_registrar

        # Clean up displacements and expand if mask was used
        for nr_name, nr_obj in non_rigid_registrar.non_rigid_obj_dict.items():
            if nr_on_scaled_img:
                # If a mask was used, the displacement fields will be smaller
                # So need to insert them in the full image
                bk_dxdy = self.pad_displacement(nr_obj.bk_dxdy, full_out_shape_rc, mask_bbox_xywh)
                fwd_dxdy = self.pad_displacement(nr_obj.fwd_dxdy, full_out_shape_rc, mask_bbox_xywh)
            else:
                bk_dxdy = nr_obj.bk_dxdy
                fwd_dxdy = nr_obj.fwd_dxdy
            nr_obj.bk_dxdy = bk_dxdy
            nr_obj.fwd_dxdy = fwd_dxdy

        # Draw overlap image #
        overlap_mask, overlap_mask_bbox_xywh = self.get_crop_mask(self.crop)
        overlap_mask_bbox_xywh = overlap_mask_bbox_xywh.astype(int)
        if not nr_on_scaled_img:
            non_rigid_img_list = [nr_img_obj.registered_img for nr_img_obj in non_rigid_registrar.non_rigid_obj_list]
        else:
            # Registered images were at a different scale; re-warp the rigid images with new displacements
            non_rigid_img_list = [warp_tools.warp_img(img=o.image, M=o.M,
                                                      bk_dxdy=non_rigid_registrar.non_rigid_obj_dict[o.name].bk_dxdy,
                                                      out_shape_rc=o.registered_img.shape[0:2],
                                                      transformation_src_shape_rc=o.image.shape[0:2],
                                                      transformation_dst_shape_rc=o.registered_img.shape[0:2])
                                  for o in rigid_registrar.img_obj_list]
        self.non_rigid_overlap_img = self.draw_overlap_img(non_rigid_img_list)
        self.non_rigid_overlap_img = warp_tools.crop_img(self.non_rigid_overlap_img, overlap_mask_bbox_xywh)
        overlap_img_fout = os.path.join(self.overlap_dir, self.name + "_non_rigid_overlap.png")
        warp_tools.save_img(overlap_img_fout, self.non_rigid_overlap_img, thumbnail_size=self.thumbnail_size)

        n_digits = len(str(self.size))
        for slide_name, slide_obj in self.slide_dict.items():
            img_save_id = str.zfill(str(slide_obj.stack_idx), n_digits)
            slide_nr_reg_obj = non_rigid_registrar.non_rigid_obj_dict[slide_name]
            if not using_tiler:
                slide_obj.bk_dxdy = slide_nr_reg_obj.bk_dxdy
                slide_obj.fwd_dxdy = slide_nr_reg_obj.fwd_dxdy
            else:
                # save displacements as images
                pathlib.Path(self.displacements_dir).mkdir(exist_ok=True, parents=True)
                slide_obj.stored_dxdy = True
                bk_dxdy_f, fwd_dxdy_f = slide_obj.get_displacement_f()
                slide_obj._bk_dxdy_f = bk_dxdy_f
                slide_obj._fwd_dxdy_f = fwd_dxdy_f
                # Save space by only writing the necessary areas.
                # Most displacements may be 0
                cropped_bk_dxdy = slide_nr_reg_obj.bk_dxdy.extract_area(*mask_bbox_xywh)
                cropped_fwd_dxdy = slide_nr_reg_obj.fwd_dxdy.extract_area(*mask_bbox_xywh)
                cropped_bk_dxdy.cast("float").tiffsave(slide_obj._bk_dxdy_f, compression="lzw", lossless=True, tile=True, bigtiff=True)
                cropped_fwd_dxdy.cast("float").tiffsave(slide_obj._fwd_dxdy_f, compression="lzw", lossless=True, tile=True, bigtiff=True)

            slide_obj.nr_rigid_reg_img_f = os.path.join(self.non_rigid_dst_dir, img_save_id + "_" + slide_obj.name + ".png")
            if not slide_obj.is_rgb:
                img_to_warp = rigid_registrar.img_obj_dict[slide_name].image
            else:
                img_to_warp = slide_obj.image
            img_to_warp = warp_tools.resize_img(img_to_warp, slide_obj.processed_img_shape_rc)
            warped_img = slide_obj.warp_img(img_to_warp, non_rigid=True, crop=self.crop)
            warp_tools.save_img(slide_obj.nr_rigid_reg_img_f, warped_img, thumbnail_size=self.thumbnail_size)

            # Draw displacements on image actually used in non-rigid. Might be higher resolution
            if not isinstance(slide_nr_reg_obj.bk_dxdy, pyvips.Image):
                draw_dxdy = np.dstack(slide_nr_reg_obj.bk_dxdy)
            else:
                # pyvips
                draw_dxdy = slide_nr_reg_obj.bk_dxdy
            if nr_on_scaled_img:
                draw_dxdy = warp_tools.crop_img(draw_dxdy, self._non_rigid_bbox)
            dxdy_shape = warp_tools.get_shape(draw_dxdy)
            thumbnail_scaling = np.min(self.thumbnail_size/np.array(dxdy_shape[0:2]))
            thumbnail_bk_dxdy = self.create_thumbnail(draw_dxdy)
            # Displacement magnitudes must be scaled along with the image size
            thumbnail_bk_dxdy *= float(thumbnail_scaling)
            if isinstance(thumbnail_bk_dxdy, pyvips.Image):
                thumbnail_bk_dxdy = warp_tools.vips2numpy(thumbnail_bk_dxdy)
            draw_img = warp_tools.resize_img(slide_nr_reg_obj.registered_img, thumbnail_bk_dxdy[..., 0].shape)
            if isinstance(draw_img, pyvips.Image):
                draw_img = warp_tools.vips2numpy(draw_img)
            draw_img = exposure.rescale_intensity(draw_img, out_range=(0, 255))
            if draw_img.ndim == 2:
                draw_img = np.dstack([draw_img] * 3)
            thumbanil_deform_grid = viz.color_displacement_tri_grid(bk_dx=thumbnail_bk_dxdy[..., 0],
                                                                    bk_dy=thumbnail_bk_dxdy[..., 1],
                                                                    img=draw_img, n_grid_pts=25)
            deform_img_f = os.path.join(self.deformation_field_dir, img_save_id + "_" + slide_obj.name + ".png")
            warp_tools.save_img(deform_img_f, thumbanil_deform_grid, thumbnail_size=self.thumbnail_size)

        return non_rigid_registrar

    def measure_error(self):
        """Measure registration error

        Error is measured as the distance between matched features
        after registration.

        Returns
        -------
        summary_df : Dataframe
            `summary_df` contains various information about the registration.

            The "from" column is the name of the image, while the "to" column
            name of the image it was aligned to. "from" is analagous to "moving"
            or "current", while "to" is analgous to "fixed" or "previous".

            Columns begining with "original" refer to error measurements of the
            unregistered images. Those beginning with "rigid" or "non_rigid"
            refer to measurements related to rigid or non-rigid registration,
            respectively.

            Columns beginning with "mean" are averages of error measurements. In
            the case of errors based on feature distances (i.e. those ending in
            "D"), the mean is weighted by the number of feature matches between
            "from" and "to".

            Columns endining in "D" indicate the median distance between matched
            features in "from" and "to".

            Columns ending in "rTRE" indicate the target registration error
            between "from" and "to".

            Columns ending in "mattesMI" contain measurements of the Mattes
            mutual information between "from" and "to".

            "processed_img_shape" indicates the shape (row, column) of the
            processed image actually used to conduct the registration

            "shape" is the shape of the slide at full resolution

            "aligned_shape" is the shape of the registered full resolution slide

            "physical_units" are the names of the pixels physcial unit, e.g. u'\u00B5m'

            "resolution" is the physical unit per pixel

            "name" is the name assigned to the Valis instance

            "rigid_time_minutes" is the total number of minutes it took
            to convert the images and then rigidly align them.
"non_rigid_time_minutes" is the total number of minutes it took to convert the images, and then perform rigid -> non-rigid registration. """ path_list = [None] * (self.size) all_og_d = [None] * (self.size) all_og_tre = [None] * (self.size) all_rigid_d = [None] * (self.size) all_rigid_tre = [None] * (self.size) all_nr_d = [None] * (self.size) all_nr_tre = [None] * (self.size) all_n = [None] * (self.size) from_list = [None] * (self.size) to_list = [None] * (self.size) shape_list = [None] * (self.size) processed_img_shape_list = [None] * (self.size) unit_list = [None] * (self.size) resolution_list = [None] * (self.size) slide_obj_list = list(self.slide_dict.values()) outshape = slide_obj_list[0].aligned_slide_shape_rc ref_slide = self.get_ref_slide() ref_diagonal = np.sqrt(np.sum(np.power(ref_slide.processed_img_shape_rc, 2))) measure_idx = [] for slide_obj in tqdm.tqdm(self.slide_dict.values(), desc=MEASURE_MSG, unit="image"): i = slide_obj.stack_idx slide_name = slide_obj.name shape_list[i] = tuple(slide_obj.slide_shape_rc) processed_img_shape_list[i] = tuple(slide_obj.processed_img_shape_rc) unit_list[i] = slide_obj.units resolution_list[i] = slide_obj.resolution from_list[i] = slide_name path_list[i] = slide_obj.src_f if slide_obj.name == ref_slide.name or slide_obj.is_empty: continue measure_idx.append(i) prev_slide_obj = slide_obj.fixed_slide to_list[i] = prev_slide_obj.name img_T = warp_tools.get_padding_matrix(slide_obj.processed_img_shape_rc, slide_obj.reg_img_shape_rc) prev_T = warp_tools.get_padding_matrix(prev_slide_obj.processed_img_shape_rc, prev_slide_obj.reg_img_shape_rc) prev_kp_in_slide = prev_slide_obj.warp_xy(slide_obj.xy_in_prev, M=prev_T, pt_level= prev_slide_obj.processed_img_shape_rc, non_rigid=False) current_kp_in_slide = slide_obj.warp_xy(slide_obj.xy_matched_to_prev, M=img_T, pt_level= slide_obj.processed_img_shape_rc, non_rigid=False) og_d = warp_tools.calc_d(prev_kp_in_slide, current_kp_in_slide) og_rtre = og_d/ref_diagonal median_og_tre = 
np.median(og_rtre) og_d *= slide_obj.resolution median_d_og = np.median(og_d) all_og_d[i] = median_d_og all_og_tre[i] = median_og_tre prev_warped_rigid = prev_slide_obj.warp_xy(slide_obj.xy_in_prev, M=prev_slide_obj.M, pt_level= prev_slide_obj.processed_img_shape_rc, non_rigid=False) current_warped_rigid = slide_obj.warp_xy(slide_obj.xy_matched_to_prev, M=slide_obj.M, pt_level= slide_obj.processed_img_shape_rc, non_rigid=False) rigid_d = warp_tools.calc_d(prev_warped_rigid, current_warped_rigid) rtre = rigid_d/ref_diagonal median_rigid_tre = np.median(rtre) rigid_d *= slide_obj.resolution median_d_rigid = np.median(rigid_d) all_rigid_d[i] = median_d_rigid all_n[i] = len(rigid_d) all_rigid_tre[i] = median_rigid_tre if slide_obj.bk_dxdy is not None: prev_warped_nr = prev_slide_obj.warp_xy(slide_obj.xy_in_prev, M=prev_slide_obj.M, pt_level= prev_slide_obj.processed_img_shape_rc, non_rigid=True) current_warped_nr = slide_obj.warp_xy(slide_obj.xy_matched_to_prev, M=slide_obj.M, pt_level= slide_obj.processed_img_shape_rc, non_rigid=True) nr_d = warp_tools.calc_d(prev_warped_nr, current_warped_nr) nrtre = nr_d/ref_diagonal mean_nr_tre = np.median(nrtre) nr_d *= slide_obj.resolution median_d_nr = np.median(nr_d) all_nr_d[i] = median_d_nr all_nr_tre[i] = mean_nr_tre weights = np.array(all_n)[measure_idx] mean_og_d = np.average(np.array(all_og_d)[measure_idx], weights=weights) median_og_tre = np.average(np.array(all_og_tre)[measure_idx], weights=weights) mean_rigid_d = np.average(np.array(all_rigid_d)[measure_idx], weights=weights) median_rigid_tre = np.average(np.array(all_rigid_tre)[measure_idx], weights=weights) rigid_min = (self.end_rigid_time - self.start_time)/60 self.summary_df = pd.DataFrame({ "filename": path_list, "from":from_list, "to": to_list, "original_D": all_og_d, "original_rTRE": all_og_tre, "rigid_D": all_rigid_d, "rigid_rTRE": all_rigid_tre, "non_rigid_D": all_nr_d, "non_rigid_rTRE": all_nr_tre, "processed_img_shape": processed_img_shape_list, "shape": 
shape_list, "aligned_shape": [tuple(outshape)]*self.size, "mean_original_D": [mean_og_d]*self.size, "mean_rigid_D": [mean_rigid_d]*self.size, "physical_units":unit_list, "resolution":resolution_list, "name": [self.name]*self.size, "rigid_time_minutes" : [rigid_min]*self.size }) if any([d for d in all_nr_d if d is not None]): mean_nr_d = np.average(np.array(all_nr_d)[measure_idx], weights=weights) mean_nr_tre = np.average(np.array(all_nr_tre)[measure_idx], weights=weights) non_rigid_min = (self.end_non_rigid_time - self.start_time)/60 self.summary_df["mean_non_rigid_D"] = [mean_nr_d]*self.size self.summary_df["non_rigid_time_minutes"] = [non_rigid_min]*self.size return self.summary_df
    def register(self, brightfield_processing_cls=DEFAULT_BRIGHTFIELD_CLASS,
                 brightfield_processing_kwargs=DEFAULT_BRIGHTFIELD_PROCESSING_ARGS,
                 if_processing_cls=DEFAULT_FLOURESCENCE_CLASS,
                 if_processing_kwargs=DEFAULT_FLOURESCENCE_PROCESSING_ARGS,
                 processor_dict=None,
                 reader_cls=None,
                 reader_dict=None):
        """Register a collection of images

        This function will convert the slides to images, pre-process and
        normalize them, and then conduct rigid registration. Non-rigid
        registration will then be performed if the `non_rigid_registrar_cls`
        argument used to initialize the Valis object was not None.

        In addition to the objects returned, the desination directory (i.e.
        `dst_dir`) will contain thumbnails so that one can visualize the
        results: converted image thumbnails will be in "images/"; processed
        images in "processed/"; rigidly aligned images in
        "rigid_registration/"; non-rigidly aligned images in
        "non_rigid_registration/"; non-rigid deformation field images (i.e.
        warped grids colored by the direction and magntidue) of the
        deformation) will be in ""deformation_fields/". The size of these
        thumbnails is determined by the `thumbnail_size` argument used to
        initialze this object.

        One can get a sense of how well the registration worked by looking in
        the "overlaps/", which shows how the images overlap before
        registration, after rigid registration, and after non-rigid
        registration. Each image is created by coloring an inverted greyscale
        version of the processed images, and then blending those images.

        The "data/" directory will contain a pickled copy of this registrar,
        which can be later be opened (unpickled) and used to warp slides
        and/or point data. "data/" will also contain the `summary_df` saved
        as a csv file.

        Parameters
        ----------
        brightfield_processing_cls : preprocessing.ImageProcesser
            preprocessing.ImageProcesser used to pre-process brightfield
            images to make them look as similar as possible.

        brightfield_processing_kwargs : dict
            Dictionary of keyward arguments to be passed to
            `brightfield_processing_cls`

        if_processing_cls : preprocessing.ImageProcesser
            preprocessing.ImageProcesser used to pre-process
            immunofluorescent images to make them look as similar as possible.

        if_processing_kwargs : dict
            Dictionary of keyward arguments to be passed to
            `if_processing_cls`

        processor_dict : dict, optional
            Each key should be the filename of the image, and the value either
            a subclassed preprocessing.ImageProcessor, or a list, where the
            1st element is the processor, and the second element a dictionary
            of keyword arguments passed to the processor. If `None`, then a
            default processor will be used for each image based on the
            inferred modality.

        reader_cls : SlideReader, optional
            Uninstantiated SlideReader class that will convert the slide to
            an image, and also collect metadata. If None (the default), the
            appropriate SlideReader will be found by
            `slide_io.get_slide_reader`. This option is provided in case the
            slides cannot be opened by a current SlideReader class. In this
            case, the user should create a subclass of SlideReader. See
            slide_io.SlideReader for details.

        reader_dict: dict, optional
            Dictionary specifying which readers to use for individual images.
            The keys should be the image's filename, and the values the
            instantiated slide_io.SlideReader to use to read that file. Valis
            will try to find an appropritate reader for any omitted files, or
            will use `reader_cls` as the default.

        Returns
        -------
        rigid_registrar : SerialRigidRegistrar
            SerialRigidRegistrar object that performed the rigid
            registration. This object can be pickled if so desired

        non_rigid_registrar : SerialNonRigidRegistrar
            SerialNonRigidRegistrar object that performed serial non-rigid
            registration. This object can be pickled if so desired.

        summary_df : Dataframe
            `summary_df` contains various information about the registration.

            The "from" column is the name of the image, while the "to" column
            name of the image it was aligned to. "from" is analagous to
            "moving" or "current", while "to" is analgous to "fixed" or
            "previous".

            Columns begining with "original" refer to error measurements of
            the unregistered images. Those beginning with "rigid" or
            "non_rigid" refer to measurements related to rigid or non-rigid
            registration, respectively.

            Columns beginning with "mean" are averages of error measurements.
            In the case of errors based on feature distances (i.e. those
            ending in "D"), the mean is weighted by the number of feature
            matches between "from" and "to".

            Columns endining in "D" indicate the median distance between
            matched features in "from" and "to".

            Columns ending in "TRE" indicate the target registration error
            between "from" and "to".

            Columns ending in "mattesMI" contain measurements of the Mattes
            mutual information between "from" and "to".

            "processed_img_shape" indicates the shape (row, column) of the
            processed image actually used to conduct the registration

            "shape" is the shape of the slide at full resolution

            "aligned_shape" is the shape of the registered full resolution
            slide

            "physical_units" are the names of the pixels physcial unit,
            e.g. u'\u00B5m'

            "resolution" is the physical unit per pixel

            "name" is the name assigned to the Valis instance

            "rigid_time_minutes" is the total number of minutes it took
            to convert the images and then rigidly align them.

            "non_rigid_time_minutes" is the total number of minutes it took
            to convert the images, and then perform rigid -> non-rigid
            registration.

        """
        self.start_time = time()
        # Any failure in the pipeline is caught below so the JVM can be shut
        # down and a (None, None, None) sentinel returned instead of raising.
        try:
            print("\n==== Converting images\n")
            self.convert_imgs(series=self.series, reader_cls=reader_cls, reader_dict=reader_dict)

            print("\n==== Processing images\n")
            slide_processors = self.create_img_processor_dict(brightfield_processing_cls=brightfield_processing_cls,
                                                              brightfield_processing_kwargs=brightfield_processing_kwargs,
                                                              if_processing_cls=if_processing_cls,
                                                              if_processing_kwargs=if_processing_kwargs,
                                                              processor_dict=processor_dict)

            # Record processor class names so they survive pickling (the
            # classes themselves are cleared by cleanup()).
            self.brightfield_procsseing_fxn_str = brightfield_processing_cls.__name__
            self.if_processing_fxn_str = if_processing_cls.__name__
            self.process_imgs(processor_dict=slide_processors)

            # Rigid registration stage
            print("\n==== Rigid registration\n")
            rigid_registrar = self.rigid_register()
            aligned_slide_shape_rc = self.get_aligned_slide_shape(0)
            self.aligned_slide_shape_rc = aligned_slide_shape_rc
            self.iter_order = rigid_registrar.iter_order
            for slide_obj in self.slide_dict.values():
                slide_obj.aligned_slide_shape_rc = aligned_slide_shape_rc

            if self.micro_rigid_registrar_cls is not None:
                print("\n==== Micro-rigid registration\n")
                self.micro_rigid_register()

            # rigid_register() signals failure by returning False
            if rigid_registrar is False:
                return None, None, None

            if self.non_rigid_registrar_cls is not None:
                print("\n==== Non-rigid registration\n")
                non_rigid_registrar = self.non_rigid_register(rigid_registrar, slide_processors)
            else:
                non_rigid_registrar = None

            self._add_empty_slides()

            print("\n==== Measuring error\n")
            error_df = self.measure_error()
            # Drop unpicklable members, then persist the registrar and summary
            self.cleanup()
            pathlib.Path(self.data_dir).mkdir(exist_ok=True, parents=True)
            f_out = os.path.join(self.data_dir, self.name + "_registrar.pickle")
            self.reg_f = f_out
            pickle.dump(self, open(f_out, 'wb'))

            data_f_out = os.path.join(self.data_dir, self.name + "_summary.csv")
            error_df.to_csv(data_f_out, index=False)

        except Exception as e:
            traceback_msg = traceback.format_exc()
            valtils.print_warning(e, rgb=Fore.RED, traceback_msg=traceback_msg)
            kill_jvm()
            return None, None, None

        return rigid_registrar, non_rigid_registrar, error_df
    def cleanup(self):
        """Remove objects that can't be pickled
        """
        # These hold classes/instances (feature detectors, registrars) that
        # pickle cannot serialize; clear them before dumping the registrar.
        self.rigid_reg_kwargs["feature_detector"] = None
        self.rigid_reg_kwargs["affine_optimizer"] = None
        self.non_rigid_registrar_cls = None
        self.rigid_registrar = None
        self.micro_rigid_registrar_cls = None
        self.non_rigid_registrar = None

    @valtils.deprecated_args(max_non_rigid_registartion_dim_px="max_non_rigid_registration_dim_px")
    def register_micro(self, brightfield_processing_cls=DEFAULT_BRIGHTFIELD_CLASS,
                       brightfield_processing_kwargs=DEFAULT_BRIGHTFIELD_PROCESSING_ARGS,
                       if_processing_cls=DEFAULT_FLOURESCENCE_CLASS,
                       if_processing_kwargs=DEFAULT_FLOURESCENCE_PROCESSING_ARGS,
                       processor_dict=None,
                       max_non_rigid_registration_dim_px=DEFAULT_MAX_NON_RIGID_REG_SIZE,
                       non_rigid_registrar_cls=DEFAULT_NON_RIGID_CLASS,
                       non_rigid_reg_params=DEFAULT_NON_RIGID_KWARGS,
                       reference_img_f=None,
                       align_to_reference=False,
                       mask=None,
                       tile_wh=DEFAULT_NR_TILE_WH):
        """Improve alingment of microfeatures by performing second non-rigid registration on larger images

        Caclculates additional non-rigid deformations using a larger image

        Parameters
        ----------
        brightfield_processing_cls : preprocessing.ImageProcesser
            preprocessing.ImageProcesser used to pre-process brightfield
            images to make them look as similar as possible.

        brightfield_processing_kwargs : dict
            Dictionary of keyward arguments to be passed to
            `brightfield_processing_cls`

        if_processing_cls : preprocessing.ImageProcesser
            preprocessing.ImageProcesser used to pre-process
            immunofluorescent images to make them look as similar as possible.

        if_processing_kwargs : dict
            Dictionary of keyward arguments to be passed to
            `if_processing_cls`

        max_non_rigid_registration_dim_px : int, optional
            Maximum width or height of images used for non-rigid
            registration. If None, then the full sized image will be used.
            However, this may take quite some time to complete.

        reference_img_f : str, optional
            Filename of image that will be treated as the center of the
            stack. If None, the index of the middle image will be the
            reference, and images will be aligned towards it. If provided,
            images will be aligned to this reference.

        align_to_reference : bool, optional
            If `False`, images will be non-rigidly aligned serially towards
            the reference image. If `True`, images will be non-rigidly
            aligned directly to the reference image. If `reference_img_f` is
            None, then the reference image will be the one in the middle of
            the stack.

        non_rigid_registrar_cls : NonRigidRegistrar, optional
            Uninstantiated NonRigidRegistrar class that will be used to
            calculate the deformation fields between images. See the
            `non_rigid_registrars` module for a desciption of available
            methods. If a desired non-rigid registration method is not
            available, one can be implemented by
            subclassing.NonRigidRegistrar.

        non_rigid_reg_params: dictionary, optional
            Dictionary containing key, value pairs to be used to initialize
            `non_rigid_registrar_cls`. In the case where simple ITK is used
            by the, params should be a SimpleITK.ParameterMap. Note that
            numeric values nedd to be converted to strings. See the
            NonRigidRegistrar classes in `non_rigid_registrars` for the
            available non-rigid registration methods and arguments.

        """
        # Remove empty slides
        for empty_slide_name, empty_slide in self._empty_slides.items():
            del self.slide_dict[empty_slide_name]
            self.size -= 1

        ref_slide = self.get_ref_slide()
        if mask is None:
            if ref_slide.non_rigid_reg_mask is not None:
                mask = ref_slide.non_rigid_reg_mask.copy()

        slide_processors = self.create_img_processor_dict(brightfield_processing_cls=brightfield_processing_cls,
                                                          brightfield_processing_kwargs=brightfield_processing_kwargs,
                                                          if_processing_cls=if_processing_cls,
                                                          if_processing_kwargs=if_processing_kwargs,
                                                          processor_dict=processor_dict)

        # Build the (possibly larger) images used for this second round of
        # non-rigid registration. `using_tiler` indicates they would be too
        # large to hold in memory at once.
        nr_reg_src, max_img_dim, non_rigid_reg_mask, full_out_shape_rc, mask_bbox_xywh, using_tiler = \
            self.prep_images_for_large_non_rigid_registration(max_img_dim=max_non_rigid_registration_dim_px,
                                                              processor_dict=slide_processors,
                                                              updating_non_rigid=True,
                                                              mask=mask)

        img_specific_args = None
        # If displacements are already pyvips Images they are file-backed, so
        # the updated ones must be written to disk too.
        write_dxdy = isinstance(ref_slide.bk_dxdy, pyvips.Image)
        if using_tiler:
            # Have determined that these images will be too big
            msg = (f"Registration would more than {TILER_THRESH_GB} GB if all images opened in memory. "
                   f"Will use NonRigidTileRegistrar to register cooresponding tiles to reduce memory consumption, "
                   f"but this method is experimental")
            valtils.print_warning(msg)
            write_dxdy = True

            non_rigid_registrar_cls, img_specific_args = self.get_nr_tiling_params(non_rigid_registrar_cls,
                                                                                   processor_dict=slide_processors,
                                                                                   img_specific_args=img_specific_args,
                                                                                   tile_wh=tile_wh)

        print("\n==== Performing microregistration\n")
        non_rigid_registrar = serial_non_rigid.register_images(src=nr_reg_src,
                                                               non_rigid_reg_class=non_rigid_registrar_cls,
                                                               non_rigid_reg_params=non_rigid_reg_params,
                                                               reference_img_f=reference_img_f,
                                                               mask=non_rigid_reg_mask,
                                                               align_to_reference=align_to_reference,
                                                               name=self.name,
                                                               img_params=img_specific_args
                                                               )

        pathlib.Path(self.micro_reg_dir).mkdir(exist_ok=True, parents=True)
        out_shape = full_out_shape_rc
        n_digits = len(str(self.size))
        micro_reg_imgs = [None] * self.size

        # Update displacements
        for slide_obj in self.slide_dict.values():
            if slide_obj == ref_slide:
                continue
            nr_obj = non_rigid_registrar.non_rigid_obj_dict[slide_obj.name]

            # Will be combining original and new dxdy as pyvips Images
            if not isinstance(slide_obj.bk_dxdy[0], pyvips.Image):
                vips_current_bk_dxdy = warp_tools.numpy2vips(np.dstack(slide_obj.bk_dxdy)).cast("float")
                vips_current_fwd_dxdy = warp_tools.numpy2vips(np.dstack(slide_obj.fwd_dxdy)).cast("float")
            else:
                vips_current_bk_dxdy = slide_obj.bk_dxdy
                vips_current_fwd_dxdy = slide_obj.fwd_dxdy

            if not isinstance(nr_obj.bk_dxdy, pyvips.Image):
                vips_new_bk_dxdy = warp_tools.numpy2vips(np.dstack(nr_obj.bk_dxdy)).cast("float")
                vips_new_fwd_dxdy = warp_tools.numpy2vips(np.dstack(nr_obj.fwd_dxdy)).cast("float")
            else:
                vips_new_bk_dxdy = nr_obj.bk_dxdy
                vips_new_fwd_dxdy = nr_obj.fwd_dxdy

            if np.any(non_rigid_registrar.shape != full_out_shape_rc):
                # Micro-registration performed on sub-region. Need to put in full image
                vips_new_bk_dxdy = self.pad_displacement(vips_new_bk_dxdy, full_out_shape_rc, mask_bbox_xywh)
                vips_new_fwd_dxdy = self.pad_displacement(vips_new_fwd_dxdy, full_out_shape_rc, mask_bbox_xywh)

            # Scale original dxdy to match scaled shape of new dxdy
            slide_sxy = (np.array(out_shape)/np.array([vips_current_bk_dxdy.height, vips_current_bk_dxdy.width]))[::-1]
            if not np.all(slide_sxy == 1):
                # Displacement magnitudes must be scaled along with the image size
                scaled_bk_dx = float(slide_sxy[0])*vips_current_bk_dxdy[0]
                scaled_bk_dy = float(slide_sxy[1])*vips_current_bk_dxdy[1]
                vips_current_bk_dxdy = scaled_bk_dx.bandjoin(scaled_bk_dy)
                vips_current_bk_dxdy = warp_tools.resize_img(vips_current_bk_dxdy, out_shape)

                scaled_fwd_dx = float(slide_sxy[0])*vips_current_fwd_dxdy[0]
                scaled_fwd_dy = float(slide_sxy[1])*vips_current_fwd_dxdy[1]
                vips_current_fwd_dxdy = scaled_fwd_dx.bandjoin(scaled_fwd_dy)
                vips_current_fwd_dxdy = warp_tools.resize_img(vips_current_fwd_dxdy, out_shape)

            # Compose old and new deformations by summing the fields
            vips_updated_bk_dxdy = vips_current_bk_dxdy + vips_new_bk_dxdy
            vips_updated_fwd_dxdy = vips_current_fwd_dxdy + vips_new_fwd_dxdy

            if not write_dxdy:
                # Will save numpy dxdy as Slide attributes
                np_updated_bk_dxdy = warp_tools.vips2numpy(vips_updated_bk_dxdy)
                np_updated_fwd_dxdy = warp_tools.vips2numpy(vips_updated_fwd_dxdy)
                slide_obj.bk_dxdy = np.array([np_updated_bk_dxdy[..., 0], np_updated_bk_dxdy[..., 1]])
                slide_obj.fwd_dxdy = np.array([np_updated_fwd_dxdy[..., 0], np_updated_fwd_dxdy[..., 1]])
            else:
                pathlib.Path(self.displacements_dir).mkdir(exist_ok=True, parents=True)
                slide_obj.stored_dxdy = True
                bk_dxdy_f, fwd_dxdy_f = slide_obj.get_displacement_f()
                slide_obj._bk_dxdy_f = bk_dxdy_f
                slide_obj._fwd_dxdy_f = fwd_dxdy_f

                # Save space by only writing the necessary areas. Most displacements may be 0
                cropped_bk_dxdy = vips_updated_bk_dxdy.extract_area(*mask_bbox_xywh)
                cropped_fwd_dxdy = vips_updated_fwd_dxdy.extract_area(*mask_bbox_xywh)
                if not os.path.exists(slide_obj._bk_dxdy_f):
                    cropped_bk_dxdy.cast("float").tiffsave(slide_obj._bk_dxdy_f, compression="lzw", lossless=True, tile=True, bigtiff=True)
                else:
                    # Don't seem to be able to overwrite directly because also accessing it?
                    # Write to a hidden temp file first, then swap it in.
                    disp_dir, temp_bk_f = os.path.split(slide_obj._bk_dxdy_f)
                    full_temp_dx_f = os.path.join(disp_dir, f".temp_{temp_bk_f}")
                    cropped_bk_dxdy.cast("float").tiffsave(full_temp_dx_f, compression="lzw", lossless=True, tile=True, bigtiff=True)
                    os.remove(slide_obj._bk_dxdy_f)
                    os.rename(full_temp_dx_f, slide_obj._bk_dxdy_f)

                if not os.path.exists(slide_obj._fwd_dxdy_f):
                    cropped_fwd_dxdy.cast("float").tiffsave(slide_obj._fwd_dxdy_f, compression="lzw", lossless=True, tile=True, bigtiff=True)
                else:
                    disp_dir, temp_fwd_f = os.path.split(slide_obj._fwd_dxdy_f)
                    full_temp_fwd_f = os.path.join(disp_dir, f".temp_{temp_fwd_f}")
                    cropped_fwd_dxdy.cast("float").tiffsave(full_temp_fwd_f, compression="lzw", lossless=True, tile=True, bigtiff=True)
                    os.remove(slide_obj._fwd_dxdy_f)
                    os.rename(full_temp_fwd_f, slide_obj._fwd_dxdy_f)

        # Update dxdy padding attributes here, in the event that previous displacements were also saved as files
        # Updating these attributes earlier will cause errors
        self._non_rigid_bbox = mask_bbox_xywh
        self._full_displacement_shape_rc = full_out_shape_rc

        # Write per-slide micro-registration thumbnails
        for slide_obj in self.slide_dict.values():
            if not slide_obj.is_rgb:
                img_to_warp = slide_obj.processed_img
            else:
                img_to_warp = slide_obj.image

            img_to_warp = warp_tools.resize_img(img_to_warp, slide_obj.processed_img_shape_rc)
            micro_reg_img = slide_obj.warp_img(img_to_warp, non_rigid=True, crop=self.crop)
            img_save_id = str.zfill(str(slide_obj.stack_idx), n_digits)
            micro_fout = os.path.join(self.micro_reg_dir, f"{img_save_id}_{slide_obj.name}.png")
            micro_thumb = self.create_thumbnail(micro_reg_img)
            warp_tools.save_img(micro_fout, micro_thumb)

            processed_micro_reg_img = slide_obj.warp_img(slide_obj.processed_img)
            micro_reg_imgs[slide_obj.stack_idx] = processed_micro_reg_img

        # Add empty slides back and save results
        for empty_slide_name, empty_slide in self._empty_slides.items():
            self.slide_dict[empty_slide_name] = empty_slide
            self.size += 1

        pickle.dump(self, open(self.reg_f, 'wb'))

        micro_overlap = self.draw_overlap_img(micro_reg_imgs)
        self.micro_reg_overlap_img = micro_overlap
        overlap_img_fout = os.path.join(self.overlap_dir, self.name + "_micro_reg.png")
        warp_tools.save_img(overlap_img_fout, micro_overlap, thumbnail_size=self.thumbnail_size)

        print("\n==== Measuring error\n")
        error_df = self.measure_error()
        data_f_out = os.path.join(self.data_dir, self.name + "_summary.csv")
        error_df.to_csv(data_f_out, index=False)

        return non_rigid_registrar, error_df

    def get_aligned_slide_shape(self, level):
        """Get size of aligned images

        Parameters
        ----------
        level : int, float
            If `level` is an integer, then it is assumed that `level` is
            referring to the pyramid level that will be warped. If `level`
            is a float, it is assumed `level` is how much to rescale the
            registered image's size.

        """
        ref_slide = self.get_ref_slide()
        if np.issubdtype(type(level), np.integer):
            n_levels = len(ref_slide.slide_dimensions_wh)
            if level >= n_levels:
                # NOTE(review): trailing commas make `msg` a tuple of strings,
                # not one concatenated string — confirm print_warning accepts this
                msg = (f"requested to scale transformation for pyramid level {level}, ",
                       f"but the image only has {n_levels} (starting from 0. ",
                       f"Will use level {level-1}, which is the smallest level")
                valtils.print_warning(msg)
                level = level - 1
            # slide_dimensions_wh is (width, height); reverse to (row, col)
            slide_shape_rc = ref_slide.slide_dimensions_wh[level][::-1]
            s_rc = (slide_shape_rc/np.array(ref_slide.processed_img_shape_rc))
        else:
            # A float level is interpreted directly as a scaling factor
            s_rc = level

        aligned_out_shape_rc = np.ceil(np.array(ref_slide.reg_img_shape_rc)*s_rc).astype(int)

        return aligned_out_shape_rc

    @valtils.deprecated_args(perceputally_uniform_channel_colors="colormap")
    def warp_and_save_slides(self, dst_dir, level=0,
                             non_rigid=True,
                             crop=True,
                             colormap=slide_io.CMAP_AUTO,
                             interp_method="bicubic",
                             tile_wh=None,
                             compression="lzw",
                             Q=100,
                             pyramid=True):
        f"""Warp and save all slides

        Each slide will be saved as an ome.tiff. The extension of each
        file will be changed to ome.tiff if it is not already.

        Parameters
        ----------
        dst_dir : str
            Path to were the warped slides will be saved.

        level : int, optional
            Pyramid level to be warped. Default is 0, which means the
            highest resolution image will be warped and saved.

        non_rigid : bool, optional
            Whether or not to conduct non-rigid warping. If False, then
            only a rigid transformation will be applied. Default is True

        crop: bool, str
            How to crop the registered images. If `True`, then the same crop
            used when initializing the `Valis` object will be used. If
            `False`, the image will not be cropped. If "overlap", the warped
            slide will be cropped to include only areas where all images
            overlapped. "reference" crops to the area that overlaps with the
            reference image, defined by `reference_img_f` when initialzing
            the `Valis object`.

        colormap : list
            List of RGB colors (0-255) to use for channel colors. If 'auto'
            (the default), the original channel colors ` will be used, if
            available. If `None`, no channel colors will be assigned.

        interp_method : str
            Interpolation method used when warping slide. Default is
            "bicubic"

        tile_wh : int, optional
            Tile width and height used to save image

        compression : str, optional
            Compression method used to save ome.tiff . Default is lzw, but
            can also be jpeg or jp2k. See pyips for more details.

        Q : int
            Q factor for lossy compression

        """
        pathlib.Path(dst_dir).mkdir(exist_ok=True, parents=True)
        src_f_list = [self.original_img_list[slide_obj.stack_idx] for slide_obj in self.slide_dict.values()]

        cmap_is_str = False
        named_color_map = None
        if colormap is not None:
            if isinstance(colormap, str) and colormap == slide_io.CMAP_AUTO:
                cmap_is_str = True
            else:
                # Re-key a user-provided colormap dict by slide name
                named_color_map = {self.get_slide(x).name: colormap[x] for x in colormap.keys()}

        for src_f in tqdm.tqdm(src_f_list, desc=SAVING_IMG_MSG, unit="image"):
            slide_obj = self.get_slide(src_f)
            slide_cmap = None
            is_rgb = slide_obj.reader.metadata.is_rgb
            # NOTE(review): if the slide is not RGB and `colormap` is None,
            # `updated_channel_names` is never assigned and the save call
            # below would raise NameError — confirm intended
            if is_rgb:
                updated_channel_names = None
            elif colormap is not None:
                chnl_names = slide_obj.reader.metadata.channel_names
                updated_channel_names = slide_io.check_channel_names(chnl_names, is_rgb, nc=slide_obj.reader.metadata.n_channels)
            try:
                if not cmap_is_str and named_color_map is not None:
                    slide_cmap = named_color_map[slide_obj.name]
                else:
                    slide_cmap = colormap
                slide_cmap = slide_io.check_colormap(colormap=slide_cmap, channel_names=updated_channel_names)
            except Exception as e:
                # Colormap problems are non-fatal; warn and save without one
                traceback_msg = traceback.format_exc()
                msg = f"Could not create colormap for the following reason:{e}"
                valtils.print_warning(msg, traceback_msg=traceback_msg)

            dst_f = os.path.join(dst_dir, slide_obj.name + ".ome.tiff")
            slide_obj.warp_and_save_slide(dst_f=dst_f, level=level,
                                          non_rigid=non_rigid,
                                          crop=crop,
                                          src_f=slide_obj.src_f,
                                          interp_method=interp_method,
                                          colormap=slide_cmap,
                                          tile_wh=tile_wh,
                                          compression=compression,
                                          channel_names=updated_channel_names,
                                          Q=Q,
                                          pyramid=pyramid)
    @valtils.deprecated_args(perceputally_uniform_channel_colors="colormap")
    def warp_and_merge_slides(self, dst_f=None, level=0, non_rigid=True,
                              crop=True, channel_name_dict=None,
                              src_f_list=None, colormap=slide_io.CMAP_AUTO,
                              drop_duplicates=True, tile_wh=None,
                              interp_method="bicubic", compression="lzw",
                              Q=100, pyramid=True):
        """Warp and merge registered slides

        Parameters
        ----------
        dst_f : str, optional
            Path to were the warped slide will be saved. If None, then the
            slides will be merged but not saved.

        level : int, optional
            Pyramid level to be warped. Default is 0, which means the
            highest resolution image will be warped and saved.

        non_rigid : bool, optional
            Whether or not to conduct non-rigid warping. If False, then
            only a rigid transformation will be applied. Default is True

        crop: bool, str
            How to crop the registered images. If `True`, then the same crop
            used when initializing the `Valis` object will be used. If
            `False`, the image will not be cropped. If "overlap", the warped
            slide will be cropped to include only areas where all images
            overlapped. "reference" crops to the area that overlaps with the
            reference image, defined by `reference_img_f` when initialzing
            the `Valis object`.

        channel_name_dict : dict of lists, optional.
            key = slide file name, value = list of channel names for that
            slide. If None, the the channel names found in each slide will
            be used.

        src_f_list : list of str, optionaal
            List of paths to slide to be warped. If None (the default),
            Valis.original_img_list will be used. Otherwise, the paths to
            which `src_f_list` points to should be an alternative copy of
            the slides, such as ones that have undergone processing (e.g.
            stain segmentation), had a mask applied, etc...

        colormap : list
            List of RGB colors (0-255) to use for channel colors

        drop_duplicates : bool, optional
            Whether or not to drop duplicate channels that might be found in
            multiple slides. For example, if DAPI is in multiple slides,
            then the only the DAPI channel in the first slide will be kept.

        tile_wh : int, optional
            Tile width and height used to save image

        interp_method : str
            Interpolation method used when warping slide. Default is
            "bicubic"

        compression : str
            Compression method used to save ome.tiff . Default is lzw, but
            can also be jpeg or jp2k. See pyips for more details.

        Q : int
            Q factor for lossy compression

        pyramid : bool
            Whether or not to save an image pyramid.

        Returns
        -------
        merged_slide : pyvips.Image
            Image with all channels merged. If `drop_duplicates` is True,
            then this will only contain unique channels.

        all_channel_names : list of str
            Name of each channel in the image

        ome_xml : str
            OME-XML string containing the slide's metadata

        """
        # Normalize channel names to be keyed by slide name. When no dict is
        # given, names come from each slide's metadata, suffixed with the
        # slide name to keep them unique.
        if channel_name_dict is not None:
            channel_name_dict_by_name = {valtils.get_name(k): channel_name_dict[k] for k in channel_name_dict}
        else:
            channel_name_dict_by_name = {slide_obj.name: [f"{c} ({slide_obj.name})" for c in slide_obj.reader.metadata.channel_names] for slide_obj in self.slide_dict.values()}

        if src_f_list is None:
            # Save in the sorted order. Will still be original order if imgs_ordered=True
            src_f_list = [self.original_img_list[slide_obj.stack_idx] for slide_obj in self.slide_dict.values()]

        all_channel_names = []
        merged_slide = None

        # Expected final channel order, used to sanity-check the merge below
        expected_channel_order = list(chain.from_iterable([channel_name_dict_by_name[valtils.get_name(f)] for f in src_f_list]))
        if drop_duplicates:
            # dict.fromkeys de-duplicates while preserving first-seen order
            expected_channel_order = list(dict.fromkeys(expected_channel_order))

        for f in src_f_list:
            slide_name = valtils.get_name(os.path.split(f)[1])
            slide_obj = self.slide_dict[slide_name]
            warped_slide = slide_obj.warp_slide(level, non_rigid=non_rigid,
                                                crop=crop, interp_method=interp_method)
            keep_idx = list(range(warped_slide.bands))
            slide_channel_names = channel_name_dict_by_name[slide_obj.name]
            if drop_duplicates:
                # Keep only channels whose names haven't been merged yet
                keep_idx = [idx for idx in range(len(slide_channel_names)) if slide_channel_names[idx] not in all_channel_names]
                if len(keep_idx) == 0:
                    msg = f"Have already added all channels in {slide_channel_names}. Ignoring {slide_name}"
                    valtils.print_warning(msg)
                    continue

            if drop_duplicates and warped_slide.bands != len(keep_idx):
                # Rebuild the slide from only the kept bands
                keep_channels = [warped_slide[c] for c in keep_idx]
                slide_channel_names = [slide_channel_names[idx] for idx in keep_idx]
                if len(keep_channels) == 1:
                    warped_slide = keep_channels[0]
                else:
                    warped_slide = keep_channels[0].bandjoin(keep_channels[1:])

            print(f"merging {', '.join(slide_channel_names)} from {slide_obj.name}")
            if merged_slide is None:
                merged_slide = warped_slide
            else:
                merged_slide = merged_slide.bandjoin(warped_slide)

            all_channel_names.extend(slide_channel_names)

        # Set a pyvips interpretation appropriate for the band count
        if merged_slide.bands == 1:
            merged_slide = merged_slide.copy(interpretation="b-w")
        else:
            merged_slide = merged_slide.copy(interpretation="multiband")

        # NOTE(review): assert used for runtime validation; it is stripped
        # under `python -O` — consider raising instead
        assert all_channel_names == expected_channel_order

        if colormap is not None:
            cmap_dict = slide_io.check_colormap(colormap, all_channel_names)
        else:
            cmap_dict = None

        # Build OME-XML metadata from the reference slide's physical scale
        slide_obj = self.get_ref_slide()
        px_phys_size = slide_obj.reader.scale_physical_size(level)
        bf_dtype = slide_io.vips2bf_dtype(merged_slide.format)
        out_xyczt = slide_io.get_shape_xyzct((merged_slide.width, merged_slide.height), merged_slide.bands)
        ome_xml_obj = slide_io.create_ome_xml(out_xyczt, bf_dtype, is_rgb=False,
                                              pixel_physical_size_xyu=px_phys_size,
                                              channel_names=all_channel_names,
                                              colormap=cmap_dict)
        ome_xml = ome_xml_obj.to_xml()

        if dst_f is not None:
            dst_dir = os.path.split(dst_f)[0]
            pathlib.Path(dst_dir).mkdir(exist_ok=True, parents=True)
            ref_slide = self.get_ref_slide()
            tile_wh = slide_io.get_tile_wh(reader=ref_slide.reader, level=level, out_shape_wh=out_xyczt[0:2])

            slide_io.save_ome_tiff(merged_slide, dst_f=dst_f, ome_xml=ome_xml,
                                   tile_wh=tile_wh, compression=compression, Q=Q, pyramid=pyramid)

        return merged_slide, all_channel_names, ome_xml