Skip to content
Merged
41 changes: 36 additions & 5 deletions app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php
/**
* Copyright © Magento, Inc. All rights reserved.
* See COPYING.txt for license details.
* Copyright 2017 Adobe
* All Rights Reserved.
*/
namespace Magento\MessageQueue\Model\Cron;

Expand Down Expand Up @@ -139,8 +139,7 @@ public function run(): void
];

if ($maxMessages) {
$arguments[] =
'--max-messages=' . min($consumer->getMaxMessages() ?? $maxMessages, $maxMessages);
$arguments = $this->addMaxMessagesArgument($arguments, $consumer, $maxMessages);
}

$command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s'
Expand All @@ -155,7 +154,7 @@ public function run(): void
];

if ($maxMessages) {
$arguments[] = '--max-messages=' . min($consumer->getMaxMessages() ?? $maxMessages, $maxMessages);
$arguments = $this->addMaxMessagesArgument($arguments, $consumer, $maxMessages);
}

$command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s'
Expand All @@ -166,6 +165,38 @@ public function run(): void
}
}

/**
* Add max-messages argument and log warning if exceeds default
*
* @param array $arguments Arguments array to append to
* @param ConsumerConfigItemInterface $consumer
* @param int $defaultMaxMessages
* @return array
*/
private function addMaxMessagesArgument(
array $arguments,
ConsumerConfigItemInterface $consumer,
int $defaultMaxMessages
): array {
$consumerMaxMessages =$consumer->getMaxMessages() ?? $defaultMaxMessages;

if ($consumerMaxMessages > $defaultMaxMessages) {
$this->logger->warning(
__(
'Consumer "%1" has max-messages=%2 which exceeds the configured default (%3). '
. 'This may probably cause high memory usage or long processing times.',
$consumer->getName(),
$consumerMaxMessages,
$defaultMaxMessages
)
);
}

$arguments[] = '--max-messages=' . $consumerMaxMessages;

return $arguments;
}

/**
* Checks that the consumer can be run
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php
/**
* Copyright © Magento, Inc. All rights reserved.
* See COPYING.txt for license details.
* Copyright 2017 Adobe
* All Rights Reserved.
*/
declare(strict_types=1);

Expand All @@ -18,6 +18,7 @@
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Process\PhpExecutableFinder;
use Psr\Log\LoggerInterface;

/**
* Unit tests for ConsumersRunner.
Expand Down Expand Up @@ -64,6 +65,11 @@ class ConsumersRunnerTest extends TestCase
*/
private $consumersRunner;

/**
* @var LoggerInterface
*/
private $loggerMock;

/**
* {@inheritdoc}
*/
Expand All @@ -89,14 +95,18 @@ protected function setUp(): void
->getMock();
$this->connectionTypeResolver->method('getConnectionType')->willReturn('something');

$this->loggerMock = $this->getMockBuilder(LoggerInterface::class)
->disableOriginalConstructor()
->getMock();

$this->consumersRunner = new ConsumersRunner(
$this->phpExecutableFinderMock,
$this->consumerConfigMock,
$this->deploymentConfigMock,
$this->shellBackgroundMock,
$this->lockManagerMock,
$this->connectionTypeResolver,
null,
$this->loggerMock,
$this->checkIsAvailableMessagesMock
);
}
Expand Down Expand Up @@ -124,6 +134,7 @@ public function testRunDisabled()

/**
* @param int $maxMessages
* @param int $maxMessagesConsumer
* @param bool $isLocked
* @param string $php
* @param string $command
Expand All @@ -135,6 +146,7 @@ public function testRunDisabled()
*/
public function testRun(
$maxMessages,
$maxMessagesConsumer,
$isLocked,
$php,
$command,
Expand All @@ -161,6 +173,7 @@ public function testRun(
$consumer = $this->getMockBuilder(ConsumerConfigItemInterface::class)
->getMockForAbstractClass();
$consumer->method('getName')->willReturn($consumerName);
$consumer->method('getMaxMessages')->willReturn($maxMessagesConsumer);

$this->phpExecutableFinderMock->expects($this->once())
->method('find')
Expand Down Expand Up @@ -190,6 +203,7 @@ public static function runDataProvider()
return [
[
'maxMessages' => 20000,
'maxMessagesConsumer' => 20000,
'isLocked' => false,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
Expand All @@ -200,16 +214,18 @@ public static function runDataProvider()
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 30000,
'isLocked' => false,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
'arguments' => ['consumerName', '--single-thread', '--max-messages=10000'],
'arguments' => ['consumerName', '--single-thread', '--max-messages=30000'],
'allowedConsumers' => [],
'shellBackgroundExpects' => 1,
'isRunExpects' => 1,
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 10000,
'isLocked' => false,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
Expand All @@ -220,6 +236,7 @@ public static function runDataProvider()
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 10000,
'isLocked' => true,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
Expand All @@ -230,6 +247,7 @@ public static function runDataProvider()
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 10000,
'isLocked' => true,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
Expand All @@ -240,6 +258,7 @@ public static function runDataProvider()
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 10000,
'isLocked' => true,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
Expand All @@ -250,16 +269,18 @@ public static function runDataProvider()
],
[
'maxMessages' => 10000,
'maxMessagesConsumer' => 500,
'isLocked' => false,
'php' => '',
'command' => 'php ' . BP . '/bin/magento queue:consumers:start %s %s %s',
'arguments' => ['consumerName', '--single-thread', '--max-messages=10000'],
'arguments' => ['consumerName', '--single-thread', '--max-messages=500'],
'allowedConsumers' => ['consumerName'],
'shellBackgroundExpects' => 1,
'isRunExpects' => 1,
],
[
'maxMessages' => 0,
'maxMessagesConsumer' => 0,
'isLocked' => false,
'php' => '/bin/php',
'command' => '/bin/php ' . BP . '/bin/magento queue:consumers:start %s %s',
Expand Down