feat: vibe code done

This commit is contained in:
2026-01-16 19:58:29 +03:00
parent 408f6b86c6
commit 1a59adf5a5
24 changed files with 889 additions and 1 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

100
src/benchmarks/base.py Normal file
View File

@@ -0,0 +1,100 @@
import logging
import time
from typing import Dict, Any, List
from abc import ABC, abstractmethod
from models.ollama_client import OllamaClient
class Benchmark(ABC):
"""Базовый класс для всех бенчмарков."""
def __init__(self, name: str):
"""
Инициализация бенчмарка.
Args:
name: Название бенчмарка
"""
self.name = name
self.logger = logging.getLogger(__name__)
@abstractmethod
def load_test_data(self) -> List[Dict[str, Any]]:
"""
Загрузка тестовых данных.
Returns:
Список тестовых случаев
"""
pass
@abstractmethod
def evaluate(self, model_response: str, expected: str) -> float:
"""
Оценка качества ответа модели.
Args:
model_response: Ответ от модели
expected: Ожидаемый ответ
Returns:
Метрика качества (0-1)
"""
pass
def run(self, ollama_client: OllamaClient, model_name: str) -> Dict[str, Any]:
"""
Запуск бенчмарка.
Args:
ollama_client: Клиент для работы с Ollama
model_name: Название модели
Returns:
Результаты бенчмарка
"""
test_cases = self.load_test_data()
results = []
for i, test_case in enumerate(test_cases, 1):
try:
self.logger.info(f"Running test case {i}/{len(test_cases)} for {self.name}")
# Замер времени
start_time = time.time()
# Получение ответа от модели
prompt = test_case['prompt']
model_response = ollama_client.generate(
model=model_name,
prompt=prompt,
options={'temperature': 0.7}
)
# Замер времени
latency = time.time() - start_time
# Оценка качества
score = self.evaluate(model_response, test_case['expected'])
results.append({
'test_case': test_case['name'],
'prompt': prompt,
'expected': test_case['expected'],
'model_response': model_response,
'score': score,
'latency': latency
})
except Exception as e:
self.logger.error(f"Error in test case {i}: {e}")
results.append({
'test_case': test_case['name'],
'error': str(e)
})
return {
'benchmark_name': self.name,
'total_tests': len(test_cases),
'successful_tests': len([r for r in results if 'score' in r]),
'results': results
}

62
src/benchmarks/codegen.py Normal file
View File

@@ -0,0 +1,62 @@
import logging
import json
import os
from typing import Dict, Any, List
from benchmarks.base import Benchmark
class CodeGenBenchmark(Benchmark):
"""Бенчмарк для тестирования генерации кода."""
def __init__(self):
super().__init__("codegen")
def load_test_data(self) -> List[Dict[str, Any]]:
"""
Загрузка тестовых данных для генерации кода.
Returns:
Список тестовых случаев
"""
test_data = []
data_dir = "tests/codegen"
for filename in os.listdir(data_dir):
if filename.endswith('.json'):
with open(os.path.join(data_dir, filename), 'r', encoding='utf-8') as f:
data = json.load(f)
test_data.append({
'name': filename.replace('.json', ''),
'prompt': data['prompt'],
'expected': data['expected']
})
return test_data
def evaluate(self, model_response: str, expected: str) -> float:
"""
Оценка качества сгенерированного кода.
Args:
model_response: Ответ от модели
expected: Ожидаемый ответ
Returns:
Метрика качества (0-1)
"""
# Простая оценка на основе совпадения токенов
model_tokens = set(model_response.lower().split())
expected_tokens = set(expected.lower().split())
if len(expected_tokens) == 0:
return 0.0
intersection = model_tokens.intersection(expected_tokens)
precision = len(intersection) / len(model_tokens) if model_tokens else 0.0
recall = len(intersection) / len(expected_tokens) if expected_tokens else 0.0
# F1-score
if (precision + recall) == 0:
return 0.0
f1 = 2 * (precision * recall) / (precision + recall)
return round(f1, 3)

View File

