@@ -9,14 +9,7 @@ use stackable_operator::{
99use crate :: {
1010 crd:: {
1111 KafkaPodDescriptor , STACKABLE_CONFIG_DIR , STACKABLE_KERBEROS_KRB5_PATH ,
12- STACKABLE_LISTENER_BOOTSTRAP_DIR , STACKABLE_LISTENER_BROKER_DIR ,
13- listener:: { KafkaListenerConfig , KafkaListenerName , node_address_cmd} ,
14- role:: {
15- KAFKA_ADVERTISED_LISTENERS , KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS ,
16- KAFKA_CONTROLLER_QUORUM_VOTERS , KAFKA_LISTENER_SECURITY_PROTOCOL_MAP , KAFKA_LISTENERS ,
17- KAFKA_NODE_ID , KAFKA_NODE_ID_OFFSET , KafkaRole , broker:: BROKER_PROPERTIES_FILE ,
18- controller:: CONTROLLER_PROPERTIES_FILE ,
19- } ,
12+ role:: { broker:: BROKER_PROPERTIES_FILE , controller:: CONTROLLER_PROPERTIES_FILE } ,
2013 security:: KafkaTlsSecurity ,
2114 v1alpha1,
2215 } ,
@@ -28,8 +21,6 @@ pub fn broker_kafka_container_commands(
2821 kafka : & v1alpha1:: KafkaCluster ,
2922 cluster_id : & str ,
3023 controller_descriptors : Vec < KafkaPodDescriptor > ,
31- kafka_listeners : & KafkaListenerConfig ,
32- opa_connect_string : Option < & str > ,
3324 kafka_security : & KafkaTlsSecurity ,
3425 product_version : & str ,
3526) -> String {
@@ -51,88 +42,45 @@ pub fn broker_kafka_container_commands(
5142 true => format!( "export KERBEROS_REALM=$(grep -oP 'default_realm = \\ K.*' {STACKABLE_KERBEROS_KRB5_PATH})" ) ,
5243 false => "" . to_string( ) ,
5344 } ,
54- broker_start_command = broker_start_command( kafka, cluster_id, controller_descriptors, kafka_listeners , opa_connect_string , kafka_security , product_version) ,
45+ broker_start_command = broker_start_command( kafka, cluster_id, controller_descriptors, product_version) ,
5546 }
5647}
5748
5849fn broker_start_command (
5950 kafka : & v1alpha1:: KafkaCluster ,
6051 cluster_id : & str ,
6152 controller_descriptors : Vec < KafkaPodDescriptor > ,
62- kafka_listeners : & KafkaListenerConfig ,
63- opa_connect_string : Option < & str > ,
64- kafka_security : & KafkaTlsSecurity ,
6553 product_version : & str ,
6654) -> String {
67- let opa_config = match opa_connect_string {
68- None => "" . to_string ( ) ,
69- Some ( opa_connect_string) => {
70- format ! ( " --override \" opa.authorizer.url={opa_connect_string}\" " )
71- }
72- } ;
73-
74- let jaas_config = match kafka_security. has_kerberos_enabled ( ) {
75- true => {
76- formatdoc ! { "
77- --override \" {client_jaas_config}=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true isInitiator=false keyTab=\\ \" /stackable/kerberos/keytab\\ \" principal=\\ \" {service_name}/{broker_address}@$KERBEROS_REALM\\ \" ;\" \
78- --override \" {bootstrap_jaas_config}=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true isInitiator=false keyTab=\\ \" /stackable/kerberos/keytab\\ \" principal=\\ \" {service_name}/{bootstrap_address}@$KERBEROS_REALM\\ \" ;\"
79- " ,
80- client_jaas_config = KafkaListenerName :: Client . listener_gssapi_sasl_jaas_config( ) ,
81- bootstrap_jaas_config = KafkaListenerName :: Bootstrap . listener_gssapi_sasl_jaas_config( ) ,
82- service_name = KafkaRole :: Broker . kerberos_service_name( ) ,
83- broker_address = node_address_cmd( STACKABLE_LISTENER_BROKER_DIR ) ,
84- bootstrap_address = node_address_cmd( STACKABLE_LISTENER_BOOTSTRAP_DIR ) ,
85- }
86- }
87- false => "" . to_string ( ) ,
88- } ;
89-
90- let client_port = kafka_security. client_port ( ) ;
91-
92- // TODO: The properties file from the configmap is copied to the /tmp folder and appended with dynamic properties
93- // This should be improved:
94- // - mount emptyDir as readWriteConfig
95- // - use config-utils for proper replacements?
96- // - should we print the adapted properties file at startup?
9755 if kafka. is_controller_configured ( ) {
9856 formatdoc ! { "
99- export REPLICA_ID=$(echo \" $POD_NAME\" | grep -oE '[0-9]+$')
57+ POD_INDEX=$(echo \" $POD_NAME\" | grep -oE '[0-9]+$')
58+ export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET))
59+
10060 cp {config_dir}/{properties_file} /tmp/{properties_file}
61+ config-utils template /tmp/{properties_file}
10162
102- echo \" {KAFKA_NODE_ID}=$((REPLICA_ID + ${KAFKA_NODE_ID_OFFSET}))\" >> /tmp/{properties_file}
103- echo \" {KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS}={bootstrap_servers}\" >> /tmp/{properties_file}
104- echo \" {KAFKA_LISTENERS}={listeners}\" >> /tmp/{properties_file}
105- echo \" {KAFKA_ADVERTISED_LISTENERS}={advertised_listeners}\" >> /tmp/{properties_file}
106- echo \" {KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" >> /tmp/{properties_file}
107- echo \" {KAFKA_CONTROLLER_QUORUM_VOTERS}={controller_quorum_voters}\" >> /tmp/{properties_file}
63+ cp {config_dir}/jaas.properties /tmp/jaas.properties
64+ config-utils template /tmp/jaas.properties
10865
10966 bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command}
110- bin/kafka-server-start.sh /tmp/{properties_file} {opa_config}{jaas_config} &
67+ bin/kafka-server-start.sh /tmp/{properties_file} &
11168 " ,
11269 config_dir = STACKABLE_CONFIG_DIR ,
11370 properties_file = BROKER_PROPERTIES_FILE ,
114- bootstrap_servers = to_bootstrap_servers( & controller_descriptors, client_port) ,
115- listeners = kafka_listeners. listeners( ) ,
116- advertised_listeners = kafka_listeners. advertised_listeners( ) ,
117- listener_security_protocol_map = kafka_listeners. listener_security_protocol_map( ) ,
118- controller_quorum_voters = to_quorum_voters( & controller_descriptors, client_port) ,
119- initial_controller_command = initial_controllers_command( & controller_descriptors, product_version, client_port) ,
71+ initial_controller_command = initial_controllers_command( & controller_descriptors, product_version) ,
12072 }
12173 } else {
12274 formatdoc ! { "
123- bin/kafka-server-start.sh {config_dir}/{properties_file} \
124- --override \" zookeeper.connect=$ZOOKEEPER\" \
125- --override \" {KAFKA_LISTENERS}={listeners}\" \
126- --override \" {KAFKA_ADVERTISED_LISTENERS}={advertised_listeners}\" \
127- --override \" {KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" \
128- {opa_config} \
129- {jaas_config} \
130- &",
75+ cp {config_dir}/{properties_file} /tmp/{properties_file}
76+ config-utils template /tmp/{properties_file}
77+
78+ cp {config_dir}/jaas.properties /tmp/jaas.properties
79+ config-utils template /tmp/jaas.properties
80+
81+ bin/kafka-server-start.sh /tmp/{properties_file} &" ,
13182 config_dir = STACKABLE_CONFIG_DIR ,
13283 properties_file = BROKER_PROPERTIES_FILE ,
133- listeners = kafka_listeners. listeners( ) ,
134- advertised_listeners = kafka_listeners. advertised_listeners( ) ,
135- listener_security_protocol_map = kafka_listeners. listener_security_protocol_map( ) ,
13684 }
13785 }
13886}
@@ -182,31 +130,20 @@ wait_for_termination()
182130pub fn controller_kafka_container_command (
183131 cluster_id : & str ,
184132 controller_descriptors : Vec < KafkaPodDescriptor > ,
185- kafka_listeners : & KafkaListenerConfig ,
186- kafka_security : & KafkaTlsSecurity ,
187133 product_version : & str ,
188134) -> String {
189- let client_port = kafka_security. client_port ( ) ;
190-
191- // TODO: The properties file from the configmap is copied to the /tmp folder and appended with dynamic properties
192- // This should be improved:
193- // - mount emptyDir as readWriteConfig
194- // - use config-utils for proper replacements?
195- // - should we print the adapted properties file at startup?
196135 formatdoc ! { "
197136 {BASH_TRAP_FUNCTIONS}
198137 {remove_vector_shutdown_file_command}
199138 prepare_signal_handlers
200139 containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &
201140
202- export REPLICA_ID=$(echo \" $POD_NAME\" | grep -oE '[0-9]+$')
141+ POD_INDEX=$(echo \" $POD_NAME\" | grep -oE '[0-9]+$')
142+ export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET))
143+
203144 cp {config_dir}/{properties_file} /tmp/{properties_file}
204145
205- echo \" {KAFKA_NODE_ID}=$((REPLICA_ID + ${KAFKA_NODE_ID_OFFSET}))\" >> /tmp/{properties_file}
206- echo \" {KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS}={bootstrap_servers}\" >> /tmp/{properties_file}
207- echo \" {KAFKA_LISTENERS}={listeners}\" >> /tmp/{properties_file}
208- echo \" {KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" >> /tmp/{properties_file}
209- echo \" {KAFKA_CONTROLLER_QUORUM_VOTERS}={controller_quorum_voters}\" >> /tmp/{properties_file}
146+ config-utils template /tmp/{properties_file}
210147
211148 bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command}
212149 bin/kafka-server-start.sh /tmp/{properties_file} &
@@ -217,64 +154,28 @@ pub fn controller_kafka_container_command(
217154 remove_vector_shutdown_file_command = remove_vector_shutdown_file_command( STACKABLE_LOG_DIR ) ,
218155 config_dir = STACKABLE_CONFIG_DIR ,
219156 properties_file = CONTROLLER_PROPERTIES_FILE ,
220- bootstrap_servers = to_bootstrap_servers( & controller_descriptors, client_port) ,
221- listeners = to_listeners( client_port) ,
222- listener_security_protocol_map = to_listener_security_protocol_map( kafka_listeners) ,
223- initial_controller_command = initial_controllers_command( & controller_descriptors, product_version, client_port) ,
224- controller_quorum_voters = to_quorum_voters( & controller_descriptors, client_port) ,
157+ initial_controller_command = initial_controllers_command( & controller_descriptors, product_version) ,
225158 create_vector_shutdown_file_command = create_vector_shutdown_file_command( STACKABLE_LOG_DIR )
226159 }
227160}
228161
229- fn to_listeners ( port : u16 ) -> String {
230- // The environment variables are set in the statefulset of the controller
231- format ! (
232- "{listener_name}://$POD_NAME.$ROLEGROUP_HEADLESS_SERVICE_NAME.$NAMESPACE.svc.$CLUSTER_DOMAIN:{port}" ,
233- listener_name = KafkaListenerName :: Controller
234- )
235- }
236-
237- fn to_listener_security_protocol_map ( kafka_listeners : & KafkaListenerConfig ) -> String {
238- kafka_listeners
239- . listener_security_protocol_map_for_listener ( & KafkaListenerName :: Controller )
240- . unwrap_or ( "" . to_string ( ) )
241- }
242-
243- fn to_initial_controllers ( controller_descriptors : & [ KafkaPodDescriptor ] , port : u16 ) -> String {
244- controller_descriptors
245- . iter ( )
246- . map ( |desc| desc. as_voter ( port) )
247- . collect :: < Vec < String > > ( )
248- . join ( "," )
249- }
250-
251- // TODO: This can be removed once 3.7.2 is removed. Used in command.rs.
252- fn to_quorum_voters ( controller_descriptors : & [ KafkaPodDescriptor ] , port : u16 ) -> String {
253- controller_descriptors
254- . iter ( )
255- . map ( |desc| desc. as_quorum_voter ( port) )
256- . collect :: < Vec < String > > ( )
257- . join ( "," )
258- }
259-
260- fn to_bootstrap_servers ( controller_descriptors : & [ KafkaPodDescriptor ] , port : u16 ) -> String {
162+ fn to_initial_controllers ( controller_descriptors : & [ KafkaPodDescriptor ] ) -> String {
261163 controller_descriptors
262164 . iter ( )
263- . map ( |desc| format ! ( "{fqdn}:{port}" , fqdn = desc. fqdn ( ) ) )
165+ . map ( |desc| desc. as_voter ( ) )
264166 . collect :: < Vec < String > > ( )
265167 . join ( "," )
266168}
267169
268170fn initial_controllers_command (
269171 controller_descriptors : & [ KafkaPodDescriptor ] ,
270172 product_version : & str ,
271- client_port : u16 ,
272173) -> String {
273174 match product_version. starts_with ( "3.7" ) {
274175 true => "" . to_string ( ) ,
275176 false => format ! (
276177 "--initial-controllers {initial_controllers}" ,
277- initial_controllers = to_initial_controllers( controller_descriptors, client_port ) ,
178+ initial_controllers = to_initial_controllers( controller_descriptors) ,
278179 ) ,
279180 }
280181}
0 commit comments