DoctrineTransportFactory.php 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
  11. use Doctrine\DBAL\Driver\AbstractPostgreSQLDriver;
  12. use Doctrine\Persistence\ConnectionRegistry;
  13. use Symfony\Bridge\Doctrine\RegistryInterface;
  14. use Symfony\Component\Messenger\Exception\TransportException;
  15. use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
  16. use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
  17. use Symfony\Component\Messenger\Transport\TransportInterface;
  18. /**
  19. * @author Vincent Touzet <vincent.touzet@gmail.com>
  20. */
  21. class DoctrineTransportFactory implements TransportFactoryInterface
  22. {
  23. private $registry;
  24. public function __construct($registry)
  25. {
  26. if (!$registry instanceof RegistryInterface && !$registry instanceof ConnectionRegistry) {
  27. throw new \TypeError(sprintf('Expected an instance of "%s" or "%s", but got "%s".', RegistryInterface::class, ConnectionRegistry::class, get_debug_type($registry)));
  28. }
  29. $this->registry = $registry;
  30. }
  31. public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
  32. {
  33. $useNotify = ($options['use_notify'] ?? true);
  34. unset($options['transport_name'], $options['use_notify']);
  35. // Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver when LISTEN/NOTIFY isn't available
  36. $configuration = PostgreSqlConnection::buildConfiguration($dsn, $options);
  37. try {
  38. $driverConnection = $this->registry->getConnection($configuration['connection']);
  39. } catch (\InvalidArgumentException $e) {
  40. throw new TransportException(sprintf('Could not find Doctrine connection from Messenger DSN "%s".', $dsn), 0, $e);
  41. }
  42. if ($useNotify && $driverConnection->getDriver() instanceof AbstractPostgreSQLDriver) {
  43. $connection = new PostgreSqlConnection($configuration, $driverConnection);
  44. } else {
  45. $connection = new Connection($configuration, $driverConnection);
  46. }
  47. return new DoctrineTransport($connection, $serializer);
  48. }
  49. public function supports(string $dsn, array $options): bool
  50. {
  51. return 0 === strpos($dsn, 'doctrine://');
  52. }
  53. }
  54. class_alias(DoctrineTransportFactory::class, \Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory::class);