| import ast |
| import os |
| import re |
| from copy import deepcopy |
| from csv import QUOTE_NONNUMERIC |
|
|
| import cv2 |
| import pandas as pd |
| from rich.table import Table |
|
|
| from src.logger import console, logger |
| from src.schemas.constants import ( |
| BONUS_SECTION_PREFIX, |
| DEFAULT_SECTION_KEY, |
| MARKING_VERDICT_TYPES, |
| ) |
| from src.utils.parsing import ( |
| get_concatenated_response, |
| open_evaluation_with_validation, |
| parse_fields, |
| parse_float_or_fraction, |
| ) |
|
|
|
|
class AnswerMatcher:
    """Turn a marked answer into a verdict and marks for one answer-key entry.

    The answer item is classified as one of three answer types:

    * ``"standard"``: a single non-empty string.
    * ``"multiple-correct"``: a list of two or more non-empty strings.
    * ``"multiple-correct-weighted"``: a non-empty list of two-element
      ``[answer, score]`` lists.

    Each matcher keeps its own copy of the section marking, extended with one
    ``"correct-<answer>"`` entry per allowed answer for the list-based types.
    """

    def __init__(self, answer_item, section_marking_scheme):
        self.section_marking_scheme = section_marking_scheme
        self.answer_item = answer_item
        self.answer_type = self.validate_and_get_answer_type(answer_item)
        self.set_defaults_from_scheme(section_marking_scheme)

    @staticmethod
    def is_a_marking_score(answer_element):
        """Return True when the element is exactly a ``str`` or an ``int``."""
        # Exact type match on purpose: subclasses (e.g. bool) do not qualify.
        return type(answer_element) in (str, int)

    @staticmethod
    def is_standard_answer(answer_element):
        """Return True when the element is exactly a non-empty ``str``."""
        return type(answer_element) is str and answer_element != ""

    def validate_and_get_answer_type(self, answer_item):
        """Classify ``answer_item`` and return its answer type name.

        Raises:
            Exception: when the item matches none of the supported shapes.
        """
        if self.is_standard_answer(answer_item):
            return "standard"

        if type(answer_item) is list:
            # Two or more plain answers, e.g. ["A", "B"].
            if len(answer_item) >= 2 and all(
                self.is_standard_answer(candidate) for candidate in answer_item
            ):
                return "multiple-correct"

            # One or more [answer, score] pairs, e.g. [["A", 1], ["B", 2]].
            looks_weighted = (
                len(answer_item) >= 1
                and all(
                    type(pair) is list and len(pair) == 2 for pair in answer_item
                )
                and all(
                    self.is_standard_answer(answer)
                    and self.is_a_marking_score(score)
                    for answer, score in answer_item
                )
            )
            if looks_weighted:
                return "multiple-correct-weighted"

        logger.critical(
            f"Unable to determine answer type for answer item: {answer_item}"
        )
        raise Exception("Unable to determine answer type")

    def set_defaults_from_scheme(self, section_marking_scheme):
        """Copy the section marking and add the per-answer ``correct-*`` marks."""
        self.empty_val = section_marking_scheme.empty_val
        # Deep copy so the per-answer keys never leak into the shared scheme.
        self.marking = deepcopy(section_marking_scheme.marking)

        kind = self.answer_type
        if kind == "multiple-correct":
            # Every allowed answer earns the section's regular "correct" marks.
            correct_marks = self.marking["correct"]
            self.marking.update(
                {f"correct-{answer}": correct_marks for answer in self.answer_item}
            )
        elif kind == "multiple-correct-weighted":
            # Each allowed answer carries its own score from the answer key.
            for answer, score in self.answer_item:
                self.marking[f"correct-{answer}"] = parse_float_or_fraction(score)
        # "standard" answers use the section marking unchanged.

    def get_marking_scheme(self):
        """Return the section marking scheme this matcher was built with."""
        return self.section_marking_scheme

    def get_section_explanation(self):
        """Return the text describing which marking applies to this answer."""
        if self.answer_type == "multiple-correct-weighted":
            return f"Custom: {self.marking}"
        if self.answer_type in ("standard", "multiple-correct"):
            return self.section_marking_scheme.section_key
        return None

    def get_verdict_marking(self, marked_answer):
        """Return ``(verdict, marks)`` for the given marked answer."""
        verdict_resolvers = {
            "standard": self.get_standard_verdict,
            "multiple-correct": self.get_multiple_correct_verdict,
            "multiple-correct-weighted": self.get_multiple_correct_weighted_verdict,
        }
        resolver = verdict_resolvers.get(self.answer_type)
        # Unknown answer types fall back to the "incorrect" verdict.
        verdict = "incorrect" if resolver is None else resolver(marked_answer)
        return verdict, self.marking[verdict]

    def get_standard_verdict(self, marked_answer):
        """Return "unmarked", "correct" or "incorrect" for a standard answer."""
        if marked_answer == self.empty_val:
            return "unmarked"
        if marked_answer == self.answer_item:
            return "correct"
        return "incorrect"

    def get_multiple_correct_verdict(self, marked_answer):
        """Return the verdict when any answer in the list is accepted."""
        if marked_answer == self.empty_val:
            return "unmarked"
        if marked_answer in self.answer_item:
            return f"correct-{marked_answer}"
        return "incorrect"

    def get_multiple_correct_weighted_verdict(self, marked_answer):
        """Return the verdict when each accepted answer has its own score."""
        accepted = [answer for answer, _score in self.answer_item]
        if marked_answer == self.empty_val:
            return "unmarked"
        if marked_answer in accepted:
            return f"correct-{marked_answer}"
        return "incorrect"

    def __str__(self):
        return f"{self.answer_item}"
