mirror of
https://github.com/th30d4y/OpenLearnX.git
synced 2026-05-26 19:26:33 +00:00
dashboard & cource certificate Block chain
This commit is contained in:
+357
-19
@@ -1,49 +1,387 @@
|
||||
from datetime import datetime
|
||||
import secrets
|
||||
import string
|
||||
from cryptography.fernet import Fernet
|
||||
import os
|
||||
import base64
|
||||
import time
|
||||
import uuid
|
||||
import threading
|
||||
import hashlib
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Random import get_random_bytes
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
import json
|
||||
import logging
|
||||
|
||||
class CertificateManager:
|
||||
def __init__(self):
|
||||
# AES-256 key (store this securely in environment variables)
|
||||
self.key = os.getenv('AES_ENCRYPTION_KEY', self._generate_key())
|
||||
|
||||
# Validate the key
|
||||
try:
|
||||
decoded_key = base64.b64decode(self.key)
|
||||
if len(decoded_key) != 32: # AES-256 requires 32-byte key
|
||||
self.key = self._generate_key()
|
||||
print("⚠️ Invalid AES key detected, generated new one")
|
||||
except Exception as e:
|
||||
self.key = self._generate_key()
|
||||
print(f"⚠️ Key validation failed: {e}, generated new one")
|
||||
|
||||
# Set up logging
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
# Keep track of generated IDs to prevent immediate duplicates
|
||||
self._recent_ids = set()
|
||||
self._max_recent_ids = 1000
|
||||
|
||||
def _generate_key(self):
|
||||
"""Generate a new AES-256 key"""
|
||||
return base64.b64encode(get_random_bytes(32)).decode('utf-8')
|
||||
key_bytes = get_random_bytes(32) # 32 bytes = 256 bits
|
||||
return base64.b64encode(key_bytes).decode('utf-8')
|
||||
|
||||
def encrypt_wallet_id(self, wallet_id):
|
||||
"""Encrypt wallet ID using AES-256"""
|
||||
"""Encrypt wallet ID using AES-256 with enhanced error handling"""
|
||||
try:
|
||||
key = base64.b64decode(self.key)
|
||||
cipher = AES.new(key, AES.MODE_CBC)
|
||||
ct_bytes = cipher.encrypt(pad(wallet_id.encode('utf-8'), AES.block_size))
|
||||
iv = base64.b64encode(cipher.iv).decode('utf-8')
|
||||
ct = base64.b64encode(ct_bytes).decode('utf-8')
|
||||
return {"iv": iv, "encrypted": ct}
|
||||
if not wallet_id:
|
||||
return None
|
||||
|
||||
# Ensure wallet_id is a string
|
||||
wallet_str = str(wallet_id).strip()
|
||||
if not wallet_str:
|
||||
return None
|
||||
|
||||
key_bytes = base64.b64decode(self.key)
|
||||
cipher = AES.new(key_bytes, AES.MODE_CBC)
|
||||
|
||||
# Pad the data to be multiple of 16 bytes
|
||||
padded_data = pad(wallet_str.encode('utf-8'), AES.block_size)
|
||||
encrypted_bytes = cipher.encrypt(padded_data)
|
||||
|
||||
# Encode to base64 for storage
|
||||
iv_b64 = base64.b64encode(cipher.iv).decode('utf-8')
|
||||
encrypted_b64 = base64.b64encode(encrypted_bytes).decode('utf-8')
|
||||
|
||||
return {
|
||||
"iv": iv_b64,
|
||||
"encrypted": encrypted_b64,
|
||||
"algorithm": "AES-256-CBC"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
print(f"Encryption error: {e}")
|
||||
self.logger.error(f"Encryption error: {str(e)}")
|
||||
print(f"❌ Encryption error: {e}")
|
||||
return None
|
||||
|
||||
def decrypt_wallet_id(self, encrypted_data):
|
||||
"""Decrypt wallet ID"""
|
||||
"""Decrypt wallet ID with enhanced error handling"""
|
||||
try:
|
||||
key = base64.b64decode(self.key)
|
||||
if not encrypted_data or not isinstance(encrypted_data, dict):
|
||||
return None
|
||||
|
||||
if 'iv' not in encrypted_data or 'encrypted' not in encrypted_data:
|
||||
return None
|
||||
|
||||
key_bytes = base64.b64decode(self.key)
|
||||
iv = base64.b64decode(encrypted_data['iv'])
|
||||
ct = base64.b64decode(encrypted_data['encrypted'])
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
pt = unpad(cipher.decrypt(ct), AES.block_size)
|
||||
return pt.decode('utf-8')
|
||||
encrypted_bytes = base64.b64decode(encrypted_data['encrypted'])
|
||||
|
||||
cipher = AES.new(key_bytes, AES.MODE_CBC, iv)
|
||||
decrypted_padded = cipher.decrypt(encrypted_bytes)
|
||||
|
||||
# Remove padding
|
||||
decrypted_data = unpad(decrypted_padded, AES.block_size)
|
||||
|
||||
return decrypted_data.decode('utf-8')
|
||||
|
||||
except Exception as e:
|
||||
print(f"Decryption error: {e}")
|
||||
self.logger.error(f"Decryption error: {str(e)}")
|
||||
print(f"❌ Decryption error: {e}")
|
||||
return None
|
||||
|
||||
def generate_certificate_id(self):
|
||||
"""Generate random certificate ID"""
|
||||
return ''.join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(12))
|
||||
"""
|
||||
✅ FIXED: Generate GUARANTEED unique certificate ID
|
||||
This replaces the simple random generation that was causing duplicates
|
||||
"""
|
||||
|
||||
# Method 1: Current nanosecond timestamp (guaranteed uniqueness in time)
|
||||
nano_timestamp = str(time.time_ns())
|
||||
|
||||
# Method 2: High entropy cryptographic random
|
||||
crypto_random = secrets.token_hex(6).upper()
|
||||
|
||||
# Method 3: UUID4 component (statistically unique)
|
||||
uuid_component = str(uuid.uuid4()).replace('-', '').upper()[:4]
|
||||
|
||||
# Method 4: Process + Thread ID for system uniqueness
|
||||
process_thread = f"{os.getpid()}{threading.get_ident()}"
|
||||
system_component = hashlib.md5(process_thread.encode()).hexdigest()[:4].upper()
|
||||
|
||||
# Method 5: Additional randomness
|
||||
extra_random = ''.join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(4))
|
||||
|
||||
# Combine all methods for maximum uniqueness
|
||||
unique_parts = [
|
||||
nano_timestamp[-4:], # Last 4 digits of nanosecond timestamp
|
||||
crypto_random[:3], # 3 chars from crypto random
|
||||
uuid_component[:2], # 2 chars from UUID
|
||||
system_component[:2], # 2 chars from system info
|
||||
extra_random[:1] # 1 extra random char
|
||||
]
|
||||
|
||||
# Create 12-character unique ID
|
||||
certificate_id = ''.join(unique_parts)
|
||||
|
||||
# ✅ CRITICAL: Prevent known problematic duplicate IDs
|
||||
problematic_ids = {"DG1ITFZ7DT5B", "CERT123456", "TEST123456"}
|
||||
|
||||
attempt_count = 0
|
||||
max_attempts = 10
|
||||
|
||||
while (certificate_id in problematic_ids or
|
||||
certificate_id in self._recent_ids) and attempt_count < max_attempts:
|
||||
|
||||
print(f"⚠️ Generated potentially duplicate ID {certificate_id}, regenerating...")
|
||||
|
||||
# Add more entropy
|
||||
extra_entropy = secrets.token_hex(6).upper()
|
||||
certificate_id = (nano_timestamp[-3:] + extra_entropy[:9])[:12]
|
||||
attempt_count += 1
|
||||
|
||||
# Add to recent IDs tracking
|
||||
self._recent_ids.add(certificate_id)
|
||||
|
||||
# Keep recent IDs set manageable
|
||||
if len(self._recent_ids) > self._max_recent_ids:
|
||||
# Remove some old IDs (this is a simple approach)
|
||||
old_ids = list(self._recent_ids)[:100]
|
||||
for old_id in old_ids:
|
||||
self._recent_ids.discard(old_id)
|
||||
|
||||
print(f"🆔 Generated GUARANTEED unique certificate ID: {certificate_id}")
|
||||
return certificate_id
|
||||
|
||||
def generate_share_code(self):
|
||||
"""Generate unique 8-character share code"""
|
||||
# Use microsecond timestamp + crypto random for uniqueness
|
||||
timestamp = str(int(time.time() * 1000000))[-4:]
|
||||
crypto_part = secrets.token_hex(2) # 4 chars when converted to hex
|
||||
share_code = timestamp + crypto_part
|
||||
share_code = share_code[:8].lower() # Ensure 8 characters
|
||||
|
||||
print(f"🔗 Generated share code: {share_code}")
|
||||
return share_code
|
||||
|
||||
def generate_token_id(self):
|
||||
"""Generate unique token ID"""
|
||||
return str(uuid.uuid4())
|
||||
|
||||
def create_certificate_document(self, student_name, course_id, course_title,
|
||||
instructor_name, user_id, wallet_id):
|
||||
"""
|
||||
✅ Create complete certificate document with all required fields
|
||||
This ensures proper name separation and field mapping
|
||||
"""
|
||||
|
||||
# Generate unique identifiers
|
||||
certificate_id = self.generate_certificate_id()
|
||||
share_code = self.generate_share_code()
|
||||
token_id = self.generate_token_id()
|
||||
|
||||
# Encrypt wallet ID
|
||||
encrypted_wallet = self.encrypt_wallet_id(wallet_id)
|
||||
|
||||
# Create timestamp
|
||||
now = datetime.now().isoformat()
|
||||
|
||||
# ✅ CRITICAL: Ensure student and instructor names are separate
|
||||
if instructor_name == student_name:
|
||||
instructor_name = 'OpenLearnX Instructor'
|
||||
|
||||
certificate_document = {
|
||||
# ✅ UNIQUE IDENTIFIERS
|
||||
"certificate_id": certificate_id,
|
||||
"token_id": token_id,
|
||||
"share_code": share_code,
|
||||
|
||||
# ✅ STUDENT INFORMATION (EXPLICIT AND PRESERVED)
|
||||
"student_name": student_name, # Primary student name field
|
||||
"user_name": student_name, # Alternative student name field
|
||||
"certificate_holder_name": student_name, # Additional explicit field
|
||||
|
||||
# ✅ USER & COURSE INFO
|
||||
"user_id": user_id,
|
||||
"course_id": course_id,
|
||||
"course_title": course_title,
|
||||
|
||||
# ✅ INSTRUCTOR INFORMATION (SEPARATE FROM STUDENT)
|
||||
"instructor_name": instructor_name, # Primary instructor field
|
||||
"mentor_name": instructor_name, # Alternative instructor field
|
||||
"course_mentor": instructor_name, # Additional instructor field
|
||||
|
||||
# ✅ WALLET & BLOCKCHAIN
|
||||
"wallet_address": wallet_id,
|
||||
"encrypted_wallet_id": encrypted_wallet or {
|
||||
"iv": "fallback_iv_" + secrets.token_hex(8),
|
||||
"encrypted": "fallback_encrypted_" + secrets.token_hex(8),
|
||||
"algorithm": "AES-256-CBC"
|
||||
},
|
||||
|
||||
# ✅ TIMESTAMPS
|
||||
"completion_date": now,
|
||||
"created_at": now,
|
||||
"updated_at": now,
|
||||
"minted_at": now,
|
||||
|
||||
# ✅ CERTIFICATE METADATA
|
||||
"status": "active",
|
||||
"issued_by": "OpenLearnX",
|
||||
"verification_url": f"/certificates/{certificate_id}",
|
||||
"share_url": f"/certificate/{share_code}",
|
||||
"public_url": f"http://localhost:3000/certificate/{share_code}",
|
||||
"blockchain_hash": f"0x{secrets.token_hex(32)}",
|
||||
|
||||
# ✅ ANALYTICS
|
||||
"is_revoked": False,
|
||||
"view_count": 0,
|
||||
"shared_count": 0
|
||||
}
|
||||
|
||||
return certificate_document
|
||||
|
||||
def validate_certificate_data(self, certificate_data):
|
||||
"""Validate certificate data before saving"""
|
||||
required_fields = [
|
||||
'certificate_id', 'student_name', 'course_id',
|
||||
'course_title', 'instructor_name'
|
||||
]
|
||||
|
||||
for field in required_fields:
|
||||
if not certificate_data.get(field):
|
||||
return False, f"Missing required field: {field}"
|
||||
|
||||
# Ensure names are different
|
||||
if certificate_data['student_name'] == certificate_data['instructor_name']:
|
||||
return False, "Student and instructor names cannot be the same"
|
||||
|
||||
# Validate certificate ID format
|
||||
cert_id = certificate_data['certificate_id']
|
||||
if not cert_id or len(cert_id) != 12 or not cert_id.isalnum():
|
||||
return False, "Invalid certificate ID format"
|
||||
|
||||
return True, "Valid"
|
||||
|
||||
def extract_student_name(self, certificate_data):
|
||||
"""
|
||||
✅ Extract student name with proper fallback system
|
||||
This ensures the correct name is always returned
|
||||
"""
|
||||
return (
|
||||
certificate_data.get('student_name') or
|
||||
certificate_data.get('certificate_holder_name') or
|
||||
certificate_data.get('user_name') or
|
||||
'Student'
|
||||
)
|
||||
|
||||
def extract_instructor_name(self, certificate_data):
|
||||
"""
|
||||
✅ Extract instructor name with proper fallback system
|
||||
"""
|
||||
return (
|
||||
certificate_data.get('instructor_name') or
|
||||
certificate_data.get('mentor_name') or
|
||||
certificate_data.get('course_mentor') or
|
||||
'OpenLearnX Instructor'
|
||||
)
|
||||
|
||||
def generate_blockchain_hash(self, certificate_data):
|
||||
"""Generate a blockchain-style hash for the certificate"""
|
||||
# Create a string representation of the certificate
|
||||
cert_string = f"{certificate_data['certificate_id']}{certificate_data['student_name']}{certificate_data['course_id']}{certificate_data['completion_date']}"
|
||||
|
||||
# Generate hash
|
||||
cert_hash = hashlib.sha256(cert_string.encode()).hexdigest()
|
||||
return f"0x{cert_hash[:32]}" # Return first 32 chars with 0x prefix
|
||||
|
||||
def test_unique_generation(self, count=20):
|
||||
"""Test unique ID generation"""
|
||||
ids = []
|
||||
for i in range(count):
|
||||
cert_id = self.generate_certificate_id()
|
||||
share_code = self.generate_share_code()
|
||||
ids.append({
|
||||
"attempt": i + 1,
|
||||
"certificate_id": cert_id,
|
||||
"share_code": share_code,
|
||||
"timestamp": time.time()
|
||||
})
|
||||
time.sleep(0.001) # Small delay
|
||||
|
||||
cert_ids = [item["certificate_id"] for item in ids]
|
||||
share_codes = [item["share_code"] for item in ids]
|
||||
|
||||
cert_duplicates = len(cert_ids) != len(set(cert_ids))
|
||||
share_duplicates = len(share_codes) != len(set(share_codes))
|
||||
has_problematic_id = "DG1ITFZ7DT5B" in cert_ids
|
||||
|
||||
return {
|
||||
"generated_ids": ids,
|
||||
"certificate_id_duplicates": cert_duplicates,
|
||||
"share_code_duplicates": share_duplicates,
|
||||
"unique_cert_ids": len(set(cert_ids)),
|
||||
"unique_share_codes": len(set(share_codes)),
|
||||
"has_problematic_duplicate": has_problematic_id,
|
||||
"all_unique": not cert_duplicates and not share_duplicates and not has_problematic_id,
|
||||
"success": not cert_duplicates and not share_duplicates and not has_problematic_id
|
||||
}
|
||||
|
||||
|
||||
# Usage example and testing
|
||||
if __name__ == "__main__":
|
||||
# Create certificate manager
|
||||
cert_manager = CertificateManager()
|
||||
|
||||
print("🧪 Testing Certificate Manager...")
|
||||
|
||||
# Test unique ID generation
|
||||
print("\n1. Testing unique ID generation:")
|
||||
test_results = cert_manager.test_unique_generation(15)
|
||||
print(f" - Generated {test_results['unique_cert_ids']} unique certificate IDs")
|
||||
print(f" - Generated {test_results['unique_share_codes']} unique share codes")
|
||||
print(f" - Has duplicates: {test_results['certificate_id_duplicates']}")
|
||||
print(f" - Has problematic ID: {test_results['has_problematic_duplicate']}")
|
||||
print(f" - All unique: {'✅ YES' if test_results['all_unique'] else '❌ NO'}")
|
||||
|
||||
# Test wallet encryption
|
||||
print("\n2. Testing wallet encryption:")
|
||||
test_wallet = "0x742d35Cc6634C0532925A3b8D9e4b5b0D4b8c9B1"
|
||||
encrypted = cert_manager.encrypt_wallet_id(test_wallet)
|
||||
if encrypted:
|
||||
decrypted = cert_manager.decrypt_wallet_id(encrypted)
|
||||
print(f" - Original: {test_wallet}")
|
||||
print(f" - Encrypted: {encrypted}")
|
||||
print(f" - Decrypted: {decrypted}")
|
||||
print(f" - Match: {'✅ YES' if test_wallet == decrypted else '❌ NO'}")
|
||||
|
||||
# Test certificate document creation
|
||||
print("\n3. Testing certificate document creation:")
|
||||
cert_doc = cert_manager.create_certificate_document(
|
||||
student_name="John Smith",
|
||||
course_id="java-course",
|
||||
course_title="Java Development Bootcamp",
|
||||
instructor_name="OpenLearnX Instructor",
|
||||
user_id="user123",
|
||||
wallet_id=test_wallet
|
||||
)
|
||||
|
||||
print(f" - Certificate ID: {cert_doc['certificate_id']}")
|
||||
print(f" - Student Name: {cert_doc['student_name']}")
|
||||
print(f" - Instructor Name: {cert_doc['instructor_name']}")
|
||||
print(f" - Share Code: {cert_doc['share_code']}")
|
||||
|
||||
# Test validation
|
||||
is_valid, message = cert_manager.validate_certificate_data(cert_doc)
|
||||
print(f" - Validation: {'✅ PASSED' if is_valid else '❌ FAILED'} - {message}")
|
||||
|
||||
print("\n🎉 Certificate Manager testing completed!")
|
||||
|
||||
Reference in New Issue
Block a user