AmqpReceiver.php 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
  11. use Symfony\Component\Messenger\Envelope;
  12. use Symfony\Component\Messenger\Exception\LogicException;
  13. use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
  14. use Symfony\Component\Messenger\Exception\TransportException;
  15. use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
  16. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  17. use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
  18. use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
  /**
   * Symfony Messenger receiver to get messages from AMQP brokers using PHP's AMQP extension.
   *
   * All broker access goes through the injected Connection. Any \AMQPException
   * raised while fetching, acknowledging, rejecting or counting messages is
   * rethrown as a TransportException that keeps the original exception as
   * its "previous" one.
   *
   * @author Samuel Roze <samuel.roze@gmail.com>
   */
  class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
  {
      /**
       * Decodes the raw AMQP body/headers into a Messenger envelope.
       *
       * @var SerializerInterface
       */
      private $serializer;

      /**
       * Connection used to get, ack and nack messages and to count queued messages.
       *
       * @var Connection
       */
      private $connection;

      /**
       * @param Connection               $connection connection the receiver reads from
       * @param SerializerInterface|null $serializer serializer used to decode messages;
       *                                             a PhpSerializer is created when null is given
       */
      public function __construct(Connection $connection, ?SerializerInterface $serializer = null)
      {
          // The nullable type is spelled out explicitly ("?Type"): relying on the
          // "= null" default to make the type nullable is deprecated as of PHP 8.4.
          $this->connection = $connection;
          $this->serializer = $serializer ?? new PhpSerializer();
      }

      /**
       * {@inheritdoc}
       *
       * Walks over every queue name reported by the connection and yields
       * whatever getEnvelope() produces for that queue.
       *
       * @throws TransportException             when the connection raises an \AMQPException
       * @throws MessageDecodingFailedException when a fetched message cannot be decoded
       */
      public function get(): iterable
      {
          foreach ($this->connection->getQueueNames() as $queueName) {
              yield from $this->getEnvelope($queueName);
          }
      }

      /**
       * Asks the connection for a message from the given queue and yields it
       * as a decoded envelope carrying an AmqpReceivedStamp.
       *
       * Nothing is yielded when the connection returns null for the queue.
       *
       * @param string $queueName name of the queue to read from
       *
       * @return iterable zero or one Envelope
       *
       * @throws TransportException             when fetching (or rejecting an undecodable message) raises an \AMQPException
       * @throws MessageDecodingFailedException when the serializer cannot decode the message; the message is rejected first
       */
      private function getEnvelope(string $queueName): iterable
      {
          try {
              $amqpEnvelope = $this->connection->get($queueName);
          } catch (\AMQPException $exception) {
              throw new TransportException($exception->getMessage(), 0, $exception);
          }

          if (null === $amqpEnvelope) {
              return;
          }

          $body = $amqpEnvelope->getBody();

          try {
              $envelope = $this->serializer->decode([
                  'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351
                  'headers' => $amqpEnvelope->getHeaders(),
              ]);
          } catch (MessageDecodingFailedException $exception) {
              // invalid message of some type: reject it on the broker before
              // letting the decoding failure propagate to the caller
              $this->rejectAmqpEnvelope($amqpEnvelope, $queueName);

              throw $exception;
          }

          // The stamp keeps the raw AMQP envelope and its queue name so that
          // ack()/reject() can later address the right message.
          yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
      }

      /**
       * {@inheritdoc}
       *
       * Acknowledges the AMQP message referenced by the envelope's AmqpReceivedStamp.
       *
       * @throws LogicException     when the envelope carries no AmqpReceivedStamp
       * @throws TransportException when the connection raises an \AMQPException
       */
      public function ack(Envelope $envelope): void
      {
          try {
              $stamp = $this->findAmqpStamp($envelope);

              $this->connection->ack(
                  $stamp->getAmqpEnvelope(),
                  $stamp->getQueueName()
              );
          } catch (\AMQPException $exception) {
              throw new TransportException($exception->getMessage(), 0, $exception);
          }
      }

      /**
       * {@inheritdoc}
       *
       * Rejects the AMQP message referenced by the envelope's AmqpReceivedStamp.
       *
       * @throws LogicException     when the envelope carries no AmqpReceivedStamp
       * @throws TransportException when the connection raises an \AMQPException
       */
      public function reject(Envelope $envelope): void
      {
          $stamp = $this->findAmqpStamp($envelope);

          $this->rejectAmqpEnvelope(
              $stamp->getAmqpEnvelope(),
              $stamp->getQueueName()
          );
      }

      /**
       * {@inheritdoc}
       *
       * Returns the value of Connection::countMessagesInQueues().
       *
       * @throws TransportException when the connection raises an \AMQPException
       */
      public function getMessageCount(): int
      {
          try {
              return $this->connection->countMessagesInQueues();
          } catch (\AMQPException $exception) {
              throw new TransportException($exception->getMessage(), 0, $exception);
          }
      }

      /**
       * Negatively acknowledges a message on the given queue, passing
       * \AMQP_NOPARAM as the flags argument.
       *
       * @param \AMQPEnvelope $amqpEnvelope message to nack
       * @param string        $queueName    queue the message was read from
       *
       * @throws TransportException when the connection raises an \AMQPException
       */
      private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueName): void
      {
          try {
              $this->connection->nack($amqpEnvelope, $queueName, \AMQP_NOPARAM);
          } catch (\AMQPException $exception) {
              throw new TransportException($exception->getMessage(), 0, $exception);
          }
      }

      /**
       * Returns the last AmqpReceivedStamp attached to the envelope.
       *
       * @param Envelope $envelope envelope expected to carry the stamp
       *
       * @throws LogicException when the envelope has no AmqpReceivedStamp
       */
      private function findAmqpStamp(Envelope $envelope): AmqpReceivedStamp
      {
          $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);

          if (null === $amqpReceivedStamp) {
              throw new LogicException('No "AmqpReceivedStamp" stamp found on the Envelope.');
          }

          return $amqpReceivedStamp;
      }
  }
  // Register this class under the Transport\AmqpExt namespace as well, so the
  // name below resolves to the bridge implementation (presumably kept for
  // backward compatibility with the previous class location - confirm).
  class_alias(
      AmqpReceiver::class,
      \Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver::class
  );