PrimaryReadReplicaConnection.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. <?php
  2. namespace Doctrine\DBAL\Connections;
  3. use Doctrine\Common\EventManager;
  4. use Doctrine\DBAL\Configuration;
  5. use Doctrine\DBAL\Connection;
  6. use Doctrine\DBAL\Driver;
  7. use Doctrine\DBAL\Driver\Connection as DriverConnection;
  8. use Doctrine\DBAL\Event\ConnectionEventArgs;
  9. use Doctrine\DBAL\Events;
  10. use InvalidArgumentException;
  11. use function array_rand;
  12. use function assert;
  13. use function count;
  14. use function func_get_args;
  15. /**
  16. * Primary-Replica Connection
  17. *
  18. * Connection can be used with primary-replica setups.
  19. *
  20. * Important for the understanding of this connection should be how and when
  21. * it picks the replica or primary.
  22. *
  23. * 1. Replica if primary was never picked before and ONLY if 'getWrappedConnection'
  24. * or 'executeQuery' is used.
  25. * 2. Primary picked when 'exec', 'executeUpdate', 'executeStatement', 'insert', 'delete', 'update', 'createSavepoint',
  26. * 'releaseSavepoint', 'beginTransaction', 'rollback', 'commit', 'query' or
  27. * 'prepare' is called.
  28. * 3. If Primary was picked once during the lifetime of the connection it will always get picked afterwards.
  29. * 4. One replica connection is randomly picked ONCE during a request.
  30. *
  31. * ATTENTION: You can write to the replica with this connection if you execute a write query without
  32. * opening up a transaction. For example:
  33. *
  34. * $conn = DriverManager::getConnection(...);
  35. * $conn->executeQuery("DELETE FROM table");
  36. *
  37. * Be aware that Connection#executeQuery is a method specifically for READ
  38. * operations only.
  39. *
  40. * Use Connection#executeStatement for any SQL statement that changes/updates
  41. * state in the database (UPDATE, INSERT, DELETE or DDL statements).
  42. *
  43. * This connection is limited to replica operations using the
  44. * Connection#executeQuery operation only, because it wouldn't be compatible
  45. * with the ORM or SchemaManager code otherwise. Both use all the other
  46. * operations in a context where writes could happen to a replica, which makes
  47. * this restricted approach necessary.
  48. *
  49. * You can manually connect to the primary at any time by calling:
  50. *
  51. * $conn->ensureConnectedToPrimary();
  52. *
  53. * Instantiation through the DriverManager looks like:
  54. *
  55. * @psalm-import-type Params from \Doctrine\DBAL\DriverManager
  56. * @example
  57. *
  58. * $conn = DriverManager::getConnection(array(
  59. * 'wrapperClass' => 'Doctrine\DBAL\Connections\PrimaryReadReplicaConnection',
  60. * 'driver' => 'pdo_mysql',
  61. * 'primary' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''),
  62. * 'replica' => array(
  63. * array('user' => 'replica1', 'password', 'host' => '', 'dbname' => ''),
  64. * array('user' => 'replica2', 'password', 'host' => '', 'dbname' => ''),
  65. * )
  66. * ));
  67. *
  68. * You can also pass 'driverOptions' and any other documented option to each of this drivers
  69. * to pass additional information.
  70. */
  71. class PrimaryReadReplicaConnection extends Connection
  72. {
  73. /**
  74. * Primary and Replica connection (one of the randomly picked replicas).
  75. *
  76. * @var DriverConnection[]|null[]
  77. */
  78. protected $connections = ['primary' => null, 'replica' => null];
  79. /**
  80. * You can keep the replica connection and then switch back to it
  81. * during the request if you know what you are doing.
  82. *
  83. * @var bool
  84. */
  85. protected $keepReplica = false;
  86. /**
  87. * Creates Primary Replica Connection.
  88. *
  89. * @internal The connection can be only instantiated by the driver manager.
  90. *
  91. * @param array<string,mixed> $params
  92. *
  93. * @throws InvalidArgumentException
  94. *
  95. * @phpstan-param array<string,mixed> $params
  96. * @psalm-param Params $params
  97. */
  98. public function __construct(
  99. array $params,
  100. Driver $driver,
  101. ?Configuration $config = null,
  102. ?EventManager $eventManager = null
  103. ) {
  104. if (! isset($params['replica'], $params['primary'])) {
  105. throw new InvalidArgumentException('primary or replica configuration missing');
  106. }
  107. if (count($params['replica']) === 0) {
  108. throw new InvalidArgumentException('You have to configure at least one replica.');
  109. }
  110. if (isset($params['driver'])) {
  111. $params['primary']['driver'] = $params['driver'];
  112. foreach ($params['replica'] as $replicaKey => $replica) {
  113. $params['replica'][$replicaKey]['driver'] = $params['driver'];
  114. }
  115. }
  116. $this->keepReplica = (bool) ($params['keepReplica'] ?? false);
  117. parent::__construct($params, $driver, $config, $eventManager);
  118. }
  119. /**
  120. * Checks if the connection is currently towards the primary or not.
  121. */
  122. public function isConnectedToPrimary(): bool
  123. {
  124. return $this->_conn !== null && $this->_conn === $this->connections['primary'];
  125. }
  126. /**
  127. * @param string|null $connectionName
  128. *
  129. * @return bool
  130. */
  131. public function connect($connectionName = null)
  132. {
  133. if ($connectionName !== null) {
  134. throw new InvalidArgumentException(
  135. 'Passing a connection name as first argument is not supported anymore.'
  136. . ' Use ensureConnectedToPrimary()/ensureConnectedToReplica() instead.'
  137. );
  138. }
  139. return $this->performConnect();
  140. }
  141. protected function performConnect(?string $connectionName = null): bool
  142. {
  143. $requestedConnectionChange = ($connectionName !== null);
  144. $connectionName = $connectionName ?: 'replica';
  145. if ($connectionName !== 'replica' && $connectionName !== 'primary') {
  146. throw new InvalidArgumentException('Invalid option to connect(), only primary or replica allowed.');
  147. }
  148. // If we have a connection open, and this is not an explicit connection
  149. // change request, then abort right here, because we are already done.
  150. // This prevents writes to the replica in case of "keepReplica" option enabled.
  151. if ($this->_conn !== null && ! $requestedConnectionChange) {
  152. return false;
  153. }
  154. $forcePrimaryAsReplica = false;
  155. if ($this->getTransactionNestingLevel() > 0) {
  156. $connectionName = 'primary';
  157. $forcePrimaryAsReplica = true;
  158. }
  159. if (isset($this->connections[$connectionName])) {
  160. $this->_conn = $this->connections[$connectionName];
  161. if ($forcePrimaryAsReplica && ! $this->keepReplica) {
  162. $this->connections['replica'] = $this->_conn;
  163. }
  164. return false;
  165. }
  166. if ($connectionName === 'primary') {
  167. $this->connections['primary'] = $this->_conn = $this->connectTo($connectionName);
  168. // Set replica connection to primary to avoid invalid reads
  169. if (! $this->keepReplica) {
  170. $this->connections['replica'] = $this->connections['primary'];
  171. }
  172. } else {
  173. $this->connections['replica'] = $this->_conn = $this->connectTo($connectionName);
  174. }
  175. if ($this->_eventManager->hasListeners(Events::postConnect)) {
  176. $eventArgs = new ConnectionEventArgs($this);
  177. $this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs);
  178. }
  179. return true;
  180. }
  181. /**
  182. * Connects to the primary node of the database cluster.
  183. *
  184. * All following statements after this will be executed against the primary node.
  185. */
  186. public function ensureConnectedToPrimary(): bool
  187. {
  188. return $this->performConnect('primary');
  189. }
  190. /**
  191. * Connects to a replica node of the database cluster.
  192. *
  193. * All following statements after this will be executed against the replica node,
  194. * unless the keepReplica option is set to false and a primary connection
  195. * was already opened.
  196. */
  197. public function ensureConnectedToReplica(): bool
  198. {
  199. return $this->performConnect('replica');
  200. }
  201. /**
  202. * Connects to a specific connection.
  203. *
  204. * @param string $connectionName
  205. *
  206. * @return DriverConnection
  207. */
  208. protected function connectTo($connectionName)
  209. {
  210. $params = $this->getParams();
  211. $driverOptions = $params['driverOptions'] ?? [];
  212. $connectionParams = $this->chooseConnectionConfiguration($connectionName, $params);
  213. $user = $connectionParams['user'] ?? null;
  214. $password = $connectionParams['password'] ?? null;
  215. return $this->_driver->connect($connectionParams, $user, $password, $driverOptions);
  216. }
  217. /**
  218. * @param string $connectionName
  219. * @param mixed[] $params
  220. *
  221. * @return mixed
  222. */
  223. protected function chooseConnectionConfiguration($connectionName, $params)
  224. {
  225. if ($connectionName === 'primary') {
  226. return $params['primary'];
  227. }
  228. $config = $params['replica'][array_rand($params['replica'])];
  229. if (! isset($config['charset']) && isset($params['primary']['charset'])) {
  230. $config['charset'] = $params['primary']['charset'];
  231. }
  232. return $config;
  233. }
  234. /**
  235. * {@inheritDoc}
  236. *
  237. * @deprecated Use {@link executeStatement()} instead.
  238. */
  239. public function executeUpdate($sql, array $params = [], array $types = [])
  240. {
  241. $this->ensureConnectedToPrimary();
  242. return parent::executeUpdate($sql, $params, $types);
  243. }
  244. /**
  245. * {@inheritDoc}
  246. */
  247. public function executeStatement($sql, array $params = [], array $types = [])
  248. {
  249. $this->ensureConnectedToPrimary();
  250. return parent::executeStatement($sql, $params, $types);
  251. }
  252. /**
  253. * {@inheritDoc}
  254. */
  255. public function beginTransaction()
  256. {
  257. $this->ensureConnectedToPrimary();
  258. return parent::beginTransaction();
  259. }
  260. /**
  261. * {@inheritDoc}
  262. */
  263. public function commit()
  264. {
  265. $this->ensureConnectedToPrimary();
  266. return parent::commit();
  267. }
  268. /**
  269. * {@inheritDoc}
  270. */
  271. public function rollBack()
  272. {
  273. $this->ensureConnectedToPrimary();
  274. return parent::rollBack();
  275. }
  276. /**
  277. * {@inheritDoc}
  278. */
  279. public function delete($table, array $criteria, array $types = [])
  280. {
  281. $this->ensureConnectedToPrimary();
  282. return parent::delete($table, $criteria, $types);
  283. }
  284. /**
  285. * {@inheritDoc}
  286. */
  287. public function close()
  288. {
  289. unset($this->connections['primary'], $this->connections['replica']);
  290. parent::close();
  291. $this->_conn = null;
  292. $this->connections = ['primary' => null, 'replica' => null];
  293. }
  294. /**
  295. * {@inheritDoc}
  296. */
  297. public function update($table, array $data, array $criteria, array $types = [])
  298. {
  299. $this->ensureConnectedToPrimary();
  300. return parent::update($table, $data, $criteria, $types);
  301. }
  302. /**
  303. * {@inheritDoc}
  304. */
  305. public function insert($table, array $data, array $types = [])
  306. {
  307. $this->ensureConnectedToPrimary();
  308. return parent::insert($table, $data, $types);
  309. }
  310. /**
  311. * {@inheritDoc}
  312. */
  313. public function exec($statement)
  314. {
  315. $this->ensureConnectedToPrimary();
  316. return parent::exec($statement);
  317. }
  318. /**
  319. * {@inheritDoc}
  320. */
  321. public function createSavepoint($savepoint)
  322. {
  323. $this->ensureConnectedToPrimary();
  324. parent::createSavepoint($savepoint);
  325. }
  326. /**
  327. * {@inheritDoc}
  328. */
  329. public function releaseSavepoint($savepoint)
  330. {
  331. $this->ensureConnectedToPrimary();
  332. parent::releaseSavepoint($savepoint);
  333. }
  334. /**
  335. * {@inheritDoc}
  336. */
  337. public function rollbackSavepoint($savepoint)
  338. {
  339. $this->ensureConnectedToPrimary();
  340. parent::rollbackSavepoint($savepoint);
  341. }
  342. /**
  343. * {@inheritDoc}
  344. */
  345. public function query()
  346. {
  347. $this->ensureConnectedToPrimary();
  348. assert($this->_conn instanceof DriverConnection);
  349. $args = func_get_args();
  350. $logger = $this->getConfiguration()->getSQLLogger();
  351. if ($logger) {
  352. $logger->startQuery($args[0]);
  353. }
  354. $statement = $this->_conn->query(...$args);
  355. $statement->setFetchMode($this->defaultFetchMode);
  356. if ($logger) {
  357. $logger->stopQuery();
  358. }
  359. return $statement;
  360. }
  361. /**
  362. * {@inheritDoc}
  363. */
  364. public function prepare($statement)
  365. {
  366. $this->ensureConnectedToPrimary();
  367. return parent::prepare($statement);
  368. }
  369. }