from pathlib import Path import math from rich.console import Console from rich.table import Table from rich.pretty import Pretty import numpy as np import pandas as pd import cv2 from sklearn.cluster import MeanShift from skimage.transform import hough_circle, hough_circle_peaks import torch from torch.utils.data import Dataset, DataLoader from torchvision import transforms from torchvision.models.detection.faster_rcnn import FastRCNNPredictor from torchvision.models.detection import ( fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights, ) import pytorch_lightning as pl from pytorch_lightning.callbacks import RichProgressBar from pytorch_lightning import Trainer import albumentations as A from albumentations.pytorch.transforms import ToTensorV2 import matplotlib.pyplot as plt import com_const as cc import com_image as ci g_device = ( "mps" if torch.backends.mps.is_built() is True else "cuda" if torch.backends.cuda.is_built() else "cpu" ) def load_tray_image(image_name): return ci.load_image( file_name=image_name, path_to_images=cc.path_to_plates, rgb=True ) def build_albumentations( image_size: int = 10, gamma=(60, 180), mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), ): return { "resize": [ A.Resize(height=image_size * 32 * 2, width=image_size * 32 * 3, p=1) ], "train": [ A.HorizontalFlip(p=0.3), A.RandomBrightnessContrast( brightness_limit=0.25, contrast_limit=0.25, p=0.5 ), A.RandomGamma(gamma_limit=gamma, p=0.5), ], "to_tensor": [A.Normalize(mean=mean, std=std, p=1), ToTensorV2()], "un_normalize": [ A.Normalize( mean=[-m / s for m, s in zip(mean, std)], std=[1.0 / s for s in std], always_apply=True, max_pixel_value=1.0, ), ], } def get_augmentations( image_size: int = 10, gamma=(60, 180), kinds: list = ["resize", "to_tensor"], mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), inferrence: bool = False, ): td_ = build_albumentations( image_size=image_size, gamma=gamma, mean=mean, std=std, ) augs = [] for k in kinds: augs += td_[k] if inferrence is True: return A.Compose(augs) else: return A.Compose( augs, bbox_params={"format": "pascal_voc", "label_fields": ["labels"]}, ) def safe_row_col(row, col): """Ensures that row is a string and col is an integer Args: row (int or str): row output must be string col (int or str): col output must be int """ if row is not None and col is not None: if isinstance(col, str): row, col = col, row return row, col def _update_axis(axis, image, title=None, fontsize=10, remove_axis=True): axis.imshow(image, origin="upper") if title is not None: axis.set_title(title, fontsize=fontsize) def make_patches_grid(images, row_count, col_count=None, figsize=(20, 20)): col_count = row_count if col_count is None else col_count _, axii = plt.subplots(row_count, col_count, figsize=figsize) for ax, image in zip(axii.reshape(-1), images): if isinstance(image, tuple): title = image[1] image = image[0] else: title = None try: _update_axis(axis=ax, image=image, remove_axis=True, title=title) except: pass ax.set_axis_off() plt.tight_layout() plt.show() def print_boxes( image_name, boxes, highlight=(None, None), draw_first_line: bool = False, return_plot: bool = True, ): r, c = safe_row_col(*highlight) image = load_tray_image(image_name=image_name) fnt = cv2.FONT_HERSHEY_SIMPLEX fnt_scale = 3 fnt_thickness = 8 column_colors = { 1: (255, 0, 0), 2: (0, 0, 255), 3: (255, 255, 0), 4: (0, 255, 255), } for box in boxes[["x1", "y1", "x2", "y2", "cx", "cy", "row", "col"]].values: color = ( (255, 0, 255) if c == box[7] and r == box[6] else column_colors.get(box[7], (255, 255, 244)) ) thickness = 20 if c == box[7] and r == box[6] else 10 image = cv2.rectangle( image, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), color, thickness, ) label = str(box[6]).upper() + str(int(box[7])) (w, h), _ = cv2.getTextSize(label, fnt, fnt_scale, fnt_thickness) x, y = (int(box[0]), int(box[1]) - fnt_thickness) image = cv2.rectangle( image, (x - fnt_thickness, y - h - fnt_thickness), (x + fnt_thickness + w, y + fnt_thickness), color, -1, ) image = cv2.putText( image, label, (x + fnt_thickness, y), fnt, fnt_scale, (0, 0, 0), fnt_thickness, ) if draw_first_line is True: line = get_first_vert_line(image_name=image_name) if line is not None: x1, y1, x2, y2 = line cv2.line( image, [ int(i) for i in (np.array([x2, y2]) - np.array([x1, y1])) * 10 + np.array([x1, y1]) ], [ int(i) for i in (np.array([x1, y1]) - np.array([x2, y2])) * 10 + np.array([x2, y2]) ], (255, 0, 255), 20, lineType=8, ) if return_plot is True: plt.figure(figsize=(10, 10)) plt.imshow(image) plt.tight_layout() plt.axis("off") plt.show() else: return image def crop_to_vert(image): return image[0 : image.shape[1] // 2, 0 : image.shape[0] // 3] def get_first_vert_line(image_name, min_angle=80, max_angle=100): r, *_ = cv2.split(load_tray_image(image_name)) red_crop = cv2.normalize( crop_to_vert(r), None, alpha=0, beta=200, norm_type=cv2.NORM_MINMAX, ) lines = cv2.HoughLinesP( image=ci.close( cv2.Canny(red_crop, 50, 200, None, 3), kernel_size=5, proc_times=5, ), rho=1, theta=np.pi / 180, threshold=50, minLineLength=red_crop.shape[0] // 5, maxLineGap=20, ) if lines is not None: min_x = red_crop.shape[0] sel_line = None for _, line in enumerate(lines): x1, y1, x2, y2 = line[0] min_angle, max_angle = min(min_angle, max_angle), max(min_angle, max_angle) line_angle = math.atan2(y2 - y1, x2 - x1) * 180 / math.pi * -1 if min_angle <= abs(line_angle) <= max_angle and min(x1, x2) < min_x: min_x = min(x1, x2) sel_line = (x1, y1, x2, y2) if sel_line is not None: return sel_line else: return None def draw_first_line(image_name, dot_size=10, crop_canvas: bool = False): line = get_first_vert_line(image_name=image_name) if line is None: return canvas x1, y1, x2, y2 = line canvas = load_tray_image(image_name) if crop_canvas is True: canvas = crop_to_vert(canvas) cv2.circle(canvas, (x1, y1), dot_size, (255, 0, 0)) cv2.circle(canvas, (x2, y2), dot_size, (0, 255, 0)) cv2.line(canvas, (x1, y1), (x2, y2), (0, 0, 255), 10) return canvas def get_bbox(image_name, bboxes, row, col): if isinstance(bboxes, pd.Series): return bboxes else: row, col = safe_row_col(row, col) return bboxes[ ( bboxes.file_name == (image_name.name if isinstance(image_name, Path) else image_name) ) & (bboxes.row == row) & (bboxes.col == col) ].iloc[0] def get_hough_leaf_disc_circle( image_name, bboxes, row=-1, col=-1, padding: int = 10, allow_move: bool = False, ): padded_leaf_disk = get_leaf_disk_wbb( image_name=image_name, bboxes=bboxes, row=row, col=col, padding=padding, ) *_, b = cv2.split(padded_leaf_disk) min_t, max_t = 100, 200 rb = cv2.Canny( cv2.normalize( b, None, alpha=0, beta=200, norm_type=cv2.NORM_MINMAX, ), min_t, max_t, None, 3, ) bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col) hough_radii = np.arange(bbox.max_size // 2 - 10, bbox.max_size // 2 + 10, 10) hough_res = hough_circle(rb, hough_radii) # Select the most prominent n circles _, cx, cy, radii = hough_circle_peaks( hough_res, hough_radii, min_xdistance=10, min_ydistance=10, total_num_peaks=1, ) cx = cx[0] cy = cy[0] r = radii[0] if allow_move is True: h, w, c = padded_leaf_disk.shape if cx - r < 0: cx += abs(r - cx) if cx + r > w: cx -= abs(r - cx) if cy - r < 0: cy += abs(cy - r) if cy + r > h: cy -= abs(cy - r) return dict(cx=cx, cy=cy, r=radii) def get_hough_leaf_disk_patch( image_name, bboxes, patch_size=-1, row=-1, col=-1, padding: int = 10, radius_crop=0, disc=None, allow_move: bool = False, image_folder=None, ): if patch_size > 0: try: bbox = get_bbox(image_name, bboxes, row, col) cx = int(bbox.cx) cy = int(bbox.cy) except: return None patch_size = patch_size // 2 return A.crop( load_tray_image(image_name, image_folder=image_folder), cx - patch_size, cy - patch_size, cx + patch_size, cy + patch_size, ) else: if disc is None: disc = get_hough_leaf_disc_circle( image_name=image_name, bboxes=bboxes, row=row, col=col, padding=padding, allow_move=allow_move, ) r = int((disc["r"] - radius_crop) / math.sqrt(2)) cx = int(disc["cx"]) cy = int(disc["cy"]) left = cx - r top = cy - r right = cx + r bottom = cy + r return get_leaf_disk_wbb( image_name=image_name, bboxes=bboxes, row=row, col=col, padding=padding, )[top:bottom, left:right] def get_hough_segment_disk( image_name, bboxes, row=-1, col=-1, padding: int = 10, radius_crop=0, disc=None, allow_move: bool = False, ): if disc is None: disc = get_hough_leaf_disc_circle( image_name=image_name, bboxes=bboxes, row=row, col=col, padding=padding, allow_move=allow_move, ) padded_leaf_disk = get_leaf_disk_wbb( image_name=image_name, bboxes=bboxes, row=row, col=col, padding=padding, ) r = int(disc["r"] - radius_crop) rc = int((disc["r"] - radius_crop) / math.sqrt(2)) cx = int(disc["cx"]) cy = int(disc["cy"]) left = cx - r top = cy - r right = cx + r bottom = cy + r return cv2.bitwise_and( padded_leaf_disk, padded_leaf_disk, mask=cv2.circle(np.zeros_like(padded_leaf_disk[:, :, 0]), (cx, cy), r, 255, -1), )[top:bottom, left:right] def draw_hough_bb_to_patch_process( image_name, bboxes, row=-1, col=-1, padding: int = 10, radius_crop=0, disc=None, allow_move: bool = False, ): if disc is None: disc = get_hough_leaf_disc_circle( image_name=image_name, bboxes=bboxes, row=row, col=col, padding=padding, allow_move=allow_move, ) padded_leaf_disk = get_leaf_disk_wbb( image_name=image_name, bboxes=bboxes, row=row, col=col, padding=padding, ) r = int(disc["r"] - radius_crop) rc = int((disc["r"] - radius_crop) / math.sqrt(2)) cx = int(disc["cx"]) cy = int(disc["cy"]) left = cx - r top = cy - r right = cx + r bottom = cy + r return cv2.circle( cv2.circle( cv2.rectangle( cv2.rectangle( padded_leaf_disk, (cx - rc, cy - rc), (cx + rc, cy + rc), (0, 255, 0), 5, ), (left, top), (right, bottom), (255, 0, 155), 5, ), (cx, cy), 10, (255, 0, 155), -1, ), (cx, cy), r, (255, 0, 155), 5, ) def get_leaf_disk_wbb(image_name, bboxes, row=-1, col=-1, image_path: Path = None): try: bbox = get_bbox(image_name, bboxes, row, col) return load_tray_image(image_name if image_path is None else image_path)[ int(bbox.y1) : int(bbox.y2), int(bbox.x1) : int(bbox.x2) ] except: return None def get_fast_leaf_disc_circle( image_name, bboxes, row=-1, col=-1, percent_radius: float = 1.0 ): bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col) return int(bbox.cx), int(bbox.cy), int((bbox.max_size / 2) * percent_radius) def get_fast_segment_disk( image_name, bboxes, row=-1, col=-1, percent_radius: float = 1.0, image_path: Path = None, ): cx, cy, r = get_fast_leaf_disc_circle( image_name=image_name, bboxes=bboxes, row=row, col=col, percent_radius=percent_radius, ) src_image = load_tray_image(image_name if image_path is None else image_path) left = cx - r top = cy - r right = cx + r bottom = cy + r return cv2.bitwise_and( src_image, src_image, mask=cv2.circle(np.zeros_like(src_image[:, :, 0]), (cx, cy), r, 255, -1), )[top:bottom, left:right] def get_fast_leaf_disk_patch( image_name, bboxes, row=-1, col=-1, percent_radius: float = 1.0, image_path: Path = None, ): cx, cy, r = get_fast_leaf_disc_circle( image_name=image_name, bboxes=bboxes, row=row, col=col, percent_radius=percent_radius, ) r = int(r / math.sqrt(2)) left = cx - r top = cy - r right = cx + r bottom = cy + r return load_tray_image(image_name if image_path is None else image_path)[ top:bottom, left:right ] def draw_fast_bb_to_patch_process( image_name, bboxes, row=-1, col=-1, percent_radius: float = 1.0, image_path: Path = None, add_center: bool = True, ): cx, cy, r = get_fast_leaf_disc_circle( image_name=image_name, bboxes=bboxes, row=row, col=col, percent_radius=percent_radius, ) bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col) image = load_tray_image(image_name if image_path is None else image_path) rc = int(r / math.sqrt(2)) cv2.circle(image, (cx, cy), r, color=(255, 0, 155), thickness=5) if add_center is True: cv2.circle(image, (cx, cy), 10, color=(255, 0, 155), thickness=-1) cv2.rectangle(image, (cx - rc, cy - rc), (cx + rc, cy + rc), (0, 255, 0), 5) return image[int(bbox.y1) : int(bbox.y2), int(bbox.x1) : int(bbox.x2)] class LeafDiskDetectorDataset(Dataset): def __init__( self, csv, transform=None, yxyx: bool = False, return_id: bool = False, bboxes: bool = True, ): self.boxes = csv.copy() self.images = list(self.boxes.plate_name.unique()) self.transforms = transform if transform is not None: self.width, self.height = transform[0].width, transform[0].height else: self.width, self.height = 0, 0 self.yxyx = yxyx self.return_id = return_id self.bboxes = bboxes def __len__(self): return len(self.images) def load_boxes(self, idx): if "x" in self.boxes.columns: boxes = self.boxes[self.boxes.plate_name == self.images[idx]].dropna() size = boxes.shape[0] return ( (size, boxes[["x1", "y1", "x2", "y2"]].values) if size > 0 else (0, []) ) return 0, [] def load_tray_image(self, idx): return load_tray_image(self.images[idx]) def get_by_sample_name(self, plate_name): return self[self.images.index(plate_name)] def get_image_by_name(self, plate_name): return load_tray_image(plate_name) def draw_image_with_boxes(self, plate_name): image, labels, *_ = self[self.images.index(plate_name)] boxes = labels[self.get_boxes_key()] for box in boxes: box_indexes = [1, 0, 3, 2] if self.yxyx is True else [0, 1, 2, 3] image = cv2.rectangle( image, # Boxes are in yxyx format (int(box[box_indexes[0]]), int(box[box_indexes[1]])), (int(box[box_indexes[2]]), int(box[box_indexes[3]])), (255, 0, 0), 2, ) return image def get_boxes_key(self): return "bboxes" if self.bboxes is True else "boxes" def __getitem__(self, index): num_box, boxes = self.load_boxes( index ) # return list of [xmin, ymin, xmax, ymax] img = self.load_tray_image(index) # return an image if num_box > 0: boxes = torch.as_tensor(boxes, dtype=torch.float32) else: # negative example, ref: https://github.com/pytorch/vision/issues/2144 boxes = torch.zeros((0, 4), dtype=torch.float32) image_id = torch.tensor([index]) labels = torch.ones((num_box,), dtype=torch.int64) target = { self.get_boxes_key(): boxes, "labels": labels, "image_id": image_id, "area": torch.as_tensor( (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]), dtype=torch.float32, ), "iscrowd": torch.zeros((num_box,), dtype=torch.int64), "img_size": torch.tensor([self.height, self.width]), "img_scale": torch.tensor([1.0]), } if self.transforms is not None: sample = { "image": img, "bboxes": target[self.get_boxes_key()], "labels": labels, } sample = self.transforms(**sample) img = sample["image"] if num_box > 0: # Convert to ndarray to allow slicing boxes = np.array(sample["bboxes"]) # Convert to yxyx if self.yxyx is True: boxes[:, [0, 1, 2, 3]] = boxes[:, [1, 0, 3, 2]] # Convert to tensor target[self.get_boxes_key()] = torch.as_tensor( boxes, dtype=torch.float32 ) else: target[self.get_boxes_key()] = torch.zeros((0, 4), dtype=torch.float32) else: img = transforms.ToTensor()(img) if self.return_id is True: return img, target, image_id else: return img, target def collate_fn(batch): images, targets = tuple(zip(*batch)) images = torch.stack(images) images = images.float() boxes = [target["boxes"].float() for target in targets] labels = [target["labels"].float() for target in targets] return images, targets def find_best_lr(model, default_root_dir=cc.path_to_chk_detector): # run learning rate finder, results override hparams.learning_rate trainer = Trainer( default_root_dir=default_root_dir, auto_lr_find=True, accelerator="gpu", callbacks=[RichProgressBar()], ) # call tune to find the lr trainer.tune(model) return model.learning_rate class LeafDiskDetector(pl.LightningModule): def __init__( self, batch_size: int, learning_rate: float, max_epochs: int, image_factor: int, train_data: pd.DataFrame, val_data: pd.DataFrame, test_data: pd.DataFrame, augmentations_kinds: list = ["resize", "train", "to_tensor"], augmentations_params: dict = {"gamma": (60, 180)}, num_workers: int = 0, accumulate_grad_batches: int = 3, selected_device: str = g_device, optimizer: str = "adam", scheduler: str = None, scheduler_params: dict = {}, ): super().__init__() self.model_name = "ldd" # Hyperparameters self.batch_size = batch_size self.selected_device = selected_device self.learning_rate = learning_rate self.num_workers = num_workers self.max_epochs = max_epochs self.accumulate_grad_batches = accumulate_grad_batches # dataframes self.train_data = train_data self.val_data = val_data self.test_data = test_data # Optimizer self.optimizer = optimizer self.scheduler = scheduler self.scheduler_params = scheduler_params # albumentations self.image_factor = image_factor self.augmentations_kinds = augmentations_kinds self.augmentations_params = augmentations_params self.train_augmentations = get_augmentations( image_size=self.image_factor, kinds=self.augmentations_kinds, **self.augmentations_params, ) self.val_augmentations = get_augmentations( image_size=self.image_factor, kinds=["resize", "to_tensor"], **self.augmentations_params, ) # Model self.encoder = fasterrcnn_resnet50_fpn_v2( weights=FasterRCNN_ResNet50_FPN_V2_Weights ) num_classes = 2 # 1 class (wheat) + background # get number of input features for the classifier in_features = self.encoder.roi_heads.box_predictor.cls_score.in_features # replace the pre-trained head with a new one self.encoder.roi_heads.box_predictor = FastRCNNPredictor( in_features, num_classes ) self.save_hyperparameters() def hr_desc(self): table = Table(title=f"{self.model_name} params & values") table.add_column("Param", justify="right", style="bold", no_wrap=True) table.add_column("Value") def add_pairs(table_, attributes: list) -> None: for a in attributes: try: table_.add_row(a, Pretty(getattr(self, a))) except: pass add_pairs( table, ["model_name", "batch_size", "num_workers", "accumulate_grad_batches"], ) table.add_row("image_width", Pretty(self.train_augmentations[0].width)) table.add_row("image_height", Pretty(self.train_augmentations[0].height)) add_pairs( table, ["image_factor", "augmentations_kinds", "augmentations_params"], ) add_pairs( table, ["learning_rate", "optimizer", "scheduler", "scheduler_params"], ) for name, df in zip( ["train", "val", "test"], [self.train_data, self.val_data, self.test_data], ): table.add_row( name, Pretty( f"shape: {str(df.shape)}, images: {len(df.plate_name.unique())}" ), ) Console().print(table) def configure_optimizers(self): # Optimizer if self.optimizer == "adam": optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate) elif self.optimizer == "sgd": optimizer = torch.optim.SGD(self.parameters(), lr=self.learning_rate) else: optimizer = None # Scheduler if self.scheduler == "cycliclr": scheduler = torch.optim.lr_scheduler.CyclicLR( optimizer, base_lr=self.learning_rate, max_lr=0.01, step_size_up=100, mode=self.scheduler_mode, ) elif self.scheduler == "steplr": self.scheduler_params["optimizer"] = optimizer scheduler = torch.optim.lr_scheduler.StepLR(**self.scheduler_params) self.scheduler_params.pop("optimizer") elif self.scheduler == "plateau": scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( optimizer, mode="min", factor=0.2, patience=10, min_lr=1e-6, ) scheduler = {"scheduler": scheduler, "monitor": "val_loss"} else: scheduler = None if scheduler is None: return optimizer else: return [optimizer], [scheduler] def train_dataloader(self): return DataLoader( LeafDiskDetectorDataset( csv=self.train_data, transform=self.train_augmentations, bboxes=False, ), batch_size=self.batch_size, shuffle=True, num_workers=self.num_workers, collate_fn=collate_fn, pin_memory=True, ) def val_dataloader(self): return DataLoader( LeafDiskDetectorDataset( csv=self.train_data, transform=self.val_augmentations, bboxes=False, ), batch_size=self.batch_size, num_workers=self.num_workers, collate_fn=collate_fn, pin_memory=True, ) def test_dataloader(self): return DataLoader( LeafDiskDetectorDataset( csv=self.train_data, transform=self.val_augmentations, bboxes=False, ), batch_size=self.batch_size, num_workers=self.num_workers, collate_fn=collate_fn, pin_memory=True, ) def forward(self, x): return self.encoder(x) def step_(self, batch, batch_index): x, y = batch self.train() loss_dict = self.encoder(x, y) return sum(loss for loss in loss_dict.values()) def training_step(self, batch, batch_idx): loss = self.step_(batch=batch, batch_index=batch_idx) self.log( "train_loss", loss, on_step=True, prog_bar=True, batch_size=self.batch_size ) return loss def validation_step(self, batch, batch_idx): loss = self.step_(batch=batch, batch_index=batch_idx) self.log( "val_loss", loss, on_epoch=True, on_step=False, prog_bar=True, batch_size=self.batch_size, ) self.log("train_loss", loss) return loss def test_step(self, batch, batch_idx): loss = self.step_( batch=batch, batch_index=batch_idx, batch_size=self.batch_size ) self.log("test_loss", loss) return loss def prepare_bboxes( self, image_name, score_threshold=0.90, ar_threshold=1.5, size_threshold=0.30, ): augs = get_augmentations( image_size=self.image_factor, kinds=["resize", "to_tensor"], inferrence=True, **self.augmentations_params, ) image = load_tray_image(image_name=image_name) self.to(g_device) self.eval() predictions = self(augs(image=image)["image"].to(g_device).unsqueeze(0)) boxes = predictions[0]["boxes"].detach().to("cpu").numpy() scores = predictions[0]["scores"].detach().to("cpu").numpy() filtered_predictions = [ [box[i] for i in range(4)] for box, score in zip(boxes, scores) if score > score_threshold ] restore_size = A.Compose( [A.Resize(width=image.shape[1], height=image.shape[0])], # [A.Resize(width=5000, height=5000)], bbox_params={"format": "pascal_voc", "label_fields": ["labels"]}, ) sample = { "image": image, "bboxes": filtered_predictions, "labels": [1 for _ in range(len(filtered_predictions))], } sample = restore_size(**sample) resized_predictions = sample["bboxes"] from siuba import _, filter, mutate boxes = ( pd.DataFrame(data=resized_predictions, columns=["x1", "y1", "x2", "y2"]) >> mutate( x1=_.x1 * image.shape[1] / augs[0].width, y1=_.y1 * image.shape[0] / augs[0].height, x2=_.x2 * image.shape[1] / augs[0].width, y2=_.y2 * image.shape[0] / augs[0].height, ) >> mutate(width=_.x2 - _.x1, height=_.y2 - _.y1) >> mutate(cx=(_.x1 + _.x2) / 2, cy=(_.y1 + _.y2) / 2) >> mutate(area=_.width * _.height) >> mutate(ar=_.width / _.height) ) boxes.insert( 0, "file_name", image_name.name if isinstance(image_name, Path) else image_name, ) boxes["max_size"] = boxes[["width", "height"]].max(axis=1) ar_boxes = ( boxes >> filter(_.width / _.height < ar_threshold) >> filter(_.height / _.width < ar_threshold) ) return ar_boxes[ar_boxes.area > ar_boxes.area.max() * size_threshold] @staticmethod def init_cols(bboxes): bboxes = bboxes.copy() # Handle columns X = np.reshape(bboxes.cx.to_list(), (-1, 1)) ms = MeanShift(bandwidth=100, bin_seeding=True) ms.fit(X) cols = ms.predict(X) bboxes["col"] = cols bboxes = bboxes.sort_values("cx") bboxes["mean_cx"] = ( bboxes.groupby("col").transform("mean", numeric_only=True).cx ) bboxes = bboxes.sort_values("mean_cx") for i, val in enumerate(bboxes.mean_cx.unique()): bboxes.loc[bboxes["mean_cx"] == val, "col"] = i # Handle Rows bboxes = bboxes.sort_values("cy") X = np.reshape(bboxes.cy.to_list(), (-1, 1)) ms = MeanShift(bandwidth=100, bin_seeding=True) ms.fit(X) rows = ms.predict(X) bboxes["row"] = rows bboxes = bboxes.sort_values("cy") bboxes["mean_cy"] = ( bboxes.groupby("row").transform("mean", numeric_only=True).cy ) bboxes = bboxes.sort_values("mean_cy") for i, val in zip(["a", "b", "c"], bboxes.mean_cy.unique()): bboxes.loc[bboxes["mean_cy"] == val, "row"] = i bboxes = bboxes.sort_values("cx") return bboxes @staticmethod def finalize_indexing(bboxes, image_name): bboxes = bboxes.copy() bboxes = bboxes.sort_values("cx") labels_unique = bboxes.col.unique() labels = bboxes.col.to_numpy() if len(labels_unique) < 4: inc_labels = [[i, 0] for i in range(len(labels_unique))] max_width = bboxes.max_size.max() # Handle left-most label # We remove half of max width to take into account trails margins left_most_line = get_first_vert_line(image_name=image_name) if left_most_line is not None: left_most_point = bboxes.x1.min() - min( left_most_line[0], left_most_line[1] ) else: left_most_point = bboxes.x1.min() - (max_width / 2) i = 1 while left_most_point > i * 1.1 * max_width: inc_labels[0][1] += 1 i += 1 # Handle the next labels prev_min_min = bboxes[bboxes.col == 0].x2.max() for label in labels_unique[1:]: current_label_contours = bboxes[bboxes.col == label] max_width = current_label_contours.max_size.max() min_left = current_label_contours.x1.min() i = 1 while min_left - prev_min_min > i * 1.1 * max_width: inc_labels[label][1] += 1 i += 1 prev_min_min = min_left + max_width for pos, inc in reversed(inc_labels): labels[labels >= pos] += inc bboxes["col"] = labels labels_unique = np.unique(labels) bboxes["col"] += 1 return bboxes.sort_values(["row", "col"]) def index_plate( self, image_name, score_threshold=0.90, ar_threshold=1.5, size_threshold=0.50, ): bboxes = self.prepare_bboxes( image_name=image_name, score_threshold=score_threshold, ar_threshold=ar_threshold, size_threshold=size_threshold, ) if bboxes.shape[0] == 0: return bboxes bboxes = self.init_cols(bboxes=bboxes) bboxes = self.finalize_indexing(bboxes=bboxes, image_name=image_name) return bboxes def test_augmentations( df, image_size, kinds: list = ["resize", "train"], row_count=2, col_count=4, **aug_params, ): src_dataset = LeafDiskDetectorDataset( csv=df, transform=get_augmentations( image_size=image_size, kinds=["resize"], **aug_params ), ) test_dataset = LeafDiskDetectorDataset( csv=df, transform=get_augmentations(image_size=image_size, kinds=kinds, **aug_params), ) image_name = df.sample(n=1).iloc[0].plate_name images = [(src_dataset.draw_image_with_boxes(plate_name=image_name), "Source")] + [ (test_dataset.draw_image_with_boxes(plate_name=image_name), "Augmented") for i in range(row_count * col_count - 1) ] make_patches_grid( images=images, row_count=row_count, col_count=col_count, figsize=(col_count * 4, row_count * 3), ) def get_file_path_from_row(row, path_to_patches: Path): return path_to_patches.joinpath(row.file_name) def get_fast_images( row, path_to_patches, percent_radius: float = 1.0, add_process_image: bool = False ): d = {} try: d["leaf_disc_box"] = get_leaf_disk_wbb( row.file_name, row, image_path=get_file_path_from_row(row, path_to_patches) ) except: pass try: d["segmented_leaf_disc"] = get_fast_segment_disk( image_name=row.file_name, bboxes=row, percent_radius=percent_radius, image_path=get_file_path_from_row(row, path_to_patches), ) except: pass try: d["leaf_disc_patch"] = get_fast_leaf_disk_patch( image_name=row.file_name, bboxes=row, percent_radius=percent_radius, image_path=get_file_path_from_row(row, path_to_patches), ) except: pass if add_process_image is True: try: d["process_image"] = draw_fast_bb_to_patch_process( image_name=row.file_name, bboxes=row, percent_radius=percent_radius, image_path=get_file_path_from_row(row, path_to_patches), ) except: pass return d def save_images(row: pd.Series, images_data: dict, errors: dict, paths: dict): fn = f"{Path(row.file_name).stem}_{row.row}_{int(row.col)}.png" for k, image in images_data.items(): if k not in paths: continue path_to_image = paths[k].joinpath(fn) if image is not None: if path_to_image.is_file() is False: cv2.imwrite(str(path_to_image), cv2.cvtColor(image, cv2.COLOR_RGB2BGR)) elif errors is not None: errors[k].append(row.file_name) else: pass def handle_bbox( row: pd.Series, paths: dict, errors: dict = None, percent_radius: float = 1.0, add_process_image: bool = False, ): save_images( row=row, images_data=get_fast_images( row=row, percent_radius=percent_radius, add_process_image=add_process_image, path_to_patches=paths["plates"], ), errors=errors, paths=paths, )