mirror of
https://github.com/th30d4y/OpenLearnX.git
synced 2026-05-26 19:26:33 +00:00
804 lines
28 KiB
Python
804 lines
28 KiB
Python
from flask import Blueprint, request, jsonify
|
|
from datetime import datetime, timedelta
|
|
from pymongo import MongoClient
|
|
import os
|
|
import uuid
|
|
import jwt
|
|
import logging
|
|
from eth_account.messages import encode_defunct
|
|
from web3 import Web3
|
|
from activity_logger import log_user_activity
|
|
|
|
bp = Blueprint('auth', __name__)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# MongoDB connection
|
|
mongo_uri = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/')
|
|
client = MongoClient(mongo_uri)
|
|
db = client.openlearnx
|
|
|
|
# JWT secret - must be set via environment variable
|
|
JWT_SECRET = os.getenv('JWT_SECRET')
|
|
if not JWT_SECRET:
|
|
import warnings
|
|
import tempfile
|
|
import stat
|
|
import secrets as secrets_module
|
|
warnings.warn("JWT_SECRET environment variable not set. Using persistent dev secret.", UserWarning)
|
|
|
|
def _generate_and_store_secret():
|
|
"""Generate a random secret and store it with restrictive permissions."""
|
|
return secrets_module.token_hex(32)
|
|
|
|
# Use persistent file-based secret for development to avoid invalidating tokens on restart
|
|
_secret_file = os.path.join(tempfile.gettempdir(), '.openlearnx_dev_jwt_secret_auth')
|
|
try:
|
|
if os.path.exists(_secret_file):
|
|
with open(_secret_file, 'r') as f:
|
|
JWT_SECRET = f.read().strip()
|
|
if not JWT_SECRET:
|
|
JWT_SECRET = _generate_and_store_secret()
|
|
with open(_secret_file, 'w') as f:
|
|
f.write(JWT_SECRET)
|
|
# Set restrictive permissions (owner read/write only)
|
|
os.chmod(_secret_file, stat.S_IRUSR | stat.S_IWUSR)
|
|
except Exception:
|
|
JWT_SECRET = _generate_and_store_secret()
|
|
|
|
@bp.route('/nonce', methods=['POST', 'OPTIONS'])
|
|
def get_nonce():
|
|
"""Generate nonce for MetaMask authentication"""
|
|
if request.method == "OPTIONS":
|
|
return jsonify({'status': 'ok'})
|
|
|
|
try:
|
|
data = request.get_json()
|
|
wallet_address = data.get('wallet_address')
|
|
|
|
if not wallet_address:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Wallet address required"
|
|
}), 400
|
|
|
|
# Generate unique nonce
|
|
nonce = str(uuid.uuid4())
|
|
timestamp = datetime.now().isoformat()
|
|
|
|
# Create message to sign
|
|
message = f"Sign this message to authenticate with OpenLearnX:\n\nNonce: {nonce}\nTimestamp: {timestamp}\nAddress: {wallet_address}"
|
|
|
|
logger.info(f"🔐 Generated nonce for wallet: {wallet_address}")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"nonce": nonce,
|
|
"message": message,
|
|
"timestamp": timestamp
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error generating nonce: {str(e)}")
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@bp.route('/verify', methods=['POST', 'OPTIONS'])
|
|
def verify_signature():
|
|
"""Verify MetaMask signature and authenticate user"""
|
|
if request.method == "OPTIONS":
|
|
return jsonify({'status': 'ok'})
|
|
|
|
try:
|
|
data = request.get_json()
|
|
wallet_address = data.get('wallet_address')
|
|
signature = data.get('signature')
|
|
message = data.get('message')
|
|
|
|
if not all([wallet_address, signature, message]):
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Wallet address, signature, and message are required"
|
|
}), 400
|
|
|
|
# Verify the signature
|
|
try:
|
|
# Create the message hash that was signed
|
|
message_hash = encode_defunct(text=message)
|
|
|
|
# Recover the address from the signature
|
|
w3 = Web3()
|
|
recovered_address = w3.eth.account.recover_message(message_hash, signature=signature)
|
|
|
|
# Check if recovered address matches the claimed address
|
|
if recovered_address.lower() != wallet_address.lower():
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Signature verification failed"
|
|
}), 401
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Signature verification error: {str(e)}")
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Invalid signature"
|
|
}), 401
|
|
|
|
# Check if user exists, create if not
|
|
user = db.users.find_one({"wallet_address": wallet_address.lower()})
|
|
|
|
if not user:
|
|
# Create new user
|
|
user = {
|
|
"wallet_address": wallet_address.lower(),
|
|
"role": "student",
|
|
"status": "active",
|
|
"created_at": datetime.now(),
|
|
"last_login": datetime.now(),
|
|
"login_count": 1
|
|
}
|
|
result = db.users.insert_one(user)
|
|
user["_id"] = str(result.inserted_id)
|
|
logger.info(f"✅ Created new user: {wallet_address}")
|
|
log_user_activity(
|
|
db,
|
|
wallet_address.lower(),
|
|
"auth_register",
|
|
"Account registered",
|
|
"Created account via wallet authentication",
|
|
{"auth_method": "wallet"},
|
|
)
|
|
else:
|
|
account_status = str(user.get("status", "active")).lower().strip()
|
|
if account_status == "banned":
|
|
logger.warning(f"⛔ Banned wallet login blocked: {wallet_address}")
|
|
log_user_activity(
|
|
db,
|
|
wallet_address.lower(),
|
|
"account_status",
|
|
"Login blocked",
|
|
"Login blocked because account is banned",
|
|
{"status": "banned"},
|
|
)
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Your account is banned. Contact admin."
|
|
}), 403
|
|
|
|
# Update existing user
|
|
db.users.update_one(
|
|
{"wallet_address": wallet_address.lower()},
|
|
{
|
|
"$set": {"last_login": datetime.now()},
|
|
"$inc": {"login_count": 1}
|
|
}
|
|
)
|
|
user["_id"] = str(user["_id"])
|
|
logger.info(f"✅ Updated existing user: {wallet_address}")
|
|
|
|
log_user_activity(
|
|
db,
|
|
wallet_address.lower(),
|
|
"auth_login",
|
|
"Login successful",
|
|
"Wallet login completed successfully",
|
|
{"auth_method": "wallet"},
|
|
)
|
|
|
|
# Generate JWT token
|
|
token_payload = {
|
|
"user_id": user["wallet_address"],
|
|
"wallet_address": user["wallet_address"],
|
|
"iat": datetime.utcnow(),
|
|
"exp": datetime.utcnow() + timedelta(days=7)
|
|
}
|
|
|
|
token = jwt.encode(token_payload, JWT_SECRET, algorithm="HS256")
|
|
|
|
# Prepare user data for response
|
|
user_response = {
|
|
"id": user["wallet_address"],
|
|
"wallet_address": user["wallet_address"],
|
|
"email": user.get("email", ""),
|
|
"name": user.get("name", ""),
|
|
"bio": user.get("bio", ""),
|
|
"avatar": user.get("avatar", ""),
|
|
"role": user.get("role", "student"),
|
|
"status": user.get("status", "active"),
|
|
"created_at": user["created_at"].isoformat() if isinstance(user["created_at"], datetime) else str(user["created_at"]),
|
|
"last_login": user["last_login"].isoformat() if isinstance(user["last_login"], datetime) else str(user["last_login"])
|
|
}
|
|
|
|
logger.info(f"✅ Authentication successful for: {wallet_address}")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"token": token,
|
|
"user": user_response,
|
|
"message": "Authentication successful"
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error verifying signature: {str(e)}")
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@bp.route('/register', methods=['POST', 'OPTIONS'])
|
|
def register():
|
|
"""Register a new user with email and password"""
|
|
if request.method == "OPTIONS":
|
|
return jsonify({'status': 'ok'})
|
|
|
|
try:
|
|
data = request.get_json()
|
|
email = data.get('email', '').strip().lower()
|
|
password = data.get('password', '')
|
|
username = data.get('username', '').strip()
|
|
|
|
if not email or not password:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Email and password are required"
|
|
}), 400
|
|
|
|
if len(password) < 6:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Password must be at least 6 characters"
|
|
}), 400
|
|
|
|
# Check if user already exists
|
|
existing_user = db.users.find_one({"email": email})
|
|
if existing_user:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Email already registered"
|
|
}), 409
|
|
|
|
# Hash password using simple approach for development
|
|
# TODO: Use werkzeug.security.generate_password_hash for production
|
|
import hashlib
|
|
password_hash = hashlib.sha256(password.encode()).hexdigest()
|
|
|
|
# Create new user
|
|
user = {
|
|
"email": email,
|
|
"username": username or email.split("@")[0],
|
|
"password_hash": password_hash,
|
|
"name": "",
|
|
"bio": "",
|
|
"avatar": "",
|
|
"role": "student",
|
|
"status": "active",
|
|
"created_at": datetime.now(),
|
|
"last_login": datetime.now(),
|
|
"login_count": 1,
|
|
"auth_method": "email"
|
|
}
|
|
|
|
result = db.users.insert_one(user)
|
|
user["_id"] = str(result.inserted_id)
|
|
|
|
# Generate JWT token
|
|
token_payload = {
|
|
"user_id": str(result.inserted_id),
|
|
"email": email,
|
|
"iat": datetime.utcnow(),
|
|
"exp": datetime.utcnow() + timedelta(days=7)
|
|
}
|
|
|
|
token = jwt.encode(token_payload, JWT_SECRET, algorithm="HS256")
|
|
|
|
user_response = {
|
|
"id": str(result.inserted_id),
|
|
"email": email,
|
|
"username": username or email.split("@")[0],
|
|
"name": "",
|
|
"bio": "",
|
|
"avatar": "",
|
|
"role": "student",
|
|
"status": "active",
|
|
"created_at": user["created_at"].isoformat(),
|
|
"last_login": user["last_login"].isoformat()
|
|
}
|
|
|
|
log_user_activity(
|
|
db,
|
|
str(result.inserted_id),
|
|
"auth_register",
|
|
"Account registered",
|
|
"Created account with email and password",
|
|
{"auth_method": "email"},
|
|
)
|
|
|
|
logger.info(f"✅ New user registered: {email}")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"token": token,
|
|
"user": user_response,
|
|
"message": "Registration successful"
|
|
}), 201
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error during registration: {str(e)}")
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@bp.route('/login', methods=['POST', 'OPTIONS'])
|
|
def login():
|
|
"""Login with email and password"""
|
|
if request.method == "OPTIONS":
|
|
return jsonify({'status': 'ok'})
|
|
|
|
try:
|
|
data = request.get_json()
|
|
email = data.get('email', '').strip().lower()
|
|
password = data.get('password', '')
|
|
|
|
if not email or not password:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Email and password are required"
|
|
}), 400
|
|
|
|
# Find user by email
|
|
user = db.users.find_one({"email": email})
|
|
if not user:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Invalid email or password"
|
|
}), 401
|
|
|
|
account_status = str(user.get("status", "active")).lower().strip()
|
|
if account_status == "banned":
|
|
logger.warning(f"⛔ Banned email login blocked: {email}")
|
|
log_user_activity(
|
|
db,
|
|
str(user.get("_id")),
|
|
"account_status",
|
|
"Login blocked",
|
|
"Login blocked because account is banned",
|
|
{"status": "banned", "email": email},
|
|
)
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Your account is banned. Contact admin."
|
|
}), 403
|
|
|
|
if account_status == "suspended":
|
|
log_user_activity(
|
|
db,
|
|
str(user.get("_id")),
|
|
"account_status",
|
|
"Login attempted while suspended",
|
|
"User logged in while account status is suspended",
|
|
{"status": "suspended", "email": email},
|
|
)
|
|
|
|
# Verify password
|
|
import hashlib
|
|
password_hash = hashlib.sha256(password.encode()).hexdigest()
|
|
if password_hash != user.get('password_hash'):
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Invalid email or password"
|
|
}), 401
|
|
|
|
# Update last login
|
|
db.users.update_one(
|
|
{"_id": user["_id"]},
|
|
{
|
|
"$set": {"last_login": datetime.now()},
|
|
"$inc": {"login_count": 1}
|
|
}
|
|
)
|
|
|
|
# Generate JWT token
|
|
token_payload = {
|
|
"user_id": str(user["_id"]),
|
|
"email": email,
|
|
"iat": datetime.utcnow(),
|
|
"exp": datetime.utcnow() + timedelta(days=7)
|
|
}
|
|
|
|
token = jwt.encode(token_payload, JWT_SECRET, algorithm="HS256")
|
|
|
|
user_response = {
|
|
"id": str(user["_id"]),
|
|
"email": email,
|
|
"username": user.get('username', ''),
|
|
"name": user.get('name', ''),
|
|
"bio": user.get('bio', ''),
|
|
"avatar": user.get('avatar', ''),
|
|
"role": user.get('role', 'student'),
|
|
"status": user.get('status', 'active'),
|
|
"created_at": user["created_at"].isoformat() if isinstance(user["created_at"], datetime) else str(user["created_at"]),
|
|
"last_login": user["last_login"].isoformat() if isinstance(user["last_login"], datetime) else str(user["last_login"])
|
|
}
|
|
|
|
log_user_activity(
|
|
db,
|
|
str(user.get("_id")),
|
|
"auth_login",
|
|
"Login successful",
|
|
"Email login completed successfully",
|
|
{"auth_method": "email", "email": email},
|
|
)
|
|
|
|
logger.info(f"✅ User logged in: {email}")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"token": token,
|
|
"user": user_response,
|
|
"message": "Login successful"
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error during login: {str(e)}")
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@bp.route('/profile/update', methods=['POST', 'OPTIONS'])
|
|
def update_profile():
|
|
"""Update user profile (name, bio, avatar)"""
|
|
if request.method == "OPTIONS":
|
|
return jsonify({'status': 'ok'})
|
|
|
|
try:
|
|
# Get token from header
|
|
auth_header = request.headers.get('Authorization', '')
|
|
if not auth_header.startswith('Bearer '):
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Authorization header required"
|
|
}), 401
|
|
|
|
token = auth_header.split('Bearer ')[1]
|
|
|
|
# Verify and decode token
|
|
try:
|
|
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
|
|
user_id = payload.get('user_id')
|
|
except jwt.InvalidTokenError:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Invalid token"
|
|
}), 401
|
|
|
|
data = request.get_json()
|
|
name = data.get('name', '').strip()
|
|
bio = data.get('bio', '').strip()
|
|
avatar = data.get('avatar', '').strip()
|
|
|
|
# Update user profile
|
|
from bson.objectid import ObjectId
|
|
result = db.users.update_one(
|
|
{"_id": ObjectId(user_id)},
|
|
{
|
|
"$set": {
|
|
"name": name,
|
|
"bio": bio,
|
|
"avatar": avatar,
|
|
"updated_at": datetime.now()
|
|
}
|
|
}
|
|
)
|
|
|
|
if result.matched_count == 0:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "User not found"
|
|
}), 404
|
|
|
|
# Get updated user
|
|
user = db.users.find_one({"_id": ObjectId(user_id)})
|
|
|
|
user_response = {
|
|
"id": str(user["_id"]),
|
|
"email": user.get('email', ''),
|
|
"username": user.get('username', ''),
|
|
"name": user.get('name', ''),
|
|
"bio": user.get('bio', ''),
|
|
"avatar": user.get('avatar', ''),
|
|
"role": user.get('role', 'student'),
|
|
"status": user.get('status', 'active'),
|
|
"created_at": user["created_at"].isoformat() if isinstance(user["created_at"], datetime) else str(user["created_at"]),
|
|
"last_login": user["last_login"].isoformat() if isinstance(user["last_login"], datetime) else str(user["last_login"])
|
|
}
|
|
|
|
logger.info(f"✅ Profile updated for user: {user_id}")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"user": user_response,
|
|
"message": "Profile updated successfully"
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error updating profile: {str(e)}")
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
@bp.route('/metamask/add-email', methods=['POST', 'OPTIONS'])
|
|
def add_metamask_email():
|
|
"""Store contact email for MetaMask wallet"""
|
|
if request.method == "OPTIONS":
|
|
return jsonify({'status': 'ok'})
|
|
|
|
try:
|
|
# Get token from header
|
|
auth_header = request.headers.get('Authorization', '')
|
|
if not auth_header.startswith('Bearer '):
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Authorization header required"
|
|
}), 401
|
|
|
|
token = auth_header.split('Bearer ')[1]
|
|
|
|
# Verify and decode token
|
|
try:
|
|
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
|
|
wallet_address = payload.get('wallet_address')
|
|
if not wallet_address:
|
|
wallet_address = payload.get('user_id')
|
|
except jwt.InvalidTokenError:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Invalid token"
|
|
}), 401
|
|
|
|
data = request.get_json()
|
|
email = data.get('email', '').strip().lower()
|
|
name = data.get('name', '').strip()
|
|
|
|
if not email:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Email is required"
|
|
}), 400
|
|
|
|
# Validate email format
|
|
import re
|
|
if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Invalid email format"
|
|
}), 400
|
|
|
|
# Check if email already used by different wallet
|
|
existing_user = db.users.find_one({"email": email, "wallet_address": {"$ne": wallet_address.lower()}})
|
|
if existing_user:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Email already associated with another wallet"
|
|
}), 409
|
|
|
|
# Update user with email and name
|
|
from bson.objectid import ObjectId
|
|
|
|
# Try updating by wallet address first (for new users)
|
|
result = db.users.update_one(
|
|
{"wallet_address": wallet_address.lower()},
|
|
{
|
|
"$set": {
|
|
"email": email,
|
|
"name": name or "",
|
|
"updated_at": datetime.now()
|
|
}
|
|
}
|
|
)
|
|
|
|
if result.matched_count == 0:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "User not found"
|
|
}), 404
|
|
|
|
# Get updated user
|
|
user = db.users.find_one({"wallet_address": wallet_address.lower()})
|
|
|
|
user_response = {
|
|
"id": str(user.get("_id", wallet_address)),
|
|
"wallet_address": user.get("wallet_address", wallet_address),
|
|
"email": user.get("email", ""),
|
|
"name": user.get("name", ""),
|
|
"bio": user.get("bio", ""),
|
|
"avatar": user.get("avatar", ""),
|
|
"role": user.get("role", "student"),
|
|
"status": user.get("status", "active"),
|
|
"created_at": user.get("created_at", datetime.now()).isoformat() if isinstance(user.get("created_at"), datetime) else str(user.get("created_at", datetime.now())),
|
|
"last_login": user.get("last_login", datetime.now()).isoformat() if isinstance(user.get("last_login"), datetime) else str(user.get("last_login", datetime.now()))
|
|
}
|
|
|
|
logger.info(f"✅ Email added for MetaMask wallet: {wallet_address}")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"user": user_response,
|
|
"message": "Email saved successfully"
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error saving MetaMask email: {str(e)}")
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
|
|
@bp.route('/verify-token', methods=['POST', 'OPTIONS'])
|
|
def verify_token():
|
|
"""Validate JWT token and return the latest user payload."""
|
|
if request.method == "OPTIONS":
|
|
return jsonify({'status': 'ok'})
|
|
|
|
try:
|
|
auth_header = request.headers.get('Authorization', '')
|
|
if not auth_header.startswith('Bearer '):
|
|
return jsonify({"valid": False, "error": "Authorization header required"}), 401
|
|
|
|
token = auth_header.split('Bearer ')[1]
|
|
try:
|
|
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
|
|
except jwt.InvalidTokenError:
|
|
return jsonify({"valid": False, "error": "Invalid token"}), 401
|
|
|
|
user = None
|
|
wallet_address = payload.get('wallet_address')
|
|
email = payload.get('email')
|
|
user_id = payload.get('user_id')
|
|
|
|
if wallet_address:
|
|
user = db.users.find_one({"wallet_address": str(wallet_address).lower()})
|
|
elif email:
|
|
user = db.users.find_one({"email": str(email).lower()})
|
|
elif user_id:
|
|
try:
|
|
from bson.objectid import ObjectId
|
|
user = db.users.find_one({"_id": ObjectId(user_id)})
|
|
except Exception:
|
|
user = None
|
|
|
|
if not user:
|
|
return jsonify({"valid": False, "error": "User not found"}), 404
|
|
|
|
status = str(user.get("status", "active")).lower().strip()
|
|
if status == "banned":
|
|
return jsonify({"valid": False, "error": "Account is banned"}), 403
|
|
|
|
user_response = {
|
|
"id": str(user.get("_id", user.get("wallet_address", ""))),
|
|
"wallet_address": user.get("wallet_address", ""),
|
|
"email": user.get("email", ""),
|
|
"username": user.get("username", ""),
|
|
"name": user.get("name", ""),
|
|
"bio": user.get("bio", ""),
|
|
"avatar": user.get("avatar", ""),
|
|
"role": user.get("role", "student"),
|
|
"status": user.get("status", "active"),
|
|
"created_at": user.get("created_at", datetime.now()).isoformat() if isinstance(user.get("created_at"), datetime) else str(user.get("created_at", datetime.now())),
|
|
"last_login": user.get("last_login", datetime.now()).isoformat() if isinstance(user.get("last_login"), datetime) else str(user.get("last_login", datetime.now())),
|
|
}
|
|
|
|
return jsonify({"valid": True, "user": user_response})
|
|
except Exception as e:
|
|
logger.error(f"❌ verify-token error: {str(e)}")
|
|
return jsonify({"valid": False, "error": str(e)}), 500
|
|
|
|
|
|
@bp.route('/me', methods=['GET', 'OPTIONS'])
|
|
def get_me():
|
|
"""Return authenticated user profile for current token."""
|
|
if request.method == "OPTIONS":
|
|
return jsonify({'status': 'ok'})
|
|
|
|
verify_resp = verify_token()
|
|
try:
|
|
body, status = verify_resp
|
|
if status != 200:
|
|
return body, status
|
|
data = body.get_json()
|
|
return jsonify({"success": True, "user": data.get("user", {})})
|
|
except Exception:
|
|
return verify_resp
|
|
|
|
@bp.route('/upload-image', methods=['POST', 'OPTIONS'])
|
|
def upload_image():
|
|
"""Upload and convert image (PNG/JPG only) to base64"""
|
|
if request.method == "OPTIONS":
|
|
return jsonify({'status': 'ok'})
|
|
|
|
try:
|
|
# Get token from header
|
|
auth_header = request.headers.get('Authorization', '')
|
|
if not auth_header.startswith('Bearer '):
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Authorization header required"
|
|
}), 401
|
|
|
|
token = auth_header.split('Bearer ')[1]
|
|
|
|
# Verify and decode token
|
|
try:
|
|
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
|
|
user_id = payload.get('user_id')
|
|
except jwt.InvalidTokenError:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Invalid token"
|
|
}), 401
|
|
|
|
# Check if file is in request
|
|
if 'file' not in request.files:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "No file provided"
|
|
}), 400
|
|
|
|
file = request.files['file']
|
|
|
|
if file.filename == '':
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "No file selected"
|
|
}), 400
|
|
|
|
# Validate file type - only PNG and JPG
|
|
allowed_extensions = {'png', 'jpg', 'jpeg'}
|
|
file_ext = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else ''
|
|
|
|
if file_ext not in allowed_extensions:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "Only PNG and JPG formats are allowed"
|
|
}), 400
|
|
|
|
# Validate file size (max 5MB)
|
|
file.seek(0, 2) # Seek to end
|
|
file_size = file.tell()
|
|
file.seek(0) # Seek back to start
|
|
|
|
max_size = 5 * 1024 * 1024 # 5MB
|
|
if file_size > max_size:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "File size must be less than 5MB"
|
|
}), 400
|
|
|
|
# Read file and convert to base64
|
|
import base64
|
|
file_data = file.read()
|
|
base64_image = base64.b64encode(file_data).decode('utf-8')
|
|
|
|
# Create data URL for the image
|
|
mime_type = f"image/{file_ext if file_ext != 'jpg' else 'jpeg'}"
|
|
data_url = f"data:{mime_type};base64,{base64_image}"
|
|
|
|
logger.info(f"✅ Image uploaded for user: {user_id}, size: {file_size} bytes")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"image": data_url,
|
|
"size": file_size,
|
|
"message": "Image uploaded successfully"
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error uploading image: {str(e)}")
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|