mirror of
https://github.com/th30d4y/OpenLearnX.git
synced 2026-05-26 11:25:49 +00:00
347 lines
15 KiB
Python
347 lines
15 KiB
Python
import tensorflow as tf
|
|
import pickle
|
|
import json
|
|
import numpy as np
|
|
import random
|
|
import os
|
|
from tensorflow.keras.preprocessing.sequence import pad_sequences
|
|
from datetime import datetime
|
|
from bson import ObjectId
|
|
|
|
class AdaptiveQuizMasterLLM:
|
|
def __init__(self, models_path="./models/"):
|
|
"""
|
|
Intelligent Quiz Master with optional model loading
|
|
"""
|
|
self.models_path = models_path
|
|
self.model_available = False
|
|
|
|
# Try to load model components
|
|
try:
|
|
# Check if model files exist
|
|
model_file = f'{models_path}improved_cnn_model.h5'
|
|
tokenizer_file = f'{models_path}tokenizer.pickle'
|
|
label_encoder_file = f'{models_path}label_encoder.pickle'
|
|
data_file = f'{models_path}processed_commonsenseqa_data.json'
|
|
|
|
if all(os.path.exists(f) for f in [model_file, tokenizer_file, label_encoder_file, data_file]):
|
|
# Load model with compatibility handling
|
|
try:
|
|
self.model = tf.keras.models.load_model(model_file)
|
|
self.model_available = True
|
|
print("✅ CNN Model loaded successfully")
|
|
except Exception as model_error:
|
|
print(f"⚠️ Model loading failed: {model_error}")
|
|
print("🔄 Continuing without AI predictions...")
|
|
self.model = None
|
|
self.model_available = False
|
|
|
|
# Load other components
|
|
with open(tokenizer_file, 'rb') as f:
|
|
self.tokenizer = pickle.load(f)
|
|
|
|
with open(label_encoder_file, 'rb') as f:
|
|
self.label_encoder = pickle.load(f)
|
|
|
|
with open(data_file, 'r') as f:
|
|
self.quiz_data = json.load(f)
|
|
|
|
else:
|
|
print("⚠️ Model files not found. Using fallback quiz data...")
|
|
self.model = None
|
|
self.tokenizer = None
|
|
self.label_encoder = None
|
|
self.quiz_data = self._get_fallback_questions()
|
|
self.model_available = False
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Model initialization failed: {e}")
|
|
print("🔄 Using fallback mode...")
|
|
self.model = None
|
|
self.tokenizer = None
|
|
self.label_encoder = None
|
|
self.quiz_data = self._get_fallback_questions()
|
|
self.model_available = False
|
|
|
|
# Separate questions by difficulty
|
|
self.questions_by_difficulty = {
|
|
'easy': [q for q in self.quiz_data if q.get('difficulty') == 'easy'],
|
|
'medium': [q for q in self.quiz_data if q.get('difficulty') == 'medium'],
|
|
'hard': [q for q in self.quiz_data if q.get('difficulty') == 'hard']
|
|
}
|
|
|
|
print("🤖 Adaptive Quiz Master LLM initialized!")
|
|
print(f"📊 Model Available: {self.model_available}")
|
|
print(f"📊 Questions: Easy({len(self.questions_by_difficulty['easy'])}), Medium({len(self.questions_by_difficulty['medium'])}), Hard({len(self.questions_by_difficulty['hard'])})")
|
|
|
|
def _get_fallback_questions(self):
|
|
"""
|
|
Fallback questions when model files are not available
|
|
"""
|
|
return [
|
|
{
|
|
"question": "What is the capital of France?",
|
|
"incorrect_answers": ["London", "Berlin", "Madrid"],
|
|
"correct_answer": "Paris",
|
|
"difficulty": "easy"
|
|
},
|
|
{
|
|
"question": "Which programming language is known for its simplicity and readability?",
|
|
"incorrect_answers": ["C++", "Assembly", "Java"],
|
|
"correct_answer": "Python",
|
|
"difficulty": "easy"
|
|
},
|
|
{
|
|
"question": "What does API stand for?",
|
|
"incorrect_answers": ["Advanced Programming Interface", "Automated Program Integration", "Applied Programming Instructions"],
|
|
"correct_answer": "Application Programming Interface",
|
|
"difficulty": "medium"
|
|
},
|
|
{
|
|
"question": "In machine learning, what does 'overfitting' mean?",
|
|
"incorrect_answers": ["Model performs well on all data", "Model is too simple", "Model trains too quickly"],
|
|
"correct_answer": "Model memorizes training data but fails on new data",
|
|
"difficulty": "medium"
|
|
},
|
|
{
|
|
"question": "What is the time complexity of binary search?",
|
|
"incorrect_answers": ["O(n)", "O(n²)", "O(n log n)"],
|
|
"correct_answer": "O(log n)",
|
|
"difficulty": "hard"
|
|
},
|
|
{
|
|
"question": "Which design pattern ensures a class has only one instance?",
|
|
"incorrect_answers": ["Factory", "Observer", "Strategy"],
|
|
"correct_answer": "Singleton",
|
|
"difficulty": "hard"
|
|
}
|
|
]
|
|
|
|
def create_session(self, user_id):
|
|
"""
|
|
Create new adaptive quiz session
|
|
"""
|
|
session_id = str(ObjectId())
|
|
session_data = {
|
|
'session_id': session_id,
|
|
'user_id': user_id,
|
|
'current_difficulty': 'easy', # Always start with easy
|
|
'consecutive_correct': {'easy': 0, 'medium': 0, 'hard': 0},
|
|
'total_questions': 0,
|
|
'total_correct': 0,
|
|
'question_history': [],
|
|
'created_at': datetime.utcnow(),
|
|
'status': 'active'
|
|
}
|
|
return session_data
|
|
|
|
def get_adaptive_question(self, session_data):
|
|
"""
|
|
Get next question based on current difficulty level
|
|
"""
|
|
current_difficulty = session_data['current_difficulty']
|
|
available_questions = self.questions_by_difficulty[current_difficulty]
|
|
|
|
# Avoid repeating questions
|
|
asked_questions = [q['question_id'] for q in session_data.get('question_history', [])]
|
|
available_questions = [q for q in available_questions
|
|
if q.get('id', str(hash(q['question']))) not in asked_questions]
|
|
|
|
if not available_questions:
|
|
# Fallback to any difficulty if current level exhausted
|
|
all_available = [q for q in self.quiz_data
|
|
if q.get('id', str(hash(q['question']))) not in asked_questions]
|
|
available_questions = all_available[:10] if all_available else self.quiz_data[:5]
|
|
|
|
# Select random question
|
|
question_data = random.choice(available_questions)
|
|
|
|
# Create formatted question with shuffled choices
|
|
choices = question_data['incorrect_answers'] + [question_data['correct_answer']]
|
|
random.shuffle(choices)
|
|
|
|
# Find correct answer position
|
|
correct_position = choices.index(question_data['correct_answer'])
|
|
correct_letter = chr(65 + correct_position)
|
|
|
|
question_obj = {
|
|
'question_id': question_data.get('id', str(hash(question_data['question']))),
|
|
'question_text': question_data['question'],
|
|
'choices': {
|
|
'A': choices[0],
|
|
'B': choices[1],
|
|
'C': choices[2],
|
|
'D': choices[3]
|
|
},
|
|
'correct_answer': correct_letter,
|
|
'difficulty': current_difficulty,
|
|
'explanation': f"The correct answer is {question_data['correct_answer']}."
|
|
}
|
|
|
|
return question_obj
|
|
|
|
def get_llm_prediction(self, question_text, choices):
|
|
"""
|
|
Use trained model to predict answer (with fallback)
|
|
"""
|
|
if not self.model_available or not self.model:
|
|
# Fallback: Random prediction with low confidence
|
|
import random
|
|
fallback_prediction = random.choice(['A', 'B', 'C', 'D'])
|
|
return {
|
|
'llm_prediction': fallback_prediction,
|
|
'confidence': 0.25, # Random confidence
|
|
'model_accuracy': 25.0, # Random accuracy
|
|
'fallback_mode': True
|
|
}
|
|
|
|
try:
|
|
# Format question for model prediction
|
|
formatted_question = f"Difficulty: medium\nQuestion: {question_text}\n"
|
|
formatted_question += f"A) {choices['A']}\n"
|
|
formatted_question += f"B) {choices['B']}\n"
|
|
formatted_question += f"C) {choices['C']}\n"
|
|
formatted_question += f"D) {choices['D']}\n"
|
|
|
|
# Tokenize and predict using your trained model
|
|
sequence = self.tokenizer.texts_to_sequences([formatted_question])
|
|
padded = pad_sequences(sequence, maxlen=400, padding='post')
|
|
|
|
prediction = self.model.predict(padded, verbose=0)
|
|
predicted_class = np.argmax(prediction[0])
|
|
predicted_letter = self.label_encoder.inverse_transform([predicted_class])[0]
|
|
confidence = float(prediction[0][predicted_class])
|
|
|
|
return {
|
|
'llm_prediction': predicted_letter,
|
|
'confidence': confidence,
|
|
'model_accuracy': 33.1, # Your model's test accuracy
|
|
'fallback_mode': False
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Prediction error: {e}")
|
|
# Fallback on error
|
|
import random
|
|
fallback_prediction = random.choice(['A', 'B', 'C', 'D'])
|
|
return {
|
|
'llm_prediction': fallback_prediction,
|
|
'confidence': 0.25,
|
|
'model_accuracy': 25.0,
|
|
'fallback_mode': True,
|
|
'error': str(e)
|
|
}
|
|
|
|
def evaluate_answer(self, session_data, question_data, user_answer):
|
|
"""
|
|
Evaluate user answer and adjust difficulty according to your rules
|
|
"""
|
|
is_correct = (user_answer.upper() == question_data['correct_answer'])
|
|
current_difficulty = session_data['current_difficulty']
|
|
|
|
# Update session stats
|
|
session_data['total_questions'] += 1
|
|
if is_correct:
|
|
session_data['total_correct'] += 1
|
|
session_data['consecutive_correct'][current_difficulty] += 1
|
|
else:
|
|
# Reset consecutive count for current difficulty
|
|
session_data['consecutive_correct'][current_difficulty] = 0
|
|
|
|
# Apply your exact difficulty adjustment rules
|
|
new_difficulty = self._adjust_difficulty(session_data, is_correct)
|
|
|
|
# Record question in history
|
|
question_record = {
|
|
'question_id': question_data['question_id'],
|
|
'question_text': question_data['question_text'],
|
|
'user_answer': user_answer,
|
|
'correct_answer': question_data['correct_answer'],
|
|
'is_correct': is_correct,
|
|
'difficulty': current_difficulty,
|
|
'timestamp': datetime.utcnow()
|
|
}
|
|
session_data['question_history'].append(question_record)
|
|
|
|
# Get LLM prediction for comparison
|
|
llm_result = self.get_llm_prediction(question_data['question_text'], question_data['choices'])
|
|
|
|
result = {
|
|
'is_correct': is_correct,
|
|
'correct_answer': question_data['correct_answer'],
|
|
'explanation': question_data['explanation'],
|
|
'difficulty_changed': new_difficulty != current_difficulty,
|
|
'previous_difficulty': current_difficulty,
|
|
'new_difficulty': new_difficulty,
|
|
'consecutive_correct': session_data['consecutive_correct'][current_difficulty],
|
|
'llm_prediction': llm_result,
|
|
'session_stats': {
|
|
'total_questions': session_data['total_questions'],
|
|
'total_correct': session_data['total_correct'],
|
|
'accuracy': round((session_data['total_correct'] / session_data['total_questions']) * 100, 1)
|
|
}
|
|
}
|
|
|
|
session_data['current_difficulty'] = new_difficulty
|
|
return result
|
|
|
|
def _adjust_difficulty(self, session_data, is_correct):
|
|
"""
|
|
Your exact difficulty adjustment rules:
|
|
- 3 consecutive correct: Easy→Medium→Hard
|
|
- 1 incorrect: Hard→Medium→Easy (stay on Easy if already there)
|
|
"""
|
|
current_difficulty = session_data['current_difficulty']
|
|
consecutive = session_data['consecutive_correct']
|
|
|
|
if is_correct:
|
|
# Move up after 3 consecutive correct answers
|
|
if consecutive[current_difficulty] >= 3:
|
|
if current_difficulty == 'easy':
|
|
# Reset consecutive count for easy, start fresh for medium
|
|
session_data['consecutive_correct']['easy'] = 0
|
|
return 'medium'
|
|
elif current_difficulty == 'medium':
|
|
# Reset consecutive count for medium, start fresh for hard
|
|
session_data['consecutive_correct']['medium'] = 0
|
|
return 'hard'
|
|
# If already hard, stay hard
|
|
else:
|
|
# Move down immediately after 1 wrong answer
|
|
if current_difficulty == 'hard':
|
|
return 'medium'
|
|
elif current_difficulty == 'medium':
|
|
return 'easy'
|
|
# If already easy, stay easy
|
|
|
|
return current_difficulty
|
|
|
|
def get_session_stats(self, session_data):
|
|
"""
|
|
Get comprehensive session statistics
|
|
"""
|
|
total_questions = session_data['total_questions']
|
|
total_correct = session_data['total_correct']
|
|
accuracy = (total_correct / total_questions * 100) if total_questions > 0 else 0
|
|
|
|
difficulty_stats = {}
|
|
for difficulty in ['easy', 'medium', 'hard']:
|
|
questions_at_level = [q for q in session_data['question_history'] if q['difficulty'] == difficulty]
|
|
correct_at_level = sum(1 for q in questions_at_level if q['is_correct'])
|
|
difficulty_stats[difficulty] = {
|
|
'questions': len(questions_at_level),
|
|
'correct': correct_at_level,
|
|
'accuracy': round((correct_at_level / len(questions_at_level) * 100), 1) if questions_at_level else 0
|
|
}
|
|
|
|
return {
|
|
'session_id': session_data['session_id'],
|
|
'current_difficulty': session_data['current_difficulty'],
|
|
'total_questions': total_questions,
|
|
'total_correct': total_correct,
|
|
'overall_accuracy': round(accuracy, 1),
|
|
'consecutive_correct': session_data['consecutive_correct'],
|
|
'difficulty_breakdown': difficulty_stats,
|
|
'status': session_data['status']
|
|
}
|