mirror of
https://github.com/th30d4y/OpenLearnX.git
synced 2026-05-26 19:26:33 +00:00
227 lines
8.3 KiB
Python
227 lines
8.3 KiB
Python
from datetime import datetime
|
|
import os
|
|
|
|
from flask import Blueprint, jsonify, request
|
|
from pymongo import MongoClient
|
|
|
|
from services.real_compiler_service import real_compiler_service
|
|
|
|
# Blueprint that the compiler routes in this module are registered on.
bp = Blueprint("compiler", __name__)

# MongoDB handle used for audit logging; the URI is read from MONGODB_URI and
# defaults to a local instance when the variable is not set.
mongo_uri = os.environ.get("MONGODB_URI", "mongodb://localhost:27017/")
client = MongoClient(mongo_uri)
db = client["openlearnx"]
|
|
|
|
|
|
def _json_response(payload, status=200):
    """Serialize ``payload`` with ``jsonify`` and attach the CORS headers.

    Args:
        payload: Object handed to ``jsonify``.
        status: HTTP status code returned next to the response object.

    Returns:
        A ``(response, status)`` tuple.
    """
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Headers", "Content-Type,Authorization"),
        ("Access-Control-Allow-Methods", "GET,POST,OPTIONS"),
    )

    reply = jsonify(payload)
    for header_name, header_value in cors_headers:
        reply.headers.add(header_name, header_value)
    return reply, status
|
|
|
|
|
|
def _client_ip():
    """Return the address to record for the current request.

    The first comma-separated entry of ``X-Forwarded-For`` is used when that
    header is non-empty; otherwise ``request.remote_addr`` is returned, or the
    string ``"unknown"`` when that is falsy.
    """
    header_value = request.headers.get("X-Forwarded-For", "")
    if not header_value:
        return request.remote_addr or "unknown"

    # NOTE(review): this header is supplied by the client unless a trusted
    # proxy overwrites it - confirm the deployment before relying on it.
    first_hop, _, _ = header_value.partition(",")
    return first_hop.strip()
|
|
|
|
|
|
def _log_security(event_type, action, severity="info", status_code=200, metadata=None):
    """Insert one security-log document into ``db.security_logs``.

    The document combines the supplied event fields with details of the
    current request (path, method, client IP, user agent).

    Args:
        event_type: Value stored under ``event_type``.
        action: Value stored under ``action``.
        severity: Value stored under ``severity``.
        status_code: Stored under ``status_code`` after conversion with ``int``.
        metadata: Optional extra data; a falsy value is stored as ``{}``.

    Any exception raised while building or inserting the document is printed
    and suppressed, so this function never propagates an error to its caller.
    """
    try:
        details = metadata or {}
        db.security_logs.insert_one(
            {
                "timestamp": datetime.utcnow(),
                "event_type": event_type,
                "action": action,
                "status_code": int(status_code),
                "severity": severity,
                "path": request.path,
                "method": request.method,
                "ip": _client_ip(),
                "user_agent": request.headers.get("User-Agent", ""),
                "metadata": details,
                # String form of the same metadata, stored alongside the raw value.
                "metadata_text": str(details),
            }
        )
    except Exception as exc:
        print(f"Compiler security log failure: {exc}")
