Last active
December 26, 2023 17:38
-
-
Save arabcoders/98991f3d34d3102c1ea2567cf86b2481 to your computer and use it in GitHub Desktop.
Wrapper around curl that speeds up downloads by fetching a file in parallel HTTP range requests.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| use Closure; | |
| use CurlHandle; | |
| use InvalidArgumentException; | |
| use Random\RandomException; | |
| use RuntimeException; | |
/**
 * Downloads a single URL over several parallel HTTP range requests and merges
 * the parts, in order, into a caller supplied stream.
 *
 * Each part is buffered in a temporary file inside sys_get_temp_dir(); the
 * temporary files are removed as soon as the merge finishes (or fails), and
 * again, defensively, when the object is destroyed.
 */
final class CurlRanger
{
    /**
     * @var array<int, int> the curl options that are not overridable by user.
     */
    private const CURL_BLACKLIST = [
        CURLOPT_RETURNTRANSFER,
        CURLOPT_PRIVATE,
        CURLOPT_RANGE,
        CURLOPT_WRITEFUNCTION,
        CURLOPT_HEADERFUNCTION,
        CURLOPT_NOBODY,
        CURLOPT_HTTPHEADER,
    ];

    /**
     * @var array<int, resource> the chunk file streams.
     */
    private array $streams = [];

    /**
     * @var array<int, array<string, array<int, string>>> the headers of the final response of each part.
     */
    private array $headers = [];

    /**
     * @var array<int, array{0: int, 1: int}> chunk download progress (total, downloaded).
     */
    private array $progress = [];

    /**
     * @var array<int, array{start: int, end: int}> the ranges.
     */
    private array $ranges = [];

    /**
     * @param Closure|null $logger Optional logger, called as ($message, $level).
     *
     * @throws RuntimeException When the curl extension is not loaded.
     */
    public function __construct(private Closure|null $logger = null)
    {
        if (!extension_loaded('curl')) {
            throw new RuntimeException('This class requires the curl extension.');
        }
    }

    /**
     * Make a new instance and run the download.
     *
     * @param string $url The URL to download.
     * @param resource $stream The stream to write the data to.
     * @param int $maxConnections The maximum number of connections to use.
     * @param array $options (Optional) options.
     *
     * @return resource the stream.
     */
    public static function make(
        string $url,
        $stream,
        int $maxConnections = 4,
        array $options = []
    ) {
        return (new self())->start($url, $stream, $maxConnections, $options);
    }

    /**
     * Start the download process.
     *
     * Recognised options: 'headers' (map of request header name => value),
     * 'responseHeaders' (pre-fetched response headers, skips the HEAD request),
     * 'readSize' (merge buffer size in bytes) and CurlHandle::class (map of
     * extra curl options; the ones in CURL_BLACKLIST are ignored).
     *
     * @param string $url The URL to download.
     * @param resource $stream The stream to write the data to.
     * @param int $maxConnections The maximum number of connections to use.
     * @param array $options (Optional) options.
     *
     * @return resource the stream.
     *
     * @throws InvalidArgumentException When the stream or the curl options are invalid.
     * @throws RuntimeException When the server cannot serve ranges, a part fails, or the sizes do not add up.
     */
    public function start(
        string $url,
        $stream,
        int $maxConnections = 4,
        array $options = []
    ) {
        if (!is_resource($stream)) {
            throw new InvalidArgumentException('Stream must be a fopen resource.');
        }

        $options[CurlHandle::class] ??= [];

        if (!is_array($options[CurlHandle::class])) {
            throw new InvalidArgumentException(CurlHandle::class . ' option must be an array.');
        }

        // -- drop any state left over from a previous run, otherwise the write
        // -- callbacks would append to the part streams of the previous download.
        $this->cleanup();
        $this->headers = $this->progress = $this->ranges = [];

        $maxConnections = max($maxConnections, 1);

        // -- user supplied headers are only trusted when they carry more than one entry.
        $hasHeaders = is_array($options['responseHeaders'] ?? null) && count($options['responseHeaders']) > 1;

        $headers = self::normalizeHeaders($hasHeaders ? $options['responseHeaders'] : self::getHeaders($url, $options));

        // -- "Accept-Ranges: none" is an explicit statement that ranges are NOT supported.
        $acceptRanges = self::lastHeaderValue($headers, 'accept-ranges');
        if (null === $acceptRanges || 'none' === strtolower($acceptRanges)) {
            throw new RuntimeException('Server does not support range requests.');
        }

        $length = self::lastHeaderValue($headers, 'content-length');
        if (null === $length || !ctype_digit($length)) {
            throw new RuntimeException('Server did not report a valid content-length header.');
        }

        $contentLength = (int)$length;

        // -- nothing to download, and no valid byte range can be requested for an empty body.
        if (0 === $contentLength) {
            return $stream;
        }

        $this->ranges = self::computeRanges($contentLength, $maxConnections);

        try {
            $this->download($this->makeRequests($url, $this->ranges, $options));

            // -- only run parts sanity check if we have more than one part.
            if (count($this->ranges) > 1) {
                $this->sanityCheck();
            }

            $stream = $this->combineParts(
                stream: $stream,
                streams: $this->streams,
                readSize: (int)($options['readSize'] ?? (1024 * 1024))
            );
        } finally {
            // -- the part files are useless after the merge, successful or not.
            $this->cleanup();
        }

        // -- do final check of size compared to content-length header.
        $finalSize = fstat($stream);
        $finalSize = false === $finalSize ? null : (int)$finalSize['size'];

        if (null !== $finalSize && $finalSize !== $contentLength) {
            throw new RuntimeException(
                sprintf(
                    'Downloaded data size (%s) does not match content-length header (%s).',
                    $finalSize,
                    $contentLength
                )
            );
        }

        return $stream;
    }

