Skip to content

Commit 0bf6b1d

Browse files
authored
JMS multiConsumerQueue bug fixes for polyglot inventory-springboot (#228)
* Update OracleAQConfiguration.java * jms multiconsumer bug fixes * JMS changes to handle topic and queues * update sender as topic while consumer as queue
1 parent f70085f commit 0bf6b1d

File tree

3 files changed

+78
-58
lines changed

3 files changed

+78
-58
lines changed

grabdish/inventory-springboot/src/main/java/com/springboot/inventory/InventoryApplication.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@
88
import org.springframework.context.annotation.Configuration;
99
import org.springframework.jms.core.JmsTemplate;
1010

11-
import com.springboot.inventory.model.Order;
12-
import com.springboot.inventory.util.JsonUtils;
13-
14-
1511
@Configuration
1612
@EnableAutoConfiguration
1713
@ComponentScan
@@ -20,7 +16,7 @@ public class InventoryApplication {
2016

2117
public static void main(String[] args) {
2218
ConfigurableApplicationContext context = SpringApplication.run(InventoryApplication.class, args);
23-
JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
24-
}
25-
19+
JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
20+
}
21+
2622
}

grabdish/inventory-springboot/src/main/java/com/springboot/inventory/config/OracleAQConfiguration.java

Lines changed: 66 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,61 +2,85 @@
22

33
import java.sql.SQLException;
44

5-
import javax.jms.ConnectionFactory;
5+
import javax.jms.Destination;
66
import javax.jms.JMSException;
77
import javax.jms.QueueConnectionFactory;
8+
import javax.jms.Session;
89
import javax.sql.DataSource;
910

1011
import org.slf4j.Logger;
1112
import org.slf4j.LoggerFactory;
1213
import org.springframework.beans.factory.annotation.Autowired;
14+
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
1315
import org.springframework.context.annotation.Bean;
1416
import org.springframework.context.annotation.Configuration;
15-
import org.springframework.jms.core.JmsTemplate;
17+
import org.springframework.core.env.Environment;
18+
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
19+
import org.springframework.jms.config.JmsListenerContainerFactory;
20+
import org.springframework.jms.support.destination.DynamicDestinationResolver;
1621

1722
import oracle.jdbc.pool.OracleDataSource;
1823
import oracle.jms.AQjmsFactory;
19-
import oracle.ucp.jdbc.PoolDataSource;
20-
import org.springframework.core.env.Environment;
2124

