diff --git a/app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php b/app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php index cb320e98d28c5..e905e643a4ca5 100644 --- a/app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php +++ b/app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php @@ -1,7 +1,7 @@ getMaxMessages() ?? $maxMessages, $maxMessages); + $arguments = $this->addMaxMessagesArgument($arguments, $consumer, $maxMessages); } $command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s' @@ -155,7 +154,7 @@ public function run(): void ]; if ($maxMessages) { - $arguments[] = '--max-messages=' . min($consumer->getMaxMessages() ?? $maxMessages, $maxMessages); + $arguments = $this->addMaxMessagesArgument($arguments, $consumer, $maxMessages); } $command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s' @@ -166,6 +165,38 @@ public function run(): void } } + /** + * Add max-messages argument and log warning if exceeds default + * + * @param array $arguments Arguments array to append to + * @param ConsumerConfigItemInterface $consumer + * @param int $defaultMaxMessages + * @return array + */ + private function addMaxMessagesArgument( + array $arguments, + ConsumerConfigItemInterface $consumer, + int $defaultMaxMessages + ): array { + $consumerMaxMessages =$consumer->getMaxMessages() ?? $defaultMaxMessages; + + if ($consumerMaxMessages > $defaultMaxMessages) { + $this->logger->warning( + __( + 'Consumer "%1" has max-messages=%2 which exceeds the configured default (%3). ' + . 'This may probably cause high memory usage or long processing times.', + $consumer->getName(), + $consumerMaxMessages, + $defaultMaxMessages + ) + ); + } + + $arguments[] = '--max-messages=' . $consumerMaxMessages; + + return $arguments; + } + /** * Checks that the consumer can be run * diff --git a/app/code/Magento/MessageQueue/Test/Unit/Model/Cron/ConsumersRunnerTest.php b/app/code/Magento/MessageQueue/Test/Unit/Model/Cron/ConsumersRunnerTest.php index 9c68c5f523063..6df6772694409 100644 --- a/app/code/Magento/MessageQueue/Test/Unit/Model/Cron/ConsumersRunnerTest.php +++ b/app/code/Magento/MessageQueue/Test/Unit/Model/Cron/ConsumersRunnerTest.php @@ -1,7 +1,7 @@ getMock(); $this->connectionTypeResolver->method('getConnectionType')->willReturn('something'); + $this->loggerMock = $this->getMockBuilder(LoggerInterface::class) + ->disableOriginalConstructor() + ->getMock(); + $this->consumersRunner = new ConsumersRunner( $this->phpExecutableFinderMock, $this->consumerConfigMock, @@ -96,7 +106,7 @@ protected function setUp(): void $this->shellBackgroundMock, $this->lockManagerMock, $this->connectionTypeResolver, - null, + $this->loggerMock, $this->checkIsAvailableMessagesMock ); } @@ -124,6 +134,7 @@ public function testRunDisabled() /** * @param int $maxMessages + * @param int $maxMessagesConsumer * @param bool $isLocked * @param string $php * @param string $command @@ -135,6 +146,7 @@ public function testRunDisabled() */ public function testRun( $maxMessages, + $maxMessagesConsumer, $isLocked, $php, $command, @@ -161,6 +173,7 @@ public function testRun( $consumer = $this->getMockBuilder(ConsumerConfigItemInterface::class) ->getMockForAbstractClass(); $consumer->method('getName')->willReturn($consumerName); + $consumer->method('getMaxMessages')->willReturn($maxMessagesConsumer); $this->phpExecutableFinderMock->expects($this->once()) ->method('find') @@ -190,6 +203,7 @@ public static function runDataProvider() return [ [ 'maxMessages' => 20000, + 'maxMessagesConsumer' => 20000, 'isLocked' => false, 'php' => '', 'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s', @@ -200,16 +214,18 @@ public static function runDataProvider() ], [ 'maxMessages' => 10000, + 'maxMessagesConsumer' => 30000, 'isLocked' => false, 'php' => '', 'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s', - 'arguments' => ['consumerName', '--single-thread', '--max-messages=10000'], + 'arguments' => ['consumerName', '--single-thread', '--max-messages=30000'], 'allowedConsumers' => [], 'shellBackgroundExpects' => 1, 'isRunExpects' => 1, ], [ 'maxMessages' => 10000, + 'maxMessagesConsumer' => 10000, 'isLocked' => false, 'php' => '', 'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s', @@ -220,6 +236,7 @@ public static function runDataProvider() ], [ 'maxMessages' => 10000, + 'maxMessagesConsumer' => 10000, 'isLocked' => true, 'php' => '', 'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s', @@ -230,6 +247,7 @@ public static function runDataProvider() ], [ 'maxMessages' => 10000, + 'maxMessagesConsumer' => 10000, 'isLocked' => true, 'php' => '', 'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s', @@ -240,6 +258,7 @@ public static function runDataProvider() ], [ 'maxMessages' => 10000, + 'maxMessagesConsumer' => 10000, 'isLocked' => true, 'php' => '', 'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s', @@ -250,16 +269,18 @@ public static function runDataProvider() ], [ 'maxMessages' => 10000, + 'maxMessagesConsumer' => 500, 'isLocked' => false, 'php' => '', 'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s', - 'arguments' => ['consumerName', '--single-thread', '--max-messages=10000'], + 'arguments' => ['consumerName', '--single-thread', '--max-messages=500'], 'allowedConsumers' => ['consumerName'], 'shellBackgroundExpects' => 1, 'isRunExpects' => 1, ], [ 'maxMessages' => 0, + 'maxMessagesConsumer' => 0, 'isLocked' => false, 'php' => '/bin/php', 'command' => '/bin/php ' . BP . '/bin/magento queue:consumers:start %s %s',