Sprawdzimy dziś możliwości, jakie mamy używając Spring JMS Session Acknowledge Mode jeśli chodzi o ponowne dostarczenie wiadomości w przypadku gdy coś poszło nie tak. SessionAcknowledgeMode to ustawienie SessionAcknowledgeMode, które wspiera 'redelivery’.
Do wyboru mamy cztery tryby powiadomień:
Session.AUTO_ACKNOWLEDGE
– DEFAULT- Potwierdzenie wysłane zaraz po otrzymaniu wiadomości i przed procesowniem jej przez warstwę aplikacji.
- Więcej informacji można uzyskać https://docs.spring.io/spring-framework/docs/4.3.x/spring-framework-reference/html/jms.html
Session.DUPS_OK_ACKNOWLEDGE
- Potwierdzenie obsługiwane jest przez JMS automatycznie. Może być wysłane zbiorczo dopiero po kilku otrzymanych wiadomościach. Możliwe jest otrzymanie duplikatów.
- Jest to bardzo wydajna metoda potwierdzania wiadomości jednak aplikacja musi być przygotowana na radzenie sobie z duplikatami – np. poprzez rozpoznawanie ich i odrzucanie.
Session.CLIENT_ACKNOWLEDGE
- Odbiorca może zdecydować kiedy potwierdza otrzymanie wiadomości.
- Np.: Jeśli zakładamy, że dobrze sparsowana wiadomość jest wystarczająca do bezproblemowego dalszego procesowania to warto wysłać acknowlege tuż po etapie parsowania.
Session.SESSION_TRANSACTED
- Tryb ten oznacza że sesja jest transakcyjna – jest to wartość informacyjna. Aktywacja tego trybu i tak wymaga ustawienia flagi
setSessionTransacted
natrue
. W tym trybie wywołaniecommit
na obiekciesession
jest równoznaczne z wysłaniem potwierdzenia.
- Tryb ten oznacza że sesja jest transakcyjna – jest to wartość informacyjna. Aktywacja tego trybu i tak wymaga ustawienia flagi
Ustawienia czy procesowanie ma być transakcyjne:
factory.setSessionTransacted(true);
Ustawienie tej zmiennej na true powoduje użycie lokalnej transakcji oraz ignorowanie powyższych trybów sessionAcknowledgeMode!
Ostatnia opcja to ustawienie managera transakcji:
factory.setTransactionManager(jmsTransactionManager);
ma to najwyższy priorytet i powoduje zignorowanie poprzednich ustawień:
sessionAcknowledgeMode oraz sessionTransacted.
Kiedy używać tranzakcyjności?
Transakcje JMS warto stosować jeśli zamierzamy zatwierdzać grupę otrzymanych wiadomości a nie pojedyncze sztuki. Tranzakcyjność użyta przy każdej pojedynczej wiadomości wprowadzi nam niepotrzebny narzut związany z zatwierdzaniem (commit) i cofaniem (rollback) transakcji.
Przygotowanie środowiska:
Konfiguracja JMS:
@Configuration @EnableJms public class JmsConfig implements JmsListenerConfigurer { @Value("${hostname}") private String hostname; @Value("${port}") private int port; @Value("${queueManager}") private String queueManager; @Value("${channel}") private String channel; @Value("${destination}") private String destination; @Autowired private JmsMessageListener jmsMessageListener; @Autowired private ApplicationContext applicationContext; @Override public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) { SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint(); endpoint.setId(destination); endpoint.setDestination(destination); endpoint.setMessageListener(jmsMessageListener); DefaultJmsListenerContainerFactory factory = applicationContext.getBean("jmsFactory", DefaultJmsListenerContainerFactory.class); registrar.registerEndpoint(endpoint, factory); } @Bean public ConnectionFactory connectionFactory() throws JMSException { MQQueueConnectionFactory factory = new MQQueueConnectionFactory(); factory.setHostName(hostname); factory.setPort(port); factory.setQueueManager(queueManager); factory.setChannel(channel); factory.setTransportType(WMQConstants.WMQ_CM_CLIENT); return factory; } @Bean(name = "jmsFactory") public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory, JmsTransactionManager jmsTransactionManager) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); factory.setReceiveTimeout(60000L); return factory; } // Used for Producer only: @Bean public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) { return new JmsTemplate(connectionFactory); } @Bean public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) { return new JmsTransactionManager(connectionFactory); } }
Konfiugracja Listener-a:
@Slf4j @Component public class JmsMessageListener implements MessageListener { @Autowired private JmsTransactionManager jmsTransactionManager; @Override public void onMessage(Message message) { try { String textMessage = ((TextMessage) message).getText(); log.info("Received message: " + textMessage); JmsMessageListener.counter++; } catch (JMSException e) { log.error("Failed: ", e); } } // for verification static int counter = 0; }
Konfiguracja nadawcy:
@ExtendWith(SpringExtension.class) @SpringBootTest(classes = SpringjmsApplication.class) @ContextConfiguration(classes = JmsConfig.class) class JmsProducerTest { @Autowired private JmsTemplate jmsTemplate; @Value("${destination}") private String destination; @Test public void sendMessageTest() throws InterruptedException { jmsTemplate.send(destination, messageCreator -> messageCreator.createTextMessage(createMessage())); TimeUnit.SECONDS.sleep(10); Assert.assertEquals(1, JmsMessageListener.counter); } private String createMessage() { String sentTime = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.SHORT).withZone(ZoneOffset.UTC) .format(Instant.now()); return "Message content. Sent at: " + sentTime; } }
Poprawne wykonanie
Po odpaleniu powyższego kodu, wiadomość zostanie wysłana jeden raz i test zakończy się sukcesem.
1. AUTO_ACKNOWLEDGE
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
1.1 Rzucenie wyjątku w onMessage – AUTO ACK
W przypadku auto acknowledge mamy ograniczone możliwości jeśli chodzi o obsługę powiadomień JMS. Spróbujmy rzucić exception w Message Listenerze:
public void onMessage(Message message) { try { String textMessage = ((TextMessage) message).getText(); log.info("Received message: " + textMessage); JmsMessageListener.counter++; throw new RuntimeException("Message has not been processed correctly"); } catch (JMSException e) { log.error("Failed: ", e); } }
2019-01-04 12:12:24.985 INFO 1724 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 11:12 AM java.lang.RuntimeException: Message has not been processed correctly
Potwierdzenie zostało wysłane automatycznie. Exception nie spowodował wysłania wiadomości ponownie.
2. CLIENT_ACKNOWLEDGE
Zmieńmy teraz rodzaj potwierdzenia na CLIENT_ACKNOWLEDGE:
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
2.1 Rzucenie wyjątku w onMessage – CLIENT ACK
Tym razem test sfailował, gdyż ilość odebranych message będzie dużo większa. Rzucamy exception w onMessage, więc wiadomość jest uznawana za niedostarczoną i następuje redelivery:
2019-01-04 12:21:27.376 INFO 17696 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 11:21 AM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 12:21:28.820 INFO 17696 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 11:21 AM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 12:21:29.174 INFO 17696 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 11:21 AM java.lang.RuntimeException: Message has not been processed correctly ...
java.lang.AssertionError: expected:<1> but was:<332> Expected :1 Actual :332
2.2 Potwierdzenie odebrania przed rzuceniem wyjątku:
Spróbujmy to naprawić dodając wywołanie message.acknowledge()
przed rzuceniem wyjątku:
@Override public void onMessage(Message message) { try { String textMessage = ((TextMessage) message).getText(); log.info("Received message: " + textMessage); JmsMessageListener.counter++; message.acknowledge(); throw new RuntimeException("Message has not been processed correctly"); } catch (JMSException e) { log.error("Failed: ", e); } }
Tym razem wiadomość otrzymamy tylko raz. Wywołanie potwierdzenia odebrania wiadomości spowodowało, że nawet jeśli jest potem rzucony wyjątek to i tak nie ma to wpływu na redelivery tej wiadomości ponownie.
3. DUPS_OK_ACKNOWLEDGE
W przypadku zmiany na DUPS_OK_ACKNOWLEDGE zgadzamy się na zarządzanie powiadomieniami przez JMS i nie mamy wpływu na to, kiedy zostanie potwierdzenie wysłane. Z tą opcją uruchamiając test kilka razy pod rząd otrzymamy duplikaty. Dopiero po pewnym czasie otrzymanie wiadomości zostanie potwierdzone – leniwe potwierdzenie.
4. SESSION_TRANSACTED
Sama zmiana na SESSION TRANSACTED nie jest wystarczająca. Przy próbie odebrania wiadomości dostaniemy WARNING oraz wiadomości nie będą odbierane:
2019-01-04 13:07:46.069 WARN 7896 --- [nerContainer-18] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'TEST' - trying to recover. Cause: JMSCC0097: Bad acknowlegement mode '0'. Connection.createSession() was asked to create a non-transacted session (transacted was false) but the acknowledgement mode was not one of AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE or CLIENT_ACKNOWLEDGE.
Niezbędne tutaj jest włączenie transakcyjności.
5. Lokalna transakcyjność
Włączenie lokalnej transakcyjności:
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED); factory.setSessionTransacted(true);
5.1 Rzucenie wyjątku w onMessage – TRANSACTED=true
Przy rzuceniu wyjątku tutaj będziemy mieć taką samą sytuację jak w punkcie 2.1. Wiadomość nie zostanie potwierdzona i JMS będzie nam próbował ją wysłać w kółko, aż do skutku.
5.2 Potwierdzenie odebrania przed rzuceniem wyjątku:
Niestety tym razem (w przeciwieństwie do punktu 2.2) potwierdzenie przez message.acknowledge() nam nie pomoże:
2019-01-04 13:39:00.974 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 13:39:01.329 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 13:39:01.689 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly
6. Użycie managera transakcji a cache level.
Przy użyciu JmsTransactionManager możemy ustawić odpowiedni CacheLevel. Mamy następujące opcje do wyboru:
- CACHE_AUTO
- CACHE_NONE
- CACHE_CONNECTION
- CACHE_SESSION
- CACHE_CONSUMER
Ustawienie cache level:
Dopóki nie używamy managera transakcji cache level ustawiamy na CACHE_CONSUMER – jest to default dla lokalnych zasobów JMS.Jeśli natomiast używamy zewnętrznego managera transakcji default to CACHE_NONE. Możemy jednak bez obaw zmienić tą wartość na co najmniej CACHE_CONNECTION.
@Bean(name = "jmsFactory") public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory, JmsTransactionManager jmsTransactionManager) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setTransactionManager(jmsTransactionManager); factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION); factory.setReceiveTimeout(60000L); return factory; }
6.1 Rzucenie wyjątku w onMessage – JmsTransactionManager
Cache Level: CACHE_NONE, CACHE_CONNECTION
Podobnie jak wcześniej oraz w punkcie 2.1. wiadomość nie zostanie potwierdzona i będzie dostarczana aż do skutku.
2019-01-04 13:39:00.974 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 13:39:01.329 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly 2019-01-04 13:39:01.689 INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 12:38 PM java.lang.RuntimeException: Message has not been processed correctly
6.2 Potwierdzenie odebrania przed rzuceniem wyjątku:
Niestety potwierdzenie przez message.acknowledge() i zacommitowanie transakcji przed rzuceniem wyjątku też nam nic nie pomoże. Message będzie na kolejce.
6.3 Rollback na jmsTransactionManager
Dodatkowo w tym przypadku możemy zawołać rollback na JmsTransactionManager. Dzięki temu wycofamy transakcje i wiadomość wróci na kolekę do ponownego dostarczenia:
public class JmsMessageListener implements MessageListener { @Autowired private JmsTransactionManager jmsTransactionManager; @Override public void onMessage(Message message) { try { TransactionStatus transactionStatus = jmsTransactionManager.getTransaction(new DefaultTransactionDefinition()); String textMessage = ((TextMessage) message).getText(); log.info("Received message: " + textMessage); JmsMessageListener.counter++; log.info("Is tx completed: " + transactionStatus.isCompleted()); jmsTransactionManager.rollback(transactionStatus); log.info("Is tx completed: " + transactionStatus.isCompleted()); } catch (JMSException e) { log.error("Failed: ", e); } }
2019-01-04 14:43:09.425 INFO 15100 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 1:41 PM 2019-01-04 14:43:09.425 INFO 15100 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Is tx completed: false 2019-01-04 14:43:09.425 INFO 15100 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener : Is tx completed: true 2019-01-04 14:43:15.280 WARN 15100 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'TEST' - trying to recover. Cause: Transaction rolled back because it has been marked as rollback-only 2019-01-04 14:43:20.319 INFO 15100 --- [enerContainer-2] pl.devrev.springjms.JmsMessageListener : Received message: Message content. Sent at: 1/4/19 1:41 PM 2019-01-04 14:43:20.319 INFO 15100 --- [enerContainer-2] pl.devrev.springjms.JmsMessageListener : Is tx completed: false 2019-01-04 14:43:20.319 INFO 15100 --- [enerContainer-2] pl.devrev.springjms.JmsMessageListener : Is tx completed: true ...