Skip to content

Commit 4477c06

Browse files
committed
enable topic slo
1 parent 2e5502c commit 4477c06

File tree

5 files changed

+87
-78
lines changed

5 files changed

+87
-78
lines changed

.github/workflows/slo.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ jobs:
5757
--read-timeout 1000 \
5858
--write-timeout 1000
5959
cleanup-args: grpc://localhost:2135 /Root/testdb
60+
- prefix: topic
61+
workload: topic-basic
62+
create-args: grpc://localhost:2135 /Root/testdb --topic-path /Root/testdb/slo_topic
63+
run-args: grpc://localhost:2135 /Root/testdb --topic-path /Root/testdb/slo_topic --prom-pgw localhost:9091 --report-period 250
64+
cleanup-args: grpc://localhost:2135 /Root/testdb --topic-path /Root/testdb/slo_topic
6065

6166

6267
concurrency:

tests/slo/src/core/metrics.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, push_to_gateway # noqa: E402
1111

1212
OP_TYPE_READ, OP_TYPE_WRITE = "read", "write"
13-
OP_TYPE_TOPIC_READ, OP_TYPE_TOPIC_WRITE = "topic_read", "topic_write"
1413
OP_STATUS_SUCCESS, OP_STATUS_FAILURE = "success", "err"
1514

1615
REF = environ.get("REF", "main")

tests/slo/src/jobs/topic_jobs.py

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from ratelimiter import RateLimiter
66

77
from .base import BaseJobManager
8-
from core.metrics import OP_TYPE_TOPIC_READ, OP_TYPE_TOPIC_WRITE
8+
from core.metrics import OP_TYPE_READ, OP_TYPE_WRITE
99

1010
logger = logging.getLogger(__name__)
1111

@@ -59,11 +59,8 @@ def _run_topic_writes(self, limiter):
5959
start_time = time.time()
6060
logger.info("Start topic write workload")
6161

62-
writer = self.driver.topic_client.writer(self.args.topic_path, codec=ydb.TopicCodec.GZIP)
63-
logger.info("Topic writer created")
64-
65-
try:
66-
write_session = writer.__enter__()
62+
with self.driver.topic_client.writer(self.args.topic_path, codec=ydb.TopicCodec.GZIP) as writer:
63+
logger.info("Topic writer created")
6764

6865
message_count = 0
6966
while time.time() - start_time < self.args.time:
@@ -77,42 +74,37 @@ def _run_topic_writes(self, limiter):
7774

7875
message = ydb.TopicWriterMessage(data=content)
7976

80-
ts = self.metrics.start((OP_TYPE_TOPIC_WRITE,))
77+
ts = self.metrics.start((OP_TYPE_WRITE,))
8178
try:
82-
write_session.write(message)
83-
self.metrics.stop((OP_TYPE_TOPIC_WRITE,), ts)
79+
writer.write(message)
80+
if message_count % 100 == 0:
81+
writer.flush()
82+
83+
self.metrics.stop((OP_TYPE_WRITE,), ts)
8484
except Exception as e:
85-
self.metrics.stop((OP_TYPE_TOPIC_WRITE,), ts, error=e)
85+
self.metrics.stop((OP_TYPE_WRITE,), ts, error=e)
8686
logger.error("Write error: %s", e)
8787

88-
finally:
89-
writer.__exit__(None, None, None)
9088

9189
logger.info("Stop topic write workload")
9290

9391
def _run_topic_reads(self, limiter):
9492
start_time = time.time()
9593
logger.info("Start topic read workload")
9694

97-
reader = self.driver.topic_client.reader(self.args.topic_consumer, self.args.topic_path)
98-
logger.info("Topic reader created")
99-
100-
try:
101-
read_session = reader.__enter__()
95+
with self.driver.topic_client.reader(self.args.topic_consumer, self.args.topic_path) as reader:
96+
logger.info("Topic reader created")
10297

