Fix JWT signature verification vulnerability (GHSA-223g-f5mq-gw33)

- Enable proper JWT signature verification in backend/routes/dashboard.py
- Enable proper JWT signature verification in backend/main.py
- Enable proper JWT signature verification in backend/activity_logger.py
- Replace verify_signature=False with cryptographic verification using JWT_SECRET_KEY
- Prevents JWT forgery attacks and account takeover
This commit is contained in:
Stalin
2026-05-07 16:57:42 +05:30
parent 2f9d94d29d
commit 05f081b205
3 changed files with 58 additions and 21 deletions
+27 -5
View File
@@ -1,28 +1,50 @@
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import os
import jwt import jwt
def _decode_token_unverified(token: str) -> Dict[str, Any]: def _decode_token_verified(token: str, secret: str = None) -> Dict[str, Any]:
"""Decode and verify JWT token signature.
Args:
token: The JWT token to decode
secret: The secret key for verification. If not provided, attempts to get from environment.
Returns:
Decoded token payload, or empty dict if verification fails
"""
if not secret:
secret = os.getenv('JWT_SECRET_KEY')
if not secret:
return {}
try: try:
return jwt.decode( return jwt.decode(
token, token,
options={"verify_signature": False}, secret,
algorithms=["HS256", "RS256"], algorithms=["HS256", "RS256"],
) )
except Exception: except Exception:
return {} return {}
def resolve_user_identity(request, db=None) -> Dict[str, Optional[str]]: def resolve_user_identity(request, db=None, jwt_secret: str = None) -> Dict[str, Optional[str]]:
"""Best-effort identity resolution from auth header, headers, payload, and optional DB lookup.""" """Best-effort identity resolution from auth header, headers, payload, and optional DB lookup.
Args:
request: Flask request object
db: MongoDB database connection (optional)
jwt_secret: JWT secret for token verification. If not provided, attempts to get from environment.
"""
token = None token = None
auth_header = request.headers.get("Authorization", "") auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "): if auth_header.startswith("Bearer "):
token = auth_header.split(" ", 1)[1] token = auth_header.split(" ", 1)[1]
payload = _decode_token_unverified(token) if token else {} payload = _decode_token_verified(token, jwt_secret) if token else {}
request_json = request.get_json(silent=True) or {} request_json = request.get_json(silent=True) or {}
user_id = ( user_id = (
+5 -1
View File
@@ -440,14 +440,18 @@ def write_request_audit_log(response):
auth_header = request.headers.get("Authorization", "") auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "): if auth_header.startswith("Bearer "):
token = auth_header.split(" ", 1)[1] token = auth_header.split(" ", 1)[1]
jwt_secret = app.config.get('JWT_SECRET_KEY')
if jwt_secret:
decoded = pyjwt.decode( decoded = pyjwt.decode(
token, token,
options={"verify_signature": False}, jwt_secret,
algorithms=["HS256", "RS256"], algorithms=["HS256", "RS256"],
) )
auth_user_id = decoded.get("user_id") or decoded.get("sub") or decoded.get("uid") auth_user_id = decoded.get("user_id") or decoded.get("sub") or decoded.get("uid")
auth_wallet_address = decoded.get("wallet_address") auth_wallet_address = decoded.get("wallet_address")
auth_email = decoded.get("email") auth_email = decoded.get("email")
else:
auth_user_id = None
except Exception: except Exception:
auth_user_id = None auth_user_id = None
+14 -3
View File
@@ -25,14 +25,25 @@ def verify_wallet_authentication():
if auth_header.startswith('Bearer '): if auth_header.startswith('Bearer '):
try: try:
token = auth_header.split(' ')[1] token = auth_header.split(' ')[1]
# ✅ FIXED: Add algorithms parameter to fix JWT decode error # ✅ FIXED: Verify JWT signature using JWT_SECRET_KEY
from flask import current_app
jwt_secret = current_app.config.get('JWT_SECRET_KEY') or os.getenv('JWT_SECRET_KEY')
if jwt_secret:
decoded = jwt.decode( decoded = jwt.decode(
token, token,
options={"verify_signature": False}, # For development jwt_secret,
algorithms=["HS256", "RS256"] # This fixes the JWT error algorithms=["HS256", "RS256"]
) )
else:
logger.error("JWT_SECRET_KEY not configured")
decoded = None
if decoded:
user_id = decoded.get('sub') or decoded.get('user_id') or decoded.get('uid') or decoded.get('wallet_address') user_id = decoded.get('sub') or decoded.get('user_id') or decoded.get('uid') or decoded.get('wallet_address')
wallet_address = decoded.get('wallet_address') or user_id wallet_address = decoded.get('wallet_address') or user_id
else:
user_id = None
wallet_address = None
if user_id: if user_id:
logger.info(f"✅ JWT authentication verified: {user_id}") logger.info(f"✅ JWT authentication verified: {user_id}")