Connection.php 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
  11. use Doctrine\DBAL\Connection as DBALConnection;
  12. use Doctrine\DBAL\DBALException;
  13. use Doctrine\DBAL\Driver\Result as DriverResult;
  14. use Doctrine\DBAL\Exception;
  15. use Doctrine\DBAL\Exception\TableNotFoundException;
  16. use Doctrine\DBAL\LockMode;
  17. use Doctrine\DBAL\Query\QueryBuilder;
  18. use Doctrine\DBAL\Result;
  19. use Doctrine\DBAL\Schema\Comparator;
  20. use Doctrine\DBAL\Schema\Schema;
  21. use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
  22. use Doctrine\DBAL\Schema\Table;
  23. use Doctrine\DBAL\Types\Types;
  24. use Symfony\Component\Messenger\Exception\InvalidArgumentException;
  25. use Symfony\Component\Messenger\Exception\TransportException;
  26. use Symfony\Contracts\Service\ResetInterface;
  27. /**
  28. * @internal since Symfony 5.1
  29. *
  30. * @author Vincent Touzet <vincent.touzet@gmail.com>
  31. * @author Kévin Dunglas <dunglas@gmail.com>
  32. */
  33. class Connection implements ResetInterface
  34. {
  35. protected const TABLE_OPTION_NAME = '_symfony_messenger_table_name';
  36. protected const DEFAULT_OPTIONS = [
  37. 'table_name' => 'messenger_messages',
  38. 'queue_name' => 'default',
  39. 'redeliver_timeout' => 3600,
  40. 'auto_setup' => true,
  41. ];
  42. /**
  43. * Configuration of the connection.
  44. *
  45. * Available options:
  46. *
  47. * * table_name: name of the table
  48. * * connection: name of the Doctrine's entity manager
  49. * * queue_name: name of the queue
  50. * * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
  51. * * auto_setup: Whether the table should be created automatically during send / get. Default: true
  52. */
  53. protected $configuration = [];
  54. protected $driverConnection;
  55. protected $queueEmptiedAt;
  56. private $schemaSynchronizer;
  57. private $autoSetup;
  58. public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
  59. {
  60. $this->configuration = array_replace_recursive(static::DEFAULT_OPTIONS, $configuration);
  61. $this->driverConnection = $driverConnection;
  62. $this->schemaSynchronizer = $schemaSynchronizer;
  63. $this->autoSetup = $this->configuration['auto_setup'];
  64. }
  65. public function reset()
  66. {
  67. $this->queueEmptiedAt = null;
  68. }
  69. public function getConfiguration(): array
  70. {
  71. return $this->configuration;
  72. }
  73. public static function buildConfiguration(string $dsn, array $options = []): array
  74. {
  75. if (false === $components = parse_url($dsn)) {
  76. throw new InvalidArgumentException(sprintf('The given Doctrine Messenger DSN "%s" is invalid.', $dsn));
  77. }
  78. $query = [];
  79. if (isset($components['query'])) {
  80. parse_str($components['query'], $query);
  81. }
  82. $configuration = ['connection' => $components['host']];
  83. $configuration += $query + $options + static::DEFAULT_OPTIONS;
  84. $configuration['auto_setup'] = filter_var($configuration['auto_setup'], \FILTER_VALIDATE_BOOLEAN);
  85. // check for extra keys in options
  86. $optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS));
  87. if (0 < \count($optionsExtraKeys)) {
  88. throw new InvalidArgumentException(sprintf('Unknown option found: [%s]. Allowed options are [%s].', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
  89. }
  90. // check for extra keys in options
  91. $queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS));
  92. if (0 < \count($queryExtraKeys)) {
  93. throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
  94. }
  95. return $configuration;
  96. }
  97. /**
  98. * @param int $delay The delay in milliseconds
  99. *
  100. * @return string The inserted id
  101. *
  102. * @throws \Doctrine\DBAL\DBALException
  103. * @throws \Doctrine\DBAL\Exception
  104. */
  105. public function send(string $body, array $headers, int $delay = 0): string
  106. {
  107. $now = new \DateTime();
  108. $availableAt = (clone $now)->modify(sprintf('+%d seconds', $delay / 1000));
  109. $queryBuilder = $this->driverConnection->createQueryBuilder()
  110. ->insert($this->configuration['table_name'])
  111. ->values([
  112. 'body' => '?',
  113. 'headers' => '?',
  114. 'queue_name' => '?',
  115. 'created_at' => '?',
  116. 'available_at' => '?',
  117. ]);
  118. $this->executeStatement($queryBuilder->getSQL(), [
  119. $body,
  120. json_encode($headers),
  121. $this->configuration['queue_name'],
  122. $now,
  123. $availableAt,
  124. ], [
  125. null,
  126. null,
  127. null,
  128. Types::DATETIME_MUTABLE,
  129. Types::DATETIME_MUTABLE,
  130. ]);
  131. return $this->driverConnection->lastInsertId();
  132. }
  133. public function get(): ?array
  134. {
  135. get:
  136. $this->driverConnection->beginTransaction();
  137. try {
  138. $query = $this->createAvailableMessagesQueryBuilder()
  139. ->orderBy('available_at', 'ASC')
  140. ->setMaxResults(1);
  141. // Append pessimistic write lock to FROM clause if db platform supports it
  142. $sql = $query->getSQL();
  143. if (($fromPart = $query->getQueryPart('from')) &&
  144. ($table = $fromPart[0]['table'] ?? null) &&
  145. ($alias = $fromPart[0]['alias'] ?? null)
  146. ) {
  147. $fromClause = sprintf('%s %s', $table, $alias);
  148. $sql = str_replace(
  149. sprintf('FROM %s WHERE', $fromClause),
  150. sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)),
  151. $sql
  152. );
  153. }
  154. // use SELECT ... FOR UPDATE to lock table
  155. $stmt = $this->executeQuery(
  156. $sql.' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(),
  157. $query->getParameters(),
  158. $query->getParameterTypes()
  159. );
  160. $doctrineEnvelope = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAssociative() : $stmt->fetch();
  161. if (false === $doctrineEnvelope) {
  162. $this->driverConnection->commit();
  163. $this->queueEmptiedAt = microtime(true) * 1000;
  164. return null;
  165. }
  166. // Postgres can "group" notifications having the same channel and payload
  167. // We need to be sure to empty the queue before blocking again
  168. $this->queueEmptiedAt = null;
  169. $doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);
  170. $queryBuilder = $this->driverConnection->createQueryBuilder()
  171. ->update($this->configuration['table_name'])
  172. ->set('delivered_at', '?')
  173. ->where('id = ?');
  174. $now = new \DateTime();
  175. $this->executeStatement($queryBuilder->getSQL(), [
  176. $now,
  177. $doctrineEnvelope['id'],
  178. ], [
  179. Types::DATETIME_MUTABLE,
  180. ]);
  181. $this->driverConnection->commit();
  182. return $doctrineEnvelope;
  183. } catch (\Throwable $e) {
  184. $this->driverConnection->rollBack();
  185. if ($this->autoSetup && $e instanceof TableNotFoundException) {
  186. $this->setup();
  187. goto get;
  188. }
  189. throw $e;
  190. }
  191. }
  192. public function ack(string $id): bool
  193. {
  194. try {
  195. return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
  196. } catch (DBALException | Exception $exception) {
  197. throw new TransportException($exception->getMessage(), 0, $exception);
  198. }
  199. }
  200. public function reject(string $id): bool
  201. {
  202. try {
  203. return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
  204. } catch (DBALException | Exception $exception) {
  205. throw new TransportException($exception->getMessage(), 0, $exception);
  206. }
  207. }
  208. public function setup(): void
  209. {
  210. $configuration = $this->driverConnection->getConfiguration();
  211. $assetFilter = $configuration->getSchemaAssetsFilter();
  212. $configuration->setSchemaAssetsFilter(null);
  213. $this->updateSchema();
  214. $configuration->setSchemaAssetsFilter($assetFilter);
  215. $this->autoSetup = false;
  216. }
  217. public function getMessageCount(): int
  218. {
  219. $queryBuilder = $this->createAvailableMessagesQueryBuilder()
  220. ->select('COUNT(m.id) as message_count')
  221. ->setMaxResults(1);
  222. $stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());
  223. return $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchOne() : $stmt->fetchColumn();
  224. }
  225. public function findAll(int $limit = null): array
  226. {
  227. $queryBuilder = $this->createAvailableMessagesQueryBuilder();
  228. if (null !== $limit) {
  229. $queryBuilder->setMaxResults($limit);
  230. }
  231. $stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());
  232. $data = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAllAssociative() : $stmt->fetchAll();
  233. return array_map(function ($doctrineEnvelope) {
  234. return $this->decodeEnvelopeHeaders($doctrineEnvelope);
  235. }, $data);
  236. }
  237. public function find($id): ?array
  238. {
  239. $queryBuilder = $this->createQueryBuilder()
  240. ->where('m.id = ?');
  241. $stmt = $this->executeQuery($queryBuilder->getSQL(), [$id]);
  242. $data = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAssociative() : $stmt->fetch();
  243. return false === $data ? null : $this->decodeEnvelopeHeaders($data);
  244. }
  245. /**
  246. * @internal
  247. */
  248. public function configureSchema(Schema $schema, DBALConnection $forConnection): void
  249. {
  250. // only update the schema for this connection
  251. if ($forConnection !== $this->driverConnection) {
  252. return;
  253. }
  254. if ($schema->hasTable($this->configuration['table_name'])) {
  255. return;
  256. }
  257. $this->addTableToSchema($schema);
  258. }
  259. /**
  260. * @internal
  261. */
  262. public function getExtraSetupSqlForTable(Table $createdTable): array
  263. {
  264. return [];
  265. }
  266. private function createAvailableMessagesQueryBuilder(): QueryBuilder
  267. {
  268. $now = new \DateTime();
  269. $redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
  270. return $this->createQueryBuilder()
  271. ->where('m.delivered_at is null OR m.delivered_at < ?')
  272. ->andWhere('m.available_at <= ?')
  273. ->andWhere('m.queue_name = ?')
  274. ->setParameters([
  275. $redeliverLimit,
  276. $now,
  277. $this->configuration['queue_name'],
  278. ], [
  279. Types::DATETIME_MUTABLE,
  280. Types::DATETIME_MUTABLE,
  281. ]);
  282. }
  283. private function createQueryBuilder(): QueryBuilder
  284. {
  285. return $this->driverConnection->createQueryBuilder()
  286. ->select('m.*')
  287. ->from($this->configuration['table_name'], 'm');
  288. }
  289. private function executeQuery(string $sql, array $parameters = [], array $types = [])
  290. {
  291. try {
  292. $stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
  293. } catch (TableNotFoundException $e) {
  294. if ($this->driverConnection->isTransactionActive()) {
  295. throw $e;
  296. }
  297. // create table
  298. if ($this->autoSetup) {
  299. $this->setup();
  300. }
  301. $stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
  302. }
  303. return $stmt;
  304. }
  305. protected function executeStatement(string $sql, array $parameters = [], array $types = [])
  306. {
  307. try {
  308. if (method_exists($this->driverConnection, 'executeStatement')) {
  309. $stmt = $this->driverConnection->executeStatement($sql, $parameters, $types);
  310. } else {
  311. $stmt = $this->driverConnection->executeUpdate($sql, $parameters, $types);
  312. }
  313. } catch (TableNotFoundException $e) {
  314. if ($this->driverConnection->isTransactionActive()) {
  315. throw $e;
  316. }
  317. // create table
  318. if ($this->autoSetup) {
  319. $this->setup();
  320. }
  321. if (method_exists($this->driverConnection, 'executeStatement')) {
  322. $stmt = $this->driverConnection->executeStatement($sql, $parameters, $types);
  323. } else {
  324. $stmt = $this->driverConnection->executeUpdate($sql, $parameters, $types);
  325. }
  326. }
  327. return $stmt;
  328. }
  329. private function getSchema(): Schema
  330. {
  331. $schema = new Schema([], [], $this->driverConnection->getSchemaManager()->createSchemaConfig());
  332. $this->addTableToSchema($schema);
  333. return $schema;
  334. }
  335. private function addTableToSchema(Schema $schema): void
  336. {
  337. $table = $schema->createTable($this->configuration['table_name']);
  338. // add an internal option to mark that we created this & the non-namespaced table name
  339. $table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']);
  340. $table->addColumn('id', Types::BIGINT)
  341. ->setAutoincrement(true)
  342. ->setNotnull(true);
  343. $table->addColumn('body', Types::TEXT)
  344. ->setNotnull(true);
  345. $table->addColumn('headers', Types::TEXT)
  346. ->setNotnull(true);
  347. $table->addColumn('queue_name', Types::STRING)
  348. ->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
  349. ->setNotnull(true);
  350. $table->addColumn('created_at', Types::DATETIME_MUTABLE)
  351. ->setNotnull(true);
  352. $table->addColumn('available_at', Types::DATETIME_MUTABLE)
  353. ->setNotnull(true);
  354. $table->addColumn('delivered_at', Types::DATETIME_MUTABLE)
  355. ->setNotnull(false);
  356. $table->setPrimaryKey(['id']);
  357. $table->addIndex(['queue_name']);
  358. $table->addIndex(['available_at']);
  359. $table->addIndex(['delivered_at']);
  360. }
  361. private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
  362. {
  363. $doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);
  364. return $doctrineEnvelope;
  365. }
  366. private function updateSchema(): void
  367. {
  368. if (null !== $this->schemaSynchronizer) {
  369. $this->schemaSynchronizer->updateSchema($this->getSchema(), true);
  370. return;
  371. }
  372. $comparator = new Comparator();
  373. $schemaDiff = $comparator->compare($this->driverConnection->getSchemaManager()->createSchema(), $this->getSchema());
  374. foreach ($schemaDiff->toSaveSql($this->driverConnection->getDatabasePlatform()) as $sql) {
  375. if (method_exists($this->driverConnection, 'executeStatement')) {
  376. $this->driverConnection->executeStatement($sql);
  377. } else {
  378. $this->driverConnection->exec($sql);
  379. }
  380. }
  381. }
  382. }
  383. class_alias(Connection::class, \Symfony\Component\Messenger\Transport\Doctrine\Connection::class);