10398
while time.time() - start_time < self.args.time:
10499
with limiter:
105-
ts = self.metrics.start((OP_TYPE_TOPIC_READ,))
100+
ts = self.metrics.start((OP_TYPE_READ,))
106101
try:
107-
batch = read_session.receive_message(timeout=self.args.topic_read_timeout / 1000)
108-
if batch is not None:
109-
read_session.commit_offset(batch.batches[-1].message_offset_end)
110-
self.metrics.stop((OP_TYPE_TOPIC_READ,), ts)
102+
msg = reader.receive_message(timeout=self.args.topic_read_timeout / 1000)
103+
if msg is not None:
104+
reader.commit(msg)
105+
self.metrics.stop((OP_TYPE_READ,), ts)
111106
except Exception as e:
112-
self.metrics.stop((OP_TYPE_TOPIC_READ,), ts, error=e)
107+
self.metrics.stop((OP_TYPE_READ,), ts, error=e)
113108
logger.debug("Read timeout or error: %s", e)
114109

115-
finally:
116-
reader.__exit__(None, None, None)
117-
118110
logger.info("Stop topic read workload")

tests/slo/src/options.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,29 @@ def make_table_cleanup_parser(subparsers):
6161
add_common_options(table_cleanup_parser)
6262

6363

64+
def make_topic_create_parser(subparsers):
65+
topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer")
66+
add_common_options(topic_create_parser)
67+
68+
topic_create_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path")
69+
topic_create_parser.add_argument("--topic-consumer", default="slo_consumer", type=str, help="Topic consumer name")
70+
topic_create_parser.add_argument("--topic-min-partitions", default=1, type=int, help="Minimum active partitions")
71+
topic_create_parser.add_argument("--topic-max-partitions", default=10, type=int, help="Maximum active partitions")
72+
topic_create_parser.add_argument("--topic-retention-hours", default=24, type=int, help="Retention period in hours")
73+
74+
6475
def make_topic_run_parser(subparsers):
6576
"""Create the parser for the topic-run command, which runs the SLO tests for topics."""
6677
topic_parser = subparsers.add_parser("topic-run", help="Run topic SLO workload")
6778
add_common_options(topic_parser)
6879

80+
topic_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path")
6981
topic_parser.add_argument("--topic-read-rps", default=50, type=int, help="Topic read request rps")
7082
topic_parser.add_argument("--topic-read-timeout", default=5000, type=int, help="Topic read timeout [ms]")
7183
topic_parser.add_argument("--topic-write-rps", default=20, type=int, help="Topic write request rps")
7284
topic_parser.add_argument("--topic-write-timeout", default=10000, type=int, help="Topic write timeout [ms]")
7385
topic_parser.add_argument("--topic-read-threads", default=1, type=int, help="Number of threads for topic reading")
7486
topic_parser.add_argument("--topic-write-threads", default=1, type=int, help="Number of threads for topic writing")
75-
topic_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path")
7687
topic_parser.add_argument("--topic-consumer", default="slo_consumer", type=str, help="Topic consumer name")
7788
topic_parser.add_argument("--topic-message-size", default=100, type=int, help="Topic message size in bytes")
7889
topic_parser.add_argument("--topic-min-partitions", default=1, type=int, help="Minimum active partitions")
@@ -85,17 +96,6 @@ def make_topic_run_parser(subparsers):
8596
topic_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]")
8697

8798

88-
def make_topic_create_parser(subparsers):
89-
topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer")
90-
add_common_options(topic_create_parser)
91-
92-
topic_create_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path")
93-
topic_create_parser.add_argument("--topic-consumer", default="slo_consumer", type=str, help="Topic consumer name")
94-
topic_create_parser.add_argument("--topic-min-partitions", default=1, type=int, help="Minimum active partitions")
95-
topic_create_parser.add_argument("--topic-max-partitions", default=10, type=int, help="Maximum active partitions")
96-
topic_create_parser.add_argument("--topic-retention-hours", default=24, type=int, help="Retention period in hours")
97-
98-
9999
def make_topic_cleanup_parser(subparsers):
100100
topic_cleanup_parser = subparsers.add_parser("topic-cleanup", help="Drop topic")
101101
add_common_options(topic_cleanup_parser)

tests/slo/src/workloads/topic_workload.py

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import datetime
2+
import time
23
import ydb
34
from .base import BaseWorkload
45
from jobs.topic_jobs import TopicJobManager
@@ -10,47 +11,59 @@ def name(self) -> str:
1011
return "topic"
1112

