ConsumeMessagesCommand.php 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Command;
  11. use Psr\Container\ContainerInterface;
  12. use Psr\Log\LoggerInterface;
  13. use Symfony\Component\Console\Command\Command;
  14. use Symfony\Component\Console\Exception\RuntimeException;
  15. use Symfony\Component\Console\Input\InputArgument;
  16. use Symfony\Component\Console\Input\InputInterface;
  17. use Symfony\Component\Console\Input\InputOption;
  18. use Symfony\Component\Console\Output\ConsoleOutputInterface;
  19. use Symfony\Component\Console\Output\OutputInterface;
  20. use Symfony\Component\Console\Question\ChoiceQuestion;
  21. use Symfony\Component\Console\Style\SymfonyStyle;
  22. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  23. use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
  24. use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
  25. use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
  26. use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
  27. use Symfony\Component\Messenger\RoutableMessageBus;
  28. use Symfony\Component\Messenger\Worker;
  29. /**
  30. * @author Samuel Roze <samuel.roze@gmail.com>
  31. */
  32. class ConsumeMessagesCommand extends Command
  33. {
  34. protected static $defaultName = 'messenger:consume';
  35. private $routableBus;
  36. private $receiverLocator;
  37. private $logger;
  38. private $receiverNames;
  39. private $eventDispatcher;
  40. public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [])
  41. {
  42. $this->routableBus = $routableBus;
  43. $this->receiverLocator = $receiverLocator;
  44. $this->logger = $logger;
  45. $this->receiverNames = $receiverNames;
  46. $this->eventDispatcher = $eventDispatcher;
  47. parent::__construct();
  48. }
  49. /**
  50. * {@inheritdoc}
  51. */
  52. protected function configure(): void
  53. {
  54. $defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
  55. $this
  56. ->setDefinition([
  57. new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
  58. new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
  59. new InputOption('failure-limit', 'f', InputOption::VALUE_REQUIRED, 'The number of failed messages the worker can consume'),
  60. new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
  61. new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can handle new messages'),
  62. new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
  63. new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
  64. ])
  65. ->setDescription('Consume messages')
  66. ->setHelp(<<<'EOF'
  67. The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
  68. <info>php %command.full_name% <receiver-name></info>
  69. To receive from multiple transports, pass each name:
  70. <info>php %command.full_name% receiver1 receiver2</info>
  71. Use the --limit option to limit the number of messages received:
  72. <info>php %command.full_name% <receiver-name> --limit=10</info>
  73. Use the --failure-limit option to stop the worker when the given number of failed messages is reached:
  74. <info>php %command.full_name% <receiver-name> --failure-limit=2</info>
  75. Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
  76. <info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
  77. Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached.
  78. If a message is being handled, the worker will stop after the processing is finished:
  79. <info>php %command.full_name% <receiver-name> --time-limit=3600</info>
  80. Use the --bus option to specify the message bus to dispatch received messages
  81. to instead of trying to determine it automatically. This is required if the
  82. messages didn't originate from Messenger:
  83. <info>php %command.full_name% <receiver-name> --bus=event_bus</info>
  84. EOF
  85. )
  86. ;
  87. }
  88. /**
  89. * {@inheritdoc}
  90. */
  91. protected function interact(InputInterface $input, OutputInterface $output)
  92. {
  93. $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
  94. if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
  95. $io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
  96. $io->writeln('Choose which receivers you want to consume messages from in order of priority.');
  97. if (\count($this->receiverNames) > 1) {
  98. $io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
  99. }
  100. $question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
  101. $question->setMultiselect(true);
  102. $input->setArgument('receivers', $io->askQuestion($question));
  103. }
  104. if (0 === \count($input->getArgument('receivers'))) {
  105. throw new RuntimeException('Please pass at least one receiver.');
  106. }
  107. }
  108. /**
  109. * {@inheritdoc}
  110. */
  111. protected function execute(InputInterface $input, OutputInterface $output)
  112. {
  113. $receivers = [];
  114. foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
  115. if (!$this->receiverLocator->has($receiverName)) {
  116. $message = sprintf('The receiver "%s" does not exist.', $receiverName);
  117. if ($this->receiverNames) {
  118. $message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
  119. }
  120. throw new RuntimeException($message);
  121. }
  122. $receivers[$receiverName] = $this->receiverLocator->get($receiverName);
  123. }
  124. $stopsWhen = [];
  125. if ($limit = $input->getOption('limit')) {
  126. $stopsWhen[] = "processed {$limit} messages";
  127. $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
  128. }
  129. if ($failureLimit = $input->getOption('failure-limit')) {
  130. $stopsWhen[] = "reached {$failureLimit} failed messages";
  131. $this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit, $this->logger));
  132. }
  133. if ($memoryLimit = $input->getOption('memory-limit')) {
  134. $stopsWhen[] = "exceeded {$memoryLimit} of memory";
  135. $this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
  136. }
  137. if ($timeLimit = $input->getOption('time-limit')) {
  138. $stopsWhen[] = "been running for {$timeLimit}s";
  139. $this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $this->logger));
  140. }
  141. $stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
  142. $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
  143. $io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
  144. if ($stopsWhen) {
  145. $last = array_pop($stopsWhen);
  146. $stopsWhen = ($stopsWhen ? implode(', ', $stopsWhen).' or ' : '').$last;
  147. $io->comment("The worker will automatically exit once it has {$stopsWhen}.");
  148. }
  149. $io->comment('Quit the worker with CONTROL-C.');
  150. if (OutputInterface::VERBOSITY_VERBOSE > $output->getVerbosity()) {
  151. $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
  152. }
  153. $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
  154. $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
  155. $worker->run([
  156. 'sleep' => $input->getOption('sleep') * 1000000,
  157. ]);
  158. return 0;
  159. }
  160. private function convertToBytes(string $memoryLimit): int
  161. {
  162. $memoryLimit = strtolower($memoryLimit);
  163. $max = ltrim($memoryLimit, '+');
  164. if (0 === strpos($max, '0x')) {
  165. $max = \intval($max, 16);
  166. } elseif (0 === strpos($max, '0')) {
  167. $max = \intval($max, 8);
  168. } else {
  169. $max = (int) $max;
  170. }
  171. switch (substr(rtrim($memoryLimit, 'b'), -1)) {
  172. case 't': $max *= 1024;
  173. // no break
  174. case 'g': $max *= 1024;
  175. // no break
  176. case 'm': $max *= 1024;
  177. // no break
  178. case 'k': $max *= 1024;
  179. }
  180. return $max;
  181. }
  182. }