8
8
import org .slf4j .Logger ;
9
9
import org .slf4j .LoggerFactory ;
10
10
11
+ import java .nio .charset .StandardCharsets ;
11
12
import java .util .Properties ;
12
- import java .util .concurrent .ExecutionException ;
13
13
14
14
/**
15
15
* Example application that uses the Kafka Clients API to produce messages.
16
16
* Run with:
17
- * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar
17
+ * java -javaagent:path/to/superstream-clients-1.0.0.jar
18
+ * -Dlogback.configurationFile=logback.xml -jar
19
+ * kafka-clients-example-1.0.0-jar-with-dependencies.jar
18
20
*
19
21
* Prerequisites:
20
22
* 1. A Kafka server with the following topics:
21
- * - superstream.metadata_v1 - with a configuration message
22
- * - superstream.clients - for client reports
23
- * - example-topic - for test messages
23
+ * - superstream.metadata_v1 - with a configuration message
24
+ * - superstream.clients - for client reports
25
+ * - example-topic - for test messages
24
26
*
25
27
* Environment variables:
26
- * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: localhost:9092)
27
- * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic)
28
+ * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default:
29
+ * localhost:9092)
30
+ * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for
31
+ * (default: example-topic)
28
32
*/
29
33
public class KafkaProducerExample {
30
34
private static final Logger logger = LoggerFactory .getLogger (KafkaProducerExample .class );
@@ -33,61 +37,85 @@ public class KafkaProducerExample {
33
37
private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092" ;
34
38
35
39
private static final String CLIENT_ID = "superstream-example-producer" ;
36
- private static final String COMPRESSION_TYPE = "gzip" ;
37
- private static final Integer BATCH_SIZE = 16384 ;
40
+ private static final String COMPRESSION_TYPE = "zstd" ; // Changed from gzip to snappy for better visibility
41
+ private static final Integer BATCH_SIZE = 1048576 ; // 1MB batch size
38
42
39
43
private static final String TOPIC_NAME = "example-topic" ;
40
44
private static final String MESSAGE_KEY = "test-key" ;
41
- private static final String MESSAGE_VALUE = "Hello, Superstream!" ;
45
+ // Create a larger message that will compress well
46
+ private static final String MESSAGE_VALUE = generateLargeCompressibleMessage ();
42
47
43
48
public static void main (String [] args ) {
44
- String bootstrapServers = System .getenv ("KAFKA_BOOTSTRAP_SERVERS" );
45
- if (bootstrapServers == null || bootstrapServers .isEmpty ()) {
46
- bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS ;
47
- }
48
-
49
- // Configure the producer
50
49
Properties props = new Properties ();
51
- props .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
50
+ props .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , DEFAULT_BOOTSTRAP_SERVERS );
52
51
props .put ("client.id" , CLIENT_ID );
53
52
props .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
54
53
props .put (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
55
-
56
- // Set some basic configuration
57
54
props .put (ProducerConfig .COMPRESSION_TYPE_CONFIG , COMPRESSION_TYPE );
58
55
props .put (ProducerConfig .BATCH_SIZE_CONFIG , BATCH_SIZE );
56
+ props .put (ProducerConfig .LINGER_MS_CONFIG , 500 ); // Force batching by waiting 500ms
59
57
60
- logger .info ("Creating producer with bootstrap servers: {}" , bootstrapServers );
61
- logger .info ("Original producer configuration:" );
62
- props .forEach ((k , v ) -> logger .info (" {} = {}" , k , v ));
63
-
58
+ long recordCount = 50 ; // Number of messages to send
64
59
try (Producer <String , String > producer = new KafkaProducer <>(props )) {
65
- // The Superstream Agent should have intercepted the producer creation
66
- // and potentially optimized the configuration
67
-
68
- // Log the actual configuration used by the producer
69
- logger .info ("Actual producer configuration (after potential Superstream optimization):" );
70
-
71
- // Get the actual configuration from the producer via reflection
72
- java .lang .reflect .Field configField = producer .getClass ().getDeclaredField ("producerConfig" );
73
- configField .setAccessible (true );
74
- org .apache .kafka .clients .producer .ProducerConfig actualConfig =
75
- (org .apache .kafka .clients .producer .ProducerConfig ) configField .get (producer );
60
+ while (true ) {
61
+ // Send 50 large messages to see compression benefits
62
+ logger .info ("Starting to send {} large messages..." , recordCount );
63
+ for (int i = 1 ; i <= recordCount ; i ++) {
64
+ String messageKey = MESSAGE_KEY + "-" + i ;
65
+ String messageValue = MESSAGE_VALUE + "-" + i + "-" + System .currentTimeMillis ();
66
+ producer .send (new ProducerRecord <>(TOPIC_NAME , messageKey , messageValue ));
67
+ }
76
68
77
- logger .info (" compression.type = {}" , actualConfig .getString (ProducerConfig .COMPRESSION_TYPE_CONFIG ));
78
- logger .info (" batch.size = {}" , actualConfig .getInt (ProducerConfig .BATCH_SIZE_CONFIG ));
79
-
80
- // Send a test message
81
- logger .info ("Sending message to topic {}: key={}, value={}" , TOPIC_NAME , MESSAGE_KEY , MESSAGE_VALUE );
82
- producer .send (new ProducerRecord <>(TOPIC_NAME , MESSAGE_KEY , MESSAGE_VALUE )).get ();
83
- logger .info ("Message sent successfully!" );
84
- } catch (InterruptedException e ) {
85
- Thread .currentThread ().interrupt ();
86
- logger .error ("Interrupted while sending message" , e );
87
- } catch (ExecutionException e ) {
88
- logger .error ("Error sending message" , e );
69
+ logger .info ("All 50 large messages queued successfully! Adding a producer.flush() to send them all at once..." );
70
+ producer .flush ();
71
+ Thread .sleep (7000000 );
72
+ logger .info ("Waking up and preparing to send the next batch of messages" );
73
+ // return;
74
+ }
89
75
} catch (Exception e ) {
90
- logger .error ("Unexpected error" , e );
76
+ logger .error ("Error sending message" , e );
77
+ }
78
+ }
79
+
80
+ private static String generateLargeCompressibleMessage () {
81
+ // Return a 1KB JSON string with repeating data that can be compressed well
82
+ StringBuilder json = new StringBuilder ();
83
+ json .append ("{\n " );
84
+ json .append (" \" metadata\" : {\n " );
85
+ json .append (" \" id\" : \" 12345\" ,\n " );
86
+ json .append (" \" type\" : \" example\" ,\n " );
87
+ json .append (" \" timestamp\" : 1635954438000\n " );
88
+ json .append (" },\n " );
89
+ json .append (" \" data\" : {\n " );
90
+ json .append (" \" metrics\" : [\n " );
91
+
92
+ // Add repeating metrics data to reach ~1KB
93
+ for (int i = 0 ; i < 15 ; i ++) {
94
+ if (i > 0 ) json .append (",\n " );
95
+ json .append (" {\n " );
96
+ json .append (" \" name\" : \" metric" ).append (i ).append ("\" ,\n " );
97
+ json .append (" \" value\" : " ).append (i * 10 ).append (",\n " );
98
+ json .append (" \" tags\" : [\" tag1\" , \" tag2\" , \" tag3\" ],\n " );
99
+ json .append (" \" properties\" : {\n " );
100
+ json .append (" \" property1\" : \" value1\" ,\n " );
101
+ json .append (" \" property2\" : \" value2\" \n " );
102
+ json .append (" }\n " );
103
+ json .append (" }" );
91
104
}
105
+
106
+ json .append ("\n ]\n " );
107
+ json .append (" },\n " );
108
+ json .append (" \" config\" : {\n " );
109
+ json .append (" \" sampling\" : \" full\" ,\n " );
110
+ json .append (" \" retention\" : \" 30d\" ,\n " );
111
+ json .append (" \" compression\" : true,\n " );
112
+ json .append (" \" encryption\" : false\n " );
113
+ json .append (" }\n " );
114
+ json .append ("}" );
115
+
116
+ String result = json .toString ();
117
+ logger .debug ("Generated compressible message of {} bytes" , result .getBytes (StandardCharsets .UTF_8 ).length );
118
+
119
+ return result ;
92
120
}
93
121
}
0 commit comments