1213
def create(self):
13-
self.logger.info("Creating topic: %s", self.args.topic_path)
14-
15-
try:
16-
self.driver.topic_client.create_topic(
17-
path=self.args.topic_path,
18-
min_active_partitions=self.args.topic_min_partitions,
19-
max_active_partitions=self.args.topic_max_partitions,
20-
retention_period=datetime.timedelta(hours=self.args.topic_retention_hours),
21-
consumers=[self.args.topic_consumer],
22-
)
23-
self.logger.info("Topic created successfully: %s", self.args.topic_path)
24-
self.logger.info("Consumer created: %s", self.args.topic_consumer)
14+
retry_no = 0
15+
while retry_no < 3:
16+
self.logger.info("Creating topic: %s (retry no: %d)", self.args.topic_path, retry_no)
17+
18+
try:
19+
self.driver.topic_client.create_topic(
20+
path=self.args.topic_path,
21+
min_active_partitions=self.args.topic_min_partitions,
22+
max_active_partitions=self.args.topic_max_partitions,
23+
retention_period=datetime.timedelta(hours=self.args.topic_retention_hours),
24+
consumers=[self.args.topic_consumer],
25+
)
2526

26-
except ydb.Error as e:
27-
error_msg = str(e).lower()
28-
if "already exists" in error_msg:
29-
self.logger.info("Topic already exists: %s", self.args.topic_path)
30-
31-
try:
32-
description = self.driver.topic_client.describe_topic(self.args.topic_path)
33-
consumer_exists = any(c.name == self.args.topic_consumer for c in description.consumers)
34-
35-
if not consumer_exists:
36-
self.logger.info("Adding consumer %s to existing topic", self.args.topic_consumer)
37-
self.driver.topic_client.alter_topic(
38-
path=self.args.topic_path, add_consumers=[self.args.topic_consumer]
39-
)
40-
self.logger.info("Consumer added successfully: %s", self.args.topic_consumer)
41-
else:
42-
self.logger.info("Consumer already exists: %s", self.args.topic_consumer)
43-
44-
except Exception as alter_err:
45-
self.logger.warning("Failed to add consumer: %s", alter_err)
27+
self.logger.info("Topic created successfully: %s", self.args.topic_path)
28+
self.logger.info("Consumer created: %s", self.args.topic_consumer)
29+
return
30+
31+
except ydb.Error as e:
32+
error_msg = str(e).lower()
33+
if "already exists" in error_msg:
34+
self.logger.info("Topic already exists: %s", self.args.topic_path)
35+
36+
try:
37+
description = self.driver.topic_client.describe_topic(self.args.topic_path)
38+
consumer_exists = any(c.name == self.args.topic_consumer for c in description.consumers)
39+
40+
if not consumer_exists:
41+
self.logger.info("Adding consumer %s to existing topic", self.args.topic_consumer)
42+
self.driver.topic_client.alter_topic(
43+
path=self.args.topic_path, add_consumers=[self.args.topic_consumer]
44+
)
45+
self.logger.info("Consumer added successfully: %s", self.args.topic_consumer)
46+
return
47+
else:
48+
self.logger.info("Consumer already exists: %s", self.args.topic_consumer)
49+
return
50+
51+
except Exception as alter_err:
52+
self.logger.warning("Failed to add consumer: %s", alter_err)
53+
raise
54+
elif "storage pool" in error_msg or "pq" in error_msg:
55+
self.logger.error("YDB instance does not support topics (PersistentQueues): %s", e)
56+
self.logger.error("Please use YDB instance with topic support")
4657
raise
47-
elif "storage pool" in error_msg or "pq" in error_msg:
48-
self.logger.error("YDB instance does not support topics (PersistentQueues): %s", e)
49-
self.logger.error("Please use YDB instance with topic support")
50-
raise
51-
else:
52-
self.logger.error("Failed to create topic: %s", e)
53-
raise
58+
elif isinstance(e, ydb.Unavailable):
59+
self.logger.info("YDB instance is not ready, retrying in 5 seconds...")
60+
time.sleep(5)
61+
retry_no += 1
62+
else:
63+
self.logger.error("Failed to create topic: %s", e)
64+
raise
65+
66+
raise RuntimeError("Failed to create topic")
5467

5568
def run_slo(self, metrics):
5669
self.logger.info("Starting topic SLO tests")

0 commit comments

Comments
 (0)