executeQuery("DELETE FROM table"); * * Be aware that Connection#executeQuery is a method specifically for READ * operations only. * * Use Connection#executeStatement for any SQL statement that changes/updates * state in the database (UPDATE, INSERT, DELETE or DDL statements). * * This connection is limited to replica operations using the * Connection#executeQuery operation only, because it wouldn't be compatible * with the ORM or SchemaManager code otherwise. Both use all the other * operations in a context where writes could happen to a replica, which makes * this restricted approach necessary. * * You can manually connect to the primary at any time by calling: * * $conn->ensureConnectedToPrimary(); * * Instantiation through the DriverManager looks like: * * @psalm-import-type Params from \Doctrine\DBAL\DriverManager * @example * * $conn = DriverManager::getConnection(array( * 'wrapperClass' => 'Doctrine\DBAL\Connections\PrimaryReadReplicaConnection', * 'driver' => 'pdo_mysql', * 'primary' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''), * 'replica' => array( * array('user' => 'replica1', 'password', 'host' => '', 'dbname' => ''), * array('user' => 'replica2', 'password', 'host' => '', 'dbname' => ''), * ) * )); * * You can also pass 'driverOptions' and any other documented option to each of this drivers * to pass additional information. */ class PrimaryReadReplicaConnection extends Connection { /** * Primary and Replica connection (one of the randomly picked replicas). * * @var DriverConnection[]|null[] */ protected $connections = ['primary' => null, 'replica' => null]; /** * You can keep the replica connection and then switch back to it * during the request if you know what you are doing. * * @var bool */ protected $keepReplica = false; /** * Creates Primary Replica Connection. * * @internal The connection can be only instantiated by the driver manager. * * @param array $params * * @throws InvalidArgumentException * * @phpstan-param array $params * @psalm-param Params $params */ public function __construct( array $params, Driver $driver, ?Configuration $config = null, ?EventManager $eventManager = null ) { if (! isset($params['replica'], $params['primary'])) { throw new InvalidArgumentException('primary or replica configuration missing'); } if (count($params['replica']) === 0) { throw new InvalidArgumentException('You have to configure at least one replica.'); } if (isset($params['driver'])) { $params['primary']['driver'] = $params['driver']; foreach ($params['replica'] as $replicaKey => $replica) { $params['replica'][$replicaKey]['driver'] = $params['driver']; } } $this->keepReplica = (bool) ($params['keepReplica'] ?? false); parent::__construct($params, $driver, $config, $eventManager); } /** * Checks if the connection is currently towards the primary or not. */ public function isConnectedToPrimary(): bool { return $this->_conn !== null && $this->_conn === $this->connections['primary']; } /** * @param string|null $connectionName * * @return bool */ public function connect($connectionName = null) { if ($connectionName !== null) { throw new InvalidArgumentException( 'Passing a connection name as first argument is not supported anymore.' . ' Use ensureConnectedToPrimary()/ensureConnectedToReplica() instead.' ); } return $this->performConnect(); } protected function performConnect(?string $connectionName = null): bool { $requestedConnectionChange = ($connectionName !== null); $connectionName = $connectionName ?: 'replica'; if ($connectionName !== 'replica' && $connectionName !== 'primary') { throw new InvalidArgumentException('Invalid option to connect(), only primary or replica allowed.'); } // If we have a connection open, and this is not an explicit connection // change request, then abort right here, because we are already done. // This prevents writes to the replica in case of "keepReplica" option enabled. if ($this->_conn !== null && ! $requestedConnectionChange) { return false; } $forcePrimaryAsReplica = false; if ($this->getTransactionNestingLevel() > 0) { $connectionName = 'primary'; $forcePrimaryAsReplica = true; } if (isset($this->connections[$connectionName])) { $this->_conn = $this->connections[$connectionName]; if ($forcePrimaryAsReplica && ! $this->keepReplica) { $this->connections['replica'] = $this->_conn; } return false; } if ($connectionName === 'primary') { $this->connections['primary'] = $this->_conn = $this->connectTo($connectionName); // Set replica connection to primary to avoid invalid reads if (! $this->keepReplica) { $this->connections['replica'] = $this->connections['primary']; } } else { $this->connections['replica'] = $this->_conn = $this->connectTo($connectionName); } if ($this->_eventManager->hasListeners(Events::postConnect)) { $eventArgs = new ConnectionEventArgs($this); $this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs); } return true; } /** * Connects to the primary node of the database cluster. * * All following statements after this will be executed against the primary node. */ public function ensureConnectedToPrimary(): bool { return $this->performConnect('primary'); } /** * Connects to a replica node of the database cluster. * * All following statements after this will be executed against the replica node, * unless the keepReplica option is set to false and a primary connection * was already opened. */ public function ensureConnectedToReplica(): bool { return $this->performConnect('replica'); } /** * Connects to a specific connection. * * @param string $connectionName * * @return DriverConnection */ protected function connectTo($connectionName) { $params = $this->getParams(); $driverOptions = $params['driverOptions'] ?? []; $connectionParams = $this->chooseConnectionConfiguration($connectionName, $params); $user = $connectionParams['user'] ?? null; $password = $connectionParams['password'] ?? null; return $this->_driver->connect($connectionParams, $user, $password, $driverOptions); } /** * @param string $connectionName * @param mixed[] $params * * @return mixed */ protected function chooseConnectionConfiguration($connectionName, $params) { if ($connectionName === 'primary') { return $params['primary']; } $config = $params['replica'][array_rand($params['replica'])]; if (! isset($config['charset']) && isset($params['primary']['charset'])) { $config['charset'] = $params['primary']['charset']; } return $config; } /** * {@inheritDoc} * * @deprecated Use {@link executeStatement()} instead. */ public function executeUpdate($sql, array $params = [], array $types = []) { $this->ensureConnectedToPrimary(); return parent::executeUpdate($sql, $params, $types); } /** * {@inheritDoc} */ public function executeStatement($sql, array $params = [], array $types = []) { $this->ensureConnectedToPrimary(); return parent::executeStatement($sql, $params, $types); } /** * {@inheritDoc} */ public function beginTransaction() { $this->ensureConnectedToPrimary(); return parent::beginTransaction(); } /** * {@inheritDoc} */ public function commit() { $this->ensureConnectedToPrimary(); return parent::commit(); } /** * {@inheritDoc} */ public function rollBack() { $this->ensureConnectedToPrimary(); return parent::rollBack(); } /** * {@inheritDoc} */ public function delete($table, array $criteria, array $types = []) { $this->ensureConnectedToPrimary(); return parent::delete($table, $criteria, $types); } /** * {@inheritDoc} */ public function close() { unset($this->connections['primary'], $this->connections['replica']); parent::close(); $this->_conn = null; $this->connections = ['primary' => null, 'replica' => null]; } /** * {@inheritDoc} */ public function update($table, array $data, array $criteria, array $types = []) { $this->ensureConnectedToPrimary(); return parent::update($table, $data, $criteria, $types); } /** * {@inheritDoc} */ public function insert($table, array $data, array $types = []) { $this->ensureConnectedToPrimary(); return parent::insert($table, $data, $types); } /** * {@inheritDoc} */ public function exec($statement) { $this->ensureConnectedToPrimary(); return parent::exec($statement); } /** * {@inheritDoc} */ public function createSavepoint($savepoint) { $this->ensureConnectedToPrimary(); parent::createSavepoint($savepoint); } /** * {@inheritDoc} */ public function releaseSavepoint($savepoint) { $this->ensureConnectedToPrimary(); parent::releaseSavepoint($savepoint); } /** * {@inheritDoc} */ public function rollbackSavepoint($savepoint) { $this->ensureConnectedToPrimary(); parent::rollbackSavepoint($savepoint); } /** * {@inheritDoc} */ public function query() { $this->ensureConnectedToPrimary(); assert($this->_conn instanceof DriverConnection); $args = func_get_args(); $logger = $this->getConfiguration()->getSQLLogger(); if ($logger) { $logger->startQuery($args[0]); } $statement = $this->_conn->query(...$args); $statement->setFetchMode($this->defaultFetchMode); if ($logger) { $logger->stopQuery(); } return $statement; } /** * {@inheritDoc} */ public function prepare($statement) { $this->ensureConnectedToPrimary(); return parent::prepare($statement); } }