TransportResponseTrait.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\HttpClient\Response;
  11. use Symfony\Component\HttpClient\Chunk\DataChunk;
  12. use Symfony\Component\HttpClient\Chunk\ErrorChunk;
  13. use Symfony\Component\HttpClient\Chunk\FirstChunk;
  14. use Symfony\Component\HttpClient\Chunk\LastChunk;
  15. use Symfony\Component\HttpClient\Exception\TransportException;
  16. use Symfony\Component\HttpClient\Internal\ClientState;
  17. /**
  18. * Implements common logic for transport-level response classes.
  19. *
  20. * @author Nicolas Grekas <p@tchwork.com>
  21. *
  22. * @internal
  23. */
  24. trait TransportResponseTrait
  25. {
  26. private $headers = [];
  27. private $info = [
  28. 'response_headers' => [],
  29. 'http_code' => 0,
  30. 'error' => null,
  31. 'canceled' => false,
  32. ];
  33. /** @var object|resource */
  34. private $handle;
  35. private $id;
  36. private $timeout = 0;
  37. private $inflate;
  38. private $finalInfo;
  39. private $canary;
  40. private $logger;
  41. /**
  42. * {@inheritdoc}
  43. */
  44. public function getStatusCode(): int
  45. {
  46. if ($this->initializer) {
  47. self::initialize($this);
  48. }
  49. return $this->info['http_code'];
  50. }
  51. /**
  52. * {@inheritdoc}
  53. */
  54. public function getHeaders(bool $throw = true): array
  55. {
  56. if ($this->initializer) {
  57. self::initialize($this);
  58. }
  59. if ($throw) {
  60. $this->checkStatusCode();
  61. }
  62. return $this->headers;
  63. }
  64. /**
  65. * {@inheritdoc}
  66. */
  67. public function cancel(): void
  68. {
  69. $this->info['canceled'] = true;
  70. $this->info['error'] = 'Response has been canceled.';
  71. $this->close();
  72. }
  73. /**
  74. * Closes the response and all its network handles.
  75. */
  76. protected function close(): void
  77. {
  78. $this->canary->cancel();
  79. $this->inflate = null;
  80. }
  81. /**
  82. * Adds pending responses to the activity list.
  83. */
  84. abstract protected static function schedule(self $response, array &$runningResponses): void;
  85. /**
  86. * Performs all pending non-blocking operations.
  87. */
  88. abstract protected static function perform(ClientState $multi, array &$responses): void;
  89. /**
  90. * Waits for network activity.
  91. */
  92. abstract protected static function select(ClientState $multi, float $timeout): int;
  93. private static function addResponseHeaders(array $responseHeaders, array &$info, array &$headers, string &$debug = ''): void
  94. {
  95. foreach ($responseHeaders as $h) {
  96. if (11 <= \strlen($h) && '/' === $h[4] && preg_match('#^HTTP/\d+(?:\.\d+)? ([1-9]\d\d)(?: |$)#', $h, $m)) {
  97. if ($headers) {
  98. $debug .= "< \r\n";
  99. $headers = [];
  100. }
  101. $info['http_code'] = (int) $m[1];
  102. } elseif (2 === \count($m = explode(':', $h, 2))) {
  103. $headers[strtolower($m[0])][] = ltrim($m[1]);
  104. }
  105. $debug .= "< {$h}\r\n";
  106. $info['response_headers'][] = $h;
  107. }
  108. $debug .= "< \r\n";
  109. if (!$info['http_code']) {
  110. throw new TransportException(sprintf('Invalid or missing HTTP status line for "%s".', implode('', $info['url'])));
  111. }
  112. }
  113. /**
  114. * Ensures the request is always sent and that the response code was checked.
  115. */
  116. private function doDestruct()
  117. {
  118. $this->shouldBuffer = true;
  119. if ($this->initializer && null === $this->info['error']) {
  120. self::initialize($this);
  121. $this->checkStatusCode();
  122. }
  123. }
  124. /**
  125. * Implements an event loop based on a buffer activity queue.
  126. *
  127. * @internal
  128. */
  129. public static function stream(iterable $responses, float $timeout = null): \Generator
  130. {
  131. $runningResponses = [];
  132. foreach ($responses as $response) {
  133. self::schedule($response, $runningResponses);
  134. }
  135. $lastActivity = microtime(true);
  136. $elapsedTimeout = 0;
  137. while (true) {
  138. $hasActivity = false;
  139. $timeoutMax = 0;
  140. $timeoutMin = $timeout ?? \INF;
  141. /** @var ClientState $multi */
  142. foreach ($runningResponses as $i => [$multi]) {
  143. $responses = &$runningResponses[$i][1];
  144. self::perform($multi, $responses);
  145. foreach ($responses as $j => $response) {
  146. $timeoutMax = $timeout ?? max($timeoutMax, $response->timeout);
  147. $timeoutMin = min($timeoutMin, $response->timeout, 1);
  148. $chunk = false;
  149. if (isset($multi->handlesActivity[$j])) {
  150. // no-op
  151. } elseif (!isset($multi->openHandles[$j])) {
  152. unset($responses[$j]);
  153. continue;
  154. } elseif ($elapsedTimeout >= $timeoutMax) {
  155. $multi->handlesActivity[$j] = [new ErrorChunk($response->offset, sprintf('Idle timeout reached for "%s".', $response->getInfo('url')))];
  156. } else {
  157. continue;
  158. }
  159. while ($multi->handlesActivity[$j] ?? false) {
  160. $hasActivity = true;
  161. $elapsedTimeout = 0;
  162. if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) {
  163. if (null !== $response->inflate && false === $chunk = @inflate_add($response->inflate, $chunk)) {
  164. $multi->handlesActivity[$j] = [null, new TransportException(sprintf('Error while processing content unencoding for "%s".', $response->getInfo('url')))];
  165. continue;
  166. }
  167. if ('' !== $chunk && null !== $response->content && \strlen($chunk) !== fwrite($response->content, $chunk)) {
  168. $multi->handlesActivity[$j] = [null, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($chunk)))];
  169. continue;
  170. }
  171. $chunkLen = \strlen($chunk);
  172. $chunk = new DataChunk($response->offset, $chunk);
  173. $response->offset += $chunkLen;
  174. } elseif (null === $chunk) {
  175. $e = $multi->handlesActivity[$j][0];
  176. unset($responses[$j], $multi->handlesActivity[$j]);
  177. $response->close();
  178. if (null !== $e) {
  179. $response->info['error'] = $e->getMessage();
  180. if ($e instanceof \Error) {
  181. throw $e;
  182. }
  183. $chunk = new ErrorChunk($response->offset, $e);
  184. } else {
  185. if (0 === $response->offset && null === $response->content) {
  186. $response->content = fopen('php://memory', 'w+');
  187. }
  188. $chunk = new LastChunk($response->offset);
  189. }
  190. } elseif ($chunk instanceof ErrorChunk) {
  191. unset($responses[$j]);
  192. $elapsedTimeout = $timeoutMax;
  193. } elseif ($chunk instanceof FirstChunk) {
  194. if ($response->logger) {
  195. $info = $response->getInfo();
  196. $response->logger->info(sprintf('Response: "%s %s"', $info['http_code'], $info['url']));
  197. }
  198. $response->inflate = \extension_loaded('zlib') && $response->inflate && 'gzip' === ($response->headers['content-encoding'][0] ?? null) ? inflate_init(\ZLIB_ENCODING_GZIP) : null;
  199. if ($response->shouldBuffer instanceof \Closure) {
  200. try {
  201. $response->shouldBuffer = ($response->shouldBuffer)($response->headers);
  202. if (null !== $response->info['error']) {
  203. throw new TransportException($response->info['error']);
  204. }
  205. } catch (\Throwable $e) {
  206. $response->close();
  207. $multi->handlesActivity[$j] = [null, $e];
  208. }
  209. }
  210. if (true === $response->shouldBuffer) {
  211. $response->content = fopen('php://temp', 'w+');
  212. } elseif (\is_resource($response->shouldBuffer)) {
  213. $response->content = $response->shouldBuffer;
  214. }
  215. $response->shouldBuffer = null;
  216. yield $response => $chunk;
  217. if ($response->initializer && null === $response->info['error']) {
  218. // Ensure the HTTP status code is always checked
  219. $response->getHeaders(true);
  220. }
  221. continue;
  222. }
  223. yield $response => $chunk;
  224. }
  225. unset($multi->handlesActivity[$j]);
  226. if ($chunk instanceof ErrorChunk && !$chunk->didThrow()) {
  227. // Ensure transport exceptions are always thrown
  228. $chunk->getContent();
  229. }
  230. }
  231. if (!$responses) {
  232. unset($runningResponses[$i]);
  233. }
  234. // Prevent memory leaks
  235. $multi->handlesActivity = $multi->handlesActivity ?: [];
  236. $multi->openHandles = $multi->openHandles ?: [];
  237. }
  238. if (!$runningResponses) {
  239. break;
  240. }
  241. if ($hasActivity) {
  242. $lastActivity = microtime(true);
  243. continue;
  244. }
  245. if (-1 === self::select($multi, min($timeoutMin, $timeoutMax - $elapsedTimeout))) {
  246. usleep(min(500, 1E6 * $timeoutMin));
  247. }
  248. $elapsedTimeout = microtime(true) - $lastActivity;
  249. }
  250. }
  251. }