Files

804 lines
28 KiB
Python

from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
from pymongo import MongoClient
import os
import uuid
import jwt
import logging
from eth_account.messages import encode_defunct
from web3 import Web3
from activity_logger import log_user_activity
bp = Blueprint('auth', __name__)
logger = logging.getLogger(__name__)
# MongoDB connection
mongo_uri = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/')
client = MongoClient(mongo_uri)
db = client.openlearnx
# JWT secret - must be set via environment variable
JWT_SECRET = os.getenv('JWT_SECRET')
if not JWT_SECRET:
import warnings
import tempfile
import stat
import secrets as secrets_module
warnings.warn("JWT_SECRET environment variable not set. Using persistent dev secret.", UserWarning)
def _generate_and_store_secret():
"""Generate a random secret and store it with restrictive permissions."""
return secrets_module.token_hex(32)
# Use persistent file-based secret for development to avoid invalidating tokens on restart
_secret_file = os.path.join(tempfile.gettempdir(), '.openlearnx_dev_jwt_secret_auth')
try:
if os.path.exists(_secret_file):
with open(_secret_file, 'r') as f:
JWT_SECRET = f.read().strip()
if not JWT_SECRET:
JWT_SECRET = _generate_and_store_secret()
with open(_secret_file, 'w') as f:
f.write(JWT_SECRET)
# Set restrictive permissions (owner read/write only)
os.chmod(_secret_file, stat.S_IRUSR | stat.S_IWUSR)
except Exception:
JWT_SECRET = _generate_and_store_secret()
@bp.route('/nonce', methods=['POST', 'OPTIONS'])
def get_nonce():
"""Generate nonce for MetaMask authentication"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
data = request.get_json()
wallet_address = data.get('wallet_address')
if not wallet_address:
return jsonify({
"success": False,
"error": "Wallet address required"
}), 400
# Generate unique nonce
nonce = str(uuid.uuid4())
timestamp = datetime.now().isoformat()
# Create message to sign
message = f"Sign this message to authenticate with OpenLearnX:\n\nNonce: {nonce}\nTimestamp: {timestamp}\nAddress: {wallet_address}"
logger.info(f"🔐 Generated nonce for wallet: {wallet_address}")
return jsonify({
"success": True,
"nonce": nonce,
"message": message,
"timestamp": timestamp
})
except Exception as e:
logger.error(f"❌ Error generating nonce: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/verify', methods=['POST', 'OPTIONS'])
def verify_signature():
"""Verify MetaMask signature and authenticate user"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
data = request.get_json()
wallet_address = data.get('wallet_address')
signature = data.get('signature')
message = data.get('message')
if not all([wallet_address, signature, message]):
return jsonify({
"success": False,
"error": "Wallet address, signature, and message are required"
}), 400
# Verify the signature
try:
# Create the message hash that was signed
message_hash = encode_defunct(text=message)
# Recover the address from the signature
w3 = Web3()
recovered_address = w3.eth.account.recover_message(message_hash, signature=signature)
# Check if recovered address matches the claimed address
if recovered_address.lower() != wallet_address.lower():
return jsonify({
"success": False,
"error": "Signature verification failed"
}), 401
except Exception as e:
logger.error(f"❌ Signature verification error: {str(e)}")
return jsonify({
"success": False,
"error": "Invalid signature"
}), 401
# Check if user exists, create if not
user = db.users.find_one({"wallet_address": wallet_address.lower()})
if not user:
# Create new user
user = {
"wallet_address": wallet_address.lower(),
"role": "student",
"status": "active",
"created_at": datetime.now(),
"last_login": datetime.now(),
"login_count": 1
}
result = db.users.insert_one(user)
user["_id"] = str(result.inserted_id)
logger.info(f"✅ Created new user: {wallet_address}")
log_user_activity(
db,
wallet_address.lower(),
"auth_register",
"Account registered",
"Created account via wallet authentication",
{"auth_method": "wallet"},
)
else:
account_status = str(user.get("status", "active")).lower().strip()
if account_status == "banned":
logger.warning(f"⛔ Banned wallet login blocked: {wallet_address}")
log_user_activity(
db,
wallet_address.lower(),
"account_status",
"Login blocked",
"Login blocked because account is banned",
{"status": "banned"},
)
return jsonify({
"success": False,
"error": "Your account is banned. Contact admin."
}), 403
# Update existing user
db.users.update_one(
{"wallet_address": wallet_address.lower()},
{
"$set": {"last_login": datetime.now()},
"$inc": {"login_count": 1}
}
)
user["_id"] = str(user["_id"])
logger.info(f"✅ Updated existing user: {wallet_address}")
log_user_activity(
db,
wallet_address.lower(),
"auth_login",
"Login successful",
"Wallet login completed successfully",
{"auth_method": "wallet"},
)
# Generate JWT token
token_payload = {
"user_id": user["wallet_address"],
"wallet_address": user["wallet_address"],
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(days=7)
}
token = jwt.encode(token_payload, JWT_SECRET, algorithm="HS256")
# Prepare user data for response
user_response = {
"id": user["wallet_address"],
"wallet_address": user["wallet_address"],
"email": user.get("email", ""),
"name": user.get("name", ""),
"bio": user.get("bio", ""),
"avatar": user.get("avatar", ""),
"role": user.get("role", "student"),
"status": user.get("status", "active"),
"created_at": user["created_at"].isoformat() if isinstance(user["created_at"], datetime) else str(user["created_at"]),
"last_login": user["last_login"].isoformat() if isinstance(user["last_login"], datetime) else str(user["last_login"])
}
logger.info(f"✅ Authentication successful for: {wallet_address}")
return jsonify({
"success": True,
"token": token,
"user": user_response,
"message": "Authentication successful"
})
except Exception as e:
logger.error(f"❌ Error verifying signature: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/register', methods=['POST', 'OPTIONS'])
def register():
"""Register a new user with email and password"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
data = request.get_json()
email = data.get('email', '').strip().lower()
password = data.get('password', '')
username = data.get('username', '').strip()
if not email or not password:
return jsonify({
"success": False,
"error": "Email and password are required"
}), 400
if len(password) < 6:
return jsonify({
"success": False,
"error": "Password must be at least 6 characters"
}), 400
# Check if user already exists
existing_user = db.users.find_one({"email": email})
if existing_user:
return jsonify({
"success": False,
"error": "Email already registered"
}), 409
# Hash password using simple approach for development
# TODO: Use werkzeug.security.generate_password_hash for production
import hashlib
password_hash = hashlib.sha256(password.encode()).hexdigest()
# Create new user
user = {
"email": email,
"username": username or email.split("@")[0],
"password_hash": password_hash,
"name": "",
"bio": "",
"avatar": "",
"role": "student",
"status": "active",
"created_at": datetime.now(),
"last_login": datetime.now(),
"login_count": 1,
"auth_method": "email"
}
result = db.users.insert_one(user)
user["_id"] = str(result.inserted_id)
# Generate JWT token
token_payload = {
"user_id": str(result.inserted_id),
"email": email,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(days=7)
}
token = jwt.encode(token_payload, JWT_SECRET, algorithm="HS256")
user_response = {
"id": str(result.inserted_id),
"email": email,
"username": username or email.split("@")[0],
"name": "",
"bio": "",
"avatar": "",
"role": "student",
"status": "active",
"created_at": user["created_at"].isoformat(),
"last_login": user["last_login"].isoformat()
}
log_user_activity(
db,
str(result.inserted_id),
"auth_register",
"Account registered",
"Created account with email and password",
{"auth_method": "email"},
)
logger.info(f"✅ New user registered: {email}")
return jsonify({
"success": True,
"token": token,
"user": user_response,
"message": "Registration successful"
}), 201
except Exception as e:
logger.error(f"❌ Error during registration: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/login', methods=['POST', 'OPTIONS'])
def login():
"""Login with email and password"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
data = request.get_json()
email = data.get('email', '').strip().lower()
password = data.get('password', '')
if not email or not password:
return jsonify({
"success": False,
"error": "Email and password are required"
}), 400
# Find user by email
user = db.users.find_one({"email": email})
if not user:
return jsonify({
"success": False,
"error": "Invalid email or password"
}), 401
account_status = str(user.get("status", "active")).lower().strip()
if account_status == "banned":
logger.warning(f"⛔ Banned email login blocked: {email}")
log_user_activity(
db,
str(user.get("_id")),
"account_status",
"Login blocked",
"Login blocked because account is banned",
{"status": "banned", "email": email},
)
return jsonify({
"success": False,
"error": "Your account is banned. Contact admin."
}), 403
if account_status == "suspended":
log_user_activity(
db,
str(user.get("_id")),
"account_status",
"Login attempted while suspended",
"User logged in while account status is suspended",
{"status": "suspended", "email": email},
)
# Verify password
import hashlib
password_hash = hashlib.sha256(password.encode()).hexdigest()
if password_hash != user.get('password_hash'):
return jsonify({
"success": False,
"error": "Invalid email or password"
}), 401
# Update last login
db.users.update_one(
{"_id": user["_id"]},
{
"$set": {"last_login": datetime.now()},
"$inc": {"login_count": 1}
}
)
# Generate JWT token
token_payload = {
"user_id": str(user["_id"]),
"email": email,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(days=7)
}
token = jwt.encode(token_payload, JWT_SECRET, algorithm="HS256")
user_response = {
"id": str(user["_id"]),
"email": email,
"username": user.get('username', ''),
"name": user.get('name', ''),
"bio": user.get('bio', ''),
"avatar": user.get('avatar', ''),
"role": user.get('role', 'student'),
"status": user.get('status', 'active'),
"created_at": user["created_at"].isoformat() if isinstance(user["created_at"], datetime) else str(user["created_at"]),
"last_login": user["last_login"].isoformat() if isinstance(user["last_login"], datetime) else str(user["last_login"])
}
log_user_activity(
db,
str(user.get("_id")),
"auth_login",
"Login successful",
"Email login completed successfully",
{"auth_method": "email", "email": email},
)
logger.info(f"✅ User logged in: {email}")
return jsonify({
"success": True,
"token": token,
"user": user_response,
"message": "Login successful"
})
except Exception as e:
logger.error(f"❌ Error during login: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/profile/update', methods=['POST', 'OPTIONS'])
def update_profile():
"""Update user profile (name, bio, avatar)"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
# Get token from header
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({
"success": False,
"error": "Authorization header required"
}), 401
token = auth_header.split('Bearer ')[1]
# Verify and decode token
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
user_id = payload.get('user_id')
except jwt.InvalidTokenError:
return jsonify({
"success": False,
"error": "Invalid token"
}), 401
data = request.get_json()
name = data.get('name', '').strip()
bio = data.get('bio', '').strip()
avatar = data.get('avatar', '').strip()
# Update user profile
from bson.objectid import ObjectId
result = db.users.update_one(
{"_id": ObjectId(user_id)},
{
"$set": {
"name": name,
"bio": bio,
"avatar": avatar,
"updated_at": datetime.now()
}
}
)
if result.matched_count == 0:
return jsonify({
"success": False,
"error": "User not found"
}), 404
# Get updated user
user = db.users.find_one({"_id": ObjectId(user_id)})
user_response = {
"id": str(user["_id"]),
"email": user.get('email', ''),
"username": user.get('username', ''),
"name": user.get('name', ''),
"bio": user.get('bio', ''),
"avatar": user.get('avatar', ''),
"role": user.get('role', 'student'),
"status": user.get('status', 'active'),
"created_at": user["created_at"].isoformat() if isinstance(user["created_at"], datetime) else str(user["created_at"]),
"last_login": user["last_login"].isoformat() if isinstance(user["last_login"], datetime) else str(user["last_login"])
}
logger.info(f"✅ Profile updated for user: {user_id}")
return jsonify({
"success": True,
"user": user_response,
"message": "Profile updated successfully"
})
except Exception as e:
logger.error(f"❌ Error updating profile: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/metamask/add-email', methods=['POST', 'OPTIONS'])
def add_metamask_email():
"""Store contact email for MetaMask wallet"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
# Get token from header
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({
"success": False,
"error": "Authorization header required"
}), 401
token = auth_header.split('Bearer ')[1]
# Verify and decode token
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
wallet_address = payload.get('wallet_address')
if not wallet_address:
wallet_address = payload.get('user_id')
except jwt.InvalidTokenError:
return jsonify({
"success": False,
"error": "Invalid token"
}), 401
data = request.get_json()
email = data.get('email', '').strip().lower()
name = data.get('name', '').strip()
if not email:
return jsonify({
"success": False,
"error": "Email is required"
}), 400
# Validate email format
import re
if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
return jsonify({
"success": False,
"error": "Invalid email format"
}), 400
# Check if email already used by different wallet
existing_user = db.users.find_one({"email": email, "wallet_address": {"$ne": wallet_address.lower()}})
if existing_user:
return jsonify({
"success": False,
"error": "Email already associated with another wallet"
}), 409
# Update user with email and name
from bson.objectid import ObjectId
# Try updating by wallet address first (for new users)
result = db.users.update_one(
{"wallet_address": wallet_address.lower()},
{
"$set": {
"email": email,
"name": name or "",
"updated_at": datetime.now()
}
}
)
if result.matched_count == 0:
return jsonify({
"success": False,
"error": "User not found"
}), 404
# Get updated user
user = db.users.find_one({"wallet_address": wallet_address.lower()})
user_response = {
"id": str(user.get("_id", wallet_address)),
"wallet_address": user.get("wallet_address", wallet_address),
"email": user.get("email", ""),
"name": user.get("name", ""),
"bio": user.get("bio", ""),
"avatar": user.get("avatar", ""),
"role": user.get("role", "student"),
"status": user.get("status", "active"),
"created_at": user.get("created_at", datetime.now()).isoformat() if isinstance(user.get("created_at"), datetime) else str(user.get("created_at", datetime.now())),
"last_login": user.get("last_login", datetime.now()).isoformat() if isinstance(user.get("last_login"), datetime) else str(user.get("last_login", datetime.now()))
}
logger.info(f"✅ Email added for MetaMask wallet: {wallet_address}")
return jsonify({
"success": True,
"user": user_response,
"message": "Email saved successfully"
})
except Exception as e:
logger.error(f"❌ Error saving MetaMask email: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@bp.route('/verify-token', methods=['POST', 'OPTIONS'])
def verify_token():
"""Validate JWT token and return the latest user payload."""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({"valid": False, "error": "Authorization header required"}), 401
token = auth_header.split('Bearer ')[1]
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
except jwt.InvalidTokenError:
return jsonify({"valid": False, "error": "Invalid token"}), 401
user = None
wallet_address = payload.get('wallet_address')
email = payload.get('email')
user_id = payload.get('user_id')
if wallet_address:
user = db.users.find_one({"wallet_address": str(wallet_address).lower()})
elif email:
user = db.users.find_one({"email": str(email).lower()})
elif user_id:
try:
from bson.objectid import ObjectId
user = db.users.find_one({"_id": ObjectId(user_id)})
except Exception:
user = None
if not user:
return jsonify({"valid": False, "error": "User not found"}), 404
status = str(user.get("status", "active")).lower().strip()
if status == "banned":
return jsonify({"valid": False, "error": "Account is banned"}), 403
user_response = {
"id": str(user.get("_id", user.get("wallet_address", ""))),
"wallet_address": user.get("wallet_address", ""),
"email": user.get("email", ""),
"username": user.get("username", ""),
"name": user.get("name", ""),
"bio": user.get("bio", ""),
"avatar": user.get("avatar", ""),
"role": user.get("role", "student"),
"status": user.get("status", "active"),
"created_at": user.get("created_at", datetime.now()).isoformat() if isinstance(user.get("created_at"), datetime) else str(user.get("created_at", datetime.now())),
"last_login": user.get("last_login", datetime.now()).isoformat() if isinstance(user.get("last_login"), datetime) else str(user.get("last_login", datetime.now())),
}
return jsonify({"valid": True, "user": user_response})
except Exception as e:
logger.error(f"❌ verify-token error: {str(e)}")
return jsonify({"valid": False, "error": str(e)}), 500
@bp.route('/me', methods=['GET', 'OPTIONS'])
def get_me():
"""Return authenticated user profile for current token."""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
verify_resp = verify_token()
try:
body, status = verify_resp
if status != 200:
return body, status
data = body.get_json()
return jsonify({"success": True, "user": data.get("user", {})})
except Exception:
return verify_resp
@bp.route('/upload-image', methods=['POST', 'OPTIONS'])
def upload_image():
"""Upload and convert image (PNG/JPG only) to base64"""
if request.method == "OPTIONS":
return jsonify({'status': 'ok'})
try:
# Get token from header
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({
"success": False,
"error": "Authorization header required"
}), 401
token = auth_header.split('Bearer ')[1]
# Verify and decode token
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
user_id = payload.get('user_id')
except jwt.InvalidTokenError:
return jsonify({
"success": False,
"error": "Invalid token"
}), 401
# Check if file is in request
if 'file' not in request.files:
return jsonify({
"success": False,
"error": "No file provided"
}), 400
file = request.files['file']
if file.filename == '':
return jsonify({
"success": False,
"error": "No file selected"
}), 400
# Validate file type - only PNG and JPG
allowed_extensions = {'png', 'jpg', 'jpeg'}
file_ext = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else ''
if file_ext not in allowed_extensions:
return jsonify({
"success": False,
"error": "Only PNG and JPG formats are allowed"
}), 400
# Validate file size (max 5MB)
file.seek(0, 2) # Seek to end
file_size = file.tell()
file.seek(0) # Seek back to start
max_size = 5 * 1024 * 1024 # 5MB
if file_size > max_size:
return jsonify({
"success": False,
"error": "File size must be less than 5MB"
}), 400
# Read file and convert to base64
import base64
file_data = file.read()
base64_image = base64.b64encode(file_data).decode('utf-8')
# Create data URL for the image
mime_type = f"image/{file_ext if file_ext != 'jpg' else 'jpeg'}"
data_url = f"data:{mime_type};base64,{base64_image}"
logger.info(f"✅ Image uploaded for user: {user_id}, size: {file_size} bytes")
return jsonify({
"success": True,
"image": data_url,
"size": file_size,
"message": "Image uploaded successfully"
}), 200
except Exception as e:
logger.error(f"❌ Error uploading image: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500