186 changes: 130 additions & 56 deletions v2/eval/evaluate_reranker.py
@@ -1,8 +1,12 @@
import os
import logging
from multiprocessing import Process, current_process
from multiprocessing import Process, current_process, Queue
import torch
import json
import queue
from pathlib import Path
import argparse
from typing import List

import mteb
from mteb import MTEB
@@ -168,35 +172,7 @@ def patched_load_results_file(self):
logger = logging.getLogger("main")


# GPU-to-task mapping - adjust the GPU numbers as needed
TASK_LIST_RERANKER_GPU_MAPPING = {
7: [
"Ko-StrategyQA",
"AutoRAGRetrieval",
"PublicHealthQA",
"BelebeleRetrieval",
"XPQARetrieval",
"MultiLongDocRetrieval",
"MIRACLRetrieval",
"MrTidyRetrieval"
]
}

model_names = [
# "BAAI/bge-reranker-v2-m3",
# "dragonkue/bge-reranker-v2-m3-ko",
# "sigridjineth/ko-reranker-v1.1",
# "sigridjineth/ko-reranker-v1.2-preview",
"Alibaba-NLP/gte-multilingual-reranker-base",
"upskyy/ko-reranker-8k",
"Dongjin-kr/ko-reranker",
# "jinaai/jina-reranker-v2-base-multilingual",
    # Add other models here
]

previous_results_dir = "./results/stage1/top_1k_qrels"

def evaluate_reranker_model(model_name, gpu_id, tasks):
def evaluate_reranker_model(model_name: str, gpu_id: int, tasks: List[str], previous_results_dir: Path, output_base_dir: Path, top_k: int, verbosity: int):
try:
device = torch.device(f"cuda:{str(gpu_id)}")
torch.cuda.set_device(device)
@@ -205,16 +181,17 @@ def evaluate_reranker_model(model_name, gpu_id, tasks):
setproctitle(f"{model_name}-reranker-{gpu_id}")
print(f"Running tasks: {tasks} / {model_name} on GPU {gpu_id} in process {current_process().name}")

model_path = Path(model_name)
output_dir = output_base_dir / model_path.parent.name / model_path.name
output_dir.mkdir(parents=True, exist_ok=True)

cross_encoder = CrossEncoder(
model_name,
trust_remote_code=True,
model_kwargs={"torch_dtype": torch.bfloat16},
device=device
)

output_dir = os.path.join("./results/stage2", model_name)

        # TODO: adjust the batch size per model
batch_size = 2048

for task in tasks:
@@ -227,45 +204,142 @@ def evaluate_reranker_model(model_name, gpu_id, tasks):
)
evaluation = MTEB(tasks=tasks_mteb)

if os.path.exists(os.path.join(previous_results_dir, task + "_id.jsonl")):
previous_results_path = previous_results_dir / (task + "_id.jsonl")
if previous_results_path.exists():
print(f"Previous results found: {task}")
previous_results = os.path.join(previous_results_dir, task + "_id.jsonl")
previous_results = str(previous_results_path)

evaluation.run(
cross_encoder,
top_k=50,
top_k=top_k,
save_predictions=True,
output_folder=output_dir,
output_folder=str(output_dir),
previous_results=previous_results,
batch_size=batch_size
batch_size=batch_size,
verbosity=verbosity,
)
else:
print(f"Previous results not found: {task}")
evaluation.run(
cross_encoder,
top_k=50,
top_k=top_k,
save_predictions=True,
output_folder=output_dir,
batch_size=batch_size
output_folder=str(output_dir),
batch_size=batch_size,
verbosity=verbosity,
)

except Exception as ex:
print(f"Error in GPU {gpu_id} with model {model_name}: {ex}")
traceback.print_exc()

if __name__ == "__main__":
torch.multiprocessing.set_start_method('spawn')

for model_name in model_names:
print(f"Starting evaluation for model: {model_name}")
processes = []
def worker(job_queue: Queue, gpu_queue: Queue, previous_results_dir: Path, output_base_dir: Path, top_k: int, verbosity: int):
"""작업 큐와 GPU 큐에서 작업을 가져와 실행하는 워커 함수"""
while True:
try:
model_name, task = job_queue.get(timeout=1)
except queue.Empty:
break

for gpu_id, tasks in TASK_LIST_RERANKER_GPU_MAPPING.items():
p = Process(target=evaluate_reranker_model, args=(model_name, gpu_id, tasks))
p.start()
processes.append(p)

for p in processes:
p.join()

print(f"Completed evaluation for model: {model_name}")
gpu_id = None
try:
gpu_id = gpu_queue.get()
print(f"Process {current_process().name}: Starting task: {task} / {model_name} on GPU {gpu_id}")
evaluate_reranker_model(model_name, gpu_id, [task], previous_results_dir, output_base_dir, top_k, verbosity)
print(f"Process {current_process().name}: Finished task: {task} / {model_name} on GPU {gpu_id}")
except Exception:
print(f"!!!!!!!!!! Process {current_process().name}: Error during task: {task} / {model_name} on GPU {gpu_id} !!!!!!!!!!!")
traceback.print_exc()
finally:
if gpu_id is not None:
gpu_queue.put(gpu_id)


# --- Default settings (can be overridden via command-line arguments) ---
DEFAULT_MODEL_NAMES = [
"BAAI/bge-reranker-v2-m3",
"dragonkue/bge-reranker-v2-m3-ko",
"sigridjineth/ko-reranker-v1.1",
"sigridjineth/ko-reranker-v1.2-preview",
"Alibaba-NLP/gte-multilingual-reranker-base",
"upskyy/ko-reranker-8k",
"Dongjin-kr/ko-reranker",
"jinaai/jina-reranker-v2-base-multilingual",
]
DEFAULT_TASKS = [
"Ko-StrategyQA", "AutoRAGRetrieval", "PublicHealthQA", "BelebeleRetrieval",
"XPQARetrieval", "MultiLongDocRetrieval", "MIRACLRetrieval", "MrTidyRetrieval"
]
DEFAULT_GPU_IDS = [0, 1, 2, 3, 4, 6, 7]
V2_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_PREVIOUS_RESULTS_DIR = V2_ROOT / "eval/results/stage1/top_1k_qrels"
DEFAULT_OUTPUT_DIR = V2_ROOT / "eval/results/stage2"

assert V2_ROOT.exists(), f"V2_ROOT does not exist: {V2_ROOT}"
assert DEFAULT_PREVIOUS_RESULTS_DIR.exists(), f"DEFAULT_PREVIOUS_RESULTS_DIR does not exist: {DEFAULT_PREVIOUS_RESULTS_DIR}"
assert DEFAULT_OUTPUT_DIR.exists(), f"DEFAULT_OUTPUT_DIR does not exist: {DEFAULT_OUTPUT_DIR}"
# -----------------------------------------------------


def main():
parser = argparse.ArgumentParser(description="MTEB Reranker 벤치마크를 병렬로 실행합니다.")
parser.add_argument(
"--model_names", nargs="+", default=DEFAULT_MODEL_NAMES, help="평가할 리랭커 모델 이름 또는 경로 리스트"
)
parser.add_argument(
"--tasks", nargs="+", default=DEFAULT_TASKS, help="평가할 MTEB 태스크 리스트"
)
parser.add_argument(
"--gpu_ids", nargs="+", type=int, default=DEFAULT_GPU_IDS, help="사용할 GPU ID 리스트"
)
parser.add_argument(
"--previous_results_dir", type=str, default=str(DEFAULT_PREVIOUS_RESULTS_DIR), help="1단계(BM25) 결과가 저장된 디렉토리"
)
parser.add_argument(
"--output_dir", type=str, default=str(DEFAULT_OUTPUT_DIR), help="2단계(리랭킹) 최종 결과를 저장할 디렉토리"
)
parser.add_argument(
"--model_dir", type=str, default=None, help="평가할 로컬 모델들이 저장된 디렉토리. 각 하위 디렉토리가 모델로 간주됩니다."
)
parser.add_argument(
"--top_k", type=int, default=50, help="리랭킹에 사용할 상위 K개 문서 수"
)
parser.add_argument(
"--verbosity", type=int, default=0, help="MTEB 로그 상세 수준 (0: 진행률 표시줄만, 1: 점수 표시, 2: 상세 정보, 3: 디버그용)"
)
args = parser.parse_args()

torch.multiprocessing.set_start_method('spawn', force=True)

previous_results_dir = Path(args.previous_results_dir)
output_dir = Path(args.output_dir)

job_queue = Queue()
gpu_queue = Queue()

total_jobs = 0
for model_name in args.model_names:
for task in args.tasks:
job_queue.put((model_name, task))
total_jobs += 1

for gpu_id in args.gpu_ids:
gpu_queue.put(gpu_id)

processes = []
num_workers = len(args.gpu_ids)
print(f"Starting {num_workers} workers on GPUs: {args.gpu_ids}")
print(f"Total jobs to process: {total_jobs}")

for _ in range(num_workers):
p = Process(target=worker, args=(job_queue, gpu_queue, previous_results_dir, output_dir, args.top_k, args.verbosity))
p.start()
processes.append(p)

for p in processes:
p.join()

print("All evaluation tasks completed.")

if __name__ == "__main__":
main()
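
A minimal, self-contained sketch of the job-queue / GPU-queue scheduling pattern the new worker() uses. The run_job() stub, sleep-based workload, and the example model/task names are illustrative assumptions and stand in for the actual MTEB evaluation in this PR:

import queue
import time
from multiprocessing import Process, Queue

def run_job(model_name, task, gpu_id):
    # Placeholder for the real evaluation call; sleeps instead of running MTEB.
    time.sleep(0.1)

def worker(job_queue: Queue, gpu_queue: Queue):
    while True:
        try:
            model_name, task = job_queue.get(timeout=1)  # exit when no jobs remain
        except queue.Empty:
            break
        gpu_id = gpu_queue.get()      # borrow a free GPU
        try:
            run_job(model_name, task, gpu_id)
        finally:
            gpu_queue.put(gpu_id)     # always return the GPU, even on error

if __name__ == "__main__":
    job_queue, gpu_queue = Queue(), Queue()
    for model in ["model-a", "model-b"]:
        for task in ["task-1", "task-2"]:
            job_queue.put((model, task))
    for gpu_id in [0, 1]:
        gpu_queue.put(gpu_id)
    procs = [Process(target=worker, args=(job_queue, gpu_queue)) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()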