| from flask import Blueprint, request, jsonify, current_app |
| from flask_jwt_extended import jwt_required, get_jwt_identity |
| from email_validator import validate_email, EmailNotValidError |
| from backend.services.auth_service import register_user, login_user, get_user_by_id, request_password_reset, reset_user_password |
| from backend.models.user import User |
| from backend.utils.country_language_data import COUNTRIES, LANGUAGES |
|
|
| auth_bp = Blueprint('auth', __name__) |
|
|
| def validate_email_format(email: str) -> tuple[bool, str]: |
| """ |
| Validate email format using email-validator library. |
| |
| Args: |
| email: Email string to validate |
| |
| Returns: |
| Tuple of (is_valid, validated_email_or_error_message) |
| """ |
| try: |
| validated = validate_email(email) |
| return True, validated['email'] |
| except EmailNotValidError as e: |
| return False, str(e) |
|
|
| @auth_bp.route('/', methods=['OPTIONS']) |
| def handle_options(): |
| """Handle OPTIONS requests for preflight CORS checks.""" |
| return '', 200 |
|
|
| @auth_bp.route('/register', methods=['OPTIONS']) |
| def handle_register_options(): |
| """Handle OPTIONS requests for preflight CORS checks for register route.""" |
| return '', 200 |
|
|
| @auth_bp.route('/register', methods=['POST']) |
| def register(): |
| """ |
| Register a new user. |
| |
| Request Body: |
| email (str): User email |
| password (str): User password |
| country (str, optional): User country (ISO 3166-1 alpha-2 code) |
| language (str, optional): User language (ISO 639-1 code) |
| |
| Returns: |
| JSON: Registration result |
| """ |
| try: |
| data = request.get_json() |
|
|
| |
| if not data or not all(k in data for k in ('email', 'password')): |
| return jsonify({ |
| 'success': False, |
| 'message': 'Email and password are required' |
| }), 400 |
|
|
| email = data['email'] |
| password = data['password'] |
| country = data.get('country') |
| language = data.get('language') |
|
|
| |
| is_valid_email, validated_email_or_error = validate_email_format(email) |
| if not is_valid_email: |
| return jsonify({ |
| 'success': False, |
| 'message': f'Invalid email format: {validated_email_or_error}' |
| }), 400 |
|
|
| |
| email = validated_email_or_error |
|
|
| |
| password_validation = validate_password_strength(password) |
| if not password_validation['valid']: |
| return jsonify({ |
| 'success': False, |
| 'message': password_validation['message'] |
| }), 400 |
|
|
| |
| if country: |
| |
| if not isinstance(country, str) or len(country) != 2 or not country.isalpha(): |
| return jsonify({ |
| 'success': False, |
| 'message': 'Country must be a valid ISO 3166-1 alpha-2 code (2 alphabetic characters)' |
| }), 400 |
|
|
| if language: |
| |
| if not isinstance(language, str) or len(language) != 2 or not language.isalpha(): |
| return jsonify({ |
| 'success': False, |
| 'message': 'Language must be a valid ISO 639-1 code (2 alphabetic characters)' |
| }), 400 |
|
|
| |
| result = register_user(email, password, country, language) |
|
|
| if result['success']: |
| return jsonify(result), 201 |
| else: |
| |
| if 'already exist' in result.get('message', '').lower(): |
| return jsonify({ |
| 'success': False, |
| 'message': 'Account with this email already exists' |
| }), 400 |
| return jsonify(result), 400 |
|
|
| except Exception as e: |
| current_app.logger.error(f"Registration error: {str(e)}") |
| return jsonify({ |
| 'success': False, |
| 'message': 'An error occurred during registration' |
| }), 500 |
|
|
| @auth_bp.route('/login', methods=['OPTIONS']) |
| def handle_login_options(): |
| """Handle OPTIONS requests for preflight CORS checks for login route.""" |
| from flask import current_app |
| current_app.logger.info(f"OPTIONS request for /login from {request.remote_addr}") |
| current_app.logger.info(f"Request headers: {dict(request.headers)}") |
| return '', 200 |
|
|
| @auth_bp.route('/login', methods=['POST']) |
| def login(): |
| """ |
| Authenticate and login a user. |
| |
| Request Body: |
| email (str): User email |
| password (str): User password |
| remember_me (bool): Remember me flag for extended session (optional) |
| |
| Returns: |
| JSON: Login result with JWT token |
| """ |
| try: |
| |
| current_app.logger.info(f"Login request received from {request.remote_addr}") |
| current_app.logger.info(f"Request headers: {dict(request.headers)}") |
|
|
| data = request.get_json() |
|
|
| |
| if not data or not all(k in data for k in ('email', 'password')): |
| current_app.logger.warning("Login failed: Missing email or password") |
| return jsonify({ |
| 'success': False, |
| 'message': 'Email and password are required' |
| }), 400 |
|
|
| email = data['email'] |
| password = data['password'] |
| remember_me = data.get('remember_me', False) |
|
|
| |
| is_valid_email, validated_email_or_error = validate_email_format(email) |
| if not is_valid_email: |
| current_app.logger.warning(f"Login attempt with invalid email format: {email}") |
| |
| return jsonify({ |
| 'success': False, |
| 'message': 'Invalid email or password' |
| }), 401 |
|
|
| |
| email = validated_email_or_error |
|
|
| |
| result = login_user(email, password, remember_me) |
|
|
| if result['success']: |
| |
| response_data = jsonify(result) |
| response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000') |
| response_data.headers.add('Access-Control-Allow-Credentials', 'true') |
| current_app.logger.info(f"Login successful for user {email}") |
| return response_data, 200 |
| else: |
| current_app.logger.warning(f"Login failed for user {email}: {result.get('message', 'Unknown error')}") |
| return jsonify(result), 401 |
|
|
| except Exception as e: |
| current_app.logger.error(f"Login error: {str(e)}", exc_info=True) |
| return jsonify({ |
| 'success': False, |
| 'message': 'An error occurred during login' |
| }), 500 |
|
|
| @auth_bp.route('/logout', methods=['OPTIONS']) |
| def handle_logout_options(): |
| """Handle OPTIONS requests for preflight CORS checks for logout route.""" |
| return '', 200 |
|
|
| @auth_bp.route('/logout', methods=['POST']) |
| @jwt_required() |
| def logout(): |
| """ |
| Logout current user. |
| |
| Returns: |
| JSON: Logout result |
| """ |
| try: |
| current_app.logger.info(f"Logout request for user: {get_jwt_identity()}") |
| return jsonify({ |
| 'success': True, |
| 'message': 'Logged out successfully' |
| }), 200 |
|
|
| except Exception as e: |
| current_app.logger.error(f"Logout error: {str(e)}") |
| return jsonify({ |
| 'success': False, |
| 'message': 'An error occurred during logout' |
| }), 500 |
|
|
| @auth_bp.route('/user', methods=['OPTIONS']) |
| def handle_user_options(): |
| """Handle OPTIONS requests for preflight CORS checks for user route.""" |
| return '', 200 |
|
|
| @auth_bp.route('/user', methods=['GET']) |
| @jwt_required() |
| def get_current_user(): |
| """ |
| Get current authenticated user. |
| |
| Returns: |
| JSON: Current user data |
| """ |
| try: |
| user_id = get_jwt_identity() |
| current_app.logger.info(f"Get user profile request for user: {user_id}") |
| user_data = get_user_by_id(user_id) |
|
|
| if user_data: |
| |
| safe_user_data = {k: v for k, v in user_data.items() if k not in ['password', 'password_hash']} |
| return jsonify({ |
| 'success': True, |
| 'user': safe_user_data |
| }), 200 |
| else: |
| return jsonify({ |
| 'success': False, |
| 'message': 'User not found' |
| }), 404 |
|
|
| except Exception as e: |
| current_app.logger.error(f"Get user error: {str(e)}") |
| return jsonify({ |
| 'success': False, |
| 'message': 'An error occurred while fetching user data' |
| }), 500 |
|
|
| @auth_bp.route('/registration-options', methods=['GET']) |
| def get_registration_options(): |
| """ |
| Get registration options including countries and languages. |
| |
| Returns: |
| JSON: Registration options |
| """ |
| try: |
| return jsonify({ |
| 'success': True, |
| 'countries': COUNTRIES, |
| 'languages': LANGUAGES |
| }), 200 |
|
|
| except Exception as e: |
| current_app.logger.error(f"Get registration options error: {str(e)}") |
| return jsonify({ |
| 'success': False, |
| 'message': 'An error occurred while fetching registration options' |
| }), 500 |
|
|
| @auth_bp.route('/forgot-password', methods=['OPTIONS']) |
| def handle_forgot_password_options(): |
| """Handle OPTIONS requests for preflight CORS checks for forgot password route.""" |
| return '', 200 |
|
|
|
|
| @auth_bp.route('/forgot-password', methods=['POST']) |
| def forgot_password(): |
| """ |
| Request password reset for a user. |
| |
| Request Body: |
| email (str): User email |
| |
| Returns: |
| JSON: Password reset request result |
| """ |
| try: |
| data = request.get_json() |
|
|
| |
| if not data or 'email' not in data: |
| return jsonify({ |
| 'success': False, |
| 'message': 'Email is required' |
| }), 400 |
|
|
| email = data['email'] |
|
|
| |
| is_valid_email, validated_email_or_error = validate_email_format(email) |
| if not is_valid_email: |
| |
| current_app.logger.warning(f"Forgot password request with invalid email format: {email}") |
| |
| return jsonify({ |
| 'success': True, |
| 'message': 'If an account exists with this email, password reset instructions have been sent.' |
| }), 200 |
|
|
| |
| email = validated_email_or_error |
|
|
| |
| result = request_password_reset(current_app.supabase, email) |
|
|
| |
| return jsonify({ |
| 'success': True, |
| 'message': 'If an account exists with this email, password reset instructions have been sent.' |
| }), 200 |
|
|
| except Exception as e: |
| current_app.logger.error(f"Forgot password error: {str(e)}") |
| |
| return jsonify({ |
| 'success': True, |
| 'message': 'If an account exists with this email, password reset instructions have been sent.' |
| }), 200 |
|
|
|
|
| @auth_bp.route('/reset-password', methods=['OPTIONS']) |
| def handle_reset_password_options(): |
| """Handle OPTIONS requests for preflight CORS checks for reset password route.""" |
| return '', 200 |
|
|
| @auth_bp.route('/reset-password', methods=['GET']) |
| def show_reset_password_form(): |
| """ |
| Serve the password reset form. |
| This endpoint is accessed via the link sent in the password reset email. |
| The token will be available as a query parameter (e.g., ?token=abc123). |
| The SPA frontend should read this token and display the form accordingly. |
| """ |
| |
| current_app.logger.info("Password reset form page accessed.") |
| |
| |
| |
| |
| return '', 200 |
|
|
| @auth_bp.route('/reset-password', methods=['POST']) |
| def reset_password(): |
| """ |
| Reset user password with token. |
| |
| Request Body: |
| token (str): Password reset token |
| password (str): New password |
| |
| Returns: |
| JSON: Password reset result |
| """ |
| try: |
| data = request.get_json() |
|
|
| |
| if not data or not all(k in data for k in ('token', 'password')): |
| return jsonify({ |
| 'success': False, |
| 'message': 'Token and password are required' |
| }), 400 |
|
|
| token = data['token'] |
| password = data['password'] |
|
|
| |
| password_validation = validate_password_strength(password) |
| if not password_validation['valid']: |
| return jsonify({ |
| 'success': False, |
| 'message': password_validation['message'] |
| }), 400 |
|
|
| |
| result = reset_user_password(current_app.supabase, token, password) |
|
|
| if result['success']: |
| return jsonify(result), 200 |
| else: |
| return jsonify(result), 400 |
|
|
| except Exception as e: |
| current_app.logger.error(f"Reset password error: {str(e)}") |
| return jsonify({ |
| 'success': False, |
| 'message': 'An error occurred while resetting your password' |
| }), 500 |
|
|
| def validate_password_strength(password: str) -> dict: |
| """ |
| Validates password strength based on security requirements. |
| |
| Args: |
| password: Password string to validate |
| |
| Returns: |
| Dictionary with validation result and message |
| """ |
| if len(password) < 8: |
| return { |
| 'valid': False, |
| 'message': 'Password must be at least 8 characters long' |
| } |
|
|
| |
| has_upper = any(c.isupper() for c in password) |
| has_lower = any(c.islower() for c in password) |
| has_digit = any(c.isdigit() for c in password) |
| has_special = any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password) |
|
|
| if not has_upper: |
| return { |
| 'valid': False, |
| 'message': 'Password must contain at least one uppercase letter' |
| } |
|
|
| if not has_lower: |
| return { |
| 'valid': False, |
| 'message': 'Password must contain at least one lowercase letter' |
| } |
|
|
| if not has_digit: |
| return { |
| 'valid': False, |
| 'message': 'Password must contain at least one number' |
| } |
|
|
| if not has_special: |
| return { |
| 'valid': False, |
| 'message': 'Password must contain at least one special character' |
| } |
|
|
| return { |
| 'valid': True, |
| 'message': 'Password is valid' |
| } |