Worker.php 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger;
  11. use Psr\Log\LoggerInterface;
  12. use Symfony\Component\EventDispatcher\Event;
  13. use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
  14. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  15. use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
  16. use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
  17. use Symfony\Component\Messenger\Event\WorkerRunningEvent;
  18. use Symfony\Component\Messenger\Event\WorkerStartedEvent;
  19. use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
  20. use Symfony\Component\Messenger\Exception\HandlerFailedException;
  21. use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
  22. use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
  23. use Symfony\Component\Messenger\Stamp\ReceivedStamp;
  24. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  25. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  26. /**
  27. * @author Samuel Roze <samuel.roze@gmail.com>
  28. * @author Tobias Schultze <http://tobion.de>
  29. *
  30. * @final
  31. */
  32. class Worker
  33. {
  34. private $receivers;
  35. private $bus;
  36. private $eventDispatcher;
  37. private $logger;
  38. private $shouldStop = false;
  39. /**
  40. * @param ReceiverInterface[] $receivers Where the key is the transport name
  41. */
  42. public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
  43. {
  44. $this->receivers = $receivers;
  45. $this->bus = $bus;
  46. $this->logger = $logger;
  47. $this->eventDispatcher = class_exists(Event::class) ? LegacyEventDispatcherProxy::decorate($eventDispatcher) : $eventDispatcher;
  48. }
  49. /**
  50. * Receive the messages and dispatch them to the bus.
  51. *
  52. * Valid options are:
  53. * * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
  54. */
  55. public function run(array $options = []): void
  56. {
  57. $this->dispatchEvent(new WorkerStartedEvent($this));
  58. $options = array_merge([
  59. 'sleep' => 1000000,
  60. ], $options);
  61. while (false === $this->shouldStop) {
  62. $envelopeHandled = false;
  63. foreach ($this->receivers as $transportName => $receiver) {
  64. $envelopes = $receiver->get();
  65. foreach ($envelopes as $envelope) {
  66. $envelopeHandled = true;
  67. $this->handleMessage($envelope, $receiver, $transportName);
  68. $this->dispatchEvent(new WorkerRunningEvent($this, false));
  69. if ($this->shouldStop) {
  70. break 2;
  71. }
  72. }
  73. // after handling a single receiver, quit and start the loop again
  74. // this should prevent multiple lower priority receivers from
  75. // blocking too long before the higher priority are checked
  76. if ($envelopeHandled) {
  77. break;
  78. }
  79. }
  80. if (false === $envelopeHandled) {
  81. $this->dispatchEvent(new WorkerRunningEvent($this, true));
  82. usleep($options['sleep']);
  83. }
  84. }
  85. $this->dispatchEvent(new WorkerStoppedEvent($this));
  86. }
  87. private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void
  88. {
  89. $event = new WorkerMessageReceivedEvent($envelope, $transportName);
  90. $this->dispatchEvent($event);
  91. $envelope = $event->getEnvelope();
  92. if (!$event->shouldHandle()) {
  93. return;
  94. }
  95. try {
  96. $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
  97. } catch (\Throwable $throwable) {
  98. $rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
  99. if ($rejectFirst) {
  100. // redelivered messages are rejected first so that continuous failures in an event listener or while
  101. // publishing for retry does not cause infinite redelivery loops
  102. $receiver->reject($envelope);
  103. }
  104. if ($throwable instanceof HandlerFailedException) {
  105. $envelope = $throwable->getEnvelope();
  106. }
  107. $failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable);
  108. $this->dispatchEvent($failedEvent);
  109. $envelope = $failedEvent->getEnvelope();
  110. if (!$rejectFirst) {
  111. $receiver->reject($envelope);
  112. }
  113. return;
  114. }
  115. $handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
  116. $this->dispatchEvent($handledEvent);
  117. $envelope = $handledEvent->getEnvelope();
  118. if (null !== $this->logger) {
  119. $message = $envelope->getMessage();
  120. $context = [
  121. 'message' => $message,
  122. 'class' => \get_class($message),
  123. ];
  124. $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
  125. }
  126. $receiver->ack($envelope);
  127. }
  128. public function stop(): void
  129. {
  130. $this->shouldStop = true;
  131. }
  132. private function dispatchEvent(object $event): void
  133. {
  134. if (null === $this->eventDispatcher) {
  135. return;
  136. }
  137. $this->eventDispatcher->dispatch($event);
  138. }
  139. }