diff --git a/requirements.txt b/requirements.txt index c9b9796c..f9f38b84 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ yoyo-migrations ruff pre-commit better_profanity +sqlalchemy # api fastapi[all] # install all to avoid random bugs diff --git a/scripts/flush_db.py b/scripts/flush_db.py index ed1c37cb..e758c436 100644 --- a/scripts/flush_db.py +++ b/scripts/flush_db.py @@ -2,9 +2,9 @@ import os -import psycopg2 from dotenv import load_dotenv -from psycopg2 import Error +from sqlalchemy import create_engine, text +from sqlalchemy.exc import SQLAlchemyError def flush_database(): @@ -18,35 +18,34 @@ def flush_database(): return try: - # Connect to database + # Connect to database using SQLAlchemy print("📡 Connecting to database...") - connection = psycopg2.connect(DATABASE_URL, sslmode="require") - cursor = connection.cursor() - - # Drop existing tables - print("🗑️ Dropping existing tables...") - drop_tables_query = """ - DROP TABLE IF EXISTS submissions CASCADE; - DROP TABLE IF EXISTS leaderboard CASCADE; - DROP TABLE IF EXISTS runinfo CASCADE; - DROP TABLE IF EXISTS _yoyo_log CASCADE; - DROP TABLE IF EXISTS _yoyo_migration CASCADE; - DROP TABLE IF EXISTS _yoyo_version CASCADE; - DROP TABLE IF EXISTS yoyo_lock CASCADE; - DROP SCHEMA IF EXISTS leaderboard CASCADE; - """ - cursor.execute(drop_tables_query) - # Commit changes - connection.commit() + engine = create_engine(DATABASE_URL) + + with engine.connect() as connection: + with connection.begin(): + # Drop existing tables + print("🗑️ Dropping existing tables...") + drop_tables_query = text(""" + DROP TABLE IF EXISTS submissions CASCADE; + DROP TABLE IF EXISTS leaderboard CASCADE; + DROP TABLE IF EXISTS runinfo CASCADE; + DROP TABLE IF EXISTS _yoyo_log CASCADE; + DROP TABLE IF EXISTS _yoyo_migration CASCADE; + DROP TABLE IF EXISTS _yoyo_version CASCADE; + DROP TABLE IF EXISTS yoyo_lock CASCADE; + DROP SCHEMA IF EXISTS leaderboard CASCADE; + """) + connection.execute(drop_tables_query) + print("✅ Database flushed and recreated successfully!") - except Error as e: + except SQLAlchemyError as e: print(f"❌ Database error: {e}") + except Exception as e: + print(f"❌ Unexpected error: {e}") finally: - if "connection" in locals(): - cursor.close() - connection.close() - print("🔌 Database connection closed") + print("🔌 Database operation completed") if __name__ == "__main__": diff --git a/scripts/update_leaderboard.py b/scripts/update_leaderboard.py index 0ae7e224..dc884299 100644 --- a/scripts/update_leaderboard.py +++ b/scripts/update_leaderboard.py @@ -2,9 +2,10 @@ import os from datetime import datetime -import psycopg2 import requests from jinja2 import Template +from sqlalchemy import create_engine, text +from sqlalchemy.exc import SQLAlchemyError TOKEN = os.environ.get("DISCORD_DUMMY_TOKEN") @@ -80,102 +81,108 @@ def get_name_from_id(user_id: str) -> str: def fetch_leaderboard_data(): print("Fetching data from database...") try: - with psycopg2.connect(DATABASE_URL) as conn: - with conn.cursor() as cur: - cur.execute( - """ - SELECT id, name, deadline - FROM leaderboard.leaderboard - """ + engine = create_engine(DATABASE_URL) + with engine.connect() as connection: + # Get all leaderboards + leaderboards_query = text(""" + SELECT id, name, deadline + FROM leaderboard.leaderboard + """) + + leaderboards_result = connection.execute(leaderboards_query) + leaderboards = leaderboards_result.fetchall() + + # Get active leaderboards with their GPU types and submission counts + submissions_query = text(""" + WITH unique_best_submissions AS ( + SELECT DISTINCT ON (s.user_id) + s.file_name, + s.user_id, + s.submission_time, + r.score, + r.runner + FROM leaderboard.runs r + JOIN leaderboard.submission s ON r.submission_id = s.id + JOIN leaderboard.leaderboard l ON s.leaderboard_id = l.id + WHERE l.name = :leaderboard_name AND r.runner = :gpu_type AND NOT r.secret + AND r.score IS NOT NULL AND r.passed + ORDER BY s.user_id, r.score ASC ) - - leaderboards = cur.fetchall() - - # Get active leaderboards with their GPU types and submission counts - query = """ - WITH unique_best_submissions AS ( - SELECT DISTINCT ON (s.user_id) - s.file_name, - s.user_id, - s.submission_time, - r.score, - r.runner - FROM leaderboard.runs r - JOIN leaderboard.submission s ON r.submission_id = s.id - JOIN leaderboard.leaderboard l ON s.leaderboard_id = l.id - WHERE l.name = %s AND r.runner = %s AND NOT r.secret - AND r.score IS NOT NULL AND r.passed - ORDER BY s.user_id, r.score ASC + SELECT + file_name, + user_id, + submission_time, + score, + runner, + ROW_NUMBER() OVER (ORDER BY score ASC) as rank + FROM unique_best_submissions + ORDER BY score ASC; + """) + + gpu_type_data = {} + for _lb_id, name, deadline in leaderboards: + # Get GPU types for this leaderboard + gpu_types_query = text(""" + SELECT gpu_type + FROM leaderboard.gpu_type + WHERE leaderboard_id = :leaderboard_id + """) + + gpu_types_result = connection.execute(gpu_types_query, {'leaderboard_id': _lb_id}) + gpu_types = [row[0] for row in gpu_types_result.fetchall()] + + for gpu_type in gpu_types: + submissions_result = connection.execute( + submissions_query, + {'leaderboard_name': name, 'gpu_type': gpu_type} ) - SELECT - file_name, - user_id, - submission_time, - score, - runner, - ROW_NUMBER() OVER (ORDER BY score ASC) as rank - FROM unique_best_submissions - ORDER BY score ASC; - """ - - gpu_type_data = {} - for ( - _lb_id, - name, - deadline, - ) in leaderboards: - cur.execute( - "SELECT * from leaderboard.gpu_type where leaderboard_id = %s", [_lb_id] + submissions = submissions_result.fetchall() + + print( + f"Found {len(submissions)} active submissions in {name} for {gpu_type}" ) - gpu_types = [x[1] for x in cur.fetchall()] - - for gpu_type in gpu_types: - args = (name, gpu_type) - cur.execute(query, args) - submissions = cur.fetchall() - - print( - f"Found {len(submissions)} active submissions in {name} for {gpu_type}" - ) - - if len(submissions) > 0: - if gpu_type not in gpu_type_data: - gpu_type_data[gpu_type] = {} - - gpu_submissions = [] - for lb in submissions: - user_id = lb[1] - time = lb[3] - rank = lb[5] - global_name = get_name_from_id(user_id) - gpu_submissions.append( - { - "user": f"{global_name}", - "time": f"{time:.9f}", - "rank": rank, - } - ) - - # Sort submissions by time - gpu_submissions.sort(key=lambda x: float(x["time"])) - - gpu_type_data[gpu_type][name] = { - "name": name, - "deadline": deadline.strftime("%Y-%m-%d %H:%M"), - "submissions": gpu_submissions, - } - - # Convert to final format - formatted_data = { - "gpu_types": [ - {"name": gpu_type, "problems": list(problems.values())} - for gpu_type, problems in gpu_type_data.items() - ], - "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S UTC"), - } - - print("Data fetched successfully") - return formatted_data + + if len(submissions) > 0: + if gpu_type not in gpu_type_data: + gpu_type_data[gpu_type] = {} + + gpu_submissions = [] + for lb in submissions: + user_id = lb[1] + time = lb[3] + rank = lb[5] + global_name = get_name_from_id(user_id) + gpu_submissions.append( + { + "user": f"{global_name}", + "time": f"{time:.9f}", + "rank": rank, + } + ) + + # Sort submissions by time + gpu_submissions.sort(key=lambda x: float(x["time"])) + + gpu_type_data[gpu_type][name] = { + "name": name, + "deadline": deadline.strftime("%Y-%m-%d %H:%M"), + "submissions": gpu_submissions, + } + + # Convert to final format + formatted_data = { + "gpu_types": [ + {"name": gpu_type, "problems": list(problems.values())} + for gpu_type, problems in gpu_type_data.items() + ], + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S UTC"), + } + + print("Data fetched successfully") + return formatted_data + except SQLAlchemyError as e: + print(f"Database error: {str(e)}") + raise except Exception as e: print(f"Error fetching data: {str(e)}") raise diff --git a/src/discord-cluster-manager/cogs/misc_cog.py b/src/discord-cluster-manager/cogs/misc_cog.py index 9771f69c..714990c4 100644 --- a/src/discord-cluster-manager/cogs/misc_cog.py +++ b/src/discord-cluster-manager/cogs/misc_cog.py @@ -2,9 +2,10 @@ from typing import TYPE_CHECKING import discord -import psycopg2 from discord import app_commands from discord.ext import commands +from sqlalchemy import create_engine, text +from sqlalchemy.exc import SQLAlchemyError from env import DATABASE_URL from utils import send_discord_message, setup_logging @@ -33,17 +34,21 @@ async def verify_db(self, interaction: discord.Interaction): return try: - with psycopg2.connect(DATABASE_URL, sslmode="require") as conn: - with conn.cursor() as cursor: - cursor.execute("SELECT RANDOM()") - result = cursor.fetchone() - if result: - random_value = result[0] - await send_discord_message( - interaction, f"Your lucky number is {random_value}." - ) - else: - await send_discord_message(interaction, "No result returned.") + engine = create_engine(DATABASE_URL) + with engine.connect() as connection: + result = connection.execute(text("SELECT RANDOM()")) + row = result.fetchone() + if row: + random_value = row[0] + await send_discord_message( + interaction, f"Your lucky number is {random_value}." + ) + else: + await send_discord_message(interaction, "No result returned.") + except SQLAlchemyError as e: + message = "Database error occurred" + logger.error(f"{message}: {str(e)}", exc_info=True) + await send_discord_message(interaction, f"{message}.") except Exception as e: message = "Error interacting with the database" logger.error(f"{message}: {str(e)}", exc_info=True) diff --git a/src/discord-cluster-manager/leaderboard_db.py b/src/discord-cluster-manager/leaderboard_db.py index 173fd5b8..e5582c48 100644 --- a/src/discord-cluster-manager/leaderboard_db.py +++ b/src/discord-cluster-manager/leaderboard_db.py @@ -5,7 +5,14 @@ from typing import List, Optional import discord -import psycopg2 +from sqlalchemy import ( + Boolean, Column, DateTime, Integer, String, Text, Float, + MetaData, Table, create_engine, select, insert, update, delete, + func, and_, or_, desc, asc +) +from sqlalchemy.engine import Engine +from sqlalchemy.sql import text +from sqlalchemy.dialects.postgresql import UUID from env import ( DATABASE_URL, DISABLE_SSL, @@ -31,6 +38,71 @@ logger = setup_logging(__name__) +# Define database schema using SQLAlchemy Core +metadata = MetaData(schema='leaderboard') + +leaderboard_table = Table( + 'leaderboard', metadata, + Column('id', Integer, primary_key=True), + Column('name', String, unique=True, nullable=False), + Column('deadline', DateTime), + Column('task', Text), + Column('creator_id', String), + Column('forum_id', String), + Column('secret_seed', String) +) + +gpu_type_table = Table( + 'gpu_type', metadata, + Column('id', Integer, primary_key=True), + Column('leaderboard_id', Integer, nullable=False), + Column('gpu_type', String, nullable=False) +) + +submission_table = Table( + 'submission', metadata, + Column('id', Integer, primary_key=True), + Column('leaderboard_id', Integer, nullable=False), + Column('file_name', String), + Column('user_id', String, nullable=False), + Column('code_id', Integer), + Column('submission_time', DateTime), + Column('done', Boolean, default=False) +) + +runs_table = Table( + 'runs', metadata, + Column('id', Integer, primary_key=True), + Column('submission_id', Integer, nullable=False), + Column('start_time', DateTime), + Column('end_time', DateTime), + Column('mode', String), + Column('secret', Boolean, default=False), + Column('runner', String), + Column('score', Float), + Column('passed', Boolean), + Column('compilation', Text), + Column('meta', Text), + Column('result', Text), + Column('system_info', Text) +) + +code_files_table = Table( + 'code_files', metadata, + Column('id', Integer, primary_key=True), + Column('code', Text, nullable=False), + Column('hash', String) +) + +user_info_table = Table( + 'user_info', metadata, + Column('id', String, primary_key=True), + Column('user_name', String), + Column('cli_id', String), + Column('cli_auth_provider', String), + Column('cli_valid', Boolean, default=False), + Column('created_at', DateTime, default=func.now()) +) async def leaderboard_name_autocomplete( interaction: discord.Interaction, @@ -58,38 +130,30 @@ async def leaderboard_name_autocomplete( class LeaderboardDB: def __init__(self, host: str, database: str, user: str, password: str, port: str = "5432"): """Initialize database connection parameters""" - self.connection_params = { - "host": host, - "database": database, - "user": user, - "password": password, - "port": port, - } - self.connection: Optional[psycopg2.extensions.connection] = None + if DATABASE_URL: + ssl_args = {} if DISABLE_SSL else {"sslmode": "require"} + self.engine = create_engine(DATABASE_URL, **ssl_args) + else: + connection_string = f"postgresql://{user}:{password}@{host}:{port}/{database}" + ssl_args = {} if DISABLE_SSL else {"sslmode": "require"} + self.engine = create_engine(connection_string, **ssl_args) + + self.connection = None self.refcount: int = 0 - self.cursor: Optional[psycopg2.extensions.cursor] = None def connect(self) -> bool: """Establish connection to the database""" try: - self.connection = ( - psycopg2.connect(DATABASE_URL, sslmode="require" if not DISABLE_SSL else "disable") - if DATABASE_URL - else psycopg2.connect(**self.connection_params) - ) - self.cursor = self.connection.cursor() + self.connection = self.engine.connect() return True - except psycopg2.Error as e: + except Exception as e: logger.exception("Error connecting to PostgreSQL", exc_info=e) return False def disconnect(self): - """Close database connection and cursor""" - if self.cursor: - self.cursor.close() + """Close database connection""" if self.connection: self.connection.close() - self.cursor = None self.connection = None def __enter__(self): @@ -111,103 +175,95 @@ def __exit__(self, exc_type, exc_val, exc_tb): def create_leaderboard(self, leaderboard: LeaderboardItem) -> int: try: - self.cursor.execute( - """ - INSERT INTO leaderboard.leaderboard (name, deadline, task, creator_id, forum_id) - VALUES (%s, %s, %s, %s, %s) - RETURNING id - """, - ( - leaderboard["name"], - leaderboard["deadline"], - leaderboard["task"].to_str(), - leaderboard["creator_id"], - leaderboard["forum_id"], - ), - ) + with self.connection.begin(): + # Insert leaderboard + stmt = insert(leaderboard_table).values( + name=leaderboard["name"], + deadline=leaderboard["deadline"], + task=leaderboard["task"].to_str(), + creator_id=leaderboard["creator_id"], + forum_id=leaderboard["forum_id"] + ).returning(leaderboard_table.c.id) + + result = self.connection.execute(stmt) + leaderboard_id = result.fetchone()[0] + + # Insert GPU types + if isinstance(leaderboard["gpu_types"], str): + gpu_types = [leaderboard["gpu_types"]] + else: + gpu_types = leaderboard["gpu_types"] + + for gpu_type in gpu_types: + stmt = insert(gpu_type_table).values( + leaderboard_id=leaderboard_id, + gpu_type=gpu_type + ) + self.connection.execute(stmt) - leaderboard_id = self.cursor.fetchone()[0] - - if isinstance(leaderboard["gpu_types"], str): - gpu_types = [leaderboard["gpu_types"]] - else: - gpu_types = leaderboard["gpu_types"] - - for gpu_type in gpu_types: - self.cursor.execute( - """ - INSERT INTO leaderboard.gpu_type (leaderboard_id, gpu_type) - VALUES (%s, %s) - """, - (leaderboard_id, gpu_type), - ) - - self.connection.commit() leaderboard_name_cache.invalidate() # Invalidate autocomplete cache return leaderboard_id - except psycopg2.Error as e: + except Exception as e: logger.exception("Error in leaderboard creation.", e) - if isinstance(e, psycopg2.errors.UniqueViolation): + if "unique" in str(e).lower(): raise KernelBotError( "Error: Tried to create a leaderboard " f'"{leaderboard["name"]}" that already exists.' ) from e - self.connection.rollback() # Ensure rollback if error occurs raise KernelBotError("Error in leaderboard creation.") from e def update_leaderboard(self, name, deadline, task): try: - self.cursor.execute( - """ - UPDATE leaderboard.leaderboard - SET deadline = %s, task = %s - WHERE name = %s; - """, - (deadline, task.to_str(), name), - ) - self.connection.commit() - except psycopg2.Error as e: - self.connection.rollback() + with self.connection.begin(): + stmt = update(leaderboard_table).where( + leaderboard_table.c.name == name + ).values( + deadline=deadline, + task=task.to_str() + ) + self.connection.execute(stmt) + except Exception as e: logger.exception("Error during leaderboard update", exc_info=e) raise KernelBotError("Error during leaderboard update") from e def delete_leaderboard(self, leaderboard_name: str, force: bool = False): try: - if force: - self.cursor.execute( - """ - DELETE FROM leaderboard.runs - WHERE submission_id IN ( - SELECT leaderboard.submission.id - FROM leaderboard.submission - WHERE leaderboard.submission.leaderboard_id IN ( - SELECT leaderboard.leaderboard.id FROM leaderboard.leaderboard - WHERE leaderboard.leaderboard.name = %s + with self.connection.begin(): + if force: + # Get leaderboard ID first + lb_stmt = select(leaderboard_table.c.id).where( + leaderboard_table.c.name == leaderboard_name + ) + lb_result = self.connection.execute(lb_stmt) + lb_row = lb_result.fetchone() + + if lb_row: + leaderboard_id = lb_row[0] + + # Delete runs for this leaderboard's submissions + runs_delete_stmt = delete(runs_table).where( + runs_table.c.submission_id.in_( + select(submission_table.c.id).where( + submission_table.c.leaderboard_id == leaderboard_id + ) + ) ) - ); -""", - (leaderboard_name,), - ) - self.cursor.execute( - """ - DELETE FROM leaderboard.submission - USING leaderboard.leaderboard - WHERE leaderboard.submission.leaderboard_id = leaderboard.leaderboard.id - AND leaderboard.leaderboard.name = %s; - """, - (leaderboard_name,), - ) + self.connection.execute(runs_delete_stmt) + + # Delete submissions for this leaderboard + submissions_delete_stmt = delete(submission_table).where( + submission_table.c.leaderboard_id == leaderboard_id + ) + self.connection.execute(submissions_delete_stmt) - self.cursor.execute( - """ - DELETE FROM leaderboard.leaderboard WHERE name = %s - """, - (leaderboard_name,), - ) - self.connection.commit() + # Delete the leaderboard itself + leaderboard_delete_stmt = delete(leaderboard_table).where( + leaderboard_table.c.name == leaderboard_name + ) + self.connection.execute(leaderboard_delete_stmt) + leaderboard_name_cache.invalidate() # Invalidate autocomplete cache - except psycopg2.Error as e: - self.connection.rollback() + except Exception as e: logger.exception("Could not delete leaderboard %s.", leaderboard_name, exc_info=e) raise KernelBotError(f"Could not delete leaderboard {leaderboard_name}.") from e @@ -221,77 +277,69 @@ def create_submission( user_name: str = None, ) -> Optional[int]: try: - # check if we already have the code - self.cursor.execute( - """ - SELECT id, code - FROM leaderboard.code_files - WHERE hash = encode(sha256(%s::bytea), 'hex') - """, - (code,), - ) - - code_id = None - for candidate in self.cursor.fetchall(): - if candidate[1] == code: - code_id = candidate[0] - break - - if code_id is None: - # a genuinely new submission - self.cursor.execute( - """ - INSERT INTO leaderboard.code_files (CODE) - VALUES (%s) - RETURNING id - """, - (code,), + with self.connection.begin(): + # Check if we already have the code by hash + # Note: We'll use a simple approach since SQLAlchemy Core doesn't have direct sha256 + hash_stmt = select(code_files_table.c.id, code_files_table.c.code).where( + text("hash = encode(sha256(:code::bytea), 'hex')") + ).params(code=code) + + hash_result = self.connection.execute(hash_stmt) + code_id = None + + for candidate in hash_result.fetchall(): + if candidate[1] == code: + code_id = candidate[0] + break + + if code_id is None: + # A genuinely new submission - insert new code + code_stmt = insert(code_files_table).values(code=code).returning(code_files_table.c.id) + code_result = self.connection.execute(code_stmt) + code_id = code_result.fetchone()[0] + + # Check if user exists in user_info, if not add them + user_check_stmt = select(user_info_table.c.id).where( + user_info_table.c.id == str(user_id) ) - code_id = self.cursor.fetchone() - # Check if user exists in user_info, if not add them - self.cursor.execute( - """ - SELECT 1 FROM leaderboard.user_info WHERE id = %s - """, - (str(user_id),), - ) - if not self.cursor.fetchone(): - self.cursor.execute( - """ - INSERT INTO leaderboard.user_info (id, user_name) - VALUES (%s, %s) - """, - (str(user_id), user_name), + user_result = self.connection.execute(user_check_stmt) + + if not user_result.fetchone(): + user_stmt = insert(user_info_table).values( + id=str(user_id), + user_name=user_name + ) + self.connection.execute(user_stmt) + + # Get leaderboard ID + lb_stmt = select(leaderboard_table.c.id).where( + leaderboard_table.c.name == leaderboard ) - self.cursor.execute( - """ - INSERT INTO leaderboard.submission (leaderboard_id, file_name, - user_id, code_id, submission_time) - VALUES ( - (SELECT id FROM leaderboard.leaderboard WHERE name = %s), - %s, %s, %s, %s) - RETURNING id - """, - ( - leaderboard, - file_name, - user_id, - code_id, - time, - ), - ) - submission_id = self.cursor.fetchone()[0] - assert submission_id is not None - self.connection.commit() - return submission_id - except psycopg2.Error as e: + lb_result = self.connection.execute(lb_stmt) + leaderboard_id = lb_result.fetchone()[0] + + # Insert submission + submission_stmt = insert(submission_table).values( + leaderboard_id=leaderboard_id, + file_name=file_name, + user_id=user_id, + code_id=code_id, + submission_time=time + ).returning(submission_table.c.id) + + submission_result = self.connection.execute(submission_stmt) + submission_id = submission_result.fetchone()[0] + + assert submission_id is not None + return submission_id + + except Exception as e: logger.error( "Error during creation of submission for leaderboard '%s' by user '%s'", leaderboard, user_id, exc_info=e, ) - self.connection.rollback() # Ensure rollback if error occurs raise KernelBotError("Error during creation of submission") from e def mark_submission_done( @@ -299,18 +347,13 @@ def mark_submission_done( submission: int, ) -> Optional[int]: try: - self.cursor.execute( - """ - UPDATE leaderboard.submission - SET done = TRUE - WHERE id = %s - """, - (submission,), - ) - self.connection.commit() - except psycopg2.Error as e: + with self.connection.begin(): + stmt = update(submission_table).where( + submission_table.c.id == submission + ).values(done=True) + self.connection.execute(stmt) + except Exception as e: logger.error("Could not mark submission '%s' as done.", submission, exc_info=e) - self.connection.rollback() # Ensure rollback if error occurs raise KernelBotError("Error while finalizing submission") from e def create_submission_run( @@ -327,37 +370,33 @@ def create_submission_run( system: SystemInfo, ): try: - if compilation is not None: - compilation = json.dumps(dataclasses.asdict(compilation)) - - meta = { - k: result.__dict__[k] - for k in ["stdout", "stderr", "success", "exit_code", "command", "duration"] - } - self.cursor.execute( - """ - INSERT INTO leaderboard.runs (submission_id, start_time, end_time, mode, - secret, runner, score, passed, compilation, meta, result, system_info + with self.connection.begin(): + compilation_json = None + if compilation is not None: + compilation_json = json.dumps(dataclasses.asdict(compilation)) + + meta = { + k: result.__dict__[k] + for k in ["stdout", "stderr", "success", "exit_code", "command", "duration"] + } + + stmt = insert(runs_table).values( + submission_id=submission, + start_time=start, + end_time=end, + mode=mode, + secret=secret, + runner=runner, + score=score, + passed=result.passed, + compilation=compilation_json, + meta=json.dumps(meta), + result=json.dumps(result.result), + system_info=json.dumps(dataclasses.asdict(system)) ) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - """, - ( - submission, - start, - end, - mode, - secret, - runner, - score, - result.passed, - compilation, - json.dumps(meta), - json.dumps(result.result), - json.dumps(dataclasses.asdict(system)), - ), - ) - self.connection.commit() - except psycopg2.Error as e: + self.connection.execute(stmt) + + except Exception as e: logger.exception( "Error during adding %s run on %s for submission '%s'", mode, @@ -365,29 +404,32 @@ def create_submission_run( submission, exc_info=e, ) - self.connection.rollback() # Ensure rollback if error occurs raise KernelBotError("Could not create leaderboard submission entry in database") from e def get_leaderboard_names(self) -> list[str]: - self.cursor.execute("SELECT name FROM leaderboard.leaderboard") - return [x[0] for x in self.cursor.fetchall()] + stmt = select(leaderboard_table.c.name) + result = self.connection.execute(stmt) + return [row[0] for row in result.fetchall()] def get_leaderboards(self) -> list[LeaderboardItem]: - self.cursor.execute( - """ - SELECT id, name, deadline, task, creator_id - FROM leaderboard.leaderboard - """ + stmt = select( + leaderboard_table.c.id, + leaderboard_table.c.name, + leaderboard_table.c.deadline, + leaderboard_table.c.task, + leaderboard_table.c.creator_id ) - - lbs = self.cursor.fetchall() + result = self.connection.execute(stmt) + lbs = result.fetchall() + leaderboards = [] - for lb in lbs: - self.cursor.execute( - "SELECT * from leaderboard.gpu_type WHERE leaderboard_id = %s", [lb[0]] + # Get GPU types for this leaderboard + gpu_stmt = select(gpu_type_table.c.gpu_type).where( + gpu_type_table.c.leaderboard_id == lb[0] ) - gpu_types = [x[1] for x in self.cursor.fetchall()] + gpu_result = self.connection.execute(gpu_stmt) + gpu_types = [row[0] for row in gpu_result.fetchall()] leaderboards.append( LeaderboardItem( @@ -403,37 +445,38 @@ def get_leaderboards(self) -> list[LeaderboardItem]: return leaderboards def get_leaderboard_gpu_types(self, leaderboard_name: str) -> List[str] | None: - self.cursor.execute( - """ - SELECT * - FROM leaderboard.gpu_type - WHERE leaderboard_id = ( - SELECT id - FROM leaderboard.leaderboard - WHERE name = %s - ) - """, - (leaderboard_name,), + # First get the leaderboard ID + lb_stmt = select(leaderboard_table.c.id).where( + leaderboard_table.c.name == leaderboard_name ) - - gpu_types = [x[1] for x in self.cursor.fetchall()] - - if gpu_types: - return gpu_types - else: + lb_result = self.connection.execute(lb_stmt) + lb_row = lb_result.fetchone() + + if not lb_row: return None - - def get_leaderboard(self, leaderboard_name: str) -> LeaderboardItem | None: - self.cursor.execute( - """ - SELECT id, name, deadline, task, creator_id, forum_id, secret_seed - FROM leaderboard.leaderboard - WHERE name = %s - """, - (leaderboard_name,), + + # Then get GPU types + gpu_stmt = select(gpu_type_table.c.gpu_type).where( + gpu_type_table.c.leaderboard_id == lb_row[0] ) + gpu_result = self.connection.execute(gpu_stmt) + gpu_types = [row[0] for row in gpu_result.fetchall()] - res = self.cursor.fetchone() + return gpu_types if gpu_types else None + + def get_leaderboard(self, leaderboard_name: str) -> LeaderboardItem | None: + stmt = select( + leaderboard_table.c.id, + leaderboard_table.c.name, + leaderboard_table.c.deadline, + leaderboard_table.c.task, + leaderboard_table.c.creator_id, + leaderboard_table.c.forum_id, + leaderboard_table.c.secret_seed + ).where(leaderboard_table.c.name == leaderboard_name) + + result = self.connection.execute(stmt) + res = result.fetchone() if res: task = LeaderboardTask.from_dict(res[3]) @@ -458,10 +501,10 @@ def get_leaderboard_submissions( limit: int = None, offset: int = 0, ) -> list[LeaderboardRankedEntry]: - # separate cases, for personal we want all submissions, for general we want best per user + # For complex queries with window functions and CTEs, we'll use text() with SQLAlchemy if user_id: # Query all if user_id (means called from show-personal) - query = """ + query = text(""" SELECT s.file_name, s.id, @@ -475,19 +518,25 @@ def get_leaderboard_submissions( JOIN leaderboard.submission s ON r.submission_id = s.id JOIN leaderboard.leaderboard l ON s.leaderboard_id = l.id JOIN leaderboard.user_info ui ON s.user_id = ui.id - WHERE l.name = %s - AND r.runner = %s + WHERE l.name = :leaderboard_name + AND r.runner = :gpu_name AND NOT r.secret AND r.score IS NOT NULL AND r.passed - AND s.user_id = %s + AND s.user_id = :user_id ORDER BY r.score ASC - LIMIT %s OFFSET %s - """ - args = (leaderboard_name, gpu_name, user_id, limit, offset) + LIMIT :limit OFFSET :offset + """) + params = { + 'leaderboard_name': leaderboard_name, + 'gpu_name': gpu_name, + 'user_id': user_id, + 'limit': limit, + 'offset': offset + } else: # Query best submission per user if no user_id (means called from show) - query = """ + query = text(""" WITH best_submissions AS ( SELECT DISTINCT ON (s.user_id) s.id as submission_id, @@ -500,7 +549,7 @@ def get_leaderboard_submissions( JOIN leaderboard.submission s ON r.submission_id = s.id JOIN leaderboard.leaderboard l ON s.leaderboard_id = l.id JOIN leaderboard.user_info ui ON s.user_id = ui.id - WHERE l.name = %s AND r.runner = %s AND NOT r.secret + WHERE l.name = :leaderboard_name AND r.runner = :gpu_name AND NOT r.secret AND r.score IS NOT NULL AND r.passed ORDER BY s.user_id, r.score ASC ) @@ -516,11 +565,16 @@ def get_leaderboard_submissions( FROM best_submissions bs JOIN leaderboard.user_info ui ON bs.user_id = ui.id ORDER BY bs.score ASC - LIMIT %s OFFSET %s - """ - args = (leaderboard_name, gpu_name, limit, offset) + LIMIT :limit OFFSET :offset + """) + params = { + 'leaderboard_name': leaderboard_name, + 'gpu_name': gpu_name, + 'limit': limit, + 'offset': offset + } - self.cursor.execute(query, args) + result = self.connection.execute(query, params) return [ LeaderboardRankedEntry( @@ -534,7 +588,7 @@ def get_leaderboard_submissions( leaderboard_name=leaderboard_name, gpu_type=gpu_name, ) - for submission in self.cursor.fetchall() + for submission in result.fetchall() ] def generate_stats(self, last_day: bool): @@ -545,10 +599,9 @@ def generate_stats(self, last_day: bool): raise def _generate_runner_stats(self, last_day: bool = False): - select_expr = "WHERE NOW() - s.submission_time <= interval '24 hours'" if last_day else "" - # per-runner stats - self.cursor.execute( - f""" + where_clause = "WHERE NOW() - s.submission_time <= interval '24 hours'" if last_day else "" + + query = text(f""" SELECT runner, COUNT(*), @@ -559,13 +612,13 @@ def _generate_runner_stats(self, last_day: bool = False): AVG(runs.start_time - s.submission_time), SUM(runs.end_time - runs.start_time) FROM leaderboard.runs JOIN leaderboard.submission s ON submission_id = s.id - {select_expr} + {where_clause} GROUP BY runner; - """ - ) + """) + query_result = self.connection.execute(query) result = {} - for row in self.cursor.fetchall(): + for row in query_result.fetchall(): result[f"num_run.{row[0]}"] = row[1] result[f"runs_passed.{row[0]}"] = row[2] result[f"runs_scored.{row[0]}"] = row[3] @@ -577,19 +630,20 @@ def _generate_runner_stats(self, last_day: bool = False): return result def _generate_submission_stats(self, last_day: bool = False): - select_expr = "WHERE NOW() - submission_time <= interval '24 hours'" if last_day else "" - self.cursor.execute( - f""" + where_clause = "WHERE NOW() - submission_time <= interval '24 hours'" if last_day else "" + + query = text(f""" SELECT COUNT(*), COUNT(*) FILTER (WHERE NOT done), COUNT(DISTINCT user_id) FROM leaderboard.submission - {select_expr} + {where_clause} ; - """ - ) - num_sub, num_sub_wait, num_users = self.cursor.fetchone() + """) + + result = self.connection.execute(query) + num_sub, num_sub_wait, num_users = result.fetchone() return { "num_submissions": num_sub, "sub_waiting": num_sub_wait, @@ -602,17 +656,13 @@ def _generate_stats(self, last_day: bool = False): # code-level stats if not last_day: - self.cursor.execute( - """ - SELECT COUNT(*) FROM leaderboard.code_files; - """ - ) - result["num_unique_codes"] = self.cursor.fetchone()[0] + stmt = select(func.count()).select_from(code_files_table) + count_result = self.connection.execute(stmt) + result["num_unique_codes"] = count_result.fetchone()[0] else: # calculate heavy hitters - self.cursor.execute( - """ + query = text(""" WITH run_durations AS ( SELECT s.user_id AS user_id, @@ -628,73 +678,79 @@ def _generate_stats(self, last_day: bool = False): GROUP BY user_id ORDER BY total DESC LIMIT 10; - """ - ) + """) - for row in self.cursor.fetchall(): + heavy_hitters_result = self.connection.execute(query) + for row in heavy_hitters_result.fetchall(): result[f"total.{row[0]}"] = row[1] return result def get_user_from_id(self, id: str) -> Optional[str]: try: - self.cursor.execute( - """ - SELECT user_name FROM leaderboard.user_info WHERE id = %s - """, - (id,), + stmt = select(user_info_table.c.user_name).where( + user_info_table.c.id == id ) - return self.cursor.fetchone()[0] + result = self.connection.execute(stmt) + row = result.fetchone() + return row[0] if row else None except Exception: return None def delete_submission(self, submission_id: int): try: - # first, the runs - query = """ - DELETE FROM leaderboard.runs - WHERE submission_id = %s - """ - self.cursor.execute(query, (submission_id,)) - - # next, the submission itself - query = """ - DELETE FROM leaderboard.submission - WHERE id = %s - """ - self.cursor.execute(query, (submission_id,)) - - # TODO delete code file? Could be one-to-many mapping, so we'd need - # to figure out if it is used elsewhere first. - self.connection.commit() - except psycopg2.Error as e: - self.connection.rollback() + with self.connection.begin(): + # First, delete the runs + runs_delete_stmt = delete(runs_table).where( + runs_table.c.submission_id == submission_id + ) + self.connection.execute(runs_delete_stmt) + + # Next, delete the submission itself + submission_delete_stmt = delete(submission_table).where( + submission_table.c.id == submission_id + ) + self.connection.execute(submission_delete_stmt) + + # TODO delete code file? Could be one-to-many mapping, so we'd need + # to figure out if it is used elsewhere first. + except Exception as e: logger.exception("Could not delete submission %s.", submission_id, exc_info=e) raise KernelBotError(f"Could not delete submission {submission_id}!") from e def get_submission_by_id(self, submission_id: int) -> Optional[SubmissionItem]: - query = """ + # Get submission details with JOINs + submission_query = text(""" SELECT s.leaderboard_id, lb.name, s.file_name, s.user_id, s.submission_time, s.done, c.code FROM leaderboard.submission s JOIN leaderboard.code_files c ON s.code_id = c.id JOIN leaderboard.leaderboard lb ON s.leaderboard_id = lb.id - WHERE s.id = %s - """ - self.cursor.execute(query, (submission_id,)) - submission = self.cursor.fetchone() + WHERE s.id = :submission_id + """) + + submission_result = self.connection.execute(submission_query, {'submission_id': submission_id}) + submission = submission_result.fetchone() if submission is None: return None - # OK, now get the runs - query = """ - SELECT start_time, end_time, mode, secret, runner, score, - passed, compilation, meta, result, system_info - FROM leaderboard.runs - WHERE submission_id = %s - """ - self.cursor.execute(query, (submission_id,)) - runs = self.cursor.fetchall() + # Get the runs for this submission + runs_stmt = select( + runs_table.c.start_time, + runs_table.c.end_time, + runs_table.c.mode, + runs_table.c.secret, + runs_table.c.runner, + runs_table.c.score, + runs_table.c.passed, + runs_table.c.compilation, + runs_table.c.meta, + runs_table.c.result, + runs_table.c.system_info + ).where(runs_table.c.submission_id == submission_id) + + runs_result = self.connection.execute(runs_stmt) + runs = runs_result.fetchall() runs = [ RunItem( @@ -733,35 +789,35 @@ def get_leaderboard_submission_count( ) -> int: """Get the total count of submissions for a leaderboard""" if user_id: - query = """ + query = text(""" SELECT COUNT(*) FROM leaderboard.runs r JOIN leaderboard.submission s ON r.submission_id = s.id JOIN leaderboard.leaderboard l ON s.leaderboard_id = l.id - WHERE l.name = %s - AND r.runner = %s + WHERE l.name = :leaderboard_name + AND r.runner = :gpu_name AND NOT r.secret AND r.score IS NOT NULL AND r.passed - AND s.user_id = %s - """ - args = (leaderboard_name, gpu_name, user_id) + AND s.user_id = :user_id + """) + params = {'leaderboard_name': leaderboard_name, 'gpu_name': gpu_name, 'user_id': user_id} else: - query = """ + query = text(""" SELECT COUNT(DISTINCT s.user_id) FROM leaderboard.runs r JOIN leaderboard.submission s ON r.submission_id = s.id JOIN leaderboard.leaderboard l ON s.leaderboard_id = l.id - WHERE l.name = %s - AND r.runner = %s + WHERE l.name = :leaderboard_name + AND r.runner = :gpu_name AND NOT r.secret AND r.score IS NOT NULL AND r.passed - """ - args = (leaderboard_name, gpu_name) + """) + params = {'leaderboard_name': leaderboard_name, 'gpu_name': gpu_name} - self.cursor.execute(query, args) - return self.cursor.fetchone()[0] + result = self.connection.execute(query, params) + return result.fetchone()[0] def init_user_from_cli(self, cli_id: str, auth_provider: str): """ @@ -780,29 +836,27 @@ def init_user_from_cli(self, cli_id: str, auth_provider: str): raise Exception("Invalid auth provider") try: - # Check if cli_id already exists - self.cursor.execute( - """ - SELECT 1 FROM leaderboard.user_info WHERE cli_id = %s - """, - (cli_id,), - ) - - if self.cursor.fetchone(): - raise Exception("CLI ID already exists") - - self.cursor.execute( - """ - INSERT INTO leaderboard.user_info (id, user_name, - cli_id, cli_auth_provider, cli_valid) - VALUES (%s, %s, %s, %s, %s) - """, - (f"temp_{cli_id}", f"temp_user_{cli_id}", cli_id, auth_provider, False), - ) + with self.connection.begin(): + # Check if cli_id already exists + check_stmt = select(user_info_table.c.id).where( + user_info_table.c.cli_id == cli_id + ) + check_result = self.connection.execute(check_stmt) + + if check_result.fetchone(): + raise Exception("CLI ID already exists") + + # Insert temporary user + insert_stmt = insert(user_info_table).values( + id=f"temp_{cli_id}", + user_name=f"temp_user_{cli_id}", + cli_id=cli_id, + cli_auth_provider=auth_provider, + cli_valid=False + ) + self.connection.execute(insert_stmt) - self.connection.commit() - except psycopg2.Error as e: - self.connection.rollback() + except Exception as e: logger.exception("Error initializing user from CLI with ID %s", cli_id, exc_info=e) raise KernelBotError("Error initializing user from CLI") from e @@ -813,89 +867,97 @@ def create_user_from_cli(self, user_id: str, user_name: str, cli_id: str, auth_p are temporary values that need to be updated. """ try: - self.cursor.execute( - """ - SELECT 1 FROM leaderboard.user_info WHERE id = %s - """, - (user_id,), - ) - if self.cursor.fetchone(): - raise Exception( - "User already has a valid account with this User ID." - "Please use the re-register command to re-authenticate." + with self.connection.begin(): + # Check if user_id already exists + user_check_stmt = select(user_info_table.c.id).where( + user_info_table.c.id == user_id ) - - self.cursor.execute( - """ - SELECT cli_valid FROM leaderboard.user_info - WHERE cli_id = %s AND cli_valid = TRUE AND cli_auth_provider = %s - """, - (cli_id, auth_provider), - ) - - if self.cursor.fetchone(): - raise Exception( - "User already has a valid account with this CLI ID." - "Please use the re-register command to re-authenticate." + user_check_result = self.connection.execute(user_check_stmt) + + if user_check_result.fetchone(): + raise Exception( + "User already has a valid account with this User ID." + "Please use the re-register command to re-authenticate." + ) + + # Check if CLI ID already has a valid account + cli_check_stmt = select(user_info_table.c.cli_valid).where( + and_( + user_info_table.c.cli_id == cli_id, + user_info_table.c.cli_valid == True, + user_info_table.c.cli_auth_provider == auth_provider + ) ) + cli_check_result = self.connection.execute(cli_check_stmt) + + if cli_check_result.fetchone(): + raise Exception( + "User already has a valid account with this CLI ID." + "Please use the re-register command to re-authenticate." + ) + + # Update the temporary user to be valid + update_stmt = update(user_info_table).where( + and_( + user_info_table.c.cli_id == cli_id, + user_info_table.c.cli_valid == False + ) + ).values( + id=user_id, + user_name=user_name, + cli_valid=True, + cli_auth_provider=auth_provider + ) + + result = self.connection.execute(update_stmt) + if result.rowcount == 0: + raise Exception("No temporary user found with this CLI ID. No effect.") - self.cursor.execute( - """ - UPDATE leaderboard.user_info - SET id = %s, user_name = %s, cli_valid = TRUE, cli_auth_provider = %s - WHERE cli_id = %s AND cli_valid = FALSE - """, - (user_id, user_name, auth_provider, cli_id), - ) - - if self.cursor.rowcount == 0: - raise Exception("No temporary user found with this CLI ID. No effect.") - - self.connection.commit() - except psycopg2.Error as e: - self.connection.rollback() + except Exception as e: logger.exception("Could not create/update user %s from CLI.", user_id, exc_info=e) raise KernelBotError("Database error while creating/updating user from CLI") from e def reset_user_from_cli(self, user_id: str, cli_id: str, auth_provider: str): try: - self.cursor.execute( - """ - SELECT 1 FROM leaderboard.user_info WHERE id = %s - """, - (user_id,), - ) - if not self.cursor.fetchone(): - raise Exception( - "User not found. Please use the register command to create an account." + with self.connection.begin(): + # Check if user exists + user_check_stmt = select(user_info_table.c.id).where( + user_info_table.c.id == user_id ) + user_check_result = self.connection.execute(user_check_stmt) + + if not user_check_result.fetchone(): + raise Exception( + "User not found. Please use the register command to create an account." + ) + + # Update the user's CLI information + update_stmt = update(user_info_table).where( + user_info_table.c.id == user_id + ).values( + cli_id=cli_id, + cli_auth_provider=auth_provider, + cli_valid=True + ) + self.connection.execute(update_stmt) - self.cursor.execute( - """ - UPDATE leaderboard.user_info - SET cli_id = %s, cli_auth_provider = %s, cli_valid = TRUE - WHERE id = %s - """, - (cli_id, auth_provider, user_id), - ) - - self.connection.commit() - except psycopg2.Error as e: - self.connection.rollback() + except Exception as e: logger.exception("Could not reset user %s from CLI.", user_id, exc_info=e) raise KernelBotError("Database error while resetting user from CLI") from e def cleanup_temp_users(self): try: - self.cursor.execute( - """ - DELETE FROM leaderboard.user_info WHERE cli_valid = FALSE and created_at < - NOW() - INTERVAL '10 minutes' AND id LIKE 'temp_%' AND user_name LIKE 'temp_%' - """ - ) - self.connection.commit() - except psycopg2.Error as e: - self.connection.rollback() + with self.connection.begin(): + cleanup_stmt = delete(user_info_table).where( + and_( + user_info_table.c.cli_valid == False, + text("created_at < NOW() - INTERVAL '10 minutes'"), + user_info_table.c.id.like('temp_%'), + user_info_table.c.user_name.like('temp_%') + ) + ) + self.connection.execute(cleanup_stmt) + except Exception as e: logger.exception("Could not cleanup temp users", exc_info=e) raise KernelBotError("Database error while cleaning up temp users") from e @@ -910,17 +972,19 @@ def validate_cli_id(self, cli_id: str) -> Optional[dict[str, str]]: Optional[str]: The user ID if the CLI ID is valid, otherwise None. """ try: - self.cursor.execute( - """ - SELECT id, user_name FROM leaderboard.user_info - WHERE cli_id = %s AND cli_valid = TRUE - """, - (cli_id,), + stmt = select( + user_info_table.c.id, + user_info_table.c.user_name + ).where( + and_( + user_info_table.c.cli_id == cli_id, + user_info_table.c.cli_valid == True + ) ) - result = self.cursor.fetchone() - return {"user_id": result[0], "user_name": result[1]} if result else None - except psycopg2.Error as e: - self.connection.rollback() + result = self.connection.execute(stmt) + row = result.fetchone() + return {"user_id": row[0], "user_name": row[1]} if row else None + except Exception as e: logger.exception("Error validating CLI ID %s", cli_id, exc_info=e) raise KernelBotError("Error validating CLI ID") from e @@ -941,5 +1005,5 @@ def validate_cli_id(self, cli_id: str) -> Optional[dict[str, str]]: POSTGRES_PASSWORD, POSTGRES_PORT, ) - leaderboard_db.connect() - leaderboard_db.disconnect() + with leaderboard_db: + print("Database connection successful!")