MathOnco / valis

Virtual Alignment of pathoLogy Image Series
MIT License
122 stars 29 forks source link

Annotations transferred with VALIS cannot be modified (despite being unlocked) #75

Closed marco1108 closed 1 year ago

marco1108 commented 1 year ago

Dear @cdgatenbee and @pauldmccarthy, I am using a code to transfer annotations from a reference image to a target image using VALIS to align. I have pasted the script below. I have seen that some (but not all) the annotations in the co-registered geojson file do not allow to be modified - as if they were locked. However, the annotations are not locked on the co-registered geojson, and are unlocked and normally functioning in the reference geojson.

Do you have any idea about why that could be?

Here is the script I am using:

import numpy as np import json import os import pathlib import shapely import tqdm from copy import deepcopy

from skimage import exposure import numpy as np from valis import preprocessing

class PreProcessor(preprocessing.ImageProcesser): """Preprocess images for registration

Uses distance to background color and also subtracts the background intensity


def __init__(self, image, src_f, level, series, *args, **kwargs):
    super().__init__(image=image, src_f=src_f, level=level,
                     series=series, *args, **kwargs)

def create_mask(self):
    _, tissue_mask = preprocessing.create_tissue_mask_from_rgb(self.image)

    return tissue_mask

def process_image(self, brightness_q=0.99, *args, **kwargs):

    processed_img, cam = preprocessing.calc_background_color_dist(self.image, brightness_q=brightness_q)
    processed_img = exposure.rescale_intensity(processed_img, in_range="image", out_range=(0, 1))

    # Subtract background
    brightest_thresh = np.quantile(cam[..., 0], 0.9)
    brightest_idx = np.where(cam[..., 0] >= brightest_thresh)
    brightest_pixels = processed_img[brightest_idx]
    brightest_rgb = np.median(brightest_pixels, axis=0)
    no_bg = processed_img - brightest_rgb
    no_bg = np.clip(no_bg, 0, 255)

    # Adjust range and perform adaptive histogram equalization
    no_bg = (255*exposure.equalize_adapthist(no_bg/no_bg.max())).astype(np.uint8)

    return no_bg

from valis import registration, valtils, warp_tools

def clip_xy(xy, shape_rc): """Clip xy coordintaes to be within image

clipped_x =  np.clip(xy[:, 0], 0, shape_rc[1])
clipped_y =  np.clip(xy[:, 1], 0, shape_rc[0])

clipped_xy = np.dstack([clipped_x, clipped_y])[0]
return clipped_xy

def _warp_shapely(geom, warp_fxn, warp_kwargs): """Warp a shapely geometry Based on shapely.ops.trasform

if "dst_shape_rc" in warp_kwargs:
    dst_shape_rc = warp_kwargs["dst_shape_rc"]
elif "to_dst_shape_rc" in warp_kwargs:
    dst_shape_rc = warp_kwargs["to_dst_shape_rc"]
    dst_shape_rc  = None

if geom.geom_type in ("Point", "LineString", "LinearRing", "Polygon"):
    if geom.geom_type in ("Point", "LineString", "LinearRing"):
        warped_xy = warp_fxn(np.vstack(geom.coords), **warp_kwargs)
        if dst_shape_rc is not None:
            warped_xy = clip_xy(warped_xy, dst_shape_rc)

        return type(geom)(warped_xy.tolist())

    elif geom.geom_type == "Polygon":
        shell_xy = warp_fxn(np.vstack(geom.exterior.coords), **warp_kwargs)
        if dst_shape_rc is not None:
            shell_xy = clip_xy(shell_xy, dst_shape_rc)
        shell = type(geom.exterior)(shell_xy.tolist())
        holes = []
        for ring in geom.interiors:
            holes_xy = warp_fxn(np.vstack(ring.coords), **warp_kwargs)
            if dst_shape_rc is not None:
                holes_xy = clip_xy(holes_xy, dst_shape_rc)


        return type(geom)(shell, holes)

elif geom.geom_type.startswith("Multi") or geom.geom_type == "GeometryCollection":
    return type(geom)([_warp_shapely(part, warp_fxn, warp_kwargs) for part in geom.geoms])
    raise shapely.errors.GeometryTypeError(f"Type {geom.geom_type!r} not recognized")

def warp_shapely_geom_from_to(geom, from_M=None, from_transformation_src_shape_rc=None, from_transformation_dst_shape_rc=None, from_src_shape_rc=None, from_dst_shape_rc=None,from_bk_dxdy=None, from_fwd_dxdy=None, to_M=None, to_transformation_src_shape_rc=None, to_transformation_dst_shape_rc=None, to_src_shape_rc=None, to_dst_shape_rc=None, to_bk_dxdy=None, to_fwd_dxdy=None): """ Warp xy points using M and/or bk_dxdy/fwd_dxdy. If bk_dxdy is provided, it will be inverted to create fwd_dxdy

