DoctrineReceiver.php 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
  11. use Doctrine\DBAL\DBALException;
  12. use Doctrine\DBAL\Exception;
  13. use Doctrine\DBAL\Exception\RetryableException;
  14. use Symfony\Component\Messenger\Envelope;
  15. use Symfony\Component\Messenger\Exception\LogicException;
  16. use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
  17. use Symfony\Component\Messenger\Exception\TransportException;
  18. use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
  19. use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
  20. use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
  21. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  22. use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
  23. use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
  24. /**
  25. * @author Vincent Touzet <vincent.touzet@gmail.com>
  26. */
  27. class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
  28. {
  29. private const MAX_RETRIES = 3;
  30. private $retryingSafetyCounter = 0;
  31. private $connection;
  32. private $serializer;
  33. public function __construct(Connection $connection, SerializerInterface $serializer = null)
  34. {
  35. $this->connection = $connection;
  36. $this->serializer = $serializer ?? new PhpSerializer();
  37. }
  38. /**
  39. * {@inheritdoc}
  40. */
  41. public function get(): iterable
  42. {
  43. try {
  44. $doctrineEnvelope = $this->connection->get();
  45. $this->retryingSafetyCounter = 0; // reset counter
  46. } catch (RetryableException $exception) {
  47. // Do nothing when RetryableException occurs less than "MAX_RETRIES"
  48. // as it will likely be resolved on the next call to get()
  49. // Problem with concurrent consumers and database deadlocks
  50. if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
  51. $this->retryingSafetyCounter = 0; // reset counter
  52. throw new TransportException($exception->getMessage(), 0, $exception);
  53. }
  54. return [];
  55. } catch (DBALException | Exception $exception) {
  56. throw new TransportException($exception->getMessage(), 0, $exception);
  57. }
  58. if (null === $doctrineEnvelope) {
  59. return [];
  60. }
  61. return [$this->createEnvelopeFromData($doctrineEnvelope)];
  62. }
  63. /**
  64. * {@inheritdoc}
  65. */
  66. public function ack(Envelope $envelope): void
  67. {
  68. try {
  69. $this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId());
  70. } catch (DBALException | Exception $exception) {
  71. throw new TransportException($exception->getMessage(), 0, $exception);
  72. }
  73. }
  74. /**
  75. * {@inheritdoc}
  76. */
  77. public function reject(Envelope $envelope): void
  78. {
  79. try {
  80. $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
  81. } catch (DBALException | Exception $exception) {
  82. throw new TransportException($exception->getMessage(), 0, $exception);
  83. }
  84. }
  85. /**
  86. * {@inheritdoc}
  87. */
  88. public function getMessageCount(): int
  89. {
  90. try {
  91. return $this->connection->getMessageCount();
  92. } catch (DBALException | Exception $exception) {
  93. throw new TransportException($exception->getMessage(), 0, $exception);
  94. }
  95. }
  96. /**
  97. * {@inheritdoc}
  98. */
  99. public function all(int $limit = null): iterable
  100. {
  101. try {
  102. $doctrineEnvelopes = $this->connection->findAll($limit);
  103. } catch (DBALException | Exception $exception) {
  104. throw new TransportException($exception->getMessage(), 0, $exception);
  105. }
  106. foreach ($doctrineEnvelopes as $doctrineEnvelope) {
  107. yield $this->createEnvelopeFromData($doctrineEnvelope);
  108. }
  109. }
  110. /**
  111. * {@inheritdoc}
  112. */
  113. public function find($id): ?Envelope
  114. {
  115. try {
  116. $doctrineEnvelope = $this->connection->find($id);
  117. } catch (DBALException | Exception $exception) {
  118. throw new TransportException($exception->getMessage(), 0, $exception);
  119. }
  120. if (null === $doctrineEnvelope) {
  121. return null;
  122. }
  123. return $this->createEnvelopeFromData($doctrineEnvelope);
  124. }
  125. private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp
  126. {
  127. /** @var DoctrineReceivedStamp|null $doctrineReceivedStamp */
  128. $doctrineReceivedStamp = $envelope->last(DoctrineReceivedStamp::class);
  129. if (null === $doctrineReceivedStamp) {
  130. throw new LogicException('No DoctrineReceivedStamp found on the Envelope.');
  131. }
  132. return $doctrineReceivedStamp;
  133. }
  134. private function createEnvelopeFromData(array $data): Envelope
  135. {
  136. try {
  137. $envelope = $this->serializer->decode([
  138. 'body' => $data['body'],
  139. 'headers' => $data['headers'],
  140. ]);
  141. } catch (MessageDecodingFailedException $exception) {
  142. $this->connection->reject($data['id']);
  143. throw $exception;
  144. }
  145. return $envelope->with(
  146. new DoctrineReceivedStamp($data['id']),
  147. new TransportMessageIdStamp($data['id'])
  148. );
  149. }
  150. }
  151. class_alias(DoctrineReceiver::class, \Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver::class);