FailedMessagesRetryCommand.php 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Command;
  11. use Psr\Log\LoggerInterface;
  12. use Symfony\Component\Console\Exception\RuntimeException;
  13. use Symfony\Component\Console\Input\InputArgument;
  14. use Symfony\Component\Console\Input\InputInterface;
  15. use Symfony\Component\Console\Input\InputOption;
  16. use Symfony\Component\Console\Output\ConsoleOutputInterface;
  17. use Symfony\Component\Console\Output\OutputInterface;
  18. use Symfony\Component\Console\Style\SymfonyStyle;
  19. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  20. use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
  21. use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
  22. use Symfony\Component\Messenger\Exception\LogicException;
  23. use Symfony\Component\Messenger\MessageBusInterface;
  24. use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
  25. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  26. use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
  27. use Symfony\Component\Messenger\Worker;
  28. /**
  29. * @author Ryan Weaver <ryan@symfonycasts.com>
  30. */
  31. class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
  32. {
  33. protected static $defaultName = 'messenger:failed:retry';
  34. private $eventDispatcher;
  35. private $messageBus;
  36. private $logger;
  37. public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null)
  38. {
  39. $this->eventDispatcher = $eventDispatcher;
  40. $this->messageBus = $messageBus;
  41. $this->logger = $logger;
  42. parent::__construct($receiverName, $receiver);
  43. }
  44. /**
  45. * {@inheritdoc}
  46. */
  47. protected function configure(): void
  48. {
  49. $this
  50. ->setDefinition([
  51. new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
  52. new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
  53. ])
  54. ->setDescription('Retry one or more messages from the failure transport')
  55. ->setHelp(<<<'EOF'
  56. The <info>%command.name%</info> retries message in the failure transport.
  57. <info>php %command.full_name%</info>
  58. The command will interactively ask if each message should be retried
  59. or discarded.
  60. Some transports support retrying a specific message id, which comes
  61. from the <info>messenger:failed:show</info> command.
  62. <info>php %command.full_name% {id}</info>
  63. Or pass multiple ids at once to process multiple messages:
  64. <info>php %command.full_name% {id1} {id2} {id3}</info>
  65. EOF
  66. )
  67. ;
  68. }
  69. /**
  70. * {@inheritdoc}
  71. */
  72. protected function execute(InputInterface $input, OutputInterface $output)
  73. {
  74. $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
  75. $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
  76. $io->comment('Quit this command with CONTROL-C.');
  77. if (!$output->isVeryVerbose()) {
  78. $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
  79. }
  80. $receiver = $this->getReceiver();
  81. $this->printPendingMessagesMessage($receiver, $io);
  82. $io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>', $this->getReceiverName()));
  83. $shouldForce = $input->getOption('force');
  84. $ids = $input->getArgument('id');
  85. if (0 === \count($ids)) {
  86. if (!$input->isInteractive()) {
  87. throw new RuntimeException('Message id must be passed when in non-interactive mode.');
  88. }
  89. $this->runInteractive($io, $shouldForce);
  90. return 0;
  91. }
  92. $this->retrySpecificIds($ids, $io, $shouldForce);
  93. $io->success('All done!');
  94. return 0;
  95. }
  96. private function runInteractive(SymfonyStyle $io, bool $shouldForce)
  97. {
  98. $receiver = $this->getReceiver();
  99. $count = 0;
  100. if ($receiver instanceof ListableReceiverInterface) {
  101. // for listable receivers, find the messages one-by-one
  102. // this avoids using get(), which for some less-robust
  103. // transports (like Doctrine), will cause the message
  104. // to be temporarily "acked", even if the user aborts
  105. // handling the message
  106. while (true) {
  107. $ids = [];
  108. foreach ($receiver->all(1) as $envelope) {
  109. ++$count;
  110. $id = $this->getMessageId($envelope);
  111. if (null === $id) {
  112. throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $this->getReceiverName()));
  113. }
  114. $ids[] = $id;
  115. }
  116. // break the loop if all messages are consumed
  117. if (0 === \count($ids)) {
  118. break;
  119. }
  120. $this->retrySpecificIds($ids, $io, $shouldForce);
  121. }
  122. } else {
  123. // get() and ask messages one-by-one
  124. $count = $this->runWorker($this->getReceiver(), $io, $shouldForce);
  125. }
  126. // avoid success message if nothing was processed
  127. if (1 <= $count) {
  128. $io->success('All failed messages have been handled or removed!');
  129. }
  130. }
  131. private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
  132. {
  133. $count = 0;
  134. $listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) {
  135. ++$count;
  136. $envelope = $messageReceivedEvent->getEnvelope();
  137. $this->displaySingleMessage($envelope, $io);
  138. $shouldHandle = $shouldForce || $io->confirm('Do you want to retry (yes) or delete this message (no)?');
  139. if ($shouldHandle) {
  140. return;
  141. }
  142. $messageReceivedEvent->shouldHandle(false);
  143. $receiver->reject($envelope);
  144. };
  145. $this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
  146. $worker = new Worker(
  147. [$this->getReceiverName() => $receiver],
  148. $this->messageBus,
  149. $this->eventDispatcher,
  150. $this->logger
  151. );
  152. try {
  153. $worker->run();
  154. } finally {
  155. $this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
  156. }
  157. return $count;
  158. }
  159. private function retrySpecificIds(array $ids, SymfonyStyle $io, bool $shouldForce)
  160. {
  161. $receiver = $this->getReceiver();
  162. if (!$receiver instanceof ListableReceiverInterface) {
  163. throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $this->getReceiverName()));
  164. }
  165. foreach ($ids as $id) {
  166. $envelope = $receiver->find($id);
  167. if (null === $envelope) {
  168. throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
  169. }
  170. $singleReceiver = new SingleMessageReceiver($receiver, $envelope);
  171. $this->runWorker($singleReceiver, $io, $shouldForce);
  172. }
  173. }
  174. }