geom : shapely.geometery
    Shapely geom to warp

M : ndarray, optional
     3x3 affine transformation matrix to perform rigid warp

transformation_src_shape_rc : (int, int)
    Shape of image that was used to find the transformation.
    For example, this could be the original image in which features were detected

transformation_dst_shape_rc : (int, int), optional
    Shape of the image with shape `transformation_src_shape_rc` after warping.
    This could be the shape of the original image after applying `M`.

src_shape_rc : optional, (int, int)
    Shape of the image from which the points originated. For example,
    this could be a larger/smaller version of the image that was
    used for feature detection.

dst_shape_rc : optional, (int, int)
    Shape of image (with shape `src_shape_rc`) after warping

bk_dxdy : ndarray, pyvips.Image
    (2, N, M) numpy array of pixel displacements in the x and y
    directions from the reference image. dx = bk_dxdy[0],
    and dy=bk_dxdy[1]. If `bk_dxdy` is not None, but
    `fwd_dxdy` is None, then `bk_dxdy` will be inverted to warp `xy`.

fwd_dxdy : ndarray, pyvips.Image
    Inverse of bk_dxdy. dx = fwd_dxdy[0], and dy=fwd_dxdy[1].
    This is what is actually used to warp the points.

pt_buffer : int
    If `bk_dxdy` or `fwd_dxdy` are pyvips.Image object, then
    pt_buffer` determines the size of the window around the point used to
    get the local displacements.

warped_geom : shapely.geom
   Warped `geom`


warp_kwargs = {"from_M": from_M,
               "from_transformation_src_shape_rc": from_transformation_src_shape_rc,
               "from_transformation_dst_shape_rc": from_transformation_dst_shape_rc,
               "from_src_shape_rc": from_src_shape_rc,
               "to_transformation_src_shape_rc": to_transformation_src_shape_rc,
               "to_transformation_dst_shape_rc": to_transformation_dst_shape_rc,
               "to_src_shape_rc": to_src_shape_rc,
               "to_dst_shape_rc": to_dst_shape_rc, "to_bk_dxdy": to_bk_dxdy,

warped_geom = _warp_shapely(geom, warp_tools.warp_xy_from_to, warp_kwargs)

return warped_geom

def warp_geojson_from_to(geojson_f, annotation_slide, to_slide_obj, src_slide_level=0, src_pt_level=0, dst_slide_level=0, non_rigid=True): """Warp geoms in geojson file from annotation slide to another unwarped slide

Takes a set of geometries found in this annotation slide, and warps them to
their position in the unwarped "to" slide.

geojson_f : str
    Path to geojson file containing the annotation geometries

to_slide_obj : Slide
    Slide to which the points will be warped. I.e. `xy`
    will be warped from this Slide to their position in
    the unwarped slide associated with `to_slide_obj`.

src_pt_level: int, tuple, optional
    Pyramid level of the slide/image in which `xy` originated.
    For example, if `xy` are from the centroids of cell segmentation
    performed on the unwarped full resolution image, this should be 0.
    Alternatively, the value can be a tuple of the image's shape (row, col)
    from which the points came. For example, if `xy` are  bounding
    box coordinates from an analysis on a lower resolution image,
    then pt_level is that lower resolution image's shape (row, col).

dst_slide_level: int, tuple, optional
    Pyramid level of the slide/image in to `xy` will be warped.
    Similar to `src_pt_level`, if `dst_slide_level` is an int then
    the points will be warped to that pyramid level. If `dst_slide_level`
    is the "to" image's shape (row, col), then the points will be warped
    to their location in an image with that same shape.

non_rigid : bool, optional
    Whether or not to conduct non-rigid warping. If False,
    then only a rigid transformation will be applied.


if np.issubdtype(type(src_pt_level), np.integer):
    src_pt_dim_rc = annotation_slide.slide_dimensions_wh[src_pt_level][::-1]
    src_pt_dim_rc = np.array(src_pt_level)

if np.issubdtype(type(dst_slide_level), np.integer):
    to_slide_src_shape_rc = to_slide_obj.slide_dimensions_wh[dst_slide_level][::-1]
    to_slide_src_shape_rc = np.array(dst_slide_level)

if src_slide_level != 0:
    if np.issubdtype(type(src_slide_level), np.integer):
        aligned_slide_shape = annotation_slide.val_obj.get_aligned_slide_shape(src_slide_level)
        aligned_slide_shape = np.array(src_slide_level)
    aligned_slide_shape = annotation_slide.aligned_slide_shape_rc

if non_rigid:
    src_fwd_dxdy = annotation_slide.fwd_dxdy
    dst_bk_dxdy = to_slide_obj.bk_dxdy

    src_fwd_dxdy = None
    dst_bk_dxdy = None

with open(geojson_f) as f:
    annotation_geojson = json.load(f)

warped_features = [None]*len(annotation_geojson["features"])
for i, ft in tqdm.tqdm(enumerate(annotation_geojson["features"])):
    geom = shapely.geometry.shape(ft["geometry"])
    warped_geom = warp_shapely_geom_from_to(geom=geom,

    warped_ft = deepcopy(ft)
    warped_ft["geometry"] = shapely.geometry.mapping(warped_geom)
    warped_features[i] = warped_ft

warped_geojson = {"type":annotation_geojson["type"], "features":warped_features}

return warped_geojson

if name == "main":

import sys

print(sys.argv, file=sys.stderr)
if (len(sys.argv) < 3):
    print("""ERROR: 2 arguments needed.

Usage: Exiting.""", file=sys.stderr) sys.exit(1)

## FIRST ARG slide, results, annotation dir
slide_src_dir = sys.argv[1]
results_dst_dir = sys.argv[1]
annotation_dir = sys.argv[1]

## SECOND ARG file prefix
prefix = sys.argv[2]

annotation_geojson_f = prefix + "_40X.geojson"
annotation_img_f = prefix + "_BAPP_40X.svs"
target_img_f_fibrinogen = prefix + "_fibrinogen_40X.svs"

## for testing only; remove when done
## sys.exit(0)

# slide_src_dir = "/Users/marcopisa/Library/CloudStorage/OneDrive-Nexus365/Projects/BAPP/Valis/MS018C"
# results_dst_dir = "/Users/marcopisa/Library/CloudStorage/OneDrive-Nexus365/Projects/BAPP/Valis/MS018C"
# annotation_dir = "/Users/marcopisa/Library/CloudStorage/OneDrive-Nexus365/Projects/BAPP/Valis/MS018C"
# annotation_geojson_f = "MS018_SC_C_40X.geojson"
# annotation_img_f = "MS018_SC_C_BAPP_40X.svs"
# target_img_f_fibrinogen = "MS018_SC_C_fibrinogen_40X.svs"

import os
warped_geojson_annotation_f_fibrinogen = os.path.join(results_dst_dir, f"{valtils.get_name(target_img_f_fibrinogen)}_annotations.geojson")

# Perform registration
registrar = registration.Valis(slide_src_dir, results_dst_dir, reference_img_f=annotation_img_f, align_to_reference=True)
rigid_registrar, non_rigid_registrar, error_df = registrar.register(brightfield_processing_cls=PreProcessor)

print('Registration complete')

# Transfer annotation
annotation_geojson_f = os.path.join(annotation_dir, annotation_geojson_f)
# annotation_geojson_f = os.path.join(annotation_dir, "MS018_SC_C_40X.geojson")
annotation_source_slide = registrar.get_slide(annotation_img_f)

target_slide_fibrinogen = registrar.get_slide(target_img_f_fibrinogen)

warped_geojson_fibrinogen = warp_geojson_from_to(annotation_geojson_f, annotation_source_slide, target_slide_fibrinogen)

# Save annotation, which can be dragged and dropped into QuPath
with open(warped_geojson_annotation_f_fibrinogen, 'w') as f:
    json.dump(warped_geojson_fibrinogen, f)