    /**
     * Fetch the response headers of the given URL using a HEAD request.
     *
     * When redirects are followed only the headers of the final response are
     * returned.
     *
     * @param string $url The URL to inspect.
     * @param array $options (Optional) options, same shape as for start().
     *
     * @return array<string, array<int, string>> Lower-cased header name => list of values.
     *
     * @throws RuntimeException When the request fails or the final HTTP code is not 200.
     */
    public static function getHeaders(string $url, array $options = []): array
    {
        $headers = [];

        $ch = curl_init($url);
        if (false === $ch) {
            throw new RuntimeException('Unable to initialize the curl handle.');
        }

        $opts = self::applyUserOptions([
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HEADER => true,
            CURLOPT_NOBODY => true,
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_MAXREDIRS => 10,
            CURLOPT_TIMEOUT => 60,
            CURLOPT_CONNECTTIMEOUT => 60,
            CURLOPT_HEADERFUNCTION => function ($curl, $header) use (&$headers) {
                $len = strlen($header);

                // a status line starts a new response (redirect hop); forget the previous hop
                // so e.g. content-length always describes the final response.
                if (0 === stripos($header, 'HTTP/')) {
                    $headers = [];
                    return $len;
                }

                $header = explode(':', $header, 2);

                // ignore invalid headers.
                if (count($header) < 2) {
                    return $len;
                }

                $headers[strtolower(trim($header[0]))][] = trim($header[1]);

                return $len;
            },
        ], $options);

        curl_setopt_array($ch, $opts);

        $status = curl_exec($ch);

        if (false === $status) {
            throw new RuntimeException(curl_error($ch), curl_errno($ch));
        }

        $code = curl_getinfo($ch, CURLINFO_HTTP_CODE);

        if (200 !== $code) {
            throw new RuntimeException($status . ' - Unexpected HTTP code: ' . $code);
        }

        // no curl_close(): it has no effect since PHP 8.0, the handle is released when it goes out of scope.
        return self::normalizeHeaders($headers);
    }

    /**
     * Make sure no temporary part file outlives the object.
     */
    public function __destruct()
    {
        $this->cleanup();
    }

    /**
     * Close every part stream and delete its backing temporary file.
     */
    private function cleanup(): void
    {
        foreach ($this->streams as $stream) {
            // stream_get_meta_data() throws on a closed resource, so check first.
            if (!is_resource($stream)) {
                continue;
            }

            $file = stream_get_meta_data($stream)['uri'] ?? null;

            // close before unlinking: an open file cannot be deleted on Windows.
            fclose($stream);

            if (null !== $file && is_file($file)) {
                unlink($file);
            }
        }

        $this->streams = [];
    }