|
|
|
|
class SectionMarkingScheme:
    """Marks awarded per verdict type for one section of questions.

    For the default section the scheme itself is the marking mapping and no
    question list is stored. For any other section the scheme provides a
    ``"questions"`` entry and a ``"marking"`` entry.
    """

    def __init__(self, section_key, section_scheme, empty_val):
        self.empty_val = empty_val
        self.section_key = section_key

        if section_key != DEFAULT_SECTION_KEY:
            # Custom section: explicit question list plus nested marking.
            self.questions = parse_fields(section_key, section_scheme["questions"])
            raw_marking = section_scheme["marking"]
        else:
            # Default section: the scheme is the marking mapping itself.
            self.questions = None
            raw_marking = section_scheme
        self.marking = self.parse_scheme_marking(raw_marking)

    def __str__(self):
        return self.section_key

    def parse_scheme_marking(self, marking):
        """Parse the marks of every verdict type in ``MARKING_VERDICT_TYPES``.

        Logs a warning when an "incorrect" verdict earns positive marks in a
        section whose key does not start with ``BONUS_SECTION_PREFIX``.

        Returns:
            A dict mapping each verdict type to its parsed marks.
        """
        marks_by_verdict = {}
        for verdict_type in MARKING_VERDICT_TYPES:
            marks = parse_float_or_fraction(marking[verdict_type])
            rewards_incorrect = marks > 0 and verdict_type == "incorrect"
            if rewards_incorrect and not self.section_key.startswith(
                BONUS_SECTION_PREFIX
            ):
                logger.warning(
                    f"Found positive marks({round(marks, 2)}) for incorrect answer in the schema '{self.section_key}'. For Bonus sections, add a prefix 'BONUS_' to them."
                )
            marks_by_verdict[verdict_type] = marks
        return marks_by_verdict

    def match_answer(self, marked_answer, answer_matcher):
        """Return ``(marks, verdict)`` for ``marked_answer``.

        Note the order is swapped relative to
        ``answer_matcher.get_verdict_marking``, which yields ``(verdict, marks)``.
        """
        verdict, marks = answer_matcher.get_verdict_marking(marked_answer)
        return marks, verdict
