Connection.php 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
  11. use Symfony\Component\Messenger\Exception\InvalidArgumentException;
  12. use Symfony\Component\Messenger\Exception\LogicException;
  13. /**
  14. * An AMQP connection.
  15. *
  16. * @author Samuel Roze <samuel.roze@gmail.com>
  17. *
  18. * @final
  19. */
  20. class Connection
  21. {
  22. private const ARGUMENTS_AS_INTEGER = [
  23. 'x-delay',
  24. 'x-expires',
  25. 'x-max-length',
  26. 'x-max-length-bytes',
  27. 'x-max-priority',
  28. 'x-message-ttl',
  29. ];
  30. private const AVAILABLE_OPTIONS = [
  31. 'host',
  32. 'port',
  33. 'vhost',
  34. 'user',
  35. 'login',
  36. 'password',
  37. 'queues',
  38. 'exchange',
  39. 'delay',
  40. 'auto_setup',
  41. 'prefetch_count',
  42. 'retry',
  43. 'persistent',
  44. 'frame_max',
  45. 'channel_max',
  46. 'heartbeat',
  47. 'read_timeout',
  48. 'write_timeout',
  49. 'confirm_timeout',
  50. 'connect_timeout',
  51. 'cacert',
  52. 'cert',
  53. 'key',
  54. 'verify',
  55. 'sasl_method',
  56. ];
  57. private const AVAILABLE_QUEUE_OPTIONS = [
  58. 'binding_keys',
  59. 'binding_arguments',
  60. 'flags',
  61. 'arguments',
  62. ];
  63. private const AVAILABLE_EXCHANGE_OPTIONS = [
  64. 'name',
  65. 'type',
  66. 'default_publish_routing_key',
  67. 'flags',
  68. 'arguments',
  69. ];
  70. private $connectionOptions;
  71. private $exchangeOptions;
  72. private $queuesOptions;
  73. private $amqpFactory;
  74. /**
  75. * @var \AMQPChannel|null
  76. */
  77. private $amqpChannel;
  78. /**
  79. * @var \AMQPExchange|null
  80. */
  81. private $amqpExchange;
  82. /**
  83. * @var \AMQPQueue[]|null
  84. */
  85. private $amqpQueues = [];
  86. /**
  87. * @var \AMQPExchange|null
  88. */
  89. private $amqpDelayExchange;
  90. public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
  91. {
  92. if (!\extension_loaded('amqp')) {
  93. throw new LogicException(sprintf('You cannot use the "%s" as the "amqp" extension is not installed.', __CLASS__));
  94. }
  95. $this->connectionOptions = array_replace_recursive([
  96. 'delay' => [
  97. 'exchange_name' => 'delays',
  98. 'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
  99. ],
  100. ], $connectionOptions);
  101. $this->exchangeOptions = $exchangeOptions;
  102. $this->queuesOptions = $queuesOptions;
  103. $this->amqpFactory = $amqpFactory ?: new AmqpFactory();
  104. }
  105. /**
  106. * Creates a connection based on the DSN and options.
  107. *
  108. * Available options:
  109. *
  110. * * host: Hostname of the AMQP service
  111. * * port: Port of the AMQP service
  112. * * vhost: Virtual Host to use with the AMQP service
  113. * * user|login: Username to use to connect the AMQP service
  114. * * password: Password to use to connect to the AMQP service
  115. * * read_timeout: Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
  116. * * write_timeout: Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
  117. * * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
  118. * * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional.
  119. * * queues[name]: An array of queues, keyed by the name
  120. * * binding_keys: The binding keys (if any) to bind to this queue
  121. * * binding_arguments: Arguments to be used while binding the queue.
  122. * * flags: Queue flags (Default: AMQP_DURABLE)
  123. * * arguments: Extra arguments
  124. * * exchange:
  125. * * name: Name of the exchange
  126. * * type: Type of exchange (Default: fanout)
  127. * * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
  128. * * flags: Exchange flags (Default: AMQP_DURABLE)
  129. * * arguments: Extra arguments
  130. * * delay:
  131. * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
  132. * * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
  133. * * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
  134. * * prefetch_count: set channel prefetch count
  135. *
  136. * * Connection tuning options (see http://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.tune for details):
  137. * * channel_max: Specifies highest channel number that the server permits. 0 means standard extension limit
  138. * (see PHP_AMQP_MAX_CHANNELS constant)
  139. * * frame_max: The largest frame size that the server proposes for the connection, including frame header
  140. * and end-byte. 0 means standard extension limit (depends on librabbimq default frame size limit)
  141. * * heartbeat: The delay, in seconds, of the connection heartbeat that the server wants.
  142. * 0 means the server does not want a heartbeat. Note, librabbitmq has limited heartbeat support,
  143. * which means heartbeats checked only during blocking calls.
  144. *
  145. * TLS support (see https://www.rabbitmq.com/ssl.html for details):
  146. * * cacert: Path to the CA cert file in PEM format.
  147. * * cert: Path to the client certificate in PEM format.
  148. * * key: Path to the client key in PEM format.
  149. * * verify: Enable or disable peer verification. If peer verification is enabled then the common name in the
  150. * server certificate must match the server name. Peer verification is enabled by default.
  151. */
  152. public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
  153. {
  154. if (false === $parsedUrl = parse_url($dsn)) {
  155. // this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
  156. if (!\in_array($dsn, ['amqp://', 'amqps://'])) {
  157. throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
  158. }
  159. $parsedUrl = [];
  160. }
  161. $useAmqps = 0 === strpos($dsn, 'amqps://');
  162. $pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
  163. $exchangeName = $pathParts[1] ?? 'messages';
  164. parse_str($parsedUrl['query'] ?? '', $parsedQuery);
  165. $port = $useAmqps ? 5671 : 5672;
  166. $amqpOptions = array_replace_recursive([
  167. 'host' => $parsedUrl['host'] ?? 'localhost',
  168. 'port' => $parsedUrl['port'] ?? $port,
  169. 'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/',
  170. 'exchange' => [
  171. 'name' => $exchangeName,
  172. ],
  173. ], $options, $parsedQuery);
  174. self::validateOptions($amqpOptions);
  175. if (isset($parsedUrl['user'])) {
  176. $amqpOptions['login'] = $parsedUrl['user'];
  177. }
  178. if (isset($parsedUrl['pass'])) {
  179. $amqpOptions['password'] = $parsedUrl['pass'];
  180. }
  181. if (!isset($amqpOptions['queues'])) {
  182. $amqpOptions['queues'][$exchangeName] = [];
  183. }
  184. $exchangeOptions = $amqpOptions['exchange'];
  185. $queuesOptions = $amqpOptions['queues'];
  186. unset($amqpOptions['queues'], $amqpOptions['exchange']);
  187. $queuesOptions = array_map(function ($queueOptions) {
  188. if (!\is_array($queueOptions)) {
  189. $queueOptions = [];
  190. }
  191. if (\is_array($queueOptions['arguments'] ?? false)) {
  192. $queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
  193. }
  194. return $queueOptions;
  195. }, $queuesOptions);
  196. if ($useAmqps && !self::hasCaCertConfigured($amqpOptions)) {
  197. throw new InvalidArgumentException('No CA certificate has been provided. Set "amqp.cacert" in your php.ini or pass the "cacert" parameter in the DSN to use SSL. Alternatively, you can use amqp:// to use without SSL.');
  198. }
  199. return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory);
  200. }
  201. private static function validateOptions(array $options): void
  202. {
  203. if (0 < \count($invalidOptions = array_diff(array_keys($options), self::AVAILABLE_OPTIONS))) {
  204. trigger_deprecation('symfony/messenger', '5.1', 'Invalid option(s) "%s" passed to the AMQP Messenger transport. Passing invalid options is deprecated.', implode('", "', $invalidOptions));
  205. }
  206. if (\is_array($options['queues'] ?? false)) {
  207. foreach ($options['queues'] as $queue) {
  208. if (!\is_array($queue)) {
  209. continue;
  210. }
  211. if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) {
  212. trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions));
  213. }
  214. }
  215. }
  216. if (\is_array($options['exchange'] ?? false)
  217. && 0 < \count($invalidExchangeOptions = array_diff(array_keys($options['exchange']), self::AVAILABLE_EXCHANGE_OPTIONS))) {
  218. trigger_deprecation('symfony/messenger', '5.1', 'Invalid exchange option(s) "%s" passed to the AMQP Messenger transport. Passing invalid exchange options is deprecated.', implode('", "', $invalidExchangeOptions));
  219. }
  220. }
  221. private static function normalizeQueueArguments(array $arguments): array
  222. {
  223. foreach (self::ARGUMENTS_AS_INTEGER as $key) {
  224. if (!\array_key_exists($key, $arguments)) {
  225. continue;
  226. }
  227. if (!is_numeric($arguments[$key])) {
  228. throw new InvalidArgumentException(sprintf('Integer expected for queue argument "%s", "%s" given.', $key, get_debug_type($arguments[$key])));
  229. }
  230. $arguments[$key] = (int) $arguments[$key];
  231. }
  232. return $arguments;
  233. }
  234. private static function hasCaCertConfigured(array $amqpOptions): bool
  235. {
  236. return (isset($amqpOptions['cacert']) && '' !== $amqpOptions['cacert']) || '' !== ini_get('amqp.cacert');
  237. }
  238. /**
  239. * @throws \AMQPException
  240. */
  241. public function publish(string $body, array $headers = [], int $delayInMs = 0, AmqpStamp $amqpStamp = null): void
  242. {
  243. $this->clearWhenDisconnected();
  244. if (0 !== $delayInMs) {
  245. $this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
  246. return;
  247. }
  248. if ($this->shouldSetup()) {
  249. $this->setupExchangeAndQueues();
  250. }
  251. $this->publishOnExchange(
  252. $this->exchange(),
  253. $body,
  254. $this->getRoutingKeyForMessage($amqpStamp),
  255. $headers,
  256. $amqpStamp
  257. );
  258. }
  259. /**
  260. * Returns an approximate count of the messages in defined queues.
  261. */
  262. public function countMessagesInQueues(): int
  263. {
  264. return array_sum(array_map(function ($queueName) {
  265. return $this->queue($queueName)->declareQueue();
  266. }, $this->getQueueNames()));
  267. }
  268. /**
  269. * @throws \AMQPException
  270. */
  271. private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
  272. {
  273. $routingKey = $this->getRoutingKeyForMessage($amqpStamp);
  274. $this->setupDelay($delay, $routingKey);
  275. $this->publishOnExchange(
  276. $this->getDelayExchange(),
  277. $body,
  278. $this->getRoutingKeyForDelay($delay, $routingKey),
  279. $headers,
  280. $amqpStamp
  281. );
  282. }
  283. private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null)
  284. {
  285. $attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
  286. $attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
  287. $attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
  288. $attributes['timestamp'] = $attributes['timestamp'] ?? time();
  289. $exchange->publish(
  290. $body,
  291. $routingKey,
  292. $amqpStamp ? $amqpStamp->getFlags() : \AMQP_NOPARAM,
  293. $attributes
  294. );
  295. if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) {
  296. $this->channel()->waitForConfirm((float) $this->connectionOptions['confirm_timeout']);
  297. }
  298. }
  299. private function setupDelay(int $delay, ?string $routingKey)
  300. {
  301. if ($this->shouldSetup()) {
  302. $this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
  303. }
  304. $queue = $this->createDelayQueue($delay, $routingKey);
  305. $queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
  306. $queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
  307. }
  308. private function getDelayExchange(): \AMQPExchange
  309. {
  310. if (null === $this->amqpDelayExchange) {
  311. $this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel());
  312. $this->amqpDelayExchange->setName($this->connectionOptions['delay']['exchange_name']);
  313. $this->amqpDelayExchange->setType(\AMQP_EX_TYPE_DIRECT);
  314. $this->amqpDelayExchange->setFlags(\AMQP_DURABLE);
  315. }
  316. return $this->amqpDelayExchange;
  317. }
  318. /**
  319. * Creates a delay queue that will delay for a certain amount of time.
  320. *
  321. * This works by setting message TTL for the delay and pointing
  322. * the dead letter exchange to the original exchange. The result
  323. * is that after the TTL, the message is sent to the dead-letter-exchange,
  324. * which is the original exchange, resulting on it being put back into
  325. * the original queue.
  326. */
  327. private function createDelayQueue(int $delay, ?string $routingKey): \AMQPQueue
  328. {
  329. $queue = $this->amqpFactory->createQueue($this->channel());
  330. $queue->setName(str_replace(
  331. ['%delay%', '%exchange_name%', '%routing_key%'],
  332. [$delay, $this->exchangeOptions['name'], $routingKey ?? ''],
  333. $this->connectionOptions['delay']['queue_name_pattern']
  334. ));
  335. $queue->setFlags(\AMQP_DURABLE);
  336. $queue->setArguments([
  337. 'x-message-ttl' => $delay,
  338. // delete the delay queue 10 seconds after the message expires
  339. // publishing another message redeclares the queue which renews the lease
  340. 'x-expires' => $delay + 10000,
  341. 'x-dead-letter-exchange' => $this->exchangeOptions['name'],
  342. // after being released from to DLX, make sure the original routing key will be used
  343. // we must use an empty string instead of null for the argument to be picked up
  344. 'x-dead-letter-routing-key' => $routingKey ?? '',
  345. ]);
  346. return $queue;
  347. }
  348. private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string
  349. {
  350. return str_replace(
  351. ['%delay%', '%exchange_name%', '%routing_key%'],
  352. [$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
  353. $this->connectionOptions['delay']['queue_name_pattern']
  354. );
  355. }
  356. /**
  357. * Gets a message from the specified queue.
  358. *
  359. * @throws \AMQPException
  360. */
  361. public function get(string $queueName): ?\AMQPEnvelope
  362. {
  363. $this->clearWhenDisconnected();
  364. if ($this->shouldSetup()) {
  365. $this->setupExchangeAndQueues();
  366. }
  367. try {
  368. if (false !== $message = $this->queue($queueName)->get()) {
  369. return $message;
  370. }
  371. } catch (\AMQPQueueException $e) {
  372. if (404 === $e->getCode() && $this->shouldSetup()) {
  373. // If we get a 404 for the queue, it means we need to set up the exchange & queue.
  374. $this->setupExchangeAndQueues();
  375. return $this->get($queueName);
  376. }
  377. throw $e;
  378. }
  379. return null;
  380. }
  381. public function ack(\AMQPEnvelope $message, string $queueName): bool
  382. {
  383. return $this->queue($queueName)->ack($message->getDeliveryTag());
  384. }
  385. public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool
  386. {
  387. return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags);
  388. }
  389. public function setup(): void
  390. {
  391. $this->setupExchangeAndQueues();
  392. $this->getDelayExchange()->declareExchange();
  393. }
  394. private function setupExchangeAndQueues(): void
  395. {
  396. $this->exchange()->declareExchange();
  397. foreach ($this->queuesOptions as $queueName => $queueConfig) {
  398. $this->queue($queueName)->declareQueue();
  399. foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
  400. $this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
  401. }
  402. }
  403. }
  404. /**
  405. * @return string[]
  406. */
  407. public function getQueueNames(): array
  408. {
  409. return array_keys($this->queuesOptions);
  410. }
  411. public function channel(): \AMQPChannel
  412. {
  413. if (null === $this->amqpChannel) {
  414. $connection = $this->amqpFactory->createConnection($this->connectionOptions);
  415. $connectMethod = 'true' === ($this->connectionOptions['persistent'] ?? 'false') ? 'pconnect' : 'connect';
  416. try {
  417. $connection->{$connectMethod}();
  418. } catch (\AMQPConnectionException $e) {
  419. $credentials = $this->connectionOptions;
  420. $credentials['password'] = '********';
  421. unset($credentials['delay']);
  422. throw new \AMQPException(sprintf('Could not connect to the AMQP server. Please verify the provided DSN. (%s).', json_encode($credentials, \JSON_UNESCAPED_SLASHES)), 0, $e);
  423. }
  424. $this->amqpChannel = $this->amqpFactory->createChannel($connection);
  425. if (isset($this->connectionOptions['prefetch_count'])) {
  426. $this->amqpChannel->setPrefetchCount($this->connectionOptions['prefetch_count']);
  427. }
  428. if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) {
  429. $this->amqpChannel->confirmSelect();
  430. $this->amqpChannel->setConfirmCallback(
  431. static function (): bool {
  432. return false;
  433. },
  434. static function (): bool {
  435. return false;
  436. }
  437. );
  438. }
  439. }
  440. return $this->amqpChannel;
  441. }
  442. public function queue(string $queueName): \AMQPQueue
  443. {
  444. if (!isset($this->amqpQueues[$queueName])) {
  445. $queueConfig = $this->queuesOptions[$queueName];
  446. $amqpQueue = $this->amqpFactory->createQueue($this->channel());
  447. $amqpQueue->setName($queueName);
  448. $amqpQueue->setFlags($queueConfig['flags'] ?? \AMQP_DURABLE);
  449. if (isset($queueConfig['arguments'])) {
  450. $amqpQueue->setArguments($queueConfig['arguments']);
  451. }
  452. $this->amqpQueues[$queueName] = $amqpQueue;
  453. }
  454. return $this->amqpQueues[$queueName];
  455. }
  456. public function exchange(): \AMQPExchange
  457. {
  458. if (null === $this->amqpExchange) {
  459. $this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
  460. $this->amqpExchange->setName($this->exchangeOptions['name']);
  461. $this->amqpExchange->setType($this->exchangeOptions['type'] ?? \AMQP_EX_TYPE_FANOUT);
  462. $this->amqpExchange->setFlags($this->exchangeOptions['flags'] ?? \AMQP_DURABLE);
  463. if (isset($this->exchangeOptions['arguments'])) {
  464. $this->amqpExchange->setArguments($this->exchangeOptions['arguments']);
  465. }
  466. }
  467. return $this->amqpExchange;
  468. }
  469. private function clearWhenDisconnected(): void
  470. {
  471. if (!$this->channel()->isConnected()) {
  472. $this->amqpChannel = null;
  473. $this->amqpQueues = [];
  474. $this->amqpExchange = null;
  475. $this->amqpDelayExchange = null;
  476. }
  477. }
  478. private function shouldSetup(): bool
  479. {
  480. if (!\array_key_exists('auto_setup', $this->connectionOptions)) {
  481. return true;
  482. }
  483. if (\in_array($this->connectionOptions['auto_setup'], [false, 'false'], true)) {
  484. return false;
  485. }
  486. return true;
  487. }
  488. private function getDefaultPublishRoutingKey(): ?string
  489. {
  490. return $this->exchangeOptions['default_publish_routing_key'] ?? null;
  491. }
  492. public function purgeQueues()
  493. {
  494. foreach ($this->getQueueNames() as $queueName) {
  495. $this->queue($queueName)->purge();
  496. }
  497. }
  498. private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
  499. {
  500. return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey();
  501. }
  502. }
// Makes this class also reachable under the Symfony\Component\Messenger\Transport\AmqpExt\Connection name.
class_alias(Connection::class, \Symfony\Component\Messenger\Transport\AmqpExt\Connection::class);