PostgreSqlConnection.php 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
  11. use Doctrine\DBAL\Schema\Table;
  12. /**
  13. * Uses PostgreSQL LISTEN/NOTIFY to push messages to workers.
  14. *
  15. * @internal
  16. *
  17. * @author Kévin Dunglas <dunglas@gmail.com>
  18. */
  19. final class PostgreSqlConnection extends Connection
  20. {
  21. /**
  22. * * use_notify: Set to false to disable the use of LISTEN/NOTIFY. Default: true
  23. * * check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 60000 (1 minute)
  24. * * get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0.
  25. */
  26. protected const DEFAULT_OPTIONS = parent::DEFAULT_OPTIONS + [
  27. 'check_delayed_interval' => 60000,
  28. 'get_notify_timeout' => 0,
  29. ];
  30. private $listening = false;
  31. public function __sleep()
  32. {
  33. throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
  34. }
  35. public function __wakeup()
  36. {
  37. throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
  38. }
  39. public function __destruct()
  40. {
  41. $this->unlisten();
  42. }
  43. public function reset()
  44. {
  45. parent::reset();
  46. $this->unlisten();
  47. }
  48. public function get(): ?array
  49. {
  50. if (null === $this->queueEmptiedAt) {
  51. return parent::get();
  52. }
  53. if (!$this->listening) {
  54. // This is secure because the table name must be a valid identifier:
  55. // https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
  56. $this->executeStatement(sprintf('LISTEN "%s"', $this->configuration['table_name']));
  57. $this->listening = true;
  58. }
  59. $notification = $this->driverConnection->getWrappedConnection()->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']);
  60. if (
  61. // no notifications, or for another table or queue
  62. (false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) &&
  63. // delayed messages
  64. (microtime(true) * 1000 - $this->queueEmptiedAt < $this->configuration['check_delayed_interval'])
  65. ) {
  66. usleep(1000);
  67. return null;
  68. }
  69. return parent::get();
  70. }
  71. public function setup(): void
  72. {
  73. parent::setup();
  74. $this->executeStatement(implode("\n", $this->getTriggerSql()));
  75. }
  76. /**
  77. * @return string[]
  78. */
  79. public function getExtraSetupSqlForTable(Table $createdTable): array
  80. {
  81. if (!$createdTable->hasOption(self::TABLE_OPTION_NAME)) {
  82. return [];
  83. }
  84. if ($createdTable->getOption(self::TABLE_OPTION_NAME) !== $this->configuration['table_name']) {
  85. return [];
  86. }
  87. return $this->getTriggerSql();
  88. }
  89. private function getTriggerSql(): array
  90. {
  91. return [
  92. // create trigger function
  93. sprintf(<<<'SQL'
  94. CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
  95. BEGIN
  96. PERFORM pg_notify('%1$s', NEW.queue_name::text);
  97. RETURN NEW;
  98. END;
  99. $$ LANGUAGE plpgsql;
  100. SQL
  101. , $this->configuration['table_name']),
  102. // register trigger
  103. sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->configuration['table_name']),
  104. sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();', $this->configuration['table_name']),
  105. ];
  106. }
  107. private function unlisten()
  108. {
  109. if (!$this->listening) {
  110. return;
  111. }
  112. $this->executeStatement(sprintf('UNLISTEN "%s"', $this->configuration['table_name']));
  113. $this->listening = false;
  114. }
  115. }