123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- <?php
- /*
- * This file is part of the Symfony package.
- *
- * (c) Fabien Potencier <fabien@symfony.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
- namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
- use Symfony\Component\Messenger\Envelope;
- use Symfony\Component\Messenger\Exception\LogicException;
- use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
- use Symfony\Component\Messenger\Exception\TransportException;
- use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
- use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
- use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
- use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
- /**
- * Symfony Messenger receiver to get messages from AMQP brokers using PHP's AMQP extension.
- *
- * @author Samuel Roze <samuel.roze@gmail.com>
- */
- class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
- {
- private $serializer;
- private $connection;
- public function __construct(Connection $connection, SerializerInterface $serializer = null)
- {
- $this->connection = $connection;
- $this->serializer = $serializer ?? new PhpSerializer();
- }
- /**
- * {@inheritdoc}
- */
- public function get(): iterable
- {
- foreach ($this->connection->getQueueNames() as $queueName) {
- yield from $this->getEnvelope($queueName);
- }
- }
- private function getEnvelope(string $queueName): iterable
- {
- try {
- $amqpEnvelope = $this->connection->get($queueName);
- } catch (\AMQPException $exception) {
- throw new TransportException($exception->getMessage(), 0, $exception);
- }
- if (null === $amqpEnvelope) {
- return;
- }
- $body = $amqpEnvelope->getBody();
- try {
- $envelope = $this->serializer->decode([
- 'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351
- 'headers' => $amqpEnvelope->getHeaders(),
- ]);
- } catch (MessageDecodingFailedException $exception) {
- // invalid message of some type
- $this->rejectAmqpEnvelope($amqpEnvelope, $queueName);
- throw $exception;
- }
- yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
- }
- /**
- * {@inheritdoc}
- */
- public function ack(Envelope $envelope): void
- {
- try {
- $stamp = $this->findAmqpStamp($envelope);
- $this->connection->ack(
- $stamp->getAmqpEnvelope(),
- $stamp->getQueueName()
- );
- } catch (\AMQPException $exception) {
- throw new TransportException($exception->getMessage(), 0, $exception);
- }
- }
- /**
- * {@inheritdoc}
- */
- public function reject(Envelope $envelope): void
- {
- $stamp = $this->findAmqpStamp($envelope);
- $this->rejectAmqpEnvelope(
- $stamp->getAmqpEnvelope(),
- $stamp->getQueueName()
- );
- }
- /**
- * {@inheritdoc}
- */
- public function getMessageCount(): int
- {
- try {
- return $this->connection->countMessagesInQueues();
- } catch (\AMQPException $exception) {
- throw new TransportException($exception->getMessage(), 0, $exception);
- }
- }
- private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueName): void
- {
- try {
- $this->connection->nack($amqpEnvelope, $queueName, \AMQP_NOPARAM);
- } catch (\AMQPException $exception) {
- throw new TransportException($exception->getMessage(), 0, $exception);
- }
- }
- private function findAmqpStamp(Envelope $envelope): AmqpReceivedStamp
- {
- $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
- if (null === $amqpReceivedStamp) {
- throw new LogicException('No "AmqpReceivedStamp" stamp found on the Envelope.');
- }
- return $amqpReceivedStamp;
- }
- }
- class_alias(AmqpReceiver::class, \Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver::class);
|