    /**
     * Split a body of the given size into contiguous, non-empty byte ranges.
     *
     * Fewer ranges than requested are returned when the body is too small to
     * give every connection at least one byte.
     *
     * @param int $contentLength The total size in bytes, must be >= 1.
     * @param int $maxConnections The maximum number of ranges to produce.
     *
     * @return array<int, array{start: int, end: int}> Inclusive byte ranges, in order.
     */
    private static function computeRanges(int $contentLength, int $maxConnections): array
    {
        $parts = max(1, min($maxConnections, $contentLength));

        // integer ceil(), avoids float rounding on very large files.
        $rangeSize = intdiv($contentLength + $parts - 1, $parts);

        $ranges = [];

        for ($start = 0; $start < $contentLength; $start += $rangeSize) {
            $ranges[] = [
                'start' => $start,
                'end' => min($start + $rangeSize, $contentLength) - 1,
            ];
        }

        return $ranges;
    }

    /**
     * Get the last value of a (normalized) header.
     *
     * Accepts both the internal list-of-values shape and plain scalar values
     * as they may show up in user supplied 'responseHeaders'.
     *
     * @param array $headers The normalized headers.
     * @param string $name The lower-cased header name.
     *
     * @return string|null The trimmed value, or null when the header is absent.
     */
    private static function lastHeaderValue(array $headers, string $name): string|null
    {
        $value = $headers[$name] ?? null;

        if (is_array($value)) {
            $value = [] === $value ? null : end($value);
        }

        return is_scalar($value) ? trim((string)$value) : null;
    }

    /**
     * Merge the user supplied curl options and request headers into the base curl options.
     *
     * @param array<int, mixed> $opts The base curl options.
     * @param array $options The user options, same shape as for start().
     *
     * @return array<int, mixed> The merged curl options.
     */
    private static function applyUserOptions(array $opts, array $options): array
    {
        foreach ((array)($options[CurlHandle::class] ?? []) as $key => $value) {
            if (true === in_array($key, self::CURL_BLACKLIST, true)) {
                continue;
            }

            $opts[$key] = $value;
        }

        if (null !== ($options['headers'] ?? null)) {
            $opts[CURLOPT_HTTPHEADER] = array_map(
                fn($k, $v) => "{$k}: {$v}",
                array_keys($options['headers']),
                array_values($options['headers'])
            );
        }

        return $opts;
    }

    /**
     * Make the curl handles.
     *
     * @param string $url The URL to download.
     * @param array<int, array{ start: int, end: int }> $ranges The ranges to download.
     * @param array $options (Optional) options.
     *
     * @return array<int, CurlHandle|resource> The curl handles.
     *
     * @throws RuntimeException When a curl handle cannot be created.
     */
    private function makeRequests(string $url, array $ranges, array $options = []): array
    {
        $handles = [];

        // -- prefix download parts with random string to avoid collisions.
        try {
            $random = bin2hex(random_bytes(4));
        } catch (RandomException) {
            $random = substr(md5((string)rand(1000000, 9999999)), 0, 8);
        }

        foreach ($ranges as $i => $chunk) {
            $ch = curl_init($url);
            if (false === $ch) {
                throw new RuntimeException(sprintf('Unable to initialize the curl handle for part %s.', $i + 1));
            }

            $opts = self::applyUserOptions([
                CURLOPT_RETURNTRANSFER => true,
                CURLOPT_HEADER => false,
                CURLOPT_PRIVATE => $i,
                CURLOPT_FOLLOWLOCATION => true,
                CURLOPT_MAXREDIRS => 10,
                CURLOPT_CONNECTTIMEOUT => 120,
                CURLOPT_RANGE => "{$chunk['start']}-{$chunk['end']}",
                CURLOPT_WRITEFUNCTION => function ($curl, $data) use ($i, $random) {
                    if (!isset($this->streams[$i])) {
                        $tmpName = sys_get_temp_dir() . DIRECTORY_SEPARATOR . "cr-{$random}.part{$i}";
                        $part = fopen($tmpName, 'wb+');

                        // returning a short count makes curl abort this transfer with a write
                        // error, which download() turns into an exception.
                        if (false === $part) {
                            return 0;
                        }

                        $this->streams[$i] = $part;
                    }

                    $written = fwrite($this->streams[$i], $data);

                    return false === $written ? 0 : $written;
                },
                CURLOPT_HEADERFUNCTION => function ($curl, $header) use ($i) {
                    $len = strlen($header);

                    // a status line starts a new response (redirect hop); only keep the final one.
                    if (0 === stripos($header, 'HTTP/')) {
                        $this->headers[$i] = [];
                        return $len;
                    }

                    $header = explode(':', $header, 2);
                    if (count($header) < 2) {
                        return $len;
                    }

                    $key = strtolower(trim($header[0]));
                    $val = trim($header[1]);

                    $this->headers[$i][$key][] = $val;

                    if (null !== $this->logger) {
                        ($this->logger)(sprintf('Part %s [%s: %s]', $i + 1, $key, $val), LOG_DEBUG);
                    }

                    return $len;
                },
            ], $options);

            // -- wrap the user progress callback so it receives the totals of all parts.
            $onProgress = $options[CurlHandle::class][CURLOPT_PROGRESSFUNCTION] ?? null;
            if (null !== $onProgress) {
                $opts[CURLOPT_NOPROGRESS] = false;
                $opts[CURLOPT_PROGRESSFUNCTION] = function ($curl, $total, $downloaded) use ($i, $onProgress) {
                    $this->progress[$i] = [$total, $downloaded];
                    $this->onProgress($onProgress);

                    // a non-zero return value would abort the transfer.
                    return 0;
                };
            }

            curl_setopt_array($ch, $opts);
            $handles[$i] = $ch;
        }

        return $handles;
    }

