Connection.php 19 KB

  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
  11. use Symfony\Component\Messenger\Exception\InvalidArgumentException;
  12. use Symfony\Component\Messenger\Exception\LogicException;
  13. use Symfony\Component\Messenger\Exception\TransportException;
  14. /**
  15. * A Redis connection.
  16. *
  17. * @author Alexander Schranz <>
  18. * @author Antoine Bluchet <>
  19. * @author Robin Chalas <>
  20. *
  21. * @internal
  22. * @final
  23. */
  24. class Connection
  25. {
  26. private const DEFAULT_OPTIONS = [
  27. 'stream' => 'messages',
  28. 'group' => 'symfony',
  29. 'consumer' => 'consumer',
  30. 'auto_setup' => true,
  31. 'delete_after_ack' => false,
  32. 'delete_after_reject' => true,
  33. 'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
  34. 'dbindex' => 0,
  35. 'tls' => false,
  36. 'redeliver_timeout' => 3600, // Timeout before redeliver messages still in pending state (seconds)
  37. 'claim_interval' => 60000, // Interval by which pending/abandoned messages should be checked
  38. 'lazy' => false,
  39. ];
  40. private $connection;
  41. private $stream;
  42. private $queue;
  43. private $group;
  44. private $consumer;
  45. private $autoSetup;
  46. private $maxEntries;
  47. private $redeliverTimeout;
  48. private $nextClaim = 0;
  49. private $claimInterval;
  50. private $deleteAfterAck;
  51. private $deleteAfterReject;
  52. private $couldHavePendingMessages = true;
  53. public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
  54. {
  55. if (version_compare(phpversion('redis'), '4.3.0', '<')) {
  56. throw new LogicException('The redis transport requires php-redis 4.3.0 or higher.');
  57. }
  58. $host = $connectionCredentials['host'] ?? '';
  59. $port = $connectionCredentials['port'] ?? 6379;
  60. $serializer = $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP;
  61. $dbIndex = $configuration['dbindex'] ?? self::DEFAULT_OPTIONS['dbindex'];
  62. $auth = $connectionCredentials['auth'] ?? null;
  63. if ('' === $auth) {
  64. $auth = null;
  65. }
  66. $initializer = static function ($redis) use ($host, $port, $auth, $serializer, $dbIndex) {
  67. $redis->connect($host, $port);
  68. $redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
  69. if (null !== $auth && !$redis->auth($auth)) {
  70. throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
  71. }
  72. if ($dbIndex && !$redis->select($dbIndex)) {
  73. throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
  74. }
  75. return true;
  76. };
  77. if (null === $redis) {
  78. $redis = new \Redis();
  79. }
  80. if ($configuration['lazy'] ?? self::DEFAULT_OPTIONS['lazy']) {
  81. $redis = new RedisProxy($redis, $initializer);
  82. } else {
  83. $initializer($redis);
  84. }
  85. $this->connection = $redis;
  86. foreach (['stream', 'group', 'consumer'] as $key) {
  87. if (isset($configuration[$key]) && '' === $configuration[$key]) {
  88. throw new InvalidArgumentException(sprintf('"%s" should be configured, got an empty string.', $key));
  89. }
  90. }
  91. $this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
  92. $this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
  93. $this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
  94. $this->queue = $this->stream.'__queue';
  95. $this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
  96. $this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
  97. $this->deleteAfterAck = $configuration['delete_after_ack'] ?? self::DEFAULT_OPTIONS['delete_after_ack'];
  98. $this->deleteAfterReject = $configuration['delete_after_reject'] ?? self::DEFAULT_OPTIONS['delete_after_reject'];
  99. $this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
  100. $this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
  101. }
  102. public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
  103. {
  104. $url = $dsn;
  105. if (preg_match('#^redis:///([^:@])+$#', $dsn)) {
  106. $url = str_replace('redis:', 'file:', $dsn);
  107. }
  108. if (false === $parsedUrl = parse_url($url)) {
  109. throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
  110. }
  111. if (isset($parsedUrl['query'])) {
  112. parse_str($parsedUrl['query'], $dsnOptions);
  113. $redisOptions = array_merge($redisOptions, $dsnOptions);
  114. }
  115. self::validateOptions($redisOptions);
  116. $autoSetup = null;
  117. if (\array_key_exists('auto_setup', $redisOptions)) {
  118. $autoSetup = filter_var($redisOptions['auto_setup'], \FILTER_VALIDATE_BOOLEAN);
  119. unset($redisOptions['auto_setup']);
  120. }
  121. $maxEntries = null;
  122. if (\array_key_exists('stream_max_entries', $redisOptions)) {
  123. $maxEntries = filter_var($redisOptions['stream_max_entries'], \FILTER_VALIDATE_INT);
  124. unset($redisOptions['stream_max_entries']);
  125. }
  126. $deleteAfterAck = null;
  127. if (\array_key_exists('delete_after_ack', $redisOptions)) {
  128. $deleteAfterAck = filter_var($redisOptions['delete_after_ack'], \FILTER_VALIDATE_BOOLEAN);
  129. unset($redisOptions['delete_after_ack']);
  130. }
  131. $deleteAfterReject = null;
  132. if (\array_key_exists('delete_after_reject', $redisOptions)) {
  133. $deleteAfterReject = filter_var($redisOptions['delete_after_reject'], \FILTER_VALIDATE_BOOLEAN);
  134. unset($redisOptions['delete_after_reject']);
  135. }
  136. $dbIndex = null;
  137. if (\array_key_exists('dbindex', $redisOptions)) {
  138. $dbIndex = filter_var($redisOptions['dbindex'], \FILTER_VALIDATE_INT);
  139. unset($redisOptions['dbindex']);
  140. }
  141. $tls = false;
  142. if (\array_key_exists('tls', $redisOptions)) {
  143. $tls = filter_var($redisOptions['tls'], \FILTER_VALIDATE_BOOLEAN);
  144. unset($redisOptions['tls']);
  145. }
  146. $redeliverTimeout = null;
  147. if (\array_key_exists('redeliver_timeout', $redisOptions)) {
  148. $redeliverTimeout = filter_var($redisOptions['redeliver_timeout'], \FILTER_VALIDATE_INT);
  149. unset($redisOptions['redeliver_timeout']);
  150. }
  151. $claimInterval = null;
  152. if (\array_key_exists('claim_interval', $redisOptions)) {
  153. $claimInterval = filter_var($redisOptions['claim_interval'], \FILTER_VALIDATE_INT);
  154. unset($redisOptions['claim_interval']);
  155. }
  156. $configuration = [
  157. 'stream' => $redisOptions['stream'] ?? null,
  158. 'group' => $redisOptions['group'] ?? null,
  159. 'consumer' => $redisOptions['consumer'] ?? null,
  160. 'lazy' => $redisOptions['lazy'] ?? self::DEFAULT_OPTIONS['lazy'],
  161. 'auto_setup' => $autoSetup,
  162. 'stream_max_entries' => $maxEntries,
  163. 'delete_after_ack' => $deleteAfterAck,
  164. 'delete_after_reject' => $deleteAfterReject,
  165. 'dbindex' => $dbIndex,
  166. 'redeliver_timeout' => $redeliverTimeout,
  167. 'claim_interval' => $claimInterval,
  168. ];
  169. if (isset($parsedUrl['host'])) {
  170. $connectionCredentials = [
  171. 'host' => $parsedUrl['host'] ?? '',
  172. 'port' => $parsedUrl['port'] ?? 6379,
  173. 'auth' => $parsedUrl['pass'] ?? $parsedUrl['user'] ?? null,
  174. ];
  175. $pathParts = explode('/', rtrim($parsedUrl['path'] ?? '', '/'));
  176. $configuration['stream'] = $pathParts[1] ?? $configuration['stream'];
  177. $configuration['group'] = $pathParts[2] ?? $configuration['group'];
  178. $configuration['consumer'] = $pathParts[3] ?? $configuration['consumer'];
  179. if ($tls) {
  180. $connectionCredentials['host'] = 'tls://'.$connectionCredentials['host'];
  181. }
  182. } else {
  183. $connectionCredentials = [
  184. 'host' => $parsedUrl['path'],
  185. 'port' => 0,
  186. ];
  187. }
  188. return new self($configuration, $connectionCredentials, $redisOptions, $redis);
  189. }
  190. private static function validateOptions(array $options): void
  191. {
  192. $availableOptions = array_keys(self::DEFAULT_OPTIONS);
  193. $availableOptions[] = 'serializer';
  194. if (0 < \count($invalidOptions = array_diff(array_keys($options), $availableOptions))) {
  195. trigger_deprecation('symfony/messenger', '5.1', 'Invalid option(s) "%s" passed to the Redis Messenger transport. Passing invalid options is deprecated.', implode('", "', $invalidOptions));
  196. }
  197. }
  198. private function claimOldPendingMessages()
  199. {
  200. try {
  201. // This could soon be optimized with or
  202. //
  203. $pendingMessages = $this->connection->xpending($this->stream, $this->group, '-', '+', 1);
  204. } catch (\RedisException $e) {
  205. throw new TransportException($e->getMessage(), 0, $e);
  206. }
  207. $claimableIds = [];
  208. foreach ($pendingMessages as $pendingMessage) {
  209. if ($pendingMessage[1] === $this->consumer) {
  210. $this->couldHavePendingMessages = true;
  211. return;
  212. }
  213. if ($pendingMessage[2] >= $this->redeliverTimeout) {
  214. $claimableIds[] = $pendingMessage[0];
  215. }
  216. }
  217. if (\count($claimableIds) > 0) {
  218. try {
  219. $this->connection->xclaim(
  220. $this->stream,
  221. $this->group,
  222. $this->consumer,
  223. $this->redeliverTimeout,
  224. $claimableIds,
  225. ['JUSTID']
  226. );
  227. $this->couldHavePendingMessages = true;
  228. } catch (\RedisException $e) {
  229. throw new TransportException($e->getMessage(), 0, $e);
  230. }
  231. }
  232. $this->nextClaim = $this->getCurrentTimeInMilliseconds() + $this->claimInterval;
  233. }
  234. public function get(): ?array
  235. {
  236. if ($this->autoSetup) {
  237. $this->setup();
  238. }
  239. try {
  240. $queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
  241. } catch (\RedisException $e) {
  242. throw new TransportException($e->getMessage(), 0, $e);
  243. }
  244. if ($queuedMessageCount) {
  245. for ($i = 0; $i < $queuedMessageCount; ++$i) {
  246. try {
  247. $queuedMessages = $this->connection->zpopmin($this->queue, 1);
  248. } catch (\RedisException $e) {
  249. throw new TransportException($e->getMessage(), 0, $e);
  250. }
  251. foreach ($queuedMessages as $queuedMessage => $time) {
  252. $queuedMessage = json_decode($queuedMessage, true);
  253. // if a futured placed message is actually popped because of a race condition with
  254. // another running message consumer, the message is readded to the queue by add function
  255. // else its just added stream and will be available for all stream consumers
  256. $this->add(
  257. $queuedMessage['body'],
  258. $queuedMessage['headers'],
  259. $time - $this->getCurrentTimeInMilliseconds()
  260. );
  261. }
  262. }
  263. }
  264. if (!$this->couldHavePendingMessages && $this->nextClaim <= $this->getCurrentTimeInMilliseconds()) {
  265. $this->claimOldPendingMessages();
  266. }
  267. $messageId = '>'; // will receive new messages
  268. if ($this->couldHavePendingMessages) {
  269. $messageId = '0'; // will receive consumers pending messages
  270. }
  271. try {
  272. $messages = $this->connection->xreadgroup(
  273. $this->group,
  274. $this->consumer,
  275. [$this->stream => $messageId],
  276. 1
  277. );
  278. } catch (\RedisException $e) {
  279. throw new TransportException($e->getMessage(), 0, $e);
  280. }
  281. if (false === $messages) {
  282. if ($error = $this->connection->getLastError() ?: null) {
  283. $this->connection->clearLastError();
  284. }
  285. throw new TransportException($error ?? 'Could not read messages from the redis stream.');
  286. }
  287. if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
  288. $this->couldHavePendingMessages = false;
  289. // No pending messages so get a new one
  290. return $this->get();
  291. }
  292. foreach ($messages[$this->stream] ?? [] as $key => $message) {
  293. $redisEnvelope = json_decode($message['message'], true);
  294. return [
  295. 'id' => $key,
  296. 'body' => $redisEnvelope['body'],
  297. 'headers' => $redisEnvelope['headers'],
  298. ];
  299. }
  300. return null;
  301. }
  302. public function ack(string $id): void
  303. {
  304. try {
  305. $acknowledged = $this->connection->xack($this->stream, $this->group, [$id]);
  306. if ($this->deleteAfterAck) {
  307. $acknowledged = $this->connection->xdel($this->stream, [$id]);
  308. }
  309. } catch (\RedisException $e) {
  310. throw new TransportException($e->getMessage(), 0, $e);
  311. }
  312. if (!$acknowledged) {
  313. if ($error = $this->connection->getLastError() ?: null) {
  314. $this->connection->clearLastError();
  315. }
  316. throw new TransportException($error ?? sprintf('Could not acknowledge redis message "%s".', $id));
  317. }
  318. }
  319. public function reject(string $id): void
  320. {
  321. try {
  322. $deleted = $this->connection->xack($this->stream, $this->group, [$id]);
  323. if ($this->deleteAfterReject) {
  324. $deleted = $this->connection->xdel($this->stream, [$id]) && $deleted;
  325. }
  326. } catch (\RedisException $e) {
  327. throw new TransportException($e->getMessage(), 0, $e);
  328. }
  329. if (!$deleted) {
  330. if ($error = $this->connection->getLastError() ?: null) {
  331. $this->connection->clearLastError();
  332. }
  333. throw new TransportException($error ?? sprintf('Could not delete message "%s" from the redis stream.', $id));
  334. }
  335. }
  336. public function add(string $body, array $headers, int $delayInMs = 0): void
  337. {
  338. if ($this->autoSetup) {
  339. $this->setup();
  340. }
  341. try {
  342. if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
  343. $message = json_encode([
  344. 'body' => $body,
  345. 'headers' => $headers,
  346. // Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue
  347. 'uniqid' => uniqid('', true),
  348. ]);
  349. if (false === $message) {
  350. throw new TransportException(json_last_error_msg());
  351. }
  352. $score = (int) ($this->getCurrentTimeInMilliseconds() + $delayInMs);
  353. $added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
  354. } else {
  355. $message = json_encode([
  356. 'body' => $body,
  357. 'headers' => $headers,
  358. ]);
  359. if (false === $message) {
  360. throw new TransportException(json_last_error_msg());
  361. }
  362. if ($this->maxEntries) {
  363. $added = $this->connection->xadd($this->stream, '*', ['message' => $message], $this->maxEntries, true);
  364. } else {
  365. $added = $this->connection->xadd($this->stream, '*', ['message' => $message]);
  366. }
  367. }
  368. } catch (\RedisException $e) {
  369. if ($error = $this->connection->getLastError() ?: null) {
  370. $this->connection->clearLastError();
  371. }
  372. throw new TransportException($error ?? $e->getMessage(), 0, $e);
  373. }
  374. if (!$added) {
  375. if ($error = $this->connection->getLastError() ?: null) {
  376. $this->connection->clearLastError();
  377. }
  378. throw new TransportException($error ?? 'Could not add a message to the redis stream.');
  379. }
  380. }
  381. public function setup(): void
  382. {
  383. try {
  384. $this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true);
  385. } catch (\RedisException $e) {
  386. throw new TransportException($e->getMessage(), 0, $e);
  387. }
  388. // group might already exist, ignore
  389. if ($this->connection->getLastError()) {
  390. $this->connection->clearLastError();
  391. }
  392. if ($this->deleteAfterAck || $this->deleteAfterReject) {
  393. $groups = $this->connection->xinfo('GROUPS', $this->stream);
  394. if (
  395. // support for Redis extension version 5+
  396. (\is_array($groups) && 1 < \count($groups))
  397. // support for Redis extension version 4.x
  398. || (\is_string($groups) && substr_count($groups, '"name"'))
  399. ) {
  400. throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack and delete_after_reject can not be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
  401. }
  402. }
  403. $this->autoSetup = false;
  404. }
  405. private function getCurrentTimeInMilliseconds(): int
  406. {
  407. return (int) (microtime(true) * 1000);
  408. }
  409. public function cleanup(): void
  410. {
  411. static $unlink = true;
  412. if ($unlink) {
  413. try {
  414. $unlink = false !== $this->connection->unlink($this->stream, $this->queue);
  415. } catch (\Throwable $e) {
  416. $unlink = false;
  417. }
  418. }
  419. if (!$unlink) {
  420. $this->connection->del($this->stream, $this->queue);
  421. }
  422. }
  423. }
  424. class_alias(Connection::class, \Symfony\Component\Messenger\Transport\RedisExt\Connection::class);