|
|
|
|
|
|
def _record_execution_event(language, code, input_data, result):
    """Store one execution record in ``db.code_execution_events``.

    The stored request keeps at most 4000 characters of code and 2000 of
    input; the stored response keeps at most 4000 characters of output.

    Best-effort: a failure is printed and suppressed so that a problem with
    the audit store never changes the HTTP response.
    """
    try:
        if result.get("blocked"):
            status = "blocked"
        elif result.get("error") and not result.get("success", False):
            status = "failed"
        else:
            status = "success"

        request_payload = {
            "language": language,
            "code": code[:4000],
            "code_size": len(code),
            "input": input_data[:2000],
            "input_size": len(input_data),
        }
        response_payload = {
            "success": bool(result.get("success")),
            "blocked": bool(result.get("blocked")),
            "execution_id": result.get("execution_id"),
            "output": (result.get("output") or "")[:4000],
            "error": result.get("error", ""),
            "security_violations": result.get("security_violations", []),
            "execution_time": result.get("execution_time", 0),
            "memory_used": result.get("memory_used", 0),
            "exit_code": result.get("exit_code", 0),
        }

        db.code_execution_events.insert_one(
            {
                "timestamp": datetime.utcnow(),
                "event_type": "execution",
                "source": "compiler",
                "language": language,
                "execution_id": result.get("execution_id"),
                "execution_time": result.get("execution_time", 0),
                "memory_used": result.get("memory_used", 0),
                "exit_code": result.get("exit_code", 0),
                "status": status,
                "blocked": bool(result.get("blocked")),
                "security_violations": result.get("security_violations", []),
                "error": result.get("error", ""),
                "request_body": request_payload,
                "response_body": response_payload,
                "ip": _client_ip(),
                "user_agent": request.headers.get("User-Agent", ""),
            }
        )
    except Exception as exc:
        # Report instead of silently discarding, in the same way the
        # security logger reports its own failures.
        print(f"Compiler execution event log failure: {exc}")


@bp.route("/execute", methods=["POST", "OPTIONS"])
def execute_code():
    """Execute submitted code via ``real_compiler_service`` and audit the result.

    Reads ``language`` (default ``"python"``), ``code`` and ``input`` from the
    JSON body.

    Returns:
        200 with the service result on success; 400 when no code was given,
        when the result is marked ``blocked``, or when it carries an error
        without success; 500 when an unexpected exception is raised. An
        ``OPTIONS`` request is answered with ``{"status": "ok"}``.
    """
    if request.method == "OPTIONS":
        return _json_response({"status": "ok"}, 200)

    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # Missing, unparsable or non-object JSON has no fields to read;
            # treating it as empty yields the 400 below instead of a 500.
            data = {}

        language = str(data.get("language", "python")).strip().lower()

        # JSON null must count as "absent"; str(None) would otherwise submit
        # the literal text "None" as code or stdin.
        raw_code = data.get("code")
        code = "" if raw_code is None else str(raw_code)
        raw_input = data.get("input")
        input_data = "" if raw_input is None else str(raw_input)

        if not code.strip():
            _log_security(
                "compiler_input_invalid",
                "empty_code_submission",
                severity="warning",
                status_code=400,
                metadata={"language": language},
            )
            return _json_response({"success": False, "error": "No code provided"}, 400)

        result = real_compiler_service.execute_code(
            code=code, language=language, input_data=input_data
        )

        log_metadata = {
            "language": language,
            "code_size": len(code),
            "execution_id": result.get("execution_id"),
            "exit_code": result.get("exit_code"),
            "execution_time": result.get("execution_time", 0),
            "memory_used": result.get("memory_used", 0),
            "blocked": bool(result.get("blocked")),
            "security_violations": result.get("security_violations", []),
            "error": result.get("error", ""),
        }

        _record_execution_event(language, code, input_data, result)

        if result.get("blocked"):
            _log_security(
                "compiler_security_block",
                "static_policy_blocked_submission",
                severity="warning",
                status_code=400,
                metadata=log_metadata,
            )
            # "success" goes last so a value inside ``result`` cannot override it.
            return _json_response({**result, "success": False}, 400)

        if result.get("error") and not result.get("success", False):
            _log_security(
                "compiler_execution_failed",
                "secure_container_execution_failed",
                severity="warning",
                status_code=400,
                metadata=log_metadata,
            )
            return _json_response({**result, "success": False}, 400)

        _log_security(
            "compiler_execution_success",
            "secure_container_execution_completed",
            severity="info",
            status_code=200,
            metadata=log_metadata,
        )
        return _json_response(result, 200)

    except Exception as e:
        _log_security(
            "compiler_internal_error",
            "compiler_route_exception",
            severity="error",
            status_code=500,
            metadata={"error": str(e)},
        )
        # NOTE(review): the exception text is returned to the client; confirm
        # that exposing internal error details here is intended.
        return _json_response({"success": False, "error": f"Server error: {str(e)}"}, 500)
