Skip to content

Commit 361bc1c

Browse files
authored
ParsingFailureAction and related API changes (#7)
* sealed trait Delivery - Ok and MalformedContent * PullResult instead of Option for RabbitMQPullConsumer * ConversionException to api module
1 parent ecd16f5 commit 361bc1c

File tree

24 files changed

+421
-228
lines changed

24 files changed

+421
-228
lines changed

Migration-5-6.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@ Changes in Scala API:
1111
1. The whole API is _finally tagless_ - all methods return `F[_]`. See [related section](README.md#scala-usage) in docs.
1212
1. The API now uses type-conversions - provide type and related converter when creating producer/consumer.
1313
See [related section](README.md#providing-converters-for-producer/consumer) in docs.
14-
1. The `Delivery` is now generic - e.g. `Delivery[Bytes]` (depends on type-conversion).
14+
1. The `Delivery` is now sealed trait - there are `Delivery.Ok[A]` (e.g. `Delivery[Bytes]`, depends on type-conversion) and `Delivery.MalformedContent`.
15+
After getting the `Delivery[A]` you should pattern-match it.
1516
1. The API now requires an implicit `monix.execution.Scheduler` instead of `ExecutionContext`.
1617
1. Methods like `RabbitMQConnection.declareQueue` now return `F[Unit]` (was `Try[Done]` before).
1718
1. Possibility to pass manually created configurations (`ProducerConfig` etc.) is now gone. The only option is to use TypeSafe config.
1819
1. There is no `RabbitMQConsumer.bindTo` method anymore. Use [additional declarations](README.md#additional-declarations-and-bindings) for such thing.
1920
1. There are new methods in [`RabbitMQConnection`](core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala): `newChannel` and `withChannel`.
21+
1. [`RabbitMQPullConsumer`](README.md#pull-consumer) was added
2022

2123
Changes in Java API:
2224

@@ -27,3 +29,4 @@ Changes in Java API:
2729
1. Method `RabbitMQProducer.send` now returns `CompletableFuture[Void]` (was `void` before) - ***it's not blocking anymore!***
2830
1. `RabbitMQConsumer` and `RabbitMQProducer` (`api` module) are now traits and have their `Default*` counterparts in `core` module
2931
1. There is no `RabbitMQConsumer.bindTo` method anymore. Use [additional declarations](README.md#additional-declarations-and-bindings) for such thing.
32+
1. `RabbitMQPullConsumer` was added

README.md

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,13 @@ val monitor: Monitor = ???
182182
// if you expect very high load, you can use separate connections for each producer/consumer, but it's usually not needed
183183
val rabbitConnection = RabbitMQConnection.fromConfig[Task](config, blockingExecutor) // DefaultRabbitMQConnection[Task]
184184

185-
val consumer = rabbitConnection.newConsumer[Bytes]("consumer", monitor) { (delivery: Delivery[Bytes]) =>
186-
println(delivery)
187-
Task.now(DeliveryResult.Ack)
185+
val consumer = rabbitConnection.newConsumer[Bytes]("consumer", monitor) {
186+
case delivery: Delivery.Ok[Bytes] =>
187+
println(delivery)
188+
Task.now(DeliveryResult.Ack)
189+
190+
case _: Delivery.MalformedContent =>
191+
Task.now(DeliveryResult.Reject)
188192
} // DefaultRabbitMQConsumer
189193

190194
val sender = rabbitConnection.newProducer("producer", monitor) // DefaultRabbitMQProducer[Task]
@@ -295,6 +299,9 @@ public class ExampleJava {
295299
}
296300
```
297301

302+
The Java API has some limitations compared to the Scala one - mainly it does not support [types conversions](#providing-converters-for-producer/consumer)
303+
and it offers only asynchronous version with `CompletableFuture` as result of all operations.
304+
298305
See [full example](core/src/test/java/ExampleJava.java)
299306

300307
## Notes
@@ -360,7 +367,12 @@ Check [reference.conf](core/src/main/resources/reference.conf) for all options o
360367
Sometimes your use-case just doesn't fit the _normal_ consumer scenario. Here you can use the _pull consumer_ which gives you much more
361368
control over the received messages. You _pull_ new message from the queue and acknowledge (reject, ...) it somewhere in the future.
362369

363-
The pull consumer operates with `Option` which is used for expressing either getting the delivery _or_ detecting an empty queue.
370+
The pull consumer uses `PullResult` as return type:
371+
* Ok - contains `DeliveryWithHandle` instance
372+
* EmptyQueue - there was no message in the queue available
373+
374+
Additionally you can call `.toOption` method on the `PullResult`.
375+
364376

365377
A simplified example:
366378
```scala
@@ -377,14 +389,14 @@ val consumer = connection.newPullConsumer[Bytes](???, ???)
377389

378390

379391
// receive "up to" 100 deliveries
380-
val deliveries: Future[Seq[Option[DeliveryWithHandle[Future, Bytes]]]] = Future.sequence { (1 to 100).map(_ => consumer.pull()) }
392+
val deliveries: Future[Seq[PullResult[Future, Bytes]]] = Future.sequence { (1 to 100).map(_ => consumer.pull()) }
381393

382394
// do your stuff!
383395

384396
???
385397

386-
// "handle" all deliveries
387-
val handleResult: Future[Unit] = deliveries.flatMap(s => Future.sequence(s.flatten.map(_.handle(DeliveryResult.Ack))).map(_ => Unit))
398+
// "handle" all deliveries, ignore failures and "empty queue" results
399+
val handleResult: Future[Unit] = deliveries.flatMap(s => Future.sequence(s.flatMap(_.toOption).map(_.handle(DeliveryResult.Ack))).map(_ => Unit))
388400

389401
consumer.close()
390402
connection.close()
@@ -425,10 +437,7 @@ case class NewFileSourceAdded(fileSources: Seq[FileSource])
425437
val consumer = MultiFormatConsumer.forType[Future, NewFileSourceAdded](
426438
JsonDeliveryConverter.derive(), // requires implicit `io.circe.Decoder[NewFileSourceAdded]`
427439
GpbDeliveryConverter[NewFileSourceAddedGpb].derive() // requires implicit `com.avast.cactus.Converter[NewFileSourceAddedGpb, NewFileSourceAdded]`
428-
)(
429-
businessLogic.processMessage,
430-
failureHandler
431-
)
440+
)(businessLogic.processMessage)
432441
```
433442
(see [unit test](core/src/test/scala/com/avast/clients/rabbitmq/MultiFormatConsumerTest.scala) for full example)
434443

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
11
package com.avast.clients.rabbitmq.api
22

3-
case class Delivery[A](body: A, properties: MessageProperties, routingKey: String)
3+
import com.avast.bytes.Bytes
4+
5+
sealed trait Delivery[+A] {
6+
def properties: MessageProperties
7+
8+
def routingKey: String
9+
}
10+
11+
object Delivery {
12+
13+
case class Ok[+A](body: A, properties: MessageProperties, routingKey: String) extends Delivery[A]
14+
15+
case class MalformedContent(body: Bytes, properties: MessageProperties, routingKey: String, ce: ConversionException)
16+
extends Delivery[Nothing]
17+
18+
def apply[A](body: A, properties: MessageProperties, routingKey: String): Delivery.Ok[A] = {
19+
Delivery.Ok(body, properties, routingKey)
20+
}
21+
}

api/src/main/scala/com/avast/clients/rabbitmq/api/RabbitMQPullConsumer.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,30 @@ import scala.language.higherKinds
55
trait RabbitMQPullConsumer[F[_], A] {
66

77
/** Retrieves one message from the queue, if there is any.
8-
*
9-
* @return Some(DeliveryHandle[A]) when there was a message available; None otherwise.
108
*/
11-
def pull(): F[Option[DeliveryWithHandle[F, A]]]
9+
def pull(): F[PullResult[F, A]]
1210
}
1311

1412
/** Trait which contains `Delivery` and it's _handle through which it can be *acked*, *rejected* etc.
1513
*/
16-
trait DeliveryWithHandle[F[_], A] {
14+
trait DeliveryWithHandle[+F[_], +A] {
1715
def delivery: Delivery[A]
1816

1917
def handle(result: DeliveryResult): F[Unit]
2018
}
19+
20+
sealed trait PullResult[+F[_], +A] {
21+
def toOption: Option[DeliveryWithHandle[F, A]]
22+
}
23+
24+
object PullResult {
25+
26+
case class Ok[F[_], A](deliveryWithHandle: DeliveryWithHandle[F, A]) extends PullResult[F, A] {
27+
override def toOption: Option[DeliveryWithHandle[F, A]] = Some(deliveryWithHandle)
28+
}
29+
30+
case object EmptyQueue extends PullResult[Nothing, Nothing] {
31+
override def toOption: Option[DeliveryWithHandle[Nothing, Nothing]] = None
32+
}
33+
34+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.avast.clients.rabbitmq.api
2+
3+
case class ConversionException(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause)

api/src/main/scala/com/avast/clients/rabbitmq/javaapi/RabbitMQPullConsumer.scala

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,40 @@ trait RabbitMQPullConsumer extends AutoCloseable {
77

88
/**
99
* Retrieves one message from the queue, if there is any.
10-
*
11-
* @return Some(DeliveryHandle[A]) when there was a message available; None otherwise.
1210
*/
13-
def pull(): CompletableFuture[Optional[DeliveryWithHandle]]
11+
def pull(): CompletableFuture[PullResult]
1412
}
1513

1614
trait DeliveryWithHandle {
1715
def delivery: Delivery
1816

1917
def handle(result: DeliveryResult): CompletableFuture[Void]
2018
}
19+
20+
sealed trait PullResult {
21+
def toOptional: Optional[DeliveryWithHandle]
22+
23+
def isOk: Boolean
24+
25+
def isEmptyQueue: Boolean
26+
}
27+
28+
object PullResult {
29+
30+
/* These two are not _case_ intentionally - it pollutes the API for Java users */
31+
32+
class Ok(deliveryWithHandle: DeliveryWithHandle) extends PullResult {
33+
def getDeliveryWithHandle: DeliveryWithHandle = deliveryWithHandle
34+
35+
override val toOptional: Optional[DeliveryWithHandle] = Optional.of(deliveryWithHandle)
36+
override val isOk: Boolean = true
37+
override val isEmptyQueue: Boolean = false
38+
}
39+
40+
object EmptyQueue extends PullResult {
41+
override val toOptional: Optional[DeliveryWithHandle] = Optional.empty()
42+
override val isOk: Boolean = false
43+
override val isEmptyQueue: Boolean = true
44+
}
45+
46+
}

core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala

Lines changed: 54 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,14 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
117117

118118
object Consumer {
119119

120-
def fromConfig[F[_]: ToTask, A: DeliveryConverter](providedConfig: Config,
121-
channel: ServerChannel,
122-
channelFactoryInfo: RabbitMQConnectionInfo,
123-
blockingScheduler: Scheduler,
124-
monitor: Monitor,
125-
consumerListener: ConsumerListener)(readAction: DeliveryReadAction[F, A])(
126-
implicit scheduler: Scheduler): DefaultRabbitMQConsumer = {
120+
def fromConfig[F[_]: ToTask, A: DeliveryConverter](
121+
providedConfig: Config,
122+
channel: ServerChannel,
123+
channelFactoryInfo: RabbitMQConnectionInfo,
124+
blockingScheduler: Scheduler,
125+
monitor: Monitor,
126+
consumerListener: ConsumerListener,
127+
readAction: DeliveryReadAction[F, A])(implicit scheduler: Scheduler): DefaultRabbitMQConsumer = {
127128

128129
val mergedConfig = providedConfig.withFallback(ConsumerDefaultConfig)
129130

@@ -140,24 +141,25 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
140141

141142
val consumerConfig = updatedConfig.wrapped.as[ConsumerConfig]("root")
142143

143-
create[F, A](consumerConfig, channel, channelFactoryInfo, blockingScheduler, monitor, consumerListener)(readAction)
144+
create[F, A](consumerConfig, channel, channelFactoryInfo, blockingScheduler, monitor, consumerListener, readAction)
144145
}
145146

146-
def create[F[_]: ToTask, A: DeliveryConverter](consumerConfig: ConsumerConfig,
147-
channel: ServerChannel,
148-
channelFactoryInfo: RabbitMQConnectionInfo,
149-
blockingScheduler: Scheduler,
150-
monitor: Monitor,
151-
consumerListener: ConsumerListener)(readAction: DeliveryReadAction[F, A])(
152-
implicit scheduler: Scheduler): DefaultRabbitMQConsumer = {
147+
def create[F[_]: ToTask, A: DeliveryConverter](
148+
consumerConfig: ConsumerConfig,
149+
channel: ServerChannel,
150+
channelFactoryInfo: RabbitMQConnectionInfo,
151+
blockingScheduler: Scheduler,
152+
monitor: Monitor,
153+
consumerListener: ConsumerListener,
154+
readAction: DeliveryReadAction[F, A])(implicit scheduler: Scheduler): DefaultRabbitMQConsumer = {
153155

154156
prepareConsumer(consumerConfig, readAction, channelFactoryInfo, channel, consumerListener, blockingScheduler, monitor)
155157
}
156158
}
157159

158160
object PullConsumer {
159161

160-
def fromConfig[F[_]: FromTask, A: DeliveryConverter](
162+
def fromConfig[F[_]: FromTask: ToTask, A: DeliveryConverter](
161163
providedConfig: Config,
162164
channel: ServerChannel,
163165
channelFactoryInfo: RabbitMQConnectionInfo,
@@ -182,11 +184,12 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
182184
create[F, A](consumerConfig, channel, channelFactoryInfo, blockingScheduler, monitor)
183185
}
184186

185-
def create[F[_]: FromTask, A: DeliveryConverter](consumerConfig: PullConsumerConfig,
186-
channel: ServerChannel,
187-
channelFactoryInfo: RabbitMQConnectionInfo,
188-
blockingScheduler: Scheduler,
189-
monitor: Monitor)(implicit scheduler: Scheduler): DefaultRabbitMQPullConsumer[F, A] = {
187+
def create[F[_]: FromTask: ToTask, A: DeliveryConverter](
188+
consumerConfig: PullConsumerConfig,
189+
channel: ServerChannel,
190+
channelFactoryInfo: RabbitMQConnectionInfo,
191+
blockingScheduler: Scheduler,
192+
monitor: Monitor)(implicit scheduler: Scheduler): DefaultRabbitMQPullConsumer[F, A] = {
190193

191194
preparePullConsumer(consumerConfig, channelFactoryInfo, channel, blockingScheduler, monitor)
192195
}
@@ -326,7 +329,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
326329
prepareConsumer(consumerConfig, channelFactoryInfo, channel, readAction, consumerListener, blockingScheduler, monitor)
327330
}
328331

329-
private def preparePullConsumer[F[_]: FromTask, A: DeliveryConverter](
332+
private def preparePullConsumer[F[_]: FromTask: ToTask, A: DeliveryConverter](
330333
consumerConfig: PullConsumerConfig,
331334
channelFactoryInfo: RabbitMQConnectionInfo,
332335
channel: ServerChannel,
@@ -435,15 +438,16 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
435438
}
436439

437440
val readAction: DefaultDeliveryReadAction = {
438-
val convAction: DefaultDeliveryReadAction = { (d: Delivery[Bytes]) =>
441+
val convAction: DefaultDeliveryReadAction = { d: Delivery[Bytes] =>
439442
try {
440-
implicitly[DeliveryConverter[A]].convert(d.body) match {
441-
case Right(a) =>
442-
val devA = d.copy(body = a)
443-
implicitly[ToTask[F]].apply(userReadAction(devA))
444-
445-
case Left(ce) => Task.raiseError(ce)
443+
val devA = d.flatMap { d =>
444+
implicitly[DeliveryConverter[A]].convert(d.body) match {
445+
case Right(a) => d.mapBody(_ => a)
446+
case Left(ce) => Delivery.MalformedContent(d.body, d.properties, d.routingKey, ce)
447+
}
446448
}
449+
450+
implicitly[ToTask[F]].apply(userReadAction(devA))
447451
} catch {
448452
case NonFatal(e) =>
449453
Task.raiseError(e)
@@ -468,7 +472,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
468472
implicit callbackScheduler: Scheduler): DefaultDeliveryReadAction = {
469473
import consumerConfig._
470474

471-
(delivery: Delivery[Bytes]) =>
475+
delivery: Delivery[Bytes] =>
472476
try {
473477
// we try to catch also long-lasting synchronous work on the thread
474478
val action = Task.deferFuture {
@@ -481,18 +485,18 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
481485

482486
action
483487
.timeout(ScalaDuration(processTimeout.toMillis, TimeUnit.MILLISECONDS))
484-
.onErrorRecover {
488+
.onErrorRecoverWith {
485489
case e: TimeoutException =>
486490
traceId.foreach(Kluzo.setTraceId)
487491

488492
logger.warn(s"[$name] Task timed-out, applying DeliveryResult.${consumerConfig.timeoutAction}", e)
489-
consumerConfig.timeoutAction
493+
Task.now(consumerConfig.timeoutAction)
490494

491495
case NonFatal(e) =>
492496
traceId.foreach(Kluzo.setTraceId)
493497

494498
logger.warn(s"[$name] Error while executing callback, applying DeliveryResult.${consumerConfig.failureAction}", e)
495-
consumerConfig.failureAction
499+
Task.now(consumerConfig.failureAction)
496500
}
497501
.executeOn(callbackScheduler)
498502
} catch {
@@ -503,6 +507,23 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
503507

504508
}
505509

510+
private def executeParsingFailureAction[A](consumerConfig: ConsumerConfig,
511+
ce: ConversionException,
512+
delivery: Delivery[Bytes],
513+
parsingFailureAction: ParsingFailureAction[Task]): Task[DeliveryResult] = {
514+
import consumerConfig._
515+
516+
// logging on DEBUG, user should provide own higher-level logging in the callback
517+
logger.debug(s"[$name] ${ce.getMessage}, executing parsingFailureAction", ce)
518+
519+
parsingFailureAction(name, delivery, ce)
520+
.onErrorRecover {
521+
case NonFatal(ex) =>
522+
logger.warn(s"[$name] Error while executing parsingFailureAction, applying DeliveryResult.${consumerConfig.failureAction}", ex)
523+
consumerConfig.failureAction
524+
}
525+
}
526+
506527
implicit class WrapConfig(val c: Config) extends AnyVal {
507528
def wrapped: Config = {
508529
// we need to wrap it with one level, to be able to parse it with Ficus

core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConnection.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class DefaultRabbitMQConnection[F[_]: FromTask: ToTask](connection: ServerConnec
5050
implicit scheduler: Scheduler): DefaultRabbitMQConsumer = {
5151
addAutoCloseable {
5252
DefaultRabbitMQClientFactory.Consumer
53-
.fromConfig[F, A](config.getConfig(configName), createChannel(), info, blockingScheduler, monitor, consumerListener)(readAction)
53+
.fromConfig[F, A](config.getConfig(configName), createChannel(), info, blockingScheduler, monitor, consumerListener, readAction)
5454
}
5555
}
5656

@@ -141,3 +141,5 @@ class DefaultRabbitMQConnection[F[_]: FromTask: ToTask](connection: ServerConnec
141141
}
142142

143143
}
144+
145+
object DefaultRabbitMQConnection extends StrictLogging {}

0 commit comments

Comments
 (0)