Files
OpenLearnX/backend/services/adaptive_quiz_service.py
2025-07-28 00:15:37 +05:30

347 lines
15 KiB
Python

import tensorflow as tf
import pickle
import json
import numpy as np
import random
import os
from tensorflow.keras.preprocessing.sequence import pad_sequences
from datetime import datetime
from bson import ObjectId
class AdaptiveQuizMasterLLM:
def __init__(self, models_path="./models/"):
"""
Intelligent Quiz Master with optional model loading
"""
self.models_path = models_path
self.model_available = False
# Try to load model components
try:
# Check if model files exist
model_file = f'{models_path}improved_cnn_model.h5'
tokenizer_file = f'{models_path}tokenizer.pickle'
label_encoder_file = f'{models_path}label_encoder.pickle'
data_file = f'{models_path}processed_commonsenseqa_data.json'
if all(os.path.exists(f) for f in [model_file, tokenizer_file, label_encoder_file, data_file]):
# Load model with compatibility handling
try:
self.model = tf.keras.models.load_model(model_file)
self.model_available = True
print("✅ CNN Model loaded successfully")
except Exception as model_error:
print(f"⚠️ Model loading failed: {model_error}")
print("🔄 Continuing without AI predictions...")
self.model = None
self.model_available = False
# Load other components
with open(tokenizer_file, 'rb') as f:
self.tokenizer = pickle.load(f)
with open(label_encoder_file, 'rb') as f:
self.label_encoder = pickle.load(f)
with open(data_file, 'r') as f:
self.quiz_data = json.load(f)
else:
print("⚠️ Model files not found. Using fallback quiz data...")
self.model = None
self.tokenizer = None
self.label_encoder = None
self.quiz_data = self._get_fallback_questions()
self.model_available = False
except Exception as e:
print(f"⚠️ Model initialization failed: {e}")
print("🔄 Using fallback mode...")
self.model = None
self.tokenizer = None
self.label_encoder = None
self.quiz_data = self._get_fallback_questions()
self.model_available = False
# Separate questions by difficulty
self.questions_by_difficulty = {
'easy': [q for q in self.quiz_data if q.get('difficulty') == 'easy'],
'medium': [q for q in self.quiz_data if q.get('difficulty') == 'medium'],
'hard': [q for q in self.quiz_data if q.get('difficulty') == 'hard']
}
print("🤖 Adaptive Quiz Master LLM initialized!")
print(f"📊 Model Available: {self.model_available}")
print(f"📊 Questions: Easy({len(self.questions_by_difficulty['easy'])}), Medium({len(self.questions_by_difficulty['medium'])}), Hard({len(self.questions_by_difficulty['hard'])})")
def _get_fallback_questions(self):
"""
Fallback questions when model files are not available
"""
return [
{
"question": "What is the capital of France?",
"incorrect_answers": ["London", "Berlin", "Madrid"],
"correct_answer": "Paris",
"difficulty": "easy"
},
{
"question": "Which programming language is known for its simplicity and readability?",
"incorrect_answers": ["C++", "Assembly", "Java"],
"correct_answer": "Python",
"difficulty": "easy"
},
{
"question": "What does API stand for?",
"incorrect_answers": ["Advanced Programming Interface", "Automated Program Integration", "Applied Programming Instructions"],
"correct_answer": "Application Programming Interface",
"difficulty": "medium"
},
{
"question": "In machine learning, what does 'overfitting' mean?",
"incorrect_answers": ["Model performs well on all data", "Model is too simple", "Model trains too quickly"],
"correct_answer": "Model memorizes training data but fails on new data",
"difficulty": "medium"
},
{
"question": "What is the time complexity of binary search?",
"incorrect_answers": ["O(n)", "O(n²)", "O(n log n)"],
"correct_answer": "O(log n)",
"difficulty": "hard"
},
{
"question": "Which design pattern ensures a class has only one instance?",
"incorrect_answers": ["Factory", "Observer", "Strategy"],
"correct_answer": "Singleton",
"difficulty": "hard"
}
]
def create_session(self, user_id):
"""
Create new adaptive quiz session
"""
session_id = str(ObjectId())
session_data = {
'session_id': session_id,
'user_id': user_id,
'current_difficulty': 'easy', # Always start with easy
'consecutive_correct': {'easy': 0, 'medium': 0, 'hard': 0},
'total_questions': 0,
'total_correct': 0,
'question_history': [],
'created_at': datetime.utcnow(),
'status': 'active'
}
return session_data
def get_adaptive_question(self, session_data):
"""
Get next question based on current difficulty level
"""
current_difficulty = session_data['current_difficulty']
available_questions = self.questions_by_difficulty[current_difficulty]
# Avoid repeating questions
asked_questions = [q['question_id'] for q in session_data.get('question_history', [])]
available_questions = [q for q in available_questions
if q.get('id', str(hash(q['question']))) not in asked_questions]
if not available_questions:
# Fallback to any difficulty if current level exhausted
all_available = [q for q in self.quiz_data
if q.get('id', str(hash(q['question']))) not in asked_questions]
available_questions = all_available[:10] if all_available else self.quiz_data[:5]
# Select random question
question_data = random.choice(available_questions)
# Create formatted question with shuffled choices
choices = question_data['incorrect_answers'] + [question_data['correct_answer']]
random.shuffle(choices)
# Find correct answer position
correct_position = choices.index(question_data['correct_answer'])
correct_letter = chr(65 + correct_position)
question_obj = {
'question_id': question_data.get('id', str(hash(question_data['question']))),
'question_text': question_data['question'],
'choices': {
'A': choices[0],
'B': choices[1],
'C': choices[2],
'D': choices[3]
},
'correct_answer': correct_letter,
'difficulty': current_difficulty,
'explanation': f"The correct answer is {question_data['correct_answer']}."
}
return question_obj
def get_llm_prediction(self, question_text, choices):
"""
Use trained model to predict answer (with fallback)
"""
if not self.model_available or not self.model:
# Fallback: Random prediction with low confidence
import random
fallback_prediction = random.choice(['A', 'B', 'C', 'D'])
return {
'llm_prediction': fallback_prediction,
'confidence': 0.25, # Random confidence
'model_accuracy': 25.0, # Random accuracy
'fallback_mode': True
}
try:
# Format question for model prediction
formatted_question = f"Difficulty: medium\nQuestion: {question_text}\n"
formatted_question += f"A) {choices['A']}\n"
formatted_question += f"B) {choices['B']}\n"
formatted_question += f"C) {choices['C']}\n"
formatted_question += f"D) {choices['D']}\n"
# Tokenize and predict using your trained model
sequence = self.tokenizer.texts_to_sequences([formatted_question])
padded = pad_sequences(sequence, maxlen=400, padding='post')
prediction = self.model.predict(padded, verbose=0)
predicted_class = np.argmax(prediction[0])
predicted_letter = self.label_encoder.inverse_transform([predicted_class])[0]
confidence = float(prediction[0][predicted_class])
return {
'llm_prediction': predicted_letter,
'confidence': confidence,
'model_accuracy': 33.1, # Your model's test accuracy
'fallback_mode': False
}
except Exception as e:
print(f"⚠️ Prediction error: {e}")
# Fallback on error
import random
fallback_prediction = random.choice(['A', 'B', 'C', 'D'])
return {
'llm_prediction': fallback_prediction,
'confidence': 0.25,
'model_accuracy': 25.0,
'fallback_mode': True,
'error': str(e)
}
def evaluate_answer(self, session_data, question_data, user_answer):
"""
Evaluate user answer and adjust difficulty according to your rules
"""
is_correct = (user_answer.upper() == question_data['correct_answer'])
current_difficulty = session_data['current_difficulty']
# Update session stats
session_data['total_questions'] += 1
if is_correct:
session_data['total_correct'] += 1
session_data['consecutive_correct'][current_difficulty] += 1
else:
# Reset consecutive count for current difficulty
session_data['consecutive_correct'][current_difficulty] = 0
# Apply your exact difficulty adjustment rules
new_difficulty = self._adjust_difficulty(session_data, is_correct)
# Record question in history
question_record = {
'question_id': question_data['question_id'],
'question_text': question_data['question_text'],
'user_answer': user_answer,
'correct_answer': question_data['correct_answer'],
'is_correct': is_correct,
'difficulty': current_difficulty,
'timestamp': datetime.utcnow()
}
session_data['question_history'].append(question_record)
# Get LLM prediction for comparison
llm_result = self.get_llm_prediction(question_data['question_text'], question_data['choices'])
result = {
'is_correct': is_correct,
'correct_answer': question_data['correct_answer'],
'explanation': question_data['explanation'],
'difficulty_changed': new_difficulty != current_difficulty,
'previous_difficulty': current_difficulty,
'new_difficulty': new_difficulty,
'consecutive_correct': session_data['consecutive_correct'][current_difficulty],
'llm_prediction': llm_result,
'session_stats': {
'total_questions': session_data['total_questions'],
'total_correct': session_data['total_correct'],
'accuracy': round((session_data['total_correct'] / session_data['total_questions']) * 100, 1)
}
}
session_data['current_difficulty'] = new_difficulty
return result
def _adjust_difficulty(self, session_data, is_correct):
"""
Your exact difficulty adjustment rules:
- 3 consecutive correct: Easy→Medium→Hard
- 1 incorrect: Hard→Medium→Easy (stay on Easy if already there)
"""
current_difficulty = session_data['current_difficulty']
consecutive = session_data['consecutive_correct']
if is_correct:
# Move up after 3 consecutive correct answers
if consecutive[current_difficulty] >= 3:
if current_difficulty == 'easy':
# Reset consecutive count for easy, start fresh for medium
session_data['consecutive_correct']['easy'] = 0
return 'medium'
elif current_difficulty == 'medium':
# Reset consecutive count for medium, start fresh for hard
session_data['consecutive_correct']['medium'] = 0
return 'hard'
# If already hard, stay hard
else:
# Move down immediately after 1 wrong answer
if current_difficulty == 'hard':
return 'medium'
elif current_difficulty == 'medium':
return 'easy'
# If already easy, stay easy
return current_difficulty
def get_session_stats(self, session_data):
"""
Get comprehensive session statistics
"""
total_questions = session_data['total_questions']
total_correct = session_data['total_correct']
accuracy = (total_correct / total_questions * 100) if total_questions > 0 else 0
difficulty_stats = {}
for difficulty in ['easy', 'medium', 'hard']:
questions_at_level = [q for q in session_data['question_history'] if q['difficulty'] == difficulty]
correct_at_level = sum(1 for q in questions_at_level if q['is_correct'])
difficulty_stats[difficulty] = {
'questions': len(questions_at_level),
'correct': correct_at_level,
'accuracy': round((correct_at_level / len(questions_at_level) * 100), 1) if questions_at_level else 0
}
return {
'session_id': session_data['session_id'],
'current_difficulty': session_data['current_difficulty'],
'total_questions': total_questions,
'total_correct': total_correct,
'overall_accuracy': round(accuracy, 1),
'consecutive_correct': session_data['consecutive_correct'],
'difficulty_breakdown': difficulty_stats,
'status': session_data['status']
}