This commit is contained in:
5t4l1n
2025-07-27 02:22:23 +05:30
parent 82fd0a8e24
commit e358b6c760
4 changed files with 1863 additions and 808 deletions
+742 -71
View File
@@ -136,10 +136,175 @@ def get_db():
client = MongoClient(app.config['MONGODB_URI'])
return client.openlearnx
# ✅ ONLY UNIQUE DIRECT EXAM ENDPOINTS (no conflicts with blueprint)
# ===================================================================
# ✅ DYNAMIC SCORING SYSTEM
# ===================================================================
def calculate_dynamic_score(code, language, problem):
"""Calculate score based on test cases and expected outputs"""
import io
from contextlib import redirect_stdout, redirect_stderr
import time
test_cases = problem.get('test_cases', [])
scoring_method = problem.get('scoring_method', 'test_cases')
total_points = problem.get('total_points', 100)
stdout_buffer = io.StringIO()
stderr_buffer = io.StringIO()
start_time = time.time()
passed_tests = 0
total_tests = len(test_cases) if test_cases else 1
test_results = []
points_earned = 0
print(f"🧮 Starting dynamic scoring - {total_tests} test cases")
try:
if test_cases:
# ✅ TEST CASE BASED SCORING
for i, test_case in enumerate(test_cases):
test_input = test_case.get('input', '')
expected_output = test_case.get('expected_output', '').strip()
test_points = test_case.get('points', total_points // total_tests)
print(f"📋 Test {i+1}: Input='{test_input}', Expected='{expected_output}'")
try:
# Create a modified version of the code that handles input
if test_input:
# Inject input into the code execution environment
modified_code = f"""
import io
import sys
sys.stdin = io.StringIO('{test_input}')
{code}
"""
else:
modified_code = code
# Execute the code
stdout_buffer = io.StringIO()
stderr_buffer = io.StringIO()
with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer):
exec(modified_code, {"__builtins__": __builtins__})
actual_output = stdout_buffer.getvalue().strip()
stderr_content = stderr_buffer.getvalue()
print(f"🔍 Test {i+1} - Actual: '{actual_output}', Expected: '{expected_output}'")
# Check if output matches
if actual_output == expected_output:
passed_tests += 1
points_earned += test_points
test_results.append({
"test_number": i + 1,
"passed": True,
"input": test_input,
"expected_output": expected_output,
"actual_output": actual_output,
"points_earned": test_points,
"description": test_case.get('description', f'Test case {i+1}')
})
print(f"✅ Test {i+1} PASSED - {test_points} points")
else:
test_results.append({
"test_number": i + 1,
"passed": False,
"input": test_input,
"expected_output": expected_output,
"actual_output": actual_output,
"points_earned": 0,
"error": f"Output mismatch. Got '{actual_output}', expected '{expected_output}'",
"description": test_case.get('description', f'Test case {i+1}')
})
print(f"❌ Test {i+1} FAILED - Expected '{expected_output}', got '{actual_output}'")
except Exception as e:
test_results.append({
"test_number": i + 1,
"passed": False,
"input": test_input,
"expected_output": expected_output,
"actual_output": f"Error: {str(e)}",
"points_earned": 0,
"error": str(e),
"description": test_case.get('description', f'Test case {i+1}')
})
print(f"❌ Test {i+1} ERROR - {str(e)}")
else:
# ✅ FALLBACK: BASIC EXECUTION TEST
try:
with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer):
exec(code, {"__builtins__": __builtins__})
passed_tests = 1
points_earned = total_points
test_results = [{
"test_number": 1,
"passed": True,
"input": "",
"expected_output": "Code should execute without errors",
"actual_output": stdout_buffer.getvalue().strip(),
"points_earned": total_points,
"description": "Basic execution test"
}]
except Exception as e:
test_results = [{
"test_number": 1,
"passed": False,
"input": "",
"expected_output": "Code should execute without errors",
"actual_output": f"Error: {str(e)}",
"points_earned": 0,
"error": str(e),
"description": "Basic execution test"
}]
except Exception as e:
print(f"❌ Scoring error: {str(e)}")
test_results = [{
"test_number": 1,
"passed": False,
"input": "",
"expected_output": "Code should execute without errors",
"actual_output": f"Scoring error: {str(e)}",
"points_earned": 0,
"error": str(e),
"description": "Scoring system error"
}]
execution_time = time.time() - start_time
# Calculate final score percentage
final_score = int((points_earned / total_points) * 100) if total_points > 0 else 0
print(f"🏆 FINAL SCORE: {final_score}% ({points_earned}/{total_points} points, {passed_tests}/{total_tests} tests)")
return {
'score': final_score,
'passed_tests': passed_tests,
'total_tests': total_tests,
'test_results': test_results,
'execution_time': round(execution_time, 3),
'details': {
'points_earned': points_earned,
'total_points': total_points,
'scoring_method': scoring_method
}
}
# ===================================================================
# ✅ ENHANCED EXAM ENDPOINTS WITH DYNAMIC SCORING
# ===================================================================
@app.route('/api/exam/upload-question', methods=['POST', 'OPTIONS'])
def upload_question_direct():
"""Direct endpoint for question upload"""
"""Enhanced question upload with dynamic scoring"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
@@ -148,7 +313,7 @@ def upload_question_direct():
return response
try:
print(f"📤 Direct question upload request")
print(f"📤 Enhanced question upload request")
data = request.get_json()
exam_code = data.get('exam_code', '').upper()
@@ -172,7 +337,7 @@ def upload_question_direct():
if exam.get('status') != 'waiting':
return jsonify({"success": False, "error": "Cannot modify questions after exam has started"}), 400
# Create question document
# ✅ ENHANCED QUESTION STRUCTURE WITH DYNAMIC SCORING
question = {
"id": str(uuid.uuid4()),
"title": question_data.get('title', 'Custom Question'),
@@ -182,14 +347,31 @@ def upload_question_direct():
"starter_code": question_data.get('starter_code', {
'python': 'def solve():\n # Write your solution here\n pass'
}),
"test_cases": question_data.get('test_cases', []),
# ✅ ENHANCED TEST CASES FOR DYNAMIC SCORING
"test_cases": question_data.get('test_cases', [
{
"input": "",
"expected_output": "Hello World",
"description": "Basic test case",
"is_public": True,
"points": 25
}
]),
"examples": question_data.get('examples', []),
"constraints": question_data.get('constraints', []),
"time_limit": question_data.get('time_limit', 1000),
"memory_limit": question_data.get('memory_limit', '128MB'),
"created_at": datetime.now(),
"uploaded_by": exam.get('host_name', 'Host'),
"languages": list(question_data.get('starter_code', {}).keys())
"languages": list(question_data.get('starter_code', {}).keys()),
# ✅ HOST'S CORRECT SOLUTION
"correct_solution": {
"python": question_data.get('correct_solution', {}).get('python', ''),
"java": question_data.get('correct_solution', {}).get('java', ''),
"javascript": question_data.get('correct_solution', {}).get('javascript', '')
},
"scoring_method": question_data.get('scoring_method', 'test_cases'),
"total_points": question_data.get('total_points', 100)
}
# Update exam
@@ -205,12 +387,14 @@ def upload_question_direct():
)
if result.modified_count > 0:
print(f"Question '{question['title']}' uploaded to exam {exam_code}")
print(f"Enhanced question '{question['title']}' uploaded to exam {exam_code}")
return jsonify({
"success": True,
"message": "Question uploaded successfully",
"message": "Question uploaded successfully with dynamic scoring",
"question_id": question['id'],
"question_title": question['title']
"question_title": question['title'],
"test_cases_count": len(question['test_cases']),
"total_points": question['total_points']
})
else:
print(f"❌ Failed to update exam {exam_code}")
@@ -222,6 +406,495 @@ def upload_question_direct():
traceback.print_exc()
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/exam/submit-solution', methods=['POST', 'OPTIONS'])
def submit_solution_direct():
"""Enhanced solution submission with dynamic scoring"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
exam_code = data.get('exam_code', '').upper()
language = data.get('language', 'python')
code = data.get('code', '').strip()
participant_name = data.get('participant_name', 'Anonymous')
print(f"📤 ENHANCED SUBMIT: Exam {exam_code}, Language: {language}, Participant: {participant_name}")
if not exam_code or not code:
return jsonify({"success": False, "error": "Missing exam_code or code"}), 400
# Get database
db = get_db()
# Find the exam
exam = db.exams.find_one({"exam_code": exam_code})
if not exam:
return jsonify({"success": False, "error": "Exam not found"}), 404
if exam.get('status') != 'active':
return jsonify({"success": False, "error": "Exam is not active"}), 400
# Get the problem/question
problem = exam.get('problem', {})
if not problem:
return jsonify({"success": False, "error": "No problem found for this exam"}), 400
# ✅ DYNAMIC SCORING SYSTEM
scoring_result = calculate_dynamic_score(code, language, problem)
# Store enhanced submission
submission = {
"exam_code": exam_code,
"participant_name": participant_name,
"language": language,
"code": code,
"score": scoring_result['score'],
"passed_tests": scoring_result['passed_tests'],
"total_tests": scoring_result['total_tests'],
"test_results": scoring_result['test_results'],
"execution_time": scoring_result['execution_time'],
"submitted_at": datetime.now(),
"submission_id": str(uuid.uuid4()),
"scoring_details": scoring_result['details']
}
# Save to database
db.submissions.insert_one(submission)
# Update participant status in leaderboard
db.participants.update_one(
{"exam_code": exam_code, "name": participant_name},
{
"$set": {
"completed": True,
"score": scoring_result['score'],
"submitted_at": datetime.now(),
"language": language,
"passed_tests": scoring_result['passed_tests'],
"total_tests": scoring_result['total_tests'],
"points_earned": scoring_result['details']['points_earned'],
"total_points": scoring_result['details']['total_points']
}
},
upsert=True
)
print(f"✅ Enhanced submission saved - Score: {scoring_result['score']}% ({scoring_result['passed_tests']}/{scoring_result['total_tests']} tests passed)")
return jsonify({
"success": True,
"message": "Solution submitted successfully with dynamic scoring!",
"score": scoring_result['score'],
"passed_tests": scoring_result['passed_tests'],
"total_tests": scoring_result['total_tests'],
"test_results": scoring_result['test_results'],
"execution_time": scoring_result['execution_time'],
"submission_id": submission["submission_id"],
"scoring_details": scoring_result['details']
})
except Exception as e:
print(f"❌ Enhanced submit error: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
# ===================================================================
# ✅ OTHER EXISTING ENDPOINTS (keeping all your current ones)
# ===================================================================
@app.route('/api/exam/update-duration', methods=['POST', 'OPTIONS'])
def update_duration_direct():
"""Direct endpoint for duration update"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
exam_code = data.get('exam_code', '').upper()
duration_minutes = data.get('duration_minutes', 0)
print(f"⏰ DIRECT: Updating duration for exam {exam_code} to {duration_minutes} minutes")
if not exam_code or duration_minutes <= 0:
return jsonify({"success": False, "error": "Invalid data"}), 400
# Get database
db = get_db()
# Find and update exam
result = db.exams.update_one(
{"exam_code": exam_code, "status": "waiting"},
{"$set": {"duration_minutes": duration_minutes}}
)
if result.modified_count > 0:
print(f"✅ Duration updated to {duration_minutes} minutes")
return jsonify({
"success": True,
"message": f"Duration updated to {duration_minutes} minutes"
})
else:
return jsonify({"success": False, "error": "Exam not found or already started"}), 404
except Exception as e:
print(f"❌ Error: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/exam/info/<exam_code>', methods=['GET', 'OPTIONS'])
def get_exam_info_direct(exam_code):
"""Get exam information"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
print(f"📋 Getting exam info for: {exam_code}")
db = get_db()
exam = db.exams.find_one({"exam_code": exam_code.upper()})
if not exam:
return jsonify({"success": False, "error": "Exam not found"}), 404
# Convert ObjectId to string for JSON serialization
if '_id' in exam:
exam['_id'] = str(exam['_id'])
return jsonify({
"success": True,
"exam_info": exam
})
except Exception as e:
print(f"❌ Get exam info error: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/exam/get-problem/<exam_code>', methods=['GET', 'OPTIONS'])
def get_exam_problem_direct(exam_code):
"""Get exam problem"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
print(f"📝 Getting problem for exam: {exam_code}")
db = get_db()
exam = db.exams.find_one({"exam_code": exam_code.upper()})
if not exam:
return jsonify({"success": False, "error": "Exam not found"}), 404
# Convert ObjectId to string
if '_id' in exam:
exam['_id'] = str(exam['_id'])
return jsonify({
"success": True,
"exam_info": exam,
"problem": exam.get('problem', {})
})
except Exception as e:
print(f"❌ Get problem error: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/exam/participants/<exam_code>', methods=['GET', 'OPTIONS'])
def get_participants_direct(exam_code):
"""Get exam participants"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
print(f"👥 Getting participants for: {exam_code}")
db = get_db()
participants = list(db.participants.find({"exam_code": exam_code.upper()}))
# Convert ObjectId to string
for participant in participants:
if '_id' in participant:
participant['_id'] = str(participant['_id'])
return jsonify({
"success": True,
"participants": participants
})
except Exception as e:
print(f"❌ Get participants error: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/exam/leaderboard/<exam_code>', methods=['GET', 'OPTIONS'])
def get_leaderboard_direct(exam_code):
"""Enhanced leaderboard with dynamic scoring details"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
print(f"🏆 Getting enhanced leaderboard for: {exam_code}")
db = get_db()
# Get exam info
exam = db.exams.find_one({"exam_code": exam_code.upper()})
if not exam:
return jsonify({"success": False, "error": "Exam not found"}), 404
# Get participants with enhanced scoring details
participants = list(db.participants.find({"exam_code": exam_code.upper()}))
# Sort by score (highest first) and add ranks
completed_participants = [p for p in participants if p.get('completed', False)]
completed_participants.sort(key=lambda x: x.get('score', 0), reverse=True)
leaderboard = []
for i, participant in enumerate(completed_participants):
participant['rank'] = i + 1
if '_id' in participant:
participant['_id'] = str(participant['_id'])
leaderboard.append(participant)
# Get waiting participants
waiting_participants = [p for p in participants if not p.get('completed', False)]
for participant in waiting_participants:
if '_id' in participant:
participant['_id'] = str(participant['_id'])
# Calculate enhanced stats
scores = [p.get('score', 0) for p in completed_participants]
passed_tests = [p.get('passed_tests', 0) for p in completed_participants]
total_tests = [p.get('total_tests', 1) for p in completed_participants]
stats = {
"total_participants": len(participants),
"completed_submissions": len(completed_participants),
"waiting_submissions": len(waiting_participants),
"average_score": sum(scores) / len(scores) if scores else 0,
"highest_score": max(scores) if scores else 0,
"average_tests_passed": sum(passed_tests) / len(passed_tests) if passed_tests else 0,
"total_test_cases": max(total_tests) if total_tests else 1,
"blockchain_participants": 0 # Add if you have blockchain functionality
}
# Convert exam ObjectId
if '_id' in exam:
exam['_id'] = str(exam['_id'])
return jsonify({
"success": True,
"leaderboard": leaderboard,
"waiting_participants": waiting_participants,
"stats": stats,
"exam_info": exam
})
except Exception as e:
print(f"❌ Enhanced leaderboard error: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/exam/start-exam', methods=['POST', 'OPTIONS'])
def start_exam_direct():
"""Start an exam"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
exam_code = data.get('exam_code', '').upper()
print(f"🚀 Starting exam: {exam_code}")
db = get_db()
# Calculate end time
exam = db.exams.find_one({"exam_code": exam_code})
if not exam:
return jsonify({"success": False, "error": "Exam not found"}), 404
if exam.get('status') != 'waiting':
return jsonify({"success": False, "error": "Exam already started or completed"}), 400
start_time = datetime.now()
duration_minutes = exam.get('duration_minutes', 30)
end_time = datetime.fromtimestamp(start_time.timestamp() + (duration_minutes * 60))
# Update exam status
result = db.exams.update_one(
{"exam_code": exam_code},
{
"$set": {
"status": "active",
"start_time": start_time,
"end_time": end_time,
"updated_at": datetime.now()
}
}
)
if result.modified_count > 0:
print(f"✅ Exam {exam_code} started successfully")
return jsonify({
"success": True,
"message": "Exam started successfully",
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat()
})
else:
return jsonify({"success": False, "error": "Failed to start exam"}), 500
except Exception as e:
print(f"❌ Start exam error: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/exam/stop-exam', methods=['POST', 'OPTIONS'])
def stop_exam_direct():
"""Stop an exam"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
exam_code = data.get('exam_code', '').upper()
print(f"🛑 Stopping exam: {exam_code}")
db = get_db()
# Update exam status
result = db.exams.update_one(
{"exam_code": exam_code},
{
"$set": {
"status": "completed",
"completed_at": datetime.now(),
"updated_at": datetime.now()
}
}
)
if result.modified_count > 0:
print(f"✅ Exam {exam_code} stopped successfully")
return jsonify({
"success": True,
"message": "Exam stopped successfully"
})
else:
return jsonify({"success": False, "error": "Exam not found"}), 404
except Exception as e:
print(f"❌ Stop exam error: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
# ===================================================================
# ✅ COMPILER ENDPOINT (Fixed timeout issue)
# ===================================================================
@app.route('/api/compiler/execute', methods=['POST', 'OPTIONS'])
def execute_code_direct():
"""Direct compiler endpoint - FIXED TIMEOUT ISSUE"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
language = data.get('language', 'python').lower()
code = data.get('code', '').strip()
print(f"🔧 COMPILER: Executing {language} code: {code}")
if not code:
return jsonify({"success": False, "error": "No code provided"}), 400
if language == 'python':
try:
# ✅ SIMPLE EXEC METHOD (no subprocess, no timeout issues)
import io
import sys
from contextlib import redirect_stdout, redirect_stderr
import time
# Capture output
stdout_buffer = io.StringIO()
stderr_buffer = io.StringIO()
start_time = time.time()
try:
# Execute Python code using exec
with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer):
exec(code, {"__builtins__": __builtins__})
execution_time = time.time() - start_time
stdout_content = stdout_buffer.getvalue()
stderr_content = stderr_buffer.getvalue()
print(f"✅ Python exec successful. Output: '{stdout_content.strip()}'")
return jsonify({
"success": True,
"output": stdout_content or "Code executed successfully (no output)",
"error": stderr_content if stderr_content else None,
"language": "python",
"execution_time": round(execution_time, 3)
})
except Exception as e:
execution_time = time.time() - start_time
print(f"❌ Python execution error: {str(e)}")
return jsonify({
"success": False,
"error": f"Runtime error: {str(e)}",
"execution_time": round(execution_time, 3)
})
except Exception as e:
print(f"❌ Python setup error: {str(e)}")
return jsonify({"success": False, "error": f"Setup failed: {str(e)}"}), 500
else:
return jsonify({
"success": False,
"error": f"Language '{language}' not supported. Only Python available."
})
except Exception as e:
print(f"❌ Compiler endpoint error: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
# ===================================================================
# ✅ LEGACY ENDPOINTS (keeping your existing ones)
# ===================================================================
@app.route('/api/exam/questions/<exam_code>', methods=['GET', 'OPTIONS'])
def get_exam_questions_direct(exam_code):
"""Get all questions for an exam"""
@@ -333,14 +1006,15 @@ def secure_headers(resp):
def health_root():
return jsonify({
"status":"OpenLearnX API running",
"version":"2.0.1",
"version":"2.1.0 - Enhanced Dynamic Scoring",
"timestamp": datetime.now().isoformat(),
"features":{
"mongodb": MONGO_SERVICE_AVAILABLE,
"web3": WEB3_SERVICE_AVAILABLE,
"wallet": WALLET_SERVICE_AVAILABLE,
"compiler": COMPILER_SERVICE_AVAILABLE,
"docker": check_docker_availability()
"docker": check_docker_availability(),
"dynamic_scoring": True
},
"endpoints": {
"exam": "/api/exam",
@@ -351,9 +1025,16 @@ def health_root():
"blueprints_registered": len(blueprints_registered),
"blueprints_failed": len(blueprints_failed),
"direct_exam_endpoints": [
"/api/exam/upload-question",
"/api/exam/questions/<exam_code>",
"/api/exam/update-question"
"/api/exam/upload-question (Enhanced)",
"/api/exam/submit-solution (Dynamic Scoring)",
"/api/exam/leaderboard/<exam_code> (Enhanced)",
"/api/exam/update-duration",
"/api/exam/info/<exam_code>",
"/api/exam/get-problem/<exam_code>",
"/api/exam/participants/<exam_code>",
"/api/exam/start-exam",
"/api/exam/stop-exam",
"/api/compiler/execute"
]
})
@@ -365,7 +1046,8 @@ def api_health():
"web3": WEB3_SERVICE_AVAILABLE,
"wallet": WALLET_SERVICE_AVAILABLE,
"compiler": COMPILER_SERVICE_AVAILABLE,
"docker": check_docker_availability()
"docker": check_docker_availability(),
"dynamic_scoring": True
}
# Check MongoDB connection
@@ -411,11 +1093,18 @@ def debug_services():
"web3": WEB3_SERVICE_AVAILABLE,
"wallet": WALLET_SERVICE_AVAILABLE,
"compiler": COMPILER_SERVICE_AVAILABLE,
"docker": check_docker_availability()
"docker": check_docker_availability(),
"dynamic_scoring": True
},
"blueprints_registered": blueprints_registered,
"blueprints_failed": blueprints_failed,
"direct_endpoints_added": True
"direct_endpoints_added": True,
"enhanced_features": [
"Dynamic Test Case Scoring",
"Host-defined Correct Solutions",
"Point-based Test Cases",
"Enhanced Leaderboard with Test Details"
]
})
# Error handlers
@@ -427,9 +1116,16 @@ def not_found(e):
"method": request.method,
"available_blueprints": blueprints_registered,
"direct_exam_endpoints": [
"/api/exam/upload-question",
"/api/exam/questions/<exam_code>",
"/api/exam/update-question"
"/api/exam/upload-question (Enhanced with Dynamic Scoring)",
"/api/exam/submit-solution (Dynamic Scoring)",
"/api/exam/leaderboard/<exam_code> (Enhanced)",
"/api/exam/update-duration",
"/api/exam/info/<exam_code>",
"/api/exam/get-problem/<exam_code>",
"/api/exam/participants/<exam_code>",
"/api/exam/start-exam",
"/api/exam/stop-exam",
"/api/compiler/execute"
]
}), 404
@@ -441,52 +1137,9 @@ def internal_error(e):
"timestamp": datetime.now().isoformat()
}), 500
@app.route('/api/exam/update-duration', methods=['POST', 'OPTIONS'])
def update_duration_direct():
"""Direct endpoint for duration update"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
exam_code = data.get('exam_code', '').upper()
duration_minutes = data.get('duration_minutes', 0)
print(f"⏰ DIRECT: Updating duration for exam {exam_code} to {duration_minutes} minutes")
if not exam_code or duration_minutes <= 0:
return jsonify({"success": False, "error": "Invalid data"}), 400
# Get database
db = get_db()
# Find and update exam
result = db.exams.update_one(
{"exam_code": exam_code, "status": "waiting"},
{"$set": {"duration_minutes": duration_minutes}}
)
if result.modified_count > 0:
print(f"✅ Duration updated to {duration_minutes} minutes")
return jsonify({
"success": True,
"message": f"Duration updated to {duration_minutes} minutes"
})
else:
return jsonify({"success": False, "error": "Exam not found or already started"}), 404
except Exception as e:
print(f"❌ Error: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
# Startup initialization
def initialize_application():
logger.info("🚀 Initializing OpenLearnX Backend")
logger.info("🚀 Initializing OpenLearnX Backend with Dynamic Scoring")
# Show blueprint registration status
logger.info(f"📋 Blueprints registered: {len(blueprints_registered)}")
@@ -522,23 +1175,41 @@ def initialize_application():
if COMPILER_SERVICE_AVAILABLE and not check_docker_availability():
logger.warning("⚠️ Docker unavailable")
# Log direct endpoints
logger.info("🔧 Direct exam endpoints added:")
# Log enhanced endpoints
logger.info("🔧 Enhanced exam endpoints with dynamic scoring:")
direct_endpoints = [
"/api/exam/upload-question",
"/api/exam/questions/<exam_code>",
"/api/exam/update-question"
"/api/exam/upload-question (Enhanced with Test Cases)",
"/api/exam/submit-solution (Dynamic Scoring System)",
"/api/exam/leaderboard/<exam_code> (Enhanced Stats)",
"/api/exam/update-duration",
"/api/exam/info/<exam_code>",
"/api/exam/get-problem/<exam_code>",
"/api/exam/participants/<exam_code>",
"/api/exam/start-exam",
"/api/exam/stop-exam",
"/api/compiler/execute"
]
for endpoint in direct_endpoints:
logger.info(f"{endpoint}")
logger.info("🧮 Dynamic Scoring Features:")
scoring_features = [
"Test case based scoring",
"Point distribution per test case",
"Host-defined correct solutions",
"Enhanced leaderboard with test details",
"Automatic output matching"
]
for feature in scoring_features:
logger.info(f" 🎯 {feature}")
return True
if __name__ == '__main__':
initialize_application()
logger.info("🚀 Starting OpenLearnX Backend Server")
logger.info("📚 Features: Coding Exams, Question Upload, Host Panel, Compiler")
logger.info("🚀 Starting OpenLearnX Backend Server with Dynamic Scoring")
logger.info("📚 Enhanced Features: Dynamic Test Case Scoring, Host Solutions, Point-based Tests")
logger.info("🌐 Server starting on http://0.0.0.0:5000")
logger.info("🔧 All /api/* endpoints have CORS enabled")
+521 -140
View File
@@ -1,165 +1,546 @@
from flask import Blueprint, request, jsonify, session
from services.real_compiler_service import real_compiler_service
import uuid
from flask import Blueprint, request, jsonify
import subprocess
import tempfile
import os
import time
import docker
from datetime import datetime
bp = Blueprint('compiler', __name__)
@bp.route("/languages", methods=["GET"])
def get_db():
"""Get MongoDB database connection"""
from pymongo import MongoClient
from flask import current_app
client = MongoClient(current_app.config['MONGODB_URI'])
return client.openlearnx
@bp.route('/execute', methods=['POST', 'OPTIONS'])
def execute_code():
"""Execute code in specified language with Docker support"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
language = data.get('language', 'python').lower()
code = data.get('code', '').strip()
input_data = data.get('input', '')
print(f"🔧 Executing {language} code")
print(f"📝 Code length: {len(code)} characters")
if not code:
return jsonify({"success": False, "error": "No code provided"}), 400
# Execute based on language
if language == 'python':
return execute_python(code, input_data)
elif language == 'java':
return execute_java(code, input_data)
elif language == 'javascript' or language == 'js':
return execute_javascript(code, input_data)
elif language == 'cpp' or language == 'c++':
return execute_cpp(code, input_data)
elif language == 'c':
return execute_c(code, input_data)
else:
return jsonify({
"success": False,
"error": f"Language '{language}' not supported. Available: python, java, javascript, cpp, c"
}), 400
except Exception as e:
print(f"❌ Compiler error: {str(e)}")
import traceback
traceback.print_exc()
return jsonify({"success": False, "error": f"Server error: {str(e)}"}), 500
def execute_python(code, input_data=""):
"""Execute Python code"""
try:
# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(code)
temp_file = f.name
try:
# Execute with subprocess
start_time = time.time()
result = subprocess.run(
['python3', temp_file],
input=input_data,
text=True,
capture_output=True,
timeout=10, # 10 second timeout
cwd=tempfile.gettempdir()
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "python",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Process exited with code {result.returncode}",
"language": "python"
})
finally:
# Clean up temp file
try:
os.unlink(temp_file)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out (10s limit)"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "Python interpreter not found. Please install Python 3."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"Python execution error: {str(e)}"
}), 500
def execute_java(code, input_data=""):
"""Execute Java code"""
try:
# Extract class name from code
import re
class_match = re.search(r'public\s+class\s+(\w+)', code)
if not class_match:
return jsonify({
"success": False,
"error": "No public class found. Java code must contain 'public class ClassName'"
}), 400
class_name = class_match.group(1)
# Create temporary directory
temp_dir = tempfile.mkdtemp()
java_file = os.path.join(temp_dir, f"{class_name}.java")
try:
# Write Java code to file
with open(java_file, 'w') as f:
f.write(code)
# Compile Java code
compile_result = subprocess.run(
['javac', java_file],
capture_output=True,
text=True,
timeout=30,
cwd=temp_dir
)
if compile_result.returncode != 0:
return jsonify({
"success": False,
"error": f"Compilation error:\n{compile_result.stderr}",
"language": "java"
})
# Execute Java code
start_time = time.time()
result = subprocess.run(
['java', class_name],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=temp_dir
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "java",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "java"
})
finally:
# Clean up temp files
import shutil
try:
shutil.rmtree(temp_dir)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "Java compiler/runtime not found. Please install JDK."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"Java execution error: {str(e)}"
}), 500
def execute_javascript(code, input_data=""):
"""Execute JavaScript code"""
try:
# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False) as f:
# Add input handling if needed
if input_data:
js_code = f"""
const input = `{input_data}`;
const readline = {{ question: () => input }};
{code}
"""
else:
js_code = code
f.write(js_code)
temp_file = f.name
try:
# Execute with Node.js
start_time = time.time()
result = subprocess.run(
['node', temp_file],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=tempfile.gettempdir()
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "javascript",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "javascript"
})
finally:
try:
os.unlink(temp_file)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "Node.js not found. Please install Node.js."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"JavaScript execution error: {str(e)}"
}), 500
def execute_cpp(code, input_data=""):
"""Execute C++ code"""
try:
# Create temporary files
temp_dir = tempfile.mkdtemp()
cpp_file = os.path.join(temp_dir, "main.cpp")
exe_file = os.path.join(temp_dir, "main.exe") if os.name == 'nt' else os.path.join(temp_dir, "main")
try:
# Write C++ code to file
with open(cpp_file, 'w') as f:
f.write(code)
# Compile C++ code
compile_cmd = ['g++', '-o', exe_file, cpp_file, '-std=c++17']
compile_result = subprocess.run(
compile_cmd,
capture_output=True,
text=True,
timeout=30,
cwd=temp_dir
)
if compile_result.returncode != 0:
return jsonify({
"success": False,
"error": f"Compilation error:\n{compile_result.stderr}",
"language": "cpp"
})
# Execute compiled program
start_time = time.time()
result = subprocess.run(
[exe_file],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=temp_dir
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "cpp",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "cpp"
})
finally:
# Clean up temp files
import shutil
try:
shutil.rmtree(temp_dir)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "G++ compiler not found. Please install GCC/G++."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"C++ execution error: {str(e)}"
}), 500
def execute_c(code, input_data=""):
"""Execute C code"""
try:
# Create temporary files
temp_dir = tempfile.mkdtemp()
c_file = os.path.join(temp_dir, "main.c")
exe_file = os.path.join(temp_dir, "main.exe") if os.name == 'nt' else os.path.join(temp_dir, "main")
try:
# Write C code to file
with open(c_file, 'w') as f:
f.write(code)
# Compile C code
compile_cmd = ['gcc', '-o', exe_file, c_file, '-std=c99']
compile_result = subprocess.run(
compile_cmd,
capture_output=True,
text=True,
timeout=30,
cwd=temp_dir
)
if compile_result.returncode != 0:
return jsonify({
"success": False,
"error": f"Compilation error:\n{compile_result.stderr}",
"language": "c"
})
# Execute compiled program
start_time = time.time()
result = subprocess.run(
[exe_file],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=temp_dir
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "c",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "c"
})
finally:
# Clean up temp files
import shutil
try:
shutil.rmtree(temp_dir)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "GCC compiler not found. Please install GCC."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"C execution error: {str(e)}"
}), 500
@bp.route('/languages', methods=['GET', 'OPTIONS'])
def get_supported_languages():
"""Get list of supported programming languages"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
languages = real_compiler_service.get_supported_languages()
languages = {
"python": {
"name": "Python",
"version": "3.x",
"extension": ".py",
"available": check_language_availability("python3")
},
"java": {
"name": "Java",
"version": "JDK 8+",
"extension": ".java",
"available": check_language_availability("javac")
},
"javascript": {
"name": "JavaScript",
"version": "Node.js",
"extension": ".js",
"available": check_language_availability("node")
},
"cpp": {
"name": "C++",
"version": "GCC/G++",
"extension": ".cpp",
"available": check_language_availability("g++")
},
"c": {
"name": "C",
"version": "GCC",
"extension": ".c",
"available": check_language_availability("gcc")
}
}
return jsonify({
"success": True,
"languages": languages,
"total_languages": len(languages)
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@bp.route("/execute", methods=["POST"])
def execute_code():
"""Execute code and return real output"""
try:
data = request.json
# Validate input
code = data.get('code', '').strip()
language = data.get('language', 'python')
input_data = data.get('input', '')
if not code:
return jsonify({"error": "No code provided"}), 400
if language not in [lang['id'] for lang in real_compiler_service.get_supported_languages()]:
return jsonify({"error": f"Language '{language}' not supported"}), 400
# Generate execution ID
execution_id = str(uuid.uuid4())
# Execute code
result = real_compiler_service.execute_code(
code=code,
language=language,
input_data=input_data,
execution_id=execution_id
)
return jsonify(result)
except Exception as e:
return jsonify({"error": f"Execution failed: {str(e)}"}), 500
@bp.route("/execute-async", methods=["POST"])
def execute_code_async():
"""Start asynchronous code execution"""
try:
data = request.json
execution_id = str(uuid.uuid4())
# Add to execution queue
real_compiler_service.execution_queue.put({
'execution_id': execution_id,
'code': data.get('code'),
'language': data.get('language', 'python'),
'input_data': data.get('input', ''),
'callback_url': data.get('callback_url')
})
return jsonify({
"success": True,
"execution_id": execution_id,
"message": "Code execution started",
"status_url": f"/api/compiler/status/{execution_id}"
"total": len(languages),
"available_count": sum(1 for lang in languages.values() if lang["available"])
})
except Exception as e:
return jsonify({"error": str(e)}), 500
return jsonify({"success": False, "error": str(e)}), 500
@bp.route("/status/<execution_id>", methods=["GET"])
def get_execution_status(execution_id):
"""Get status of code execution"""
def check_language_availability(command):
"""Check if a language compiler/interpreter is available"""
try:
status = real_compiler_service.get_execution_status(execution_id)
if status:
return jsonify({
"success": True,
"execution_id": execution_id,
"status": status['status'],
"start_time": status['start_time'].isoformat(),
"language": status['language']
})
else:
return jsonify({
"success": False,
"error": "Execution not found"
}), 404
except Exception as e:
return jsonify({"error": str(e)}), 500
result = subprocess.run([command, '--version'],
capture_output=True,
timeout=5)
return result.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired):
return False
@bp.route("/cancel/<execution_id>", methods=["POST"])
def cancel_execution(execution_id):
"""Cancel a running execution"""
@bp.route('/health', methods=['GET'])
def compiler_health():
"""Health check for compiler service"""
try:
success = real_compiler_service.cancel_execution(execution_id)
return jsonify({
"success": success,
"message": "Execution cancelled" if success else "Execution not found"
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@bp.route("/test", methods=["POST"])
def test_compiler():
"""Test compiler with sample code"""
try:
language = request.json.get('language', 'python')
test_codes = {
'python': 'print("Hello from OpenLearnX Python Compiler!")\nprint("Current time:", __import__("datetime").datetime.now())',
'java': 'public class Main {\n public static void main(String[] args) {\n System.out.println("Hello from OpenLearnX Java Compiler!");\n }\n}',
'cpp': '#include <iostream>\nint main() {\n std::cout << "Hello from OpenLearnX C++ Compiler!" << std::endl;\n return 0;\n}',
'javascript': 'console.log("Hello from OpenLearnX JavaScript Compiler!");',
'go': 'package main\nimport "fmt"\nfunc main() {\n fmt.Println("Hello from OpenLearnX Go Compiler!")\n}',
'rust': 'fn main() {\n println!("Hello from OpenLearnX Rust Compiler!");\n}'
languages_status = {
"python": check_language_availability("python3"),
"java": check_language_availability("javac"),
"javascript": check_language_availability("node"),
"cpp": check_language_availability("g++"),
"c": check_language_availability("gcc")
}
test_code = test_codes.get(language, test_codes['python'])
available_languages = sum(languages_status.values())
total_languages = len(languages_status)
result = real_compiler_service.execute_code(
code=test_code,
language=language,
input_data=""
)
return jsonify(result)
except Exception as e:
return jsonify({"error": str(e)}), 500
@bp.route("/stats", methods=["GET"])
def get_compiler_stats():
"""Get compiler service statistics"""
try:
active_executions = len(real_compiler_service.active_executions)
queue_size = real_compiler_service.execution_queue.qsize()
supported_languages = len(real_compiler_service.language_configs)
status = "healthy" if available_languages > 0 else "unavailable"
return jsonify({
"success": True,
"stats": {
"active_executions": active_executions,
"queue_size": queue_size,
"supported_languages": supported_languages,
"max_concurrent": real_compiler_service.max_concurrent_executions
},
"uptime": datetime.now().isoformat()
"status": status,
"timestamp": datetime.now().isoformat(),
"languages": languages_status,
"available_languages": available_languages,
"total_languages": total_languages,
"docker_available": check_docker_availability()
})
except Exception as e:
return jsonify({"error": str(e)}), 500
return jsonify({
"status": "error",
"error": str(e),
"timestamp": datetime.now().isoformat()
}), 500
def check_docker_availability():
"""Check if Docker is available for containerized execution"""
try:
client = docker.from_env()
client.ping()
return True
except:
return False
+30 -11
View File
@@ -47,14 +47,15 @@ export default function EnhancedExamInterface() {
const [isSubmitting, setIsSubmitting] = useState(false)
const [hasSubmitted, setHasSubmitted] = useState(false)
const [examStats, setExamStats] = useState<any>({})
// ✅ ADD TIMER INITIALIZED STATE
const [timerInitialized, setTimerInitialized] = useState(false)
const router = useRouter()
const languageIcons: {[key: string]: string} = {
python: '🐍',
java: '☕',
c: '',
javascript: '🌐',
cpp: '⚡',
c: '🔧',
bash: '💻'
}
@@ -176,6 +177,7 @@ export default function EnhancedExamInterface() {
setTestResults([])
}
// ✅ FIXED RUNCODE FUNCTION - Updated to use correct endpoint
const runCode = async () => {
if (!code.trim()) {
alert('Please write some code first!')
@@ -187,25 +189,41 @@ export default function EnhancedExamInterface() {
setTestResults([])
try {
const response = await fetch('http://127.0.0.1:5000/api/exam/execute-code', {
console.log('🔧 Sending code to compiler...')
// ✅ FIXED: Use correct endpoint /api/compiler/execute instead of /api/exam/execute-code
const response = await fetch('http://127.0.0.1:5000/api/compiler/execute', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
code,
language: selectedLanguage
language: selectedLanguage,
code: code,
input: ''
})
})
const result = await response.json()
console.log('📦 Compiler result:', result)
if (result.success) {
setOutput('Code executed successfully!')
setTestResults(result.test_results || [])
setOutput(`Code executed successfully!\n${result.output}`)
if (result.execution_time) {
setOutput(prev => prev + `\n⏱️ Execution time: ${result.execution_time}s`)
}
if (result.error) {
setOutput(prev => prev + `\n⚠️ Warnings:\n${result.error}`)
}
// If there are test results from backend, show them
if (result.test_results) {
setTestResults(result.test_results)
}
} else {
setOutput(`Error: ${result.error}`)
setOutput(`Error:\n${result.error}`)
}
} catch (error) {
setOutput(`Execution failed: ${(error as Error).message}`)
console.error('❌ Compiler network error:', error)
setOutput(`❌ Network error: Could not connect to compiler service.\nPlease check if the backend is running on port 5000.`)
} finally {
setIsRunning(false)
}
@@ -224,8 +242,9 @@ export default function EnhancedExamInterface() {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
code,
language: selectedLanguage
exam_code: examSession?.exam_code,
language: selectedLanguage,
code: code
})
})
File diff suppressed because it is too large Load Diff