    /**
     * Run all the part requests in parallel and wait for them to finish.
     *
     * @param array<int,CurlHandle|resource> $handles
     *
     * @throws RuntimeException When the multi handle misbehaves or any part fails.
     */
    private function download(array $handles): void
    {
        $mh = curl_multi_init();
        $attached = [];

        try {
            foreach ($handles as $i => $ch) {
                $status = curl_multi_add_handle($mh, $ch);
                if (CURLM_OK !== $status) {
                    throw new RuntimeException((string)curl_multi_strerror($status));
                }
                $attached[$i] = $ch;
            }

            do {
                $status = curl_multi_exec($mh, $active);

                if (CURLM_OK !== $status && CURLM_CALL_MULTI_PERFORM !== $status) {
                    throw new RuntimeException((string)curl_multi_strerror($status));
                }

                // wait for data to arrive no sense in wasting cpu cycles. select may fail (-1)
                // right away, in which case we back off briefly instead of spinning.
                if ($active > 0 && -1 === curl_multi_select($mh)) {
                    usleep(1000);
                }
            } while ($active > 0);

            // -- a finished transfer is not necessarily a successful one.
            while (false !== ($info = curl_multi_info_read($mh))) {
                if (CURLE_OK === $info['result']) {
                    continue;
                }

                $part = array_search($info['handle'], $handles, true);
                $message = curl_error($info['handle']);

                throw new RuntimeException(
                    sprintf(
                        'Part %s failed - %s.',
                        false === $part ? '?' : $part + 1,
                        '' !== $message ? $message : (string)curl_strerror($info['result'])
                    ),
                    $info['result']
                );
            }

            foreach ($handles as $i => $ch) {
                $code = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
                if ($code >= 400) {
                    throw new RuntimeException(
                        sprintf('Part %s failed - Unexpected HTTP code: %s.', $i + 1, $code)
                    );
                }
            }

            foreach ($attached as $i => $ch) {
                unset($attached[$i]);
                $status = curl_multi_remove_handle($mh, $ch);
                if (CURLM_OK !== $status) {
                    throw new RuntimeException(
                        sprintf('Unable to remove handle %s - %s.', $i, curl_multi_strerror($status))
                    );
                }
            }
        } finally {
            // best effort detach of whatever is still attached when we bail out early.
            foreach ($attached as $ch) {
                curl_multi_remove_handle($mh, $ch);
            }
        }
    }

