123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442 |
- <?php
- namespace Doctrine\DBAL\Connections;
- use Doctrine\Common\EventManager;
- use Doctrine\DBAL\Configuration;
- use Doctrine\DBAL\Connection;
- use Doctrine\DBAL\Driver;
- use Doctrine\DBAL\Driver\Connection as DriverConnection;
- use Doctrine\DBAL\Event\ConnectionEventArgs;
- use Doctrine\DBAL\Events;
- use InvalidArgumentException;
- use function array_rand;
- use function assert;
- use function count;
- use function func_get_args;
- /**
- * Primary-Replica Connection
- *
- * Connection can be used with primary-replica setups.
- *
- * Important for the understanding of this connection should be how and when
- * it picks the replica or primary.
- *
- * 1. Replica if primary was never picked before and ONLY if 'getWrappedConnection'
- * or 'executeQuery' is used.
- * 2. Primary picked when 'exec', 'executeUpdate', 'executeStatement', 'insert', 'delete', 'update', 'createSavepoint',
- * 'releaseSavepoint', 'beginTransaction', 'rollback', 'commit', 'query' or
- * 'prepare' is called.
- * 3. If Primary was picked once during the lifetime of the connection it will always get picked afterwards.
- * 4. One replica connection is randomly picked ONCE during a request.
- *
- * ATTENTION: You can write to the replica with this connection if you execute a write query without
- * opening up a transaction. For example:
- *
- * $conn = DriverManager::getConnection(...);
- * $conn->executeQuery("DELETE FROM table");
- *
- * Be aware that Connection#executeQuery is a method specifically for READ
- * operations only.
- *
- * Use Connection#executeStatement for any SQL statement that changes/updates
- * state in the database (UPDATE, INSERT, DELETE or DDL statements).
- *
- * This connection is limited to replica operations using the
- * Connection#executeQuery operation only, because it wouldn't be compatible
- * with the ORM or SchemaManager code otherwise. Both use all the other
- * operations in a context where writes could happen to a replica, which makes
- * this restricted approach necessary.
- *
- * You can manually connect to the primary at any time by calling:
- *
- * $conn->ensureConnectedToPrimary();
- *
- * Instantiation through the DriverManager looks like:
- *
- * @psalm-import-type Params from \Doctrine\DBAL\DriverManager
- * @example
- *
- * $conn = DriverManager::getConnection(array(
- * 'wrapperClass' => 'Doctrine\DBAL\Connections\PrimaryReadReplicaConnection',
- * 'driver' => 'pdo_mysql',
- * 'primary' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''),
- * 'replica' => array(
- * array('user' => 'replica1', 'password', 'host' => '', 'dbname' => ''),
- * array('user' => 'replica2', 'password', 'host' => '', 'dbname' => ''),
- * )
- * ));
- *
- * You can also pass 'driverOptions' and any other documented option to each of this drivers
- * to pass additional information.
- */
- class PrimaryReadReplicaConnection extends Connection
- {
- /**
- * Primary and Replica connection (one of the randomly picked replicas).
- *
- * @var DriverConnection[]|null[]
- */
- protected $connections = ['primary' => null, 'replica' => null];
- /**
- * You can keep the replica connection and then switch back to it
- * during the request if you know what you are doing.
- *
- * @var bool
- */
- protected $keepReplica = false;
- /**
- * Creates Primary Replica Connection.
- *
- * @internal The connection can be only instantiated by the driver manager.
- *
- * @param array<string,mixed> $params
- *
- * @throws InvalidArgumentException
- *
- * @phpstan-param array<string,mixed> $params
- * @psalm-param Params $params
- */
- public function __construct(
- array $params,
- Driver $driver,
- ?Configuration $config = null,
- ?EventManager $eventManager = null
- ) {
- if (! isset($params['replica'], $params['primary'])) {
- throw new InvalidArgumentException('primary or replica configuration missing');
- }
- if (count($params['replica']) === 0) {
- throw new InvalidArgumentException('You have to configure at least one replica.');
- }
- if (isset($params['driver'])) {
- $params['primary']['driver'] = $params['driver'];
- foreach ($params['replica'] as $replicaKey => $replica) {
- $params['replica'][$replicaKey]['driver'] = $params['driver'];
- }
- }
- $this->keepReplica = (bool) ($params['keepReplica'] ?? false);
- parent::__construct($params, $driver, $config, $eventManager);
- }
- /**
- * Checks if the connection is currently towards the primary or not.
- */
- public function isConnectedToPrimary(): bool
- {
- return $this->_conn !== null && $this->_conn === $this->connections['primary'];
- }
- /**
- * @param string|null $connectionName
- *
- * @return bool
- */
- public function connect($connectionName = null)
- {
- if ($connectionName !== null) {
- throw new InvalidArgumentException(
- 'Passing a connection name as first argument is not supported anymore.'
- . ' Use ensureConnectedToPrimary()/ensureConnectedToReplica() instead.'
- );
- }
- return $this->performConnect();
- }
- protected function performConnect(?string $connectionName = null): bool
- {
- $requestedConnectionChange = ($connectionName !== null);
- $connectionName = $connectionName ?: 'replica';
- if ($connectionName !== 'replica' && $connectionName !== 'primary') {
- throw new InvalidArgumentException('Invalid option to connect(), only primary or replica allowed.');
- }
- // If we have a connection open, and this is not an explicit connection
- // change request, then abort right here, because we are already done.
- // This prevents writes to the replica in case of "keepReplica" option enabled.
- if ($this->_conn !== null && ! $requestedConnectionChange) {
- return false;
- }
- $forcePrimaryAsReplica = false;
- if ($this->getTransactionNestingLevel() > 0) {
- $connectionName = 'primary';
- $forcePrimaryAsReplica = true;
- }
- if (isset($this->connections[$connectionName])) {
- $this->_conn = $this->connections[$connectionName];
- if ($forcePrimaryAsReplica && ! $this->keepReplica) {
- $this->connections['replica'] = $this->_conn;
- }
- return false;
- }
- if ($connectionName === 'primary') {
- $this->connections['primary'] = $this->_conn = $this->connectTo($connectionName);
- // Set replica connection to primary to avoid invalid reads
- if (! $this->keepReplica) {
- $this->connections['replica'] = $this->connections['primary'];
- }
- } else {
- $this->connections['replica'] = $this->_conn = $this->connectTo($connectionName);
- }
- if ($this->_eventManager->hasListeners(Events::postConnect)) {
- $eventArgs = new ConnectionEventArgs($this);
- $this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs);
- }
- return true;
- }
- /**
- * Connects to the primary node of the database cluster.
- *
- * All following statements after this will be executed against the primary node.
- */
- public function ensureConnectedToPrimary(): bool
- {
- return $this->performConnect('primary');
- }
- /**
- * Connects to a replica node of the database cluster.
- *
- * All following statements after this will be executed against the replica node,
- * unless the keepReplica option is set to false and a primary connection
- * was already opened.
- */
- public function ensureConnectedToReplica(): bool
- {
- return $this->performConnect('replica');
- }
- /**
- * Connects to a specific connection.
- *
- * @param string $connectionName
- *
- * @return DriverConnection
- */
- protected function connectTo($connectionName)
- {
- $params = $this->getParams();
- $driverOptions = $params['driverOptions'] ?? [];
- $connectionParams = $this->chooseConnectionConfiguration($connectionName, $params);
- $user = $connectionParams['user'] ?? null;
- $password = $connectionParams['password'] ?? null;
- return $this->_driver->connect($connectionParams, $user, $password, $driverOptions);
- }
- /**
- * @param string $connectionName
- * @param mixed[] $params
- *
- * @return mixed
- */
- protected function chooseConnectionConfiguration($connectionName, $params)
- {
- if ($connectionName === 'primary') {
- return $params['primary'];
- }
- $config = $params['replica'][array_rand($params['replica'])];
- if (! isset($config['charset']) && isset($params['primary']['charset'])) {
- $config['charset'] = $params['primary']['charset'];
- }
- return $config;
- }
- /**
- * {@inheritDoc}
- *
- * @deprecated Use {@link executeStatement()} instead.
- */
- public function executeUpdate($sql, array $params = [], array $types = [])
- {
- $this->ensureConnectedToPrimary();
- return parent::executeUpdate($sql, $params, $types);
- }
- /**
- * {@inheritDoc}
- */
- public function executeStatement($sql, array $params = [], array $types = [])
- {
- $this->ensureConnectedToPrimary();
- return parent::executeStatement($sql, $params, $types);
- }
- /**
- * {@inheritDoc}
- */
- public function beginTransaction()
- {
- $this->ensureConnectedToPrimary();
- return parent::beginTransaction();
- }
- /**
- * {@inheritDoc}
- */
- public function commit()
- {
- $this->ensureConnectedToPrimary();
- return parent::commit();
- }
- /**
- * {@inheritDoc}
- */
- public function rollBack()
- {
- $this->ensureConnectedToPrimary();
- return parent::rollBack();
- }
- /**
- * {@inheritDoc}
- */
- public function delete($table, array $criteria, array $types = [])
- {
- $this->ensureConnectedToPrimary();
- return parent::delete($table, $criteria, $types);
- }
- /**
- * {@inheritDoc}
- */
- public function close()
- {
- unset($this->connections['primary'], $this->connections['replica']);
- parent::close();
- $this->_conn = null;
- $this->connections = ['primary' => null, 'replica' => null];
- }
- /**
- * {@inheritDoc}
- */
- public function update($table, array $data, array $criteria, array $types = [])
- {
- $this->ensureConnectedToPrimary();
- return parent::update($table, $data, $criteria, $types);
- }
- /**
- * {@inheritDoc}
- */
- public function insert($table, array $data, array $types = [])
- {
- $this->ensureConnectedToPrimary();
- return parent::insert($table, $data, $types);
- }
- /**
- * {@inheritDoc}
- */
- public function exec($statement)
- {
- $this->ensureConnectedToPrimary();
- return parent::exec($statement);
- }
- /**
- * {@inheritDoc}
- */
- public function createSavepoint($savepoint)
- {
- $this->ensureConnectedToPrimary();
- parent::createSavepoint($savepoint);
- }
- /**
- * {@inheritDoc}
- */
- public function releaseSavepoint($savepoint)
- {
- $this->ensureConnectedToPrimary();
- parent::releaseSavepoint($savepoint);
- }
- /**
- * {@inheritDoc}
- */
- public function rollbackSavepoint($savepoint)
- {
- $this->ensureConnectedToPrimary();
- parent::rollbackSavepoint($savepoint);
- }
- /**
- * {@inheritDoc}
- */
- public function query()
- {
- $this->ensureConnectedToPrimary();
- assert($this->_conn instanceof DriverConnection);
- $args = func_get_args();
- $logger = $this->getConfiguration()->getSQLLogger();
- if ($logger) {
- $logger->startQuery($args[0]);
- }
- $statement = $this->_conn->query(...$args);
- $statement->setFetchMode($this->defaultFetchMode);
- if ($logger) {
- $logger->stopQuery();
- }
- return $statement;
- }
- /**
- * {@inheritDoc}
- */
- public function prepare($statement)
- {
- $this->ensureConnectedToPrimary();
- return parent::prepare($statement);
- }
- }
|