Skip to content
37 changes: 34 additions & 3 deletions app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ public function run(): void
];

if ($maxMessages) {
$arguments[] =
'--max-messages=' . min($consumer->getMaxMessages() ?? $maxMessages, $maxMessages);
$arguments = $this->addMaxMessagesArgument($arguments, $consumer, $maxMessages);
}

$command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s'
Expand All @@ -155,7 +154,7 @@ public function run(): void
];

if ($maxMessages) {
$arguments[] = '--max-messages=' . min($consumer->getMaxMessages() ?? $maxMessages, $maxMessages);
$arguments = $this->addMaxMessagesArgument($arguments, $consumer, $maxMessages);
}

$command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s'
Expand All @@ -166,6 +165,38 @@ public function run(): void
}
}

/**
* Add max-messages argument and log warning if exceeds default
*
* @param array $arguments Arguments array to append to
* @param ConsumerConfigItemInterface $consumer
* @param int $defaultMaxMessages
* @return array
*/
private function addMaxMessagesArgument(
array $arguments,
ConsumerConfigItemInterface $consumer,
int $defaultMaxMessages
): array {
$consumerMaxMessages =$consumer->getMaxMessages() ?? $defaultMaxMessages;

if ($consumerMaxMessages > $defaultMaxMessages) {
$this->logger->warning(
__(
'Consumer "%1" has max-messages=%2 which exceeds the configured default (%3). '
. 'This may probably cause high memory usage or long processing times.',
$consumer->getName(),
$consumerMaxMessages,
$defaultMaxMessages
)
);
}

$arguments[] = '--max-messages=' . $consumerMaxMessages;

return $arguments;
}

/**
* Checks that the consumer can be run
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public function testRunDisabled()

/**
* @param int $maxMessages
* @param int $maxMessagesConsumer
* @param bool $isLocked
* @param string $php
* @param string $command
Expand All @@ -135,6 +136,7 @@ public function testRunDisabled()
*/
public function testRun(
$maxMessages,
$maxMessagesConsumer,
$isLocked,
$php,
$command,
Expand All @@ -161,6 +163,7 @@ public function testRun(
$consumer = $this->getMockBuilder(ConsumerConfigItemInterface::class)
->getMockForAbstractClass();
$consumer->method('getName')->willReturn($consumerName);
$consumer->method('getMaxMessages')->willReturn($maxMessagesConsumer);

$this->phpExecutableFinderMock->expects($this->once())
->method('find')
Expand Down Expand Up @@ -190,6 +193,7 @@ public static function runDataProvider()
return [
[
'maxMessages' => 20000,
'maxMessagesConsumer' => 20000,
'isLocked' => false,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
Expand All @@ -200,16 +204,18 @@ public static function runDataProvider()
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 30000,
'isLocked' => false,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
'arguments' => ['consumerName', '--single-thread', '--max-messages=10000'],
'arguments' => ['consumerName', '--single-thread', '--max-messages=30000'],
'allowedConsumers' => [],
'shellBackgroundExpects' => 1,
'isRunExpects' => 1,
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 10000,
'isLocked' => false,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
Expand All @@ -220,6 +226,7 @@ public static function runDataProvider()
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 10000,
'isLocked' => true,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
Expand All @@ -230,6 +237,7 @@ public static function runDataProvider()
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 10000,
'isLocked' => true,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
Expand All @@ -240,6 +248,7 @@ public static function runDataProvider()
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 10000,
'isLocked' => true,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
Expand All @@ -250,16 +259,18 @@ public static function runDataProvider()
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 500,
'isLocked' => false,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
'arguments' => ['consumerName', '--single-thread', '--max-messages=10000'],
'arguments' => ['consumerName', '--single-thread', '--max-messages=500'],
'allowedConsumers' => ['consumerName'],
'shellBackgroundExpects' => 1,
'isRunExpects' => 1,
],
[
'maxMessages' => 0,
'maxMessagesConsumer' => 0,
'isLocked' => false,
'php' => '/bin/php',
'command' => '/bin/php ' . BP . '/bin/magento queue:consumers:start %s %s',
Expand Down