    /**
     * Combine the parts.
     *
     * @param resource $stream The stream to write the data to.
     * @param array<int, resource> $streams The streams to combine.
     * @param int $readSize The copy buffer size in bytes, values below 1 are raised to 1.
     *
     * @return resource Return the final combined stream.
     *
     * @throws RuntimeException When a part cannot be read or the output cannot be written.
     */
    private function combineParts($stream, array $streams, int $readSize = 1024)
    {
        // -- sort streams to write them in order, sometimes they are not in order.
        ksort($streams);

        // fread() rejects a non-positive length.
        $readSize = max(1, $readSize);
        $shouldLog = count($streams) > 1 && null !== $this->logger;

        foreach ($streams as $i => $part) {
            if ($shouldLog) {
                ($this->logger)(sprintf('Merging part %s...', $i + 1), LOG_DEBUG);
            }

            if (true === (bool)(stream_get_meta_data($part)['seekable'] ?? false)) {
                rewind($part);
            }

            while (!feof($part)) {
                $data = fread($part, $readSize);

                if (false === $data) {
                    throw new RuntimeException(sprintf('Unable to read part %s.', $i + 1));
                }

                // an empty read without EOF would otherwise spin forever.
                if ('' === $data) {
                    break;
                }

                // fwrite() may write less than asked for, keep going until the buffer is flushed.
                for ($offset = 0, $size = strlen($data); $offset < $size; $offset += $written) {
                    $written = fwrite($stream, 0 === $offset ? $data : substr($data, $offset));

                    if (false === $written || 0 === $written) {
                        throw new RuntimeException(
                            sprintf('Unable to write part %s to the output stream.', $i + 1)
                        );
                    }
                }
            }
        }

        if ($shouldLog) {
            ($this->logger)('Merging complete!', LOG_DEBUG);
        }

        return $stream;
    }

    /**
     * Call the progress callback with the totals summed over all parts.
     *
     * @param callable(int, int):void $callback The callback.
     */
    private function onProgress(callable $callback): void
    {
        $total = $downloaded = 0;

        foreach ($this->progress as $progress) {
            $total += $progress[0];
            $downloaded += $progress[1];
        }

        $callback($total, $downloaded);
    }

    /**
     * Normalize headers, i.e. trim and lower-case the header names.
     *
     * @param array<string,array> $headers The headers to normalize.
     *
     * @return array The normalized headers.
     */
    private static function normalizeHeaders(array $headers): array
    {
        $normalized = [];

        foreach ($headers as $key => $value) {
            $normalized[strtolower(trim($key))] = $value;
        }

        return $normalized;
    }

    /**
     * Check that every requested part answered with the range we asked for
     * and that the amount of data stored matches its content-length header.
     *
     * @throws RuntimeException When a part is missing, mislabelled or has an unexpected size.
     */
    private function sanityCheck(): void
    {
        // iterate the requested ranges (not the received headers) so a part that never
        // produced a response is reported instead of being skipped.
        foreach ($this->ranges as $i => $range) {
            $header = $this->headers[$i] ?? [];

            if (!isset($header['content-range'])) {
                throw new RuntimeException(
                    sprintf('Part %s does not have a content-range header.', $i + 1)
                );
            }

            if (!isset($header['content-length'])) {
                throw new RuntimeException(
                    sprintf('Part %s does not have a content-length header.', $i + 1)
                );
            }

            $contentRange = (string)end($header['content-range']);
            $rx = '/(?<start>\d+)-(?<end>\d+)\/(?<total>\d+)/';

            if (!preg_match($rx, $contentRange, $matches)) {
                throw new RuntimeException(
                    sprintf(
                        'Part %s content-range [%s] header is invalid.',
                        $i + 1,
                        $contentRange,
                    )
                );
            }

            $start = (int)$matches['start'];
            $end = (int)$matches['end'];

            if ($start !== $range['start'] || $end !== $range['end']) {
                throw new RuntimeException(
                    sprintf(
                        'Part %s content-range header (start: %s, end: %s) does not match the sent range request (start: %s, end: %s).',
                        $i + 1,
                        $start,
                        $end,
                        $range['start'],
                        $range['end']
                    )
                );
            }

            if (!isset($this->streams[$i]) || !is_resource($this->streams[$i])) {
                throw new RuntimeException(
                    sprintf('Part %s did not receive any data.', $i + 1)
                );
            }

            $contentLength = (int)end($header['content-length']);
            $streamSize = fstat($this->streams[$i]);
            $streamSize = false === $streamSize ? null : (int)$streamSize['size'];

            if (null !== $streamSize && $streamSize !== $contentLength) {
                throw new RuntimeException(
                    sprintf(
                        'Part %s content-length header (%s) does not match the downloaded data size (%s).',
                        $i + 1,
                        $contentLength,
                        $streamSize,
                    )
                );
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment