"""Optimize rigid alignment
Contains functions related to optimization, as well as the AffineOptimizer
class that performs the optimzation. This class can be subclassed to implement
custom optimization methods.
There are several subclasses, but AffineOptimizerMattesMI is the
the fastest and most accurate, and so is default affine optimizer in VALIS.
It's not recommended that the other subclasses be used, but they are kept
to provide examples on how to subclass AffineOptimizer.
"""
from scipy import ndimage, optimize
import numpy as np
from skimage import transform, util
import cv2
import os
import SimpleITK as sitk
from scipy import interpolate
import pathlib
from . warp_tools import get_affine_transformation_params, \
get_corners_of_image, warp_xy
# Cost functions #
EPS = np.finfo("float").eps
def mse(arr1, arr2, mask=None):
"""Compute the mean squared error between two arrays."""
if mask is None:
return np.mean((arr1 - arr2)**2)
else:
return np.mean((arr1[mask != 0] - arr2[mask != 0]) ** 2)
def displacement(moving_image, target_image, mask=None):
"""Minimize average displacement between moving_image and target_image
"""
opt_flow = cv2.optflow.createOptFlow_DeepFlow()
flow = opt_flow.calc(util.img_as_ubyte(target_image),
util.img_as_ubyte(moving_image), None)
if mask is not None:
dx = flow[..., 0][mask != 0]
dy = flow[..., 1][mask != 0]
else:
dx = flow[..., 0].reshape(-1)
dy = flow[..., 1].reshape(-1)
mean_displacement = np.mean(np.sqrt(dx**2 + dy**2))
return mean_displacement
def cost_mse(param, reference_image, target_image, mask=None):
transformation = make_transform(param)
transformed = transform.warp(target_image, transformation, order=3)
return mse(reference_image, transformed, mask)
def downsample2x(image):
"""Down sample image.
"""
offsets = [((s + 1) % 2) / 2 for s in image.shape]
slices = [slice(offset, end, 2)
for offset, end in zip(offsets, image.shape)]
coords = np.mgrid[slices]
return ndimage.map_coordinates(image, coords, order=1)
def gaussian_pyramid(image, levels=6):
"""Make a Gaussian image pyramid.
Parameters
----------
image : array of float
The input image.
max_layer : int, optional
The number of levels in the pyramid.
Returns
-------
pyramid : iterator of array of float
An iterator of Gaussian pyramid levels, starting with the top
(lowest resolution) level.
"""
pyramid = [image]
for level in range(levels - 1):
image = downsample2x(image)
pyramid.append(image)
return pyramid
def make_transform(param):
if len(param) == 3:
r, tc, tr = param
s = None
else:
r, tc, tr, s = param
return transform.SimilarityTransform(rotation=r,
translation=(tc, tr),
scale=s)
def bin_image(img, p):
x_min = np.min(img)
x_max_ = np.max(img)
x_range = x_max_ - x_min + EPS
binned_img = np.zeros_like(img)
_bins = p * (1 - EPS) # Keeps right bin closed
for i in range(img.shape[0]):
for j in range(img.shape[1]):
binned_img[i, j] = int(_bins * ((img[i, j] - x_min) / (x_range)))
return binned_img
def solve_abc(verts):
"""
Find coefficients A,B,C that will allow estimation of intesnity of point
inside triangle with vertices v0, v1, v2. Each vertex is in the format of
[x,y,z] were z=intensity of pixel at point x,y
Parameters
----------
verts : 3x3 array
Each row has coordinates x,y and z, where z in the image intensiy at
point xy (i.e. image[y, r])
Returns
-------
abc : [A,B,C]
Coefficients to estimate intensity in triangle, as well as the
intersection of isointensity lines
"""
a = np.array([[verts[0, 0], verts[0, 1], 1],
[verts[1, 0], verts[1, 1], 1],
[verts[2, 0], verts[2, 1], 1]])
b = verts[:, 2]
try:
abc = np.linalg.inv(a) @ b
except np.linalg.LinAlgError:
sln = np.linalg.lstsq(a, b)
abc = sln[0]
return abc
def area(x1, y1, x2, y2, x3, y3):
# From https://www.geeksforgeeks.org/check-whether-a-given-point-lies-inside-a-triangle-or-not/
a = np.abs((x1 * (y2 - y3) + x2 * (y3 - y1) + x3 * (y1 - y2)) / 2.0)
return a
def isInside(x1, y1, x2, y2, x3, y3, x, y):
# Calculate area of triangle ABC
A = area(x1, y1, x2, y2, x3, y3)
# A = np.abs((x1 * (y2 - y3) + x2 * (y3 - y1) + x3 * (y1 - y2)) / 2.0)
# Calculate area of triangle PBC
A1 = area(x, y, x2, y2, x3, y3)
# Calculate area of triangle PAC
A2 = area(x1, y1, x, y, x3, y3)
# Calculate area of triangle PAB
A3 = area(x1, y1, x2, y2, x, y)
# print(A, A1, A2, A3)
# print(A == (A1 + A2 + A3))
# Check if sum of A1, A2 and A3
# is same as A
if (A == (A1 + A2 + A3)):
return 1
else:
return 0
def get_intersection(alpha1, alpha2, abc1, abc2):
"""
Parameters
----------
alpha1 : float
Intensity of point in image 1
alpha2 : float
Intensity of point in image 2
abc1: [A,B,C]
Coefficients to interpolate value for triangle in image1
abc2: [A,B,C]
Coefficients to interpolate value for corresponding triangle in image2
"""
# Find interestion of isointensity lines ###
intensities = np.array([alpha1 - abc1[2], alpha2 - abc2[2]])
coef = np.array([[abc1[0], abc1[1]],
[abc2[0], abc2[1]]
])
try:
xy = np.linalg.inv(coef) @ intensities
except np.linalg.LinAlgError:
sln = np.linalg.lstsq(coef, intensities)
xy = sln[0]
return xy
def get_verts(img, x, y, pos=0):
"""
Get veritices of triangle and intenisty at each vertex
"""
if pos == 0:
# Lower left
verts = np.array([[x, y, img[y, x]], # BL
[x + 1, y, img[y, x + 1]], # BR
[x, y + 1, img[y + 1, x]] # TL
])
if pos == 1:
# Upper right
verts = np.array([[x, y+1, img[y+1, x]], # BL
[x + 1, y, img[y, x + 1]], # BR
[x+1, y + 1, img[y + 1, x + 1]] # TL
])
return verts
def hist2d(x, y, n_bins):
"""
Build 2D histogram by determining the bin each x and y value falls in
https://stats.stackexchange.com/questions/236205/programmatically-calculate-which-bin-a-value-will-fall-into-for-a-histogram
"""
x_min = np.min(x)
x_max_ = np.max(x)
x_range = x_max_ - x_min + EPS
y_min = np.min(y)
y_max = np.max(y)
y_range = y_max - y_min + EPS
_bins = n_bins * (1 - EPS) # Keeps right bin closed
x_margins = np.zeros(n_bins)
y_margins = np.zeros(n_bins)
results = np.zeros((n_bins, n_bins))
for i in range(len(x)):
x_bin = int(_bins*((x[i]-x_min)/(x_range)))
y_bin = int(_bins*((y[i] - y_min) / (y_range)))
x_margins[x_bin] += 1
y_margins[y_bin] += 1
results[x_bin, y_bin] += 1
return results, x_margins, y_margins
def update_joint_H(binned_moving, binned_fixed, H, M, sample_pts, pos=0,
precalcd_abc=None):
q = H.shape[0]
for i, sxy in enumerate(sample_pts):
# Get vertices and intensities in each image.
# Note that indices are as rc, but vertices need to be xy
img1_v = get_verts(binned_moving, sxy[0], sxy[1], pos)
abc1 = solve_abc(img1_v)
if precalcd_abc is None:
img2_v = get_verts(binned_fixed, sxy[0], sxy[1], pos)
abc2 = solve_abc(img2_v)
else:
# ABC for fixed image's trianges are precomputed
abc2 = precalcd_abc[i]
x_lims = np.array([np.min(img1_v[:, 0]), np.max(img1_v[:, 0])])
y_lims = np.array([np.min(img1_v[:, 1]), np.max(img1_v[:, 1])])
for alpha1 in range(0, q):
for alpha2 in range(0, q):
xy = get_intersection(alpha1, alpha2, abc1, abc2)
if xy[0] <= x_lims[0] or xy[0] >= x_lims[1] or \
xy[1] <= y_lims[0] or xy[1] >= y_lims[1]:
continue
# Determine if intersection inside triangle ###
vote = isInside(img1_v[0, 0], img1_v[0, 1],
img1_v[1, 0], img1_v[1, 1],
img1_v[2, 0], img1_v[2, 1],
xy[0], xy[1])
H[alpha1, alpha2] += vote
return H
def get_neighborhood(im, i, j, r):
"""
Get values in a neighborhood
"""
return im[i - r:i + r + 1, j - r:j + r + 1].flatten()
def build_P(A, B, r, mask):
hood_size = (2 * r + 1) ** 2
d = 2 * hood_size
N = (A.shape[0] - 2*r)*(A.shape[1] - 2*r)
P = np.zeros((d, N))
idx = 0
for i in range(r, A.shape[0]):
# Skip borders
if i < r or i > A.shape[0] - r - 1:
continue
for j in range(r, A.shape[1]):
pmask = get_neighborhood(mask, i, j, r)
if j < r or j > A.shape[1] - r - 1 or np.min(pmask) == 0:
continue
pa = get_neighborhood(A, i, j, r)
pb = get_neighborhood(B, i, j, r)
P[:hood_size, idx] = pa
P[hood_size:, idx] = pb
idx += 1
return P[:, :idx]
def entropy(x):
"""
Caclulate Shannon's entropy for array x
Parameters
----------
x : array
Array from which to calculate entropy
Returns
-------
h : float
Shannon's entropy
"""
# x += EPS ## Avoid -Inf if there is log(0)
px = x/np.sum(x)
px = px[px > 0]
h = -np.sum(px * np.log(px))
return h
def entropy_from_c(cov_mat, d):
e = np.log(((2*np.pi*np.e) ** (d/2)) *
(np.linalg.det(cov_mat) ** 0.5) + EPS)
return e
def region_mi(A, B, mask, r=4):
P = build_P(A, B, r, mask) # d x N matrix: N points with d dimensions
# Center points so each dimensions is around 0
C = np.cov(P, rowvar=True, bias=True)
hood_size = (2 * r + 1) ** 2
d = hood_size*2
HA = entropy_from_c(C[0:hood_size, 0:hood_size], d)
HB = entropy_from_c(C[hood_size:, hood_size:], d)
HC = entropy_from_c(C, d)
RMI = HA + HB - HC
if RMI < 0:
RMI = 0
return RMI
def normalized_mutual_information(A, B, mask, n_bins=256):
"""
Build 2D histogram by determining the bin each x and y value falls in
https://stats.stackexchange.com/questions/236205/programmatically-calculate-which-bin-a-value-will-fall-into-for-a-histogram
"""
x_min = np.min(A)
x_max_ = np.max(A)
x_range = x_max_ - x_min + EPS
y_min = np.min(B)
y_max = np.max(B)
y_range = y_max - y_min + EPS
_bins = n_bins * (1 - EPS) # Keeps right bin closed
x_margins = np.zeros(n_bins)
y_margins = np.zeros(n_bins)
results = np.zeros((n_bins, n_bins))
for i in range(A.shape[0]):
for j in range(A.shape[1]):
if mask[i, j] == 0:
continue
x = A[i, j]
y = B[i, j]
x_bin = int(_bins * ((x - x_min) / x_range))
y_bin = int(_bins * ((y - y_min) / y_range))
x_margins[x_bin] += 1
y_margins[y_bin] += 1
results[x_bin, y_bin] += 1
n = np.sum(results)
results /= n
x_margins /= n
y_margins /= n
H_A = entropy(x_margins)
H_B = entropy(y_margins)
H_AB = entropy(results.flatten())
MI = (H_A + H_B) / H_AB
if MI < 0:
MI = 0
return MI
def sample_img(img, spacing=10):
sr, sc = np.meshgrid(np.arange(0, img.shape[0], spacing), np.arange(0, img.shape[1], spacing))
sample_r = sr.reshape(-1) + np.random.uniform(0, spacing/2, sr.size)
sample_c = sc.reshape(-1) + np.random.uniform(0, spacing/2, sc.size)
interp = interpolate.RectBivariateSpline(np.arange(0, img.shape[0]), np.arange(0, img.shape[1]), img)
z = np.array([interp(sample_r[i], sample_c[i])[0][0] for i in range(len(sample_c))])
return z[(0 <= z) & (z <= img.max())]
def MI(fixed, moving, nb, spacing):
fixed_sampled = sample_img(fixed, spacing)
moving_sampled = sample_img(moving, spacing)
results, x_margins, y_margins = hist2d(moving_sampled, fixed_sampled, nb)
n = np.sum(results)
results /= n
x_margins /= n
y_margins /= n
H_A = entropy(x_margins)
H_B = entropy(y_margins)
H_AB = entropy(results.flatten())
MI = (H_A + H_B) / H_AB
if MI < 0:
MI = 0
return MI
[docs]
class AffineOptimizer(object):
"""Class that optimizes ridid registration
Attributes
----------
nlevels : int
Number of levels in the Gaussian pyramid
nbins : int
Number of bins to have in histograms used to estimate mutual information
optimization : str
Optimization method. Can be any method from scipy.optimize
"FuzzyPSO" for Fuzzy Self-Tuning PSO in the fst-pso package (https://pypi.org/project/fst-pso/)
"gp_minimize", "forest_minimize", "gbrt_minimize" from scikit-opt
transformation : str
Type of transformation, "EuclideanTransform" or "SimilarityTransform"
current_level : int
Current level of the Guassian pyramid that is being registered
accepts_xy : bool
Bool declaring whether or not the optimizer will use corresponding points to optimize the registration
Methods
-------
setup(moving, fixed, mask, initial_M=None)
Gets images ready for alignment
cost_fxn(fixed_image, transformed, mask)
Calculates metric that is to be minimized
align(moving, fixed, mask, initial_M=None, moving_xy=None, fixed_xy=None)
Align images by minimizing cost_fxn
Notes
-----
All AffineOptimizer subclasses need to have the method align(moving, fixed, mask, initial_M, moving_xy, fixed_xy)
that returns the aligned image, optimal_M, cost_list
AffineOptimizer subclasses must also have a cost_fxn(fixed_image, transformed, mask) method that
returns the registration metric value
If one wants to use the same optimization methods, but a different cost function, then the subclass only needs
to have a new cost_fxn method. See AffineOptimizerDisplacement for an example implementing a new cost function
Major overhauls are possible too. See AffineOptimizerMattesMI for an example on using SimpleITK's
optimization methods inside of an AffineOptimizer subclass
If the optimizer uses corressponding points, then the class attribute
accepts_xy needs to be set to True. The default is False.
"""
accepts_xy = False
[docs]
def __init__(self, nlevels=1, nbins=256, optimization="Powell", transformation="EuclideanTransform"):
"""AffineOptimizer registers moving and fixed images by minimizing a cost function
Parameters
----------
nlevels : int
Number of levels in the Gaussian pyramid
nbins : int
Number of bins to have in histograms used to estimate mutual information
optimization : str
Optimization method. Can be any method from scipy.optimize
transformation : str
Type of transformation, "EuclideanTransform" or "SimilarityTransform"
"""
self.nlevels = nlevels
self.nbins = nbins
self.optimization = optimization
self.transformation = transformation
self.current_level = nlevels - 1
self.accepts_xy = AffineOptimizer.accepts_xy
[docs]
def setup(self, moving, fixed, mask, initial_M=None):
"""Get images ready for alignment
Parameters
----------
moving : ndarray
Image to warp to align with fixed
fixed : ndarray
Image moving is warped to align to
mask : ndarray
2D array having non-zero pixel values, where values of 0 are ignnored during registration
initial_M : (3x3) array
Initial transformation matrix
"""
self.moving = moving
self.fixed = fixed
if mask is None:
self.mask = np.zeros(fixed.shape[0:2], dtype=np.uint8)
self.mask[fixed != 0] = 1
else:
self.mask = mask
self.pyramid_fixed = list(gaussian_pyramid(fixed, levels=self.nlevels))
self.pyramid_moving = list(gaussian_pyramid(moving, levels=self.nlevels))
self.pyramid_mask = list(gaussian_pyramid(self.mask, levels=self.nlevels))
if self.transformation == "EuclideanTransform":
self.p = np.zeros(3)
else:
self.p = np.zeros(4)
self.p[3] = 1
if initial_M is not None:
(tx, ty), rotation, (scale_x, scale_y), shear = \
get_affine_transformation_params(initial_M)
self.p[0] = rotation
self.p[1] = tx
self.p[2] = ty
if transform == "SimilarityTransform":
self.p[3] = scale_x
[docs]
def cost_fxn(self, fixed_image, transformed, mask):
return -normalized_mutual_information(fixed_image, transformed, mask, n_bins=self.nbins)
def calc_cost(self, p):
"""Static cost function passed into scipy.optimize
"""
transformation = make_transform(p)
transformed = transform.warp(self.pyramid_moving[self.current_level], transformation.params, order=3)
if np.all(transformed == 0):
return np.inf
return self.cost_fxn(self.pyramid_fixed[self.current_level], transformed, self.pyramid_mask[self.current_level])
[docs]
def align(self, moving, fixed, mask, initial_M=None, moving_xy=None, fixed_xy=None):
"""Align images by minimizing self.cost_fxn. Aligns each level of the Gaussian pyramid, and uses previous transform
as the initial guess in the next round of optimization. Also uses other "good" estimates to define the
parameter boundaries.
Parameters
----------
moving : ndarray
Image to warp to align with fixed
fixed : ndarray
Image moving is warped to align with
mask : ndarray
2D array having non-zero pixel values, where values of 0 are ignnored during registration
initial_M : (3x3) array
Initial transformation matrix
moving_xy : ndarray, optional
(N, 2) array containing points in the moving image that correspond to those in the fixed image
fixed_xy : ndarray, optional
(N, 2) array containing points in the fixed image that correspond to those in the moving image
Returns
-------
aligned : (N,M) array
Moving image warped to align with the fixed image
M : (3,3) array
Optimal transformation matrix
cost_list : list
list containing the minimized cost for each level in the pyramid
"""
self.setup(moving, fixed, mask, initial_M)
method = self.optimization
levels = range(self.nlevels-1, -1, -1) # Iterate from top to bottom of pyramid
cost_list = [None] * self.nlevels
other_params = None
for n in levels:
self.current_level = n
self.p[1:3] *= 2
if other_params is None:
max_tc = self.pyramid_moving[self.current_level].shape[1]
max_tr = self.pyramid_moving[self.current_level].shape[0]
param_bounds = [[0, np.deg2rad(360)],
[-max_tc, max_tc],
[-max_tr, max_tr]]
if self.transformation == "SimilarityTransform":
param_bounds.append([self.p[3] * 0.5, self.p[3] * 2])
# Update bounds based on best fits in previous level
else:
param_mins = np.min(other_params, axis=0)
param_maxes = np.max(other_params, axis=0)
param_bounds = [[param_mins[0], param_maxes[0]],
[2*param_mins[1], 2*param_maxes[1]],
[2*param_mins[2], 2*param_maxes[2]]]
if self.transformation == "SimilarityTransform":
param_bounds.append([param_mins[3], param_maxes[3]])
# Optimize #
if method.upper() == 'BH':
res = optimize.basinhopping(self.calc_cost, self.p)
new_p = res.x
cst = res.fun
if n <= self.nlevels//2: # avoid basin-hopping in lower levels
method = 'Powell'
elif method == 'Nelder-Mead':
res = optimize.minimize(self.calc_cost, self.p, method=method, bounds=param_bounds)
new_p = res.x
cst = np.float(res.fun)
else:
# Default is Powell, which doesn't accept bounds
res = optimize.minimize(self.calc_cost, self.p, method=method, options={"return_all": True})
new_p = res.x
cst = np.float(res.fun)
if hasattr(res, "allvecs"):
other_params = np.vstack(res.allvecs)
if n <= self.nlevels // 2: # avoid basin-hopping in lower levels
method = 'Powell'
# Update #
self.p = new_p
cost_list[self.current_level] = cst
tf = make_transform(self.p)
optimal_M = tf.params
w = transform.warp(self.pyramid_moving[n], optimal_M, order=3)
if np.all(w == 0):
print(Warning("Image warped out of bounds. Registration failed"))
return False, np.ones_like(optimal_M), cost_list
tf = make_transform(self.p)
M = tf.params
aligned = transform.warp(self.moving, M, order=3)
return aligned, M, cost_list
[docs]
class AffineOptimizerMattesMI(AffineOptimizer):
""" Optimize rigid registration using Simple ITK
AffineOptimizerMattesMI is an AffineOptimizer subclass that uses simple ITK's AdvancedMattesMutualInformation.
If moving_xy and fixed_xy are also provided, then Mattes mutual information will be maximized, while the distance
between moving_xy and fixed_xy will be minimized (the CorrespondingPointsEuclideanDistanceMetric in Simple ITK).
Attributes
----------
nlevels : int
Number of levels in the Gaussian pyramid
nbins : int
Number of bins to have in histograms used to estimate mutual information
transformation : str
Type of transformation, "EuclideanTransform" or "SimilarityTransform"
Reg : sitk.ElastixImageFilter
sitk.ElastixImageFilter object that will perform the optimization
fixed_kp_fname : str
Name of file where to fixed_xy will be temporarily be written. Eventually deleted
moving_kp_fname : str
Name of file where to moving_xy will be temporarily be written. Eventually deleted
Methods
-------
setup(moving, fixed, mask, initial_M=None, moving_xy=None, fixed_xy=None)
Create parameter map and initialize Reg
calc_cost(p)
Inherited but not used, returns None
write_elastix_kp(kp, fname)
Temporarily write fixed_xy and moving_xy to file
align(moving, fixed, mask, initial_M=None, moving_xy=None, fixed_xy=None)
Align images by minimizing cost_fxn
"""
accepts_xy = True
[docs]
def __init__(self, nlevels=4.0, nbins=32,
optimization="AdaptiveStochasticGradientDescent", transform="EuclideanTransform"):
super().__init__(nlevels, nbins, optimization, transform)
self.Reg = None
self.accepts_xy = AffineOptimizerMattesMI.accepts_xy
self.fixed_kp_fname = os.path.join(pathlib.Path(__file__).parent, ".fixedPointSet.pts")
self.moving_kp_fname = os.path.join(pathlib.Path(__file__).parent, ".movingPointSet.pts")
def cost_fxn(self, fixed_image, transformed, mask):
return None
[docs]
def write_elastix_kp(self, kp, fname):
"""
Temporarily write fixed_xy and moving_xy to file
Parameters
----------
kp: ndarray
(N, 2) numpy array of points (xy)
fname: str
Name of file in which to save the points
"""
argfile = open(fname, 'w')
npts = kp.shape[0]
argfile.writelines(f"index\n{npts}\n")
for i in range(npts):
xy = kp[i]
argfile.writelines(f"{xy[0]} {xy[1]}\n")
[docs]
def setup(self, moving, fixed, mask, initial_M=None, moving_xy=None, fixed_xy=None):
"""
Create parameter map and initialize Reg
Parameters
----------
moving : ndarray
Image to warp to align with fixed
fixed : ndarray
Image moving is warped to align to
mask : ndarray
2D array having non-zero pixel values, where values of 0 are ignnored during registration
initial_M : (3x3) array
Initial transformation matrix
moving_xy : ndarray, optional
(N, 2) array containing points in the moving image that correspond to those in the fixed image
fixed_xy : ndarray, optional
(N, 2) array containing points in the fixed image that correspond to those in the moving image
"""
if initial_M is None:
initial_M = np.eye(3)
self.moving = moving
self.fixed = fixed
self.Reg = sitk.ElastixImageFilter()
rigid_map = sitk.GetDefaultParameterMap('affine')
rigid_map['NumberOfResolutions'] = [str(int(self.nlevels))]
if self.transformation == "EuclideanTransform":
rigid_map["Transform"] = ["EulerTransform"]
else:
rigid_map["Transform"] = ["SimilarityTransform"]
rigid_map["Registration"] = ["MultiMetricMultiResolutionRegistration"]
if moving_xy is not None and fixed_xy is not None:
self.write_elastix_kp(fixed_xy, self.fixed_kp_fname)
self.write_elastix_kp(moving_xy, self.moving_kp_fname)
current_metrics = rigid_map["Metric"]
current_metrics = list(current_metrics)
current_metrics.append("CorrespondingPointsEuclideanDistanceMetric")
rigid_map["Metric"] = current_metrics
self.Reg.SetFixedPointSetFileName(self.fixed_kp_fname)
self.Reg.SetMovingPointSetFileName(self.moving_kp_fname)
rigid_map["Optimizer"] = [self.optimization]
rigid_map["NumberOfHistogramBins"] = [str(self.nbins)]
self.Reg.SetParameterMap(rigid_map)
if mask is not None:
self.Reg.SetFixedMask(sitk.GetImageFromArray(mask))
sitk_moving = sitk.GetImageFromArray(moving)
sitk_fixed = sitk.GetImageFromArray(fixed)
self.Reg.SetMovingImage(sitk_moving) # image to warp
self.Reg.SetFixedImage(sitk_fixed) # image to align with
[docs]
def calc_cost(self, p):
return None
[docs]
def align(self, moving, fixed, mask, initial_M=None,
moving_xy=None, fixed_xy=None):
"""
Optimize rigid registration
Parameters
----------
moving : ndarray
Image to warp to align with fixed
fixed : ndarray
Image moving is warped to align with
mask : ndarray
2D array having non-zero pixel values, where values of 0 are ignnored during registration
initial_M : (3x3) array
Initial transformation matrix
moving_xy : ndarray, optional
(N, 2) array containing points in the moving image that correspond to those in the fixed image
fixed_xy : ndarray, optional
(N, 2) array containing points in the fixed image that correspond to those in the moving image
Returns
-------
aligned : (N,M) array
Moving image warped to align with the fixed image
M : (3,3) array
Optimal transformation matrix
cost_list : None
None is returned because costs are not recorded
"""
self.setup(moving, fixed, mask, initial_M, moving_xy, fixed_xy)
self.Reg.Execute()
# See section 2.6 in manual. This is the inverse transform.
# Rotation is in radians
tform_params = self.Reg.GetTransformParameterMap()[0]["TransformParameters"]
if self.transformation == "EuclideanTransform":
rotation, tx, ty = [eval(v) for v in tform_params]
scale = 1.0
else:
scale, rotation, tx, ty = [eval(v) for v in tform_params]
M = transform.SimilarityTransform(scale=scale, rotation=rotation,
translation=(tx, ty)).params
aligned = transform.warp(self.moving, M, order=3)
# Clean up #
if moving_xy is not None and fixed_xy is not None:
if os.path.exists(self.fixed_kp_fname):
os.remove(self.fixed_kp_fname)
if os.path.exists(self.moving_kp_fname):
os.remove(self.moving_kp_fname)
tform_files = [f for f in os.listdir(".") if
f.startswith("TransformParameters.") and
f.endswith(".txt")]
if len(tform_files) > 0:
for f in tform_files:
os.remove(f)
return aligned, M, None
class AffineOptimizerRMI(AffineOptimizer):
def __init__(self, r=6, nlevels=1, nbins=256, optimization="Powell", transform="euclidean"):
super().__init__(nlevels, nbins, optimization, transform)
self.r = r
def cost_fxn(self, fixed_image, transformed, mask):
r_ratio = self.r/np.min(self.pyramid_fixed[0].shape)
level_rad = int(r_ratio*np.min(fixed_image.shape))
if level_rad == 0:
level_rad = 1
return -region_mi(fixed_image, transformed, mask, r=level_rad)
class AffineOptimizerDisplacement(AffineOptimizer):
def __init__(self, nlevels=1, nbins=256, optimization="Powell", transform="euclidean"):
super().__init__(nlevels, nbins, optimization, transform)
def cost_fxn(self, fixed_image, transformed, mask):
return displacement(fixed_image, transformed, mask)
class AffineOptimizerKNN(AffineOptimizer):
def __init__(self, nlevels=1, nbins=256, optimization="Powell", transform="euclidean"):
super().__init__(nlevels, nbins, optimization, transform)
self.HA_list = [None]*nlevels
def shannon_entropy(self, X, k=1):
"""
Adapted from https://pybilt.readthedocs.io/en/latest/_modules/pybilt/common/knn_entropy.html
to use sklearn's KNN, which is much faster
"""
from sklearn import neighbors
from scipy.special import gamma, psi
# Get distance to kth nearest neighbor
knn = neighbors.NearestNeighbors(n_neighbors=k)
knn.fit(X.reshape(-1, 1))
r_k, idx = knn.kneighbors()
lr_k = np.log(r_k[r_k > 0])
d = 1
if len(X.shape) == 2:
d = X.shape[1]
# volume of unit ball in d^n
v_unit_ball = np.pi ** (0.5 * d) / gamma(0.5 * d + 1.0)
n = len(X)
H = psi(n) - psi(k) + np.log(v_unit_ball) + (np.float(d) / np.float(n)) * (lr_k.sum())
return H
def mutual_information(self, A, B):
if self.HA_list[self.current_level] is None:
# Only need to caluclate once per level, becuase the fixed
# image doesn't change
self.HA_list[self.current_level] = self.shannon_entropy(A)
HA = self.HA_list[self.current_level]
HB = self.shannon_entropy(B)
joint = np.hstack([A, B])
Hjoint = self.shannon_entropy(joint, k=2)
MI = HA + HB - Hjoint
if MI < 0:
MI = 0
return MI
def cost_fxn(self, fixed_image, transformed, mask):
if mask is not None:
fixed_flat = fixed_image[mask != 0]
transformed_flat = transformed[mask != 0]
else:
fixed_flat = fixed_image.reshape(-1)
transformed_flat = transformed.reshape(-1)
return -self.mutual_information(fixed_flat, transformed_flat)
class AffineOptimizerOffGrid(AffineOptimizer):
def __init__(self, nlevels, nbins=256, optimization="Powell", transform="euclidean", spacing=5):
super().__init__(nlevels, nbins, optimization, transform)
self.spacing = spacing
def setup(self, moving, fixed, mask, initial_M=None):
AffineOptimizer.setup(self, moving, fixed, mask, initial_M)
self.moving_interps = [self.get_interp(img)
for img in self.pyramid_moving]
self.fixed_interps = [self.get_interp(img)
for img in self.pyramid_fixed]
self.z_range = (min(np.min(self.moving[self.nlevels - 1]),
np.min(self.fixed[self.nlevels - 1])),
max(np.max(self.moving[self.nlevels - 1]),
np.max(self.fixed[self.nlevels - 1])))
self.grid_spacings = [self.get_scpaing_for_levels(self.pyramid_fixed[i], self.spacing) for i in range(self.nlevels)]
self.grid_flat = [self.get_regular_grid_flat(i)
for i in range(self.nlevels)]
def get_scpaing_for_levels(self, img_shape, max_level_spacing):
max_shape = self.pyramid_fixed[self.nlevels - 1].shape
shape_ratio = np.mean([img_shape[0]/max_shape[0],
img_shape[0]/max_shape[0]])
level_spacing = int(max_level_spacing*shape_ratio)
if level_spacing == 0:
level_spacing = 1
return level_spacing
def get_regular_grid_flat(self, level):
sr, sc = np.meshgrid(np.arange(0, self.pyramid_fixed[level].shape[0],
self.grid_spacings[level]),
np.arange(0, self.pyramid_fixed[level].shape[1],
self.grid_spacings[level]))
sr = sr.reshape(-1)
sc = sc.reshape(-1)
filtered_sr = sr[self.pyramid_mask[level][sr, sc] > 0]
filtered_sc = sc[self.pyramid_mask[level][sr, sc] > 0]
return (filtered_sr, filtered_sc)
def get_interp(self, img):
return interpolate.RectBivariateSpline(np.arange(0, img.shape[0], dtype=np.float), np.arange(0, img.shape[1], dtype=np.float), img)
def interp_point(self, zr, zc, interp, z_range):
z = np.array([interp(zr[i], zc[i])[0][0] for i in range(zr.size)])
z[z < z_range[0]] = z_range[0]
z[z > z_range[1]] = z_range[1]
return z
def calc_cost(self, p):
transformation = make_transform(p)
corners_rc = get_corners_of_image(self.pyramid_fixed[self.current_level].shape)
warped_corners = warp_xy(corners_rc, transformation.params)
if np.any(warped_corners < 0) or \
np.any(warped_corners[:, 0] > self.pyramid_fixed[self.current_level].shape[0]) or \
np.any(warped_corners[:, 1] > self.pyramid_fixed[self.current_level].shape[1]):
return np.inf
sr, sc = self.grid_flat[self.current_level]
sample_r = sr + np.random.uniform(0, self.grid_spacings[self.current_level] / 2, sr.size)
sample_c = sc + np.random.uniform(0, self.grid_spacings[self.current_level] / 2, sc.size)
# Only sample points in mask
warped_xy = warp_xy(np.dstack([sample_c, sample_r])[0], transformation.params)
fixed_intensities = self.interp_point(warped_xy[:, 1], warped_xy[:, 0], self.fixed_interps[self.current_level], self.z_range)
moving_intensities = self.interp_point(sample_r, sample_c, self.moving_interps[self.current_level], self.z_range)
return self.cost_fxn(fixed_intensities, moving_intensities, self.pyramid_mask[self.current_level])
def cost_fxn(self, fixed_intensities, transformed_intensities, mask):
"""
"""
results, _, _ = np.histogram2d(fixed_intensities, transformed_intensities, bins=self.nbins)
n = np.sum(results)
results /= n
x_margins = np.sum(results, axis=0)
y_margins = np.sum(results, axis=1)
H_A = entropy(x_margins)
H_B = entropy(y_margins)
H_AB = entropy(results.flatten())
MI = (H_A + H_B) / H_AB
if MI < 0:
MI = 0
return -MI