|
|
|
|
class EvaluationConfig:
    """Note: this instance will be reused for multiple omr sheets

    Loads the evaluation file, builds the answer key (from a csv, from an
    answer-key image, or from the inline options), sets up the section marking
    schemes and creates one ``AnswerMatcher`` per question.
    """

    def __init__(self, curr_dir, evaluation_path, template, tuning_config):
        """Build the evaluation config from the file at ``evaluation_path``.

        Args:
            curr_dir: directory against which the csv/image paths from the
                options are resolved (via ``joinpath``).
            evaluation_path: path of the evaluation file to open.
            template: template providing ``global_empty_val`` and
                ``image_instance_ops`` for reading an answer-key image.
            tuning_config: config providing
                ``outputs.filter_out_multimarked_files``.

        Raises:
            Exception: when the answer key cannot be located or read, or when
                any of the validations on questions, marking schemes or
                answers fails.
        """
        self.path = evaluation_path
        evaluation_json = open_evaluation_with_validation(evaluation_path)
        options, marking_schemes, source_type = map(
            evaluation_json.get, ["options", "marking_schemes", "source_type"]
        )
        self.should_explain_scoring = options.get("should_explain_scoring", False)
        self.has_non_default_section = False
        self.exclude_files = []
        self.enable_evaluation_table_to_csv = options.get(
            "enable_evaluation_table_to_csv", False
        )
        # Created per sheet by reset_explanation_table(); defined here so the
        # attribute always exists.
        self.explanation_table = None

        if source_type == "csv":
            csv_path = curr_dir.joinpath(options["answer_key_csv_path"])
            answer_key_image_path = options.get("answer_key_image_path", None)
            if os.path.exists(csv_path):
                (
                    self.questions_in_order,
                    answers_in_order,
                ) = self._read_answer_key_from_csv(csv_path)
            else:
                logger.warning(f"Answer key csv does not exist at: '{csv_path}'.")
                if not answer_key_image_path:
                    raise Exception(f"Answer key csv not found at '{csv_path}'")
                # Fall back to generating the answer key from a marked image.
                image_path = str(curr_dir.joinpath(answer_key_image_path))
                (
                    self.questions_in_order,
                    answers_in_order,
                ) = self._read_answer_key_from_image(image_path, options, template)
        else:
            self.questions_in_order = self.parse_questions_in_order(
                options["questions_in_order"]
            )
            answers_in_order = options["answers_in_order"]

        self.validate_questions(answers_in_order)

        self._register_marking_schemes(marking_schemes, template.global_empty_val)
        self.validate_marking_schemes()

        self.question_to_answer_matcher = self.parse_answers_and_map_questions(
            answers_in_order
        )
        self.validate_answers(answers_in_order, tuning_config)

    def __str__(self):
        return str(self.path)

    def _read_answer_key_from_csv(self, csv_path):
        """Read a header-less two-column csv into ``(questions, answers)``."""
        answer_key = pd.read_csv(
            csv_path,
            header=None,
            names=["question", "answer"],
            converters={"question": str, "answer": self.parse_answer_column},
        )
        return answer_key["question"].to_list(), answer_key["answer"].to_list()

    def _read_answer_key_from_image(self, image_path, options, template):
        """Generate ``(questions, answers)`` by reading a marked answer-key image.

        Raises:
            Exception: when the image is missing or unreadable, or when a
                question listed in ``options["questions_in_order"]`` has an
                empty answer in the image.
        """
        if not os.path.exists(image_path):
            raise Exception(f"Answer key image not found at '{image_path}'")

        logger.debug(f"Attempting to generate answer key from image: '{image_path}'")

        in_omr = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        # cv2.imread returns None for unreadable files; fail before handing
        # that to the preprocessors.
        if in_omr is None:
            raise Exception(f"Could not read answer key from image {image_path}")
        in_omr = template.image_instance_ops.apply_preprocessors(
            image_path, in_omr, template
        )
        if in_omr is None:
            raise Exception(f"Could not read answer key from image {image_path}")

        (
            response_dict,
            _final_marked,
            _multi_marked,
            _multi_roll,
        ) = template.image_instance_ops.read_omr_response(
            template,
            image=in_omr,
            name=image_path,
            save_dir=None,
        )
        omr_response = get_concatenated_response(response_dict, template)

        empty_val = template.global_empty_val
        # With a non-empty empty_val, a response counts as empty if the value
        # occurs anywhere in it; otherwise only the empty string is empty.
        empty_answer_pattern = re.compile(
            rf"{re.escape(empty_val)}+" if empty_val != "" else r"^$"
        )

        if "questions_in_order" in options:
            questions_in_order = self.parse_questions_in_order(
                options["questions_in_order"]
            )
            empty_answered_questions = [
                question
                for question in questions_in_order
                if empty_answer_pattern.search(omr_response[question])
            ]
            if empty_answered_questions:
                logger.error(
                    f"Found empty answers for questions: {empty_answered_questions}, empty value used: '{empty_val}'"
                )
                raise Exception(
                    f"Found empty answers in file '{image_path}'. Please check your template again in the --setLayout mode."
                )
        else:
            logger.warning(
                "questions_in_order not provided, proceeding to use non-empty values as answer key"
            )
            questions_in_order = sorted(
                question
                for question, answer in omr_response.items()
                if not empty_answer_pattern.search(answer)
            )

        answers_in_order = [omr_response[question] for question in questions_in_order]
        return questions_in_order, answers_in_order

    def _register_marking_schemes(self, marking_schemes, empty_val):
        """Create a ``SectionMarkingScheme`` per section and index its questions."""
        self.section_marking_schemes, self.question_to_scheme = {}, {}
        for section_key, section_scheme in marking_schemes.items():
            section_marking_scheme = SectionMarkingScheme(
                section_key, section_scheme, empty_val
            )
            if section_key == DEFAULT_SECTION_KEY:
                self.default_marking_scheme = section_marking_scheme
                continue
            self.section_marking_schemes[section_key] = section_marking_scheme
            for question in section_marking_scheme.questions:
                self.question_to_scheme[question] = section_marking_scheme
            self.has_non_default_section = True

    def _should_record_explanations(self):
        """Return True when explanation rows are needed for printing or csv export."""
        return bool(self.should_explain_scoring or self.enable_evaluation_table_to_csv)

    def prepare_and_validate_omr_response(self, omr_response):
        """Reset the explanation table and check the response covers the answer key.

        Raises:
            Exception: when a question from the answer key is missing in
                ``omr_response``.
        """
        self.reset_explanation_table()

        all_questions = set(self.questions_in_order)
        missing_questions = sorted(all_questions.difference(omr_response.keys()))
        if missing_questions:
            logger.critical(f"Missing OMR response for: {missing_questions}")
            raise Exception(
                "Some questions are missing in the OMR response for the given answer key"
            )

        # Response keys starting with "q" that have no entry in the answer
        # key are only reported, not treated as an error.
        prefixed_omr_response_questions = {
            key for key in omr_response.keys() if key.startswith("q")
        }
        missing_prefixed_questions = sorted(
            prefixed_omr_response_questions.difference(all_questions)
        )
        if missing_prefixed_questions:
            logger.warning(
                f"No answer given for potential questions in OMR response: {missing_prefixed_questions}"
            )

    def match_answer_for_question(self, current_score, question, marked_answer):
        """Return the marks delta for ``question`` and record its explanation row."""
        answer_matcher = self.question_to_answer_matcher[question]
        question_verdict, delta = answer_matcher.get_verdict_marking(marked_answer)
        self.conditionally_add_explanation(
            answer_matcher,
            delta,
            marked_answer,
            question_verdict,
            question,
            current_score,
        )
        return delta

    def conditionally_print_explanation(self):
        """Print the explanation table when ``should_explain_scoring`` is set."""
        if self.should_explain_scoring:
            console.print(self.explanation_table, justify="center")

    def conditionally_save_explanation_csv(self, file_path, evaluation_output_dir):
        """Append the explanation table to ``<file stem>_evaluation.csv``.

        Does nothing when csv export is disabled or when no explanation table
        has been prepared yet.
        """
        if not self.enable_evaluation_table_to_csv:
            return
        explanation_table = self.explanation_table
        if explanation_table is None:
            # Nothing was recorded (the table is only created by
            # reset_explanation_table), so there is nothing to export.
            return

        data = {col.header: list(col.cells) for col in explanation_table.columns}
        output_path = os.path.join(
            evaluation_output_dir,
            f"{file_path.stem}_evaluation.csv",
        )
        pd.DataFrame(data, dtype=str).to_csv(
            output_path,
            mode="a",
            quoting=QUOTE_NONNUMERIC,
            index=False,
        )

    def get_should_explain_scoring(self):
        return self.should_explain_scoring

    def get_exclude_files(self):
        return self.exclude_files

    @staticmethod
    def parse_answer_column(answer_column):
        """Parse one raw csv answer cell.

        * A cell starting with ``[`` is parsed as a Python literal.
        * A cell containing ``,`` is split into a list on the commas.
        * Anything else (including an empty cell) is returned unchanged.
        """
        # startswith() rather than [0] so an empty cell does not raise
        # IndexError.
        if answer_column.startswith("["):
            return ast.literal_eval(answer_column)
        if "," in answer_column:
            return answer_column.split(",")
        return answer_column

    def parse_questions_in_order(self, questions_in_order):
        return parse_fields("questions_in_order", questions_in_order)

    @staticmethod
    def _is_multi_marked_answer(answer_type, answer_item):
        """Return True when any accepted answer is longer than one character."""
        if answer_type == "standard":
            return len(answer_item) > 1
        if answer_type == "multiple-correct":
            return any(len(single_answer) > 1 for single_answer in answer_item)
        if answer_type == "multiple-correct-weighted":
            return any(
                len(single_answer) > 1 for single_answer, _answer_score in answer_item
            )
        return False

    def validate_answers(self, answers_in_order, tuning_config):
        """Reject multi-marked answers when multi-marked files are filtered out.

        Raises:
            Exception: when ``tuning_config.outputs.filter_out_multimarked_files``
                is set and an answer in the key is longer than one character.
        """
        if not tuning_config.outputs.filter_out_multimarked_files:
            return
        answer_matcher_map = self.question_to_answer_matcher
        for question, answer_item in zip(self.questions_in_order, answers_in_order):
            answer_type = answer_matcher_map[question].answer_type
            if self._is_multi_marked_answer(answer_type, answer_item):
                raise Exception(
                    "Provided answer key contains multiple correct answer(s), but config.filter_out_multimarked_files is True. Scoring will get skipped."
                )

    def validate_questions(self, answers_in_order):
        """Ensure there is exactly one answer per question.

        Raises:
            Exception: when the two sequences differ in length.
        """
        questions_in_order = self.questions_in_order
        len_questions_in_order = len(questions_in_order)
        len_answers_in_order = len(answers_in_order)
        if len_questions_in_order != len_answers_in_order:
            logger.critical(
                f"questions_in_order({len_questions_in_order}): {questions_in_order}\nanswers_in_order({len_answers_in_order}): {answers_in_order}"
            )
            raise Exception(
                f"Unequal lengths for questions_in_order and answers_in_order ({len_questions_in_order} != {len_answers_in_order})"
            )

    def validate_marking_schemes(self):
        """Check sections do not overlap and only cover questions in the key.

        Raises:
            Exception: when two sections share a question, or when a section
                lists a question that has no answer in the answer key.
        """
        section_questions = set()
        for section_key, section_scheme in self.section_marking_schemes.items():
            if section_key == DEFAULT_SECTION_KEY:
                continue
            current_set = set(section_scheme.questions)
            if not section_questions.isdisjoint(current_set):
                raise Exception(
                    f"Section '{section_key}' has overlapping question(s) with other sections"
                )
            section_questions |= current_set

        missing_questions = sorted(
            section_questions.difference(self.questions_in_order)
        )
        if missing_questions:
            logger.critical(f"Missing answer key for: {missing_questions}")
            raise Exception(
                "Some questions are missing in the answer key for the given marking scheme"
            )

    def parse_answers_and_map_questions(self, answers_in_order):
        """Return a dict mapping each question to its ``AnswerMatcher``."""
        question_to_answer_matcher = {}
        for question, answer_item in zip(self.questions_in_order, answers_in_order):
            section_marking_scheme = self.get_marking_scheme_for_question(question)
            answer_matcher = AnswerMatcher(answer_item, section_marking_scheme)
            question_to_answer_matcher[question] = answer_matcher
            if (
                answer_matcher.answer_type == "multiple-correct-weighted"
                and section_marking_scheme.section_key != DEFAULT_SECTION_KEY
            ):
                # Weighted answers carry their own scores, so the section's
                # custom marking is overridden for this question.
                logger.warning(
                    f"The custom scheme '{section_marking_scheme}' will not apply to question '{question}' as it will use the given answer weights {answer_item}"
                )
        return question_to_answer_matcher

    def reset_explanation_table(self):
        """Drop the current explanation table and prepare a fresh one."""
        self.explanation_table = None
        self.prepare_explanation_table()

    def prepare_explanation_table(self):
        """Create an empty explanation table when printing or csv export is enabled."""
        if not self._should_record_explanations():
            return
        table = Table(title="Evaluation Explanation Table", show_lines=True)
        for column_name in (
            "Question",
            "Marked",
            "Answer(s)",
            "Verdict",
            "Delta",
            "Score",
        ):
            table.add_column(column_name)
        # The section column is only shown when a custom section exists.
        if self.has_non_default_section:
            table.add_column("Section")
        self.explanation_table = table

    def get_marking_scheme_for_question(self, question):
        """Return the question's custom section scheme, else the default scheme."""
        return self.question_to_scheme.get(question, self.default_marking_scheme)

    def conditionally_add_explanation(
        self,
        answer_matcher,
        delta,
        marked_answer,
        question_verdict,
        question,
        current_score,
    ):
        """Add one row to the explanation table when explanations are recorded."""
        if not self._should_record_explanations():
            return
        next_score = current_score + delta
        candidate_cells = [
            question,
            marked_answer,
            str(answer_matcher),
            str.title(question_verdict),
            str(round(delta, 2)),
            str(round(next_score, 2)),
            (
                answer_matcher.get_section_explanation()
                if self.has_non_default_section
                else None
            ),
        ]
        # None entries (e.g. no section column) are left out of the row.
        row = [cell for cell in candidate_cells if cell is not None]
        self.explanation_table.add_row(*row)
|
|
|
|
def evaluate_concatenated_response(
    concatenated_response, evaluation_config, file_path, evaluation_output_dir
):
    """Score one OMR response against the evaluation config.

    The response is validated first, then each question in
    ``evaluation_config.questions_in_order`` contributes its marks delta to a
    running total. Afterwards the config is asked to print and/or save the
    explanation table.

    Returns:
        The final score as the float sum of all deltas.
    """
    evaluation_config.prepare_and_validate_omr_response(concatenated_response)

    running_total = 0.0
    for question_key in evaluation_config.questions_in_order:
        # The matcher receives the score accumulated before this question.
        gained = evaluation_config.match_answer_for_question(
            running_total, question_key, concatenated_response[question_key]
        )
        running_total = running_total + gained

    evaluation_config.conditionally_print_explanation()
    evaluation_config.conditionally_save_explanation_csv(
        file_path, evaluation_output_dir
    )
    return running_total
|
|