319 lines
13 KiB
Python
319 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Скрипт для генерации тестов пересказов из MongoDB.
|
||
|
||
Извлекает текст статьи из коллекции rssNotification (поле .meta.topicContent)
|
||
и генерирует тестовые данные в формате TXT для бенчмарка AI.
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import sys
|
||
from pathlib import Path
|
||
from typing import Dict, Optional
|
||
|
||
import pymongo
|
||
from pymongo import MongoClient
|
||
|
||
# Добавляем путь к исходникам, чтобы импортировать константы
|
||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
|
||
from src.constants import TEST_SEPARATOR
|
||
from src.models.ollama_client import OllamaClient
|
||
|
||
def sanitize_filename(filename: str) -> str:
|
||
"""
|
||
Очищает строку от недопустимых символов для имени файла.
|
||
|
||
Args:
|
||
filename: Исходное имя файла
|
||
|
||
Returns:
|
||
Очищенное имя файла или пустая строка, если очистка невозможна
|
||
"""
|
||
import re
|
||
# Заменяем недопустимые символы на подчеркивание
|
||
# Допустимые символы: буквы, цифры, подчеркивание, тире, точка
|
||
sanitized = re.sub(r'[^\w\-\.]', '_', filename)
|
||
return sanitized if sanitized else filename
|
||
|
||
def connect_to_mongo() -> MongoClient:
|
||
"""Подключается к MongoDB кластеру."""
|
||
client = MongoClient(
|
||
"mongodb://10.0.0.3:27017,10.0.0.4:27017,10.0.0.5:27017/",
|
||
connectTimeoutMS=30000,
|
||
socketTimeoutMS=30000,
|
||
serverSelectionTimeoutMS=30000,
|
||
retryWrites=True,
|
||
retryReads=True
|
||
)
|
||
return client
|
||
|
||
def extract_text_from_topic_content(topic_content: Dict) -> Optional[str]:
|
||
"""
|
||
Извлекает текст статьи из .meta.topicContent.
|
||
|
||
Args:
|
||
topic_content: Содержимое поля .meta.topicContent из MongoDB
|
||
|
||
Returns:
|
||
Текст статьи или None, если не удалось извлечь
|
||
"""
|
||
if not topic_content:
|
||
return None
|
||
|
||
# Преобразуем в строку, если это не строка
|
||
content_str = str(topic_content)
|
||
|
||
return content_str
|
||
|
||
def generate_test_from_mongo_record(record_id: str, verbose: bool = True) -> bool:
|
||
"""
|
||
Генерирует тест пересказа из записи MongoDB.
|
||
|
||
Args:
|
||
record_id: ID записи в MongoDB
|
||
verbose: Выводить подробную отладочную информацию (по умолчанию: True)
|
||
|
||
Returns:
|
||
True, если тест успешно generated, False в случае ошибки
|
||
"""
|
||
try:
|
||
client = connect_to_mongo()
|
||
db = client['tracker_conbot']
|
||
collection = db['rssNotification']
|
||
|
||
# Извлекаем запись по ID
|
||
record = collection.find_one({"_id": record_id})
|
||
if not record:
|
||
if verbose:
|
||
print(f"❌ Запись с ID {record_id} не найдена в коллекции")
|
||
return False
|
||
|
||
# Отладочная информация (только при verbose=True)
|
||
if verbose:
|
||
print(f"🔍 Найдена запись: {record_id}")
|
||
print(f"📋 Полная структура записи:")
|
||
print(json.dumps(record, ensure_ascii=False, indent=2, default=str))
|
||
|
||
# Извлекаем текст из meta.topicContent
|
||
meta_data = record.get('meta', {})
|
||
topic_content = meta_data.get('topicContent')
|
||
if not topic_content:
|
||
if verbose:
|
||
print(f"❌ В записи {record_id} отсутствует поле meta.topicContent")
|
||
print(f"📋 Полная структура записи:")
|
||
print(json.dumps(record, ensure_ascii=False, indent=2, default=str))
|
||
return False
|
||
|
||
if verbose:
|
||
print(f"📝 Тип поля meta.topicContent: {type(topic_content)}")
|
||
print(f"📝 Содержимое meta.topicContent (первые 500 символов):")
|
||
print(str(topic_content)[:500])
|
||
|
||
# Извлекаем текст
|
||
article_text = extract_text_from_topic_content(topic_content)
|
||
if not article_text:
|
||
if verbose:
|
||
print(f"❌ Не удалось извлечь текст из meta.topicContent записи {record_id}")
|
||
print(f"📋 Полная структура записи:")
|
||
print(json.dumps(record, ensure_ascii=False, indent=2, default=str))
|
||
return False
|
||
|
||
if verbose:
|
||
print(f"📝 Итоговый текст (первые 500 символов): {article_text[:500]}")
|
||
|
||
# Генерируем пересказ через LLM (если доступно)
|
||
expected_summary = ""
|
||
try:
|
||
# Создаем клиент Ollama для генерации пересказа
|
||
ollama_client = OllamaClient(base_url="http://10.0.0.9:11434")
|
||
|
||
# Загружаем промпт для пересказа
|
||
with open('prompts/summarization.txt', 'r', encoding='utf-8') as f:
|
||
summarization_prompt = f.read().strip()
|
||
|
||
# Формируем промпт с текстом статьи
|
||
prompt = summarization_prompt.format(text=article_text)
|
||
|
||
# Генерация пересказа с использованием модели translategemma:4b
|
||
# Контекст 128000 как указано в задаче
|
||
expected_summary = ollama_client.generate(
|
||
model="translategemma:4b",
|
||
prompt=prompt,
|
||
num_ctx=128000
|
||
)
|
||
|
||
if verbose:
|
||
print(f"🤖 Сгенерирован пересказ: {expected_summary[:200]}...")
|
||
|
||
except Exception as e:
|
||
if verbose:
|
||
print(f"⚠️ Не удалось сгенерировать пересказ через Ollama: {e}")
|
||
print(" Будет использован пустой пересказ")
|
||
# Если генерация не удалась, оставляем пустым (сохраняем обратную совместимость)
|
||
expected_summary = ""
|
||
|
||
# Создаем директорию для сохранения теста (всегда в tests/summarization)
|
||
output_path = Path("tests") / "summarization"
|
||
output_path.mkdir(parents=True, exist_ok=True)
|
||
|
||
# Очищаем ID от недопустимых символов для имени файла
|
||
filename = sanitize_filename(record_id)
|
||
if not filename:
|
||
if verbose:
|
||
print(f"❌ Не удалось создать допустимое имя файла из ID записи {record_id}")
|
||
return False
|
||
|
||
# Используем очищенный ID записи как имя файла
|
||
test_file = output_path / f"{filename}.txt"
|
||
|
||
# Сохраняем текст статьи и ожидаемый пересказ с разделителем
|
||
with open(test_file, "w", encoding="utf-8") as f:
|
||
f.write(f"{article_text}{TEST_SEPARATOR}{expected_summary}")
|
||
|
||
if verbose:
|
||
print(f"✅ Создан тест tests/summarization/{filename}.txt")
|
||
print(f" Источник: MongoDB запись {record_id}")
|
||
print(f" Текст статьи (первые 100 символов): {article_text[:100]}...")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
if verbose:
|
||
print(f"❌ Ошибка при генерации теста: {e}")
|
||
try:
|
||
record = collection.find_one({"_id": record_id})
|
||
if record:
|
||
print(f"📋 Полная структура записи:")
|
||
print(json.dumps(record, ensure_ascii=False, indent=2, default=str))
|
||
except:
|
||
pass
|
||
return False
|
||
finally:
|
||
if 'client' in locals():
|
||
client.close()
|
||
|
||
def validate_test(test_data: Dict[str, str]) -> bool:
|
||
"""Валидирует тестовые данные."""
|
||
if not isinstance(test_data, dict):
|
||
print("❌ Тест должен быть словарём (JSON объект)")
|
||
return False
|
||
|
||
if "prompt" not in test_data:
|
||
print("❌ Отсутствует поле 'prompt'")
|
||
return False
|
||
|
||
if "expected" not in test_data:
|
||
print("❌ Отсутствует поле 'expected'")
|
||
return False
|
||
|
||
if not isinstance(test_data["prompt"], str):
|
||
print("❌ Поле 'prompt' должно быть строкой")
|
||
return False
|
||
|
||
if not isinstance(test_data["expected"], str):
|
||
print("❌ Поле 'expected' должно быть строкой")
|
||
return False
|
||
|
||
if not test_data["prompt"].strip():
|
||
print("❌ Поле 'prompt' не может быть пустым")
|
||
return False
|
||
|
||
if not test_data["expected"].strip():
|
||
print("❌ Поле 'expected' не может быть пустым")
|
||
return False
|
||
|
||
return True
|
||
|
||
def read_ids_from_file(file_path: str) -> list:
|
||
"""
|
||
Читает ID записей из файла.
|
||
|
||
Args:
|
||
file_path: Путь к файлу с ID записей (по одной на строку)
|
||
|
||
Returns:
|
||
Список ID записей
|
||
"""
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
ids = [line.strip() for line in f if line.strip()]
|
||
return ids
|
||
except Exception as e:
|
||
print(f"❌ Ошибка при чтении файла {file_path}: {e}")
|
||
return []
|
||
|
||
def main():
|
||
"""Основная функция скрипта."""
|
||
parser = argparse.ArgumentParser(
|
||
description="Генератор тестов пересказов из MongoDB",
|
||
epilog="Примеры использования:\n"
|
||
" python scripts/generate_summarization_from_mongo.py --record-id 507f1f77bcf86cd799439011\n"
|
||
" python scripts/generate_summarization_from_mongo.py --id-file ids.txt"
|
||
)
|
||
group = parser.add_mutually_exclusive_group(required=True)
|
||
group.add_argument(
|
||
"--record-id",
|
||
type=str,
|
||
help="ID записи в MongoDB (для обработки одной записи)"
|
||
)
|
||
group.add_argument(
|
||
"--id-file",
|
||
type=str,
|
||
help="Файл с ID записей (по одной на строку, для обработки нескольких записей)"
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
if args.record_id:
|
||
print(f"🔍 Подключаюсь к MongoDB кластеру...")
|
||
print(f"📄 Извлекаю запись с ID: {args.record_id}")
|
||
print(f"💾 Сохраняю тест в: tests/summarization/")
|
||
|
||
success = generate_test_from_mongo_record(args.record_id)
|
||
|
||
if success:
|
||
print("\n✨ Готово! Тест успешно generated.")
|
||
else:
|
||
print("\n❌ Не удалось generated тест.")
|
||
sys.exit(1)
|
||
elif args.id_file:
|
||
print(f"🔍 Подключаюсь к MongoDB кластеру...")
|
||
print(f"📄 Извлекаю ID записи из файла: {args.id_file}")
|
||
print(f"💾 Сохраняю тесты в: tests/summarization/")
|
||
|
||
# Читаем ID из файла
|
||
record_ids = read_ids_from_file(args.id_file)
|
||
if not record_ids:
|
||
print("❌ Файл с ID записей пуст или недействителен.")
|
||
sys.exit(1)
|
||
|
||
print(f"📊 Найдено {len(record_ids)} ID записей в файле")
|
||
print("🔄 Начинаю обработку записей...\n")
|
||
|
||
success_count = 0
|
||
error_count = 0
|
||
|
||
for i, record_id in enumerate(record_ids, 1):
|
||
print(f"[{i}/{len(record_ids)}] Обрабатываю запись: {record_id}")
|
||
success = generate_test_from_mongo_record(record_id, verbose=True)
|
||
if success:
|
||
success_count += 1
|
||
else:
|
||
error_count += 1
|
||
print() # Пустая строка для разделения логов
|
||
|
||
print(f"\n📊 Итог:")
|
||
print(f" ✅ Успешно generated: {success_count}")
|
||
print(f" ❌ Ошибки: {error_count}")
|
||
|
||
if error_count > 0:
|
||
print(f"\n⚠️ Некоторые записи были обработаны с ошибками. Проверьте логи выше.")
|
||
sys.exit(1)
|
||
else:
|
||
print("\n✨ Готово! Все тесты успешно generated.")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|