@@ -0,0 +1,62 @@
import logging
import json
import os
from typing import Dict, Any, List
from benchmarks.base import Benchmark
class SummarizationBenchmark(Benchmark):
"""Бенчмарк для тестирования пересказов."""
def __init__(self):
super().__init__("summarization")
def load_test_data(self) -> List[Dict[str, Any]]:
"""
Загрузка тестовых данных для пересказов.
Returns:
Список тестовых случаев
"""
test_data = []
data_dir = "tests/summarization"
for filename in os.listdir(data_dir):
if filename.endswith('.json'):
with open(os.path.join(data_dir, filename), 'r', encoding='utf-8') as f:
data = json.load(f)
test_data.append({
'name': filename.replace('.json', ''),
'prompt': data['prompt'],
'expected': data['expected']
})
return test_data
def evaluate(self, model_response: str, expected: str) -> float:
"""
Оценка качества пересказа.
Args:
model_response: Ответ от модели
expected: Ожидаемый ответ
Returns:
Метрика качества (0-1)
"""
# Простая оценка на основе совпадения токенов
model_tokens = set(model_response.lower().split())
expected_tokens = set(expected.lower().split())
if len(expected_tokens) == 0:
return 0.0
intersection = model_tokens.intersection(expected_tokens)
precision = len(intersection) / len(model_tokens) if model_tokens else 0.0
recall = len(intersection) / len(expected_tokens) if expected_tokens else 0.0
# F1-score
if (precision + recall) == 0:
return 0.0
f1 = 2 * (precision * recall) / (precision + recall)
return round(f1, 3)

View File

@@ -0,0 +1,63 @@
import logging
import json
import os
from typing import Dict, Any, List
from benchmarks.base import Benchmark
class TranslationBenchmark(Benchmark):
"""Бенчмарк для тестирования переводов."""
def __init__(self):
super().__init__("translation")
def load_test_data(self) -> List[Dict[str, Any]]:
"""
Загрузка тестовых данных для переводов.
Returns:
Список тестовых случаев
"""
test_data = []
data_dir = "tests/translation"
for filename in os.listdir(data_dir):
if filename.endswith('.json'):
with open(os.path.join(data_dir, filename), 'r', encoding='utf-8') as f:
data = json.load(f)
test_data.append({
'name': filename.replace('.json', ''),
'prompt': data['prompt'],
'expected': data['expected']
})
return test_data
def evaluate(self, model_response: str, expected: str) -> float:
"""
Оценка качества перевода.
Args:
model_response: Ответ от модели
expected: Ожидаемый ответ
Returns:
Метрика качества (0-1)
"""
# Простая оценка на основе совпадения токенов
# В реальном проекте можно использовать более сложные метрики
model_tokens = set(model_response.lower().split())
expected_tokens = set(expected.lower().split())
if len(expected_tokens) == 0:
return 0.0
intersection = model_tokens.intersection(expected_tokens)
precision = len(intersection) / len(model_tokens) if model_tokens else 0.0
recall = len(intersection) / len(expected_tokens) if expected_tokens else 0.0
# F1-score
if (precision + recall) == 0:
return 0.0
f1 = 2 * (precision * recall) / (precision + recall)
return round(f1, 3)

97
src/main.py Normal file
View File

