| | import gc |
| | import uuid |
| |
|
| | import chromadb |
| | import numpy as np |
| | import torch |
| | import torch.nn.functional as F |
| | from PIL import Image |
| | from transformers import AutoModel, AutoImageProcessor |
| |
|
| | from src.utils.utils import extract_images_from_file |
| |
|
| |
|
| | |
| | |
| | |
| |
|
class is_conf_image:
    """Image-similarity service.

    Embeds images with nomic-embed-vision-v1.5 and stores / queries the
    vectors in a persistent ChromaDB collection using cosine distance.
    """

    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.feature_extractor = AutoImageProcessor.from_pretrained(
            "nomic-ai/nomic-embed-vision-v1.5",
            cache_dir="../weights", use_fast=True, trust_remote_code=True)
        self.model = AutoModel.from_pretrained(
            "nomic-ai/nomic-embed-vision-v1.5",
            cache_dir="../weights", trust_remote_code=True).eval().to(self.device)

        self.client = chromadb.PersistentClient(path="../db/image")
        # BUG FIX: Chroma's distance-metric key is "hnsw:space", not "hnsw".
        # With the wrong key the collection silently defaults to L2, so the
        # "cosine distance" thresholds below were compared against the wrong metric.
        self.collection = self.client.get_or_create_collection(
            name="image_embedding", metadata={"hnsw:space": "cosine"})
        self.max_size: int = 800   # images larger than this get split via extract_images_from_file
        self.cnt: int = 0          # embedding insertions since startup
        self.cnt_infer: int = 0    # inference queries since startup

    def _maybe_free_memory(self, count: int) -> None:
        """Every 200th call, run GC and drop the CUDA cache to bound memory growth."""
        if count % 200 == 0:
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    async def making_embedding_vector(self, image_path: str, category: int):
        """Embed the image at *image_path* and persist it with its category label.

        Args:
            image_path: Path to an image readable by PIL.
            category: Integer label stored alongside the embedding.
        Returns:
            The embedding as a singleton list of vectors (list[list[float]]).
        """
        image = np.array(Image.open(image_path).convert("RGB"))
        embedding_vector = self.inference(image)
        self.add_vectors(embedding_vector, {"image": image_path, "category": category})

        self.cnt += 1
        self._maybe_free_memory(self.cnt)
        return embedding_vector

    async def infer_image(self, image_path: str, threshold: float = 0.45, top_k: int = 2):
        """Find stored images similar to the one at *image_path*.

        PDFs and oversized images are split into pages/tiles by
        extract_images_from_file and yield a list of per-part result dicts;
        otherwise a single result dict is returned.

        Args:
            image_path: Path to an image or PDF file.
            threshold: Maximum cosine distance for a match.
            top_k: Number of nearest neighbours to query.
        """
        # BUG FIX: the PDF check must come before PIL.Image.open — PIL cannot
        # read PDFs, so the original raised before ever reaching its .pdf branch.
        if image_path.lower().endswith(".pdf"):
            results = self._infer_parts(image_path, threshold, top_k)
        else:
            image = np.array(Image.open(image_path).convert("RGB"))
            if image.shape[0] > self.max_size or image.shape[1] > self.max_size:
                results = self._infer_parts(image_path, threshold, top_k)
            else:
                results = self.finding_from_db(self.inference(image), threshold, top_k)

        self.cnt_infer += 1
        self._maybe_free_memory(self.cnt_infer)
        return results

    def _infer_parts(self, image_path: str, threshold: float, top_k: int) -> list:
        """Query the DB once per page/tile extracted from *image_path*."""
        results = []
        for part in extract_images_from_file(image_path, max_size=self.max_size):
            part_arr = np.array(Image.open(part).convert("RGB"))
            results.append(self.finding_from_db(self.inference(part_arr), threshold, top_k))
        return results

    def finding_from_db(self, embedding_vector, threshold: float, top_k: int) -> dict:
        """Return matches within *threshold* cosine distance of *embedding_vector*.

        Keys are "similar_image{i}", "category{i}", "cosine distance{i}" for
        each accepted match, in the order Chroma returned them.
        """
        results = self.collection.query(query_embeddings=embedding_vector,
                                        n_results=top_k,
                                        include=["embeddings", "metadatas", "distances"])
        result_out, idx = {}, 0
        for dist, meta in zip(results["distances"][0], results["metadatas"][0]):
            if dist <= threshold:
                result_out["similar_image" + str(idx)] = meta["image"]
                result_out["category" + str(idx)] = meta["category"]
                result_out["cosine distance" + str(idx)] = dist
                # BUG FIX: idx was never incremented, so every match under
                # threshold overwrote the same "...0" keys and only the last survived.
                idx += 1
        return result_out

    @torch.inference_mode()
    def inference(self, image: np.ndarray):
        """Embed one RGB image array.

        Returns the L2-normalized CLS-token embedding as a singleton
        list of vectors (list[list[float]]).
        """
        inputs = self.feature_extractor(images=image, return_tensors="pt").to(self.device)
        outputs = self.model(**inputs).last_hidden_state
        # CLS token (position 0), unit-normalized so cosine distance is meaningful.
        return F.normalize(outputs[:, 0], p=2, dim=1).cpu().numpy().tolist()

    def add_vectors(self, vectors, metadatas):
        """Insert one embedding with its metadata under a fresh UUID id.

        Args:
            vectors: Singleton list of embedding vectors (as produced by inference()).
            metadatas: Metadata dict for the single embedding.
        """
        # BUG FIX: Chroma's add() expects parallel lists; the original passed a
        # bare id string and a bare metadata dict, which recent chromadb rejects.
        self.collection.add(
            embeddings=vectors,
            metadatas=[metadatas],
            ids=[str(uuid.uuid4())],
        )
| |
|