From 05f081b20537f2718dc73dcb3571bce057afaec1 Mon Sep 17 00:00:00 2001 From: Stalin Date: Thu, 7 May 2026 16:57:42 +0530 Subject: [PATCH] Fix JWT signature verification vulnerability (GHSA-223g-f5mq-gw33) - Enable proper JWT signature verification in backend/routes/dashboard.py - Enable proper JWT signature verification in backend/main.py - Enable proper JWT signature verification in backend/activity_logger.py - Replace verify_signature=False with cryptographic verification using JWT_SECRET_KEY - Prevents JWT forgery attacks and account takeover --- backend/activity_logger.py | 32 +++++++++++++++++++++++++++----- backend/main.py | 20 ++++++++++++-------- backend/routes/dashboard.py | 27 +++++++++++++++++++-------- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/backend/activity_logger.py b/backend/activity_logger.py index a697bf4..711c571 100644 --- a/backend/activity_logger.py +++ b/backend/activity_logger.py @@ -1,28 +1,50 @@ from datetime import datetime, timezone from typing import Any, Dict, Optional +import os import jwt -def _decode_token_unverified(token: str) -> Dict[str, Any]: +def _decode_token_verified(token: str, secret: str = None) -> Dict[str, Any]: + """Decode and verify JWT token signature. + + Args: + token: The JWT token to decode + secret: The secret key for verification. If not provided, attempts to get from environment. + + Returns: + Decoded token payload, or empty dict if verification fails + """ + if not secret: + secret = os.getenv('JWT_SECRET_KEY') + + if not secret: + return {} + try: return jwt.decode( token, - options={"verify_signature": False}, + secret, algorithms=["HS256", "RS256"], ) except Exception: return {} -def resolve_user_identity(request, db=None) -> Dict[str, Optional[str]]: - """Best-effort identity resolution from auth header, headers, payload, and optional DB lookup.""" +def resolve_user_identity(request, db=None, jwt_secret: str = None) -> Dict[str, Optional[str]]: + """Best-effort identity resolution from auth header, headers, payload, and optional DB lookup. + + Args: + request: Flask request object + db: MongoDB database connection (optional) + jwt_secret: JWT secret for token verification. If not provided, attempts to get from environment. + """ token = None auth_header = request.headers.get("Authorization", "") if auth_header.startswith("Bearer "): token = auth_header.split(" ", 1)[1] - payload = _decode_token_unverified(token) if token else {} + payload = _decode_token_verified(token, jwt_secret) if token else {} request_json = request.get_json(silent=True) or {} user_id = ( diff --git a/backend/main.py b/backend/main.py index 33e8c6f..a0d53a7 100644 --- a/backend/main.py +++ b/backend/main.py @@ -440,14 +440,18 @@ def write_request_audit_log(response): auth_header = request.headers.get("Authorization", "") if auth_header.startswith("Bearer "): token = auth_header.split(" ", 1)[1] - decoded = pyjwt.decode( - token, - options={"verify_signature": False}, - algorithms=["HS256", "RS256"], - ) - auth_user_id = decoded.get("user_id") or decoded.get("sub") or decoded.get("uid") - auth_wallet_address = decoded.get("wallet_address") - auth_email = decoded.get("email") + jwt_secret = app.config.get('JWT_SECRET_KEY') + if jwt_secret: + decoded = pyjwt.decode( + token, + jwt_secret, + algorithms=["HS256", "RS256"], + ) + auth_user_id = decoded.get("user_id") or decoded.get("sub") or decoded.get("uid") + auth_wallet_address = decoded.get("wallet_address") + auth_email = decoded.get("email") + else: + auth_user_id = None except Exception: auth_user_id = None diff --git a/backend/routes/dashboard.py b/backend/routes/dashboard.py index 13075ad..84cf141 100644 --- a/backend/routes/dashboard.py +++ b/backend/routes/dashboard.py @@ -25,14 +25,25 @@ def verify_wallet_authentication(): if auth_header.startswith('Bearer '): try: token = auth_header.split(' ')[1] - # ✅ FIXED: Add algorithms parameter to fix JWT decode error - decoded = jwt.decode( - token, - options={"verify_signature": False}, # For development - algorithms=["HS256", "RS256"] # This fixes the JWT error - ) - user_id = decoded.get('sub') or decoded.get('user_id') or decoded.get('uid') or decoded.get('wallet_address') - wallet_address = decoded.get('wallet_address') or user_id + # ✅ FIXED: Verify JWT signature using JWT_SECRET_KEY + from flask import current_app + jwt_secret = current_app.config.get('JWT_SECRET_KEY') or os.getenv('JWT_SECRET_KEY') + if jwt_secret: + decoded = jwt.decode( + token, + jwt_secret, + algorithms=["HS256", "RS256"] + ) + else: + logger.error("JWT_SECRET_KEY not configured") + decoded = None + + if decoded: + user_id = decoded.get('sub') or decoded.get('user_id') or decoded.get('uid') or decoded.get('wallet_address') + wallet_address = decoded.get('wallet_address') or user_id + else: + user_id = None + wallet_address = None if user_id: logger.info(f"✅ JWT authentication verified: {user_id}")