AmpBody.php 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\HttpClient\Internal;
  11. use Amp\ByteStream\InputStream;
  12. use Amp\ByteStream\ResourceInputStream;
  13. use Amp\Http\Client\RequestBody;
  14. use Amp\Promise;
  15. use Amp\Success;
  16. use Symfony\Component\HttpClient\Exception\TransportException;
  17. /**
  18. * @author Nicolas Grekas <p@tchwork.com>
  19. *
  20. * @internal
  21. */
  22. class AmpBody implements RequestBody, InputStream
  23. {
  24. private $body;
  25. private $onProgress;
  26. private $offset = 0;
  27. private $length = -1;
  28. private $uploaded;
  29. public function __construct($body, &$info, \Closure $onProgress)
  30. {
  31. $this->body = $body;
  32. $this->info = &$info;
  33. $this->onProgress = $onProgress;
  34. if (\is_resource($body)) {
  35. $this->offset = ftell($body);
  36. $this->length = fstat($body)['size'];
  37. $this->body = new ResourceInputStream($body);
  38. } elseif (\is_string($body)) {
  39. $this->length = \strlen($body);
  40. }
  41. }
  42. public function createBodyStream(): InputStream
  43. {
  44. if (null !== $this->uploaded) {
  45. $this->uploaded = null;
  46. if (\is_string($this->body)) {
  47. $this->offset = 0;
  48. } elseif ($this->body instanceof ResourceInputStream) {
  49. fseek($this->body->getResource(), $this->offset);
  50. }
  51. }
  52. return $this;
  53. }
  54. public function getHeaders(): Promise
  55. {
  56. return new Success([]);
  57. }
  58. public function getBodyLength(): Promise
  59. {
  60. return new Success($this->length - $this->offset);
  61. }
  62. public function read(): Promise
  63. {
  64. $this->info['size_upload'] += $this->uploaded;
  65. $this->uploaded = 0;
  66. ($this->onProgress)();
  67. $chunk = $this->doRead();
  68. $chunk->onResolve(function ($e, $data) {
  69. if (null !== $data) {
  70. $this->uploaded = \strlen($data);
  71. } else {
  72. $this->info['upload_content_length'] = $this->info['size_upload'];
  73. }
  74. });
  75. return $chunk;
  76. }
  77. public static function rewind(RequestBody $body): RequestBody
  78. {
  79. if (!$body instanceof self) {
  80. return $body;
  81. }
  82. $body->uploaded = null;
  83. if ($body->body instanceof ResourceInputStream) {
  84. fseek($body->body->getResource(), $body->offset);
  85. return new $body($body->body, $body->info, $body->onProgress);
  86. }
  87. if (\is_string($body->body)) {
  88. $body->offset = 0;
  89. }
  90. return $body;
  91. }
  92. private function doRead(): Promise
  93. {
  94. if ($this->body instanceof ResourceInputStream) {
  95. return $this->body->read();
  96. }
  97. if (null === $this->offset || !$this->length) {
  98. return new Success();
  99. }
  100. if (\is_string($this->body)) {
  101. $this->offset = null;
  102. return new Success($this->body);
  103. }
  104. if ('' === $data = ($this->body)(16372)) {
  105. $this->offset = null;
  106. return new Success();
  107. }
  108. if (!\is_string($data)) {
  109. throw new TransportException(sprintf('Return value of the "body" option callback must be string, "%s" returned.', get_debug_type($data)));
  110. }
  111. return new Success($data);
  112. }
  113. }