|
23 | 23 | import jakarta.jms.CompletionListener; |
24 | 24 | import jakarta.jms.Destination; |
25 | 25 | import jakarta.jms.IllegalStateException; |
| 26 | +import jakarta.jms.IllegalStateRuntimeException; |
26 | 27 | import jakarta.jms.InvalidDestinationException; |
27 | 28 | import jakarta.jms.JMSException; |
28 | 29 | import jakarta.jms.Message; |
29 | | - |
30 | 30 | import org.apache.activemq.command.ActiveMQDestination; |
31 | 31 | import org.apache.activemq.command.ProducerAck; |
32 | 32 | import org.apache.activemq.command.ProducerId; |
@@ -83,11 +83,13 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl |
83 | 83 | private final long startTime; |
84 | 84 | private MessageTransformer transformer; |
85 | 85 | private MemoryUsage producerWindow; |
| 86 | + private final ThreadLocal<Boolean> inCompletionListenerCallback = new ThreadLocal<>(); |
86 | 87 |
|
87 | 88 | protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException { |
88 | 89 | super(session); |
89 | 90 | this.info = new ProducerInfo(producerId); |
90 | 91 | this.info.setWindowSize(session.connection.getProducerWindowSize()); |
| 92 | + inCompletionListenerCallback.set(false); |
91 | 93 | // Allows the options on the destination to configure the producerInfo |
92 | 94 | if (destination != null && destination.getOptions() != null) { |
93 | 95 | Map<String, Object> options = IntrospectionSupport.extractProperties( |
@@ -168,6 +170,9 @@ public Destination getDestination() throws JMSException { |
168 | 170 | */ |
169 | 171 | @Override |
170 | 172 | public void close() throws JMSException { |
| 173 | + if (inCompletionListenerCallback.get()) { |
| 174 | + throw new IllegalStateRuntimeException("Can't close message producer within CompletionListener"); |
| 175 | + } |
171 | 176 | if (!closed) { |
172 | 177 | dispose(); |
173 | 178 | this.session.asyncSendPacket(info.createRemoveCommand()); |
@@ -239,27 +244,88 @@ public void send(Destination destination, Message message, int deliveryMode, int |
239 | 244 | */ |
240 | 245 | @Override |
241 | 246 | public void send(Message message, CompletionListener completionListener) throws JMSException { |
242 | | - throw new UnsupportedOperationException("send(Message, CompletionListener) is not supported"); |
| 247 | + this.send(getDestination(), |
| 248 | + message, |
| 249 | + defaultDeliveryMode, |
| 250 | + defaultPriority, |
| 251 | + defaultTimeToLive, |
| 252 | + completionListener); |
243 | 253 | } |
244 | 254 |
|
| 255 | + |
245 | 256 | @Override |
246 | 257 | public void send(Message message, int deliveryMode, int priority, long timeToLive, |
247 | 258 | CompletionListener completionListener) throws JMSException { |
248 | | - throw new UnsupportedOperationException("send(Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported"); |
| 259 | + this.send(this.getDestination(), |
| 260 | + message, |
| 261 | + deliveryMode, |
| 262 | + priority, |
| 263 | + timeToLive, |
| 264 | + completionListener); |
249 | 265 | } |
250 | 266 |
|
251 | 267 | @Override |
252 | 268 | public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException { |
253 | | - throw new UnsupportedOperationException("send(Destination, Message, CompletionListener) is not supported"); |
| 269 | + this.send(destination, |
| 270 | + message, |
| 271 | + defaultDeliveryMode, |
| 272 | + defaultPriority, |
| 273 | + defaultTimeToLive, |
| 274 | + completionListener); |
254 | 275 | } |
255 | 276 |
|
256 | 277 | @Override |
257 | 278 | public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, |
258 | 279 | CompletionListener completionListener) throws JMSException { |
259 | | - throw new UnsupportedOperationException("send(Destination, Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported"); |
| 280 | + this.send(destination, message, deliveryMode, priority, timeToLive, |
| 281 | + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); |
| 282 | + } |
| 283 | + |
| 284 | + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, |
| 285 | + boolean disableMessageID, boolean disableMessageTimestamp, CompletionListener completionListener) throws JMSException { |
| 286 | + checkClosed(); |
| 287 | + if (destination == null) { |
| 288 | + if (info.getDestination() == null) { |
| 289 | + throw new UnsupportedOperationException("A destination must be specified."); |
| 290 | + } |
| 291 | + throw new InvalidDestinationException("Don't understand null destinations"); |
| 292 | + } |
| 293 | + |
| 294 | + ActiveMQDestination dest; |
| 295 | + if (destination.equals(info.getDestination())) { |
| 296 | + dest = (ActiveMQDestination)destination; |
| 297 | + } else if (info.getDestination() == null) { |
| 298 | + dest = ActiveMQDestination.transform(destination); |
| 299 | + } else { |
| 300 | + throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); |
| 301 | + } |
| 302 | + if (dest == null) { |
| 303 | + throw new JMSException("No destination specified"); |
| 304 | + } |
| 305 | + |
| 306 | + if (transformer != null) { |
| 307 | + Message transformedMessage = transformer.producerTransform(session, this, message); |
| 308 | + if (transformedMessage != null) { |
| 309 | + message = transformedMessage; |
| 310 | + } |
| 311 | + } |
| 312 | + |
| 313 | + if (producerWindow != null) { |
| 314 | + try { |
| 315 | + producerWindow.waitForSpace(); |
| 316 | + } catch (InterruptedException e) { |
| 317 | + throw new JMSException("Send aborted due to thread interrupt."); |
| 318 | + } |
| 319 | + } |
| 320 | + |
| 321 | + this.session.send(this, dest, message, deliveryMode, priority, timeToLive, disableMessageID, |
| 322 | + disableMessageTimestamp, producerWindow, sendTimeout, completionListener, inCompletionListenerCallback); |
| 323 | + |
| 324 | + stats.onMessage(); |
260 | 325 | } |
261 | 326 |
|
262 | | - public void send(Message message, AsyncCallback onComplete) throws JMSException { |
| 327 | + |
| 328 | + public void send(Message message, AsyncCallback onComplete) throws JMSException { |
263 | 329 | this.send(this.getDestination(), |
264 | 330 | message, |
265 | 331 | this.defaultDeliveryMode, |
|
0 commit comments