Files
OpenLearnX/backend/routes/auth.py

804 lines
28 KiB
Python

from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
from pymongo import MongoClient
import os
import uuid
import jwt
import logging
from eth_account.messages import encode_defunct
from web3 import Web3
from activity_logger import log_user_activity
bp = Blueprint('auth', __name__)
logger = logging.getLogger(__name__)
# MongoDB connection
mongo_uri = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/')
client = MongoClient(mongo_uri)
db = client.openlearnx
# JWT secret - must be set via environment variable
JWT_SECRET = os.getenv('JWT_SECRET')
if not JWT_SECRET:
import warnings
import tempfile
import stat
import secrets as secrets_module
warnings.warn("JWT_SECRET environment variable not set. Using persistent dev secret.", UserWarning)
def _generate_and_store_secret():
"""Generate a random secret and store it with restrictive permissions."""
return secrets_module.token_hex(32)
# Use persistent file-based secret for development to avoid invalidating tokens on restart
_secret_file = os.path.join(tempfile.gettempdir(), '.openlearnx_dev_jwt_secret_auth')
try:
if os.path.exists(_secret_file):
with open(_secret_file, 'r') as f:
JWT_SECRET = f.read().strip()
if not JWT_SECRET:
JWT_SECRET = _generate_and_store_secret()
with open(_secret_file, 'w') as f:
f.write(JWT_SECRET)
# Set restrictive permissions (owner read/write only)
os.chmod(_secret_file, stat.S_IRUSR | stat.S_IWUSR)
except Exception:
JWT_SECRET = _generate_and_store_secret()
@bp.route('/nonce', methods=['POST', 'OPTIONS'])
def get_nonce():
"""Generate nonce for MetaMask authentication"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
data = request.get_json()
wallet_address = data.get('wallet_address')
if not wallet_address:
return jsonify({
"success": False,
"error": "Wallet address required"
}), 400
# Generate unique nonce
nonce = str(uuid.uuid4())
timestamp = datetime.now().isoformat()
# Create message to sign
message = f"Sign this message to authenticate with OpenLearnX:\n\nNonce: {nonce}\nTimestamp: {timestamp}\nAddress: {wallet_address}"
logger.info(f"🔐 Generated nonce for wallet: {wallet_address}")
return jsonify({
"success": True,
"nonce": nonce,
"message": message,
"timestamp": timestamp
})
except Exception as e:
logger.error(f"❌ Error generating nonce: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/verify', methods=['POST', 'OPTIONS'])
def verify_signature():
"""Verify MetaMask signature and authenticate user"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
data = request.get_json()
wallet_address = data.get('wallet_address')
signature = data.get('signature')
message = data.get('message')
if not all([wallet_address, signature, message]):
return jsonify({
"success": False,
"error": "Wallet address, signature, and message are required"
}), 400
# Verify the signature
try:
# Create the message hash that was signed
message_hash = encode_defunct(text=message)
# Recover the address from the signature
w3 = Web3()
recovered_address = w3.eth.account.recover_message(message_hash, signature=signature)
# Check if recovered address matches the claimed address
if recovered_address.lower() != wallet_address.lower():
return jsonify({
"success": False,
"error": "Signature verification failed"
}), 401
except Exception as e:
logger.error(f"❌ Signature verification error: {str(e)}")
return jsonify({
"success": False,
"error": "Invalid signature"
}), 401
# Check if user exists, create if not
user = db.users.find_one({"wallet_address": wallet_address.lower()})
if not user:
# Create new user
user = {
"wallet_address": wallet_address.lower(),
"role": "student",
"status": "active",
"created_at": datetime.now(),
"last_login": datetime.now(),
"login_count": 1
}
result = db.users.insert_one(user)
user["_id"] = str(result.inserted_id)
logger.info(f"✅ Created new user: {wallet_address}")
log_user_activity(
db,
wallet_address.lower(),
"auth_register",
"Account registered",
"Created account via wallet authentication",
{"auth_method": "wallet"},
)
else:
account_status = str(user.get("status", "active")).lower().strip()
if account_status == "banned":
logger.warning(f"⛔ Banned wallet login blocked: {wallet_address}")
log_user_activity(
db,
wallet_address.lower(),
"account_status",
"Login blocked",
"Login blocked because account is banned",
{"status": "banned"},
)
return jsonify({
"success": False,
"error": "Your account is banned. Contact admin."
}), 403
# Update existing user
db.users.update_one(
{"wallet_address": wallet_address.lower()},
{
"$set": {"last_login": datetime.now()},
"$inc": {"login_count": 1}
}
)
user["_id"] = str(user["_id"])
logger.info(f"✅ Updated existing user: {wallet_address}")
log_user_activity(
db,
wallet_address.lower(),
"auth_login",
"Login successful",
"Wallet login completed successfully",
{"auth_method": "wallet"},
)
# Generate JWT token
token_payload = {
"user_id": user["wallet_address"],
"wallet_address": user["wallet_address"],
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(days=7)
}
token = jwt.encode(token_payload, JWT_SECRET, algorithm="HS256")
# Prepare user data for response
user_response = {
"id": user["wallet_address"],
"wallet_address": user["wallet_address"],
"email": user.get("email", ""),
"name": user.get("name", ""),
"bio": user.get("bio", ""),
"avatar": user.get("avatar", ""),
"role": user.get("role", "student"),
"status": user.get("status", "active"),
"created_at": user["created_at"].isoformat() if isinstance(user["created_at"], datetime) else str(user["created_at"]),
"last_login": user["last_login"].isoformat() if isinstance(user["last_login"], datetime) else str(user["last_login"])
}
logger.info(f"✅ Authentication successful for: {wallet_address}")
return jsonify({
"success": True,
"token": token,
"user": user_response,
"message": "Authentication successful"
})
except Exception as e:
logger.error(f"❌ Error verifying signature: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/register', methods=['POST', 'OPTIONS'])
def register():
"""Register a new user with email and password"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
data = request.get_json()
email = data.get('email', '').strip().lower()
password = data.get('password', '')
username = data.get('username', '').strip()
if not email or not password:
return jsonify({
"success": False,
"error": "Email and password are required"
}), 400
if len(password) < 6:
return jsonify({
"success": False,
"error": "Password must be at least 6 characters"
}), 400
# Check if user already exists
existing_user = db.users.find_one({"email": email})
if existing_user:
return jsonify({
"success": False,
"error": "Email already registered"
}), 409
# Hash password using simple approach for development
# TODO: Use werkzeug.security.generate_password_hash for production
import hashlib
password_hash = hashlib.sha256(password.encode()).hexdigest()
# Create new user
user = {
"email": email,
"username": username or email.split("@")[0],
"password_hash": password_hash,
"name": "",
"bio": "",
"avatar": "",
"role": "student",
"status": "active",
"created_at": datetime.now(),
"last_login": datetime.now(),
"login_count": 1,
"auth_method": "email"
}
result = db.users.insert_one(user)
user["_id"] = str(result.inserted_id)
# Generate JWT token
token_payload = {
"user_id": str(result.inserted_id),
"email": email,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(days=7)
}
token = jwt.encode(token_payload, JWT_SECRET, algorithm="HS256")
user_response = {
"id": str(result.inserted_id),
"email": email,
"username": username or email.split("@")[0],
"name": "",
"bio": "",
"avatar": "",
"role": "student",
"status": "active",
"created_at": user["created_at"].isoformat(),
"last_login": user["last_login"].isoformat()
}
log_user_activity(
db,
str(result.inserted_id),
"auth_register",
"Account registered",
"Created account with email and password",
{"auth_method": "email"},
)
logger.info(f"✅ New user registered: {email}")
return jsonify({
"success": True,
"token": token,
"user": user_response,
"message": "Registration successful"
}), 201
except Exception as e:
logger.error(f"❌ Error during registration: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/login', methods=['POST', 'OPTIONS'])
def login():
"""Login with email and password"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
data = request.get_json()
email = data.get('email', '').strip().lower()
password = data.get('password', '')
if not email or not password:
return jsonify({
"success": False,
"error": "Email and password are required"
}), 400
# Find user by email
user = db.users.find_one({"email": email})
if not user:
return jsonify({
"success": False,
"error": "Invalid email or password"
}), 401
account_status = str(user.get("status", "active")).lower().strip()
if account_status == "banned":
logger.warning(f"⛔ Banned email login blocked: {email}")
log_user_activity(
db,
str(user.get("_id")),
"account_status",
"Login blocked",
"Login blocked because account is banned",
{"status": "banned", "email": email},
)
return jsonify({
"success": False,
"error": "Your account is banned. Contact admin."
}), 403
if account_status == "suspended":
log_user_activity(
db,
str(user.get("_id")),
"account_status",
"Login attempted while suspended",
"User logged in while account status is suspended",
{"status": "suspended", "email": email},
)
# Verify password
import hashlib
password_hash = hashlib.sha256(password.encode()).hexdigest()
if password_hash != user.get('password_hash'):
return jsonify({
"success": False,
"error": "Invalid email or password"
}), 401
# Update last login
db.users.update_one(
{"_id": user["_id"]},
{
"$set": {"last_login": datetime.now()},
"$inc": {"login_count": 1}
}
)
# Generate JWT token
token_payload = {
"user_id": str(user["_id"]),
"email": email,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(days=7)
}
token = jwt.encode(token_payload, JWT_SECRET, algorithm="HS256")
user_response = {
"id": str(user["_id"]),
"email": email,
"username": user.get('username', ''),
"name": user.get('name', ''),
"bio": user.get('bio', ''),
"avatar": user.get('avatar', ''),
"role": user.get('role', 'student'),
"status": user.get('status', 'active'),
"created_at": user["created_at"].isoformat() if isinstance(user["created_at"], datetime) else str(user["created_at"]),
"last_login": user["last_login"].isoformat() if isinstance(user["last_login"], datetime) else str(user["last_login"])
}
log_user_activity(
db,
str(user.get("_id")),
"auth_login",
"Login successful",
"Email login completed successfully",
{"auth_method": "email", "email": email},
)
logger.info(f"✅ User logged in: {email}")
return jsonify({
"success": True,
"token": token,
"user": user_response,
"message": "Login successful"
})
except Exception as e:
logger.error(f"❌ Error during login: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/profile/update', methods=['POST', 'OPTIONS'])
def update_profile():
"""Update user profile (name, bio, avatar)"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
# Get token from header
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({
"success": False,
"error": "Authorization header required"
}), 401
token = auth_header.split('Bearer ')[1]
# Verify and decode token
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
user_id = payload.get('user_id')
except jwt.InvalidTokenError:
return jsonify({
"success": False,
"error": "Invalid token"
}), 401
data = request.get_json()
name = data.get('name', '').strip()
bio = data.get('bio', '').strip()
avatar = data.get('avatar', '').strip()
# Update user profile
from bson.objectid import ObjectId
result = db.users.update_one(
{"_id": ObjectId(user_id)},
{
"$set": {
"name": name,
"bio": bio,
"avatar": avatar,
"updated_at": datetime.now()
}
}
)
if result.matched_count == 0:
return jsonify({
"success": False,
"error": "User not found"
}), 404
# Get updated user
user = db.users.find_one({"_id": ObjectId(user_id)})
user_response = {
"id": str(user["_id"]),
"email": user.get('email', ''),
"username": user.get('username', ''),
"name": user.get('name', ''),
"bio": user.get('bio', ''),
"avatar": user.get('avatar', ''),
"role": user.get('role', 'student'),
"status": user.get('status', 'active'),
"created_at": user["created_at"].isoformat() if isinstance(user["created_at"], datetime) else str(user["created_at"]),
"last_login": user["last_login"].isoformat() if isinstance(user["last_login"], datetime) else str(user["last_login"])
}
logger.info(f"✅ Profile updated for user: {user_id}")
return jsonify({
"success": True,
"user": user_response,
"message": "Profile updated successfully"
})
except Exception as e:
logger.error(f"❌ Error updating profile: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/metamask/add-email', methods=['POST', 'OPTIONS'])
def add_metamask_email():
"""Store contact email for MetaMask wallet"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
# Get token from header
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({
"success": False,
"error": "Authorization header required"
}), 401
token = auth_header.split('Bearer ')[1]
# Verify and decode token
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
wallet_address = payload.get('wallet_address')
if not wallet_address:
wallet_address = payload.get('user_id')
except jwt.InvalidTokenError:
return jsonify({
"success": False,
"error": "Invalid token"
}), 401
data = request.get_json()
email = data.get('email', '').strip().lower()
name = data.get('name', '').strip()
if not email:
return jsonify({
"success": False,
"error": "Email is required"
}), 400
# Validate email format
import re
if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
return jsonify({
"success": False,
"error": "Invalid email format"
}), 400
# Check if email already used by different wallet
existing_user = db.users.find_one({"email": email, "wallet_address": {"$ne": wallet_address.lower()}})
if existing_user:
return jsonify({
"success": False,
"error": "Email already associated with another wallet"
}), 409
# Update user with email and name
from bson.objectid import ObjectId
# Try updating by wallet address first (for new users)
result = db.users.update_one(
{"wallet_address": wallet_address.lower()},
{
"$set": {
"email": email,
"name": name or "",
"updated_at": datetime.now()
}
}
)
if result.matched_count == 0:
return jsonify({
"success": False,
"error": "User not found"
}), 404
# Get updated user
user = db.users.find_one({"wallet_address": wallet_address.lower()})
user_response = {
"id": str(user.get("_id", wallet_address)),
"wallet_address": user.get("wallet_address", wallet_address),
"email": user.get("email", ""),
"name": user.get("name", ""),
"bio": user.get("bio", ""),
"avatar": user.get("avatar", ""),
"role": user.get("role", "student"),
"status": user.get("status", "active"),
"created_at": user.get("created_at", datetime.now()).isoformat() if isinstance(user.get("created_at"), datetime) else str(user.get("created_at", datetime.now())),
"last_login": user.get("last_login", datetime.now()).isoformat() if isinstance(user.get("last_login"), datetime) else str(user.get("last_login", datetime.now()))
}
logger.info(f"✅ Email added for MetaMask wallet: {wallet_address}")
return jsonify({
"success": True,
"user": user_response,
"message": "Email saved successfully"
})
except Exception as e:
logger.error(f"❌ Error saving MetaMask email: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/verify-token', methods=['POST', 'OPTIONS'])
def verify_token():
"""Validate JWT token and return the latest user payload."""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({"valid": False, "error": "Authorization header required"}), 401
token = auth_header.split('Bearer ')[1]
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
except jwt.InvalidTokenError:
return jsonify({"valid": False, "error": "Invalid token"}), 401
user = None
wallet_address = payload.get('wallet_address')
email = payload.get('email')
user_id = payload.get('user_id')
if wallet_address:
user = db.users.find_one({"wallet_address": str(wallet_address).lower()})
elif email:
user = db.users.find_one({"email": str(email).lower()})
elif user_id:
try:
from bson.objectid import ObjectId
user = db.users.find_one({"_id": ObjectId(user_id)})
except Exception:
user = None
if not user:
return jsonify({"valid": False, "error": "User not found"}), 404
status = str(user.get("status", "active")).lower().strip()
if status == "banned":
return jsonify({"valid": False, "error": "Account is banned"}), 403
user_response = {
"id": str(user.get("_id", user.get("wallet_address", ""))),
"wallet_address": user.get("wallet_address", ""),
"email": user.get("email", ""),
"username": user.get("username", ""),
"name": user.get("name", ""),
"bio": user.get("bio", ""),
"avatar": user.get("avatar", ""),
"role": user.get("role", "student"),
"status": user.get("status", "active"),
"created_at": user.get("created_at", datetime.now()).isoformat() if isinstance(user.get("created_at"), datetime) else str(user.get("created_at", datetime.now())),
"last_login": user.get("last_login", datetime.now()).isoformat() if isinstance(user.get("last_login"), datetime) else str(user.get("last_login", datetime.now())),
}
return jsonify({"valid": True, "user": user_response})
except Exception as e:
logger.error(f"❌ verify-token error: {str(e)}")
return jsonify({"valid": False, "error": str(e)}), 500
@bp.route('/me', methods=['GET', 'OPTIONS'])
def get_me():
"""Return authenticated user profile for current token."""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
verify_resp = verify_token()
try:
body, status = verify_resp
if status != 200:
return body, status
data = body.get_json()
return jsonify({"success": True, "user": data.get("user", {})})
except Exception:
return verify_resp
@bp.route('/upload-image', methods=['POST', 'OPTIONS'])
def upload_image():
"""Upload and convert image (PNG/JPG only) to base64"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
# Get token from header
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({
"success": False,
"error": "Authorization header required"
}), 401
token = auth_header.split('Bearer ')[1]
# Verify and decode token
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
user_id = payload.get('user_id')
except jwt.InvalidTokenError:
return jsonify({
"success": False,
"error": "Invalid token"
}), 401
# Check if file is in request
if 'file' not in request.files:
return jsonify({
"success": False,
"error": "No file provided"
}), 400
file = request.files['file']
if file.filename == '':
return jsonify({
"success": False,
"error": "No file selected"
}), 400
# Validate file type - only PNG and JPG
allowed_extensions = {'png', 'jpg', 'jpeg'}
file_ext = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else ''
if file_ext not in allowed_extensions:
return jsonify({
"success": False,
"error": "Only PNG and JPG formats are allowed"
}), 400
# Validate file size (max 5MB)
file.seek(0, 2) # Seek to end
file_size = file.tell()
file.seek(0) # Seek back to start
max_size = 5 * 1024 * 1024 # 5MB
if file_size > max_size:
return jsonify({
"success": False,
"error": "File size must be less than 5MB"
}), 400
# Read file and convert to base64
import base64
file_data = file.read()
base64_image = base64.b64encode(file_data).decode('utf-8')
# Create data URL for the image
mime_type = f"image/{file_ext if file_ext != 'jpg' else 'jpeg'}"
data_url = f"data:{mime_type};base64,{base64_image}"
logger.info(f"✅ Image uploaded for user: {user_id}, size: {file_size} bytes")
return jsonify({
"success": True,
"image": data_url,
"size": file_size,
"message": "Image uploaded successfully"
}), 200
except Exception as e:
logger.error(f"❌ Error uploading image: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500