fix(security): harden execution sandbox and add dedicated admin execution logs

This commit is contained in:
Stalin
2026-04-19 23:03:08 +05:30
parent d19c4e4d15
commit 14765d7d18
12 changed files with 1296 additions and 834 deletions
+210 -530
View File
@@ -1,546 +1,226 @@
from flask import Blueprint, request, jsonify
import subprocess
import tempfile
import os
import time
import docker
from datetime import datetime
import os
bp = Blueprint('compiler', __name__)
from flask import Blueprint, jsonify, request
from pymongo import MongoClient
def get_db():
"""Get MongoDB database connection"""
from pymongo import MongoClient
from flask import current_app
client = MongoClient(current_app.config['MONGODB_URI'])
return client.openlearnx
from services.real_compiler_service import real_compiler_service
@bp.route('/execute', methods=['POST', 'OPTIONS'])
bp = Blueprint("compiler", __name__)
mongo_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017/")
client = MongoClient(mongo_uri)
db = client.openlearnx
def _json_response(payload, status=200):
response = jsonify(payload)
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
return response, status
def _client_ip():
forwarded_for = request.headers.get("X-Forwarded-For", "")
if forwarded_for:
return forwarded_for.split(",")[0].strip()
return request.remote_addr or "unknown"
def _log_security(event_type, action, severity="info", status_code=200, metadata=None):
try:
log_doc = {
"timestamp": datetime.utcnow(),
"event_type": event_type,
"action": action,
"status_code": int(status_code),
"severity": severity,
"path": request.path,
"method": request.method,
"ip": _client_ip(),
"user_agent": request.headers.get("User-Agent", ""),
"metadata": metadata or {},
"metadata_text": str(metadata or {}),
}
db.security_logs.insert_one(log_doc)
except Exception as e:
print(f"Compiler security log failure: {e}")
@bp.route("/execute", methods=["POST", "OPTIONS"])
def execute_code():
"""Execute code in specified language with Docker support"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "POST,OPTIONS")
return response
try:
data = request.get_json()
language = data.get('language', 'python').lower()
code = data.get('code', '').strip()
input_data = data.get('input', '')
print(f"🔧 Executing {language} code")
print(f"📝 Code length: {len(code)} characters")
if not code:
return jsonify({"success": False, "error": "No code provided"}), 400
# Execute based on language
if language == 'python':
return execute_python(code, input_data)
elif language == 'java':
return execute_java(code, input_data)
elif language == 'javascript' or language == 'js':
return execute_javascript(code, input_data)
elif language == 'cpp' or language == 'c++':
return execute_cpp(code, input_data)
elif language == 'c':
return execute_c(code, input_data)
else:
return jsonify({
"success": False,
"error": f"Language '{language}' not supported. Available: python, java, javascript, cpp, c"
}), 400
except Exception as e:
print(f"❌ Compiler error: {str(e)}")
import traceback
traceback.print_exc()
return jsonify({"success": False, "error": f"Server error: {str(e)}"}), 500
return _json_response({"status": "ok"}, 200)
def execute_python(code, input_data=""):
"""Execute Python code"""
try:
# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(code)
temp_file = f.name
data = request.get_json(silent=True) or {}
language = str(data.get("language", "python")).strip().lower()
code = str(data.get("code", ""))
input_data = str(data.get("input", ""))
if not code.strip():
_log_security(
"compiler_input_invalid",
"empty_code_submission",
severity="warning",
status_code=400,
metadata={"language": language},
)
return _json_response({"success": False, "error": "No code provided"}, 400)
result = real_compiler_service.execute_code(code=code, language=language, input_data=input_data)
log_metadata = {
"language": language,
"code_size": len(code),
"execution_id": result.get("execution_id"),
"exit_code": result.get("exit_code"),
"execution_time": result.get("execution_time", 0),
"memory_used": result.get("memory_used", 0),
"blocked": bool(result.get("blocked")),
"security_violations": result.get("security_violations", []),
"error": result.get("error", ""),
}
try:
# Execute with subprocess
start_time = time.time()
result = subprocess.run(
['python3', temp_file],
input=input_data,
text=True,
capture_output=True,
timeout=10, # 10 second timeout
cwd=tempfile.gettempdir()
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "python",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Process exited with code {result.returncode}",
"language": "python"
})
finally:
# Clean up temp file
try:
os.unlink(temp_file)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out (10s limit)"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "Python interpreter not found. Please install Python 3."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"Python execution error: {str(e)}"
}), 500
status = "success"
if result.get("blocked"):
status = "blocked"
elif result.get("error") and not result.get("success", False):
status = "failed"
def execute_java(code, input_data=""):
"""Execute Java code"""
try:
# Extract class name from code
import re
class_match = re.search(r'public\s+class\s+(\w+)', code)
if not class_match:
return jsonify({
"success": False,
"error": "No public class found. Java code must contain 'public class ClassName'"
}), 400
class_name = class_match.group(1)
# Create temporary directory
temp_dir = tempfile.mkdtemp()
java_file = os.path.join(temp_dir, f"{class_name}.java")
try:
# Write Java code to file
with open(java_file, 'w') as f:
f.write(code)
# Compile Java code
compile_result = subprocess.run(
['javac', java_file],
capture_output=True,
text=True,
timeout=30,
cwd=temp_dir
)
if compile_result.returncode != 0:
return jsonify({
"success": False,
"error": f"Compilation error:\n{compile_result.stderr}",
"language": "java"
})
# Execute Java code
start_time = time.time()
result = subprocess.run(
['java', class_name],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=temp_dir
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "java",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "java"
})
finally:
# Clean up temp files
import shutil
try:
shutil.rmtree(temp_dir)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "Java compiler/runtime not found. Please install JDK."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"Java execution error: {str(e)}"
}), 500
def execute_javascript(code, input_data=""):
"""Execute JavaScript code"""
try:
# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False) as f:
# Add input handling if needed
if input_data:
js_code = f"""
const input = `{input_data}`;
const readline = {{ question: () => input }};
{code}
"""
else:
js_code = code
f.write(js_code)
temp_file = f.name
try:
# Execute with Node.js
start_time = time.time()
result = subprocess.run(
['node', temp_file],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=tempfile.gettempdir()
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "javascript",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "javascript"
})
finally:
try:
os.unlink(temp_file)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "Node.js not found. Please install Node.js."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"JavaScript execution error: {str(e)}"
}), 500
def execute_cpp(code, input_data=""):
"""Execute C++ code"""
try:
# Create temporary files
temp_dir = tempfile.mkdtemp()
cpp_file = os.path.join(temp_dir, "main.cpp")
exe_file = os.path.join(temp_dir, "main.exe") if os.name == 'nt' else os.path.join(temp_dir, "main")
try:
# Write C++ code to file
with open(cpp_file, 'w') as f:
f.write(code)
# Compile C++ code
compile_cmd = ['g++', '-o', exe_file, cpp_file, '-std=c++17']
compile_result = subprocess.run(
compile_cmd,
capture_output=True,
text=True,
timeout=30,
cwd=temp_dir
)
if compile_result.returncode != 0:
return jsonify({
"success": False,
"error": f"Compilation error:\n{compile_result.stderr}",
"language": "cpp"
})
# Execute compiled program
start_time = time.time()
result = subprocess.run(
[exe_file],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=temp_dir
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "cpp",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "cpp"
})
finally:
# Clean up temp files
import shutil
try:
shutil.rmtree(temp_dir)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "G++ compiler not found. Please install GCC/G++."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"C++ execution error: {str(e)}"
}), 500
def execute_c(code, input_data=""):
"""Execute C code"""
try:
# Create temporary files
temp_dir = tempfile.mkdtemp()
c_file = os.path.join(temp_dir, "main.c")
exe_file = os.path.join(temp_dir, "main.exe") if os.name == 'nt' else os.path.join(temp_dir, "main")
try:
# Write C code to file
with open(c_file, 'w') as f:
f.write(code)
# Compile C code
compile_cmd = ['gcc', '-o', exe_file, c_file, '-std=c99']
compile_result = subprocess.run(
compile_cmd,
capture_output=True,
text=True,
timeout=30,
cwd=temp_dir
)
if compile_result.returncode != 0:
return jsonify({
"success": False,
"error": f"Compilation error:\n{compile_result.stderr}",
"language": "c"
})
# Execute compiled program
start_time = time.time()
result = subprocess.run(
[exe_file],
input=input_data,
text=True,
capture_output=True,
timeout=10,
cwd=temp_dir
)
execution_time = time.time() - start_time
if result.returncode == 0:
return jsonify({
"success": True,
"output": result.stdout or "Code executed successfully (no output)",
"error": result.stderr if result.stderr else None,
"language": "c",
"execution_time": round(execution_time, 3)
})
else:
return jsonify({
"success": False,
"error": result.stderr or f"Runtime error (exit code {result.returncode})",
"language": "c"
})
finally:
# Clean up temp files
import shutil
try:
shutil.rmtree(temp_dir)
except:
pass
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Code execution timed out"
}), 400
except FileNotFoundError:
return jsonify({
"success": False,
"error": "GCC compiler not found. Please install GCC."
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": f"C execution error: {str(e)}"
}), 500
@bp.route('/languages', methods=['GET', 'OPTIONS'])
def get_supported_languages():
"""Get list of supported programming languages"""
if request.method == "OPTIONS":
response = jsonify({'status': 'ok'})
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,OPTIONS")
return response
try:
languages = {
"python": {
"name": "Python",
"version": "3.x",
"extension": ".py",
"available": check_language_availability("python3")
},
"java": {
"name": "Java",
"version": "JDK 8+",
"extension": ".java",
"available": check_language_availability("javac")
},
"javascript": {
"name": "JavaScript",
"version": "Node.js",
"extension": ".js",
"available": check_language_availability("node")
},
"cpp": {
"name": "C++",
"version": "GCC/G++",
"extension": ".cpp",
"available": check_language_availability("g++")
},
"c": {
"name": "C",
"version": "GCC",
"extension": ".c",
"available": check_language_availability("gcc")
request_payload = {
"language": language,
"code": code[:4000],
"code_size": len(code),
"input": input_data[:2000],
"input_size": len(input_data),
}
}
return jsonify({
"success": True,
"languages": languages,
"total": len(languages),
"available_count": sum(1 for lang in languages.values() if lang["available"])
})
response_payload = {
"success": bool(result.get("success")),
"blocked": bool(result.get("blocked")),
"execution_id": result.get("execution_id"),
"output": (result.get("output") or "")[:4000],
"error": result.get("error", ""),
"security_violations": result.get("security_violations", []),
"execution_time": result.get("execution_time", 0),
"memory_used": result.get("memory_used", 0),
"exit_code": result.get("exit_code", 0),
}
db.code_execution_events.insert_one(
{
"timestamp": datetime.utcnow(),
"event_type": "execution",
"source": "compiler",
"language": language,
"execution_id": result.get("execution_id"),
"execution_time": result.get("execution_time", 0),
"memory_used": result.get("memory_used", 0),
"exit_code": result.get("exit_code", 0),
"status": status,
"blocked": bool(result.get("blocked")),
"security_violations": result.get("security_violations", []),
"error": result.get("error", ""),
"request_body": request_payload,
"response_body": response_payload,
"ip": _client_ip(),
"user_agent": request.headers.get("User-Agent", ""),
}
)
except Exception:
pass
if result.get("blocked"):
_log_security(
"compiler_security_block",
"static_policy_blocked_submission",
severity="warning",
status_code=400,
metadata=log_metadata,
)
return _json_response({"success": False, **result}, 400)
if result.get("error") and not result.get("success", False):
_log_security(
"compiler_execution_failed",
"secure_container_execution_failed",
severity="warning",
status_code=400,
metadata=log_metadata,
)
return _json_response({"success": False, **result}, 400)
_log_security(
"compiler_execution_success",
"secure_container_execution_completed",
severity="info",
status_code=200,
metadata=log_metadata,
)
return _json_response(result, 200)
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
_log_security(
"compiler_internal_error",
"compiler_route_exception",
severity="error",
status_code=500,
metadata={"error": str(e)},
)
return _json_response({"success": False, "error": f"Server error: {str(e)}"}, 500)
@bp.route("/languages", methods=["GET", "OPTIONS"])
def get_supported_languages():
if request.method == "OPTIONS":
return _json_response({"status": "ok"}, 200)
def check_language_availability(command):
"""Check if a language compiler/interpreter is available"""
try:
result = subprocess.run([command, '--version'],
capture_output=True,
timeout=5)
return result.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired):
return False
return _json_response({"success": True, "languages": real_compiler_service.get_supported_languages()}, 200)
except Exception as e:
return _json_response({"success": False, "error": str(e)}, 500)
@bp.route('/health', methods=['GET'])
@bp.route("/test", methods=["POST", "OPTIONS"])
def compiler_test():
if request.method == "OPTIONS":
return _json_response({"status": "ok"}, 200)
data = request.get_json(silent=True) or {}
language = str(data.get("language", "python")).strip().lower()
smoke_code = {
"python": 'print("ok")',
"javascript": 'console.log("ok")',
"java": "public class Main { public static void main(String[] args){ System.out.println(\"ok\"); } }",
"c": "#include <stdio.h>\nint main(){ printf(\"ok\\n\"); return 0; }",
"cpp": "#include <iostream>\nint main(){ std::cout << \"ok\\n\"; return 0; }",
"go": "package main\nimport \"fmt\"\nfunc main(){ fmt.Println(\"ok\") }",
"rust": "fn main(){ println!(\"ok\"); }",
}
if language not in smoke_code:
return _json_response({"success": False, "error": f"Unsupported language: {language}"}, 400)
result = real_compiler_service.execute_code(smoke_code[language], language, "")
if result.get("success"):
return _json_response(result, 200)
return _json_response({"success": False, **result}, 400)
@bp.route("/health", methods=["GET"])
def compiler_health():
"""Health check for compiler service"""
try:
languages_status = {
"python": check_language_availability("python3"),
"java": check_language_availability("javac"),
"javascript": check_language_availability("node"),
"cpp": check_language_availability("g++"),
"c": check_language_availability("gcc")
}
available_languages = sum(languages_status.values())
total_languages = len(languages_status)
status = "healthy" if available_languages > 0 else "unavailable"
return jsonify({
"status": status,
"timestamp": datetime.now().isoformat(),
"languages": languages_status,
"available_languages": available_languages,
"total_languages": total_languages,
"docker_available": check_docker_availability()
})
except Exception as e:
return jsonify({
"status": "error",
"error": str(e),
"timestamp": datetime.now().isoformat()
}), 500
def check_docker_availability():
"""Check if Docker is available for containerized execution"""
try:
client = docker.from_env()
client.ping()
return True
except:
return False
docker_ok = real_compiler_service._get_docker_client() is not None and real_compiler_service.docker_available
return _json_response(
{
"status": "healthy" if docker_ok else "degraded",
"timestamp": datetime.utcnow().isoformat(),
"docker_available": docker_ok,
"secure_execution_only": True,
"supported_languages": [l["id"] for l in real_compiler_service.get_supported_languages()],
},
200 if docker_ok else 503,
)