2727import jakarta .jms .MessageProducer ;
2828import jakarta .jms .Session ;
2929
30+ import java .util .concurrent .atomic .AtomicInteger ;
3031import junit .framework .TestCase ;
3132
3233import org .apache .activemq .ActiveMQConnectionFactory ;
3334import org .apache .activemq .broker .BrokerService ;
3435import org .apache .activemq .command .ActiveMQQueue ;
36+ import org .apache .activemq .util .DefaultIOExceptionHandler ;
37+ import org .apache .activemq .util .IOExceptionHandler ;
38+ import org .apache .activemq .util .Wait ;
3539import org .apache .logging .log4j .Level ;
3640import org .apache .logging .log4j .LogManager ;
3741import org .apache .logging .log4j .core .LogEvent ;
@@ -236,6 +240,69 @@ public void append(LogEvent event) {
236240 assertFalse ("Did not replay any records from the journal" , didSomeRecovery .get ());
237241 }
238242
243+
244+ /**
245+ * Test the checkpoint runner task continues to run if the configured
246+ * IOExceptionHandler throws a runtime exception while processing
247+ * the IOException it is handling and the broker is not shut down. This
248+ * could happen if using the DefaultIOExceptionHandler and startStopConnectors
249+ * is set to true, or if a user provides their own IOExceptionHandler that
250+ * throws an exception.
251+ */
252+ public void testCheckpointExceptionKeepRunning () throws Exception {
253+ testCheckpointIOException (true );
254+ }
255+
256+ /**
257+ * Test the broker shuts down by when DefaultIOExceptionHandler
258+ * handles an IOException thrown by the checkpoint runner task. This is the
259+ * default behavior of the broker if not configured with a custom
260+ * IOExceptionHandler and startStopConnectors is false
261+ */
262+ public void testCheckpointExceptionShutdown () throws Exception {
263+ testCheckpointIOException (false );
264+ }
265+
266+ private void testCheckpointIOException (boolean startStopConnectors ) throws Exception {
267+ final AtomicInteger iterations = new AtomicInteger ();
268+ // Create a store that always throws an IOException when checkpoint is called
269+ final KahaDBStore kaha = new KahaDBStore () {
270+ @ Override
271+ protected void checkpointCleanup (boolean cleanup ) throws IOException {
272+ iterations .incrementAndGet ();
273+ throw new IOException ("fail" );
274+ }
275+ };
276+ kaha .setDirectory (new File ("target/activemq-data/kahadb" ));
277+ kaha .deleteAllMessages ();
278+ // Set the checkpoint interval to be very short so we can quickly
279+ // check number of iterations
280+ kaha .setCheckpointInterval (100 );
281+
282+ BrokerService broker = createBroker (kaha );
283+ DefaultIOExceptionHandler ioExceptionHandler = new DefaultIOExceptionHandler ();
284+ ioExceptionHandler .setStopStartConnectors (startStopConnectors );
285+ broker .setIoExceptionHandler (ioExceptionHandler );
286+ broker .start ();
287+
288+ try {
289+ if (startStopConnectors ) {
290+ // If startStopConnectors is true, the task should continue with future iterations
291+ // as the SuppressReplyException that will be thrown is now handled so just verify
292+ // we see 10 iterations which should happen quickly
293+ assertTrue (Wait .waitFor (() -> iterations .get () == 10 , 2000 , 100 ));
294+ // broker should not be stopped
295+ assertFalse (broker .isStopped ());
296+ } else {
297+ // If startStopConnectors is false, an IOException should shut down the broker
298+ // which is the normal behavior
299+ assertTrue (Wait .waitFor (broker ::isStopped , 2000 , 100 ));
300+ }
301+ } finally {
302+ broker .stop ();
303+ }
304+ }
305+
239306 private void assertExistsAndDelete (File file ) {
240307 assertTrue (file .exists ());
241308 file .delete ();
@@ -281,4 +348,4 @@ private String createContent(int i) {
281348 return sb .toString ();
282349 }
283350
284- }
351+ }
0 commit comments