| import os |
| import time |
| import base64 |
| import hashlib |
| from cryptography.hazmat.primitives.asymmetric import ed25519 |
| from cryptography.hazmat.primitives import serialization |
|
|
class MEPIdentity:
    """Key-pair backed node identity used to sign MEP request payloads.

    On construction the private key is read from ``key_path``. If no file
    exists there, a new Ed25519 key is generated and stored at that path as
    unencrypted PKCS#8 PEM, readable by the owner only.

    Attributes set during construction:
        key_path: Location of the PEM file holding the private key.
        private_key: The loaded or freshly generated private key object.
        public_key: Public half of ``private_key``.
        pub_pem: ``public_key`` as a SubjectPublicKeyInfo PEM string.
        node_id: ``"node_"`` followed by the first 12 hex characters of the
            SHA-256 digest of ``pub_pem``.
    """

    def __init__(self, key_path: str = "private.pem") -> None:
        """Load the key stored at ``key_path``, creating it if absent."""
        self.key_path = key_path
        self._load_or_generate()

    def _load_or_generate(self) -> None:
        """Populate the key attributes from disk or from a new key pair.

        NOTE(review): a key loaded from disk is not checked to be Ed25519;
        ``sign`` calls ``private_key.sign(message)`` with a single argument,
        so a key type needing extra arguments would fail there — confirm
        whether a type check is wanted.
        """
        # Try the read directly instead of testing for existence first: the
        # file could appear or vanish between a check and the open.
        try:
            with open(self.key_path, "rb") as f:
                pem_bytes = f.read()
        except FileNotFoundError:
            self.private_key = ed25519.Ed25519PrivateKey.generate()
            self._write_private_key()
        else:
            self.private_key = serialization.load_pem_private_key(
                pem_bytes, password=None
            )

        self.public_key = self.private_key.public_key()
        self.pub_pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        ).decode("utf-8")

        # The node id is derived from the PEM text, not the raw key bytes.
        digest = hashlib.sha256(self.pub_pem.encode("utf-8")).hexdigest()
        self.node_id = f"node_{digest[:12]}"

    def _write_private_key(self) -> None:
        """Write ``private_key`` to ``key_path`` as unencrypted PKCS#8 PEM."""
        # Serialize before touching the filesystem so a serialization error
        # cannot leave an empty key file behind.
        pem_bytes = self.private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

        def _owner_only(path, flags):
            # The key is stored unencrypted, so request mode 0o600 at
            # creation time rather than the umask default. (Platforms
            # without POSIX permission bits may ignore most of the mode.)
            return os.open(path, flags, 0o600)

        with open(self.key_path, "wb", opener=_owner_only) as f:
            f.write(pem_bytes)

    def sign(self, payload: str, timestamp: str) -> str:
        """Sign ``payload`` immediately followed by ``timestamp``.

        The two strings are concatenated with no separator, UTF-8 encoded
        and signed with ``private_key``.

        NOTE(review): without a delimiter, different (payload, timestamp)
        pairs can produce the same signed bytes; left unchanged here because
        altering it would change the wire format.

        Returns:
            The signature, base64-encoded as a ``str``.
        """
        message = f"{payload}{timestamp}".encode("utf-8")
        signature = self.private_key.sign(message)
        return base64.b64encode(signature).decode("utf-8")

    def get_auth_headers(self, payload: str) -> dict:
        """Build the ``X-MEP-*`` headers for ``payload``.

        The timestamp is the current Unix time truncated to whole seconds,
        rendered as a decimal string; the same string is used for signing.

        Returns:
            A dict with ``X-MEP-NodeID``, ``X-MEP-Timestamp`` and
            ``X-MEP-Signature`` entries.
        """
        ts = str(int(time.time()))
        return {
            "X-MEP-NodeID": self.node_id,
            "X-MEP-Timestamp": ts,
            "X-MEP-Signature": self.sign(payload, ts),
        }
|
|