Files
OpenLearnX/backend/routes/coding.py

316 lines
11 KiB
Python

from flask import Blueprint, request, jsonify, session
from functools import wraps
import subprocess
import tempfile
import os
import time
import uuid
from datetime import datetime
import docker
import psutil
from pymongo import MongoClient
from activity_logger import log_user_activity, resolve_user_identity
from services.real_compiler_service import real_compiler_service
# Blueprint that the secure-coding route handlers in this module register on.
bp = Blueprint('coding', __name__)
# MongoDB connection
# The URI is read from the MONGODB_URI environment variable and falls back to
# a MongoDB instance on localhost when the variable is unset.
mongo_uri = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/')
client = MongoClient(mongo_uri)
# Handle to the "openlearnx" database used by the handlers and helpers below.
db = client.openlearnx
def secure_execution_required(f):
    """Decorator that only runs ``f`` while secure coding mode is active.

    The wrapped view is called when the Flask session carries a truthy
    ``secure_coding_mode`` flag; otherwise a JSON error with HTTP 403 is
    returned and ``f`` is never invoked.
    """
    def guarded_view(*args, **kwargs):
        # Happy path first: the session flag is set, so delegate to the view.
        if session.get('secure_coding_mode'):
            return f(*args, **kwargs)
        denial = jsonify({"error": "Secure coding mode required"})
        return denial, 403

    # Copy the name/docstring of ``f`` onto the wrapper.
    return wraps(f)(guarded_view)
