| import os |
| from collections import defaultdict |
| from typing import Any |
|
|
| import cv2 |
| import matplotlib.pyplot as plt |
| import numpy as np |
|
|
| import src.constants as constants |
| from src.logger import logger |
| from src.utils.image import CLAHE_HELPER, ImageUtils |
| from src.utils.interaction import InteractionUtils |
|
|
|
|
| class ImageInstanceOps: |
| """Class to hold fine-tuned utilities for a group of images. One instance for each processing directory.""" |
|
|
| save_img_list: Any = defaultdict(list) |
|
|
| def __init__(self, tuning_config): |
| super().__init__() |
| self.tuning_config = tuning_config |
| self.save_image_level = tuning_config.outputs.save_image_level |
|
|
| def apply_preprocessors(self, file_path, in_omr, template): |
| tuning_config = self.tuning_config |
| |
| in_omr = ImageUtils.resize_util( |
| in_omr, |
| tuning_config.dimensions.processing_width, |
| tuning_config.dimensions.processing_height, |
| ) |
|
|
| |
| for pre_processor in template.pre_processors: |
| in_omr = pre_processor.apply_filter(in_omr, file_path) |
| return in_omr |
|
|
| def read_omr_response(self, template, image, name, save_dir=None): |
| config = self.tuning_config |
| auto_align = config.alignment_params.auto_align |
| try: |
| img = image.copy() |
| |
| img = ImageUtils.resize_util( |
| img, template.page_dimensions[0], template.page_dimensions[1] |
| ) |
| if img.max() > img.min(): |
| img = ImageUtils.normalize_util(img) |
| |
| transp_layer = img.copy() |
| final_marked = img.copy() |
|
|
| morph = img.copy() |
| self.append_save_img(3, morph) |
|
|
| if auto_align: |
| |
| morph = CLAHE_HELPER.apply(morph) |
| self.append_save_img(3, morph) |
| |
| morph = ImageUtils.adjust_gamma( |
| morph, config.threshold_params.GAMMA_LOW |
| ) |
| |
| _, morph = cv2.threshold(morph, 220, 220, cv2.THRESH_TRUNC) |
| morph = ImageUtils.normalize_util(morph) |
| self.append_save_img(3, morph) |
| if config.outputs.show_image_level >= 4: |
| InteractionUtils.show("morph1", morph, 0, 1, config) |
|
|
| |
| |
| alpha = 0.65 |
| omr_response = {} |
| multi_marked, multi_roll = 0, 0 |
|
|
| |
| |
| |
|
|
| if config.outputs.show_image_level >= 5: |
| all_c_box_vals = {"int": [], "mcq": []} |
| |
| q_nums = {"int": [], "mcq": []} |
|
|
| |
| if auto_align: |
| |
| |
| v_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 10)) |
| morph_v = cv2.morphologyEx( |
| morph, cv2.MORPH_OPEN, v_kernel, iterations=3 |
| ) |
| _, morph_v = cv2.threshold(morph_v, 200, 200, cv2.THRESH_TRUNC) |
| morph_v = 255 - ImageUtils.normalize_util(morph_v) |
|
|
| if config.outputs.show_image_level >= 3: |
| InteractionUtils.show( |
| "morphed_vertical", morph_v, 0, 1, config=config |
| ) |
|
|
| |
| |
|
|
| self.append_save_img(3, morph_v) |
|
|
| morph_thr = 60 |
| _, morph_v = cv2.threshold(morph_v, morph_thr, 255, cv2.THRESH_BINARY) |
| |
| morph_v = cv2.erode(morph_v, np.ones((5, 5), np.uint8), iterations=2) |
|
|
| self.append_save_img(3, morph_v) |
| |
| |
| |
| |
| |
| |
| |
| if config.outputs.show_image_level >= 3: |
| InteractionUtils.show( |
| "morph_thr_eroded", morph_v, 0, 1, config=config |
| ) |
|
|
| self.append_save_img(6, morph_v) |
|
|
| |
| for field_block in template.field_blocks: |
| s, d = field_block.origin, field_block.dimensions |
|
|
| match_col, max_steps, align_stride, thk = map( |
| config.alignment_params.get, |
| [ |
| "match_col", |
| "max_steps", |
| "stride", |
| "thickness", |
| ], |
| ) |
| shift, steps = 0, 0 |
| while steps < max_steps: |
| left_mean = np.mean( |
| morph_v[ |
| s[1] : s[1] + d[1], |
| s[0] + shift - thk : -thk + s[0] + shift + match_col, |
| ] |
| ) |
| right_mean = np.mean( |
| morph_v[ |
| s[1] : s[1] + d[1], |
| s[0] |
| + shift |
| - match_col |
| + d[0] |
| + thk : thk |
| + s[0] |
| + shift |
| + d[0], |
| ] |
| ) |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| left_shift, right_shift = left_mean > 100, right_mean > 100 |
| if left_shift: |
| if right_shift: |
| break |
| else: |
| shift -= align_stride |
| else: |
| if right_shift: |
| shift += align_stride |
| else: |
| break |
| steps += 1 |
|
|
| field_block.shift = shift |
| |
| |
| |
| |
|
|
| final_align = None |
| if config.outputs.show_image_level >= 2: |
| initial_align = self.draw_template_layout(img, template, shifted=False) |
| final_align = self.draw_template_layout( |
| img, template, shifted=True, draw_qvals=True |
| ) |
| |
| self.append_save_img(2, initial_align) |
| self.append_save_img(2, final_align) |
|
|
| if auto_align: |
| final_align = np.hstack((initial_align, final_align)) |
| self.append_save_img(5, img) |
|
|
| |
| all_q_vals, all_q_strip_arrs, all_q_std_vals = [], [], [] |
| total_q_strip_no = 0 |
| for field_block in template.field_blocks: |
| box_w, box_h = field_block.bubble_dimensions |
| q_std_vals = [] |
| for field_block_bubbles in field_block.traverse_bubbles: |
| q_strip_vals = [] |
| for pt in field_block_bubbles: |
| |
| x, y = (pt.x + field_block.shift, pt.y) |
| rect = [y, y + box_h, x, x + box_w] |
| q_strip_vals.append( |
| cv2.mean(img[rect[0] : rect[1], rect[2] : rect[3]])[0] |
| |
| ) |
| q_std_vals.append(round(np.std(q_strip_vals), 2)) |
| all_q_strip_arrs.append(q_strip_vals) |
| |
| |
| |
| |
| all_q_vals.extend(q_strip_vals) |
| |
| total_q_strip_no += 1 |
| all_q_std_vals.extend(q_std_vals) |
|
|
| global_std_thresh, _, _ = self.get_global_threshold( |
| all_q_std_vals |
| ) |
| |
| |
| |
|
|
| |
| |
| |
| global_thr, _, _ = self.get_global_threshold(all_q_vals, looseness=4) |
|
|
| logger.info( |
| f"Thresholding: \tglobal_thr: {round(global_thr, 2)} \tglobal_std_THR: {round(global_std_thresh, 2)}\t{'(Looks like a Xeroxed OMR)' if (global_thr == 255) else ''}" |
| ) |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| per_omr_threshold_avg, total_q_strip_no, total_q_box_no = 0, 0, 0 |
| for field_block in template.field_blocks: |
| block_q_strip_no = 1 |
| box_w, box_h = field_block.bubble_dimensions |
| shift = field_block.shift |
| s, d = field_block.origin, field_block.dimensions |
| key = field_block.name[:3] |
| |
| |
| for field_block_bubbles in field_block.traverse_bubbles: |
| |
| no_outliers = all_q_std_vals[total_q_strip_no] < global_std_thresh |
| |
| |
| per_q_strip_threshold = self.get_local_threshold( |
| all_q_strip_arrs[total_q_strip_no], |
| global_thr, |
| no_outliers, |
| f"Mean Intensity Histogram for {key}.{field_block_bubbles[0].field_label}.{block_q_strip_no}", |
| config.outputs.show_image_level >= 6, |
| ) |
| |
| |
| per_omr_threshold_avg += per_q_strip_threshold |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| detected_bubbles = [] |
| for bubble in field_block_bubbles: |
| bubble_is_marked = ( |
| per_q_strip_threshold > all_q_vals[total_q_box_no] |
| ) |
| total_q_box_no += 1 |
| if bubble_is_marked: |
| detected_bubbles.append(bubble) |
| x, y, field_value = ( |
| bubble.x + field_block.shift, |
| bubble.y, |
| bubble.field_value, |
| ) |
| cv2.rectangle( |
| final_marked, |
| (int(x + box_w / 12), int(y + box_h / 12)), |
| ( |
| int(x + box_w - box_w / 12), |
| int(y + box_h - box_h / 12), |
| ), |
| constants.CLR_DARK_GRAY, |
| 3, |
| ) |
|
|
| cv2.putText( |
| final_marked, |
| str(field_value), |
| (x, y), |
| cv2.FONT_HERSHEY_SIMPLEX, |
| constants.TEXT_SIZE, |
| (20, 20, 10), |
| int(1 + 3.5 * constants.TEXT_SIZE), |
| ) |
| else: |
| cv2.rectangle( |
| final_marked, |
| (int(x + box_w / 10), int(y + box_h / 10)), |
| ( |
| int(x + box_w - box_w / 10), |
| int(y + box_h - box_h / 10), |
| ), |
| constants.CLR_GRAY, |
| -1, |
| ) |
|
|
| for bubble in detected_bubbles: |
| field_label, field_value = ( |
| bubble.field_label, |
| bubble.field_value, |
| ) |
| |
| multi_marked_local = field_label in omr_response |
| omr_response[field_label] = ( |
| (omr_response[field_label] + field_value) |
| if multi_marked_local |
| else field_value |
| ) |
| |
| |
| multi_marked = multi_marked or multi_marked_local |
|
|
| if len(detected_bubbles) == 0: |
| field_label = field_block_bubbles[0].field_label |
| omr_response[field_label] = field_block.empty_val |
|
|
| if config.outputs.show_image_level >= 5: |
| if key in all_c_box_vals: |
| q_nums[key].append(f"{key[:2]}_c{str(block_q_strip_no)}") |
| all_c_box_vals[key].append( |
| all_q_strip_arrs[total_q_strip_no] |
| ) |
|
|
| block_q_strip_no += 1 |
| total_q_strip_no += 1 |
| |
|
|
| per_omr_threshold_avg /= total_q_strip_no |
| per_omr_threshold_avg = round(per_omr_threshold_avg, 2) |
| |
| cv2.addWeighted( |
| final_marked, alpha, transp_layer, 1 - alpha, 0, final_marked |
| ) |
| |
| if config.outputs.show_image_level >= 6: |
| |
| f, axes = plt.subplots(len(all_c_box_vals), sharey=True) |
| f.canvas.manager.set_window_title(name) |
| ctr = 0 |
| type_name = { |
| "int": "Integer", |
| "mcq": "MCQ", |
| "med": "MED", |
| "rol": "Roll", |
| } |
| for k, boxvals in all_c_box_vals.items(): |
| axes[ctr].title.set_text(type_name[k] + " Type") |
| axes[ctr].boxplot(boxvals) |
| |
| |
| axes[ctr].set_ylabel("Intensity") |
| axes[ctr].set_xticklabels(q_nums[k]) |
| |
| ctr += 1 |
| |
| plt.tight_layout(pad=0.5) |
| plt.show() |
|
|
| if config.outputs.show_image_level >= 3 and final_align is not None: |
| final_align = ImageUtils.resize_util_h( |
| final_align, int(config.dimensions.display_height) |
| ) |
| |
| InteractionUtils.show( |
| "Template Alignment Adjustment", final_align, 0, 0, config=config |
| ) |
|
|
| if config.outputs.save_detections and save_dir is not None: |
| if multi_roll: |
| save_dir = save_dir.joinpath("_MULTI_") |
| image_path = str(save_dir.joinpath(name)) |
| ImageUtils.save_img(image_path, final_marked) |
|
|
| self.append_save_img(2, final_marked) |
|
|
| if save_dir is not None: |
| for i in range(config.outputs.save_image_level): |
| self.save_image_stacks(i + 1, name, save_dir) |
|
|
| return omr_response, final_marked, multi_marked, multi_roll |
|
|
| except Exception as e: |
| raise e |
|
|
| @staticmethod |
| def draw_template_layout(img, template, shifted=True, draw_qvals=False, border=-1): |
| img = ImageUtils.resize_util( |
| img, template.page_dimensions[0], template.page_dimensions[1] |
| ) |
| final_align = img.copy() |
| for field_block in template.field_blocks: |
| s, d = field_block.origin, field_block.dimensions |
| box_w, box_h = field_block.bubble_dimensions |
| shift = field_block.shift |
| if shifted: |
| cv2.rectangle( |
| final_align, |
| (s[0] + shift, s[1]), |
| (s[0] + shift + d[0], s[1] + d[1]), |
| constants.CLR_BLACK, |
| 3, |
| ) |
| else: |
| cv2.rectangle( |
| final_align, |
| (s[0], s[1]), |
| (s[0] + d[0], s[1] + d[1]), |
| constants.CLR_BLACK, |
| 3, |
| ) |
| for field_block_bubbles in field_block.traverse_bubbles: |
| for pt in field_block_bubbles: |
| x, y = (pt.x + field_block.shift, pt.y) if shifted else (pt.x, pt.y) |
| cv2.rectangle( |
| final_align, |
| (int(x + box_w / 10), int(y + box_h / 10)), |
| (int(x + box_w - box_w / 10), int(y + box_h - box_h / 10)), |
| constants.CLR_GRAY, |
| border, |
| ) |
| if draw_qvals: |
| rect = [y, y + box_h, x, x + box_w] |
| cv2.putText( |
| final_align, |
| f"{int(cv2.mean(img[rect[0] : rect[1], rect[2] : rect[3]])[0])}", |
| (rect[2] + 2, rect[0] + (box_h * 2) // 3), |
| cv2.FONT_HERSHEY_SIMPLEX, |
| 0.6, |
| constants.CLR_BLACK, |
| 2, |
| ) |
| if shifted: |
| text_in_px = cv2.getTextSize( |
| field_block.name, cv2.FONT_HERSHEY_SIMPLEX, constants.TEXT_SIZE, 4 |
| ) |
| cv2.putText( |
| final_align, |
| field_block.name, |
| (int(s[0] + d[0] - text_in_px[0][0]), int(s[1] - text_in_px[0][1])), |
| cv2.FONT_HERSHEY_SIMPLEX, |
| constants.TEXT_SIZE, |
| constants.CLR_BLACK, |
| 4, |
| ) |
| return final_align |
|
|
| def get_global_threshold( |
| self, |
| q_vals_orig, |
| plot_title=None, |
| plot_show=True, |
| sort_in_plot=True, |
| looseness=1, |
| ): |
| """ |
| Note: Cannot assume qStrip has only-gray or only-white bg |
| (in which case there is only one jump). |
| So there will be either 1 or 2 jumps. |
| 1 Jump : |
| ...... |
| |||||| |
| |||||| <-- risky THR |
| |||||| <-- safe THR |
| ....|||||| |
| |||||||||| |
| |
| 2 Jumps : |
| ...... |
| |||||| <-- wrong THR |
| ....|||||| |
| |||||||||| <-- safe THR |
| ..|||||||||| |
| |||||||||||| |
| |
| The abstract "First LARGE GAP" is perfect for this. |
| Current code is considering ONLY TOP 2 jumps(>= MIN_GAP) to be big, |
| gives the smaller one |
| |
| """ |
| config = self.tuning_config |
| PAGE_TYPE_FOR_THRESHOLD, MIN_JUMP, JUMP_DELTA = map( |
| config.threshold_params.get, |
| [ |
| "PAGE_TYPE_FOR_THRESHOLD", |
| "MIN_JUMP", |
| "JUMP_DELTA", |
| ], |
| ) |
|
|
| global_default_threshold = ( |
| constants.GLOBAL_PAGE_THRESHOLD_WHITE |
| if PAGE_TYPE_FOR_THRESHOLD == "white" |
| else constants.GLOBAL_PAGE_THRESHOLD_BLACK |
| ) |
|
|
| |
| |
| q_vals = sorted(q_vals_orig) |
| |
| ls = (looseness + 1) // 2 |
| l = len(q_vals) - ls |
| max1, thr1 = MIN_JUMP, global_default_threshold |
| for i in range(ls, l): |
| jump = q_vals[i + ls] - q_vals[i - ls] |
| if jump > max1: |
| max1 = jump |
| thr1 = q_vals[i - ls] + jump / 2 |
|
|
| |
| |
| |
| max2, thr2 = MIN_JUMP, global_default_threshold |
| |
| for i in range(ls, l): |
| jump = q_vals[i + ls] - q_vals[i - ls] |
| new_thr = q_vals[i - ls] + jump / 2 |
| if jump > max2 and abs(thr1 - new_thr) > JUMP_DELTA: |
| max2 = jump |
| thr2 = new_thr |
| |
| global_thr, j_low, j_high = thr1, thr1 - max1 // 2, thr1 + max1 // 2 |
|
|
| |
| |
| |
| |
| |
|
|
| if plot_title: |
| _, ax = plt.subplots() |
| ax.bar(range(len(q_vals_orig)), q_vals if sort_in_plot else q_vals_orig) |
| ax.set_title(plot_title) |
| thrline = ax.axhline(global_thr, color="green", ls="--", linewidth=5) |
| thrline.set_label("Global Threshold") |
| thrline = ax.axhline(thr2, color="red", ls=":", linewidth=3) |
| thrline.set_label("THR2 Line") |
| |
| |
| |
| |
| ax.set_ylabel("Values") |
| ax.set_xlabel("Position") |
| ax.legend() |
| if plot_show: |
| plt.title(plot_title) |
| plt.show() |
|
|
| return global_thr, j_low, j_high |
|
|
| def get_local_threshold( |
| self, q_vals, global_thr, no_outliers, plot_title=None, plot_show=True |
| ): |
| """ |
| TODO: Update this documentation too- |
| //No more - Assumption : Colwise background color is uniformly gray or white, |
| but not alternating. In this case there is atmost one jump. |
| |
| 0 Jump : |
| <-- safe THR? |
| ....... |
| ...||||||| |
| |||||||||| <-- safe THR? |
| // How to decide given range is above or below gray? |
| -> global q_vals shall absolutely help here. Just run same function |
| on total q_vals instead of colwise _// |
| How to decide it is this case of 0 jumps |
| |
| 1 Jump : |
| ...... |
| |||||| |
| |||||| <-- risky THR |
| |||||| <-- safe THR |
| ....|||||| |
| |||||||||| |
| |
| """ |
| config = self.tuning_config |
| |
| q_vals = sorted(q_vals) |
|
|
| |
| |
| if len(q_vals) < 3: |
| thr1 = ( |
| global_thr |
| if np.max(q_vals) - np.min(q_vals) < config.threshold_params.MIN_GAP |
| else np.mean(q_vals) |
| ) |
| else: |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| l = len(q_vals) - 1 |
| max1, thr1 = config.threshold_params.MIN_JUMP, 255 |
| for i in range(1, l): |
| jump = q_vals[i + 1] - q_vals[i - 1] |
| if jump > max1: |
| max1 = jump |
| thr1 = q_vals[i - 1] + jump / 2 |
| |
|
|
| confident_jump = ( |
| config.threshold_params.MIN_JUMP |
| + config.threshold_params.CONFIDENT_SURPLUS |
| ) |
| |
| if max1 < confident_jump: |
| if no_outliers: |
| |
| thr1 = global_thr |
| else: |
| |
| pass |
|
|
| |
| |
|
|
| |
| if plot_show and plot_title is not None: |
| _, ax = plt.subplots() |
| ax.bar(range(len(q_vals)), q_vals) |
| thrline = ax.axhline(thr1, color="green", ls=("-."), linewidth=3) |
| thrline.set_label("Local Threshold") |
| thrline = ax.axhline(global_thr, color="red", ls=":", linewidth=5) |
| thrline.set_label("Global Threshold") |
| ax.set_title(plot_title) |
| ax.set_ylabel("Bubble Mean Intensity") |
| ax.set_xlabel("Bubble Number(sorted)") |
| ax.legend() |
| |
| |
| if plot_show: |
| plt.show() |
| return thr1 |
|
|
| def append_save_img(self, key, img): |
| if self.save_image_level >= int(key): |
| self.save_img_list[key].append(img.copy()) |
|
|
| def save_image_stacks(self, key, filename, save_dir): |
| config = self.tuning_config |
| if self.save_image_level >= int(key) and self.save_img_list[key] != []: |
| name = os.path.splitext(filename)[0] |
| result = np.hstack( |
| tuple( |
| [ |
| ImageUtils.resize_util_h(img, config.dimensions.display_height) |
| for img in self.save_img_list[key] |
| ] |
| ) |
| ) |
| result = ImageUtils.resize_util( |
| result, |
| min( |
| len(self.save_img_list[key]) * config.dimensions.display_width // 3, |
| int(config.dimensions.display_width * 2.5), |
| ), |
| ) |
| ImageUtils.save_img(f"{save_dir}stack/{name}_{str(key)}_stack.jpg", result) |
|
|
| def reset_all_save_img(self): |
| for i in range(self.save_image_level): |
| self.save_img_list[i + 1] = [] |
|
|