@@ -0,0 +1,97 @@
import logging
import argparse
from typing import List
from models.ollama_client import OllamaClient
from benchmarks.translation import TranslationBenchmark
from benchmarks.summarization import SummarizationBenchmark
from benchmarks.codegen import CodeGenBenchmark
from utils.report import ReportGenerator
def setup_logging(verbose: bool = False):
"""Настройка логирования."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
def run_benchmarks(ollama_client: OllamaClient, model_name: str, benchmarks: List[str]) -> List[dict]:
"""
Запуск выбранных бенчмарков.
Args:
ollama_client: Клиент для работы с Ollama
model_name: Название модели
benchmarks: Список имен бенчмарков для запуска
Returns:
Список результатов бенчмарков
"""
benchmark_classes = {
'translation': TranslationBenchmark,
'summarization': SummarizationBenchmark,
'codegen': CodeGenBenchmark
}
results = []
for benchmark_name in benchmarks:
if benchmark_name not in benchmark_classes:
logging.warning(f"Unknown benchmark: {benchmark_name}")
continue
logging.info(f"Running {benchmark_name} benchmark...")
benchmark = benchmark_classes[benchmark_name]()
result = benchmark.run(ollama_client, model_name)
results.append(result)
return results
def main():
"""Основная функция запуска."""
parser = argparse.ArgumentParser(description='LLM Benchmarking Tool')
parser.add_argument('--model', required=True, help='Название модели для тестирования')
parser.add_argument('--ollama-url', required=True, help='URL подключения к Ollama серверу')
parser.add_argument('--benchmarks', nargs='+', default=['translation', 'summarization', 'codegen'],
help='Список бенчмарков для выполнения (translation, summarization, codegen)')
parser.add_argument('--output', default='results', help='Директория для сохранения результатов')
parser.add_argument('--verbose', action='store_true', help='Подробный режим вывода')
args = parser.parse_args()
# Настройка логирования
setup_logging(args.verbose)
logging.info(f"Starting benchmarking for model: {args.model}")
logging.info(f"Ollama URL: {args.ollama_url}")
logging.info(f"Benchmarks to run: {', '.join(args.benchmarks)}")
try:
# Инициализация клиента
ollama_client = OllamaClient(args.ollama_url)
# Запуск бенчмарков
results = run_benchmarks(ollama_client, args.model, args.benchmarks)
# Генерация отчетов
report_generator = ReportGenerator()
for result in results:
report_generator.generate_benchmark_report(result, args.output, args.model)
if len(results) > 1:
report_generator.generate_summary_report(results, args.output, args.model)
logging.info("Benchmarking completed successfully!")
except Exception as e:
logging.error(f"Error during benchmarking: {e}")
return 1
return 0
if __name__ == '__main__':
exit(main())

Binary file not shown.

View File

@@ -0,0 +1,85 @@
import logging
from typing import Optional, Dict, Any
from ollama import Client
class OllamaClient:
"""Клиент для работы с Ollama API."""
def __init__(self, base_url: str):
"""
Инициализация клиента.
Args:
base_url: Базовый URL для подключения к Ollama серверу
"""
self.base_url = base_url
self.client = Client(host=base_url)
self.logger = logging.getLogger(__name__)
def generate(self, model: str, prompt: str, **kwargs) -> str:
"""
Генерация ответа от модели.
Args:
model: Название модели
prompt: Входной промпт
**kwargs: Дополнительные параметры для запроса
Returns:
Сгенерированный ответ
Raises:
Exception: Если произошла ошибка при генерации
"""
try:
self.logger.info(f"Generating response for model {model}")
response = self.client.generate(
model=model,
prompt=prompt,
**kwargs
)
return response['response']
except Exception as e:
self.logger.error(f"Error generating response: {e}")
raise
def chat(self, model: str, messages: list, **kwargs) -> str:
"""
Диалог с моделью.
Args:
model: Название модели
messages: Список сообщений в формате [{'role': 'user', 'content': '...'}, ...]
**kwargs: Дополнительные параметры для запроса
Returns:
Ответ от модели
Raises:
Exception: Если произошла ошибка при чате
"""
try:
self.logger.info(f"Chatting with model {model}")
response = self.client.chat(
model=model,
messages=messages,
**kwargs
)
return response['message']['content']
except Exception as e:
self.logger.error(f"Error in chat: {e}")
raise
def list_models(self) -> list:
"""
Получение списка доступных моделей.
Returns:
Список моделей
"""
try:
response = self.client.list()
return [model['name'] for model in response['models']]
except Exception as e:
self.logger.error(f"Error listing models: {e}")
raise

Binary file not shown.

162
src/utils/report.py Normal file
View File

@@ -0,0 +1,162 @@
import logging
import os
from typing import Dict, Any, List
from datetime import datetime
from py_markdown_table.markdown_table import markdown_table
class ReportGenerator:
"""Генератор отчетов в формате Markdown."""
def __init__(self):
self.logger = logging.getLogger(__name__)
def generate_benchmark_report(self, results: Dict[str, Any], output_dir: str = "results", model_name: str = None) -> str:
"""
Генерация отчета для одного бенчмарка.
Args:
results: Результаты бенчмарка
output_dir: Директория для сохранения отчета
model_name: Имя модели (для структурирования результатов)
Returns:
Путь к сгенерированному файлу
"""
if model_name:
model_dir = os.path.join(output_dir, model_name)
os.makedirs(model_dir, exist_ok=True)
output_dir = model_dir
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{results['benchmark_name']}_{timestamp}.md"
file_path = os.path.join(output_dir, filename)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(f"# Отчет бенчмарка: {results['benchmark_name']}\n\n")
f.write(f"**Дата:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(f"**Общее количество тестов:** {results['total_tests']}\n\n")
f.write(f"**Успешно выполнено:** {results['successful_tests']}\n\n")
# Таблица с результатами
table_data = [
{
"Тест": "Тест",
"Скор": "Скор",
"Время (с)": "Время (с)",
"Промпт": "Промпт",
"Ожидаемый": "Ожидаемый",
"Ответ модели": "Ответ модели"
}
]
for result in results['results']:
if 'error' in result:
table_data.append({
"Тест": result['test_case'],
"Скор": "Ошибка",
"Время (с)": "-",
"Промпт": result['prompt'][:50] + "..." if len(result['prompt']) > 50 else result['prompt'],
"Ожидаемый": result['expected'][:50] + "..." if len(result['expected']) > 50 else result['expected'],
"Ответ модели": result['error']
})
else:
table_data.append({
"Тест": result['test_case'],
"Скор": str(result['score']),
"Время (с)": f"{result['latency']:.2f}",
"Промпт": result['prompt'][:50] + "..." if len(result['prompt']) > 50 else result['prompt'],
"Ожидаемый": result['expected'][:50] + "..." if len(result['expected']) > 50 else result['expected'],
"Ответ модели": result['model_response'][:50] + "..." if len(result['model_response']) > 50 else result['model_response']
})
f.write("## Результаты тестов\n\n")
f.write(markdown_table(table_data).get_markdown())
f.write("\n\n")
# Статистика
successful = [r for r in results['results'] if 'score' in r]
if successful:
avg_score = sum(r['score'] for r in successful) / len(successful)
avg_latency = sum(r['latency'] for r in successful) / len(successful)
f.write("## Статистика\n\n")
f.write(f"- **Средний скор:** {avg_score:.3f}\n")
f.write(f"- **Среднее время ответа:** {avg_latency:.3f} секунд\n")
self.logger.info(f"Report saved to {file_path}")
return file_path
def generate_summary_report(self, all_results: List[Dict[str, Any]], output_dir: str = "results", model_name: str = None) -> str:
"""
Генерация сводного отчета по всем бенчмаркам.
Args:
all_results: Список результатов всех бенчмарков
output_dir: Директория для сохранения отчета
model_name: Имя модели (для структурирования результатов)
Returns:
Путь к сгенерированному файлу
"""
if model_name:
model_dir = os.path.join(output_dir, model_name)
os.makedirs(model_dir, exist_ok=True)
output_dir = model_dir
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"summary_{timestamp}.md"
file_path = os.path.join(output_dir, filename)
with open(file_path, 'w', encoding='utf-8') as f:
f.write("# Сводный отчет по всем бенчмаркам\n\n")
f.write(f"**Дата:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
if model_name:
f.write(f"**Модель:** {model_name}\n\n")
# Таблица с общими результатами
table_data = [
{
"Бенчмарк": "Бенчмарк",
"Тестов": "Тестов",
"Успешно": "Успешно",
"Средний скор": "Средний скор",
"Среднее время": "Среднее время"
}
]
for result in all_results:
successful = [r for r in result['results'] if 'score' in r]
avg_score = sum(r['score'] for r in successful) / len(successful) if successful else 0
avg_latency = sum(r['latency'] for r in successful) / len(successful) if successful else 0
table_data.append({
"Бенчмарк": result['benchmark_name'],
"Тестов": str(result['total_tests']),
"Успешно": str(result['successful_tests']),
"Средний скор": f"{avg_score:.3f}" if successful else "0",
"Среднее время": f"{avg_latency:.3f}" if successful else "0"
})
f.write("## Общие результаты\n\n")
f.write(markdown_table(table_data).get_markdown())
f.write("\n\n")
# Подробности по каждому бенчмарку
f.write("## Подробности\n\n")
for result in all_results:
f.write(f"### {result['benchmark_name']}\n\n")
f.write(f"- **Тестов:** {result['total_tests']}\n")
f.write(f"- **Успешно:** {result['successful_tests']}\n")
successful = [r for r in result['results'] if 'score' in r]
if successful:
avg_score = sum(r['score'] for r in successful) / len(successful)
avg_latency = sum(r['latency'] for r in successful) / len(successful)
f.write(f"- **Средний скор:** {avg_score:.3f}\n")
f.write(f"- **Среднее время:** {avg_latency:.3f} секунд\n")
f.write("\n")
self.logger.info(f"Summary report saved to {file_path}")
return file_path