diff --git a/v2/eval/evaluate_reranker.py b/v2/eval/evaluate_reranker.py
index c16aa95..9503d24 100644
--- a/v2/eval/evaluate_reranker.py
+++ b/v2/eval/evaluate_reranker.py
@@ -1,8 +1,12 @@
 import os
 import logging
-from multiprocessing import Process, current_process
+from multiprocessing import Process, current_process, Queue
 import torch
 import json
+import queue
+from pathlib import Path
+import argparse
+from typing import List
 
 import mteb
 from mteb import MTEB
@@ -168,35 +172,7 @@ def patched_load_results_file(self):
 logger = logging.getLogger("main")
 
 
-# GPU-to-task mapping - adjust the GPU numbers as needed
-TASK_LIST_RERANKER_GPU_MAPPING = {
-    7: [
-        "Ko-StrategyQA",
-        "AutoRAGRetrieval",
-        "PublicHealthQA",
-        "BelebeleRetrieval",
-        "XPQARetrieval",
-        "MultiLongDocRetrieval",
-        "MIRACLRetrieval",
-        "MrTidyRetrieval"
-    ]
-}
-
-model_names = [
-    # "BAAI/bge-reranker-v2-m3",
-    # "dragonkue/bge-reranker-v2-m3-ko",
-    # "sigridjineth/ko-reranker-v1.1",
-    # "sigridjineth/ko-reranker-v1.2-preview",
-    "Alibaba-NLP/gte-multilingual-reranker-base",
-    "upskyy/ko-reranker-8k",
-    "Dongjin-kr/ko-reranker",
-    # "jinaai/jina-reranker-v2-base-multilingual",
-    # add other models here
-]
-
-previous_results_dir = "./results/stage1/top_1k_qrels"
-
-def evaluate_reranker_model(model_name, gpu_id, tasks):
+def evaluate_reranker_model(model_name: str, gpu_id: int, tasks: List[str], previous_results_dir: Path, output_base_dir: Path, top_k: int, verbosity: int):
     try:
         device = torch.device(f"cuda:{str(gpu_id)}")
         torch.cuda.set_device(device)
@@ -205,6 +181,10 @@ def evaluate_reranker_model(model_name, gpu_id, tasks):
         setproctitle(f"{model_name}-reranker-{gpu_id}")
         print(f"Running tasks: {tasks} / {model_name} on GPU {gpu_id} in process {current_process().name}")
 
+        model_path = Path(model_name)
+        output_dir = output_base_dir / model_path.parent.name / model_path.name
+        output_dir.mkdir(parents=True, exist_ok=True)
+
         cross_encoder = CrossEncoder(
             model_name,
             trust_remote_code=True,
@@ -212,9 +192,6 @@ def evaluate_reranker_model(model_name, gpu_id, tasks):
             device=device
         )
 
-        output_dir = os.path.join("./results/stage2", model_name)
-
-        # TODO: tune batch size per model
        batch_size = 2048
 
         for task in tasks:
@@ -227,45 +204,142 @@ def evaluate_reranker_model(model_name, gpu_id, tasks):
             )
             evaluation = MTEB(tasks=tasks_mteb)
 
-            if os.path.exists(os.path.join(previous_results_dir, task + "_id.jsonl")):
+            previous_results_path = previous_results_dir / (task + "_id.jsonl")
+            if previous_results_path.exists():
                 print(f"Previous results found: {task}")
-                previous_results = os.path.join(previous_results_dir, task + "_id.jsonl")
+                previous_results = str(previous_results_path)
                 evaluation.run(
                     cross_encoder,
-                    top_k=50,
+                    top_k=top_k,
                     save_predictions=True,
-                    output_folder=output_dir,
+                    output_folder=str(output_dir),
                     previous_results=previous_results,
-                    batch_size=batch_size
+                    batch_size=batch_size,
+                    verbosity=verbosity,
                 )
             else:
                 print(f"Previous results not found: {task}")
                 evaluation.run(
                     cross_encoder,
-                    top_k=50,
+                    top_k=top_k,
                     save_predictions=True,
-                    output_folder=output_dir,
-                    batch_size=batch_size
+                    output_folder=str(output_dir),
+                    batch_size=batch_size,
+                    verbosity=verbosity,
                 )
     except Exception as ex:
         print(f"Error in GPU {gpu_id} with model {model_name}: {ex}")
         traceback.print_exc()
 
 
-if __name__ == "__main__":
-    torch.multiprocessing.set_start_method('spawn')
-
-    for model_name in model_names:
-        print(f"Starting evaluation for model: {model_name}")
-        processes = []
-
+def worker(job_queue: Queue, gpu_queue: Queue, previous_results_dir: Path,
+           output_base_dir: Path, top_k: int, verbosity: int):
+    """Worker that pulls jobs from the job queue and runs each one on a GPU taken from the GPU queue."""
+    while True:
+        try:
+            model_name, task = job_queue.get(timeout=1)
+        except queue.Empty:
+            break
 
-        for gpu_id, tasks in TASK_LIST_RERANKER_GPU_MAPPING.items():
-            p = Process(target=evaluate_reranker_model, args=(model_name, gpu_id, tasks))
-            p.start()
-            processes.append(p)
-
-        for p in processes:
-            p.join()
-
-        print(f"Completed evaluation for model: {model_name}")
\ No newline at end of file
+        gpu_id = None
+        try:
+            gpu_id = gpu_queue.get()
+            print(f"Process {current_process().name}: Starting task: {task} / {model_name} on GPU {gpu_id}")
+            evaluate_reranker_model(model_name, gpu_id, [task], previous_results_dir, output_base_dir, top_k, verbosity)
+            print(f"Process {current_process().name}: Finished task: {task} / {model_name} on GPU {gpu_id}")
+        except Exception:
+            print(f"!!!!!!!!!! Process {current_process().name}: Error during task: {task} / {model_name} on GPU {gpu_id} !!!!!!!!!!!")
+            traceback.print_exc()
+        finally:
+            if gpu_id is not None:
+                gpu_queue.put(gpu_id)
+
+
+# --- Default settings (can be overridden via command-line arguments) ---
+DEFAULT_MODEL_NAMES = [
+    "BAAI/bge-reranker-v2-m3",
+    "dragonkue/bge-reranker-v2-m3-ko",
+    "sigridjineth/ko-reranker-v1.1",
+    "sigridjineth/ko-reranker-v1.2-preview",
+    "Alibaba-NLP/gte-multilingual-reranker-base",
+    "upskyy/ko-reranker-8k",
+    "Dongjin-kr/ko-reranker",
+    "jinaai/jina-reranker-v2-base-multilingual",
+]
+DEFAULT_TASKS = [
+    "Ko-StrategyQA", "AutoRAGRetrieval", "PublicHealthQA", "BelebeleRetrieval",
+    "XPQARetrieval", "MultiLongDocRetrieval", "MIRACLRetrieval", "MrTidyRetrieval"
+]
+DEFAULT_GPU_IDS = [0, 1, 2, 3, 4, 6, 7]
+V2_ROOT = Path(__file__).resolve().parents[1]
+DEFAULT_PREVIOUS_RESULTS_DIR = V2_ROOT / "eval/results/stage1/top_1k_qrels"
+DEFAULT_OUTPUT_DIR = V2_ROOT / "eval/results/stage2"
+
+assert V2_ROOT.exists(), f"V2_ROOT does not exist: {V2_ROOT}"
+assert DEFAULT_PREVIOUS_RESULTS_DIR.exists(), f"DEFAULT_PREVIOUS_RESULTS_DIR does not exist: {DEFAULT_PREVIOUS_RESULTS_DIR}"
+assert DEFAULT_OUTPUT_DIR.exists(), f"DEFAULT_OUTPUT_DIR does not exist: {DEFAULT_OUTPUT_DIR}"
+# -----------------------------------------------------
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Run the MTEB reranker benchmark in parallel.")
+    parser.add_argument(
+        "--model_names", nargs="+", default=DEFAULT_MODEL_NAMES, help="Reranker model names or paths to evaluate"
+    )
+    parser.add_argument(
+        "--tasks", nargs="+", default=DEFAULT_TASKS, help="MTEB tasks to evaluate"
+    )
+    parser.add_argument(
+        "--gpu_ids", nargs="+", type=int, default=DEFAULT_GPU_IDS, help="GPU IDs to use"
+    )
+    parser.add_argument(
+        "--previous_results_dir", type=str, default=str(DEFAULT_PREVIOUS_RESULTS_DIR), help="Directory containing the stage-1 (BM25) results"
+    )
+    parser.add_argument(
+        "--output_dir", type=str, default=str(DEFAULT_OUTPUT_DIR), help="Directory for the final stage-2 (reranking) results"
+    )
+    parser.add_argument(
+        "--model_dir", type=str, default=None, help="Directory containing local models to evaluate. Each subdirectory is treated as a model."
+    )
+    parser.add_argument(
+        "--top_k", type=int, default=50, help="Number of top-K documents to rerank"
+    )
+    parser.add_argument(
+        "--verbosity", type=int, default=0, help="MTEB logging verbosity (0: progress bars only, 1: show scores, 2: detailed, 3: debug)"
+    )
+    args = parser.parse_args()
+
+    torch.multiprocessing.set_start_method('spawn', force=True)
+
+    previous_results_dir = Path(args.previous_results_dir)
+    output_dir = Path(args.output_dir)
+
+    job_queue = Queue()
+    gpu_queue = Queue()
+
+    total_jobs = 0
+    for model_name in args.model_names:
+        for task in args.tasks:
+            job_queue.put((model_name, task))
+            total_jobs += 1
+
+    for gpu_id in args.gpu_ids:
+        gpu_queue.put(gpu_id)
+
+    processes = []
+    num_workers = len(args.gpu_ids)
+    print(f"Starting {num_workers} workers on GPUs: {args.gpu_ids}")
+    print(f"Total jobs to process: {total_jobs}")
+
+    for _ in range(num_workers):
+        p = Process(target=worker, args=(job_queue, gpu_queue, previous_results_dir, output_dir, args.top_k, args.verbosity))
+        p.start()
+        processes.append(p)
+
+    for p in processes:
+        p.join()
+
+    print("All evaluation tasks completed.")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
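A minimal usage sketch for the revised script (the flags and defaults come from the argparse definitions above; the model names, GPU IDs, and verbosity value below are illustrative and should be adjusted to your environment):

    python v2/eval/evaluate_reranker.py \
        --model_names BAAI/bge-reranker-v2-m3 Dongjin-kr/ko-reranker \
        --gpu_ids 0 1 \
        --top_k 50 \
        --verbosity 1

Each (model, task) pair becomes one job on the shared job queue; a worker takes a job, borrows a GPU ID from the GPU queue, runs the evaluation, and returns the GPU in the `finally` block, so every listed GPU stays busy until the queue is empty.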