RedisReceiver.php 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
  11. use Symfony\Component\Messenger\Envelope;
  12. use Symfony\Component\Messenger\Exception\LogicException;
  13. use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
  14. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  15. use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
  16. use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
  /**
   * Messenger receiver that reads messages through the bridge's Connection.
   *
   * Every message handed out by get() is decoded with the configured serializer
   * and tagged with a RedisReceivedStamp holding the 'id' entry of the raw
   * message; ack() and reject() read that stamp back to address the message on
   * the connection.
   *
   * @author Alexander Schranz <alexander@sulu.io>
   * @author Antoine Bluchet <soyuka@gmail.com>
   */
  class RedisReceiver implements ReceiverInterface
  {
      private $connection;
      private $serializer;

      /**
       * @param Connection               $connection connection used to fetch, acknowledge and reject messages
       * @param SerializerInterface|null $serializer serializer used to decode messages; a PhpSerializer is created when null
       */
      public function __construct(Connection $connection, ?SerializerInterface $serializer = null)
      {
          // The nullable type is spelled out explicitly: relying on "= null" to
          // make the parameter nullable is deprecated as of PHP 8.4.
          $this->connection = $connection;
          $this->serializer = $serializer ?? new PhpSerializer();
      }

      /**
       * {@inheritdoc}
       *
       * Returns an empty array when the connection has nothing to deliver,
       * otherwise a single-element array with the decoded envelope.
       *
       * @throws MessageDecodingFailedException when the serializer cannot decode the message;
       *                                        the message is rejected on the connection first
       */
      public function get(): iterable
      {
          $redisEnvelope = $this->connection->get();

          if (null === $redisEnvelope) {
              return [];
          }

          try {
              $envelope = $this->serializer->decode([
                  'body' => $redisEnvelope['body'],
                  'headers' => $redisEnvelope['headers'],
              ]);
          } catch (MessageDecodingFailedException $exception) {
              // Reject the undecodable message before letting the failure
              // propagate to the caller.
              $this->connection->reject($redisEnvelope['id']);

              throw $exception;
          }

          // Attach the id so ack()/reject() can find it again later.
          return [$envelope->with(new RedisReceivedStamp($redisEnvelope['id']))];
      }

      /**
       * {@inheritdoc}
       *
       * @throws LogicException when the envelope carries no RedisReceivedStamp
       */
      public function ack(Envelope $envelope): void
      {
          $this->connection->ack($this->findRedisReceivedStamp($envelope)->getId());
      }

      /**
       * {@inheritdoc}
       *
       * @throws LogicException when the envelope carries no RedisReceivedStamp
       */
      public function reject(Envelope $envelope): void
      {
          $this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
      }

      /**
       * Returns the last RedisReceivedStamp attached to the envelope.
       *
       * @throws LogicException when the envelope carries no RedisReceivedStamp
       */
      private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp
      {
          /** @var RedisReceivedStamp|null $redisReceivedStamp */
          $redisReceivedStamp = $envelope->last(RedisReceivedStamp::class);

          if (null === $redisReceivedStamp) {
              throw new LogicException('No RedisReceivedStamp found on the Envelope.');
          }

          return $redisReceivedStamp;
      }
  }
  // Expose this class under its legacy name as well. The guard skips the alias
  // when that name is already declared, because class_alias() would otherwise
  // emit a warning; autoloading is disabled (second argument false) so the
  // check itself never loads anything.
  if (!class_exists(\Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver::class, false)) {
      class_alias(RedisReceiver::class, \Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver::class);
  }