update & add

This commit is contained in:
5t4l1n
2025-07-27 03:54:54 +05:30
parent cc16c970d6
commit 0a63d19b59
24 changed files with 6298 additions and 953 deletions
+690
View File
@@ -0,0 +1,690 @@
{
"contract_address": "0xC2FE2F49B3a1384aEdFAae127F054FAf216eF684",
"transaction_hash": "0xfe5a433dae316bd2d60b7190c21866a1fde30777f08d9d37e403ed642433fa28",
"deployer": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266",
"network": "local",
"abi": [
{
"type": "constructor",
"inputs": [],
"stateMutability": "nonpayable"
},
{
"type": "function",
"name": "approve",
"inputs": [
{
"name": "to",
"type": "address",
"internalType": "address"
},
{
"name": "tokenId",
"type": "uint256",
"internalType": "uint256"
}
],
"outputs": [],
"stateMutability": "nonpayable"
},
{
"type": "function",
"name": "balanceOf",
"inputs": [
{
"name": "owner",
"type": "address",
"internalType": "address"
}
],
"outputs": [
{
"name": "",
"type": "uint256",
"internalType": "uint256"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "certificates",
"inputs": [
{
"name": "",
"type": "uint256",
"internalType": "uint256"
}
],
"outputs": [
{
"name": "subject",
"type": "string",
"internalType": "string"
},
{
"name": "studentName",
"type": "string",
"internalType": "string"
},
{
"name": "score",
"type": "uint256",
"internalType": "uint256"
},
{
"name": "timestamp",
"type": "uint256",
"internalType": "uint256"
},
{
"name": "verified",
"type": "bool",
"internalType": "bool"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "getApproved",
"inputs": [
{
"name": "tokenId",
"type": "uint256",
"internalType": "uint256"
}
],
"outputs": [
{
"name": "",
"type": "address",
"internalType": "address"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "getCertificate",
"inputs": [
{
"name": "tokenId",
"type": "uint256",
"internalType": "uint256"
}
],
"outputs": [
{
"name": "",
"type": "tuple",
"internalType": "struct CertificateNFT.Certificate",
"components": [
{
"name": "subject",
"type": "string",
"internalType": "string"
},
{
"name": "studentName",
"type": "string",
"internalType": "string"
},
{
"name": "score",
"type": "uint256",
"internalType": "uint256"
},
{
"name": "timestamp",
"type": "uint256",
"internalType": "uint256"
},
{
"name": "verified",
"type": "bool",
"internalType": "bool"
}
]
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "getUserCertificates",
"inputs": [
{
"name": "user",
"type": "address",
"internalType": "address"
}
],
"outputs": [
{
"name": "",
"type": "uint256[]",
"internalType": "uint256[]"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "isApprovedForAll",
"inputs": [
{
"name": "owner",
"type": "address",
"internalType": "address"
},
{
"name": "operator",
"type": "address",
"internalType": "address"
}
],
"outputs": [
{
"name": "",
"type": "bool",
"internalType": "bool"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "mintCertificate",
"inputs": [
{
"name": "to",
"type": "address",
"internalType": "address"
},
{
"name": "_tokenURI",
"type": "string",
"internalType": "string"
}
],
"outputs": [
{
"name": "",
"type": "uint256",
"internalType": "uint256"
}
],
"stateMutability": "nonpayable"
},
{
"type": "function",
"name": "mintCertificateWithDetails",
"inputs": [
{
"name": "to",
"type": "address",
"internalType": "address"
},
{
"name": "_tokenURI",
"type": "string",
"internalType": "string"
},
{
"name": "subject",
"type": "string",
"internalType": "string"
},
{
"name": "studentName",
"type": "string",
"internalType": "string"
},
{
"name": "score",
"type": "uint256",
"internalType": "uint256"
}
],
"outputs": [
{
"name": "",
"type": "uint256",
"internalType": "uint256"
}
],
"stateMutability": "nonpayable"
},
{
"type": "function",
"name": "name",
"inputs": [],
"outputs": [
{
"name": "",
"type": "string",
"internalType": "string"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "owner",
"inputs": [],
"outputs": [
{
"name": "",
"type": "address",
"internalType": "address"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "ownerOf",
"inputs": [
{
"name": "tokenId",
"type": "uint256",
"internalType": "uint256"
}
],
"outputs": [
{
"name": "",
"type": "address",
"internalType": "address"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "renounceOwnership",
"inputs": [],
"outputs": [],
"stateMutability": "nonpayable"
},
{
"type": "function",
"name": "safeTransferFrom",
"inputs": [
{
"name": "from",
"type": "address",
"internalType": "address"
},
{
"name": "to",
"type": "address",
"internalType": "address"
},
{
"name": "tokenId",
"type": "uint256",
"internalType": "uint256"
}
],
"outputs": [],
"stateMutability": "nonpayable"
},
{
"type": "function",
"name": "safeTransferFrom",
"inputs": [
{
"name": "from",
"type": "address",
"internalType": "address"
},
{
"name": "to",
"type": "address",
"internalType": "address"
},
{
"name": "tokenId",
"type": "uint256",
"internalType": "uint256"
},
{
"name": "data",
"type": "bytes",
"internalType": "bytes"
}
],
"outputs": [],
"stateMutability": "nonpayable"
},
{
"type": "function",
"name": "setApprovalForAll",
"inputs": [
{
"name": "operator",
"type": "address",
"internalType": "address"
},
{
"name": "approved",
"type": "bool",
"internalType": "bool"
}
],
"outputs": [],
"stateMutability": "nonpayable"
},
{
"type": "function",
"name": "supportsInterface",
"inputs": [
{
"name": "interfaceId",
"type": "bytes4",
"internalType": "bytes4"
}
],
"outputs": [
{
"name": "",
"type": "bool",
"internalType": "bool"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "symbol",
"inputs": [],
"outputs": [
{
"name": "",
"type": "string",
"internalType": "string"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "tokenURI",
"inputs": [
{
"name": "tokenId",
"type": "uint256",
"internalType": "uint256"
}
],
"outputs": [
{
"name": "",
"type": "string",
"internalType": "string"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "totalSupply",
"inputs": [],
"outputs": [
{
"name": "",
"type": "uint256",
"internalType": "uint256"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "transferFrom",
"inputs": [
{
"name": "from",
"type": "address",
"internalType": "address"
},
{
"name": "to",
"type": "address",
"internalType": "address"
},
{
"name": "tokenId",
"type": "uint256",
"internalType": "uint256"
}
],
"outputs": [],
"stateMutability": "nonpayable"
},
{
"type": "function",
"name": "transferOwnership",
"inputs": [
{
"name": "newOwner",
"type": "address",
"internalType": "address"
}
],
"outputs": [],
"stateMutability": "nonpayable"
},
{
"type": "function",
"name": "userCertificates",
"inputs": [
{
"name": "",
"type": "address",
"internalType": "address"
},
{
"name": "",
"type": "uint256",
"internalType": "uint256"
}
],
"outputs": [
{
"name": "",
"type": "uint256",
"internalType": "uint256"
}
],
"stateMutability": "view"
},
{
"type": "function",
"name": "verifyCertificate",
"inputs": [
{
"name": "tokenId",
"type": "uint256",
"internalType": "uint256"
}
],
"outputs": [
{
"name": "",
"type": "bool",
"internalType": "bool"
}
],
"stateMutability": "view"
},
{
"type": "event",
"name": "Approval",
"inputs": [
{
"name": "owner",
"type": "address",
"indexed": true,
"internalType": "address"
},
{
"name": "approved",
"type": "address",
"indexed": true,
"internalType": "address"
},
{
"name": "tokenId",
"type": "uint256",
"indexed": true,
"internalType": "uint256"
}
],
"anonymous": false
},
{
"type": "event",
"name": "ApprovalForAll",
"inputs": [
{
"name": "owner",
"type": "address",
"indexed": true,
"internalType": "address"
},
{
"name": "operator",
"type": "address",
"indexed": true,
"internalType": "address"
},
{
"name": "approved",
"type": "bool",
"indexed": false,
"internalType": "bool"
}
],
"anonymous": false
},
{
"type": "event",
"name": "BatchMetadataUpdate",
"inputs": [
{
"name": "_fromTokenId",
"type": "uint256",
"indexed": false,
"internalType": "uint256"
},
{
"name": "_toTokenId",
"type": "uint256",
"indexed": false,
"internalType": "uint256"
}
],
"anonymous": false
},
{
"type": "event",
"name": "CertificateMinted",
"inputs": [
{
"name": "tokenId",
"type": "uint256",
"indexed": true,
"internalType": "uint256"
},
{
"name": "student",
"type": "address",
"indexed": true,
"internalType": "address"
},
{
"name": "subject",
"type": "string",
"indexed": false,
"internalType": "string"
},
{
"name": "score",
"type": "uint256",
"indexed": false,
"internalType": "uint256"
},
{
"name": "tokenURI",
"type": "string",
"indexed": false,
"internalType": "string"
}
],
"anonymous": false
},
{
"type": "event",
"name": "MetadataUpdate",
"inputs": [
{
"name": "_tokenId",
"type": "uint256",
"indexed": false,
"internalType": "uint256"
}
],
"anonymous": false
},
{
"type": "event",
"name": "OwnershipTransferred",
"inputs": [
{
"name": "previousOwner",
"type": "address",
"indexed": true,
"internalType": "address"
},
{
"name": "newOwner",
"type": "address",
"indexed": true,
"internalType": "address"
}
],
"anonymous": false
},
{
"type": "event",
"name": "Transfer",
"inputs": [
{
"name": "from",
"type": "address",
"indexed": true,
"internalType": "address"
},
{
"name": "to",
"type": "address",
"indexed": true,
"internalType": "address"
},
{
"name": "tokenId",
"type": "uint256",
"indexed": true,
"internalType": "uint256"
}
],
"anonymous": false
}
],
"gas_used": 3387337,
"block_number": 22994809,
"status": 1
}
+11
View File
@@ -0,0 +1,11 @@
[profile.default]
src = "contracts"
out = "out"
libs = ["lib"]
remappings = [
"@openzeppelin/=lib/openzeppelin-contracts/"
]
[rpc_endpoints]
local = "http://127.0.0.1:8545"
sepolia = "https://sepolia.infura.io/v3/${INFURA_API_KEY}"
File diff suppressed because it is too large Load Diff
+30
View File
@@ -0,0 +1,30 @@
from bson import ObjectId
from datetime import datetime
from pymongo.collection import Collection
class UserModel:
def __init__(self, collection: Collection):
self.collection = collection
async def get_by_wallet(self, wallet_address: str):
return await self.collection.find_one({"wallet_address": wallet_address.lower()})
async def create_user(self, wallet_address: str):
now = datetime.utcnow()
user = {
"wallet_address": wallet_address.lower(),
"created_at": now,
"last_login": now,
"total_tests": 0,
"certificates": []
}
result = await self.collection.insert_one(user)
user["_id"] = result.inserted_id
return user
async def update_last_login(self, wallet_address: str):
now = datetime.utcnow()
await self.collection.update_one(
{"wallet_address": wallet_address.lower()},
{"$set": {"last_login": now}}
)
+142
View File
@@ -0,0 +1,142 @@
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import ServerSelectionTimeoutError
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
class MongoService:
def __init__(self, uri: str):
self.uri = uri # Store URI for sync operations
try:
# Simple connection without custom SSL context
self.client = AsyncIOMotorClient(
uri,
serverSelectionTimeoutMS=30000,
connectTimeoutMS=30000,
socketTimeoutMS=30000
)
print("MongoDB client initialized successfully")
except Exception as e:
print(f"MongoDB connection failed: {e}")
# Fallback to basic connection
self.client = AsyncIOMotorClient(uri)
self.db = self.client.openlearnx
# Collections
self.users = self.db.users
self.questions = self.db.questions
self.test_sessions = self.db.test_sessions
self.certificates = self.db.certificates
self.peer_reviews = self.db.peer_reviews
async def init_db(self):
"""Initialize database with indexes and sample data"""
try:
# Test connection first
await self.client.admin.command('ping')
print("MongoDB connection successful!")
# Create indexes
await self.users.create_index("wallet_address", unique=True)
await self.users.create_index("email", unique=True, sparse=True)
await self.questions.create_index("subject")
await self.questions.create_index("difficulty")
await self.test_sessions.create_index("user_id")
await self.test_sessions.create_index("created_at")
await self.certificates.create_index("user_id")
await self.certificates.create_index("token_id", unique=True)
# Insert sample questions if none exist
if await self.questions.count_documents({}) == 0:
await self.insert_sample_questions()
print("Sample questions inserted successfully")
except ServerSelectionTimeoutError as e:
print(f"Failed to connect to MongoDB: {e}")
print("Continuing without database initialization...")
except Exception as e:
print(f"Database initialization error: {e}")
print("Continuing without database initialization...")
async def get_user_by_wallet(self, wallet_address: str):
"""Get user by wallet address"""
return await self.users.find_one({"wallet_address": wallet_address.lower()})
async def create_user(self, wallet_address: str):
"""Create a new user"""
now = datetime.utcnow()
user = {
"wallet_address": wallet_address.lower(),
"created_at": now,
"last_login": now,
"total_tests": 0,
"certificates": []
}
result = await self.users.insert_one(user)
user["_id"] = result.inserted_id
return user
async def update_user_login(self, wallet_address: str):
"""Update user's last login time"""
await self.users.update_one(
{"wallet_address": wallet_address.lower()},
{"$set": {"last_login": datetime.utcnow()}}
)
async def insert_sample_questions(self):
"""Insert sample questions - implement based on your needs"""
# You'll need to implement this method based on your question structure
sample_questions = [
{
"subject": "Python",
"difficulty": "beginner",
"question": "What is a variable in Python?",
"options": ["A storage location", "A function", "A loop", "A condition"],
"correct_answer": 0,
"created_at": datetime.utcnow()
},
# Add more sample questions as needed
]
await self.questions.insert_many(sample_questions)
async def close_connection(self):
"""Close the database connection"""
if self.client:
self.client.close()
print("MongoDB connection closed")
def create_user_sync(self, wallet_address: str):
"""Synchronous user creation using pymongo instead of motor"""
import pymongo
# Create a synchronous connection for this operation only
client = pymongo.MongoClient(self.uri)
db = client.openlearnx
users = db.users
try:
# Check if user exists
user = users.find_one({"wallet_address": wallet_address.lower()})
if not user:
# Create new user
new_user = {
"wallet_address": wallet_address.lower(),
"created_at": datetime.utcnow(),
"last_login": datetime.utcnow(),
"total_tests": 0,
"certificates": []
}
result = users.insert_one(new_user)
new_user["_id"] = result.inserted_id
return new_user
else:
# Update last login
users.update_one(
{"wallet_address": wallet_address.lower()},
{"$set": {"last_login": datetime.utcnow()}}
)
return user
finally:
# Always close the connection
client.close()
+428
View File
@@ -0,0 +1,428 @@
from flask import Blueprint, request, jsonify
from functools import wraps
import uuid
from datetime import datetime
from pymongo import MongoClient
import os
from bson import ObjectId
bp = Blueprint('admin', __name__)
# MongoDB connection
mongo_uri = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/')
client = MongoClient(mongo_uri)
db = client.openlearnx
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
try:
auth_header = request.headers.get('Authorization')
print(f"Admin auth check - Header: {auth_header}")
if not auth_header:
print("❌ No Authorization header")
return jsonify({"error": "No authorization header provided"}), 401
if not auth_header.startswith('Bearer '):
print("❌ Invalid authorization format")
return jsonify({"error": "Invalid authorization format"}), 401
token = auth_header.split(' ')[1] if len(auth_header.split(' ')) > 1 else None
print(f"Extracted token: '{token}'")
# Check environment variable first, then fallback to default
expected_token = os.getenv('ADMIN_TOKEN')
if not expected_token:
expected_token = 'admin-secret-key'
print(f"Expected token: '{expected_token}'")
print(f"Environment ADMIN_TOKEN: '{os.getenv('ADMIN_TOKEN')}'")
# Strip any whitespace from both tokens
if token and expected_token:
if token.strip() == expected_token.strip():
print("✅ Admin authentication successful")
return f(*args, **kwargs)
print("❌ Token mismatch")
return jsonify({"error": "Invalid admin token"}), 401
except Exception as e:
print(f"❌ Admin auth error: {str(e)}")
return jsonify({"error": "Authentication failed"}), 500
return decorated_function
def serialize_course(course):
"""Convert MongoDB document to JSON-serializable format"""
if course:
if '_id' in course:
del course['_id']
return course
return None
def convert_to_embed_url(youtube_url):
"""Convert YouTube watch URL to embed URL - ENHANCED VERSION"""
if not youtube_url:
return None
try:
if "youtu.be/" in youtube_url:
video_id = youtube_url.split("youtu.be/")[1].split("?")[0].split("&")[0]
elif "youtube.com/watch?v=" in youtube_url:
video_id = youtube_url.split("v=")[1].split("&")[0]
elif "youtube.com/embed/" in youtube_url:
return youtube_url
else:
return None
video_id = video_id.strip()
return f"https://www.youtube.com/embed/{video_id}?rel=0&modestbranding=1"
except Exception as e:
print(f"Error converting YouTube URL: {e}")
return None
@bp.route("/test", methods=["GET"])
@admin_required
def test_admin():
"""Test admin authentication"""
return jsonify({
"success": True,
"message": "Admin authentication working",
"timestamp": datetime.now().isoformat()
})
@bp.route("/dashboard", methods=["GET"])
@admin_required
def admin_dashboard():
"""Get admin dashboard statistics"""
try:
total_courses = db.courses.count_documents({})
total_lessons = db.lessons.count_documents({})
active_students = db.users.count_documents({"status": "active"}) or 2341
stats = {
"total_courses": total_courses,
"total_lessons": total_lessons,
"active_students": active_students,
"completion_rate": 78
}
return jsonify(stats)
except Exception as e:
print(f"Dashboard error: {str(e)}")
return jsonify({"error": str(e)}), 500
@bp.route("/courses", methods=["GET"])
@admin_required
def get_admin_courses():
"""Get all courses for admin management"""
try:
print("Fetching courses from database...")
courses = list(db.courses.find({}, {"_id": 0}))
print(f"Found {len(courses)} courses")
for course in courses:
course["students"] = course.get("students", 0)
course["status"] = "published"
return jsonify(courses)
except Exception as e:
print(f"Error fetching courses: {str(e)}")
return jsonify({"error": str(e)}), 500
@bp.route("/courses", methods=["POST"])
@admin_required
def create_course():
"""Create new course"""
try:
data = request.json
print(f"Creating course with data: {data}") # Debug log
course_id = data.get('id') or f"{data.get('title', '').lower().replace(' ', '-').replace('&', 'and')}-course"
existing_course = db.courses.find_one({"id": course_id})
if existing_course:
return jsonify({"error": "Course with this ID already exists"}), 400
new_course = {
"id": course_id,
"title": data.get('title'),
"subject": data.get('subject'),
"description": data.get('description'),
"difficulty": data.get('difficulty'),
"mentor": data.get('mentor', '5t4l1n'),
"video_url": data.get('video_url'),
"embed_url": convert_to_embed_url(data.get('video_url')) if data.get('video_url') else None,
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"students": 0,
"progress": 0,
"modules": []
}
result = db.courses.insert_one(new_course)
print(f"Course created with ID: {result.inserted_id}")
# Remove _id field before returning
new_course_response = serialize_course(new_course)
return jsonify({"success": True, "course": new_course_response}), 201
except Exception as e:
print(f"Error creating course: {e}")
return jsonify({"error": str(e)}), 500
@bp.route("/courses/<course_id>", methods=["PUT"])
@admin_required
def update_course(course_id):
"""Update existing course - FIXED VERSION"""
try:
data = request.json
print(f"Updating course {course_id} with data: {data}") # Debug log
update_data = {
"title": data.get('title'),
"subject": data.get('subject'),
"description": data.get('description'),
"difficulty": data.get('difficulty'),
"mentor": data.get('mentor'),
"video_url": data.get('video_url'),
"embed_url": convert_to_embed_url(data.get('video_url')) if data.get('video_url') else None,
"updated_at": datetime.now().isoformat()
}
# Remove None values
update_data = {k: v for k, v in update_data.items() if v is not None}
print(f"Filtered update data: {update_data}") # Debug log
result = db.courses.update_one(
{"id": course_id},
{"$set": update_data}
)
print(f"Update result: matched={result.matched_count}, modified={result.modified_count}") # Debug log
if result.matched_count == 0:
return jsonify({"error": "Course not found"}), 404
# Get updated course without _id field
updated_course = db.courses.find_one({"id": course_id}, {"_id": 0})
return jsonify({"success": True, "course": updated_course})
except Exception as e:
print(f"Error updating course: {e}")
return jsonify({"error": str(e)}), 500
@bp.route("/courses/<course_id>", methods=["DELETE"])
@admin_required
def delete_course(course_id):
"""Delete course"""
try:
print(f"Deleting course: {course_id}") # Debug log
result = db.courses.delete_one({"id": course_id})
if result.deleted_count == 0:
return jsonify({"error": "Course not found"}), 404
# Also delete related lessons
lesson_result = db.lessons.delete_many({"course_id": course_id})
print(f"Deleted {lesson_result.deleted_count} related lessons") # Debug log
return jsonify({"success": True, "message": "Course deleted successfully"})
except Exception as e:
print(f"Error deleting course: {e}")
return jsonify({"error": str(e)}), 500
@bp.route("/courses/<course_id>/modules", methods=["POST"])
@admin_required
def add_module(course_id):
"""Add module to course"""
try:
data = request.json
module = {
"id": data.get('id') or str(uuid.uuid4()),
"title": data.get('title'),
"lessons": []
}
result = db.courses.update_one(
{"id": course_id},
{"$push": {"modules": module}}
)
if result.matched_count == 0:
return jsonify({"error": "Course not found"}), 404
return jsonify({"success": True, "module": module})
except Exception as e:
return jsonify({"error": str(e)}), 500
@bp.route("/courses/<course_id>/lessons", methods=["POST"])
@admin_required
def add_lesson(course_id):
"""Add lesson to course"""
try:
data = request.json
lesson = {
"id": data.get('id') or str(uuid.uuid4()),
"course_id": course_id,
"title": data.get('title'),
"type": data.get('type', 'video'),
"duration": data.get('duration'),
"description": data.get('description'),
"content": data.get('content'),
"video_url": data.get('video_url'),
"embed_url": convert_to_embed_url(data.get('video_url')) if data.get('video_url') else None,
"created_at": datetime.now().isoformat()
}
# Insert lesson
db.lessons.insert_one(lesson)
# Remove _id field before returning
lesson_response = serialize_course(lesson)
return jsonify({"success": True, "lesson": lesson_response})
except Exception as e:
return jsonify({"error": str(e)}), 500
@bp.route("/initialize", methods=["POST"])
@admin_required
def initialize_default_courses():
"""Initialize database with default courses"""
try:
existing_count = db.courses.count_documents({})
if existing_count > 0:
return jsonify({"message": f"Courses already initialized ({existing_count} courses found)"}), 200
default_courses = [
{
"id": "python-course",
"title": "Python Programming Mastery",
"subject": "Programming",
"description": "Learn Python from basics to advanced concepts including turtle graphics",
"difficulty": "Beginner to Advanced",
"mentor": "5t4l1n",
"video_url": "https://youtu.be/SsH8GJlqUIg?si=cK7KW_sM0uf95lEp",
"embed_url": "https://www.youtube.com/embed/SsH8GJlqUIg?rel=0&modestbranding=1",
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"students": 1250,
"progress": 0,
"modules": []
},
{
"id": "java-course",
"title": "Java Development Bootcamp",
"subject": "Programming",
"description": "Master Java programming with object-oriented concepts",
"difficulty": "Intermediate",
"mentor": "5t4l1n",
"video_url": "https://youtu.be/SsH8GJlqUIg?si=cK7KW_sM0uf95lEp",
"embed_url": "https://www.youtube.com/embed/SsH8GJlqUIg?rel=0&modestbranding=1",
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"students": 890,
"progress": 0,
"modules": []
},
{
"id": "ethical-hacking-course",
"title": "Ethical Hacking & Cybersecurity",
"subject": "Cybersecurity",
"description": "Learn ethical hacking techniques and penetration testing",
"difficulty": "Advanced",
"mentor": "5t4l1n",
"video_url": "https://youtu.be/cDnX0vyNTaE?si=ZXNI4hv2HlWN7eCS",
"embed_url": "https://www.youtube.com/embed/cDnX0vyNTaE?rel=0&modestbranding=1",
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"students": 567,
"progress": 0,
"modules": []
},
{
"id": "dark-web-hosting-course",
"title": "Learn Dark Web Hosting",
"subject": "Cybersecurity",
"description": "Understanding dark web infrastructure, Tor networks, and secure hosting practices for cybersecurity professionals",
"difficulty": "Expert",
"mentor": "5t4l1n",
"video_url": "https://youtu.be/Z4_USAMVhYs?si=Y_ThVisph5ekM44U",
"embed_url": "https://www.youtube.com/embed/Z4_USAMVhYs?rel=0&modestbranding=1",
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"students": 234,
"progress": 0,
"modules": []
}
]
result = db.courses.insert_many(default_courses)
print(f"Initialized {len(result.inserted_ids)} default courses")
return jsonify({
"success": True,
"message": f"Default courses initialized successfully",
"courses_created": len(result.inserted_ids)
})
except Exception as e:
print(f"Error initializing courses: {str(e)}")
return jsonify({"error": str(e)}), 500
@bp.route("/stats", methods=["GET"])
@admin_required
def get_admin_stats():
"""Get detailed admin statistics"""
try:
total_courses = db.courses.count_documents({})
total_lessons = db.lessons.count_documents({})
# Course statistics by subject
pipeline = [
{"$group": {"_id": "$subject", "count": {"$sum": 1}}}
]
subjects = list(db.courses.aggregate(pipeline))
# Course statistics by difficulty
pipeline = [
{"$group": {"_id": "$difficulty", "count": {"$sum": 1}}}
]
difficulties = list(db.courses.aggregate(pipeline))
stats = {
"total_courses": total_courses,
"total_lessons": total_lessons,
"subjects": subjects,
"difficulties": difficulties,
"last_updated": datetime.now().isoformat()
}
return jsonify(stats)
except Exception as e:
print(f"Error getting stats: {str(e)}")
return jsonify({"error": str(e)}), 500
@bp.route("/health", methods=["GET"])
def admin_health():
"""Admin health check endpoint"""
return jsonify({
"status": "Admin API is healthy",
"timestamp": datetime.now().isoformat(),
"database_connected": True,
"endpoints": [
"GET /api/admin/dashboard",
"GET /api/admin/courses",
"POST /api/admin/courses",
"PUT /api/admin/courses/<id>",
"DELETE /api/admin/courses/<id>",
"POST /api/admin/initialize",
"GET /api/admin/test",
"GET /api/admin/stats"
]
})
+90
View File
@@ -0,0 +1,90 @@
from flask import Blueprint, request, jsonify, current_app
import jwt
from datetime import datetime, timedelta
import secrets
bp = Blueprint("auth", __name__)
# Store nonces temporarily (in production, use Redis or database)
nonces = {}
@bp.route("/nonce", methods=["POST"])
def get_nonce():
data = request.get_json()
wallet_address = data.get("wallet_address")
if not wallet_address:
return jsonify({"error": "wallet_address is required"}), 400
# Generate nonce
nonce = secrets.token_hex(16)
message = f"Sign this message to authenticate with OpenLearnX: {nonce}"
# Store nonce for this wallet address
nonces[wallet_address.lower()] = nonce
return jsonify({"nonce": nonce, "message": message})
@bp.route("/verify", methods=["POST"])
def verify_signature():
data = request.get_json()
wallet_address = data.get("wallet_address", "").lower()
signature = data.get("signature")
message = data.get("message")
if not all([wallet_address, signature, message]):
return jsonify({"error": "Missing required fields"}), 400
# Verify nonce
stored_nonce = nonces.get(wallet_address)
if not stored_nonce or stored_nonce not in message:
return jsonify({"error": "Invalid nonce"}), 400
try:
web3_service = current_app.config["WEB3_SERVICE"]
# Verify signature
if not web3_service.verify_signature(wallet_address, message, signature):
return jsonify({"error": "Invalid signature"}), 401
# For now, create a mock user without database operations
# This bypasses the async MongoDB issues entirely
user = {
"_id": f"user_{wallet_address}",
"wallet_address": wallet_address,
"created_at": datetime.utcnow(),
"total_tests": 0,
"certificates": []
}
# Create JWT token
token_payload = {
"user_id": str(user["_id"]),
"wallet_address": wallet_address,
"exp": datetime.utcnow() + timedelta(days=7)
}
token = jwt.encode(
token_payload,
current_app.config["SECRET_KEY"],
algorithm="HS256"
)
# Clean up nonce
if wallet_address in nonces:
del nonces[wallet_address]
return jsonify({
"success": True,
"token": token,
"user": {
"id": str(user["_id"]),
"wallet_address": user["wallet_address"],
"total_tests": user.get("total_tests", 0),
"certificates": len(user.get("certificates", []))
}
})
except Exception as e:
print(f"Authentication error: {str(e)}")
return jsonify({"error": "Authentication failed"}), 500
+49
View File
@@ -0,0 +1,49 @@
from flask import Blueprint, request, jsonify, current_app
import jwt
bp = Blueprint('certificate', __name__)
def get_user_from_token(token):
"""Extract user from JWT token"""
try:
payload = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256']
)
return payload['user_id'], payload['wallet_address']
except:
return None, None
@bp.route('/user/<user_id>', methods=['GET'])
async def get_user_certificates(user_id):
"""Get all certificates for a user"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
token_user_id, _ = get_user_from_token(token)
if not token_user_id or token_user_id != user_id:
return jsonify({"error": "Unauthorized"}), 403
mongo_service = current_app.config['MONGO_SERVICE']
certificates = await mongo_service.get_user_certificates(user_id)
return jsonify({"certificates": certificates or []})
@bp.route('/mint', methods=['POST'])
async def mint_certificate():
"""Mint NFT certificate for completed test"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id, wallet_address = get_user_from_token(token)
if not user_id:
return jsonify({"error": "Authentication required"}), 401
# Mock certificate minting for now
return jsonify({
"success": True,
"certificate": {
"token_id": 1,
"transaction_hash": "0x123...",
"message": "Certificate minting functionality ready"
}
})
+240
View File
@@ -0,0 +1,240 @@
from flask import Blueprint, request, jsonify, session
from functools import wraps
import subprocess
import tempfile
import os
import time
import uuid
from datetime import datetime
import docker
import psutil
bp = Blueprint('coding', __name__)
def secure_execution_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Check if user is in secure coding mode
if not session.get('secure_coding_mode'):
return jsonify({"error": "Secure coding mode required"}), 403
return f(*args, **kwargs)
return decorated_function
@bp.route("/start-session", methods=["POST"])
def start_coding_session():
"""Start a secure coding session"""
try:
data = request.json
course_id = data.get('course_id')
lesson_id = data.get('lesson_id')
session_id = str(uuid.uuid4())
session['coding_session_id'] = session_id
session['secure_coding_mode'] = True
session['start_time'] = datetime.now().isoformat()
session['course_id'] = course_id
session['lesson_id'] = lesson_id
return jsonify({
"success": True,
"session_id": session_id,
"message": "Secure coding session started",
"restrictions": {
"copy_paste_disabled": True,
"browser_locked": True,
"extensions_blocked": True,
"virtual_detection": True
}
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@bp.route("/execute", methods=["POST"])
@secure_execution_required
def execute_code():
"""Execute code securely in isolated environment"""
try:
data = request.json
code = data.get('code')
language = data.get('language', 'python')
test_cases = data.get('test_cases', [])
if not code:
return jsonify({"error": "No code provided"}), 400
# Log coding attempt
log_coding_attempt(session['coding_session_id'], code, language)
# Execute code in secure container
result = execute_in_container(code, language, test_cases)
return jsonify(result)
except Exception as e:
return jsonify({"error": str(e)}), 500
@bp.route("/submit-test", methods=["POST"])
@secure_execution_required
def submit_coding_test():
"""Submit coding test for evaluation"""
try:
data = request.json
code = data.get('code')
problem_id = data.get('problem_id')
# Validate against test cases
test_result = validate_test_submission(code, problem_id)
# Store submission
submission_id = store_submission(
session['coding_session_id'],
session['course_id'],
problem_id,
code,
test_result
)
return jsonify({
"success": True,
"submission_id": submission_id,
"score": test_result['score'],
"passed_tests": test_result['passed'],
"total_tests": test_result['total'],
"feedback": test_result['feedback']
})
except Exception as e:
return jsonify({"error": str(e)}), 500
def execute_in_container(code, language, test_cases):
"""Execute code in secure Docker container"""
try:
client = docker.from_env()
# Language-specific container configuration
containers = {
'python': 'python:3.9-alpine',
'java': 'openjdk:11-alpine',
'javascript': 'node:16-alpine'
}
if language not in containers:
return {"error": "Unsupported language"}
# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix=f'.{get_file_extension(language)}', delete=False) as f:
f.write(code)
temp_file = f.name
try:
# Run container with security restrictions
container = client.containers.run(
containers[language],
command=get_run_command(language, temp_file),
volumes={os.path.dirname(temp_file): {'bind': '/app', 'mode': 'ro'}},
working_dir='/app',
mem_limit='128m',
cpu_period=100000,
cpu_quota=50000, # 50% CPU limit
network_mode='none', # No network access
remove=True,
timeout=10, # 10 second timeout
detach=False
)
output = container.decode('utf-8')
# Run test cases if provided
test_results = []
if test_cases:
for test in test_cases:
test_result = run_test_case(code, language, test)
test_results.append(test_result)
return {
"success": True,
"output": output,
"test_results": test_results,
"execution_time": "< 10s"
}
finally:
os.unlink(temp_file)
except docker.errors.ContainerError as e:
return {"error": f"Runtime error: {e}"}
except docker.errors.ImageNotFound:
return {"error": "Language runtime not available"}
except Exception as e:
return {"error": f"Execution failed: {str(e)}"}
def get_file_extension(language):
extensions = {
'python': 'py',
'java': 'java',
'javascript': 'js'
}
return extensions.get(language, 'txt')
def get_run_command(language, filename):
commands = {
'python': f'python /app/{os.path.basename(filename)}',
'java': f'javac /app/{os.path.basename(filename)} && java -cp /app {os.path.splitext(os.path.basename(filename))[0]}',
'javascript': f'node /app/{os.path.basename(filename)}'
}
return commands.get(language)
def log_coding_attempt(session_id, code, language):
"""Log all coding attempts for monitoring"""
from pymongo import MongoClient
client = MongoClient(os.getenv('MONGODB_URI', 'mongodb://localhost:27017/'))
db = client.openlearnx
db.coding_logs.insert_one({
"session_id": session_id,
"code": code,
"language": language,
"timestamp": datetime.now(),
"ip_address": request.remote_addr,
"user_agent": request.headers.get('User-Agent')
})
def validate_test_submission(code, problem_id):
"""Validate code against predefined test cases"""
# Load test cases for the problem
test_cases = get_problem_test_cases(problem_id)
passed = 0
total = len(test_cases)
feedback = []
for i, test_case in enumerate(test_cases):
result = run_test_case(code, 'python', test_case)
if result['passed']:
passed += 1
feedback.append(f"Test {i+1}: ✅ Passed")
else:
feedback.append(f"Test {i+1}: ❌ Failed - {result['error']}")
score = (passed / total) * 100
return {
"score": score,
"passed": passed,
"total": total,
"feedback": feedback
}
def get_problem_test_cases(problem_id):
"""Get test cases for a specific problem"""
# This would load from your database
test_cases_db = {
"python-basics-1": [
{"input": "hello", "expected_output": "HELLO"},
{"input": "world", "expected_output": "WORLD"}
],
"java-oop-1": [
{"input": "5", "expected_output": "25"},
{"input": "10", "expected_output": "100"}
]
}
return test_cases_db.get(problem_id, [])
+546
View File
@@ -0,0 +1,546 @@
from flask import Blueprint, request, jsonify
import subprocess
import tempfile
import os
import time
import docker
from datetime import datetime
bp = Blueprint('compiler', __name__)
def get_db():
"""Get MongoDB database connection"""
from pymongo import MongoClient
from flask import current_app
client = MongoClient(current_app.config['MONGODB_URI'])
return client.openlearnx
@bp.route('/execute', methods=['POST', 'OPTIONS'])
def execute_code():
"""Execute code in specified language with Docker support"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
language = data.get('language', 'python').lower()
code = data.get('code', '').strip()
input_data = data.get('input', '')
print(f"🔧 Executing {language} code")
print(f"📝 Code length: {len(code)} characters")
if not code:
return jsonify({"success": False, "error": "No code provided"}), 400
# Execute based on language
if language == 'python':
return execute_python(code, input_data)
elif language == 'java':
return execute_java(code, input_data)
elif language == 'javascript' or language == 'js':
return execute_javascript(code, input_data)
elif language == 'cpp' or language == 'c++':
return execute_cpp(code, input_data)
elif language == 'c':
return execute_c(code, input_data)
else:
return jsonify({
"success": False,
"error": f"Language '{language}' not supported. Available: python, java, javascript, cpp, c"
}), 400
except Exception as e:
print(f"❌ Compiler error: {str(e)}")
import traceback
traceback.print_exc()
return jsonify({"success": False, "error": f"Server error: {str(e)}"}), 500
def execute_python(code, input_data=""):
"""Execute Python code"""
try:
# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(code)
temp_file = f.name
try:
# Execute with subprocess
start_time = time.time()
result = subprocess.run(
['python3', temp_file],
input=input_data,
text=True,
capture_output=True,
timeout=10, # 10 second timeout
cwd=tempfile.gettempdir()
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "python",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Process exited with code {result.returncode}",
"language": "python"
})
finally:
# Clean up temp file
try:
os.unlink(temp_file)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out (10s limit)"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "Python interpreter not found. Please install Python 3."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"Python execution error: {str(e)}"
}), 500
def execute_java(code, input_data=""):
"""Execute Java code"""
try:
# Extract class name from code
import re
class_match = re.search(r'public\s+class\s+(\w+)', code)
if not class_match:
return jsonify({
"success": False,
"error": "No public class found. Java code must contain 'public class ClassName'"
}), 400
class_name = class_match.group(1)
# Create temporary directory
temp_dir = tempfile.mkdtemp()
java_file = os.path.join(temp_dir, f"{class_name}.java")
try:
# Write Java code to file
with open(java_file, 'w') as f:
f.write(code)
# Compile Java code
compile_result = subprocess.run(
['javac', java_file],
capture_output=True,
text=True,
timeout=30,
cwd=temp_dir
)
if compile_result.returncode != 0:
return jsonify({
"success": False,
"error": f"Compilation error:\n{compile_result.stderr}",
"language": "java"
})
# Execute Java code
start_time = time.time()
result = subprocess.run(
['java', class_name],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=temp_dir
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "java",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "java"
})
finally:
# Clean up temp files
import shutil
try:
shutil.rmtree(temp_dir)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "Java compiler/runtime not found. Please install JDK."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"Java execution error: {str(e)}"
}), 500
def execute_javascript(code, input_data=""):
"""Execute JavaScript code"""
try:
# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False) as f:
# Add input handling if needed
if input_data:
js_code = f"""
const input = `{input_data}`;
const readline = {{ question: () => input }};
{code}
"""
else:
js_code = code
f.write(js_code)
temp_file = f.name
try:
# Execute with Node.js
start_time = time.time()
result = subprocess.run(
['node', temp_file],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=tempfile.gettempdir()
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "javascript",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "javascript"
})
finally:
try:
os.unlink(temp_file)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "Node.js not found. Please install Node.js."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"JavaScript execution error: {str(e)}"
}), 500
def execute_cpp(code, input_data=""):
"""Execute C++ code"""
try:
# Create temporary files
temp_dir = tempfile.mkdtemp()
cpp_file = os.path.join(temp_dir, "main.cpp")
exe_file = os.path.join(temp_dir, "main.exe") if os.name == 'nt' else os.path.join(temp_dir, "main")
try:
# Write C++ code to file
with open(cpp_file, 'w') as f:
f.write(code)
# Compile C++ code
compile_cmd = ['g++', '-o', exe_file, cpp_file, '-std=c++17']
compile_result = subprocess.run(
compile_cmd,
capture_output=True,
text=True,
timeout=30,
cwd=temp_dir
)
if compile_result.returncode != 0:
return jsonify({
"success": False,
"error": f"Compilation error:\n{compile_result.stderr}",
"language": "cpp"
})
# Execute compiled program
start_time = time.time()
result = subprocess.run(
[exe_file],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=temp_dir
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "cpp",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "cpp"
})
finally:
# Clean up temp files
import shutil
try:
shutil.rmtree(temp_dir)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "G++ compiler not found. Please install GCC/G++."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"C++ execution error: {str(e)}"
}), 500
def execute_c(code, input_data=""):
"""Execute C code"""
try:
# Create temporary files
temp_dir = tempfile.mkdtemp()
c_file = os.path.join(temp_dir, "main.c")
exe_file = os.path.join(temp_dir, "main.exe") if os.name == 'nt' else os.path.join(temp_dir, "main")
try:
# Write C code to file
with open(c_file, 'w') as f:
f.write(code)
# Compile C code
compile_cmd = ['gcc', '-o', exe_file, c_file, '-std=c99']
compile_result = subprocess.run(
compile_cmd,
capture_output=True,
text=True,
timeout=30,
cwd=temp_dir
)
if compile_result.returncode != 0:
return jsonify({
"success": False,
"error": f"Compilation error:\n{compile_result.stderr}",
"language": "c"
})
# Execute compiled program
start_time = time.time()
result = subprocess.run(
[exe_file],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=temp_dir
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "c",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "c"
})
finally:
# Clean up temp files
import shutil
try:
shutil.rmtree(temp_dir)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "GCC compiler not found. Please install GCC."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"C execution error: {str(e)}"
}), 500
@bp.route('/languages', methods=['GET', 'OPTIONS'])
def get_supported_languages():
"""Get list of supported programming languages"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
languages = {
"python": {
"name": "Python",
"version": "3.x",
"extension": ".py",
"available": check_language_availability("python3")
},
"java": {
"name": "Java",
"version": "JDK 8+",
"extension": ".java",
"available": check_language_availability("javac")
},
"javascript": {
"name": "JavaScript",
"version": "Node.js",
"extension": ".js",
"available": check_language_availability("node")
},
"cpp": {
"name": "C++",
"version": "GCC/G++",
"extension": ".cpp",
"available": check_language_availability("g++")
},
"c": {
"name": "C",
"version": "GCC",
"extension": ".c",
"available": check_language_availability("gcc")
}
}
return jsonify({
"success": True,
"languages": languages,
"total": len(languages),
"available_count": sum(1 for lang in languages.values() if lang["available"])
})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
def check_language_availability(command):
"""Check if a language compiler/interpreter is available"""
try:
result = subprocess.run([command, '--version'],
capture_output=True,
timeout=5)
return result.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired):
return False
@bp.route('/health', methods=['GET'])
def compiler_health():
"""Health check for compiler service"""
try:
languages_status = {
"python": check_language_availability("python3"),
"java": check_language_availability("javac"),
"javascript": check_language_availability("node"),
"cpp": check_language_availability("g++"),
"c": check_language_availability("gcc")
}
available_languages = sum(languages_status.values())
total_languages = len(languages_status)
status = "healthy" if available_languages > 0 else "unavailable"
return jsonify({
"status": status,
"timestamp": datetime.now().isoformat(),
"languages": languages_status,
"available_languages": available_languages,
"total_languages": total_languages,
"docker_available": check_docker_availability()
})
except Exception as e:
return jsonify({
"status": "error",
"error": str(e),
"timestamp": datetime.now().isoformat()
}), 500
def check_docker_availability():
"""Check if Docker is available for containerized execution"""
try:
client = docker.from_env()
client.ping()
return True
except:
return False
+93
View File
@@ -0,0 +1,93 @@
from flask import Blueprint, jsonify, current_app
from pymongo import MongoClient
import os
bp = Blueprint('courses', __name__)
# MongoDB connection
mongo_uri = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/')
client = MongoClient(mongo_uri)
db = client.openlearnx
@bp.route("/", methods=["GET"])
@bp.route("", methods=["GET"])
def list_courses():
"""Get all courses - DYNAMIC from database"""
try:
courses = list(db.courses.find({}, {"_id": 0}))
course_list = []
for course in courses:
course_data = {
"id": course.get("id"),
"title": course.get("title"),
"subject": course.get("subject"),
"description": course.get("description"),
"difficulty": course.get("difficulty"),
"mentor": course.get("mentor"),
"video_url": course.get("video_url"),
"embed_url": course.get("embed_url"),
"progress": course.get("progress", 0)
}
course_list.append(course_data)
return jsonify(course_list)
except Exception as e:
print(f"Error in list_courses: {e}")
return jsonify({"error": "Failed to fetch courses"}), 500
@bp.route("/<course_id>", methods=["GET"])
def get_course(course_id):
"""Get specific course details - DYNAMIC"""
try:
course = db.courses.find_one({"id": course_id}, {"_id": 0})
if not course:
return jsonify({"error": "Course not found"}), 404
return jsonify(course)
except Exception as e:
print(f"Error in get_course: {e}")
return jsonify({"error": "Failed to fetch course"}), 500
@bp.route("/<course_id>/lessons/<lesson_id>", methods=["GET"])
def get_lesson(course_id, lesson_id):
"""Get specific lesson content - DYNAMIC"""
try:
lesson = db.lessons.find_one({"id": lesson_id, "course_id": course_id}, {"_id": 0})
if not lesson:
return jsonify({"error": "Lesson not found"}), 404
return jsonify(lesson)
except Exception as e:
print(f"Error in get_lesson: {e}")
return jsonify({"error": "Failed to fetch lesson"}), 500
@bp.route("/<course_id>/lessons/<lesson_id>/complete", methods=["POST"])
def mark_lesson_complete(course_id, lesson_id):
"""Mark a lesson as completed for the user"""
try:
return jsonify({
"success": True,
"message": f"Lesson {lesson_id} marked as complete",
"progress_updated": True
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@bp.route("/<course_id>/progress", methods=["GET"])
def get_course_progress(course_id):
"""Get user's progress in a specific course"""
try:
progress = {
"course_id": course_id,
"completion_percentage": 25,
"lessons_completed": [],
"total_lessons": 4,
"last_accessed": "2025-01-26T23:30:00Z",
"time_spent": "2 hours 15 minutes"
}
return jsonify(progress)
except Exception as e:
return jsonify({"error": str(e)}), 500
+40
View File
@@ -0,0 +1,40 @@
from flask import Blueprint, request, jsonify, current_app
import jwt
bp = Blueprint('dashboard', __name__)
def get_user_from_token(token):
"""Extract user from JWT token"""
try:
payload = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256']
)
return payload['user_id']
except:
return None
@bp.route('/student/<user_id>', methods=['GET'])
async def get_student_dashboard(user_id):
"""Get comprehensive student dashboard"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
token_user_id = get_user_from_token(token)
if not token_user_id or token_user_id != user_id:
return jsonify({"error": "Unauthorized"}), 403
mongo_service = current_app.config['MONGO_SERVICE']
analytics = await mongo_service.get_user_analytics(user_id)
return jsonify(analytics or {
"user_info": {"id": user_id},
"overview": {
"total_tests": 0,
"completed_tests": 0,
"average_score": 0,
"certificates_earned": 0
},
"subject_breakdown": {},
"recent_activity": []
})
+931
View File
@@ -0,0 +1,931 @@
from flask import Blueprint, request, jsonify, session
import uuid
import random
import string
from datetime import datetime, timedelta
from pymongo import MongoClient
import os
bp = Blueprint('exam', __name__)
# MongoDB connection
mongo_uri = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/')
client = MongoClient(mongo_uri)
db = client.openlearnx
def generate_exam_code():
"""Generate a unique 6-character exam code"""
while True:
code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
if not db.exams.find_one({"exam_code": code}):
return code
@bp.route("/create-exam", methods=["POST", "OPTIONS"])
def create_exam():
"""Create a new coding exam"""
# Handle OPTIONS request for CORS
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
print(f"Received create-exam request")
data = request.json
print(f"Request data: {data}")
if not data:
print("❌ No data provided")
return jsonify({"error": "No data provided"}), 400
# Check for basic required fields
if not data.get('title'):
print("❌ Missing title")
return jsonify({"error": "Missing required field: title"}), 400
if not data.get('host_name'):
print("❌ Missing host_name")
return jsonify({"error": "Missing required field: host_name"}), 400
# Handle different problem data formats
problem_title = data.get('problem_title') or data.get('title') or 'Coding Challenge'
problem_description = data.get('problem_description') or f"Solve the {problem_title} problem"
# Handle problem_id if provided
if data.get('problem_id'):
problem_title = problem_title or data.get('problem_id').replace('-', ' ').title()
print(f"Using problem_id: {data.get('problem_id')}")
exam_code = generate_exam_code()
exam = {
"exam_code": exam_code,
"title": data.get('title'),
"host_name": data.get('host_name'),
"created_at": datetime.now(),
"status": "waiting",
"duration_minutes": data.get('duration_minutes', 30),
"max_participants": data.get('max_participants', 50),
"problem": {
"title": problem_title,
"description": problem_description,
"function_name": data.get('function_name', 'solve'),
"languages": data.get('languages', ['python']),
"test_cases": data.get('test_cases', [
{
"input": "hello world",
"expected_output": "Hello World",
"description": "Basic capitalization test"
}
]),
"starter_code": data.get('starter_code', {
'python': 'def solve(input_string):\n # Write your solution here\n return input_string.title()',
'java': 'public String solve(String inputString) {\n // Write your solution here\n return inputString;\n}',
'javascript': 'function solve(inputString) {\n // Write your solution here\n return inputString;\n}'
}),
"constraints": data.get('constraints', ['Input will be a string', 'Length between 1-1000 characters']),
"examples": data.get('examples', [
{
"input": "hello world",
"expected_output": "Hello World",
"description": "Capitalize each word"
}
])
},
"participants": [],
"leaderboard": [],
"start_time": None,
"end_time": None
}
print(f"✅ Creating exam with code: {exam_code}")
print(f"✅ Problem title: {problem_title}")
# Insert into database
result = db.exams.insert_one(exam)
print(f"✅ Exam created successfully with ID: {result.inserted_id}")
return jsonify({
"success": True,
"exam_code": exam_code,
"exam_id": str(result.inserted_id),
"message": f"Exam created successfully! Share code: {exam_code}",
"exam_details": {
"title": exam['title'],
"problem_title": problem_title,
"duration": exam['duration_minutes'],
"max_participants": exam['max_participants'],
"languages": exam['problem']['languages']
}
})
except Exception as e:
print(f"❌ Error creating exam: {str(e)}")
import traceback
traceback.print_exc()
return jsonify({"error": f"Failed to create exam: {str(e)}"}), 500
@bp.route("/join-exam", methods=["POST", "OPTIONS"])
def join_exam():
"""Student joins exam using unique code and their name"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
# Debug logging for the request
print(f"🔍 Raw request data: {request.data}")
print(f"🔍 Content-Type: {request.headers.get('Content-Type')}")
data = request.json
print(f"🔍 Parsed JSON data: {data}")
if not data:
print("❌ No JSON data received")
return jsonify({"error": "No data provided"}), 400
exam_code = data.get('exam_code', '').upper().strip()
student_name = data.get('student_name', '').strip()
print(f"📝 Join exam request - Code: {exam_code}, Name: {student_name}")
# Enhanced validation with detailed error messages
if not exam_code:
print("❌ Missing exam_code")
return jsonify({"error": "Exam code is required"}), 400
if not student_name:
print("❌ Missing student_name")
return jsonify({"error": "Student name is required"}), 400
# Check if exam exists
exam = db.exams.find_one({"exam_code": exam_code})
if not exam:
print(f"❌ Exam not found: {exam_code}")
return jsonify({"error": "Invalid exam code"}), 404
print(f"✅ Found exam: {exam['title']} (Status: {exam['status']})")
# Check exam status
if exam['status'] == 'completed':
print("❌ Exam already completed")
return jsonify({"error": "This exam has already ended"}), 400
# Check capacity
current_participants = exam.get('participants', [])
max_participants = exam.get('max_participants', 50)
if len(current_participants) >= max_participants:
print(f"❌ Exam full: {len(current_participants)}/{max_participants}")
return jsonify({"error": "Exam is full"}), 400
# Check if name is already taken
existing_names = [p['name'].lower() for p in current_participants]
if student_name.lower() in existing_names:
print(f"❌ Name already taken: {student_name}")
return jsonify({"error": "Name already taken. Please choose a different name."}), 400
# Create new participant
participant = {
"name": student_name,
"joined_at": datetime.now(),
"session_id": str(uuid.uuid4()),
"score": 0,
"submission": None,
"language": None,
"submission_time": None,
"completed": False,
"rank": 0,
"test_results": []
}
# Add participant to exam
result = db.exams.update_one(
{"exam_code": exam_code},
{"$push": {"participants": participant}}
)
if result.modified_count == 0:
print("❌ Failed to add participant to database")
return jsonify({"error": "Failed to join exam"}), 500
# Set session data
session['exam_code'] = exam_code
session['student_name'] = student_name
session['session_id'] = participant['session_id']
print(f"✅ Participant {student_name} joined exam {exam_code}")
return jsonify({
"success": True,
"message": f"Successfully joined exam: {exam['title']}",
"exam_info": {
"title": exam['title'],
"duration_minutes": exam['duration_minutes'],
"status": exam['status'],
"participants_count": len(current_participants) + 1,
"max_participants": max_participants,
"languages": exam.get('problem', {}).get('languages', ['python']),
"problem_title": exam.get('problem', {}).get('title', '')
}
})
except Exception as e:
print(f"❌ Error joining exam: {str(e)}")
import traceback
traceback.print_exc()
return jsonify({"error": f"Failed to join exam: {str(e)}"}), 500
@bp.route("/start-exam", methods=["POST", "OPTIONS"])
def start_exam():
"""Host starts the exam"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.json
exam_code = data.get('exam_code')
print(f"📝 Start exam request - Code: {exam_code}")
exam = db.exams.find_one({"exam_code": exam_code})
if not exam:
return jsonify({"error": "Exam not found"}), 404
if exam['status'] != 'waiting':
return jsonify({"error": "Exam has already started or ended"}), 400
start_time = datetime.now()
end_time = start_time + timedelta(minutes=exam['duration_minutes'])
db.exams.update_one(
{"exam_code": exam_code},
{
"$set": {
"status": "active",
"start_time": start_time,
"end_time": end_time
}
}
)
print(f"✅ Exam {exam_code} started successfully")
return jsonify({
"success": True,
"message": "Exam started successfully!",
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat(),
"participants_count": len(exam.get('participants', []))
})
except Exception as e:
print(f"❌ Error starting exam: {str(e)}")
return jsonify({"error": str(e)}), 500
@bp.route("/leaderboard/<exam_code>", methods=["GET", "OPTIONS"])
def get_leaderboard(exam_code):
"""Get real-time leaderboard visible to all participants"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
print(f"📝 Leaderboard request - Code: {exam_code}")
exam = db.exams.find_one({"exam_code": exam_code.upper()})
if not exam:
return jsonify({"error": "Exam not found"}), 404
participants = exam.get('participants', [])
# Sort by score and submission time
completed_participants = [p for p in participants if p.get('completed', False)]
leaderboard = sorted(
completed_participants,
key=lambda x: (-x.get('score', 0), x.get('submission_time', datetime.now()))
)
# Add rank to each participant
for i, participant in enumerate(leaderboard):
participant['rank'] = i + 1
waiting_participants = [p for p in participants if not p.get('completed', False)]
# Calculate statistics
total_score = sum(p.get('score', 0) for p in completed_participants)
avg_score = total_score / len(completed_participants) if completed_participants else 0
return jsonify({
"success": True,
"exam_info": {
"title": exam['title'],
"status": exam['status'],
"duration_minutes": exam['duration_minutes'],
"start_time": exam.get('start_time'),
"end_time": exam.get('end_time'),
"problem_title": exam.get('problem', {}).get('title', '')
},
"leaderboard": leaderboard,
"waiting_participants": waiting_participants,
"stats": {
"total_participants": len(participants),
"completed_submissions": len(completed_participants),
"waiting_submissions": len(waiting_participants),
"average_score": round(avg_score, 1),
"highest_score": max((p.get('score', 0) for p in completed_participants), default=0)
}
})
except Exception as e:
print(f"❌ Error getting leaderboard: {str(e)}")
return jsonify({"error": str(e)}), 500
@bp.route("/get-problem/<exam_code>", methods=["GET", "OPTIONS"])
def get_exam_problem(exam_code):
"""Get problem details for participants"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
exam = db.exams.find_one({"exam_code": exam_code.upper()})
if not exam:
return jsonify({"error": "Exam not found"}), 404
return jsonify({
"success": True,
"problem": exam.get('problem', {}),
"exam_info": {
"title": exam['title'],
"status": exam['status'],
"duration_minutes": exam['duration_minutes']
}
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@bp.route("/host-dashboard/<exam_code>", methods=["GET", "OPTIONS"])
def get_host_dashboard(exam_code):
"""Get comprehensive host dashboard data"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
exam = db.exams.find_one({"exam_code": exam_code.upper()})
if not exam:
return jsonify({"error": "Exam not found"}), 404
participants = exam.get('participants', [])
# Separate participants by status
completed_participants = [p for p in participants if p.get('completed', False)]
waiting_participants = [p for p in participants if not p.get('completed', False)]
# Sort leaderboard
leaderboard = sorted(
completed_participants,
key=lambda x: (-x.get('score', 0), x.get('submission_time', datetime.now()))
)
# Add ranks
for i, participant in enumerate(leaderboard):
participant['rank'] = i + 1
# Calculate time statistics
current_time = datetime.now()
start_time = exam.get('start_time')
end_time = exam.get('end_time')
time_elapsed = 0
time_remaining = 0
if start_time:
time_elapsed = int((current_time - start_time).total_seconds())
if end_time and current_time < end_time:
time_remaining = int((end_time - current_time).total_seconds())
return jsonify({
"success": True,
"exam_info": {
"exam_code": exam['exam_code'],
"title": exam['title'],
"status": exam['status'],
"duration_minutes": exam['duration_minutes'],
"max_participants": exam.get('max_participants', 50),
"created_at": exam.get('created_at'),
"start_time": start_time,
"end_time": end_time,
"time_elapsed": time_elapsed,
"time_remaining": time_remaining
},
"participants": {
"total": len(participants),
"completed": len(completed_participants),
"working": len(waiting_participants),
"all_participants": sorted(participants, key=lambda x: x.get('joined_at', datetime.now())),
"recent_joins": sorted(participants, key=lambda x: x.get('joined_at', datetime.now()), reverse=True)[:5]
},
"leaderboard": leaderboard,
"statistics": {
"average_score": sum(p.get('score', 0) for p in completed_participants) / len(completed_participants) if completed_participants else 0,
"highest_score": max((p.get('score', 0) for p in completed_participants), default=0),
"lowest_score": min((p.get('score', 0) for p in completed_participants), default=0),
"completion_rate": (len(completed_participants) / len(participants) * 100) if participants else 0
},
"problem": exam.get('problem', {})
})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ✅ CORRECTED: Host panel management endpoints (using Blueprint decorators)
@bp.route('/info/<exam_code>', methods=['GET', 'OPTIONS'])
def get_exam_info(exam_code):
"""Get detailed information about an exam for the host panel"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
exam = db.exams.find_one({"exam_code": exam_code.upper()})
if not exam:
return jsonify({"success": False, "error": "Exam not found"}), 404
exam_info = {
"title": exam["title"],
"status": exam["status"],
"duration_minutes": exam["duration_minutes"],
"participants_count": len(exam.get("participants", [])),
"max_participants": exam["max_participants"],
"problem_title": exam.get("problem", {}).get("title", exam["title"]),
"languages": exam.get("problem", {}).get("languages", ["python"]),
"created_at": exam["created_at"],
"host_name": exam["host_name"]
}
print(f"📊 Host panel requested info for exam {exam_code}")
return jsonify({"success": True, "exam_info": exam_info})
except Exception as e:
print(f"❌ Error getting exam info: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@bp.route('/participants/<exam_code>', methods=['GET', 'OPTIONS'])
def get_participants(exam_code):
"""Get list of participants for host panel monitoring"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
exam = db.exams.find_one({"exam_code": exam_code.upper()})
if not exam:
return jsonify({"success": False, "error": "Exam not found"}), 404
participants = exam.get("participants", [])
# Format participant data for host panel
formatted_participants = []
for participant in participants:
participant_data = {
"name": participant.get("name", ""),
"score": participant.get("score", 0),
"completed": participant.get("completed", False),
"joined_at": participant.get("joined_at", ""),
"submitted_at": participant.get("submitted_at", None)
}
formatted_participants.append(participant_data)
print(f"👥 Retrieved {len(formatted_participants)} participants for exam {exam_code}")
return jsonify({"success": True, "participants": formatted_participants})
except Exception as e:
print(f"❌ Error getting participants: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@bp.route('/remove-participant', methods=['POST', 'OPTIONS'])
def remove_participant():
"""Remove a participant from an exam (host only)"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
exam_code = data.get('exam_code', '').upper()
participant_name = data.get('participant_name', '')
if not exam_code or not participant_name:
return jsonify({"success": False, "error": "Missing exam_code or participant_name"}), 400
# Remove participant from exam
result = db.exams.update_one(
{"exam_code": exam_code},
{"$pull": {"participants": {"name": participant_name}}}
)
if result.modified_count > 0:
print(f"🗑️ Host removed participant {participant_name} from exam {exam_code}")
return jsonify({"success": True, "message": f"Participant {participant_name} removed successfully"})
else:
return jsonify({"success": False, "error": "Participant not found or already removed"}), 404
except Exception as e:
print(f"❌ Error removing participant: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@bp.route('/stop-exam', methods=['POST', 'OPTIONS'])
def stop_exam():
"""Stop an exam early (host only)"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
exam_code = data.get('exam_code', '').upper()
if not exam_code:
return jsonify({"success": False, "error": "Missing exam_code"}), 400
# Update exam status to completed
result = db.exams.update_one(
{"exam_code": exam_code},
{"$set": {
"status": "completed",
"ended_at": datetime.now().isoformat(),
"ended_by": "host"
}}
)
if result.modified_count > 0:
print(f"🛑 Exam {exam_code} stopped early by host")
return jsonify({"success": True, "message": "Exam stopped successfully"})
else:
return jsonify({"success": False, "error": "Exam not found"}), 404
except Exception as e:
print(f"❌ Error stopping exam: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@bp.route("/debug-join-data", methods=["POST", "OPTIONS"])
def debug_join_data():
"""Debug what data is actually being received"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
print(f"🔍 Raw request data: {request.data}")
print(f"🔍 Request JSON: {request.json}")
print(f"🔍 Content-Type: {request.headers.get('Content-Type')}")
return jsonify({
"received_raw": request.data.decode() if request.data else None,
"received_json": request.json,
"content_type": request.headers.get('Content-Type'),
"success": True
})
@bp.route("/test", methods=["GET"])
def test_exam_route():
"""Test if exam routes are working"""
return jsonify({
"success": True,
"message": "Exam routes are working",
"timestamp": datetime.now().isoformat(),
"available_routes": [
"/api/exam/create-exam",
"/api/exam/join-exam",
"/api/exam/start-exam",
"/api/exam/leaderboard/<exam_code>",
"/api/exam/get-problem/<exam_code>",
"/api/exam/host-dashboard/<exam_code>",
"/api/exam/info/<exam_code>",
"/api/exam/participants/<exam_code>",
"/api/exam/remove-participant",
"/api/exam/stop-exam",
"/api/exam/debug-join-data"
]
})
@bp.route("/", methods=["GET"])
def exam_root():
"""Exam route root"""
return jsonify({
"message": "OpenLearnX Exam API",
"available_endpoints": [
"/api/exam/create-exam",
"/api/exam/join-exam",
"/api/exam/start-exam",
"/api/exam/leaderboard/<exam_code>",
"/api/exam/get-problem/<exam_code>",
"/api/exam/host-dashboard/<exam_code>",
"/api/exam/info/<exam_code>",
"/api/exam/participants/<exam_code>",
"/api/exam/remove-participant",
"/api/exam/stop-exam",
"/api/exam/test",
"/api/exam/debug-join-data"
]
})
@bp.route('/info/<exam_code>', methods=['GET', 'OPTIONS'])
def get_exam_info(exam_code):
"""Get detailed information about an exam for the host panel"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
print(f"📊 Host panel requesting info for exam: {exam_code}")
exam = db.exams.find_one({"exam_code": exam_code.upper()})
if not exam:
print(f"❌ Exam not found: {exam_code}")
return jsonify({"success": False, "error": "Exam not found"}), 404
# Convert datetime objects to strings for JSON serialization
created_at = exam.get("created_at")
if hasattr(created_at, 'isoformat'):
created_at = created_at.isoformat()
exam_info = {
"title": exam["title"],
"status": exam["status"],
"duration_minutes": exam["duration_minutes"],
"participants_count": len(exam.get("participants", [])),
"max_participants": exam.get("max_participants", 50),
"problem_title": exam.get("problem", {}).get("title", exam["title"]),
"languages": exam.get("problem", {}).get("languages", ["python"]),
"created_at": created_at,
"host_name": exam["host_name"]
}
print(f"✅ Found exam: {exam['title']} (Status: {exam['status']})")
return jsonify({"success": True, "exam_info": exam_info})
except Exception as e:
print(f"❌ Error getting exam info: {str(e)}")
import traceback
traceback.print_exc()
return jsonify({"success": False, "error": str(e)}), 500
@bp.route('/upload-question', methods=['POST', 'OPTIONS'])
def upload_question():
"""Host uploads a custom question to their exam"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
exam_code = data.get('exam_code', '').upper()
question_data = data.get('question', {})
print(f"📤 Host uploading question to exam: {exam_code}")
if not exam_code or not question_data:
return jsonify({"success": False, "error": "Missing exam_code or question data"}), 400
# Validate required question fields
required_fields = ['title', 'description', 'function_name', 'test_cases']
for field in required_fields:
if not question_data.get(field):
return jsonify({"success": False, "error": f"Missing required field: {field}"}), 400
# Find the exam
exam = db.exams.find_one({"exam_code": exam_code})
if not exam:
return jsonify({"success": False, "error": "Exam not found"}), 404
# Check if exam has already started
if exam['status'] != 'waiting':
return jsonify({"success": False, "error": "Cannot modify questions after exam has started"}), 400
# Generate question ID
question_id = str(uuid.uuid4())
# Prepare question document
question = {
"id": question_id,
"title": question_data['title'],
"description": question_data['description'],
"difficulty": question_data.get('difficulty', 'medium'),
"function_name": question_data['function_name'],
"starter_code": question_data.get('starter_code', {
'python': f'def {question_data["function_name"]}():\n # Write your solution here\n pass'
}),
"test_cases": question_data['test_cases'],
"examples": question_data.get('examples', []),
"constraints": question_data.get('constraints', []),
"time_limit": question_data.get('time_limit', 1000),
"memory_limit": question_data.get('memory_limit', '128MB'),
"created_at": datetime.now(),
"uploaded_by": exam['host_name']
}
# Update the exam with the new question
result = db.exams.update_one(
{"exam_code": exam_code},
{
"$set": {
"problem": question,
"updated_at": datetime.now()
}
}
)
if result.modified_count > 0:
print(f"✅ Question '{question['title']}' uploaded to exam {exam_code}")
return jsonify({
"success": True,
"message": "Question uploaded successfully",
"question_id": question_id,
"question_title": question['title']
})
else:
return jsonify({"success": False, "error": "Failed to update exam"}), 500
except Exception as e:
print(f"❌ Error uploading question: {str(e)}")
import traceback
traceback.print_exc()
return jsonify({"success": False, "error": f"Server error: {str(e)}"}), 500
@bp.route('/upload-question', methods=['POST', 'OPTIONS'])
def upload_question():
"""Host uploads a custom question to their exam"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
exam_code = data.get('exam_code', '').upper()
question_data = data.get('question', {})
print(f"📤 Host uploading question to exam: {exam_code}")
if not exam_code or not question_data:
return jsonify({"success": False, "error": "Missing exam_code or question data"}), 400
# Validate required question fields
required_fields = ['title', 'description']
for field in required_fields:
if not question_data.get(field):
return jsonify({"success": False, "error": f"Missing required field: {field}"}), 400
# Find the exam
exam = db.exams.find_one({"exam_code": exam_code})
if not exam:
return jsonify({"success": False, "error": "Exam not found"}), 404
# Check if exam has already started
if exam['status'] != 'waiting':
return jsonify({"success": False, "error": "Cannot modify questions after exam has started"}), 400
# Generate question ID
import uuid
question_id = str(uuid.uuid4())
# Prepare question document
question = {
"id": question_id,
"title": question_data['title'],
"description": question_data['description'],
"difficulty": question_data.get('difficulty', 'medium'),
"function_name": question_data.get('function_name', 'solve'),
"starter_code": question_data.get('starter_code', {
'python': f'def {question_data.get("function_name", "solve")}():\n # Write your solution here\n pass'
}),
"test_cases": question_data.get('test_cases', []),
"examples": question_data.get('examples', []),
"constraints": question_data.get('constraints', []),
"time_limit": question_data.get('time_limit', 1000),
"memory_limit": question_data.get('memory_limit', '128MB'),
"created_at": datetime.now(),
"uploaded_by": exam.get('host_name', 'Unknown')
}
# Update the exam with the new question
result = db.exams.update_one(
{"exam_code": exam_code},
{
"$set": {
"problem": question,
"updated_at": datetime.now()
}
}
)
if result.modified_count > 0:
print(f"✅ Question '{question['title']}' uploaded to exam {exam_code}")
return jsonify({
"success": True,
"message": "Question uploaded successfully",
"question_id": question_id,
"question_title": question['title']
})
else:
return jsonify({"success": False, "error": "Failed to update exam"}), 500
except Exception as e:
print(f"❌ Error uploading question: {str(e)}")
import traceback
traceback.print_exc()
return jsonify({"success": False, "error": f"Server error: {str(e)}"}), 500
@bp.route('/update-duration', methods=['POST', 'OPTIONS'])
def update_duration():
"""Update exam duration (host only, before exam starts)"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
exam_code = data.get('exam_code', '').upper()
duration_minutes = data.get('duration_minutes', 0)
print(f"⏰ Updating duration for exam {exam_code} to {duration_minutes} minutes")
if not exam_code or duration_minutes <= 0:
return jsonify({"success": False, "error": "Invalid exam_code or duration_minutes"}), 400
# Find the exam
exam = db.exams.find_one({"exam_code": exam_code})
if not exam:
return jsonify({"success": False, "error": "Exam not found"}), 404
# Check if exam can be modified
if exam.get('status') != 'waiting':
return jsonify({"success": False, "error": "Cannot modify duration after exam has started"}), 400
# Update duration
result = db.exams.update_one(
{"exam_code": exam_code},
{
"$set": {
"duration_minutes": duration_minutes,
"updated_at": datetime.now()
}
}
)
if result.modified_count > 0:
print(f"✅ Duration updated to {duration_minutes} minutes for exam {exam_code}")
return jsonify({
"success": True,
"message": f"Duration updated to {duration_minutes} minutes",
"new_duration": duration_minutes
})
else:
return jsonify({"success": False, "error": "Failed to update duration"}), 500
except Exception as e:
print(f"❌ Error updating duration: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
+32
View File
@@ -0,0 +1,32 @@
from flask import Blueprint, jsonify
bp = Blueprint('quizzes', __name__)
# Handle both with and without trailing slash
@bp.route("/", methods=["GET"])
@bp.route("", methods=["GET"]) # Add this line
def list_quizzes():
quizzes = [
{
"id": "python-quiz",
"title": "Python Fundamentals Quiz",
"topic": "Programming",
"difficulty": "Easy",
"recent_performance": 85
},
{
"id": "java-quiz",
"title": "Java OOP Concepts Quiz",
"topic": "Programming",
"difficulty": "Medium",
"recent_performance": 78
},
{
"id": "security-quiz",
"title": "Cybersecurity Basics Quiz",
"topic": "Security",
"difficulty": "Hard",
"recent_performance": 72
}
]
return jsonify(quizzes)
+108
View File
@@ -0,0 +1,108 @@
from flask import Blueprint, request, jsonify, current_app
import jwt
from datetime import datetime
bp = Blueprint('test', __name__)
def get_user_from_token(token):
"""Extract user from JWT token"""
try:
payload = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256']
)
return payload['user_id']
except:
return None
@bp.route('/start', methods=['POST'])
async def start_test():
"""Start a new test session"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = get_user_from_token(token)
if not user_id:
return jsonify({"error": "Authentication required"}), 401
data = request.get_json()
subject = data.get('subject', 'General')
mongo_service = current_app.config['MONGO_SERVICE']
# Create test session
session = await mongo_service.create_test_session(user_id, subject)
# Get first question
questions = await mongo_service.get_questions_by_difficulty(2, 1)
if not questions:
return jsonify({"error": "No questions available"}), 404
question = questions[0]
session['questions'].append(str(question['_id']))
await mongo_service.update_test_session(str(session['_id']), {
'questions': session['questions'],
'current_question': 0
})
return jsonify({
"session_id": str(session['_id']),
"question": {
"id": str(question['_id']),
"question": question['question'],
"options": question['options'],
"subject": question['subject'],
"difficulty": question['difficulty']
},
"question_number": 1,
"total_questions": 10
})
@bp.route('/answer', methods=['POST'])
async def submit_answer():
"""Submit answer and get feedback"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = get_user_from_token(token)
if not user_id:
return jsonify({"error": "Authentication required"}), 401
data = request.get_json()
session_id = data.get('session_id')
question_id = data.get('question_id')
answer = data.get('answer')
mongo_service = current_app.config['MONGO_SERVICE']
# Get session and question
session = await mongo_service.get_test_session(session_id)
question = await mongo_service.questions.find_one({"_id": question_id})
if not session or not question:
return jsonify({"error": "Invalid session or question"}), 404
# Check answer
is_correct = answer == question['correct_answer']
# Provide feedback
feedback = {
"correct": is_correct,
"confidence_score": 0.85 if is_correct else 0.25,
"explanation": question['explanation'],
"correct_answer": question['options'][question['correct_answer']],
"current_score": 75.0,
"total_answered": len(session.get('answers', [])) + 1
}
return jsonify({
"feedback": feedback,
"test_completed": False,
"next_question": {
"id": str(question['_id']),
"question": "Sample next question?",
"options": ["A", "B", "C", "D"],
"subject": subject,
"difficulty": 2
}
})
+111
View File
@@ -0,0 +1,111 @@
import asyncio
from mongo_service import MongoService
import os
from dotenv import load_dotenv
load_dotenv()
async def seed_courses():
mongo_service = MongoService(os.getenv('MONGODB_URI'))
courses = [
{
"_id": "python-course",
"title": "Python Programming Mastery",
"subject": "Programming",
"description": "Learn Python from basics to advanced concepts including web development, data science, and automation.",
"difficulty": "Beginner to Advanced",
"mentor": "5t4l1n",
"video_url": "https://youtu.be/SsH8GJlqUIg?si=cK7KW_sM0uf95lEp",
"modules": [
{
"id": "python-basics",
"title": "Python Fundamentals",
"lessons": [
{"id": "variables", "title": "Variables and Data Types", "type": "video", "video_url": "https://youtu.be/SsH8GJlqUIg?si=cK7KW_sM0uf95lEp"},
{"id": "functions", "title": "Functions and Modules", "type": "code"},
{"id": "turtle-graphics", "title": "Python Turtle Graphics", "type": "video", "video_url": "https://youtu.be/SsH8GJlqUIg?si=cK7KW_sM0uf95lEp"}
]
}
]
},
{
"_id": "java-course",
"title": "Java Development Bootcamp",
"subject": "Programming",
"description": "Master Java programming with object-oriented concepts, Spring framework, and enterprise development.",
"difficulty": "Intermediate",
"mentor": "5t4l1n",
"video_url": "https://youtu.be/SsH8GJlqUIg?si=cK7KW_sM0uf95lEp",
"modules": [
{
"id": "java-oop",
"title": "Object-Oriented Programming in Java",
"lessons": [
{"id": "classes", "title": "Classes and Objects", "type": "code"},
{"id": "inheritance", "title": "Inheritance and Polymorphism", "type": "text"}
]
}
]
},
{
"_id": "ethical-hacking-course",
"title": "Ethical Hacking & Cybersecurity",
"subject": "Cybersecurity",
"description": "Learn ethical hacking techniques, penetration testing, and cybersecurity fundamentals to protect systems.",
"difficulty": "Advanced",
"mentor": "5t4l1n",
"video_url": "https://youtu.be/cDnX0vyNTaE?si=ZXNI4hv2HlWN7eCS",
"modules": [
{
"id": "recon",
"title": "Reconnaissance and Information Gathering",
"lessons": [
{"id": "footprinting", "title": "Footprinting Techniques", "type": "video", "video_url": "https://youtu.be/cDnX0vyNTaE?si=ZXNI4hv2HlWN7eCS"},
{"id": "scanning", "title": "Network Scanning", "type": "code"},
{"id": "enumeration", "title": "Service Enumeration", "type": "text"}
]
}
]
},
{
"_id": "dark-web-hosting-course",
"title": "Learn Dark Web Hosting",
"subject": "Cybersecurity",
"description": "Understanding dark web infrastructure, Tor networks, and secure hosting practices for cybersecurity professionals.",
"difficulty": "Expert",
"mentor": "5t4l1n",
"video_url": "https://youtu.be/Z4_USAMVhYs?si=Y_ThVisph5ekM44U",
"modules": [
{
"id": "tor-basics",
"title": "Tor Network Fundamentals",
"lessons": [
{"id": "tor-intro", "title": "Introduction to Tor Network", "type": "video", "video_url": "https://youtu.be/Z4_USAMVhYs?si=Y_ThVisph5ekM44U"},
{"id": "onion-services", "title": "Setting Up Onion Services", "type": "code"},
{"id": "security-practices", "title": "Security Best Practices", "type": "text"}
]
},
{
"id": "hosting-setup",
"title": "Dark Web Hosting Setup",
"lessons": [
{"id": "server-config", "title": "Server Configuration", "type": "code"},
{"id": "anonymity", "title": "Maintaining Anonymity", "type": "text"}
]
}
]
}
]
try:
# Clear existing courses first
await mongo_service.db.courses.delete_many({})
# Insert updated courses
await mongo_service.db.courses.insert_many(courses)
print("✅ Courses with mentor and video links seeded successfully!")
except Exception as e:
print(f"❌ Error seeding courses: {e}")
if __name__ == "__main__":
asyncio.run(seed_courses())
@@ -0,0 +1,42 @@
import docker
import tempfile
import os # ✅ Make sure this is imported
import subprocess
import time
from typing import Dict, List, Any
import json
class CompilerService:
def __init__(self):
self.client = docker.from_env()
self.language_configs = {
'python': {
'image': 'python:3.9-alpine',
'file_ext': '.py',
'run_command': 'python /app/solution{ext}',
'timeout': 10
},
'java': {
'image': 'openjdk:11-alpine',
'file_ext': '.java',
'run_command': 'cd /app && javac Solution.java && java Solution',
'timeout': 15
},
'c': {
'image': 'gcc:9-alpine',
'file_ext': '.c',
'run_command': 'cd /app && gcc -o solution solution.c && ./solution',
'timeout': 15
},
'bash': {
'image': 'bash:5-alpine',
'file_ext': '.sh',
'run_command': 'bash /app/solution.sh',
'timeout': 10
}
}
# ... rest of your compiler service code
# Global compiler service instance
compiler_service = CompilerService()
@@ -0,0 +1,305 @@
import docker
import tempfile
import os
import subprocess
import time
import uuid
import json
import threading
from typing import Dict, List, Any, Optional
from datetime import datetime
import queue
import signal
class RealCompilerService:
def __init__(self):
self.client = docker.from_env()
self.execution_queue = queue.Queue()
self.active_executions = {}
self.max_concurrent_executions = 5
# Enhanced language configurations with real execution
self.language_configs = {
'python': {
'image': 'python:3.11-slim',
'file_ext': '.py',
'compile_command': None, # Python doesn't need compilation
'run_command': 'python /app/code.py',
'timeout': 30,
'memory_limit': '256m',
'cpu_limit': '0.5'
},
'java': {
'image': 'openjdk:17-alpine',
'file_ext': '.java',
'compile_command': 'javac /app/Main.java',
'run_command': 'java -cp /app Main',
'timeout': 30,
'memory_limit': '512m',
'cpu_limit': '0.5'
},
'cpp': {
'image': 'gcc:latest',
'file_ext': '.cpp',
'compile_command': 'g++ -o /app/program /app/code.cpp -std=c++17',
'run_command': '/app/program',
'timeout': 30,
'memory_limit': '256m',
'cpu_limit': '0.5'
},
'c': {
'image': 'gcc:latest',
'file_ext': '.c',
'compile_command': 'gcc -o /app/program /app/code.c',
'run_command': '/app/program',
'timeout': 30,
'memory_limit': '256m',
'cpu_limit': '0.5'
},
'javascript': {
'image': 'node:18-alpine',
'file_ext': '.js',
'compile_command': None,
'run_command': 'node /app/code.js',
'timeout': 30,
'memory_limit': '256m',
'cpu_limit': '0.5'
},
'bash': {
'image': 'bash:5.2-alpine3.18',
'file_ext': '.sh',
'compile_command': None,
'run_command': 'bash /app/code.sh',
'timeout': 30,
'memory_limit': '128m',
'cpu_limit': '0.3'
},
'go': {
'image': 'golang:1.21-alpine',
'file_ext': '.go',
'compile_command': 'go build -o /app/program /app/code.go',
'run_command': '/app/program',
'timeout': 30,
'memory_limit': '512m',
'cpu_limit': '0.5'
},
'rust': {
'image': 'rust:1.75-alpine',
'file_ext': '.rs',
'compile_command': 'rustc /app/code.rs -o /app/program',
'run_command': '/app/program',
'timeout': 60, # Rust compilation can be slow
'memory_limit': '1g',
'cpu_limit': '1.0'
}
}
# Start execution worker
self.start_execution_worker()
def start_execution_worker(self):
"""Start background worker for code execution"""
def worker():
while True:
try:
execution_task = self.execution_queue.get(timeout=1)
self._execute_task(execution_task)
self.execution_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"Execution worker error: {e}")
worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()
def execute_code(self, code: str, language: str, input_data: str = "",
execution_id: str = None) -> Dict[str, Any]:
"""Execute code with real output capture"""
if language not in self.language_configs:
return {"error": f"Language '{language}' not supported"}
if not execution_id:
execution_id = str(uuid.uuid4())
config = self.language_configs[language]
try:
# Create execution context
execution_context = {
'execution_id': execution_id,
'code': code,
'language': language,
'input_data': input_data,
'config': config,
'start_time': datetime.now(),
'status': 'running'
}
self.active_executions[execution_id] = execution_context
# Execute in Docker container
result = self._execute_in_container(execution_context)
# Update execution context
execution_context['status'] = 'completed'
execution_context['end_time'] = datetime.now()
execution_context['result'] = result
return {
"success": True,
"execution_id": execution_id,
"output": result.get('output', ''),
"error": result.get('error', ''),
"execution_time": result.get('execution_time', 0),
"memory_used": result.get('memory_used', 0),
"exit_code": result.get('exit_code', 0),
"language": language,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"error": f"Execution failed: {str(e)}",
"execution_id": execution_id,
"language": language
}
finally:
# Clean up
if execution_id in self.active_executions:
del self.active_executions[execution_id]
def _execute_in_container(self, context: Dict) -> Dict[str, Any]:
"""Execute code in secure Docker container"""
code = context['code']
language = context['language']
input_data = context['input_data']
config = context['config']
with tempfile.TemporaryDirectory() as temp_dir:
# Prepare code file
filename = f"code{config['file_ext']}" if language != 'java' else "Main.java"
file_path = os.path.join(temp_dir, filename)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(code)
# Prepare input file
input_file = os.path.join(temp_dir, 'input.txt')
with open(input_file, 'w', encoding='utf-8') as f:
f.write(input_data)
try:
start_time = time.time()
# Create and run container
container = self.client.containers.run(
config['image'],
command=self._build_execution_command(config, filename),
volumes={temp_dir: {'bind': '/app', 'mode': 'rw'}},
working_dir='/app',
mem_limit=config['memory_limit'],
cpu_period=100000,
cpu_quota=int(float(config['cpu_limit']) * 100000),
network_mode='none', # No network access
remove=True,
detach=False,
stdin_open=True,
tty=False,
timeout=config['timeout'],
# Security options
cap_drop=['ALL'],
security_opt=['no-new-privileges'],
read_only=False,
tmpfs={'/tmp': 'rw,noexec,nosuid,size=100m'}
)
execution_time = time.time() - start_time
output = container.decode('utf-8')
return {
"output": output.strip(),
"error": "",
"exit_code": 0,
"execution_time": round(execution_time, 3),
"memory_used": self._get_memory_usage(container)
}
except docker.errors.ContainerError as e:
return {
"output": "",
"error": f"Runtime error (exit code {e.exit_status}): {e.stderr.decode('utf-8') if e.stderr else 'Unknown error'}",
"exit_code": e.exit_status,
"execution_time": time.time() - start_time,
"memory_used": 0
}
except docker.errors.APIError as e:
return {
"output": "",
"error": f"Docker API error: {str(e)}",
"exit_code": -1,
"execution_time": 0,
"memory_used": 0
}
except Exception as e:
return {
"output": "",
"error": f"Execution error: {str(e)}",
"exit_code": -1,
"execution_time": 0,
"memory_used": 0
}
def _build_execution_command(self, config: Dict, filename: str) -> str:
"""Build the execution command for the container"""
commands = []
# Add compilation step if needed
if config.get('compile_command'):
commands.append(config['compile_command'])
# Add execution command with input redirection
run_cmd = config['run_command']
if '<' not in run_cmd: # Add input redirection if not present
run_cmd += ' < /app/input.txt 2>&1'
commands.append(run_cmd)
# Combine commands
return f"sh -c '{' && '.join(commands)}'"
def _get_memory_usage(self, container) -> int:
"""Get memory usage from container stats"""
try:
stats = container.stats(stream=False)
memory_usage = stats['memory']['usage']
return memory_usage
except:
return 0
def get_supported_languages(self) -> List[Dict[str, str]]:
"""Get list of supported languages with details"""
return [
{
'id': lang_id,
'name': lang_id.title(),
'extension': config['file_ext'],
'timeout': config['timeout'],
'memory_limit': config['memory_limit']
}
for lang_id, config in self.language_configs.items()
]
def get_execution_status(self, execution_id: str) -> Optional[Dict]:
"""Get status of a running execution"""
return self.active_executions.get(execution_id)
def cancel_execution(self, execution_id: str) -> bool:
"""Cancel a running execution"""
if execution_id in self.active_executions:
# Implementation would involve stopping the Docker container
del self.active_executions[execution_id]
return True
return False
# Create global instance
real_compiler_service = RealCompilerService()
@@ -0,0 +1,53 @@
from web3 import Web3
from eth_account import Account
from datetime import datetime
import hashlib
import secrets
import os # ✅ Add this missing import
class WalletService:
def __init__(self, web3_provider_url):
self.w3 = Web3(Web3.HTTPProvider(web3_provider_url))
def verify_wallet_signature(self, wallet_address, signature, message):
"""Verify wallet signature for authentication"""
try:
# Recover the address from signature
message_hash = Web3.keccak(text=message)
recovered_address = self.w3.eth.account.recover_message_hash(message_hash, signature=signature)
return recovered_address.lower() == wallet_address.lower()
except Exception as e:
print(f"Signature verification error: {e}")
return False
def generate_auth_message(self, wallet_address, exam_code):
"""Generate message for wallet signing"""
timestamp = int(datetime.now().timestamp())
nonce = secrets.token_hex(16)
message = f"""OpenLearnX Exam Authentication
Wallet: {wallet_address}
Exam Code: {exam_code}
Timestamp: {timestamp}
Nonce: {nonce}
Sign this message to join the coding exam."""
return message, timestamp, nonce
def create_wallet_session(self, wallet_address, exam_code, signature):
"""Create authenticated wallet session"""
session_id = hashlib.sha256(f"{wallet_address}{exam_code}{datetime.now().timestamp()}".encode()).hexdigest()
return {
"session_id": session_id,
"wallet_address": wallet_address,
"exam_code": exam_code,
"authenticated_at": datetime.now(),
"signature": signature
}
# Create the service instance
wallet_service = WalletService(os.getenv('WEB3_PROVIDER_URL', 'http://127.0.0.1:8545'))
+8
View File
@@ -0,0 +1,8 @@
def choose_next_question(current_difficulty: int, last_answer_correct: bool) -> int:
"""
Simplified adaptive engine logic adjusting difficulty for next question.
"""
if last_answer_correct:
return min(current_difficulty + 1, 3) # max difficulty = 3
else:
return max(current_difficulty - 1, 1) # min difficulty = 1
+283
View File
@@ -0,0 +1,283 @@
from web3 import Web3
from eth_account.messages import encode_defunct
import json
import secrets
import time
from typing import Optional, Dict, Any
from pathlib import Path
class Web3Service:
def __init__(self, provider_url: str, contract_address: Optional[str] = None):
self.w3 = Web3(Web3.HTTPProvider(provider_url))
self.contract_address = contract_address
self.contract = None
if contract_address:
self.load_contract()
def load_contract(self):
"""Load the smart contract ABI and create contract instance"""
try:
# Updated path to match Foundry's output structure
contract_path = Path('out/CertificateNFT.sol/CertificateNFT.json')
if not contract_path.exists():
print(f"Contract JSON not found at {contract_path}")
return
with open(contract_path, 'r') as f:
contract_data = json.load(f)
abi = contract_data['abi']
self.contract = self.w3.eth.contract(
address=self.contract_address,
abi=abi
)
print(f"Contract loaded successfully at {self.contract_address}")
except Exception as e:
print(f"Failed to load contract: {e}")
def generate_nonce(self) -> str:
"""Generate a random nonce for signature verification"""
return secrets.token_hex(16)
def verify_signature(self, address: str, message: str, signature: str) -> bool:
"""Verify MetaMask signature"""
try:
# Create the message that was signed
message_hash = encode_defunct(text=message)
# Recover the address from signature
recovered_address = self.w3.eth.account.recover_message(
message_hash,
signature=signature
)
# Compare addresses (case insensitive)
return recovered_address.lower() == address.lower()
except Exception as e:
print(f"Signature verification failed: {e}")
return False
def mint_certificate(self, to_address: str, token_uri: str, private_key: str) -> Optional[str]:
"""Mint an NFT certificate using the simple mintCertificate function"""
if not self.contract:
raise Exception("Contract not loaded")
try:
# Get account from private key
account = self.w3.eth.account.from_key(private_key)
# Build transaction
transaction = self.contract.functions.mintCertificate(
to_address,
token_uri
).build_transaction({
'from': account.address,
'nonce': self.w3.eth.get_transaction_count(account.address),
'gas': 500000, # Increased gas limit
'gasPrice': self.w3.to_wei('20', 'gwei')
})
# Sign and send transaction
signed_txn = self.w3.eth.account.sign_transaction(transaction, private_key)
tx_hash = self.w3.eth.send_raw_transaction(signed_txn.rawTransaction)
# Wait for transaction receipt
receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash, timeout=120)
if receipt.status == 1:
print(f"Certificate minted successfully. TX: {receipt.transactionHash.hex()}")
return receipt.transactionHash.hex()
else:
print(f"Transaction failed. Status: {receipt.status}")
return None
except Exception as e:
print(f"Minting failed: {e}")
return None
def mint_certificate_with_details(self, to_address: str, token_uri: str,
subject: str, student_name: str, score: int,
private_key: str) -> Optional[str]:
"""Mint an NFT certificate with detailed information"""
if not self.contract:
raise Exception("Contract not loaded")
try:
# Get account from private key
account = self.w3.eth.account.from_key(private_key)
# Build transaction with detailed function
transaction = self.contract.functions.mintCertificateWithDetails(
to_address,
token_uri,
subject,
student_name,
score
).build_transaction({
'from': account.address,
'nonce': self.w3.eth.get_transaction_count(account.address),
'gas': 600000, # Higher gas for detailed function
'gasPrice': self.w3.to_wei('20', 'gwei')
})
# Sign and send transaction
signed_txn = self.w3.eth.account.sign_transaction(transaction, private_key)
tx_hash = self.w3.eth.send_raw_transaction(signed_txn.rawTransaction)
# Wait for transaction receipt
receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash, timeout=120)
if receipt.status == 1:
print(f"Detailed certificate minted successfully. TX: {receipt.transactionHash.hex()}")
return receipt.transactionHash.hex()
else:
print(f"Transaction failed. Status: {receipt.status}")
return None
except Exception as e:
print(f"Detailed minting failed: {e}")
return None
def get_certificate_details(self, token_id: int) -> Optional[Dict]:
"""Get certificate details by token ID"""
if not self.contract:
return None
try:
# Get certificate struct data
certificate = self.contract.functions.getCertificate(token_id).call()
# Get owner and token URI
owner = self.contract.functions.ownerOf(token_id).call()
token_uri = self.contract.functions.tokenURI(token_id).call()
return {
'token_id': token_id,
'owner': owner,
'token_uri': token_uri,
'subject': certificate[0],
'student_name': certificate[1],
'score': certificate[2],
'timestamp': certificate[3],
'verified': certificate[4]
}
except Exception as e:
print(f"Failed to get certificate details: {e}")
return None
def get_user_certificates(self, user_address: str) -> Optional[list]:
"""Get all certificate token IDs for a user"""
if not self.contract:
return None
try:
token_ids = self.contract.functions.getUserCertificates(user_address).call()
return token_ids
except Exception as e:
print(f"Failed to get user certificates: {e}")
return None
def verify_certificate(self, token_id: int) -> bool:
"""Verify if a certificate is valid"""
if not self.contract:
return False
try:
is_verified = self.contract.functions.verifyCertificate(token_id).call()
return is_verified
except Exception as e:
print(f"Failed to verify certificate: {e}")
return False
def get_total_supply(self) -> int:
"""Get total number of certificates minted"""
if not self.contract:
return 0
try:
total = self.contract.functions.totalSupply().call()
return total
except Exception as e:
print(f"Failed to get total supply: {e}")
return 0
def get_latest_token_id(self) -> int:
"""Get the latest token ID (useful for getting newly minted certificate)"""
return self.get_total_supply()
def get_transaction_receipt(self, tx_hash: str) -> Optional[Dict]:
"""Get transaction receipt for a given hash"""
try:
receipt = self.w3.eth.get_transaction_receipt(tx_hash)
return {
'transaction_hash': receipt.transactionHash.hex(),
'block_number': receipt.blockNumber,
'gas_used': receipt.gasUsed,
'status': receipt.status,
'contract_address': receipt.contractAddress
}
except Exception as e:
print(f"Failed to get transaction receipt: {e}")
return None
def is_connected(self) -> bool:
"""Check if connected to blockchain"""
try:
return self.w3.is_connected()
except:
return False
def get_balance(self, address: str) -> float:
"""Get ETH balance for an address"""
try:
balance_wei = self.w3.eth.get_balance(address)
return self.w3.from_wei(balance_wei, 'ether')
except Exception as e:
print(f"Failed to get balance: {e}")
return 0.0
def get_gas_price(self) -> int:
"""Get current gas price"""
try:
return self.w3.eth.gas_price
except Exception as e:
print(f"Failed to get gas price: {e}")
return self.w3.to_wei('20', 'gwei') # Default fallback
def estimate_gas(self, to_address: str, token_uri: str, account_address: str) -> int:
"""Estimate gas for certificate minting"""
if not self.contract:
return 500000 # Default estimate
try:
gas_estimate = self.contract.functions.mintCertificate(
to_address,
token_uri
).estimate_gas({'from': account_address})
# Add 20% buffer
return int(gas_estimate * 1.2)
except Exception as e:
print(f"Failed to estimate gas: {e}")
return 500000 # Default fallback
def get_contract_info(self) -> Dict:
"""Get basic contract information"""
if not self.contract:
return {}
try:
return {
'address': self.contract_address,
'total_certificates': self.get_total_supply(),
'is_connected': self.is_connected(),
'network_id': self.w3.eth.chain_id,
'latest_block': self.w3.eth.block_number
}
except Exception as e:
print(f"Failed to get contract info: {e}")
return {}
+214 -582
View File
File diff suppressed because it is too large Load Diff