@bp.route("/start-session", methods=["POST"])
def start_coding_session():
    """Start a secure coding session.

    Reads the optional ``course_id`` and ``lesson_id`` from the JSON request
    body, generates a fresh session id, stores the session state (id, secure
    mode flag, start time, course and lesson) in the Flask session, and
    records the event through ``log_user_activity``.

    Returns:
        A JSON response with the new ``session_id`` and a static set of
        restriction flags, or ``({"error": ...}, 500)`` if anything raises.
    """
    try:
        # get_json(silent=True) returns None for a missing, non-JSON or
        # malformed body instead of raising. Both ids are optional, so any
        # body that is not a JSON object is treated as an empty payload
        # rather than being reported as a server error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        course_id = data.get('course_id')
        lesson_id = data.get('lesson_id')

        session_id = str(uuid.uuid4())
        session['coding_session_id'] = session_id
        # This flag is what secure_execution_required checks on later calls.
        session['secure_coding_mode'] = True
        session['start_time'] = datetime.now().isoformat()
        session['course_id'] = course_id
        session['lesson_id'] = lesson_id

        identity = resolve_user_identity(request, db)
        log_user_activity(
            db,
            identity.get("user_id"),
            "coding",
            "Started coding session",
            "Entered secure coding session",
            {
                "session_id": session_id,
                "course_id": course_id,
                "lesson_id": lesson_id,
            },
        )

        return jsonify({
            "success": True,
            "session_id": session_id,
            "message": "Secure coding session started",
            "restrictions": {
                "copy_paste_disabled": True,
                "browser_locked": True,
                "extensions_blocked": True,
                "virtual_detection": True
            }
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@bp.route("/execute", methods=["POST"])
@secure_execution_required
def execute_code():
    """Run submitted code through the compiler service and audit the run.

    Expects a JSON object with ``code`` (required), ``language`` (defaults to
    ``'python'``) and ``test_cases`` (optional; only its length is recorded
    here). The attempt is logged via ``log_coding_attempt``, the code is
    executed by ``real_compiler_service.execute_code`` with empty input, and
    the outcome is written to ``db.security_logs`` and, on a best-effort
    basis, to ``db.code_execution_events``.

    Returns:
        The compiler-service result as JSON: HTTP 200 when it reports
        success, HTTP 400 otherwise. HTTP 400 is also returned when the body
        is not a JSON object or ``code`` is empty, and HTTP 500 with the
        exception text on any unexpected error.
    """
    import logging

    try:
        # silent=True yields None instead of raising on a missing/invalid
        # body, so a malformed request is answered with 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        code = data.get('code')
        language = data.get('language', 'python')
        test_cases = data.get('test_cases', [])
        if not code:
            return jsonify({"error": "No code provided"}), 400

        # Log coding attempt
        log_coding_attempt(session['coding_session_id'], code, language)

        # Execute code in hardened Docker sandbox
        result = real_compiler_service.execute_code(code=code, language=language, input_data="")

        # Pull the fields shared by both audit documents out of the result once.
        succeeded = bool(result.get("success"))
        blocked = bool(result.get("blocked"))
        execution_id = result.get("execution_id")
        violations = result.get("security_violations", [])
        execution_time = result.get("execution_time", 0)
        memory_used = result.get("memory_used", 0)
        exit_code = result.get("exit_code", -1)
        error_text = result.get("error", "")
        client_ip = request.remote_addr or "unknown"
        user_agent = request.headers.get("User-Agent", "")

        # "blocked" takes precedence over the plain success/failed status.
        if blocked:
            execution_status = "blocked"
        else:
            execution_status = "success" if succeeded else "failed"

        db.security_logs.insert_one({
            "timestamp": datetime.utcnow(),
            "event_type": "coding_execution_success" if succeeded else "coding_execution_blocked",
            "action": "secure_coding_execute",
            "status_code": 200 if succeeded else 400,
            "severity": "info" if succeeded else "warning",
            "path": request.path,
            "method": request.method,
            "ip": client_ip,
            "user_agent": user_agent,
            "metadata": {
                "language": language,
                "execution_id": execution_id,
                "blocked": blocked,
                "security_violations": violations,
                "execution_time": execution_time,
                "memory_used": memory_used,
                "exit_code": exit_code,
            },
            "metadata_text": str(violations),
        })

        try:
            # Code and output are truncated to 4000 characters for storage.
            request_payload = {
                "language": language,
                "code": (code or "")[:4000],
                "code_size": len(code or ""),
                "test_case_count": len(test_cases) if isinstance(test_cases, list) else 0,
            }
            response_payload = {
                "success": succeeded,
                "blocked": blocked,
                "execution_id": execution_id,
                "output": (result.get("output") or "")[:4000],
                "error": error_text,
                "security_violations": violations,
                "execution_time": execution_time,
                "memory_used": memory_used,
                "exit_code": exit_code,
            }
            db.code_execution_events.insert_one({
                "timestamp": datetime.utcnow(),
                "event_type": "execution",
                "source": "coding",
                "language": language,
                "execution_id": execution_id,
                "execution_time": execution_time,
                "memory_used": memory_used,
                "exit_code": exit_code,
                "status": execution_status,
                "blocked": blocked,
                "security_violations": violations,
                "error": error_text,
                "request_body": request_payload,
                "response_body": response_payload,
                "ip": client_ip,
                "user_agent": user_agent,
            })
        except Exception:
            # Recording the event stays best-effort: a failure here must not
            # fail the request, but it is logged instead of being dropped.
            logging.getLogger(__name__).exception(
                "Failed to record code execution event"
            )

        if succeeded:
            return jsonify({"success": True, **result})
        return jsonify({"success": False, **result}), 400
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@bp.route("/submit-test", methods=["POST"])
@secure_execution_required
def submit_coding_test():
    """Submit a coding test solution for evaluation.

    Expects a JSON object with ``code`` and ``problem_id``. The code is
    scored by ``validate_test_submission``, persisted via
    ``store_submission``, mirrored into ``db.user_submissions`` when a user
    id can be resolved, and recorded through ``log_user_activity``.

    Returns:
        JSON with the submission id, score, pass counts and feedback.
        HTTP 400 when the body is not a JSON object or ``code`` /
        ``problem_id`` is missing; HTTP 500 with the exception text on any
        unexpected error.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid
        # body, so a malformed request is answered with 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        code = data.get('code')
        problem_id = data.get('problem_id')
        # Same validation and message as the /execute handler.
        if not code:
            return jsonify({"error": "No code provided"}), 400
        if problem_id in (None, ""):
            return jsonify({"error": "No problem_id provided"}), 400

        # Validate against test cases
        test_result = validate_test_submission(code, problem_id)
        score = test_result.get('score', 0)

        # Store submission
        submission_id = store_submission(
            session['coding_session_id'],
            session['course_id'],
            problem_id,
            code,
            test_result
        )

        identity = resolve_user_identity(request, db)
        resolved_user_id = identity.get("user_id")
        if resolved_user_id:
            db.user_submissions.insert_one({
                "user_id": resolved_user_id,
                "session_id": session.get('coding_session_id'),
                "course_id": session.get('course_id'),
                "problem_id": problem_id,
                "score": score,
                "points_earned": int(score),
                "submitted_at": datetime.now(),
                "status": "submitted",
            })

        # NOTE(review): the activity is logged whether or not a user id was
        # resolved, mirroring start_coding_session — confirm this is intended.
        log_user_activity(
            db,
            resolved_user_id,
            "coding",
            "Submitted coding solution",
            f"Submitted coding test for problem '{problem_id}'",
            {
                "submission_id": submission_id,
                "problem_id": problem_id,
                "score": score,
                "passed": test_result.get('passed', 0),
                "total": test_result.get('total', 0),
            },
            points_earned=int(score),
        )

        return jsonify({
            "success": True,
            "submission_id": submission_id,
            "score": test_result['score'],
            "passed_tests": test_result['passed'],
            "total_tests": test_result['total'],
            "feedback": test_result['feedback']
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def execute_in_container(code, language, test_cases):
    """Run ``code`` via the compiler service and return a legacy-shaped dict.

    Kept for callers of the older container API. ``test_cases`` is accepted
    but not used; the service is always invoked with empty input. On success
    the dict carries output, an empty ``test_results`` list and timing/memory
    figures; on failure it carries the error and any security violations.
    """
    outcome = real_compiler_service.execute_code(code=code, language=language, input_data="")
    execution_id = outcome.get("execution_id")

    # Failure path first so the success payload reads as the main result.
    if not outcome.get("success"):
        return {
            "success": False,
            "error": outcome.get("error", "Execution failed"),
            "security_violations": outcome.get("security_violations", []),
            "execution_id": execution_id,
        }

    return {
        "success": True,
        "output": outcome.get("output", ""),
        "test_results": [],
        "execution_time": outcome.get("execution_time", 0),
        "memory_used": outcome.get("memory_used", 0),
        "execution_id": execution_id,
    }
def get_file_extension(language):
    """Return the source-file extension for ``language`` (``'txt'`` if unknown)."""
    known_extensions = {'python': 'py', 'java': 'java', 'javascript': 'js'}
    try:
        return known_extensions[language]
    except KeyError:
        # Any language outside the table falls back to a plain-text extension.
        return 'txt'
def get_run_command(language, filename):
    """Build the shell command that runs ``filename`` from ``/app``.

    Only the base name of ``filename`` is used. Returns ``None`` when
    ``language`` is not one of python, java or javascript.
    """
    # Computed up front, for every language, exactly as the lookup table needs.
    script_name = os.path.basename(filename)
    class_name = os.path.splitext(script_name)[0]

    command_by_language = {
        'python': f'python /app/{script_name}',
        'java': f'javac /app/{script_name} && java -cp /app {class_name}',
        'javascript': f'node /app/{script_name}',
    }
    return command_by_language.get(language)
def log_coding_attempt(session_id, code, language):
    """Record one coding attempt in ``db.coding_logs`` for monitoring.

    Stores the session id, the submitted code and language, the current
    local time, and the client's address and User-Agent taken from the
    active Flask request.
    """
    client_ip = request.remote_addr
    browser = request.headers.get('User-Agent')
    attempt = {
        "session_id": session_id,
        "code": code,
        "language": language,
        "timestamp": datetime.now(),
        "ip_address": client_ip,
        "user_agent": browser,
    }
    db.coding_logs.insert_one(attempt)
def validate_test_submission(code, problem_id, language='python'):
    """Validate ``code`` against the predefined test cases of a problem.

    Args:
        code: Source code of the submission.
        problem_id: Key used to look up test cases via
            ``get_problem_test_cases``.
        language: Value passed as the second argument to ``run_test_case``;
            defaults to ``'python'``, which is what was previously
            hard-coded.

    Returns:
        A dict with ``score`` (percentage of passed tests), ``passed``,
        ``total`` and ``feedback`` (one line per test case). When the
        problem has no test cases the score is ``0.0`` and the feedback
        says so, instead of raising ``ZeroDivisionError``.
    """
    # Load test cases for the problem
    test_cases = get_problem_test_cases(problem_id)
    total = len(test_cases)
    passed = 0
    feedback = []

    for number, test_case in enumerate(test_cases, start=1):
        result = run_test_case(code, language, test_case)
        if result['passed']:
            passed += 1
            feedback.append(f"Test {number}: ✅ Passed")
        else:
            feedback.append(f"Test {number}: ❌ Failed - {result['error']}")

    if total:
        score = (passed / total) * 100
    else:
        # Unknown problem ids yield an empty test list; dividing by zero
        # here used to surface to the client as a 500 error.
        score = 0.0
        feedback.append("No test cases found for this problem")

    return {
        "score": score,
        "passed": passed,
        "total": total,
        "feedback": feedback
    }
def get_problem_test_cases(problem_id):
    """Return the test cases registered for ``problem_id``.

    The cases are currently hard-coded in this function (the original note
    says they would be loaded from the database). Each case is a dict with
    ``input`` and ``expected_output``; unknown ids yield an empty list.
    """
    def case(given, expected):
        # Small constructor so the table below reads as input -> output pairs.
        return {"input": given, "expected_output": expected}

    catalog = {
        "python-basics-1": [case("hello", "HELLO"), case("world", "WORLD")],
        "java-oop-1": [case("5", "25"), case("10", "100")],
    }
    if problem_id in catalog:
        return catalog[problem_id]
    return []