From 6db6c3cc63929e9d0d204de73e55cad98707b2e1 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Thu, 11 Sep 2025 12:41:30 +0300 Subject: [PATCH 1/3] Topic workload SLO --- .github/workflows/slo.yml | 15 +- tests/slo/TOPIC_SLO.md | 173 +++++++++++++++++++++ tests/slo/src/metrics.py | 1 + tests/slo/src/options.py | 105 +++++++++---- tests/slo/src/runner.py | 125 +++++++++++++++- tests/slo/src/topic_jobs.py | 289 ++++++++++++++++++++++++++++++++++++ 6 files changed, 670 insertions(+), 38 deletions(-) create mode 100644 tests/slo/TOPIC_SLO.md create mode 100644 tests/slo/src/topic_jobs.py diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 9ea5abd4..7644e3a1 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -30,9 +30,12 @@ jobs: strategy: matrix: - workload: - - sync-table - - sync-query + include: + - prefix: table + workload: sync-table + - prefix: table + workload: sync-query + concurrency: group: slo-${{ github.ref }}-${{ matrix.workload }} @@ -64,14 +67,14 @@ jobs: - name: Prepare SLO Database run: | - python ./tests/slo/src create grpc://localhost:2135 /Root/testdb + python ./tests/slo/src ${{ matrix.prefix }}-create grpc://localhost:2135 /Root/testdb - name: Run SLO Tests env: REF: '${{ github.head_ref || github.ref }}' WORKLOAD: '${{ matrix.workload }}' run: | - python ./tests/slo/src run grpc://localhost:2135 /Root/testdb \ + python ./tests/slo/src ${{ matrix.prefix }}-run grpc://localhost:2135 /Root/testdb \ --prom-pgw localhost:9091 \ --report-period 250 \ --time ${{inputs.slo_workload_duration_seconds || 600}} \ @@ -83,7 +86,7 @@ jobs: - if: always() name: Cleanup SLO Database run: | - python ./tests/slo/src cleanup grpc://localhost:2135 /Root/testdb + python ./tests/slo/src ${{ matrix.prefix }}-cleanup grpc://localhost:2135 /Root/testdb - if: always() name: Store ydb chaos testing logs diff --git a/tests/slo/TOPIC_SLO.md b/tests/slo/TOPIC_SLO.md new file mode 100644 index 00000000..50d22123 --- /dev/null +++ b/tests/slo/TOPIC_SLO.md @@ -0,0 +1,173 @@ +# YDB Python SDK Topic SLO Tests + +Этот документ описывает как использовать SLO тесты для топиков YDB. 
+
+## Overview
+
+Topic SLO tests measure the performance of YDB topic operations:
+- **TopicWriter** - writing messages to a topic
+- **TopicReader** - reading messages from a topic and committing them
+
+The tests collect the following metrics:
+- Latency of operations
+- RPS (operations per second)
+- Error rate (percentage of failed operations)
+- Retry attempts
+
+## Usage
+
+### Running topic SLO tests
+
+The topic and its consumer are created by the `topic-create` command; `topic-run` expects them to already exist, and `topic-cleanup` removes the topic afterwards:
+
+```bash
+cd tests/slo
+python -m src topic-create grpc://localhost:2135 /local
+python -m src topic-run grpc://localhost:2135 /local
+python -m src topic-cleanup grpc://localhost:2135 /local
+```
+
+### Parameters
+
+#### Main parameters:
+- `endpoint` - YDB endpoint (for example: `grpc://localhost:2135`)
+- `db` - database (for example: `/local`)
+
+#### Topic parameters:
+- `--topic-path` - topic path (default: `/local/slo_topic`)
+- `--topic-consumer` - consumer name (default: `slo_consumer`)
+- `--topic-message-size` - message size in bytes (default: 100)
+
+#### Load parameters:
+- `--topic-write-rps` - write RPS (default: 20)
+- `--topic-read-rps` - read RPS (default: 50)
+- `--topic-write-threads` - number of writer threads (default: 1)
+- `--topic-read-threads` - number of reader threads (default: 1)
+
+#### Timeouts:
+- `--topic-write-timeout` - write timeout in ms (default: 10000)
+- `--topic-read-timeout` - read timeout in ms (default: 5000)
+
+#### Timing parameters:
+- `--time` - test duration in seconds (default: 60)
+- `--shutdown-time` - graceful shutdown time in seconds (default: 10)
+
+#### Metrics:
+- `--prom-pgw` - Prometheus push gateway (default: empty, which disables metrics push)
+- `--report-period` - metrics push period in ms (default: 1000)
+
+### Usage examples
+
+#### Basic run with default settings:
+```bash
+python -m src topic-run grpc://localhost:2135 /local
+```
+
+#### Run with increased load:
+```bash
+python -m src topic-run grpc://localhost:2135 /local \
+    --topic-write-rps 100 \
+    --topic-read-rps 200 \
+    --topic-write-threads 4 \
+    --topic-read-threads 8 \
+    --time 300
+```
+
+#### Run with larger messages:
+```bash
+python -m src topic-run grpc://localhost:2135 /local \
+    --topic-message-size 1024 \
+    --topic-write-rps 10 \
+    --topic-read-rps 20
+```
+
+#### Run with a custom topic and consumer:
+```bash
+python -m src topic-run grpc://localhost:2135 /local \
+    --topic-path /local/my_slo_topic \
+    --topic-consumer my_consumer
+```
+
+## Architecture
+
+### Components
+
+1. **topic_jobs.py** - core topic workload logic:
+   - `run_topic_writes()` - message write loop
+   - `run_topic_reads()` - message read loop
+   - `create_topic()` - topic and consumer creation
+   - `cleanup_topic()` - topic removal
+
+2. **metrics.py** - extended with topic metrics:
+   - `OP_TYPE_TOPIC_WRITE` - write metrics
+   - `OP_TYPE_TOPIC_READ` - read metrics
+
+3. **options.py** - adds the `topic-create`, `topic-run` and `topic-cleanup` commands with topic parameters
+
+4. **runner.py** - adds `run_topic_slo()` for running the topic tests
+
+### Workflow
+
+1. **Initialization**: the topic and consumer are created (via `topic-create`) if they do not exist yet
+2. **Worker startup** (see the sketch below):
+   - writer threads generate and send messages
+   - reader threads receive and commit messages
+3. **Metrics collection**: every operation is measured and pushed to Prometheus
+4. **Shutdown**: graceful shutdown of all workers
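+
+Below is a minimal, illustrative sketch of the write/read/commit cycle the workers perform (referenced from step 2 above). It is not part of the test code: the endpoint, database, topic path and consumer name are the documented defaults, and the calls (`topic_client.writer`, `TopicWriterMessage`, `topic_client.reader`, `receive_message`, `commit`) mirror those used in `topic_jobs.py` in this patch; exact signatures should be verified against the installed `ydb` SDK version.
+
+```python
+import ydb
+
+with ydb.Driver(endpoint="grpc://localhost:2135", database="/local") as driver:
+    driver.wait(timeout=5)
+
+    # Write a single fixed-size message, mirroring the default --topic-message-size of 100 bytes
+    with driver.topic_client.writer("/local/slo_topic") as writer:
+        writer.write(ydb.TopicWriterMessage(data=b"x" * 100))
+
+    # Read one message with the documented consumer and commit the read progress
+    with driver.topic_client.reader("/local/slo_topic", consumer="slo_consumer") as reader:
+        message = reader.receive_message()
+        reader.commit(message)
+```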
+
+## Metrics
+
+The topic SLO tests produce the following metrics:
+
+### Counters
+- `sdk_operations_total{operation_type="topic_write"}` - total number of write operations
+- `sdk_operations_total{operation_type="topic_read"}` - total number of read operations
+- `sdk_operations_success_total{operation_type="topic_write|topic_read"}` - successful operations
+- `sdk_operations_failure_total{operation_type="topic_write|topic_read"}` - failed operations
+- `sdk_errors_total{operation_type="topic_write|topic_read", error_type="ErrorType"}` - errors by type
+
+### Histograms
+- `sdk_operation_latency_seconds{operation_type="topic_write|topic_read", operation_status="success|err"}` - operation latency
+
+### Gauges
+- `sdk_pending_operations{operation_type="topic_write|topic_read"}` - number of operations in flight
+- `sdk_retry_attempts{operation_type="topic_write|topic_read"}` - number of retry attempts
+
+## Monitoring
+
+Grafana with Prometheus is recommended for monitoring. Example queries:
+
+### Topic write RPS:
+```promql
+rate(sdk_operations_total{operation_type="topic_write"}[1m])
+```
+
+### Topic read RPS:
+```promql
+rate(sdk_operations_total{operation_type="topic_read"}[1m])
+```
+
+### Write error percentage:
+```promql
+rate(sdk_operations_failure_total{operation_type="topic_write"}[1m]) /
+rate(sdk_operations_total{operation_type="topic_write"}[1m]) * 100
+```
+
+### 95th percentile of read latency:
+```promql
+histogram_quantile(0.95, rate(sdk_operation_latency_seconds_bucket{operation_type="topic_read", operation_status="success"}[1m]))
+```
+
+## Troubleshooting
+
+### Common problems:
+
+1. **Topic is not created**: check access rights and that the topic path is correct
+2. **Read timeouts**: normal when there are no new messages; increase `--topic-read-timeout` if needed
+3. **High error rate**: check the connection to YDB and the topic limits
+4. **Low RPS**: increase the number of threads or the RPS limits
+
+### Logs:
+The tests log the main events at INFO level. For detailed diagnostics, change the `logging.basicConfig` call in `src/__main__.py` to `level=logging.DEBUG` and run as usual:
+```bash
+export PYTHONPATH=src
+python -m src topic-run ...
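+# Optional, not part of the original document: the same run with its output
+# captured to a file for later inspection (endpoint and database mirror the
+# examples above; adjust them to your setup).
+python -m src topic-run grpc://localhost:2135 /local 2>&1 | tee topic-slo.log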
+``` diff --git a/tests/slo/src/metrics.py b/tests/slo/src/metrics.py index 2433c730..0fe3b60d 100644 --- a/tests/slo/src/metrics.py +++ b/tests/slo/src/metrics.py @@ -10,6 +10,7 @@ from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, push_to_gateway # noqa: E402 OP_TYPE_READ, OP_TYPE_WRITE = "read", "write" +OP_TYPE_TOPIC_READ, OP_TYPE_TOPIC_WRITE = "topic_read", "topic_write" OP_STATUS_SUCCESS, OP_STATUS_FAILURE = "success", "err" REF = environ.get("REF", "main") diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py index 6c24422d..eeda2208 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -7,52 +7,96 @@ def add_common_options(parser): parser.add_argument("-t", "--table-name", default="key_value", help="Table name") -def make_create_parser(subparsers): - create_parser = subparsers.add_parser("create", help="Create tables and fill with initial content") - add_common_options(create_parser) +def make_table_create_parser(subparsers): + table_create_parser = subparsers.add_parser("table-create", help="Create tables and fill with initial content") + add_common_options(table_create_parser) - create_parser.add_argument( + table_create_parser.add_argument( "-p-min", "--min-partitions-count", default=6, type=int, help="Minimum amount of partitions in table" ) - create_parser.add_argument( + table_create_parser.add_argument( "-p-max", "--max-partitions-count", default=1000, type=int, help="Maximum amount of partitions in table" ) - create_parser.add_argument("-p-size", "--partition-size", default=100, type=int, help="Partition size [mb]") - create_parser.add_argument( + table_create_parser.add_argument("-p-size", "--partition-size", default=100, type=int, help="Partition size [mb]") + table_create_parser.add_argument( "-c", "--initial-data-count", default=1000, type=int, help="Total number of records to generate" ) - create_parser.add_argument("--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]") + table_create_parser.add_argument("--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]") - create_parser.add_argument( + table_create_parser.add_argument( "--batch-size", default=100, type=int, help="Number of new records in each create request" ) - create_parser.add_argument("--threads", default=10, type=int, help="Number of threads to use") + table_create_parser.add_argument("--threads", default=10, type=int, help="Number of threads to use") -def make_run_parser(subparsers, name="run"): - run_parser = subparsers.add_parser(name, help="Run measurable workload") - add_common_options(run_parser) +def make_table_run_parser(subparsers): + table_run_parser = subparsers.add_parser("table-run", help="Run table SLO workload") + add_common_options(table_run_parser) - run_parser.add_argument("--read-rps", default=100, type=int, help="Read request rps") - run_parser.add_argument("--read-timeout", default=10000, type=int, help="Read requests execution timeout [ms]") + table_run_parser.add_argument("--read-rps", default=100, type=int, help="Read request rps") + table_run_parser.add_argument("--read-timeout", default=10000, type=int, help="Read requests execution timeout [ms]") - run_parser.add_argument("--write-rps", default=10, type=int, help="Write request rps") - run_parser.add_argument("--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]") + table_run_parser.add_argument("--write-rps", default=10, type=int, help="Write request rps") + 
table_run_parser.add_argument("--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]") - run_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") - run_parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time in seconds") + table_run_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") + table_run_parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time in seconds") - run_parser.add_argument("--prom-pgw", default="localhost:9091", type=str, help="Prometheus push gateway") - run_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") + table_run_parser.add_argument("--prom-pgw", default="localhost:9091", type=str, help="Prometheus push gateway") + table_run_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") - run_parser.add_argument("--read-threads", default=8, type=int, help="Number of threads to use for write") - run_parser.add_argument("--write-threads", default=4, type=int, help="Number of threads to use for read") + table_run_parser.add_argument("--read-threads", default=8, type=int, help="Number of threads to use for write") + table_run_parser.add_argument("--write-threads", default=4, type=int, help="Number of threads to use for read") -def make_cleanup_parser(subparsers): - cleanup_parser = subparsers.add_parser("cleanup", help="Drop tables") - add_common_options(cleanup_parser) +def make_topic_run_parser(subparsers): + """Создает парсер для команды topic-run - запуск SLO тестов для топиков""" + topic_parser = subparsers.add_parser("topic-run", help="Run topic SLO workload") + add_common_options(topic_parser) + + topic_parser.add_argument("--topic-read-rps", default=50, type=int, help="Topic read request rps") + topic_parser.add_argument("--topic-read-timeout", default=5000, type=int, help="Topic read timeout [ms]") + topic_parser.add_argument("--topic-write-rps", default=20, type=int, help="Topic write request rps") + topic_parser.add_argument("--topic-write-timeout", default=10000, type=int, help="Topic write timeout [ms]") + topic_parser.add_argument("--topic-read-threads", default=1, type=int, help="Number of threads for topic reading") + topic_parser.add_argument("--topic-write-threads", default=1, type=int, help="Number of threads for topic writing") + topic_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path") + topic_parser.add_argument("--topic-consumer", default="slo_consumer", type=str, help="Topic consumer name") + topic_parser.add_argument("--topic-message-size", default=100, type=int, help="Topic message size in bytes") + topic_parser.add_argument("--topic-min-partitions", default=1, type=int, help="Minimum active partitions") + topic_parser.add_argument("--topic-max-partitions", default=10, type=int, help="Maximum active partitions") + topic_parser.add_argument("--topic-retention-hours", default=24, type=int, help="Retention period in hours") + + topic_parser.add_argument("--time", default=60, type=int, help="Time to run in seconds") + topic_parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time in seconds") + topic_parser.add_argument("--prom-pgw", default="", type=str, help="Prometheus push gateway (empty to disable)") + topic_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") + + +def 
make_topic_create_parser(subparsers): + """Создает парсер для команды topic-create""" + topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer") + add_common_options(topic_create_parser) + + topic_create_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path") + topic_create_parser.add_argument("--topic-consumer", default="slo_consumer", type=str, help="Topic consumer name") + topic_create_parser.add_argument("--topic-min-partitions", default=1, type=int, help="Minimum active partitions") + topic_create_parser.add_argument("--topic-max-partitions", default=10, type=int, help="Maximum active partitions") + topic_create_parser.add_argument("--topic-retention-hours", default=24, type=int, help="Retention period in hours") + + +def make_table_cleanup_parser(subparsers): + table_cleanup_parser = subparsers.add_parser("table-cleanup", help="Drop tables") + add_common_options(table_cleanup_parser) + + +def make_topic_cleanup_parser(subparsers): + """Создает парсер для команды topic-cleanup""" + topic_cleanup_parser = subparsers.add_parser("topic-cleanup", help="Drop topic") + add_common_options(topic_cleanup_parser) + + topic_cleanup_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path") def get_root_parser(): @@ -67,9 +111,12 @@ def get_root_parser(): help="List of subcommands", ) - make_create_parser(subparsers) - make_run_parser(subparsers) - make_cleanup_parser(subparsers) + make_table_create_parser(subparsers) + make_table_run_parser(subparsers) + make_table_cleanup_parser(subparsers) + make_topic_create_parser(subparsers) + make_topic_run_parser(subparsers) + make_topic_cleanup_parser(subparsers) return parser diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py index a636309e..f75cf146 100644 --- a/tests/slo/src/runner.py +++ b/tests/slo/src/runner.py @@ -14,11 +14,27 @@ run_write_jobs_query, run_metric_job, ) +from topic_jobs import ( + run_topic_write_jobs, + run_topic_read_jobs, + create_topic, + cleanup_topic, +) from metrics import Metrics, WORKLOAD logger = logging.getLogger(__name__) +class DummyMetrics: + """Заглушка для метрик, когда Prometheus отключен""" + + def start(self, labels): + return 0 + + def stop(self, labels, start_time, attempts=1, error=None): + pass + + INSERT_ROWS_TEMPLATE = """ DECLARE $items AS List Date: Thu, 11 Sep 2025 13:21:41 +0300 Subject: [PATCH 2/3] Update slo architecture --- .github/workflows/slo.yml | 35 ++- tests/slo/src/__main__.py | 2 +- tests/slo/src/core/__init__.py | 1 + tests/slo/src/core/dummy_metrics.py | 6 + tests/slo/src/{ => core}/generator.py | 0 tests/slo/src/{ => core}/metrics.py | 0 tests/slo/src/jobs.py | 337 ---------------------- tests/slo/src/jobs/__init__.py | 1 + tests/slo/src/jobs/base.py | 42 +++ tests/slo/src/jobs/table_jobs.py | 325 +++++++++++++++++++++ tests/slo/src/jobs/topic_jobs.py | 118 ++++++++ tests/slo/src/options.py | 25 +- tests/slo/src/runner.py | 294 +++---------------- tests/slo/src/runners/__init__.py | 1 + tests/slo/src/runners/base.py | 29 ++ tests/slo/src/runners/table_runner.py | 32 ++ tests/slo/src/runners/topic_runner.py | 32 ++ tests/slo/src/topic_jobs.py | 289 ------------------- tests/slo/src/workloads/__init__.py | 1 + tests/slo/src/workloads/base.py | 28 ++ tests/slo/src/workloads/table_workload.py | 107 +++++++ tests/slo/src/workloads/topic_workload.py | 102 +++++++ 22 files changed, 913 insertions(+), 894 deletions(-) create mode 100644 tests/slo/src/core/__init__.py create mode 
100644 tests/slo/src/core/dummy_metrics.py rename tests/slo/src/{ => core}/generator.py (100%) rename tests/slo/src/{ => core}/metrics.py (100%) delete mode 100644 tests/slo/src/jobs.py create mode 100644 tests/slo/src/jobs/__init__.py create mode 100644 tests/slo/src/jobs/base.py create mode 100644 tests/slo/src/jobs/table_jobs.py create mode 100644 tests/slo/src/jobs/topic_jobs.py create mode 100644 tests/slo/src/runners/__init__.py create mode 100644 tests/slo/src/runners/base.py create mode 100644 tests/slo/src/runners/table_runner.py create mode 100644 tests/slo/src/runners/topic_runner.py delete mode 100644 tests/slo/src/topic_jobs.py create mode 100644 tests/slo/src/workloads/__init__.py create mode 100644 tests/slo/src/workloads/base.py create mode 100644 tests/slo/src/workloads/table_workload.py create mode 100644 tests/slo/src/workloads/topic_workload.py diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 7644e3a1..7e1382ae 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -33,8 +33,30 @@ jobs: include: - prefix: table workload: sync-table + create-args: grpc://localhost:2135 /Root/testdb + run-args: | + grpc://localhost:2135 /Root/testdb \ + --prom-pgw localhost:9091 \ + --report-period 250 \ + --time ${{inputs.slo_workload_duration_seconds || 600}} \ + --read-rps ${{inputs.slo_workload_read_max_rps || 1000}} \ + --write-rps ${{inputs.slo_workload_write_max_rps || 100}} \ + --read-timeout 1000 \ + --write-timeout 1000 + cleanup-args: grpc://localhost:2135 /Root/testdb - prefix: table workload: sync-query + create-args: grpc://localhost:2135 /Root/testdb + run-args: | + grpc://localhost:2135 /Root/testdb \ + --prom-pgw localhost:9091 \ + --report-period 250 \ + --time ${{inputs.slo_workload_duration_seconds || 600}} \ + --read-rps ${{inputs.slo_workload_read_max_rps || 1000}} \ + --write-rps ${{inputs.slo_workload_write_max_rps || 100}} \ + --read-timeout 1000 \ + --write-timeout 1000 + cleanup-args: grpc://localhost:2135 /Root/testdb concurrency: @@ -67,26 +89,19 @@ jobs: - name: Prepare SLO Database run: | - python ./tests/slo/src ${{ matrix.prefix }}-create grpc://localhost:2135 /Root/testdb + python ./tests/slo/src ${{ matrix.prefix }}-create ${{ matrix.create-args }} - name: Run SLO Tests env: REF: '${{ github.head_ref || github.ref }}' WORKLOAD: '${{ matrix.workload }}' run: | - python ./tests/slo/src ${{ matrix.prefix }}-run grpc://localhost:2135 /Root/testdb \ - --prom-pgw localhost:9091 \ - --report-period 250 \ - --time ${{inputs.slo_workload_duration_seconds || 600}} \ - --read-rps ${{inputs.slo_workload_read_max_rps || 1000}} \ - --write-rps ${{inputs.slo_workload_write_max_rps || 100}} \ - --read-timeout 1000 \ - --write-timeout 1000 + python ./tests/slo/src ${{ matrix.prefix }}-run ${{ matrix.run-args }} - if: always() name: Cleanup SLO Database run: | - python ./tests/slo/src ${{ matrix.prefix }}-cleanup grpc://localhost:2135 /Root/testdb + python ./tests/slo/src ${{ matrix.prefix }}-cleanup ${{ matrix.cleanup-args }} - if: always() name: Store ydb chaos testing logs diff --git a/tests/slo/src/__main__.py b/tests/slo/src/__main__.py index a93f59ba..44f5c5db 100644 --- a/tests/slo/src/__main__.py +++ b/tests/slo/src/__main__.py @@ -4,7 +4,7 @@ from options import parse_options from runner import run_from_args -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s") if __name__ == "__main__": diff --git a/tests/slo/src/core/__init__.py 
b/tests/slo/src/core/__init__.py new file mode 100644 index 00000000..8729095b --- /dev/null +++ b/tests/slo/src/core/__init__.py @@ -0,0 +1 @@ +# Core utilities diff --git a/tests/slo/src/core/dummy_metrics.py b/tests/slo/src/core/dummy_metrics.py new file mode 100644 index 00000000..451dfdfa --- /dev/null +++ b/tests/slo/src/core/dummy_metrics.py @@ -0,0 +1,6 @@ +class DummyMetrics: + def start(self, labels): + return 0 + + def stop(self, labels, start_time, attempts=1, error=None): + pass diff --git a/tests/slo/src/generator.py b/tests/slo/src/core/generator.py similarity index 100% rename from tests/slo/src/generator.py rename to tests/slo/src/core/generator.py diff --git a/tests/slo/src/metrics.py b/tests/slo/src/core/metrics.py similarity index 100% rename from tests/slo/src/metrics.py rename to tests/slo/src/core/metrics.py diff --git a/tests/slo/src/jobs.py b/tests/slo/src/jobs.py deleted file mode 100644 index c9bd7316..00000000 --- a/tests/slo/src/jobs.py +++ /dev/null @@ -1,337 +0,0 @@ -import ydb -import time -import logging -import dataclasses -from random import randint -from typing import Callable, Tuple, Union -from ratelimiter import RateLimiter - -import threading - -from metrics import Metrics, OP_TYPE_WRITE, OP_TYPE_READ - -from generator import RowGenerator - - -READ_QUERY_TEMPLATE = """ -DECLARE $object_id AS Uint64; -SELECT * FROM `{}` WHERE object_id = $object_id AND object_hash = Digest::NumericHash($object_id); -""" - -WRITE_QUERY_TEMPLATE = """ -DECLARE $object_id AS Uint64; -DECLARE $payload_str AS Utf8; -DECLARE $payload_double AS Double; -DECLARE $payload_timestamp AS Timestamp; - -UPSERT INTO `{}` ( - object_id, object_hash, payload_str, payload_double, payload_timestamp -) VALUES ( - $object_id, Digest::NumericHash($object_id), $payload_str, $payload_double, $payload_timestamp -); -""" - - -logger = logging.getLogger(__name__) - - -@dataclasses.dataclass -class RequestParams: - pool: Union[ydb.SessionPool, ydb.QuerySessionPool] - query: str - params: dict - metrics: Metrics - labels: Tuple[str] - request_settings: ydb.BaseRequestSettings - retry_settings: ydb.RetrySettings - check_result_cb: Callable = None - - -def execute_query(params: RequestParams): - attempt = 0 - error = None - - def transaction(session): - nonlocal attempt - attempt += 1 - - result = session.transaction().execute( - params.query, - parameters=params.params, - commit_tx=True, - settings=params.request_settings, - ) - if params.check_result_cb: - params.check_result_cb(result) - - return result - - ts = params.metrics.start(params.labels) - - try: - params.pool.retry_operation_sync(transaction, retry_settings=params.retry_settings) - except ydb.Error as err: - error = err - logger.exception("[labels: %s] Cannot retry error:", params.labels) - except BaseException as err: - error = err - logger.exception("[labels: %s] Unexpected error:", params.labels) - - params.metrics.stop(params.labels, ts, attempts=attempt, error=error) - - -def run_reads(driver, query, max_id, metrics, limiter, runtime, timeout): - start_time = time.time() - - logger.info("Start read workload over table service") - - request_settings = ydb.BaseRequestSettings().with_timeout(timeout) - retry_setting = ydb.RetrySettings( - idempotent=True, - max_session_acquire_timeout=timeout, - ) - - with ydb.SessionPool(driver) as pool: - logger.info("Session pool for read requests created") - - while time.time() - start_time < runtime: - params = {"$object_id": randint(1, max_id)} - with limiter: - - def check_result(result): - 
assert result[0].rows[0] - - params = RequestParams( - pool=pool, - query=query, - params=params, - metrics=metrics, - labels=(OP_TYPE_READ,), - request_settings=request_settings, - retry_settings=retry_setting, - check_result_cb=check_result, - ) - execute_query(params) - - logger.info("Stop read workload") - - -def run_read_jobs(args, driver, tb_name, max_id, metrics): - logger.info("Start read jobs over table service") - - session = ydb.retry_operation_sync(lambda: driver.table_client.session().create()) - read_q = session.prepare(READ_QUERY_TEMPLATE.format(tb_name)) - logger.info("Prepared write query") - - read_limiter = RateLimiter(max_calls=args.read_rps, period=1) - futures = [] - for _ in range(args.read_threads): - future = threading.Thread( - name="slo_run_read", - target=run_reads, - args=(driver, read_q, max_id, metrics, read_limiter, args.time, args.read_timeout / 1000), - ) - future.start() - futures.append(future) - return futures - - -def run_reads_query(driver, query, max_id, metrics, limiter, runtime, timeout): - start_time = time.time() - - logger.info("Start read workload over query service") - - request_settings = ydb.BaseRequestSettings().with_timeout(timeout) - retry_setting = ydb.RetrySettings( - idempotent=True, - max_session_acquire_timeout=timeout, - ) - - with ydb.QuerySessionPool(driver) as pool: - logger.info("Session pool for read requests created") - - while time.time() - start_time < runtime: - params = {"$object_id": (randint(1, max_id), ydb.PrimitiveType.Uint64)} - with limiter: - - def check_result(result): - with result: - pass - - params = RequestParams( - pool=pool, - query=query, - params=params, - metrics=metrics, - labels=(OP_TYPE_READ,), - request_settings=request_settings, - retry_settings=retry_setting, - check_result_cb=check_result, - ) - execute_query(params) - - logger.info("Stop read workload") - - -def run_read_jobs_query(args, driver, tb_name, max_id, metrics): - logger.info("Start read jobs over query service") - - read_q = READ_QUERY_TEMPLATE.format(tb_name) - - read_limiter = RateLimiter(max_calls=args.read_rps, period=1) - futures = [] - for _ in range(args.read_threads): - future = threading.Thread( - name="slo_run_read_query", - target=run_reads_query, - args=(driver, read_q, max_id, metrics, read_limiter, args.time, args.read_timeout / 1000), - ) - future.start() - futures.append(future) - return futures - - -def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout): - start_time = time.time() - - logger.info("Start write workload over table service") - - request_settings = ydb.BaseRequestSettings().with_timeout(timeout) - retry_setting = ydb.RetrySettings( - idempotent=True, - max_session_acquire_timeout=timeout, - ) - - with ydb.SessionPool(driver) as pool: - logger.info("Session pool for read requests created") - - while time.time() - start_time < runtime: - row = row_generator.get() - params = { - "$object_id": row.object_id, - "$payload_str": row.payload_str, - "$payload_double": row.payload_double, - "$payload_timestamp": row.payload_timestamp, - } - - with limiter: - params = RequestParams( - pool=pool, - query=query, - params=params, - metrics=metrics, - labels=(OP_TYPE_WRITE,), - request_settings=request_settings, - retry_settings=retry_setting, - ) - execute_query(params) - - logger.info("Stop write workload") - - -def run_write_jobs(args, driver, tb_name, max_id, metrics): - logger.info("Start write jobs over table service") - - session = ydb.retry_operation_sync(lambda: 
driver.table_client.session().create()) - write_q = session.prepare(WRITE_QUERY_TEMPLATE.format(tb_name)) - logger.info("Prepared write query") - - write_limiter = RateLimiter(max_calls=args.write_rps, period=1) - row_generator = RowGenerator(max_id) - - futures = [] - for _ in range(args.write_threads): - future = threading.Thread( - name="slo_run_write", - target=run_writes, - args=(driver, write_q, row_generator, metrics, write_limiter, args.time, args.write_timeout / 1000), - ) - future.start() - futures.append(future) - return futures - - -def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, timeout): - start_time = time.time() - - logger.info("Start write workload over query service") - - request_settings = ydb.BaseRequestSettings().with_timeout(timeout) - retry_setting = ydb.RetrySettings( - idempotent=True, - max_session_acquire_timeout=timeout, - ) - - with ydb.QuerySessionPool(driver) as pool: - logger.info("Session pool for read requests created") - - while time.time() - start_time < runtime: - row = row_generator.get() - params = { - "$object_id": (row.object_id, ydb.PrimitiveType.Uint64), - "$payload_str": (row.payload_str, ydb.PrimitiveType.Utf8), - "$payload_double": (row.payload_double, ydb.PrimitiveType.Double), - "$payload_timestamp": (row.payload_timestamp, ydb.PrimitiveType.Timestamp), - } - - def check_result(result): - # we have to close stream by reading it till the end - with result: - pass - - with limiter: - params = RequestParams( - pool=pool, - query=query, - params=params, - metrics=metrics, - labels=(OP_TYPE_WRITE,), - request_settings=request_settings, - retry_settings=retry_setting, - check_result_cb=check_result, - ) - execute_query(params) - - logger.info("Stop write workload") - - -def run_write_jobs_query(args, driver, tb_name, max_id, metrics): - logger.info("Start write jobs for query service") - - write_q = WRITE_QUERY_TEMPLATE.format(tb_name) - - write_limiter = RateLimiter(max_calls=args.write_rps, period=1) - row_generator = RowGenerator(max_id) - - futures = [] - for _ in range(args.write_threads): - future = threading.Thread( - name="slo_run_write_query", - target=run_writes_query, - args=(driver, write_q, row_generator, metrics, write_limiter, args.time, args.write_timeout / 1000), - ) - future.start() - futures.append(future) - return futures - - -def push_metric(limiter, runtime, metrics): - start_time = time.time() - logger.info("Start push metrics") - - while time.time() - start_time < runtime: - with limiter: - metrics.push() - - logger.info("Stop push metrics") - - -def run_metric_job(args, metrics): - limiter = RateLimiter(max_calls=10**6 // args.report_period, period=1) - future = threading.Thread( - name="slo_run_metrics", - target=push_metric, - args=(limiter, args.time, metrics), - ) - future.start() - return future diff --git a/tests/slo/src/jobs/__init__.py b/tests/slo/src/jobs/__init__.py new file mode 100644 index 00000000..fecdf448 --- /dev/null +++ b/tests/slo/src/jobs/__init__.py @@ -0,0 +1 @@ +# Job modules diff --git a/tests/slo/src/jobs/base.py b/tests/slo/src/jobs/base.py new file mode 100644 index 00000000..4a52999b --- /dev/null +++ b/tests/slo/src/jobs/base.py @@ -0,0 +1,42 @@ +from abc import ABC, abstractmethod +import logging +import threading +import time +from ratelimiter import RateLimiter + +logger = logging.getLogger(__name__) + + +class BaseJobManager(ABC): + def __init__(self, driver, args, metrics): + self.driver = driver + self.args = args + self.metrics = metrics + + @abstractmethod 
+ def run_tests(self): + pass + + def _run_metric_job(self): + if not self.args.prom_pgw: + return [] + + limiter = RateLimiter(max_calls=10**6 // self.args.report_period, period=1) + + future = threading.Thread( + name="slo_metrics_sender", + target=self._metric_sender, + args=(limiter, self.args.time), + ) + future.start() + return [future] + + def _metric_sender(self, limiter, runtime): + start_time = time.time() + logger.info("Start push metrics") + + while time.time() - start_time < runtime: + with limiter: + self.metrics.push() + + logger.info("Stop push metrics") diff --git a/tests/slo/src/jobs/table_jobs.py b/tests/slo/src/jobs/table_jobs.py new file mode 100644 index 00000000..7947ba74 --- /dev/null +++ b/tests/slo/src/jobs/table_jobs.py @@ -0,0 +1,325 @@ +import ydb +import time +import logging +import threading +from random import randint +from ratelimiter import RateLimiter + +from .base import BaseJobManager +from core.metrics import OP_TYPE_READ, OP_TYPE_WRITE +from core.generator import RowGenerator + +logger = logging.getLogger(__name__) + +READ_QUERY_TEMPLATE = """ +DECLARE $object_id AS Uint64; +SELECT * FROM `{}` WHERE object_id = $object_id AND object_hash = Digest::NumericHash($object_id); +""" + +WRITE_QUERY_TEMPLATE = """ +DECLARE $object_id AS Uint64; +DECLARE $payload_str AS Utf8; +DECLARE $payload_double AS Double; +DECLARE $payload_timestamp AS Timestamp; + +UPSERT INTO `{}` ( + object_id, object_hash, payload_str, payload_double, payload_timestamp +) VALUES ( + $object_id, Digest::NumericHash($object_id), $payload_str, $payload_double, $payload_timestamp +); +""" + + +class RequestParams: + def __init__(self, pool, query, params, metrics, labels, request_settings, retry_settings, check_result_cb=None): + self.pool = pool + self.query = query + self.params = params + self.metrics = metrics + self.labels = labels + self.request_settings = request_settings + self.retry_settings = retry_settings + self.check_result_cb = check_result_cb + + +def execute_query(params: RequestParams): + attempt = 0 + error = None + + def transaction(session): + nonlocal attempt + attempt += 1 + + result = session.transaction().execute( + params.query, + parameters=params.params, + commit_tx=True, + settings=params.request_settings, + ) + if params.check_result_cb: + params.check_result_cb(result) + + return result + + ts = params.metrics.start(params.labels) + + try: + params.pool.retry_operation_sync(transaction, retry_settings=params.retry_settings) + except ydb.Error as err: + error = err + logger.exception("[labels: %s] Cannot retry error:", params.labels) + except BaseException as err: + error = err + logger.exception("[labels: %s] Unexpected error:", params.labels) + + params.metrics.stop(params.labels, ts, attempts=attempt, error=error) + + +class TableJobManager(BaseJobManager): + def __init__(self, driver, args, metrics, table_name, max_id): + super().__init__(driver, args, metrics) + self.table_name = table_name + self.max_id = max_id + + from core.metrics import WORKLOAD + + self.workload_type = WORKLOAD + + def run_tests(self): + if self.workload_type == "sync-table": + futures = [ + *self._run_table_read_jobs(), + *self._run_table_write_jobs(), + *self._run_metric_job(), + ] + elif self.workload_type == "sync-query": + futures = [ + *self._run_query_read_jobs(), + *self._run_query_write_jobs(), + *self._run_metric_job(), + ] + else: + raise ValueError(f"Unsupported workload type: {self.workload_type}") + + for future in futures: + future.join() + + def 
_run_table_read_jobs(self): + logger.info("Start table read jobs") + + session = ydb.retry_operation_sync(lambda: self.driver.table_client.session().create()) + read_query = session.prepare(READ_QUERY_TEMPLATE.format(self.table_name)) + + read_limiter = RateLimiter(max_calls=self.args.read_rps, period=1) + + futures = [] + for i in range(self.args.read_threads): + future = threading.Thread( + name=f"slo_table_read_{i}", + target=self._run_table_reads, + args=(read_query, read_limiter), + ) + future.start() + futures.append(future) + + return futures + + def _run_table_write_jobs(self): + logger.info("Start table write jobs") + + session = ydb.retry_operation_sync(lambda: self.driver.table_client.session().create()) + write_query = session.prepare(WRITE_QUERY_TEMPLATE.format(self.table_name)) + + write_limiter = RateLimiter(max_calls=self.args.write_rps, period=1) + + futures = [] + for i in range(self.args.write_threads): + future = threading.Thread( + name=f"slo_table_write_{i}", + target=self._run_table_writes, + args=(write_query, write_limiter), + ) + future.start() + futures.append(future) + + return futures + + def _run_query_read_jobs(self): + logger.info("Start query read jobs") + + read_query = READ_QUERY_TEMPLATE.format(self.table_name) + read_limiter = RateLimiter(max_calls=self.args.read_rps, period=1) + + futures = [] + for i in range(self.args.read_threads): + future = threading.Thread( + name=f"slo_query_read_{i}", + target=self._run_query_reads, + args=(read_query, read_limiter), + ) + future.start() + futures.append(future) + + return futures + + def _run_query_write_jobs(self): + logger.info("Start query write jobs") + + write_query = WRITE_QUERY_TEMPLATE.format(self.table_name) + write_limiter = RateLimiter(max_calls=self.args.write_rps, period=1) + + futures = [] + for i in range(self.args.write_threads): + future = threading.Thread( + name=f"slo_query_write_{i}", + target=self._run_query_writes, + args=(write_query, write_limiter), + ) + future.start() + futures.append(future) + + return futures + + def _run_table_reads(self, query, limiter): + start_time = time.time() + logger.info("Start table read workload") + + request_settings = ydb.BaseRequestSettings().with_timeout(self.args.read_timeout / 1000) + retry_setting = ydb.RetrySettings( + idempotent=True, + max_session_acquire_timeout=self.args.read_timeout / 1000, + ) + + with ydb.SessionPool(self.driver) as pool: + while time.time() - start_time < self.args.time: + params = {"$object_id": randint(1, self.max_id)} + + with limiter: + + def check_result(result): + assert result[0].rows[0] + + request_params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=self.metrics, + labels=(OP_TYPE_READ,), + request_settings=request_settings, + retry_settings=retry_setting, + check_result_cb=check_result, + ) + execute_query(request_params) + + logger.info("Stop table read workload") + + def _run_table_writes(self, query, limiter): + start_time = time.time() + logger.info("Start table write workload") + + request_settings = ydb.BaseRequestSettings().with_timeout(self.args.write_timeout / 1000) + retry_setting = ydb.RetrySettings( + idempotent=True, + max_session_acquire_timeout=self.args.write_timeout / 1000, + ) + + row_generator = RowGenerator(self.max_id) + + with ydb.SessionPool(self.driver) as pool: + while time.time() - start_time < self.args.time: + row = row_generator.get() + params = { + "$object_id": row.object_id, + "$payload_str": row.payload_str, + "$payload_double": row.payload_double, + 
"$payload_timestamp": row.payload_timestamp, + } + + with limiter: + request_params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=self.metrics, + labels=(OP_TYPE_WRITE,), + request_settings=request_settings, + retry_settings=retry_setting, + ) + execute_query(request_params) + + logger.info("Stop table write workload") + + def _run_query_reads(self, query, limiter): + start_time = time.time() + logger.info("Start query read workload") + + request_settings = ydb.BaseRequestSettings().with_timeout(self.args.read_timeout / 1000) + retry_setting = ydb.RetrySettings( + idempotent=True, + max_session_acquire_timeout=self.args.read_timeout / 1000, + ) + + with ydb.QuerySessionPool(self.driver) as pool: + while time.time() - start_time < self.args.time: + params = {"$object_id": (randint(1, self.max_id), ydb.PrimitiveType.Uint64)} + + with limiter: + + def check_result(result): + with result: + pass + + request_params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=self.metrics, + labels=(OP_TYPE_READ,), + request_settings=request_settings, + retry_settings=retry_setting, + check_result_cb=check_result, + ) + execute_query(request_params) + + logger.info("Stop query read workload") + + def _run_query_writes(self, query, limiter): + start_time = time.time() + logger.info("Start query write workload") + + request_settings = ydb.BaseRequestSettings().with_timeout(self.args.write_timeout / 1000) + retry_setting = ydb.RetrySettings( + idempotent=True, + max_session_acquire_timeout=self.args.write_timeout / 1000, + ) + + row_generator = RowGenerator(self.max_id) + + with ydb.QuerySessionPool(self.driver) as pool: + while time.time() - start_time < self.args.time: + row = row_generator.get() + params = { + "$object_id": (row.object_id, ydb.PrimitiveType.Uint64), + "$payload_str": (row.payload_str, ydb.PrimitiveType.Utf8), + "$payload_double": (row.payload_double, ydb.PrimitiveType.Double), + "$payload_timestamp": (row.payload_timestamp, ydb.PrimitiveType.Timestamp), + } + + def check_result(result): + with result: + pass + + with limiter: + request_params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=self.metrics, + labels=(OP_TYPE_WRITE,), + request_settings=request_settings, + retry_settings=retry_setting, + check_result_cb=check_result, + ) + execute_query(request_params) + + logger.info("Stop query write workload") diff --git a/tests/slo/src/jobs/topic_jobs.py b/tests/slo/src/jobs/topic_jobs.py new file mode 100644 index 00000000..9392449d --- /dev/null +++ b/tests/slo/src/jobs/topic_jobs.py @@ -0,0 +1,118 @@ +import ydb +import time +import logging +import threading +from ratelimiter import RateLimiter + +from .base import BaseJobManager +from core.metrics import OP_TYPE_TOPIC_READ, OP_TYPE_TOPIC_WRITE + +logger = logging.getLogger(__name__) + + +class TopicJobManager(BaseJobManager): + def run_tests(self): + futures = [ + *self._run_topic_write_jobs(), + *self._run_topic_read_jobs(), + *self._run_metric_job(), + ] + + for future in futures: + future.join() + + def _run_topic_write_jobs(self): + logger.info("Start topic write jobs") + + write_limiter = RateLimiter(max_calls=self.args.topic_write_rps, period=1) + + futures = [] + for i in range(self.args.topic_write_threads): + future = threading.Thread( + name=f"slo_topic_write_{i}", + target=self._run_topic_writes, + args=(write_limiter,), + ) + future.start() + futures.append(future) + + return futures + + def _run_topic_read_jobs(self): + logger.info("Start topic read 
jobs") + + read_limiter = RateLimiter(max_calls=self.args.topic_read_rps, period=1) + + futures = [] + for i in range(self.args.topic_read_threads): + future = threading.Thread( + name=f"slo_topic_read_{i}", + target=self._run_topic_reads, + args=(read_limiter,), + ) + future.start() + futures.append(future) + + return futures + + def _run_topic_writes(self, limiter): + start_time = time.time() + logger.info("Start topic write workload") + + writer = self.driver.topic_client.writer(self.args.topic_path, codec=ydb.TopicCodec.GZIP) + logger.info("Topic writer created") + + try: + write_session = writer.__enter__() + + message_count = 0 + while time.time() - start_time < self.args.time: + with limiter: + message_count += 1 + + content = f"message_{message_count}_{threading.current_thread().name}".encode("utf-8") + + if len(content) < self.args.topic_message_size: + content += b"x" * (self.args.topic_message_size - len(content)) + + message = ydb.TopicWriterMessage(data=content) + + ts = self.metrics.start((OP_TYPE_TOPIC_WRITE,)) + try: + write_session.write(message) + self.metrics.stop((OP_TYPE_TOPIC_WRITE,), ts) + except Exception as e: + self.metrics.stop((OP_TYPE_TOPIC_WRITE,), ts, error=e) + logger.error("Write error: %s", e) + + finally: + writer.__exit__(None, None, None) + + logger.info("Stop topic write workload") + + def _run_topic_reads(self, limiter): + start_time = time.time() + logger.info("Start topic read workload") + + reader = self.driver.topic_client.reader(self.args.topic_consumer, self.args.topic_path) + logger.info("Topic reader created") + + try: + read_session = reader.__enter__() + + while time.time() - start_time < self.args.time: + with limiter: + ts = self.metrics.start((OP_TYPE_TOPIC_READ,)) + try: + batch = read_session.receive_message(timeout=self.args.topic_read_timeout / 1000) + if batch is not None: + read_session.commit_offset(batch.batches[-1].message_offset_end) + self.metrics.stop((OP_TYPE_TOPIC_READ,), ts) + except Exception as e: + self.metrics.stop((OP_TYPE_TOPIC_READ,), ts, error=e) + logger.debug("Read timeout or error: %s", e) + + finally: + reader.__exit__(None, None, None) + + logger.info("Stop topic read workload") diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py index eeda2208..c4e5eaed 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -22,7 +22,9 @@ def make_table_create_parser(subparsers): "-c", "--initial-data-count", default=1000, type=int, help="Total number of records to generate" ) - table_create_parser.add_argument("--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]") + table_create_parser.add_argument( + "--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]" + ) table_create_parser.add_argument( "--batch-size", default=100, type=int, help="Number of new records in each create request" @@ -35,10 +37,14 @@ def make_table_run_parser(subparsers): add_common_options(table_run_parser) table_run_parser.add_argument("--read-rps", default=100, type=int, help="Read request rps") - table_run_parser.add_argument("--read-timeout", default=10000, type=int, help="Read requests execution timeout [ms]") + table_run_parser.add_argument( + "--read-timeout", default=10000, type=int, help="Read requests execution timeout [ms]" + ) table_run_parser.add_argument("--write-rps", default=10, type=int, help="Write request rps") - table_run_parser.add_argument("--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]") + 
table_run_parser.add_argument( + "--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]" + ) table_run_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") table_run_parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time in seconds") @@ -50,6 +56,11 @@ def make_table_run_parser(subparsers): table_run_parser.add_argument("--write-threads", default=4, type=int, help="Number of threads to use for read") +def make_table_cleanup_parser(subparsers): + table_cleanup_parser = subparsers.add_parser("table-cleanup", help="Drop tables") + add_common_options(table_cleanup_parser) + + def make_topic_run_parser(subparsers): """Создает парсер для команды topic-run - запуск SLO тестов для топиков""" topic_parser = subparsers.add_parser("topic-run", help="Run topic SLO workload") @@ -75,7 +86,6 @@ def make_topic_run_parser(subparsers): def make_topic_create_parser(subparsers): - """Создает парсер для команды topic-create""" topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer") add_common_options(topic_create_parser) @@ -86,13 +96,7 @@ def make_topic_create_parser(subparsers): topic_create_parser.add_argument("--topic-retention-hours", default=24, type=int, help="Retention period in hours") -def make_table_cleanup_parser(subparsers): - table_cleanup_parser = subparsers.add_parser("table-cleanup", help="Drop tables") - add_common_options(table_cleanup_parser) - - def make_topic_cleanup_parser(subparsers): - """Создает парсер для команды topic-cleanup""" topic_cleanup_parser = subparsers.add_parser("topic-cleanup", help="Drop topic") add_common_options(topic_cleanup_parser) @@ -114,6 +118,7 @@ def get_root_parser(): make_table_create_parser(subparsers) make_table_run_parser(subparsers) make_table_cleanup_parser(subparsers) + make_topic_create_parser(subparsers) make_topic_run_parser(subparsers) make_topic_cleanup_parser(subparsers) diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py index f75cf146..3bf8a8a0 100644 --- a/tests/slo/src/runner.py +++ b/tests/slo/src/runner.py @@ -1,263 +1,63 @@ import ydb import logging +from typing import Dict -from os import path -from generator import batch_generator - -import concurrent.futures -from concurrent.futures import ThreadPoolExecutor - -from jobs import ( - run_read_jobs, - run_write_jobs, - run_read_jobs_query, - run_write_jobs_query, - run_metric_job, -) -from topic_jobs import ( - run_topic_write_jobs, - run_topic_read_jobs, - create_topic, - cleanup_topic, -) -from metrics import Metrics, WORKLOAD +from runners.topic_runner import TopicRunner +from runners.table_runner import TableRunner +from runners.base import BaseRunner logger = logging.getLogger(__name__) -class DummyMetrics: - """Заглушка для метрик, когда Prometheus отключен""" - - def start(self, labels): - return 0 - - def stop(self, labels, start_time, attempts=1, error=None): - pass - - -INSERT_ROWS_TEMPLATE = """ -DECLARE $items AS List>; -UPSERT INTO `{}` -SELECT Digest::NumericHash(object_id) AS object_hash, object_id, payload_str, payload_double, payload_timestamp -FROM AS_TABLE($items); -""" - - -def insert_rows(pool, prepared, data, timeout): - def transaction(session: ydb.table.Session): - session.transaction().execute( - prepared, - {"$items": data}, - commit_tx=True, - settings=ydb.BaseRequestSettings().with_timeout(timeout), - ) - - pool.retry_operation_sync(transaction) - logger.info("Insert %s rows", len(data)) - +class SLORunner: + def 
__init__(self): + self.runners: Dict[str, type(BaseRunner)] = {} -def run_create(args, driver, tb_name): - timeout = args.write_timeout / 1000 + def register_runner(self, prefix: str, runner_cls: type(BaseRunner)): + self.runners[prefix] = runner_cls - def create_table(session): - session.create_table( - tb_name, - ydb.TableDescription() - .with_column(ydb.Column("object_hash", ydb.OptionalType(ydb.PrimitiveType.Uint64))) - .with_column(ydb.Column("object_id", ydb.OptionalType(ydb.PrimitiveType.Uint64))) - .with_column(ydb.Column("payload_str", ydb.OptionalType(ydb.PrimitiveType.Utf8))) - .with_column(ydb.Column("payload_double", ydb.OptionalType(ydb.PrimitiveType.Double))) - .with_column(ydb.Column("payload_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp))) - .with_primary_keys("object_hash", "object_id") - .with_uniform_partitions(args.min_partitions_count) - .with_partitioning_settings( - ydb.PartitioningSettings() - .with_partitioning_by_size(ydb.FeatureFlag.ENABLED) - .with_min_partitions_count(args.min_partitions_count) - .with_max_partitions_count(args.max_partitions_count) - .with_partition_size_mb(args.partition_size) - ), - settings=ydb.BaseRequestSettings().with_timeout(timeout), - ) - - return session.prepare(INSERT_ROWS_TEMPLATE.format(tb_name)) - - with ydb.SessionPool(driver) as pool: - prepared = pool.retry_operation_sync(create_table) + def run_command(self, args): + subcommand_parts = args.subcommand.split("-", 1) + if len(subcommand_parts) < 2: + raise ValueError(f"Invalid subcommand format: {args.subcommand}. Expected 'prefix-command'") - futures = set() - with ThreadPoolExecutor(max_workers=args.threads, thread_name_prefix="slo_create") as executor: - for batch in batch_generator(args): - futures.add(executor.submit(insert_rows, pool, prepared, batch, timeout)) - for f in concurrent.futures.as_completed(futures): - f.result() + prefix, command = subcommand_parts + if prefix not in self.runners: + raise ValueError(f"Unknown prefix: {prefix}. Available: {list(self.runners.keys())}") - -def run_slo(args, driver, tb_name): - session = driver.table_client.session().create() - result = session.transaction().execute( - "SELECT MAX(`object_id`) as max_id FROM `{}`".format(tb_name), - commit_tx=True, - ) - max_id = result[0].rows[0]["max_id"] - logger.info("Max ID: %s", max_id) - - metrics = Metrics(args.prom_pgw) - if WORKLOAD == "sync-table": - futures = ( - *run_read_jobs(args, driver, tb_name, max_id, metrics), - *run_write_jobs(args, driver, tb_name, max_id, metrics), - run_metric_job(args, metrics), - ) - elif WORKLOAD == "sync-query": - futures = ( - *run_read_jobs_query(args, driver, tb_name, max_id, metrics), - *run_write_jobs_query(args, driver, tb_name, max_id, metrics), - run_metric_job(args, metrics), + runner_instance = self.runners[prefix]() + driver_config = ydb.DriverConfig( + args.endpoint, + database=args.db, + grpc_keep_alive_timeout=5000, ) - else: - raise ValueError(f"Unsupported service: {WORKLOAD}") - - for future in futures: - future.join() - - metrics.reset() - - -def run_cleanup(args, driver, tb_name): - session = driver.table_client.session().create() - session.drop_table(tb_name) - -def run_topic_create(args, driver): - """ - Создает топик и консьюмера для SLO тестов. 
- - :param args: аргументы командной строки - :param driver: YDB driver - """ - logger.info("Creating topic for SLO tests") - - create_topic( - driver, - args.topic_path, - args.topic_consumer, - min_partitions=args.topic_min_partitions, - max_partitions=args.topic_max_partitions, - retention_hours=args.topic_retention_hours - ) - - logger.info("Topic creation completed") - - -def run_topic_cleanup(args, driver): - """ - Удаляет топик после SLO тестов. - - :param args: аргументы командной строки - :param driver: YDB driver - """ - logger.info("Cleaning up topic") - - cleanup_topic(driver, args.topic_path) - - logger.info("Topic cleanup completed") - - -def run_topic_slo(args, driver): - """ - Запускает SLO тесты для топиков. - Ожидает, что топик уже создан командой topic-create. - - :param args: аргументы командной строки - :param driver: YDB driver - """ - logger.info("Starting topic SLO test") - - # Проверяем, что топик существует - try: - description = driver.topic_client.describe_topic(args.topic_path) - logger.info("Topic exists: %s", args.topic_path) - - # Проверяем, есть ли консьюмер - consumer_exists = any(c.name == args.topic_consumer for c in description.consumers) - - if not consumer_exists: - logger.error("Consumer '%s' does not exist in topic '%s'", args.topic_consumer, args.topic_path) - logger.error("Please create the topic with consumer first using topic-create command") - raise RuntimeError(f"Consumer '{args.topic_consumer}' not found") - else: - logger.info("Consumer exists: %s", args.topic_consumer) - - except ydb.Error as e: - error_msg = str(e).lower() - if "does not exist" in error_msg: - logger.error("Topic does not exist: %s", args.topic_path) - logger.error("Please create the topic first using topic-create command") - raise RuntimeError(f"Topic '{args.topic_path}' not found") - else: - logger.error("Failed to check topic: %s", e) - raise - - # Создаем объект для сбора метрик (если Prometheus включен) - if args.prom_pgw: - metrics = Metrics(args.prom_pgw) - metric_futures = (run_metric_job(args, metrics),) - else: - logger.info("Prometheus disabled, creating dummy metrics") - metrics = DummyMetrics() - metric_futures = () - - # Запускаем задачи для записи и чтения - futures = ( - *run_topic_write_jobs(args, driver, args.topic_path, metrics), - *run_topic_read_jobs(args, driver, args.topic_path, args.topic_consumer, metrics), - *metric_futures, - ) - - # Ждем завершения всех задач - for future in futures: - future.join() - - # Сбрасываем метрики (если они реальные) - if hasattr(metrics, 'reset'): - metrics.reset() - - logger.info("Topic SLO test completed") + with ydb.Driver(driver_config) as driver: + driver.wait(timeout=300) + try: + runner_instance.set_driver(driver) + if command == "create": + runner_instance.create(args) + elif command == "run": + runner_instance.run(args) + elif command == "cleanup": + runner_instance.cleanup(args) + else: + raise RuntimeError(f"Unknown command {command} for prefix {prefix}") + except BaseException: + logger.exception("Something went wrong") + raise + finally: + driver.stop(timeout=getattr(args, "shutdown_time", 10)) + + +def create_runner() -> SLORunner: + runner = SLORunner() + runner.register_runner("table", TableRunner) + runner.register_runner("topic", TopicRunner) + return runner def run_from_args(args): - driver_config = ydb.DriverConfig( - args.endpoint, - database=args.db, - grpc_keep_alive_timeout=5000, - ) - - table_name = path.join(args.db, args.table_name) - - with ydb.Driver(driver_config) as driver: - 
driver.wait(timeout=300) - try: - if args.subcommand == "table-create": - run_create(args, driver, table_name) - elif args.subcommand == "table-run": - run_slo(args, driver, table_name) - elif args.subcommand == "table-cleanup": - run_cleanup(args, driver, table_name) - elif args.subcommand == "topic-create": - run_topic_create(args, driver) - elif args.subcommand == "topic-run": - run_topic_slo(args, driver) - elif args.subcommand == "topic-cleanup": - run_topic_cleanup(args, driver) - else: - raise RuntimeError(f"Unknown command {args.subcommand}") - except BaseException: - logger.exception("Something went wrong") - raise - finally: - driver.stop(timeout=getattr(args, "shutdown_time", 10)) + runner = create_runner() + runner.run_command(args) diff --git a/tests/slo/src/runners/__init__.py b/tests/slo/src/runners/__init__.py new file mode 100644 index 00000000..3f288356 --- /dev/null +++ b/tests/slo/src/runners/__init__.py @@ -0,0 +1 @@ +# Runner modules diff --git a/tests/slo/src/runners/base.py b/tests/slo/src/runners/base.py new file mode 100644 index 00000000..1f9eda2e --- /dev/null +++ b/tests/slo/src/runners/base.py @@ -0,0 +1,29 @@ +import ydb +import logging +from abc import ABC, abstractmethod + + +class BaseRunner(ABC): + def __init__(self): + self.logger = logging.getLogger(self.__class__.__module__) + self.driver = None + + @property + @abstractmethod + def prefix(self) -> str: + pass + + def set_driver(self, driver: ydb.Driver): + self.driver = driver + + @abstractmethod + def create(self, args): + pass + + @abstractmethod + def run(self, args): + pass + + @abstractmethod + def cleanup(self, args): + pass diff --git a/tests/slo/src/runners/table_runner.py b/tests/slo/src/runners/table_runner.py new file mode 100644 index 00000000..973efe19 --- /dev/null +++ b/tests/slo/src/runners/table_runner.py @@ -0,0 +1,32 @@ +from .base import BaseRunner +from workloads.table_workload import TableWorkload +from core.metrics import Metrics +from core.dummy_metrics import DummyMetrics + + +class TableRunner(BaseRunner): + @property + def prefix(self) -> str: + return "table" + + def create(self, args): + workload = TableWorkload(self.driver, args) + workload.create() + + def run(self, args): + workload = TableWorkload(self.driver, args) + + if args.prom_pgw: + metrics = Metrics(args.prom_pgw) + else: + self.logger.info("Prometheus disabled, creating dummy metrics") + metrics = DummyMetrics() + + workload.run_slo(metrics) + + if hasattr(metrics, "reset"): + metrics.reset() + + def cleanup(self, args): + workload = TableWorkload(self.driver, args) + workload.cleanup() diff --git a/tests/slo/src/runners/topic_runner.py b/tests/slo/src/runners/topic_runner.py new file mode 100644 index 00000000..401fab31 --- /dev/null +++ b/tests/slo/src/runners/topic_runner.py @@ -0,0 +1,32 @@ +from .base import BaseRunner +from workloads.topic_workload import TopicWorkload +from core.metrics import Metrics +from core.dummy_metrics import DummyMetrics + + +class TopicRunner(BaseRunner): + @property + def prefix(self) -> str: + return "topic" + + def create(self, args): + workload = TopicWorkload(self.driver, args) + workload.create() + + def run(self, args): + workload = TopicWorkload(self.driver, args) + + if args.prom_pgw: + metrics = Metrics(args.prom_pgw) + else: + self.logger.info("Prometheus disabled, creating dummy metrics") + metrics = DummyMetrics() + + workload.run_slo(metrics) + + if hasattr(metrics, "reset"): + metrics.reset() + + def cleanup(self, args): + workload = TopicWorkload(self.driver, 
args) + workload.cleanup() diff --git a/tests/slo/src/topic_jobs.py b/tests/slo/src/topic_jobs.py deleted file mode 100644 index ed135e2f..00000000 --- a/tests/slo/src/topic_jobs.py +++ /dev/null @@ -1,289 +0,0 @@ -import ydb -import time -import logging -import threading -from random import choice -from typing import Union, Optional -from ratelimiter import RateLimiter - -from metrics import Metrics, OP_TYPE_TOPIC_WRITE, OP_TYPE_TOPIC_READ - -logger = logging.getLogger(__name__) - - -def run_topic_writes(driver, topic_path, metrics, limiter, runtime, timeout, message_size=100): - """ - Запускает цикл записи сообщений в топик. - - :param driver: YDB driver - :param topic_path: путь к топику - :param metrics: объект для сбора метрик - :param limiter: лимитер для RPS - :param runtime: время работы в секундах - :param timeout: таймаут операций в секундах - :param message_size: размер сообщения в байтах - """ - start_time = time.time() - logger.info("Start topic write workload") - - with driver.topic_client.writer(topic_path) as writer: - logger.info("Topic writer created") - - while time.time() - start_time < runtime: - # Генерируем сообщение - message_data = "x" * message_size - - with limiter: - ts = metrics.start((OP_TYPE_TOPIC_WRITE,)) - error = None - attempt = 1 - - try: - # Записываем сообщение с подтверждением - writer.write(ydb.TopicWriterMessage(data=message_data), timeout=timeout) - logger.debug("Topic write message: %s", message_data) - except ydb.Error as err: - error = err - logger.debug("Topic write error: %s", err) - except BaseException as err: - error = err - logger.exception("Unexpected topic write error:") - - metrics.stop((OP_TYPE_TOPIC_WRITE,), ts, attempts=attempt, error=error) - - logger.info("Stop topic write workload") - - -def run_topic_reads(driver, topic_path, consumer_name, metrics, limiter, runtime, timeout): - """ - Запускает цикл чтения сообщений из топика. - - :param driver: YDB driver - :param topic_path: путь к топику - :param consumer_name: имя консьюмера - :param metrics: объект для сбора метрик - :param limiter: лимитер для RPS - :param runtime: время работы в секундах - :param timeout: таймаут операций в секундах - """ - start_time = time.time() - logger.info("Start topic read workload") - - with driver.topic_client.reader(topic_path, consumer_name) as reader: - logger.info("Topic reader created") - - while time.time() - start_time < runtime: - with limiter: - ts = metrics.start((OP_TYPE_TOPIC_READ,)) - error = None - attempt = 1 - - try: - # Читаем сообщение с таймаутом - message = reader.receive_message(timeout=timeout) - if message: - logger.debug("Topic read message: %s", message.data.decode()) - # Коммитим сообщение - reader.commit(message) - except ydb.Error as err: - error = err - logger.debug("Topic read error: %s", err) - except TimeoutError: - # Таймаут - нормальная ситуация, не считаем ошибкой - error = None - except BaseException as err: - error = err - logger.exception("Unexpected topic read error:") - - metrics.stop((OP_TYPE_TOPIC_READ,), ts, attempts=attempt, error=error) - - logger.info("Stop topic read workload") - - -def run_topic_write_jobs(args, driver, topic_path, metrics): - """ - Запускает потоки для записи в топик. 
- - :param args: аргументы командной строки - :param driver: YDB driver - :param topic_path: путь к топику - :param metrics: объект для сбора метрик - :return: список Future объектов - """ - logger.info("Start topic write jobs") - - write_limiter = RateLimiter(max_calls=args.topic_write_rps, period=1) - - futures = [] - for i in range(args.topic_write_threads): - future = threading.Thread( - name=f"slo_topic_write_{i}", - target=run_topic_writes, - args=( - driver, - topic_path, - metrics, - write_limiter, - args.time, - args.topic_write_timeout / 1000, - args.topic_message_size - ), - ) - future.start() - futures.append(future) - - return futures - - -def run_topic_read_jobs(args, driver, topic_path, consumer_name, metrics): - """ - Запускает потоки для чтения из топика. - - :param args: аргументы командной строки - :param driver: YDB driver - :param topic_path: путь к топику - :param consumer_name: имя консьюмера - :param metrics: объект для сбора метрик - :return: список Future объектов - """ - logger.info("Start topic read jobs") - - read_limiter = RateLimiter(max_calls=args.topic_read_rps, period=1) - - futures = [] - for i in range(args.topic_read_threads): - future = threading.Thread( - name=f"slo_topic_read_{i}", - target=run_topic_reads, - args=( - driver, - topic_path, - consumer_name, - metrics, - read_limiter, - args.time, - args.topic_read_timeout / 1000 - ), - ) - future.start() - futures.append(future) - - return futures - - -def create_topic(driver, topic_path, consumer_name, min_partitions=1, max_partitions=10, retention_hours=24): - """ - Создает топик и консьюмера для SLO тестов. - - :param driver: YDB driver - :param topic_path: путь к топику - :param consumer_name: имя консьюмера - :param min_partitions: минимальное количество активных партиций - :param max_partitions: максимальное количество активных партиций - :param retention_hours: время хранения данных в часах - """ - logger.info("Creating topic: %s", topic_path) - - import datetime - - try: - # Создаем топик - driver.topic_client.create_topic( - path=topic_path, - min_active_partitions=min_partitions, - max_active_partitions=max_partitions, - retention_period=datetime.timedelta(hours=retention_hours), - consumers=[consumer_name] - ) - logger.info("Topic created successfully: %s", topic_path) - logger.info("Consumer created: %s", consumer_name) - - except ydb.Error as e: - error_msg = str(e).lower() - if "already exists" in error_msg: - logger.info("Topic already exists: %s", topic_path) - - # Проверяем, есть ли консьюмер - try: - description = driver.topic_client.describe_topic(topic_path) - consumer_exists = any(c.name == consumer_name for c in description.consumers) - - if not consumer_exists: - logger.info("Adding consumer %s to existing topic", consumer_name) - driver.topic_client.alter_topic( - path=topic_path, - add_consumers=[consumer_name] - ) - logger.info("Consumer added successfully: %s", consumer_name) - else: - logger.info("Consumer already exists: %s", consumer_name) - - except Exception as alter_err: - logger.warning("Failed to add consumer: %s", alter_err) - raise - elif "storage pool" in error_msg or "pq" in error_msg: - logger.error("YDB instance does not support topics (PersistentQueues): %s", e) - logger.error("Please use YDB instance with topic support") - raise - else: - logger.error("Failed to create topic: %s", e) - raise - - -def setup_topic(driver, topic_path, consumer_name): - """ - Проверяет существование топика и консьюмера перед запуском SLO тестов. 
- - :param driver: YDB driver - :param topic_path: путь к топику - :param consumer_name: имя консьюмера - """ - logger.info("Checking topic setup: %s", topic_path) - - try: - description = driver.topic_client.describe_topic(topic_path) - logger.info("Topic exists: %s", topic_path) - - # Проверяем, есть ли консьюмер - consumer_exists = any(c.name == consumer_name for c in description.consumers) - - if consumer_exists: - logger.info("Consumer exists: %s", consumer_name) - else: - logger.error("Consumer '%s' does not exist in topic '%s'", consumer_name, topic_path) - logger.error("Please create the consumer first using topic-create command") - raise RuntimeError(f"Consumer '{consumer_name}' not found") - - except ydb.Error as e: - error_msg = str(e).lower() - if "does not exist" in error_msg: - logger.error("Topic does not exist: %s", topic_path) - logger.error("Please create the topic first using topic-create command") - raise RuntimeError(f"Topic '{topic_path}' not found") - elif "storage pool" in error_msg or "pq" in error_msg: - logger.error("YDB instance does not support topics (PersistentQueues): %s", e) - logger.error("Please use YDB instance with topic support") - raise - else: - logger.error("Failed to check topic: %s", e) - raise - - -def cleanup_topic(driver, topic_path): - """ - Удаляет топик. - - :param driver: YDB driver - :param topic_path: путь к топику - """ - logger.info("Cleaning up topic: %s", topic_path) - - try: - driver.topic_client.drop_topic(topic_path) - logger.info("Topic dropped: %s", topic_path) - except ydb.Error as e: - if "not found" in str(e).lower(): - logger.info("Topic does not exist: %s", topic_path) - else: - logger.error("Failed to drop topic: %s", e) - raise diff --git a/tests/slo/src/workloads/__init__.py b/tests/slo/src/workloads/__init__.py new file mode 100644 index 00000000..c3d8027a --- /dev/null +++ b/tests/slo/src/workloads/__init__.py @@ -0,0 +1 @@ +# Workload modules diff --git a/tests/slo/src/workloads/base.py b/tests/slo/src/workloads/base.py new file mode 100644 index 00000000..963627e9 --- /dev/null +++ b/tests/slo/src/workloads/base.py @@ -0,0 +1,28 @@ +from abc import ABC, abstractmethod +import logging + +logger = logging.getLogger(__name__) + + +class BaseWorkload(ABC): + def __init__(self, driver, args): + self.driver = driver + self.args = args + self.logger = logger + + @abstractmethod + def create(self): + pass + + @abstractmethod + def run_slo(self, metrics): + pass + + @abstractmethod + def cleanup(self): + pass + + @property + @abstractmethod + def name(self) -> str: + pass diff --git a/tests/slo/src/workloads/table_workload.py b/tests/slo/src/workloads/table_workload.py new file mode 100644 index 00000000..b9b61014 --- /dev/null +++ b/tests/slo/src/workloads/table_workload.py @@ -0,0 +1,107 @@ +import ydb +from os import path +import concurrent.futures +from concurrent.futures import ThreadPoolExecutor + +from .base import BaseWorkload +from jobs.table_jobs import TableJobManager +from core.generator import batch_generator + + +class TableWorkload(BaseWorkload): + @property + def name(self) -> str: + return "table" + + def create(self): + table_name = path.join(self.args.db, self.args.table_name) + timeout = self.args.write_timeout / 1000 + + self.logger.info("Creating table: %s", table_name) + + INSERT_ROWS_TEMPLATE = """ + DECLARE $items AS List>; + UPSERT INTO `{}` + SELECT Digest::NumericHash(object_id) AS object_hash, object_id, payload_str, payload_double, payload_timestamp + FROM AS_TABLE($items); + """ + + def 
create_table(session): + session.create_table( + table_name, + ydb.TableDescription() + .with_column(ydb.Column("object_hash", ydb.OptionalType(ydb.PrimitiveType.Uint64))) + .with_column(ydb.Column("object_id", ydb.OptionalType(ydb.PrimitiveType.Uint64))) + .with_column(ydb.Column("payload_str", ydb.OptionalType(ydb.PrimitiveType.Utf8))) + .with_column(ydb.Column("payload_double", ydb.OptionalType(ydb.PrimitiveType.Double))) + .with_column(ydb.Column("payload_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp))) + .with_primary_keys("object_hash", "object_id") + .with_uniform_partitions(self.args.min_partitions_count) + .with_partitioning_settings( + ydb.PartitioningSettings() + .with_partitioning_by_size(ydb.FeatureFlag.ENABLED) + .with_min_partitions_count(self.args.min_partitions_count) + .with_max_partitions_count(self.args.max_partitions_count) + .with_partition_size_mb(self.args.partition_size) + ), + settings=ydb.BaseRequestSettings().with_timeout(timeout), + ) + + return session.prepare(INSERT_ROWS_TEMPLATE.format(table_name)) + + def insert_rows(pool, prepared, data, timeout): + def transaction(session: ydb.table.Session): + session.transaction().execute( + prepared, + {"$items": data}, + commit_tx=True, + settings=ydb.BaseRequestSettings().with_timeout(timeout), + ) + + pool.retry_operation_sync(transaction) + self.logger.info("Insert %s rows", len(data)) + + with ydb.SessionPool(self.driver) as pool: + prepared = pool.retry_operation_sync(create_table) + self.logger.info("Table created: %s", table_name) + + self.logger.info("Filling table with initial data") + futures = set() + with ThreadPoolExecutor(max_workers=self.args.threads, thread_name_prefix="slo_create") as executor: + for batch in batch_generator(self.args): + futures.add(executor.submit(insert_rows, pool, prepared, batch, timeout)) + for f in concurrent.futures.as_completed(futures): + f.result() + + self.logger.info("Table creation completed") + + def run_slo(self, metrics): + self.logger.info("Starting table SLO tests") + + table_name = path.join(self.args.db, self.args.table_name) + + session = self.driver.table_client.session().create() + result = session.transaction().execute( + "SELECT MAX(`object_id`) as max_id FROM `{}`".format(table_name), + commit_tx=True, + ) + max_id = result[0].rows[0]["max_id"] + self.logger.info("Max ID: %s", max_id) + + job_manager = TableJobManager(self.driver, self.args, metrics, table_name, max_id) + job_manager.run_tests() + + self.logger.info("Table SLO tests completed") + + def cleanup(self): + table_name = path.join(self.args.db, self.args.table_name) + self.logger.info("Cleaning up table: %s", table_name) + + session = self.driver.table_client.session().create() + session.drop_table(table_name) + + self.logger.info("Table dropped: %s", table_name) diff --git a/tests/slo/src/workloads/topic_workload.py b/tests/slo/src/workloads/topic_workload.py new file mode 100644 index 00000000..85bd498f --- /dev/null +++ b/tests/slo/src/workloads/topic_workload.py @@ -0,0 +1,102 @@ +import datetime +import ydb +from .base import BaseWorkload +from jobs.topic_jobs import TopicJobManager + + +class TopicWorkload(BaseWorkload): + @property + def name(self) -> str: + return "topic" + + def create(self): + self.logger.info("Creating topic: %s", self.args.topic_path) + + try: + self.driver.topic_client.create_topic( + path=self.args.topic_path, + min_active_partitions=self.args.topic_min_partitions, + max_active_partitions=self.args.topic_max_partitions, + 
retention_period=datetime.timedelta(hours=self.args.topic_retention_hours), + consumers=[self.args.topic_consumer], + ) + self.logger.info("Topic created successfully: %s", self.args.topic_path) + self.logger.info("Consumer created: %s", self.args.topic_consumer) + + except ydb.Error as e: + error_msg = str(e).lower() + if "already exists" in error_msg: + self.logger.info("Topic already exists: %s", self.args.topic_path) + + try: + description = self.driver.topic_client.describe_topic(self.args.topic_path) + consumer_exists = any(c.name == self.args.topic_consumer for c in description.consumers) + + if not consumer_exists: + self.logger.info("Adding consumer %s to existing topic", self.args.topic_consumer) + self.driver.topic_client.alter_topic( + path=self.args.topic_path, add_consumers=[self.args.topic_consumer] + ) + self.logger.info("Consumer added successfully: %s", self.args.topic_consumer) + else: + self.logger.info("Consumer already exists: %s", self.args.topic_consumer) + + except Exception as alter_err: + self.logger.warning("Failed to add consumer: %s", alter_err) + raise + elif "storage pool" in error_msg or "pq" in error_msg: + self.logger.error("YDB instance does not support topics (PersistentQueues): %s", e) + self.logger.error("Please use YDB instance with topic support") + raise + else: + self.logger.error("Failed to create topic: %s", e) + raise + + def run_slo(self, metrics): + self.logger.info("Starting topic SLO tests") + + self._verify_topic_exists() + + job_manager = TopicJobManager(self.driver, self.args, metrics) + job_manager.run_tests() + + self.logger.info("Topic SLO tests completed") + + def cleanup(self): + self.logger.info("Cleaning up topic: %s", self.args.topic_path) + + try: + self.driver.topic_client.drop_topic(self.args.topic_path) + self.logger.info("Topic dropped: %s", self.args.topic_path) + except ydb.Error as e: + if "not found" in str(e).lower(): + self.logger.info("Topic does not exist: %s", self.args.topic_path) + else: + self.logger.error("Failed to drop topic: %s", e) + raise + + def _verify_topic_exists(self): + try: + description = self.driver.topic_client.describe_topic(self.args.topic_path) + self.logger.info("Topic exists: %s", self.args.topic_path) + + consumer_exists = any(c.name == self.args.topic_consumer for c in description.consumers) + + if not consumer_exists: + self.logger.error( + "Consumer '%s' does not exist in topic '%s'", self.args.topic_consumer, self.args.topic_path + ) + self.logger.error("Please create the topic with consumer first using topic-create command") + raise RuntimeError(f"Consumer '{self.args.topic_consumer}' not found") + else: + self.logger.info("Consumer exists: %s", self.args.topic_consumer) + + except ydb.Error as e: + error_msg = str(e).lower() + if "does not exist" in error_msg: + self.logger.error("Topic does not exist: %s", self.args.topic_path) + self.logger.error("Please create the topic first using topic-create command") + raise RuntimeError(f"Topic '{self.args.topic_path}' not found") + else: + self.logger.error("Failed to check topic: %s", e) + raise From dd8ff1d820c3922c96446db01b4c2ddc3b44b614 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Thu, 11 Sep 2025 16:19:19 +0300 Subject: [PATCH 3/3] enable topic slo --- .github/workflows/slo.yml | 20 ++ tests/slo/README.md | 142 ++++++++-- tests/slo/TOPIC_SLO.md | 173 ------------ tests/slo/playground/README.md | 40 +++ tests/slo/playground/configs/chaos.sh | 52 ++++ tests/slo/playground/configs/compose.yaml | 283 ++++++++++++++++++++ 
tests/slo/playground/configs/ydb.yaml | 68 +++++ tests/slo/slo_runner.sh | 8 + tests/slo/src/__main__.py | 8 +- tests/slo/src/core/dummy_metrics.py | 6 - tests/slo/src/core/metrics.py | 44 ++- tests/slo/src/jobs/base.py | 4 +- tests/slo/src/jobs/topic_jobs.py | 73 ++--- tests/slo/src/options.py | 54 ++-- tests/slo/src/{runner.py => root_runner.py} | 0 tests/slo/src/runners/table_runner.py | 109 +++++++- tests/slo/src/runners/topic_runner.py | 86 +++++- tests/slo/src/workloads/__init__.py | 1 - tests/slo/src/workloads/base.py | 28 -- tests/slo/src/workloads/table_workload.py | 107 -------- tests/slo/src/workloads/topic_workload.py | 102 ------- 21 files changed, 872 insertions(+), 536 deletions(-) delete mode 100644 tests/slo/TOPIC_SLO.md create mode 100644 tests/slo/playground/README.md create mode 100755 tests/slo/playground/configs/chaos.sh create mode 100644 tests/slo/playground/configs/compose.yaml create mode 100644 tests/slo/playground/configs/ydb.yaml create mode 100755 tests/slo/slo_runner.sh delete mode 100644 tests/slo/src/core/dummy_metrics.py rename tests/slo/src/{runner.py => root_runner.py} (100%) delete mode 100644 tests/slo/src/workloads/__init__.py delete mode 100644 tests/slo/src/workloads/base.py delete mode 100644 tests/slo/src/workloads/table_workload.py delete mode 100644 tests/slo/src/workloads/topic_workload.py diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 7e1382ae..76053349 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -57,6 +57,26 @@ jobs: --read-timeout 1000 \ --write-timeout 1000 cleanup-args: grpc://localhost:2135 /Root/testdb + # - prefix: topic + # workload: topic-basic + # create-args: | + # grpc://localhost:2135 /Root/testdb \ + # --path /Root/testdb/slo_topic \ + # --partitions-count 10 + # run-args: | + # grpc://localhost:2135 /Root/testdb \ + # --path /Root/testdb/slo_topic \ + # --prom-pgw localhost:9091 \ + # --partitions-count 10 \ + # --read-threads 10 \ + # --write-threads 10 \ + # --report-period 250 \ + # --time ${{inputs.slo_workload_duration_seconds || 600}} \ + # --read-rps ${{inputs.slo_workload_read_max_rps || 100}} \ + # --write-rps ${{inputs.slo_workload_write_max_rps || 100}} \ + # --read-timeout 5000 \ + # --write-timeout 5000 + # cleanup-args: grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic concurrency: diff --git a/tests/slo/README.md b/tests/slo/README.md index 486cbe99..0cb2070c 100644 --- a/tests/slo/README.md +++ b/tests/slo/README.md @@ -3,42 +3,72 @@ SLO is the type of test where app based on ydb-sdk is tested against falling YDB cluster nodes, tablets, network (that is possible situations for distributed DBs with hundreds of nodes) -### Implementations: +### Workload types: + +There are two workload types: + +- **Table SLO** - tests table operations (read/write) +- **Topic SLO** - tests topic operations (publish/consume) -There are two implementations: +### Implementations: - `sync` - `async` (now unimplemented) ### Usage: -It has 3 commands: +Each workload type has 3 commands: + +**Table commands:** +- `table-create` - creates table in database +- `table-cleanup` - drops table in database +- `table-run` - runs table workload (read and write to table with set RPS) -- `create` - creates table in database -- `cleanup` - drops table in database -- `run` - runs workload (read and write to table with sets RPS) +**Topic commands:** +- `topic-create` - creates topic with consumer in database +- `topic-cleanup` - drops topic in database +- `topic-run` - runs topic 
workload (publish and consume messages with set RPS) ### Run examples with all arguments: -create: -`python tests/slo/src/ create localhost:2136 /local -t tableName +**Table examples:** + +table-create: +`python tests/slo/src/ table-create localhost:2136 /local -t tableName --min-partitions-count 6 --max-partitions-count 1000 --partition-size 1 -с 1000 --write-timeout 10000` -cleanup: -`python tests/slo/src/ cleanup localhost:2136 /local -t tableName` +table-cleanup: +`python tests/slo/src/ table-cleanup localhost:2136 /local -t tableName` -run: -`python tests/slo/src/ run localhost:2136 /local -t tableName ---prom-pgw http://prometheus-pushgateway:9091 -report-period 250 +table-run: +`python tests/slo/src/ table-run localhost:2136 /local -t tableName +--prom-pgw http://prometheus-pushgateway:9091 --report-period 250 --read-rps 1000 --read-timeout 10000 --write-rps 100 --write-timeout 10000 --time 600 --shutdown-time 30` +**Topic examples:** + +topic-create: +`python tests/slo/src/ topic-create localhost:2136 /local +--topic-path /local/slo_topic --topic-consumer slo_consumer` + +topic-cleanup: +`python tests/slo/src/ topic-cleanup localhost:2136 /local --topic-path /local/slo_topic` + +topic-run: +`python tests/slo/src/ topic-run localhost:2136 /local +--topic-path /local/slo_topic --topic-consumer slo_consumer +--prom-pgw http://prometheus-pushgateway:9091 --report-period 250 +--topic-write-rps 50 --topic-read-rps 100 +--topic-write-timeout 5000 --topic-read-timeout 3000 +--time 600 --shutdown-time 30` + ## Arguments for commands: -### create -`python tests/slo/src/ create [options]` +### table-create +`python tests/slo/src/ table-create [options]` ``` Arguments: @@ -61,8 +91,8 @@ Options: ``` -### cleanup -`python tests/slo/src/ cleanup [options]` +### table-cleanup +`python tests/slo/src/ table-cleanup [options]` ``` Arguments: @@ -73,8 +103,8 @@ Options: -t --table-name table name to create ``` -### run -`python tests/slo/src/ run [options]` +### table-run +`python tests/slo/src/ table-run [options]` ``` Arguments: @@ -100,12 +130,70 @@ Options: --write-threads number of threads to use for read requests ``` +### topic-create +`python tests/slo/src/ topic-create [options]` + +``` +Arguments: + endpoint YDB endpoint to connect to + db YDB database to connect to + +Options: + --topic-path topic path to create + --topic-consumer consumer name + --topic-min-partitions minimum active partitions + --topic-max-partitions maximum active partitions + --topic-retention-hours retention period in hours +``` + +### topic-cleanup +`python tests/slo/src/ topic-cleanup [options]` + +``` +Arguments: + endpoint YDB endpoint to connect to + db YDB database to connect to + +Options: + --topic-path topic path to drop +``` + +### topic-run +`python tests/slo/src/ topic-run [options]` + +``` +Arguments: + endpoint YDB endpoint to connect to + db YDB database to connect to + +Options: + --topic-path topic path + --topic-consumer consumer name + + --prom-pgw prometheus push gateway + --report-period prometheus push period in milliseconds + + --topic-read-rps read RPS for topics + --topic-read-timeout read timeout milliseconds for topics + --topic-write-rps write RPS for topics + --topic-write-timeout write timeout milliseconds for topics + + --topic-message-size message size in bytes + --topic-read-threads number of threads to use for read requests + --topic-write-threads number of threads to use for write requests + + --time run time in seconds + --shutdown-time graceful shutdown time in seconds +``` + ## 
Authentication Workload using [auth-env](https://ydb.yandex-team.ru/docs/reference/ydb-sdk/recipes/auth-env) for authentication. ## What's inside -When running `run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`. + +### Table workload +When running `table-run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`. - `readJob` reads rows from the table one by one with random identifiers generated by writeJob - `writeJob` generates and inserts rows @@ -120,6 +208,18 @@ Table have these fields: Primary key: `("object_hash", "object_id")` +### Topic workload +When running `topic-run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`. + +- `readJob` reads messages from topic using TopicReader and commits offsets +- `writeJob` generates and publishes messages to topic using TopicWriter +- `metricsJob` periodically sends metrics to Prometheus + +Messages contain: +- Sequential message ID +- Thread identifier +- Configurable payload size (padded with 'x' characters) + ## Collected metrics - `oks` - amount of OK requests - `not_oks` - amount of not OK requests @@ -127,6 +227,8 @@ Primary key: `("object_hash", "object_id")` - `latency` - summary of latencies in ms - `attempts` - summary of amount for request +Metrics are collected for both table operations (`read`, `write`) and topic operations (`read`, `write`). + > You must reset metrics to keep them `0` in prometheus and grafana before beginning and after ending of jobs ## Look at metrics in grafana diff --git a/tests/slo/TOPIC_SLO.md b/tests/slo/TOPIC_SLO.md deleted file mode 100644 index 50d22123..00000000 --- a/tests/slo/TOPIC_SLO.md +++ /dev/null @@ -1,173 +0,0 @@ -# YDB Python SDK Topic SLO Tests - -Этот документ описывает как использовать SLO тесты для топиков YDB. 
- -## Описание - -Topic SLO тесты измеряют производительность операций с топиками YDB: -- **TopicWriter** - запись сообщений в топик -- **TopicReader** - чтение сообщений из топика с коммитом - -Тесты собирают метрики: -- Latency (задержка операций) -- RPS (количество операций в секунду) -- Error rate (процент ошибок) -- Retry attempts (количество попыток повтора) - -## Использование - -### Запуск Topic SLO тестов - -Topic SLO тесты автоматически создают топик перед началом тестирования и удаляют его после завершения: - -```bash -cd tests/slo -python -m src topic-run grpc://localhost:2135 /local -``` - -### Параметры - -#### Основные параметры: -- `endpoint` - YDB endpoint (например: `grpc://localhost:2135`) -- `db` - база данных (например: `/local`) - -#### Параметры для топиков: -- `--topic-path` - путь к топику (по умолчанию: `/local/slo_topic`) -- `--topic-consumer` - имя консьюмера (по умолчанию: `slo_consumer`) -- `--topic-message-size` - размер сообщения в байтах (по умолчанию: 100) - -#### Параметры производительности: -- `--topic-write-rps` - RPS для записи (по умолчанию: 20) -- `--topic-read-rps` - RPS для чтения (по умолчанию: 50) -- `--topic-write-threads` - количество потоков записи (по умолчанию: 2) -- `--topic-read-threads` - количество потоков чтения (по умолчанию: 4) - -#### Таймауты: -- `--topic-write-timeout` - таймаут записи в мс (по умолчанию: 10000) -- `--topic-read-timeout` - таймаут чтения в мс (по умолчанию: 5000) - -#### Временные параметры: -- `--time` - время работы теста в секундах (по умолчанию: 60) -- `--shutdown-time` - время на graceful shutdown в секундах (по умолчанию: 10) - -#### Метрики: -- `--prom-pgw` - Prometheus push gateway (по умолчанию: `localhost:9091`) -- `--report-period` - период отправки метрик в мс (по умолчанию: 1000) - -### Примеры использования - -#### Базовый запуск с настройками по умолчанию: -```bash -python -m src topic-run grpc://localhost:2135 /local -``` - -#### Запуск с повышенной нагрузкой: -```bash -python -m src topic-run grpc://localhost:2135 /local \ - --topic-write-rps 100 \ - --topic-read-rps 200 \ - --topic-write-threads 4 \ - --topic-read-threads 8 \ - --time 300 -``` - -#### Запуск с большими сообщениями: -```bash -python -m src topic-run grpc://localhost:2135 /local \ - --topic-message-size 1024 \ - --topic-write-rps 10 \ - --topic-read-rps 20 -``` - -#### Запуск с кастомным топиком и консьюмером: -```bash -python -m src topic-run grpc://localhost:2135 /local \ - --topic-path /local/my_slo_topic \ - --topic-consumer my_consumer -``` - -## Архитектура - -### Компоненты - -1. **topic_jobs.py** - основная логика для работы с топиками: - - `run_topic_writes()` - цикл записи сообщений - - `run_topic_reads()` - цикл чтения сообщений - - `setup_topic()` - создание топика и консьюмера - - `cleanup_topic()` - очистка топика - -2. **metrics.py** - расширен для поддержки топик метрик: - - `OP_TYPE_TOPIC_WRITE` - метрики записи - - `OP_TYPE_TOPIC_READ` - метрики чтения - -3. **options.py** - добавлена команда `topic-run` с параметрами для топиков - -4. **runner.py** - добавлена функция `run_topic_slo()` для запуска топик тестов - -### Workflow - -1. **Инициализация**: создание топика и консьюмера (если не существуют) -2. **Запуск воркеров**: - - Потоки записи создают и отправляют сообщения - - Потоки чтения получают и коммитят сообщения -3. **Сбор метрик**: все операции измеряются и отправляются в Prometheus -4. 
**Завершение**: graceful shutdown всех воркеров - -## Метрики - -Топик SLO тесты создают следующие метрики: - -### Counters -- `sdk_operations_total{operation_type="topic_write"}` - общее количество операций записи -- `sdk_operations_total{operation_type="topic_read"}` - общее количество операций чтения -- `sdk_operations_success_total{operation_type="topic_write|topic_read"}` - успешные операции -- `sdk_operations_failure_total{operation_type="topic_write|topic_read"}` - неуспешные операции -- `sdk_errors_total{operation_type="topic_write|topic_read", error_type="ErrorType"}` - ошибки по типам - -### Histograms -- `sdk_operation_latency_seconds{operation_type="topic_write|topic_read", operation_status="success|err"}` - латентность операций - -### Gauges -- `sdk_pending_operations{operation_type="topic_write|topic_read"}` - количество операций в процессе -- `sdk_retry_attempts{operation_type="topic_write|topic_read"}` - количество попыток повтора - -## Мониторинг - -Для мониторинга рекомендуется использовать Grafana с Prometheus. Примеры запросов: - -### RPS записи в топик: -```promql -rate(sdk_operations_total{operation_type="topic_write"}[1m]) -``` - -### RPS чтения из топика: -```promql -rate(sdk_operations_total{operation_type="topic_read"}[1m]) -``` - -### Процент ошибок записи: -```promql -rate(sdk_operations_failure_total{operation_type="topic_write"}[1m]) / -rate(sdk_operations_total{operation_type="topic_write"}[1m]) * 100 -``` - -### 95-й перцентиль латентности чтения: -```promql -histogram_quantile(0.95, rate(sdk_operation_latency_seconds_bucket{operation_type="topic_read", operation_status="success"}[1m])) -``` - -## Troubleshooting - -### Часто встречающиеся проблемы: - -1. **Топик не создается**: проверьте права доступа и корректность пути к топику -2. **Таймауты чтения**: нормально, если нет новых сообщений; увеличьте `--topic-read-timeout` если нужно -3. **Высокий error rate**: проверьте подключение к YDB и лимиты топика -4. **Низкий RPS**: увеличьте количество потоков или RPS лимиты - -### Логи: -Тесты логируют в уровне INFO основные события. Для подробной диагностики включите DEBUG: -```bash -export PYTHONPATH=src -python -c "import logging; logging.basicConfig(level=logging.DEBUG)" -m src topic-run ... 
-```
diff --git a/tests/slo/playground/README.md b/tests/slo/playground/README.md
new file mode 100644
index 00000000..eefb5cf0
--- /dev/null
+++ b/tests/slo/playground/README.md
@@ -0,0 +1,40 @@
+# SLO playground
+
+Playground may be used for testing SLO workloads locally
+
+It has several services:
+
+- `prometheus` - storage for metrics
+- `prometheus-pushgateway` - push acceptor for prometheus
+- `grafana` - provides charts for metrics
+- `ydb` - local instance of ydb-database to run workload with
+
+## Network addresses
+
+- Grafana dashboard: http://localhost:3000
+- Prometheus pushgateway: http://localhost:9091
+- YDB monitoring: http://localhost:8765
+- YDB GRPC: grpc://localhost:2136
+- YDB GRPC TLS: grpcs://localhost:2135
+
+## Start
+
+```shell
+docker-compose up -d
+```
+
+## Stop
+
+```shell
+docker-compose down
+```
+
+## Configs
+
+Grafana dashboards are stored in `configs/grafana/provisioning/dashboards`
+
+## Data
+
+YDB databases are not persistent
+
+All other data like metrics and certs are stored in `data/`
\ No newline at end of file
diff --git a/tests/slo/playground/configs/chaos.sh b/tests/slo/playground/configs/chaos.sh
new file mode 100755
index 00000000..550a6740
--- /dev/null
+++ b/tests/slo/playground/configs/chaos.sh
@@ -0,0 +1,52 @@
+#!/bin/sh -e
+
+get_random_container() {
+    # Get a list of all containers starting with ydb-database-*
+    containers=$(docker ps --format '{{.Names}}' | grep '^ydb-database-')
+
+    # Convert the list to a newline-separated string
+    containers=$(echo "$containers" | tr ' ' '\n')
+
+    # Count the number of containers
+    containersCount=$(echo "$containers" | wc -l)
+
+    # Generate a random number between 0 and containersCount - 1
+    randomIndex=$(shuf -i 0-$(($containersCount - 1)) -n 1)
+
+    # Get the container name at the random index
+    nodeForChaos=$(echo "$containers" | sed -n "$(($randomIndex + 1))p")
+}
+
+
+sleep 20
+
+echo "Start CHAOS YDB cluster!"
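+# Chaos loop below: each iteration picks a random ydb-database-* container,
+# stops it with a 10 second grace period, starts it again, then sleeps 60 seconds.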
+ +for i in $(seq 1 1000) +do + echo "[$(date)]: docker stop/start iteration $i" + + get_random_container + + sh -c "docker stop ${nodeForChaos} -t 10" + sh -c "docker start ${nodeForChaos}" + + sleep 60 +done + +# for i in $(seq 1 3) +# do +# echo "[$(date)]: docker restart iteration $i" + +# get_random_container + +# sh -c "docker restart ${nodeForChaos} -t 0" + +# sleep 60 +# done + +# get_random_container + +# echo "[$(date)]: docker kill -s SIGKILL ${nodeForChaos}" + +# sh -c "docker kill -s SIGKILL ${nodeForChaos}" \ No newline at end of file diff --git a/tests/slo/playground/configs/compose.yaml b/tests/slo/playground/configs/compose.yaml new file mode 100644 index 00000000..eb09e406 --- /dev/null +++ b/tests/slo/playground/configs/compose.yaml @@ -0,0 +1,283 @@ +x-runtime: &runtime + hostname: localhost + platform: linux/amd64 + privileged: true + network_mode: host + +x-ydb-node: &ydb-node + image: cr.yandex/crptqonuodf51kdj7a7d/ydb:24.4.4.12 + restart: always + <<: *runtime + volumes: + - ./ydb.yaml:/opt/ydb/cfg/config.yaml + +name: ydb + +services: + static-0: + <<: *ydb-node + container_name: ydb-static-0 + command: + - /opt/ydb/bin/ydbd + - server + - --grpc-port + - "2135" + - --mon-port + - "8765" + - --ic-port + - "19001" + - --yaml-config + - /opt/ydb/cfg/config.yaml + - --node + - static + - --label + - deployment=docker + ports: + - 2135:2135 + - 8765:8765 + - 19001:19001 + healthcheck: + test: bash -c "exec 6<> /dev/tcp/localhost/2135" + interval: 10s + timeout: 1s + retries: 3 + start_period: 30s + + static-init: + <<: *ydb-node + restart: on-failure + container_name: ydb-static-init + command: + - /opt/ydb/bin/ydbd + - -s + - grpc://localhost:2135 + - admin + - blobstorage + - config + - init + - --yaml-file + - /opt/ydb/cfg/config.yaml + depends_on: + static-0: + condition: service_healthy + + tenant-init: + <<: *ydb-node + restart: on-failure + container_name: ydb-tenant-init + command: + - /opt/ydb/bin/ydbd + - -s + - grpc://localhost:2135 + - admin + - database + - /Root/testdb + - create + - ssd:1 + depends_on: + static-init: + condition: service_completed_successfully + + database-1: + <<: *ydb-node + container_name: ydb-database-1 + command: + - /opt/ydb/bin/ydbd + - server + - --grpc-port + - "2136" + - --mon-port + - "8766" + - --ic-port + - "19002" + - --yaml-config + - /opt/ydb/cfg/config.yaml + - --tenant + - /Root/testdb + - --node-broker + - grpc://localhost:2135 + - --label + - deployment=docker + ports: + - 2136:2136 + - 8766:8766 + - 19002:19002 + healthcheck: + test: bash -c "exec 6<> /dev/tcp/localhost/2136" + interval: 10s + timeout: 1s + retries: 3 + start_period: 30s + depends_on: + static-0: + condition: service_healthy + static-init: + condition: service_completed_successfully + tenant-init: + condition: service_completed_successfully + + # database-2: + # <<: *ydb-node + # container_name: ydb-database-2 + # command: + # - /opt/ydb/bin/ydbd + # - server + # - --grpc-port + # - "2137" + # - --mon-port + # - "8767" + # - --ic-port + # - "19003" + # - --yaml-config + # - /opt/ydb/cfg/config.yaml + # - --tenant + # - /Root/testdb + # - --node-broker + # - grpc://localhost:2135 + # - --label + # - deployment=docker + # ports: + # - 2137:2137 + # - 8767:8767 + # - 19003:19003 + # healthcheck: + # test: bash -c "exec 6<> /dev/tcp/localhost/2137" + # interval: 10s + # timeout: 1s + # retries: 3 + # start_period: 30s + # depends_on: + # static-0: + # condition: service_healthy + # static-init: + # condition: service_completed_successfully + # 
tenant-init: + # condition: service_completed_successfully + + # database-3: + # <<: *ydb-node + # container_name: ydb-database-3 + # command: + # - /opt/ydb/bin/ydbd + # - server + # - --grpc-port + # - "2138" + # - --mon-port + # - "8768" + # - --ic-port + # - "19004" + # - --yaml-config + # - /opt/ydb/cfg/config.yaml + # - --tenant + # - /Root/testdb + # - --node-broker + # - grpc://localhost:2135 + # - --label + # - deployment=docker + # ports: + # - 2138:2138 + # - 8768:8768 + # - 19004:19004 + # healthcheck: + # test: bash -c "exec 6<> /dev/tcp/localhost/2138" + # interval: 10s + # timeout: 1s + # retries: 3 + # start_period: 30s + # depends_on: + # static-0: + # condition: service_healthy + # static-init: + # condition: service_completed_successfully + # tenant-init: + # condition: service_completed_successfully + + # database-4: + # <<: *ydb-node + # container_name: ydb-database-4 + # command: + # - /opt/ydb/bin/ydbd + # - server + # - --grpc-port + # - "2139" + # - --mon-port + # - "8769" + # - --ic-port + # - "19005" + # - --yaml-config + # - /opt/ydb/cfg/config.yaml + # - --tenant + # - /Root/testdb + # - --node-broker + # - grpc://localhost:2135 + # - --label + # - deployment=docker + # ports: + # - 2139:2139 + # - 8769:8769 + # - 19005:19005 + # healthcheck: + # test: bash -c "exec 6<> /dev/tcp/localhost/2139" + # interval: 10s + # timeout: 1s + # retries: 3 + # start_period: 30s + # depends_on: + # static-0: + # condition: service_healthy + # static-init: + # condition: service_completed_successfully + # tenant-init: + # condition: service_completed_successfully + + # database-5: + # <<: *ydb-node + # container_name: ydb-database-5 + # command: + # - /opt/ydb/bin/ydbd + # - server + # - --grpc-port + # - "2140" + # - --mon-port + # - "8770" + # - --ic-port + # - "19006" + # - --yaml-config + # - /opt/ydb/cfg/config.yaml + # - --tenant + # - /Root/testdb + # - --node-broker + # - grpc://localhost:2135 + # - --label + # - deployment=docker + # ports: + # - 2140:2140 + # - 8770:8770 + # - 19006:19006 + # healthcheck: + # test: bash -c "exec 6<> /dev/tcp/localhost/2140" + # interval: 10s + # timeout: 1s + # retries: 3 + # start_period: 30s + # depends_on: + # static-0: + # condition: service_healthy + # static-init: + # condition: service_completed_successfully + # tenant-init: + # condition: service_completed_successfully + + chaos: + image: docker:latest + restart: on-failure + container_name: ydb-chaos + <<: *runtime + entrypoint: ["/bin/sh", "-c", "chmod +x /opt/ydb/chaos.sh && ls -la /opt/ydb && /opt/ydb/chaos.sh"] + volumes: + - ./chaos.sh:/opt/ydb/chaos.sh + - ./ydb.yaml:/opt/ydb/cfg/config.yaml + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + static-0: + condition: service_healthy \ No newline at end of file diff --git a/tests/slo/playground/configs/ydb.yaml b/tests/slo/playground/configs/ydb.yaml new file mode 100644 index 00000000..eb27585d --- /dev/null +++ b/tests/slo/playground/configs/ydb.yaml @@ -0,0 +1,68 @@ +pqconfig: + require_credentials_in_new_protocol: false + +actor_system_config: + cpu_count: 1 + node_type: STORAGE + use_auto_config: true +blob_storage_config: + service_set: + groups: + - erasure_species: none + rings: + - fail_domains: + - vdisk_locations: + - node_id: 1 + path: SectorMap:1:64 + pdisk_category: SSD +# enable grpc server logs +#log_config: +# entry: +# - component: GRPC_SERVER +# level: 8 +channel_profile_config: + profile: + - channel: + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: ssd + - erasure_species: 
none + pdisk_category: 0 + storage_pool_kind: ssd + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: ssd + profile_id: 0 +domains_config: + domain: + - name: Root + storage_pool_types: + - kind: ssd + pool_config: + box_id: 1 + erasure_species: none + kind: ssd + pdisk_filter: + - property: + - type: SSD + vdisk_kind: Default + state_storage: + - ring: + node: [ 1 ] + nto_select: 1 + ssid: 1 +host_configs: + - drive: + - path: SectorMap:1:64 + type: SSD + host_config_id: 1 +hosts: + - host: localhost + host_config_id: 1 + node_id: 1 + port: 19001 + walle_location: + body: 1 + data_center: az-1 + rack: "0" +static_erasure: none \ No newline at end of file diff --git a/tests/slo/slo_runner.sh b/tests/slo/slo_runner.sh new file mode 100755 index 00000000..d44729e7 --- /dev/null +++ b/tests/slo/slo_runner.sh @@ -0,0 +1,8 @@ +docker compose -f playground/configs/compose.yaml down -v +docker compose -f playground/configs/compose.yaml up -d --wait + +../../.venv/bin/python ./src topic-create grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic + +../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 10 + +../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --debug --time 600 \ No newline at end of file diff --git a/tests/slo/src/__main__.py b/tests/slo/src/__main__.py index 44f5c5db..dd1ae0b7 100644 --- a/tests/slo/src/__main__.py +++ b/tests/slo/src/__main__.py @@ -2,12 +2,14 @@ import logging from options import parse_options -from runner import run_from_args - -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s") +from root_runner import run_from_args if __name__ == "__main__": args = parse_options() gc.disable() + + log_level = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig(level=log_level, format="%(asctime)s %(levelname)-8s %(message)s") + run_from_args(args) diff --git a/tests/slo/src/core/dummy_metrics.py b/tests/slo/src/core/dummy_metrics.py deleted file mode 100644 index 451dfdfa..00000000 --- a/tests/slo/src/core/dummy_metrics.py +++ /dev/null @@ -1,6 +0,0 @@ -class DummyMetrics: - def start(self, labels): - return 0 - - def stop(self, labels, start_time, attempts=1, error=None): - pass diff --git a/tests/slo/src/core/metrics.py b/tests/slo/src/core/metrics.py index 0fe3b60d..8751eb2a 100644 --- a/tests/slo/src/core/metrics.py +++ b/tests/slo/src/core/metrics.py @@ -1,8 +1,10 @@ +from abc import ABC, abstractmethod + import time from contextlib import contextmanager from importlib.metadata import version from collections.abc import Iterable - +import logging from os import environ environ["PROMETHEUS_DISABLE_CREATED_SERIES"] = "True" @@ -10,14 +12,50 @@ from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, push_to_gateway # noqa: E402 OP_TYPE_READ, OP_TYPE_WRITE = "read", "write" -OP_TYPE_TOPIC_READ, OP_TYPE_TOPIC_WRITE = "topic_read", "topic_write" OP_STATUS_SUCCESS, OP_STATUS_FAILURE = "success", "err" REF = environ.get("REF", "main") WORKLOAD = environ.get("WORKLOAD", "sync-query") -class Metrics: +logger = logging.getLogger(__name__) + + +class BaseMetrics(ABC): + @abstractmethod + def start(self, labels): + pass + + @abstractmethod + def stop(self, labels, start_time, attempts=1, error=None): + pass + + @abstractmethod + def reset(self): + pass + + +def create_metrics(push_gateway) -> 
BaseMetrics: + if push_gateway: + logger.info("Creating metrics with push gateway: %s", push_gateway) + return Metrics(push_gateway) + else: + logger.info("Creating dummy metrics") + return DummyMetrics() + + +class DummyMetrics(BaseMetrics): + def start(self, labels): + return 0 + + def stop(self, labels, start_time, attempts=1, error=None): + pass + + def reset(self): + pass + + +class Metrics(BaseMetrics): def __init__(self, push_gateway): self._push_gtw = push_gateway self._registry = CollectorRegistry() diff --git a/tests/slo/src/jobs/base.py b/tests/slo/src/jobs/base.py index 4a52999b..6cef5e14 100644 --- a/tests/slo/src/jobs/base.py +++ b/tests/slo/src/jobs/base.py @@ -4,12 +4,14 @@ import time from ratelimiter import RateLimiter +import ydb + logger = logging.getLogger(__name__) class BaseJobManager(ABC): def __init__(self, driver, args, metrics): - self.driver = driver + self.driver: ydb.Driver = driver self.args = args self.metrics = metrics diff --git a/tests/slo/src/jobs/topic_jobs.py b/tests/slo/src/jobs/topic_jobs.py index 9392449d..3d7af62d 100644 --- a/tests/slo/src/jobs/topic_jobs.py +++ b/tests/slo/src/jobs/topic_jobs.py @@ -5,7 +5,7 @@ from ratelimiter import RateLimiter from .base import BaseJobManager -from core.metrics import OP_TYPE_TOPIC_READ, OP_TYPE_TOPIC_WRITE +from core.metrics import OP_TYPE_READ, OP_TYPE_WRITE logger = logging.getLogger(__name__) @@ -24,14 +24,17 @@ def run_tests(self): def _run_topic_write_jobs(self): logger.info("Start topic write jobs") - write_limiter = RateLimiter(max_calls=self.args.topic_write_rps, period=1) + write_limiter = RateLimiter(max_calls=self.args.write_rps, period=1) futures = [] - for i in range(self.args.topic_write_threads): + for i in range(self.args.write_threads): future = threading.Thread( name=f"slo_topic_write_{i}", target=self._run_topic_writes, - args=(write_limiter,), + args=( + write_limiter, + i, + ), ) future.start() futures.append(future) @@ -41,10 +44,10 @@ def _run_topic_write_jobs(self): def _run_topic_read_jobs(self): logger.info("Start topic read jobs") - read_limiter = RateLimiter(max_calls=self.args.topic_read_rps, period=1) + read_limiter = RateLimiter(max_calls=self.args.read_rps, period=1) futures = [] - for i in range(self.args.topic_read_threads): + for i in range(self.args.read_threads): future = threading.Thread( name=f"slo_topic_read_{i}", target=self._run_topic_reads, @@ -55,15 +58,16 @@ def _run_topic_read_jobs(self): return futures - def _run_topic_writes(self, limiter): + def _run_topic_writes(self, limiter, partition_id=None): start_time = time.time() logger.info("Start topic write workload") - writer = self.driver.topic_client.writer(self.args.topic_path, codec=ydb.TopicCodec.GZIP) - logger.info("Topic writer created") - - try: - write_session = writer.__enter__() + with self.driver.topic_client.writer( + self.args.path, + codec=ydb.TopicCodec.GZIP, + partition_id=partition_id, + ) as writer: + logger.info("Topic writer created") message_count = 0 while time.time() - start_time < self.args.time: @@ -72,47 +76,44 @@ def _run_topic_writes(self, limiter): content = f"message_{message_count}_{threading.current_thread().name}".encode("utf-8") - if len(content) < self.args.topic_message_size: - content += b"x" * (self.args.topic_message_size - len(content)) + if len(content) < self.args.message_size: + content += b"x" * (self.args.message_size - len(content)) message = ydb.TopicWriterMessage(data=content) - ts = self.metrics.start((OP_TYPE_TOPIC_WRITE,)) + ts = 
self.metrics.start((OP_TYPE_WRITE,))
                     try:
-                        write_session.write(message)
-                        self.metrics.stop((OP_TYPE_TOPIC_WRITE,), ts)
+                        writer.write_with_ack(message)
+                        logger.info("Write message: %s", content)
+                        self.metrics.stop((OP_TYPE_WRITE,), ts)
                     except Exception as e:
-                        self.metrics.stop((OP_TYPE_TOPIC_WRITE,), ts, error=e)
+                        self.metrics.stop((OP_TYPE_WRITE,), ts, error=e)
                         logger.error("Write error: %s", e)
 
-        finally:
-            writer.__exit__(None, None, None)
-
         logger.info("Stop topic write workload")
 
     def _run_topic_reads(self, limiter):
         start_time = time.time()
         logger.info("Start topic read workload")
-        reader = self.driver.topic_client.reader(self.args.topic_consumer, self.args.topic_path)
-        logger.info("Topic reader created")
-
-        try:
-            read_session = reader.__enter__()
+        with self.driver.topic_client.reader(
+            self.args.path,
+            self.args.consumer,
+        ) as reader:
+            logger.info("Topic reader created")
             while time.time() - start_time < self.args.time:
                 with limiter:
-                    ts = self.metrics.start((OP_TYPE_TOPIC_READ,))
+                    ts = self.metrics.start((OP_TYPE_READ,))
                     try:
-                        batch = read_session.receive_message(timeout=self.args.topic_read_timeout / 1000)
-                        if batch is not None:
-                            read_session.commit_offset(batch.batches[-1].message_offset_end)
-                        self.metrics.stop((OP_TYPE_TOPIC_READ,), ts)
-                    except Exception as e:
-                        self.metrics.stop((OP_TYPE_TOPIC_READ,), ts, error=e)
-                        logger.debug("Read timeout or error: %s", e)
+                        msg = reader.receive_message()
+                        if msg is not None:
+                            logger.info("Read message: %s", msg.data.decode())
+                            reader.commit_with_ack(msg)
-        finally:
-            reader.__exit__(None, None, None)
+                        self.metrics.stop((OP_TYPE_READ,), ts)
+                    except Exception as e:
+                        self.metrics.stop((OP_TYPE_READ,), ts, error=e)
+                        logger.error("Read error: %s", e)
 
         logger.info("Stop topic read workload")
diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py
index c4e5eaed..a634bc89 100644
--- a/tests/slo/src/options.py
+++ b/tests/slo/src/options.py
@@ -5,6 +5,7 @@ def add_common_options(parser):
     parser.add_argument("endpoint", help="YDB endpoint")
     parser.add_argument("db", help="YDB database name")
     parser.add_argument("-t", "--table-name", default="key_value", help="Table name")
+    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
 
 
 def make_table_create_parser(subparsers):
@@ -61,46 +62,43 @@ def make_table_cleanup_parser(subparsers):
     add_common_options(table_cleanup_parser)
 
 
+def make_topic_create_parser(subparsers):
+    topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer")
+    add_common_options(topic_create_parser)
+
+    topic_create_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path")
+    topic_create_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name")
+    topic_create_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count")
+
+
 def make_topic_run_parser(subparsers):
-    """Создает парсер для команды topic-run - запуск SLO тестов для топиков"""
     topic_parser = subparsers.add_parser("topic-run", help="Run topic SLO workload")
     add_common_options(topic_parser)
 
-    topic_parser.add_argument("--topic-read-rps", default=50, type=int, help="Topic read request rps")
-    topic_parser.add_argument("--topic-read-timeout", default=5000, type=int, help="Topic read timeout [ms]")
-    topic_parser.add_argument("--topic-write-rps", default=20, type=int, help="Topic write request rps")
-    topic_parser.add_argument("--topic-write-timeout", default=10000, type=int, help="Topic write timeout [ms]")
-    topic_parser.add_argument("--topic-read-threads", default=1, type=int, help="Number of threads for topic reading")
-    topic_parser.add_argument("--topic-write-threads", default=1, type=int, help="Number of threads for topic writing")
-    topic_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path")
-    topic_parser.add_argument("--topic-consumer", default="slo_consumer", type=str, help="Topic consumer name")
-    topic_parser.add_argument("--topic-message-size", default=100, type=int, help="Topic message size in bytes")
-    topic_parser.add_argument("--topic-min-partitions", default=1, type=int, help="Minimum active partitions")
-    topic_parser.add_argument("--topic-max-partitions", default=10, type=int, help="Maximum active partitions")
-    topic_parser.add_argument("--topic-retention-hours", default=24, type=int, help="Retention period in hours")
-
-    topic_parser.add_argument("--time", default=60, type=int, help="Time to run in seconds")
+    topic_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path")
+    topic_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name")
+    topic_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count")
+    topic_parser.add_argument("--read-rps", default=100, type=int, help="Topic read request rps")
+    topic_parser.add_argument("--read-timeout", default=5000, type=int, help="Topic read timeout [ms]")
+    topic_parser.add_argument("--write-rps", default=100, type=int, help="Topic write request rps")
+    topic_parser.add_argument("--write-timeout", default=5000, type=int, help="Topic write timeout [ms]")
+    topic_parser.add_argument("--read-threads", default=1, type=int, help="Number of threads for topic reading")
+    topic_parser.add_argument("--write-threads", default=1, type=int, help="Number of threads for topic writing")
+    topic_parser.add_argument("--message-size", default=100, type=int, help="Topic message size in bytes")
+
+    topic_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds")
     topic_parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time in seconds")
-    topic_parser.add_argument("--prom-pgw", default="", type=str, help="Prometheus push gateway (empty to disable)")
+    topic_parser.add_argument(
+        "--prom-pgw", default="localhost:9091", type=str, help="Prometheus push gateway (empty to disable)"
+    )
     topic_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]")
 
 
-def make_topic_create_parser(subparsers):
-    topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer")
-    add_common_options(topic_create_parser)
-
-    topic_create_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path")
-    topic_create_parser.add_argument("--topic-consumer", default="slo_consumer", type=str, help="Topic consumer name")
-    topic_create_parser.add_argument("--topic-min-partitions", default=1, type=int, help="Minimum active partitions")
-    topic_create_parser.add_argument("--topic-max-partitions", default=10, type=int, help="Maximum active partitions")
-    topic_create_parser.add_argument("--topic-retention-hours", default=24, type=int, help="Retention period in hours")
-
-
 def make_topic_cleanup_parser(subparsers):
     topic_cleanup_parser = subparsers.add_parser("topic-cleanup", help="Drop topic")
     add_common_options(topic_cleanup_parser)
 
-    topic_cleanup_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path")
+    topic_cleanup_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path")
 
 
 def get_root_parser():
diff --git a/tests/slo/src/runner.py b/tests/slo/src/root_runner.py
similarity index 100%
rename from tests/slo/src/runner.py
rename to tests/slo/src/root_runner.py
diff --git a/tests/slo/src/runners/table_runner.py b/tests/slo/src/runners/table_runner.py
index 973efe19..797ae02e 100644
--- a/tests/slo/src/runners/table_runner.py
+++ b/tests/slo/src/runners/table_runner.py
@@ -1,7 +1,12 @@
+import ydb
+from os import path
+import concurrent.futures
+from concurrent.futures import ThreadPoolExecutor
+
 from .base import BaseRunner
-from workloads.table_workload import TableWorkload
-from core.metrics import Metrics
-from core.dummy_metrics import DummyMetrics
+from jobs.table_jobs import TableJobManager
+from core.metrics import create_metrics
+from core.generator import batch_generator
 
 
 class TableRunner(BaseRunner):
@@ -10,23 +15,99 @@ def prefix(self) -> str:
         return "table"
 
     def create(self, args):
-        workload = TableWorkload(self.driver, args)
-        workload.create()
+        table_name = path.join(args.db, args.table_name)
+        timeout = args.write_timeout / 1000
+
+        self.logger.info("Creating table: %s", table_name)
+
+        INSERT_ROWS_TEMPLATE = """
+        DECLARE $items AS List<Struct<object_id: Uint64, payload_str: Utf8, payload_double: Double, payload_timestamp: Timestamp>>;
+        UPSERT INTO `{}`
+        SELECT Digest::NumericHash(object_id) AS object_hash, object_id, payload_str, payload_double, payload_timestamp
+        FROM AS_TABLE($items);
+        """
+
+        def create_table(session):
+            session.create_table(
+                table_name,
+                ydb.TableDescription()
+                .with_column(ydb.Column("object_hash", ydb.OptionalType(ydb.PrimitiveType.Uint64)))
+                .with_column(ydb.Column("object_id", ydb.OptionalType(ydb.PrimitiveType.Uint64)))
+                .with_column(ydb.Column("payload_str", ydb.OptionalType(ydb.PrimitiveType.Utf8)))
+                .with_column(ydb.Column("payload_double", ydb.OptionalType(ydb.PrimitiveType.Double)))
+                .with_column(ydb.Column("payload_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp)))
+                .with_primary_keys("object_hash", "object_id")
+                .with_uniform_partitions(args.min_partitions_count)
+                .with_partitioning_settings(
+                    ydb.PartitioningSettings()
+                    .with_partitioning_by_size(ydb.FeatureFlag.ENABLED)
+                    .with_min_partitions_count(args.min_partitions_count)
+                    .with_max_partitions_count(args.max_partitions_count)
+                    .with_partition_size_mb(args.partition_size)
+                ),
+                settings=ydb.BaseRequestSettings().with_timeout(timeout),
+            )
+
+            return session.prepare(INSERT_ROWS_TEMPLATE.format(table_name))
+
+        def insert_rows(pool, prepared, data, timeout):
+            def transaction(session: ydb.table.Session):
+                session.transaction().execute(
+                    prepared,
+                    {"$items": data},
+                    commit_tx=True,
+                    settings=ydb.BaseRequestSettings().with_timeout(timeout),
+                )
+
+            pool.retry_operation_sync(transaction)
+            self.logger.info("Insert %s rows", len(data))
+
+        with ydb.SessionPool(self.driver) as pool:
+            prepared = pool.retry_operation_sync(create_table)
+            self.logger.info("Table created: %s", table_name)
+
+            self.logger.info("Filling table with initial data")
+            futures = set()
+            with ThreadPoolExecutor(max_workers=args.threads, thread_name_prefix="slo_create") as executor:
+                for batch in batch_generator(args):
+                    futures.add(executor.submit(insert_rows, pool, prepared, batch, timeout))
+                for f in concurrent.futures.as_completed(futures):
+                    f.result()
+
+        self.logger.info("Table creation completed")
 
     def run(self, args):
-        workload = TableWorkload(self.driver, args)
+        metrics = create_metrics(args.prom_pgw)
+
+        self.logger.info("Starting table SLO tests")
+
+        table_name = path.join(args.db, args.table_name)
 
-        if args.prom_pgw:
-            metrics = Metrics(args.prom_pgw)
-        else:
-            self.logger.info("Prometheus disabled, creating dummy metrics")
-            metrics = DummyMetrics()
+        session = self.driver.table_client.session().create()
+        result = session.transaction().execute(
+            "SELECT MAX(`object_id`) as max_id FROM `{}`".format(table_name),
+            commit_tx=True,
+        )
+        max_id = result[0].rows[0]["max_id"]
+        self.logger.info("Max ID: %s", max_id)
 
-        workload.run_slo(metrics)
+        job_manager = TableJobManager(self.driver, args, metrics, table_name, max_id)
+        job_manager.run_tests()
+
+        self.logger.info("Table SLO tests completed")
 
         if hasattr(metrics, "reset"):
             metrics.reset()
 
     def cleanup(self, args):
-        workload = TableWorkload(self.driver, args)
-        workload.cleanup()
+        table_name = path.join(args.db, args.table_name)
+        self.logger.info("Cleaning up table: %s", table_name)
+
+        session = self.driver.table_client.session().create()
+        session.drop_table(table_name)
+
+        self.logger.info("Table dropped: %s", table_name)
diff --git a/tests/slo/src/runners/topic_runner.py b/tests/slo/src/runners/topic_runner.py
index 401fab31..7a9be942 100644
--- a/tests/slo/src/runners/topic_runner.py
+++ b/tests/slo/src/runners/topic_runner.py
@@ -1,7 +1,9 @@
+import time
+import ydb
+
 from .base import BaseRunner
-from workloads.topic_workload import TopicWorkload
-from core.metrics import Metrics
-from core.dummy_metrics import DummyMetrics
+from jobs.topic_jobs import TopicJobManager
+from core.metrics import create_metrics
 
 
 class TopicRunner(BaseRunner):
@@ -10,23 +12,79 @@ def prefix(self) -> str:
         return "topic"
 
     def create(self, args):
-        workload = TopicWorkload(self.driver, args)
-        workload.create()
+        retry_no = 0
+        while retry_no < 3:
+            self.logger.info("Creating topic: %s (retry no: %d)", args.path, retry_no)
+
+            try:
+                self.driver.topic_client.create_topic(
+                    path=args.path,
+                    min_active_partitions=args.partitions_count,
+                    max_active_partitions=args.partitions_count,
+                    consumers=[args.consumer],
+                )
+
+                self.logger.info("Topic created successfully: %s", args.path)
+                self.logger.info("Consumer created: %s", args.consumer)
+                return
+
+            except ydb.Error as e:
+                error_msg = str(e).lower()
+                if "already exists" in error_msg:
+                    self.logger.info("Topic already exists: %s", args.path)
+
+                    try:
+                        description = self.driver.topic_client.describe_topic(args.path)
+                        consumer_exists = any(c.name == args.consumer for c in description.consumers)
+
+                        if not consumer_exists:
+                            self.logger.info("Adding consumer %s to existing topic", args.consumer)
+                            self.driver.topic_client.alter_topic(path=args.path, add_consumers=[args.consumer])
+                            self.logger.info("Consumer added successfully: %s", args.consumer)
+                            return
+                        else:
+                            self.logger.info("Consumer already exists: %s", args.consumer)
+                            return
+
+                    except Exception as alter_err:
+                        self.logger.warning("Failed to add consumer: %s", alter_err)
+                        raise
+                elif "storage pool" in error_msg or "pq" in error_msg:
+                    self.logger.error("YDB instance does not support topics (PersistentQueues): %s", e)
+                    self.logger.error("Please use YDB instance with topic support")
+                    raise
+                elif isinstance(e, ydb.Unavailable):
+                    self.logger.info("YDB instance is not ready, retrying in 5 seconds...")
+                    time.sleep(5)
+                    retry_no += 1
+                else:
+                    self.logger.error("Failed to create topic: %s", e)
+                    raise
+
+        raise RuntimeError("Failed to create topic")
 
     def run(self, args):
-        workload = TopicWorkload(self.driver, args)
+        metrics = create_metrics(args.prom_pgw)
 
-        if args.prom_pgw:
-            metrics = Metrics(args.prom_pgw)
-        else:
-            self.logger.info("Prometheus disabled, creating dummy metrics")
-            metrics = DummyMetrics()
+        self.logger.info("Starting topic SLO tests")
 
-        workload.run_slo(metrics)
+        job_manager = TopicJobManager(self.driver, args, metrics)
+        job_manager.run_tests()
+
+        self.logger.info("Topic SLO tests completed")
 
         if hasattr(metrics, "reset"):
             metrics.reset()
 
     def cleanup(self, args):
-        workload = TopicWorkload(self.driver, args)
-        workload.cleanup()
+        self.logger.info("Cleaning up topic: %s", args.path)
+
+        try:
+            self.driver.topic_client.drop_topic(args.path)
+            self.logger.info("Topic dropped: %s", args.path)
+        except ydb.Error as e:
+            if "does not exist" in str(e).lower():
+                self.logger.info("Topic does not exist: %s", args.path)
+            else:
+                self.logger.error("Failed to drop topic: %s", e)
+                raise
diff --git a/tests/slo/src/workloads/__init__.py b/tests/slo/src/workloads/__init__.py
deleted file mode 100644
index c3d8027a..00000000
--- a/tests/slo/src/workloads/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-# Workload modules
diff --git a/tests/slo/src/workloads/base.py b/tests/slo/src/workloads/base.py
deleted file mode 100644
index 963627e9..00000000
--- a/tests/slo/src/workloads/base.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from abc import ABC, abstractmethod
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class BaseWorkload(ABC):
-    def __init__(self, driver, args):
-        self.driver = driver
-        self.args = args
-        self.logger = logger
-
-    @abstractmethod
-    def create(self):
-        pass
-
-    @abstractmethod
-    def run_slo(self, metrics):
-        pass
-
-    @abstractmethod
-    def cleanup(self):
-        pass
-
-    @property
-    @abstractmethod
-    def name(self) -> str:
-        pass
diff --git a/tests/slo/src/workloads/table_workload.py b/tests/slo/src/workloads/table_workload.py
deleted file mode 100644
index b9b61014..00000000
--- a/tests/slo/src/workloads/table_workload.py
+++ /dev/null
@@ -1,107 +0,0 @@
-import ydb
-from os import path
-import concurrent.futures
-from concurrent.futures import ThreadPoolExecutor
-
-from .base import BaseWorkload
-from jobs.table_jobs import TableJobManager
-from core.generator import batch_generator
-
-
-class TableWorkload(BaseWorkload):
-    @property
-    def name(self) -> str:
-        return "table"
-
-    def create(self):
-        table_name = path.join(self.args.db, self.args.table_name)
-        timeout = self.args.write_timeout / 1000
-
-        self.logger.info("Creating table: %s", table_name)
-
-        INSERT_ROWS_TEMPLATE = """
-        DECLARE $items AS List<Struct<object_id: Uint64, payload_str: Utf8, payload_double: Double, payload_timestamp: Timestamp>>;
-        UPSERT INTO `{}`
-        SELECT Digest::NumericHash(object_id) AS object_hash, object_id, payload_str, payload_double, payload_timestamp
-        FROM AS_TABLE($items);
-        """
-
-        def create_table(session):
-            session.create_table(
-                table_name,
-                ydb.TableDescription()
-                .with_column(ydb.Column("object_hash", ydb.OptionalType(ydb.PrimitiveType.Uint64)))
-                .with_column(ydb.Column("object_id", ydb.OptionalType(ydb.PrimitiveType.Uint64)))
-                .with_column(ydb.Column("payload_str", ydb.OptionalType(ydb.PrimitiveType.Utf8)))
-                .with_column(ydb.Column("payload_double", ydb.OptionalType(ydb.PrimitiveType.Double)))
-                .with_column(ydb.Column("payload_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp)))
-                .with_primary_keys("object_hash", "object_id")
-                .with_uniform_partitions(self.args.min_partitions_count)
-                .with_partitioning_settings(
-                    ydb.PartitioningSettings()
-                    .with_partitioning_by_size(ydb.FeatureFlag.ENABLED)
-                    .with_min_partitions_count(self.args.min_partitions_count)
-                    .with_max_partitions_count(self.args.max_partitions_count)
-                    .with_partition_size_mb(self.args.partition_size)
-                ),
-                settings=ydb.BaseRequestSettings().with_timeout(timeout),
-            )
-
-            return session.prepare(INSERT_ROWS_TEMPLATE.format(table_name))
-
-        def insert_rows(pool, prepared, data, timeout):
-            def transaction(session: ydb.table.Session):
-                session.transaction().execute(
-                    prepared,
-                    {"$items": data},
-                    commit_tx=True,
-                    settings=ydb.BaseRequestSettings().with_timeout(timeout),
-                )
-
-            pool.retry_operation_sync(transaction)
-            self.logger.info("Insert %s rows", len(data))
-
-        with ydb.SessionPool(self.driver) as pool:
-            prepared = pool.retry_operation_sync(create_table)
-            self.logger.info("Table created: %s", table_name)
-
-            self.logger.info("Filling table with initial data")
-            futures = set()
-            with ThreadPoolExecutor(max_workers=self.args.threads, thread_name_prefix="slo_create") as executor:
-                for batch in batch_generator(self.args):
-                    futures.add(executor.submit(insert_rows, pool, prepared, batch, timeout))
-                for f in concurrent.futures.as_completed(futures):
-                    f.result()
-
-        self.logger.info("Table creation completed")
-
-    def run_slo(self, metrics):
-        self.logger.info("Starting table SLO tests")
-
-        table_name = path.join(self.args.db, self.args.table_name)
-
-        session = self.driver.table_client.session().create()
-        result = session.transaction().execute(
-            "SELECT MAX(`object_id`) as max_id FROM `{}`".format(table_name),
-            commit_tx=True,
-        )
-        max_id = result[0].rows[0]["max_id"]
-        self.logger.info("Max ID: %s", max_id)
-
-        job_manager = TableJobManager(self.driver, self.args, metrics, table_name, max_id)
-        job_manager.run_tests()
-
-        self.logger.info("Table SLO tests completed")
-
-    def cleanup(self):
-        table_name = path.join(self.args.db, self.args.table_name)
-        self.logger.info("Cleaning up table: %s", table_name)
-
-        session = self.driver.table_client.session().create()
-        session.drop_table(table_name)
-
-        self.logger.info("Table dropped: %s", table_name)
diff --git a/tests/slo/src/workloads/topic_workload.py b/tests/slo/src/workloads/topic_workload.py
deleted file mode 100644
index 85bd498f..00000000
--- a/tests/slo/src/workloads/topic_workload.py
+++ /dev/null
@@ -1,102 +0,0 @@
-import datetime
-import ydb
-from .base import BaseWorkload
-from jobs.topic_jobs import TopicJobManager
-
-
-class TopicWorkload(BaseWorkload):
-    @property
-    def name(self) -> str:
-        return "topic"
-
-    def create(self):
-        self.logger.info("Creating topic: %s", self.args.topic_path)
-
-        try:
-            self.driver.topic_client.create_topic(
-                path=self.args.topic_path,
-                min_active_partitions=self.args.topic_min_partitions,
-                max_active_partitions=self.args.topic_max_partitions,
-                retention_period=datetime.timedelta(hours=self.args.topic_retention_hours),
-                consumers=[self.args.topic_consumer],
-            )
-            self.logger.info("Topic created successfully: %s", self.args.topic_path)
-            self.logger.info("Consumer created: %s", self.args.topic_consumer)
-
-        except ydb.Error as e:
-            error_msg = str(e).lower()
-            if "already exists" in error_msg:
-                self.logger.info("Topic already exists: %s", self.args.topic_path)
-
-                try:
-                    description = self.driver.topic_client.describe_topic(self.args.topic_path)
-                    consumer_exists = any(c.name == self.args.topic_consumer for c in description.consumers)
-
-                    if not consumer_exists:
-                        self.logger.info("Adding consumer %s to existing topic", self.args.topic_consumer)
-                        self.driver.topic_client.alter_topic(
-                            path=self.args.topic_path, add_consumers=[self.args.topic_consumer]
-                        )
-                        self.logger.info("Consumer added successfully: %s", self.args.topic_consumer)
-                    else:
-                        self.logger.info("Consumer already exists: %s", self.args.topic_consumer)
-
-                except Exception as alter_err:
-                    self.logger.warning("Failed to add consumer: %s", alter_err)
-                    raise
-            elif "storage pool" in error_msg or "pq" in error_msg:
-                self.logger.error("YDB instance does not support topics (PersistentQueues): %s", e)
-                self.logger.error("Please use YDB instance with topic support")
-                raise
-            else:
-                self.logger.error("Failed to create topic: %s", e)
-                raise
-
-    def run_slo(self, metrics):
-        self.logger.info("Starting topic SLO tests")
-
-        self._verify_topic_exists()
-
-        job_manager = TopicJobManager(self.driver, self.args, metrics)
-        job_manager.run_tests()
-
-        self.logger.info("Topic SLO tests completed")
-
-    def cleanup(self):
-        self.logger.info("Cleaning up topic: %s", self.args.topic_path)
-
-        try:
-            self.driver.topic_client.drop_topic(self.args.topic_path)
-            self.logger.info("Topic dropped: %s", self.args.topic_path)
-        except ydb.Error as e:
-            if "not found" in str(e).lower():
-                self.logger.info("Topic does not exist: %s", self.args.topic_path)
-            else:
-                self.logger.error("Failed to drop topic: %s", e)
-                raise
-
-    def _verify_topic_exists(self):
-        try:
-            description = self.driver.topic_client.describe_topic(self.args.topic_path)
-            self.logger.info("Topic exists: %s", self.args.topic_path)
-
-            consumer_exists = any(c.name == self.args.topic_consumer for c in description.consumers)
-
-            if not consumer_exists:
-                self.logger.error(
-                    "Consumer '%s' does not exist in topic '%s'", self.args.topic_consumer, self.args.topic_path
-                )
-                self.logger.error("Please create the topic with consumer first using topic-create command")
-                raise RuntimeError(f"Consumer '{self.args.topic_consumer}' not found")
-            else:
-                self.logger.info("Consumer exists: %s", self.args.topic_consumer)
-
-        except ydb.Error as e:
-            error_msg = str(e).lower()
-            if "does not exist" in error_msg:
-                self.logger.error("Topic does not exist: %s", self.args.topic_path)
-                self.logger.error("Please create the topic first using topic-create command")
-                raise RuntimeError(f"Topic '{self.args.topic_path}' not found")
-            else:
-                self.logger.error("Failed to check topic: %s", e)
-                raise
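
Note for reviewers: a minimal, hypothetical usage sketch of the `topic-create` / `topic-run` / `topic-cleanup` sub-commands and the renamed flags defined in `options.py` above. The entry-point invocation and the endpoint/database values are placeholders; only the sub-command names and flags that appear in this diff are assumed to exist.

```bash
# Placeholder endpoint/database; adjust to your environment.
ENDPOINT=grpc://localhost:2135
DATABASE=/Root/testdb

# Create the topic and its consumer (defaults: --path /local/slo_topic, --consumer slo_consumer).
python ./tests/slo/src topic-create "$ENDPOINT" "$DATABASE" --partitions-count 1

# Run the workload using the new short flag names (formerly --topic-read-rps, --topic-write-rps, ...).
python ./tests/slo/src topic-run "$ENDPOINT" "$DATABASE" \
    --read-rps 100 --write-rps 100 \
    --read-threads 1 --write-threads 1 \
    --message-size 100 --time 10 \
    --prom-pgw localhost:9091 --report-period 1000

# Drop the topic afterwards.
python ./tests/slo/src topic-cleanup "$ENDPOINT" "$DATABASE" --path /local/slo_topic
```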