AmqpSender.php 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
  11. use Symfony\Component\Messenger\Envelope;
  12. use Symfony\Component\Messenger\Exception\TransportException;
  13. use Symfony\Component\Messenger\Stamp\DelayStamp;
  14. use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
  15. use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
  16. use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
  /**
   * Symfony Messenger sender to send messages to AMQP brokers using PHP's AMQP extension.
   *
   * @author Samuel Roze <samuel.roze@gmail.com>
   */
  class AmqpSender implements SenderInterface
  {
      /**
       * Encodes envelopes into the array (body + optional headers) that gets published.
       *
       * @var SerializerInterface
       */
      private $serializer;

      /**
       * Connection the encoded messages are published through.
       *
       * @var Connection
       */
      private $connection;

      /**
       * @param Connection               $connection Connection used to publish messages
       * @param SerializerInterface|null $serializer Serializer used to encode envelopes;
       *                                             a PhpSerializer is created when null is given
       */
      public function __construct(Connection $connection, ?SerializerInterface $serializer = null)
      {
          $this->connection = $connection;
          $this->serializer = $serializer ?? new PhpSerializer();
      }

      /**
       * Encodes the envelope and publishes it through the connection.
       *
       * The delay is read from the last DelayStamp (0 when there is none) and the
       * AMQP-specific options are taken from the last AmqpStamp, possibly completed
       * from the encoded "Content-Type" header and from an AmqpReceivedStamp.
       *
       * {@inheritdoc}
       *
       * @throws TransportException when publishing raises an \AMQPException
       */
      public function send(Envelope $envelope): Envelope
      {
          $encodedMessage = $this->serializer->encode($envelope);

          /** @var DelayStamp|null $delayStamp */
          $delayStamp = $envelope->last(DelayStamp::class);
          $delay = $delayStamp ? $delayStamp->getDelay() : 0;

          /** @var AmqpStamp|null $amqpStamp */
          $amqpStamp = $envelope->last(AmqpStamp::class);

          if (isset($encodedMessage['headers']['Content-Type'])) {
              // The content type is handed over as the "content_type" stamp attribute
              // instead of a header, so it is always removed from the headers here.
              $contentType = $encodedMessage['headers']['Content-Type'];
              unset($encodedMessage['headers']['Content-Type']);

              // A content type already set on the stamp takes precedence over the encoded one.
              if (!$amqpStamp || !isset($amqpStamp->getAttributes()['content_type'])) {
                  $amqpStamp = AmqpStamp::createWithAttributes(['content_type' => $contentType], $amqpStamp);
              }
          }

          // When the envelope carries an AmqpReceivedStamp, the stamp used for publishing
          // is rebuilt from the received AMQP envelope and the stamp computed so far.
          $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
          if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
              $amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp);
          }

          try {
              $this->connection->publish(
                  $encodedMessage['body'],
                  $encodedMessage['headers'] ?? [],
                  $delay,
                  $amqpStamp
              );
          } catch (\AMQPException $e) {
              // Surface extension-level failures as a Messenger exception, keeping the cause.
              throw new TransportException($e->getMessage(), 0, $e);
          }

          return $envelope;
      }
  }
  // Make this class reachable under \Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender
  // as well. The alias is only registered when that name is not declared yet: class_alias()
  // emits a warning if the name is already in use. Autoloading is disabled for the check
  // (second argument false) so that testing for the name never triggers loading another file.
  if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender::class, false)) {
      class_alias(AmqpSender::class, \Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender::class);
  }