AmqpTransport.php 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
  11. use Symfony\Component\Messenger\Envelope;
  12. use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
  13. use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
  14. use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
  15. use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
  16. use Symfony\Component\Messenger\Transport\TransportInterface;
  17. /**
  18. * @author Nicolas Grekas <p@tchwork.com>
  19. */
  20. class AmqpTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
  21. {
  22. private $serializer;
  23. private $connection;
  24. private $receiver;
  25. private $sender;
  26. public function __construct(Connection $connection, SerializerInterface $serializer = null)
  27. {
  28. $this->connection = $connection;
  29. $this->serializer = $serializer ?? new PhpSerializer();
  30. }
  31. /**
  32. * {@inheritdoc}
  33. */
  34. public function get(): iterable
  35. {
  36. return ($this->receiver ?? $this->getReceiver())->get();
  37. }
  38. /**
  39. * {@inheritdoc}
  40. */
  41. public function ack(Envelope $envelope): void
  42. {
  43. ($this->receiver ?? $this->getReceiver())->ack($envelope);
  44. }
  45. /**
  46. * {@inheritdoc}
  47. */
  48. public function reject(Envelope $envelope): void
  49. {
  50. ($this->receiver ?? $this->getReceiver())->reject($envelope);
  51. }
  52. /**
  53. * {@inheritdoc}
  54. */
  55. public function send(Envelope $envelope): Envelope
  56. {
  57. return ($this->sender ?? $this->getSender())->send($envelope);
  58. }
  59. /**
  60. * {@inheritdoc}
  61. */
  62. public function setup(): void
  63. {
  64. $this->connection->setup();
  65. }
  66. /**
  67. * {@inheritdoc}
  68. */
  69. public function getMessageCount(): int
  70. {
  71. return ($this->receiver ?? $this->getReceiver())->getMessageCount();
  72. }
  73. private function getReceiver(): AmqpReceiver
  74. {
  75. return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);
  76. }
  77. private function getSender(): AmqpSender
  78. {
  79. return $this->sender = new AmqpSender($this->connection, $this->serializer);
  80. }
  81. }
  82. class_alias(AmqpTransport::class, \Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport::class);