DispatchAfterCurrentBusMiddleware.php 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Middleware;
  11. use Symfony\Component\Messenger\Envelope;
  12. use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
  13. use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
  /**
   * Allows messages to be configured so that they are handled only after the
   * current bus has finished.
   *
   * I.e., messages dispatched from a handler with a DispatchAfterCurrentBus stamp
   * will actually be handled once the current message being dispatched is fully
   * handled.
   *
   * For instance, using this middleware before the DoctrineTransactionMiddleware
   * means sub-dispatched messages with a DispatchAfterCurrentBus stamp would be
   * handled after the Doctrine transaction has been committed.
   *
   * @author Tobias Nyholm <tobias.nyholm@gmail.com>
   */
  class DispatchAfterCurrentBusMiddleware implements MiddlewareInterface
  {
      /**
       * @var QueuedEnvelope[] A queue of messages and next middleware
       */
      private $queue = [];

      /**
       * @var bool Signals whether we are inside the first/root call to
       *           MessageBusInterface::dispatch() (true) or not; while true, any
       *           further call to handle() is treated as a dispatch made from
       *           inside a message handler
       */
      private $isRootDispatchCallRunning = false;

      /**
       * Handles the envelope immediately, or queues it until the root dispatch is done.
       *
       * - A stamped envelope received while a root dispatch is running is queued
       *   and returned as-is, without being passed to the next middleware.
       * - An unstamped envelope received while a root dispatch is running is
       *   passed straight to the next middleware.
       * - Otherwise this call becomes the root dispatch: the envelope (with the
       *   stamp removed, if present) is passed to the next middleware, and the
       *   queued envelopes are handled afterwards, in FIFO order.
       *
       * @param Envelope       $envelope The envelope being dispatched
       * @param StackInterface $stack    The middleware stack to continue on
       *
       * @return Envelope The queued envelope itself when handling was deferred,
       *                  otherwise the envelope returned by the next middleware
       *
       * @throws DelayedMessageHandlingException When at least one queued message failed with an \Exception
       * @throws \Throwable                      Anything thrown while handling the root message is rethrown
       *                                         unchanged, after the queue has been dropped
       */
      public function handle(Envelope $envelope, StackInterface $stack): Envelope
      {
          if (null !== $envelope->last(DispatchAfterCurrentBusStamp::class)) {
              if ($this->isRootDispatchCallRunning) {
                  // Defer: remember the envelope together with the stack position to resume from.
                  $this->queue[] = new QueuedEnvelope($envelope, $stack);

                  return $envelope;
              }

              // No root dispatch is running, so there is nothing to wait for: drop the stamp.
              $envelope = $envelope->withoutAll(DispatchAfterCurrentBusStamp::class);
          }

          if ($this->isRootDispatchCallRunning) {
              /*
               * A call to MessageBusInterface::dispatch() was made from inside the main bus handling,
               * but the message does not have the stamp. So, process it like normal.
               */
              return $stack->next()->handle($envelope, $stack);
          }

          // First time we get here, mark as inside a "root dispatch" call:
          $this->isRootDispatchCallRunning = true;

          try {
              // Execute the whole middleware stack & message handling for main dispatch:
              $returnedEnvelope = $stack->next()->handle($envelope, $stack);
          } catch (\Throwable $exception) {
              /*
               * Whenever an exception occurs while handling a message that has
               * queued other messages, we drop the queued ones.
               * This is intentional since the queued commands were likely dependent
               * on the preceding command.
               */
              $this->queue = [];
              $this->isRootDispatchCallRunning = false;

              throw $exception;
          }

          // "Root dispatch" call is finished, dispatch stored messages.
          $exceptions = [];

          try {
              while (null !== $queueItem = array_shift($this->queue)) {
                  // Save how many messages are left in queue before handling the message
                  $queueLengthBefore = \count($this->queue);

                  try {
                      // Execute the stored messages
                      $queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack());
                  } catch (\Exception $exception) {
                      // Gather all exceptions
                      $exceptions[] = $exception;
                      // Restore queue to previous state: discard whatever the failed message queued
                      $this->queue = \array_slice($this->queue, 0, $queueLengthBefore);
                  }
              }
          } finally {
              /*
               * The inner catch only covers \Exception, so an \Error thrown by a
               * queued handler escapes the loop. Always reset the state here,
               * otherwise the flag would stay "true" on this instance and every
               * later stamped message would be queued without ever being handled.
               * On the normal path the queue is already empty at this point.
               */
              $this->queue = [];
              $this->isRootDispatchCallRunning = false;
          }

          if (\count($exceptions) > 0) {
              throw new DelayedMessageHandlingException($exceptions);
          }

          return $returnedEnvelope;
      }
  }
  /**
   * Holds an envelope whose handling has been deferred, together with the
   * middleware stack it was received with.
   *
   * @internal
   */
  final class QueuedEnvelope
  {
      /** @var StackInterface The stack passed in alongside the envelope */
      private $stack;

      /** @var Envelope The envelope, stored without any DispatchAfterCurrentBusStamp */
      private $envelope;

      /**
       * @param Envelope       $envelope The envelope to keep; its DispatchAfterCurrentBusStamp
       *                                 stamps are removed before it is stored
       * @param StackInterface $stack    The stack to keep with the envelope
       */
      public function __construct(Envelope $envelope, StackInterface $stack)
      {
          $this->stack = $stack;

          // Store a copy without the stamp rather than the envelope as received.
          $unstamped = $envelope->withoutAll(DispatchAfterCurrentBusStamp::class);
          $this->envelope = $unstamped;
      }

      /**
       * @return StackInterface The stack given at construction
       */
      public function getStack(): StackInterface
      {
          return $this->stack;
      }

      /**
       * @return Envelope The stored envelope, without DispatchAfterCurrentBusStamp stamps
       */
      public function getEnvelope(): Envelope
      {
          return $this->envelope;
      }
  }