Sprawdzimy dziś możliwości, jakie mamy używając Spring JMS Session Acknowledge Mode jeśli chodzi o ponowne dostarczenie wiadomości w przypadku gdy coś poszło nie tak. SessionAcknowledgeMode to ustawienie SessionAcknowledgeMode, które wspiera 'redelivery’.
Do wyboru mamy cztery tryby powiadomień:
Session.AUTO_ACKNOWLEDGE– DEFAULT- Potwierdzenie wysłane zaraz po otrzymaniu wiadomości i przed procesowniem jej przez warstwę aplikacji.
- Więcej informacji można uzyskać https://docs.spring.io/spring-framework/docs/4.3.x/spring-framework-reference/html/jms.html
Session.DUPS_OK_ACKNOWLEDGE- Potwierdzenie obsługiwane jest przez JMS automatycznie. Może być wysłane zbiorczo dopiero po kilku otrzymanych wiadomościach. Możliwe jest otrzymanie duplikatów.
- Jest to bardzo wydajna metoda potwierdzania wiadomości jednak aplikacja musi być przygotowana na radzenie sobie z duplikatami – np. poprzez rozpoznawanie ich i odrzucanie.
Session.CLIENT_ACKNOWLEDGE- Odbiorca może zdecydować kiedy potwierdza otrzymanie wiadomości.
- Np.: Jeśli zakładamy, że dobrze sparsowana wiadomość jest wystarczająca do bezproblemowego dalszego procesowania to warto wysłać acknowlege tuż po etapie parsowania.
Session.SESSION_TRANSACTED- Tryb ten oznacza że sesja jest transakcyjna – jest to wartość informacyjna. Aktywacja tego trybu i tak wymaga ustawienia flagi
setSessionTransactednatrue. W tym trybie wywołaniecommitna obiekciesessionjest równoznaczne z wysłaniem potwierdzenia.
- Tryb ten oznacza że sesja jest transakcyjna – jest to wartość informacyjna. Aktywacja tego trybu i tak wymaga ustawienia flagi
Ustawienia czy procesowanie ma być transakcyjne:
factory.setSessionTransacted(true);
Ustawienie tej zmiennej na true powoduje użycie lokalnej transakcji oraz ignorowanie powyższych trybów sessionAcknowledgeMode!
Ostatnia opcja to ustawienie managera transakcji:
factory.setTransactionManager(jmsTransactionManager);
ma to najwyższy priorytet i powoduje zignorowanie poprzednich ustawień:
sessionAcknowledgeMode oraz sessionTransacted.
Kiedy używać tranzakcyjności?
Transakcje JMS warto stosować jeśli zamierzamy zatwierdzać grupę otrzymanych wiadomości a nie pojedyncze sztuki. Tranzakcyjność użyta przy każdej pojedynczej wiadomości wprowadzi nam niepotrzebny narzut związany z zatwierdzaniem (commit) i cofaniem (rollback) transakcji.
Przygotowanie środowiska:
Konfiguracja JMS:
@Configuration
@EnableJms
public class JmsConfig implements JmsListenerConfigurer {
@Value("${hostname}")
private String hostname;
@Value("${port}")
private int port;
@Value("${queueManager}")
private String queueManager;
@Value("${channel}")
private String channel;
@Value("${destination}")
private String destination;
@Autowired
private JmsMessageListener jmsMessageListener;
@Autowired
private ApplicationContext applicationContext;
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId(destination);
endpoint.setDestination(destination);
endpoint.setMessageListener(jmsMessageListener);
DefaultJmsListenerContainerFactory factory =
applicationContext.getBean("jmsFactory", DefaultJmsListenerContainerFactory.class);
registrar.registerEndpoint(endpoint, factory);
}
@Bean
public ConnectionFactory connectionFactory() throws JMSException {
MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
factory.setHostName(hostname);
factory.setPort(port);
factory.setQueueManager(queueManager);
factory.setChannel(channel);
factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
return factory;
}
@Bean(name = "jmsFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
JmsTransactionManager jmsTransactionManager) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
factory.setReceiveTimeout(60000L);
return factory;
}
// Used for Producer only:
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
return new JmsTemplate(connectionFactory);
}
@Bean
public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
}Konfiugracja Listener-a:
@Slf4j
@Component
public class JmsMessageListener implements MessageListener {
@Autowired
private JmsTransactionManager jmsTransactionManager;
@Override
public void onMessage(Message message) {
try {
String textMessage = ((TextMessage) message).getText();
log.info("Received message: " + textMessage);
JmsMessageListener.counter++;
} catch (JMSException e) {
log.error("Failed: ", e);
}
}
// for verification
static int counter = 0;
}Konfiguracja nadawcy:
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = SpringjmsApplication.class)
@ContextConfiguration(classes = JmsConfig.class)
class JmsProducerTest {
@Autowired
private JmsTemplate jmsTemplate;
@Value("${destination}")
private String destination;
@Test
public void sendMessageTest() throws InterruptedException {
jmsTemplate.send(destination, messageCreator -> messageCreator.createTextMessage(createMessage()));
TimeUnit.SECONDS.sleep(10);
Assert.assertEquals(1, JmsMessageListener.counter);
}
private String createMessage() {
String sentTime = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.SHORT).withZone(ZoneOffset.UTC)
.format(Instant.now());
return "Message content. Sent at: " + sentTime;
}
}Poprawne wykonanie
Po odpaleniu powyższego kodu, wiadomość zostanie wysłana jeden raz i test zakończy się sukcesem.
1. AUTO_ACKNOWLEDGE
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
1.1 Rzucenie wyjątku w onMessage – AUTO ACK
W przypadku auto acknowledge mamy ograniczone możliwości jeśli chodzi o obsługę powiadomień JMS. Spróbujmy rzucić exception w Message Listenerze:
public void onMessage(Message message) {
try {
String textMessage = ((TextMessage) message).getText();
log.info("Received message: " + textMessage);
JmsMessageListener.counter++;
throw new RuntimeException("Message has not been processed correctly");
} catch (JMSException e) {
log.error("Failed: ", e);
}
}2019-01-04 12:12:24.985 INFO 1724 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 11:12 AM java.lang.RuntimeException: Message has not been processed correctly
Potwierdzenie zostało wysłane automatycznie. Exception nie spowodował wysłania wiadomości ponownie.
2. CLIENT_ACKNOWLEDGE
Zmieńmy teraz rodzaj potwierdzenia na CLIENT_ACKNOWLEDGE:
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
2.1 Rzucenie wyjątku w onMessage – CLIENT ACK
Tym razem test sfailował, gdyż ilość odebranych message będzie dużo większa. Rzucamy exception w onMessage, więc wiadomość jest uznawana za niedostarczoną i następuje redelivery:
2019-01-04 12:21:27.376 INFO 17696 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 11:21 AM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 12:21:28.820 INFO 17696 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 11:21 AM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 12:21:29.174 INFO 17696 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 11:21 AM java.lang.RuntimeException: Message has not been processed correctly ...
java.lang.AssertionError: expected:<1> but was:<332> Expected :1 Actual :332
2.2 Potwierdzenie odebrania przed rzuceniem wyjątku:
Spróbujmy to naprawić dodając wywołanie message.acknowledge() przed rzuceniem wyjątku:
@Override
public void onMessage(Message message) {
try {
String textMessage = ((TextMessage) message).getText();
log.info("Received message: " + textMessage);
JmsMessageListener.counter++;
message.acknowledge();
throw new RuntimeException("Message has not been processed correctly");
} catch (JMSException e) {
log.error("Failed: ", e);
}
}Tym razem wiadomość otrzymamy tylko raz. Wywołanie potwierdzenia odebrania wiadomości spowodowało, że nawet jeśli jest potem rzucony wyjątek to i tak nie ma to wpływu na redelivery tej wiadomości ponownie.
3. DUPS_OK_ACKNOWLEDGE
W przypadku zmiany na DUPS_OK_ACKNOWLEDGE zgadzamy się na zarządzanie powiadomieniami przez JMS i nie mamy wpływu na to, kiedy zostanie potwierdzenie wysłane. Z tą opcją uruchamiając test kilka razy pod rząd otrzymamy duplikaty. Dopiero po pewnym czasie otrzymanie wiadomości zostanie potwierdzone – leniwe potwierdzenie.
4. SESSION_TRANSACTED
Sama zmiana na SESSION TRANSACTED nie jest wystarczająca. Przy próbie odebrania wiadomości dostaniemy WARNING oraz wiadomości nie będą odbierane:
2019-01-04 13:07:46.069 WARN 7896 --- [nerContainer-18] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'TEST' - trying to recover. Cause: JMSCC0097: Bad acknowlegement mode '0'. Connection.createSession() was asked to create a non-transacted session (transacted was false) but the acknowledgement mode was not one of AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE or CLIENT_ACKNOWLEDGE.
Niezbędne tutaj jest włączenie transakcyjności.
5. Lokalna transakcyjność
Włączenie lokalnej transakcyjności:
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
factory.setSessionTransacted(true);5.1 Rzucenie wyjątku w onMessage – TRANSACTED=true
Przy rzuceniu wyjątku tutaj będziemy mieć taką samą sytuację jak w punkcie 2.1. Wiadomość nie zostanie potwierdzona i JMS będzie nam próbował ją wysłać w kółko, aż do skutku.
5.2 Potwierdzenie odebrania przed rzuceniem wyjątku:
Niestety tym razem (w przeciwieństwie do punktu 2.2) potwierdzenie przez message.acknowledge() nam nie pomoże:
2019-01-04 13:39:00.974 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 13:39:01.329 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 13:39:01.689 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly
6. Użycie managera transakcji a cache level.
Przy użyciu JmsTransactionManager możemy ustawić odpowiedni CacheLevel. Mamy następujące opcje do wyboru:
- CACHE_AUTO
- CACHE_NONE
- CACHE_CONNECTION
- CACHE_SESSION
- CACHE_CONSUMER
Ustawienie cache level:
Dopóki nie używamy managera transakcji cache level ustawiamy na CACHE_CONSUMER – jest to default dla lokalnych zasobów JMS.Jeśli natomiast używamy zewnętrznego managera transakcji default to CACHE_NONE. Możemy jednak bez obaw zmienić tą wartość na co najmniej CACHE_CONNECTION.
@Bean(name = "jmsFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
JmsTransactionManager jmsTransactionManager) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setTransactionManager(jmsTransactionManager);
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION);
factory.setReceiveTimeout(60000L);
return factory;
}6.1 Rzucenie wyjątku w onMessage – JmsTransactionManager
Cache Level: CACHE_NONE, CACHE_CONNECTION
Podobnie jak wcześniej oraz w punkcie 2.1. wiadomość nie zostanie potwierdzona i będzie dostarczana aż do skutku.
2019-01-04 13:39:00.974 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 13:39:01.329 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 13:39:01.689 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly
6.2 Potwierdzenie odebrania przed rzuceniem wyjątku:
Niestety potwierdzenie przez message.acknowledge() i zacommitowanie transakcji przed rzuceniem wyjątku też nam nic nie pomoże. Message będzie na kolejce.
6.3 Rollback na jmsTransactionManager
Dodatkowo w tym przypadku możemy zawołać rollback na JmsTransactionManager. Dzięki temu wycofamy transakcje i wiadomość wróci na kolekę do ponownego dostarczenia:
public class JmsMessageListener implements MessageListener {
@Autowired
private JmsTransactionManager jmsTransactionManager;
@Override
public void onMessage(Message message) {
try {
TransactionStatus transactionStatus = jmsTransactionManager.getTransaction(new DefaultTransactionDefinition());
String textMessage = ((TextMessage) message).getText();
log.info("Received message: " + textMessage);
JmsMessageListener.counter++;
log.info("Is tx completed: " + transactionStatus.isCompleted());
jmsTransactionManager.rollback(transactionStatus);
log.info("Is tx completed: " + transactionStatus.isCompleted());
} catch (JMSException e) {
log.error("Failed: ", e);
}
}2019-01-04 14:43:09.425 INFO 15100 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 1:41 PM 2019-01-04 14:43:09.425 INFO 15100 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Is tx completed: false 2019-01-04 14:43:09.425 INFO 15100 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Is tx completed: true 2019-01-04 14:43:15.280 WARN 15100 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'TEST' - trying to recover. Cause: Transaction rolled back because it has been marked as rollback-only 2019-01-04 14:43:20.319 INFO 15100 --- [enerContainer-2] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 1:41 PM 2019-01-04 14:43:20.319 INFO 15100 --- [enerContainer-2] pl.devrev.springjms.JmsMessageListener : Is tx completed: false 2019-01-04 14:43:20.319 INFO 15100 --- [enerContainer-2] pl.devrev.springjms.JmsMessageListener : Is tx completed: true ...