|
| 1 | +== Camel Kafka and SQL transactions example |
| 2 | + |
| 3 | +=== Introduction |
| 4 | + |
| 5 | +An example that shows how https://camel.apache.org/components/next/kafka-component.html[Camel Kafka] transaction works by leveraging the kafka client transaction capability, note that however this quickstart makes use of the camel-sql component, the JDBC driver doesn't enlish in a global transaction, since kafka transaction support is not compatible with JTA. |
| 6 | + |
| 7 | +The https://camel.apache.org/components/next/kafka-component.html#_kafka_transaction[camel-kafka component has support for transactions], since camel 4.13 you can use the `transacted=true` parameter in either the kafka endpoint or in the `application.properties`. If you use camel 4.12 or before, then you have to use the `additionalProperties[transactional.id]` parameter. |
| 8 | + |
| 9 | +The main use case of this quickstart is a route that sends to a kafka topic and inserts a row to ta sql table, then in case of failure in the SQL operation, there is a rollback and the message is not sent to the kafka topic. |
| 10 | + |
| 11 | +To simulate the rollback, the table `foo` has an unique constraint in the `name` column, so for the example we will try to insert a duplicate name, causing the sql `insert` operation to fail and the exchange route marked for rollback. |
| 12 | + |
| 13 | +This example requires docker and https://camel.apache.org/manual/camel-jbang.html#_installation[camel-jbang]. |
| 14 | + |
| 15 | +NOTE: This example makes use of the local transaction manager in Kafka client and the JDBC driver, this is not a JTA managed transaction, given that kafka doesn't support a JTA transaction api, as such for the 3rd example, if there is an error in the kafka delivery, then the SQL insert operation is not rolled back. |
| 16 | + |
| 17 | + |
| 18 | +=== Start the Kafka and PostgreSQL server |
| 19 | + |
| 20 | +* Use camel-jbang |
| 21 | + |
| 22 | +In camel-jbang there is a `camel infra` command to start services, before camel 4.13 the postgresql and kafka services were bound to random ports, but since camel 4.13 the service is bound to a fixed port. Then we suggest to use the latest camel-jbang to launch the service with a fixed port, so you don't have to manually update the port in `src/main/resources/application.yaml`. |
| 23 | + |
| 24 | +To start the postgresql server |
| 25 | +``` |
| 26 | +camel infra run postgres |
| 27 | +``` |
| 28 | +It will output this: |
| 29 | +``` |
| 30 | +Starting service postgres |
| 31 | +{ |
| 32 | + "getServiceAddress" : "localhost:5432", |
| 33 | + "host" : "localhost", |
| 34 | + "password" : "test", |
| 35 | + "port" : 5432, |
| 36 | + "userName" : "test" |
| 37 | +} |
| 38 | +Press any key to stop the execution |
| 39 | +``` |
| 40 | + |
| 41 | +If the port is different than `5432` then you should update the `src/main/resources/application.yaml`. |
| 42 | + |
| 43 | +To start the kafka server |
| 44 | +``` |
| 45 | +camel infra run kafka |
| 46 | +``` |
| 47 | + |
| 48 | +It will output this: |
| 49 | +``` |
| 50 | +Starting service kafka |
| 51 | +{ |
| 52 | + "brokers" : "localhost:9092", |
| 53 | + "getBootstrapServers" : "localhost:9092" |
| 54 | +} |
| 55 | +Press any key to stop the execution |
| 56 | +``` |
| 57 | + |
| 58 | +If the port is different than `9092` then you should update the `src/main/resources/application.yaml`. |
| 59 | + |
| 60 | +* Use docker |
| 61 | + |
| 62 | +You can use docker in case you don't want to use camel-jbang. |
| 63 | + |
| 64 | +To start the postgresql server |
| 65 | +``` |
| 66 | +docker run --rm --name postgresql -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test -p 5432:5432 mirror.gcr.io/library/postgres:latest |
| 67 | +``` |
| 68 | + |
| 69 | +To start the kafka server |
| 70 | +``` |
| 71 | +docker run --rm --name kafka -p 9092:9092 mirror.gcr.io/apache/kafka:3.9.1 |
| 72 | +``` |
| 73 | + |
| 74 | +* Log the messages sent to the topic |
| 75 | + |
| 76 | +You can follow the messages sent to the kafka topic to make sure which messages were commited to the topic, note the isolation level of the consumer to read only the commited messages. |
| 77 | + |
| 78 | +``` |
| 79 | +docker exec -it `docker ps|grep '9092->9092'|awk '{print $1}'` /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo --isolation-level read_committed |
| 80 | +``` |
| 81 | + |
| 82 | +=== Build and run |
| 83 | + |
| 84 | +Build and run the quickstart. |
| 85 | + |
| 86 | +``` |
| 87 | +mvn compile spring-boot:run |
| 88 | +``` |
| 89 | + |
| 90 | +There are 4 routes represented by the HTTP endpoints: |
| 91 | + |
| 92 | +``` |
| 93 | +1. http://localhost:8080/send/{word} |
| 94 | +2. http://localhost:8080/send2/{word} |
| 95 | +3. http://localhost:8080/sendtx/{word} |
| 96 | +4. http://localhost:8080/sendtx2/{word} |
| 97 | +``` |
| 98 | + |
| 99 | +* 1 - No transaction - SQL Insert and send message to kafka topic |
| 100 | + |
| 101 | +The 1st route has no transaction support, once it receives the message from the consumer it executes the SQL insert, then sends the message to the kafka topic. If the SQL operation fails, the exception handling mechanism will interrupt the code execution and the kafka producer won't send the message to the kafka topic. |
| 102 | + |
| 103 | +Run the following command: |
| 104 | +``` |
| 105 | +curl http://localhost:8080/send/bar1 |
| 106 | +``` |
| 107 | + |
| 108 | +You should see compreensive log in the example terminal with `http word: bar1` and the `{"foo": "bar1"}` in the kafka-consumer log terminal. |
| 109 | + |
| 110 | +Then if you re-run the curl command, it should show the error in the example terminal: |
| 111 | +``` |
| 112 | +org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; ERROR: duplicate key value violates unique constraint "foo_name_key" |
| 113 | + Detail: Key (name)=(bar1) already exists. |
| 114 | +``` |
| 115 | + |
| 116 | +And no message show in the kafka-consumer terminal, as if you examine the route, the kafka producer is not run, since the SQL insert fails before. |
| 117 | +``` |
| 118 | +from("platform-http:/send/{word}") |
| 119 | + .log("http word: ${header.word}") |
| 120 | + .to("sql:" + insert) |
| 121 | + .setBody(simple("{\"foo\": \"${header.word}\"}")) |
| 122 | + .to("kafka:foo"); |
| 123 | +``` |
| 124 | + |
| 125 | +* 2 - No transaction - Send message to kafka topic and SQL Insert |
| 126 | + |
| 127 | +The 2nd route has no transaction support, once it receives the message from the consumer it sends the message to the kafka topic then executes the SQL insert command. If the SQL operation fails, as there is no transaction the message is not marked for rollback. |
| 128 | + |
| 129 | +Run the following command: |
| 130 | +``` |
| 131 | +curl http://localhost:8080/send2/bar2 |
| 132 | +``` |
| 133 | + |
| 134 | +You should see compreensive log in the example terminal with `http word: bar2` and the `{"foo": "bar2"}` in the kafka-consumer log terminal. |
| 135 | + |
| 136 | +Then if you re-run the curl command, it should show the error in the example terminal: |
| 137 | +``` |
| 138 | +org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; ERROR: duplicate key value violates unique constraint "foo_name_key" |
| 139 | + Detail: Key (name)=(bar2) already exists. |
| 140 | +``` |
| 141 | + |
| 142 | +The `{"foo": "bar2"}` message show in the kafka-consumer terminal shows that the message is in the kafka topic, since there is no transaction in the kafka client, there is no rollback to perform. |
| 143 | +``` |
| 144 | +from("platform-http:/send2/{word}") |
| 145 | + .log("http word: ${header.word}") |
| 146 | + .setBody(simple("{\"foo\": \"${header.word}\"}"))jiuredhat |
| 147 | + .to("kafka:foo") |
| 148 | + .to("sql:" + insert); |
| 149 | +``` |
| 150 | + |
| 151 | +* 3 - With transaction - SQL Insert and send message to kafka topic |
| 152 | + |
| 153 | +The 3rd route has transaction support, once it receives the message from the consumer it executes the SQL insert, then sends the message to the kafka topic. If the SQL operation fails, the exception handling mechanism will interrupt the code execution and the kafka producer won't send the message to the kafka topic. |
| 154 | + |
| 155 | +Run the following command: |
| 156 | +``` |
| 157 | +curl http://localhost:8080/sendtx/bar3 |
| 158 | +``` |
| 159 | + |
| 160 | +You should see compreensive log in the example terminal with `http word: bar3` message content and the kafka producer commit like `Commit kafka transaction endpoint27-route16 with exchange F865E9F937249D7-0000000000000001` and the `{"foo": "bar3"}` in the kafka-consumer log terminal. |
| 161 | + |
| 162 | +Then if you re-run the curl command, it should show the error in the example terminal: |
| 163 | +``` |
| 164 | +org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; ERROR: duplicate key value violates unique constraint "foo_name_key" |
| 165 | + Detail: Key (name)=(bar3) already exists. |
| 166 | +``` |
| 167 | + |
| 168 | +And no message show in the kafka-consumer terminal, as if you examine the route, the kafka producer doesn't run, since the SQL insert fails before. |
| 169 | + |
| 170 | +You can see there are additional code in comparison to the first route, the `onException` that marks the route for rollback and the `transacted=true` parameter of the kafka endpoint. |
| 171 | + |
| 172 | +``` |
| 173 | +from("platform-http:/sendtx/{word}") |
| 174 | + .onException(Exception.class) |
| 175 | + .handled(true) |
| 176 | + .rollback("Expected error when trying to insert duplicate values in the unique column.") |
| 177 | + .end() |
| 178 | + .log("http word: ${header.word}") |
| 179 | + .to("sql:" + insert) |
| 180 | + .setBody(simple("{\"foo\": \"${header.word}\"}")) |
| 181 | + .to("kafka:foo?transacted=true"); |
| 182 | +``` |
| 183 | + |
| 184 | +* 4 - With transaction - Send message to kafka topic and SQL Insert |
| 185 | + |
| 186 | +The 4th route has transaction support, once it receives the message from the consumer it sends the message to the kafka topic and executes the SQL insert. you can note the kafka delivery occurs before the SQL operation, so if the SQL operation fails, the `onException` handling mechanism will catch the error and will mark the route exchange to rollback, then cascades to the kafka client to rollback the message delivery to the topic. |
| 187 | + |
| 188 | +Run the following command: |
| 189 | +``` |
| 190 | +curl http://localhost:8080/sendtx2/bar4 |
| 191 | +``` |
| 192 | + |
| 193 | +You should see compreensive log in the example terminal with `http word: bar4` message content and the kafka producer commit like `Commit kafka transaction endpoint3-route16 with exchange F865E9F937249D7-0000000000000001` and the `{"foo": "bar4"}` in the kafka-consumer log terminal. |
| 194 | + |
| 195 | + |
| 196 | +Then if you re-run the curl command, it should show the error in the example terminal: |
| 197 | +``` |
| 198 | +org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; ERROR: duplicate key value violates unique constraint "foo_name_key" |
| 199 | + Detail: Key (name)=(bar4) already exists. |
| 200 | +``` |
| 201 | + |
| 202 | +And no message show in the kafka-consumer terminal, as if you examNine the route, the kafka producer runs but the route exchange is marked for rollback, so the message is not commited to the topic. |
| 203 | + |
| 204 | +You can see there are additional code in comparison to the first route, the `onException` that marks the route for rollback and the `transacted=true` parameter of the kafka endpoint. |
| 205 | + |
| 206 | +``` |
| 207 | +from("platform-http:/sendtx2/{word}") |
| 208 | + .onException(Exception.class) |
| 209 | + .handled(true) |
| 210 | + .rollback("Expected error when trying to insert duplicate values in the unique column.") |
| 211 | + .end() |
| 212 | + .log("http word: ${header.word}") |
| 213 | + .setBody(simple("{\"foo\": \"${header.word}\"}")) |
| 214 | + .to("kafka:foo?transacted=true") |
| 215 | + .to("sql:" + insert); |
| 216 | +``` |
| 217 | + |
| 218 | +Press `Ctrl-C` to exit. |
| 219 | + |
| 220 | +=== Help and contributions |
| 221 | + |
| 222 | +If you hit any problem using Camel or have some feedback, |
| 223 | +then please https://camel.apache.org/community/support/[let us know]. |
| 224 | + |
| 225 | +We also love contributors, |
| 226 | +so https://camel.apache.org/community/contributing/[get involved] :-) |
| 227 | + |
| 228 | +The Camel riders! |
0 commit comments