2225
@Configuration
2326
public class OracleAQConfiguration {
24-
Logger logger = LoggerFactory.getLogger(OracleAQConfiguration.class);
25-
26-
@Autowired
27-
private Environment environment;
28-
29-
@Bean
30-
public DataSource dataSource() throws SQLException {
31-
OracleDataSource ds = new OracleDataSource();
32-
33-
ds.setUser(environment.getProperty("db_user"));
34-
logger.info("USER: "+ environment.getProperty("db_user"));
35-
36-
ds.setPassword(environment.getProperty("db_password"));
37-
logger.info("Password: "+ environment.getProperty("db_password"));
38-
39-
ds.setURL(environment.getProperty("db_url"));
40-
logger.info("URL: "+ environment.getProperty("db_url"));
41-
42-
logger.info("OracleAQConfiguration-->dataSource success"+ds);
43-
return ds;
44-
}
45-
46-
@Bean
47-
public QueueConnectionFactory connectionFactory(DataSource dataSource) throws JMSException, SQLException {
48-
logger.info("OracleAQConfiguration-->AQ factory success");
49-
return AQjmsFactory.getQueueConnectionFactory(dataSource);
50-
}
51-
52-
@Bean
53-
public JmsTemplate jmsTemplate(ConnectionFactory conFactory) throws Exception{
54-
JmsTemplate jmsTemplate = new JmsTemplate();
55-
jmsTemplate.setDefaultDestinationName("inventoryqueue");
56-
jmsTemplate.setSessionTransacted(true);
57-
jmsTemplate.setConnectionFactory(conFactory);
58-
59-
logger.info("Jms Configuration-->jms template success");
60-
return jmsTemplate;
61-
}
27+
Logger logger = LoggerFactory.getLogger(OracleAQConfiguration.class);
28+
29+
@Autowired
30+
private Environment environment;
31+
32+
@Bean
33+
public DataSource dataSource() throws SQLException {
34+
OracleDataSource ds = new OracleDataSource();
35+
36+
ds.setUser(environment.getProperty("db_user"));
37+
logger.info("USER: " + environment.getProperty("db_user"));
38+
39+
ds.setPassword(environment.getProperty("db_password"));
40+
logger.info("Password: " + environment.getProperty("db_password"));
41+
42+
ds.setURL(environment.getProperty("db_url"));
43+
logger.info("URL: " + environment.getProperty("db_url"));
44+
45+
logger.info("OracleAQConfiguration: dataSource success" + ds);
46+
return ds;
47+
}
48+
49+
@Bean
50+
public QueueConnectionFactory connectionFactory(DataSource dataSource) throws JMSException, SQLException {
51+
logger.info("OracleAQConfiguration: connectionFactory success");
52+
return AQjmsFactory.getQueueConnectionFactory(dataSource);
53+
}
54+
55+
@Bean
56+
public JmsListenerContainerFactory<?> queueConnectionFactory(QueueConnectionFactory connectionFactory,
57+
DefaultJmsListenerContainerFactoryConfigurer configurer) {
58+
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
59+
configurer.configure(factory, connectionFactory);
60+
factory.setPubSubDomain(false);
61+
return factory;
62+
}
63+
64+
@Bean
65+
public JmsListenerContainerFactory<?> topicConnectionFactory(QueueConnectionFactory connectionFactory,
66+
DefaultJmsListenerContainerFactoryConfigurer configurer) {
67+
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
68+
configurer.configure(factory, connectionFactory);
69+
factory.setPubSubDomain(true);
70+
return factory;
71+
}
72+
73+
@Bean
74+
public DynamicDestinationResolver destinationResolver() {
75+
return new DynamicDestinationResolver() {
76+
@Override
77+
public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)
78+
throws JMSException {
79+
if (destinationName.contains("INVENTORY")) {
80+
pubSubDomain = true;
81+
}
82+
return super.resolveDestinationName(session, destinationName, pubSubDomain);
83+
}
84+
};
85+
}
6286
}

grabdish/inventory-springboot/src/main/java/com/springboot/inventory/listener/JMSReceiver.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.springboot.inventory.listener;
22

3+
import javax.jms.JMSException;
4+
35
import org.slf4j.Logger;
46
import org.slf4j.LoggerFactory;
57
import org.springframework.beans.factory.annotation.Autowired;
@@ -25,27 +27,26 @@ public class JMSReceiver {
2527

2628
Logger logger = LoggerFactory.getLogger(JMSReceiver.class);
2729

28-
@JmsListener(destination = "orderqueue")
29-
public void listenOrderEvent(String message, AQjmsSession session) {
30+
@JmsListener(destination = "ORDER_QUEUE", containerFactory = "queueConnectionFactory")
31+
public void listenOrderEvent(String message, AQjmsSession session) throws JMSException {
3032
Order order = JsonUtils.read(message, Order.class);
3133

32-
logger.info("ListenOrderEvenet orderMessage:" + message);
33-
logger.info("ListenOrderEvenet Session" + session);
34+
logger.info("ListenOrderEvenet orderMessage :" + message);
3435

3536
String location = evaluateInventory(order, session);
3637
inventoryEvent(order.getOrderid(), order.getItemid(), location);
38+
3739
logger.info("Received Message Session: " + session);
3840
}
3941

40-
public void inventoryEvent(String orderId, String itemId, String location) {
42+
public void inventoryEvent(String orderId, String itemId, String location) throws JMSException {
4143

4244
InventoryTable inventory = new InventoryTable(orderId, itemId, location, "beer");
4345
String jsonString = JsonUtils.writeValueAsString(inventory);
44-
logger.info("Inventory msg" + jsonString + "\n");
4546

46-
jmsTemplate.convertAndSend("inventoryqueue", jsonString);
47-
logger.info(jmsTemplate.getDefaultDestinationName());
47+
jmsTemplate.convertAndSend("INVENTORY_QUEUE", jsonString);
4848

49+
logger.info("Inventory template" + jsonString + "\n");
4950
}
5051

5152
public String evaluateInventory(Order order, AQjmsSession session) {
@@ -64,4 +65,3 @@ public String evaluateInventory(Order order, AQjmsSession session) {
6465
}
6566

6667
}
67-

0 commit comments

Comments
 (0)