1
1
import datetime
2
+ import time
2
3
import ydb
3
4
from .base import BaseWorkload
4
5
from jobs .topic_jobs import TopicJobManager
@@ -10,47 +11,59 @@ def name(self) -> str:
10
11
return "topic"
11
12
12
13
def create (self ):
13
- self . logger . info ( "Creating topic: %s" , self . args . topic_path )
14
-
15
- try :
16
- self . driver . topic_client . create_topic (
17
- path = self . args . topic_path ,
18
- min_active_partitions = self .args . topic_min_partitions ,
19
- max_active_partitions = self .args .topic_max_partitions ,
20
- retention_period = datetime . timedelta ( hours = self .args .topic_retention_hours ) ,
21
- consumers = [ self .args .topic_consumer ] ,
22
- )
23
- self . logger . info ( "Topic created successfully: %s" , self .args .topic_path )
24
- self . logger . info ( "Consumer created: %s" , self . args . topic_consumer )
14
+ retry_no = 0
15
+ while retry_no < 3 :
16
+ self . logger . info ( "Creating topic: %s (retry no: %d)" , self . args . topic_path , retry_no )
17
+
18
+ try :
19
+ self .driver . topic_client . create_topic (
20
+ path = self .args .topic_path ,
21
+ min_active_partitions = self .args .topic_min_partitions ,
22
+ max_active_partitions = self .args .topic_max_partitions ,
23
+ retention_period = datetime . timedelta ( hours = self . args . topic_retention_hours ),
24
+ consumers = [ self .args .topic_consumer ],
25
+ )
25
26
26
- except ydb .Error as e :
27
- error_msg = str (e ).lower ()
28
- if "already exists" in error_msg :
29
- self .logger .info ("Topic already exists: %s" , self .args .topic_path )
30
-
31
- try :
32
- description = self .driver .topic_client .describe_topic (self .args .topic_path )
33
- consumer_exists = any (c .name == self .args .topic_consumer for c in description .consumers )
34
-
35
- if not consumer_exists :
36
- self .logger .info ("Adding consumer %s to existing topic" , self .args .topic_consumer )
37
- self .driver .topic_client .alter_topic (
38
- path = self .args .topic_path , add_consumers = [self .args .topic_consumer ]
39
- )
40
- self .logger .info ("Consumer added successfully: %s" , self .args .topic_consumer )
41
- else :
42
- self .logger .info ("Consumer already exists: %s" , self .args .topic_consumer )
43
-
44
- except Exception as alter_err :
45
- self .logger .warning ("Failed to add consumer: %s" , alter_err )
27
+ self .logger .info ("Topic created successfully: %s" , self .args .topic_path )
28
+ self .logger .info ("Consumer created: %s" , self .args .topic_consumer )
29
+ return
30
+
31
+ except ydb .Error as e :
32
+ error_msg = str (e ).lower ()
33
+ if "already exists" in error_msg :
34
+ self .logger .info ("Topic already exists: %s" , self .args .topic_path )
35
+
36
+ try :
37
+ description = self .driver .topic_client .describe_topic (self .args .topic_path )
38
+ consumer_exists = any (c .name == self .args .topic_consumer for c in description .consumers )
39
+
40
+ if not consumer_exists :
41
+ self .logger .info ("Adding consumer %s to existing topic" , self .args .topic_consumer )
42
+ self .driver .topic_client .alter_topic (
43
+ path = self .args .topic_path , add_consumers = [self .args .topic_consumer ]
44
+ )
45
+ self .logger .info ("Consumer added successfully: %s" , self .args .topic_consumer )
46
+ return
47
+ else :
48
+ self .logger .info ("Consumer already exists: %s" , self .args .topic_consumer )
49
+ return
50
+
51
+ except Exception as alter_err :
52
+ self .logger .warning ("Failed to add consumer: %s" , alter_err )
53
+ raise
54
+ elif "storage pool" in error_msg or "pq" in error_msg :
55
+ self .logger .error ("YDB instance does not support topics (PersistentQueues): %s" , e )
56
+ self .logger .error ("Please use YDB instance with topic support" )
46
57
raise
47
- elif "storage pool" in error_msg or "pq" in error_msg :
48
- self .logger .error ("YDB instance does not support topics (PersistentQueues): %s" , e )
49
- self .logger .error ("Please use YDB instance with topic support" )
50
- raise
51
- else :
52
- self .logger .error ("Failed to create topic: %s" , e )
53
- raise
58
+ elif isinstance (e , ydb .Unavailable ):
59
+ self .logger .info ("YDB instance is not ready, retrying in 5 seconds..." )
60
+ time .sleep (5 )
61
+ retry_no += 1
62
+ else :
63
+ self .logger .error ("Failed to create topic: %s" , e )
64
+ raise
65
+
66
+ raise RuntimeError ("Failed to create topic" )
54
67
55
68
def run_slo (self , metrics ):
56
69
self .logger .info ("Starting topic SLO tests" )
0 commit comments