|
|
|
|
|
|
@bp.route("/languages", methods=["GET", "OPTIONS"])
def get_supported_languages():
    """Return the languages reported by ``real_compiler_service``.

    Returns:
        200 with ``{"success": True, "languages": ...}``, or 500 with the
        exception text when building the response raises. An ``OPTIONS``
        request is answered with ``{"status": "ok"}``.
    """
    if request.method == "OPTIONS":
        return _json_response({"status": "ok"}, 200)

    try:
        languages = real_compiler_service.get_supported_languages()
        body = {"success": True, "languages": languages}
        return _json_response(body, 200)
    except Exception as exc:
        failure = {"success": False, "error": str(exc)}
        return _json_response(failure, 500)
|
|
|
|
|
|
@bp.route("/test", methods=["POST", "OPTIONS"])
def compiler_test():
    """Run a tiny "print ok" program in the requested language as a smoke test.

    Reads ``language`` (default ``"python"``) from the JSON body.

    Returns:
        200 with the service result when it reports success; 400 for an
        unsupported language or an unsuccessful run; 500 with the exception
        text when the service call raises. An ``OPTIONS`` request is answered
        with ``{"status": "ok"}``.
    """
    if request.method == "OPTIONS":
        return _json_response({"status": "ok"}, 200)

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A missing or non-object JSON body has no "language" field; fall back
        # to the default rather than failing on ``.get``.
        data = {}
    language = str(data.get("language", "python")).strip().lower()

    smoke_code = {
        "python": 'print("ok")',
        "javascript": 'console.log("ok")',
        "java": "public class Main { public static void main(String[] args){ System.out.println(\"ok\"); } }",
        "c": "#include <stdio.h>\nint main(){ printf(\"ok\\n\"); return 0; }",
        "cpp": "#include <iostream>\nint main(){ std::cout << \"ok\\n\"; return 0; }",
        "go": "package main\nimport \"fmt\"\nfunc main(){ fmt.Println(\"ok\") }",
        "rust": "fn main(){ println!(\"ok\"); }",
    }

    if language not in smoke_code:
        return _json_response({"success": False, "error": f"Unsupported language: {language}"}, 400)

    try:
        result = real_compiler_service.execute_code(smoke_code[language], language, "")
        if result.get("success"):
            return _json_response(result, 200)
        # "success" goes last so a falsy-but-not-False value (or any value)
        # inside ``result`` cannot replace the explicit False.
        return _json_response({**result, "success": False}, 400)
    except Exception as exc:
        # Same JSON error shape as the /languages route, so clients get a
        # CORS-enabled JSON body instead of the framework's default error page.
        return _json_response({"success": False, "error": str(exc)}, 500)
|
|
|
|
|
|
@bp.route("/health", methods=["GET"])
def compiler_health():
    """Report whether the compiler service can reach Docker.

    Returns:
        200 with status ``"healthy"`` when a Docker client is obtained and
        ``docker_available`` is truthy; otherwise 503 with status
        ``"degraded"``. The body also carries a UTC timestamp and the ids of
        the supported languages.
    """
    client_present = real_compiler_service._get_docker_client() is not None
    docker_ok = client_present and real_compiler_service.docker_available

    if docker_ok:
        state, http_status = "healthy", 200
    else:
        state, http_status = "degraded", 503

    stamp = datetime.utcnow().isoformat()
    language_ids = [
        entry["id"] for entry in real_compiler_service.get_supported_languages()
    ]

    body = {
        "status": state,
        "timestamp": stamp,
        "docker_available": docker_ok,
        "secure_execution_only": True,
        "supported_languages": language_ids,
    }
    return _json_response(body, http_status)
|