Skip to content

Instantly share code, notes, and snippets.

@arabcoders
Last active December 26, 2023 17:38
Show Gist options
  • Select an option

  • Save arabcoders/98991f3d34d3102c1ea2567cf86b2481 to your computer and use it in GitHub Desktop.

Select an option

Save arabcoders/98991f3d34d3102c1ea2567cf86b2481 to your computer and use it in GitHub Desktop.
Wrapper around curl that speeds up downloads by fetching byte ranges of a file over parallel HTTP range requests.
<?php
use Closure;
use CurlHandle;
use InvalidArgumentException;
use Random\RandomException;
use RuntimeException;
/**
 * Downloads a single remote file over several parallel HTTP range requests.
 *
 * The byte range [0, content-length) is split into contiguous parts. Each part is fetched by its own
 * curl handle (all driven together through curl_multi) into a temporary file. The parts are then
 * appended, in order, to the caller-supplied stream.
 *
 * Recognised `$options` keys (see start()):
 *  - `headers`          array<string, string> extra request headers (name => value).
 *  - `responseHeaders`  pre-fetched response headers (name => value or list of values); when it has
 *                       more than one entry the HEAD request made by getHeaders() is skipped.
 *  - `readSize`         int chunk size used when merging parts (default 1 MiB).
 *  - CurlHandle::class  array<int, mixed> extra curl options; options listed in CURL_BLACKLIST are ignored.
 *                       A CURLOPT_PROGRESSFUNCTION given here is called as fn(int $total, int $downloaded)
 *                       with the totals summed over all parts.
 */
final class CurlRanger
{
    /**
     * @var array<int, int> the curl options that are not overridable by user.
     */
    private const CURL_BLACKLIST = [
        CURLOPT_RETURNTRANSFER,
        CURLOPT_PRIVATE,
        CURLOPT_RANGE,
        CURLOPT_WRITEFUNCTION,
        CURLOPT_HEADERFUNCTION,
        CURLOPT_NOBODY,
        CURLOPT_HTTPHEADER,
    ];

    /**
     * @var array<int, resource> the chunk file streams, keyed by part index.
     */
    private array $streams = [];

    /**
     * @var array<int, array<string, array<int, string>>> response headers of each part (lower-cased names), keyed by part index.
     */
    private array $headers = [];

    /**
     * @var array<int, array{0: int, 1: int}> chunk download progress (total, downloaded), keyed by part index.
     */
    private array $progress = [];

    /**
     * @var array<int, array{start: int, end: int}> the inclusive byte ranges requested for each part.
     */
    private array $ranges = [];

    /**
     * @param Closure|null $logger Optional logger, called as ($message, $level) where $level is a LOG_* constant.
     *
     * @throws RuntimeException If the curl extension is not loaded.
     */
    public function __construct(private Closure|null $logger = null)
    {
        if (!extension_loaded('curl')) {
            throw new RuntimeException('This class requires the curl extension.');
        }
    }

    /**
     * Make a new instance and start the download.
     *
     * @param string $url The URL to download.
     * @param resource $stream The stream to write the data to.
     * @param int $maxConnections The maximum number of connections to use.
     * @param array $options (Optional) options, see the class documentation.
     * @param Closure|null $logger (Optional) logger passed to the constructor.
     *
     * @return resource the stream.
     *
     * @throws InvalidArgumentException On invalid stream/options.
     * @throws RuntimeException On any download failure.
     */
    public static function make(
        string $url,
        $stream,
        int $maxConnections = 4,
        array $options = [],
        Closure|null $logger = null,
    ) {
        return (new self($logger))->start($url, $stream, $maxConnections, $options);
    }

    /**
     * Start the download process.
     *
     * @param string $url The URL to download.
     * @param resource $stream The stream to write the data to.
     * @param int $maxConnections The maximum number of connections to use (values < 1 are treated as 1).
     * @param array $options (Optional) options, see the class documentation.
     *
     * @return resource the stream.
     *
     * @throws InvalidArgumentException If $stream is not a resource or the CurlHandle option is not an array.
     * @throws RuntimeException If the server lacks byte-range support or a usable content-length, a part fails,
     *                          or the downloaded data does not match the advertised sizes.
     */
    public function start(
        string $url,
        $stream,
        int $maxConnections = 4,
        array $options = []
    ) {
        if (!is_resource($stream)) {
            throw new InvalidArgumentException('Stream must be a fopen resource.');
        }

        $options[CurlHandle::class] ??= [];
        if (!is_array($options[CurlHandle::class])) {
            throw new InvalidArgumentException(CurlHandle::class . ' option must be an array.');
        }

        // -- an instance may be reused: drop the parts, headers and progress of any previous run.
        $this->reset();

        $maxConnections = max($maxConnections, 1);

        $hasHeaders = is_array($options['responseHeaders'] ?? null) && count($options['responseHeaders']) > 1;
        $headers = self::normalizeHeaders($hasHeaders ? $options['responseHeaders'] : self::getHeaders($url, $options));

        // -- "Accept-Ranges: none" explicitly announces that range requests are NOT supported.
        $acceptRanges = self::lastHeader($headers, 'accept-ranges');
        if (null === $acceptRanges || !str_contains(strtolower($acceptRanges), 'bytes')) {
            throw new RuntimeException('Server does not support range requests.');
        }

        $rawLength = self::lastHeader($headers, 'content-length');
        if (null === $rawLength || 1 !== preg_match('/^\d+$/', $rawLength)) {
            throw new RuntimeException('Server did not return a valid content-length header.');
        }

        $contentLength = (int)$rawLength;
        if (0 === $contentLength) {
            // -- nothing to download, and an empty body cannot be expressed as a byte range.
            return $stream;
        }

        $this->ranges = self::computeRanges($contentLength, $maxConnections);
        $this->download($this->makeRequests($url, $this->ranges, $options));

        // -- only run parts sanity check if the file was actually split into several parts.
        if (count($this->ranges) > 1) {
            $this->sanityCheck();
        }

        $stream = $this->combineParts(
            stream: $stream,
            streams: $this->streams,
            readSize: max(1, (int)($options['readSize'] ?? (1024 * 1024)))
        );

        // -- do final check of size compared to content-length header.
        $stat = fstat($stream);
        $finalSize = false === $stat ? null : (int)$stat['size'];
        if (null !== $finalSize && $finalSize !== $contentLength) {
            throw new RuntimeException(
                sprintf(
                    'Downloaded data size (%s) does not match content-length header (%s).',
                    $finalSize,
                    $contentLength
                )
            );
        }

        return $stream;
    }

    /**
     * Fetch the response headers of $url with a HEAD request (following redirects).
     *
     * Only the headers of the final response are returned; headers of intermediate redirect
     * responses are discarded.
     *
     * @param string $url The URL to inspect.
     * @param array $options (Optional) options; `headers` and CurlHandle::class are honoured.
     *
     * @return array<string, array<int, string>> lower-cased header name => list of values.
     *
     * @throws RuntimeException On a curl error or a non-200 response.
     */
    public static function getHeaders(string $url, array $options = []): array
    {
        $headers = [];

        $ch = curl_init($url);
        if (false === $ch) {
            throw new RuntimeException('Unable to initialize curl.');
        }

        $opts = [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HEADER => true,
            CURLOPT_NOBODY => true,
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_MAXREDIRS => 10,
            CURLOPT_TIMEOUT => 60,
            CURLOPT_CONNECTTIMEOUT => 60,
            CURLOPT_HEADERFUNCTION => static function ($curl, string $header) use (&$headers): int {
                $len = strlen($header);

                // -- a status line starts a new response (e.g. after a redirect); only the final one counts.
                if (str_starts_with($header, 'HTTP/')) {
                    $headers = [];
                    return $len;
                }

                $parts = explode(':', $header, 2);
                // ignore invalid headers.
                if (count($parts) < 2) {
                    return $len;
                }

                $headers[strtolower(trim($parts[0]))][] = trim($parts[1]);

                return $len;
            },
        ];

        // -- user supplied options override the defaults above, except the blacklisted ones.
        $opts = array_replace($opts, self::userCurlOptions($options));

        $httpHeaders = self::httpHeaderLines($options);
        if (null !== $httpHeaders) {
            $opts[CURLOPT_HTTPHEADER] = $httpHeaders;
        }

        curl_setopt_array($ch, $opts);

        $response = curl_exec($ch);
        if (false === $response) {
            throw new RuntimeException(curl_error($ch), curl_errno($ch));
        }

        $code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        if (200 !== $code) {
            throw new RuntimeException(sprintf('Unexpected HTTP code: %d', $code));
        }

        return self::normalizeHeaders($headers);
    }

    /**
     * Close and delete the temporary part files.
     */
    public function __destruct()
    {
        $this->reset();
    }

    /**
     * Close and delete all temporary part files and clear the per-download state.
     */
    private function reset(): void
    {
        foreach ($this->streams as $stream) {
            if (!is_resource($stream)) {
                continue;
            }

            $file = stream_get_meta_data($stream)['uri'] ?? null;

            // -- close before unlinking so deletion also works where open files cannot be removed (Windows).
            fclose($stream);

            if (null !== $file && file_exists($file)) {
                unlink($file);
            }
        }

        $this->streams = [];
        $this->headers = [];
        $this->progress = [];
        $this->ranges = [];
    }

    /**
     * Split [0, $contentLength - 1] into at most $parts contiguous, non-empty, inclusive byte ranges.
     *
     * Every range except the last spans ceil($contentLength / $parts) bytes; the last takes the remainder.
     * Fewer than $parts ranges are returned when the content is too small to fill them all.
     *
     * @param int $contentLength Total number of bytes (> 0).
     * @param int $parts Desired number of ranges (>= 1).
     *
     * @return array<int, array{start: int, end: int}>
     */
    private static function computeRanges(int $contentLength, int $parts): array
    {
        // -- integer ceil-division keeps this exact for sizes beyond float precision.
        $rangeSize = intdiv($contentLength + $parts - 1, $parts);
        $count = intdiv($contentLength + $rangeSize - 1, $rangeSize);

        return array_map(
            static fn(int $i): array => [
                'start' => $i * $rangeSize,
                'end' => min(($i + 1) * $rangeSize, $contentLength) - 1,
            ],
            range(0, $count - 1)
        );
    }

    /**
     * Make the curl handles, one per range, and open the temporary file each part is written to.
     *
     * @param string $url The URL to download.
     * @param array<int, array{ start: int, end: int }> $ranges The ranges to download.
     * @param array $options (Optional) options.
     *
     * @return array<int, CurlHandle|resource> The curl handles, keyed like $ranges.
     *
     * @throws RuntimeException If a temporary file or curl handle cannot be created.
     */
    private function makeRequests(string $url, array $ranges, array $options = []): array
    {
        // -- prefix download parts with random string to avoid collisions.
        try {
            $random = bin2hex(random_bytes(4));
        } catch (RandomException) {
            $random = substr(md5((string)rand(1000000, 9999999)), 0, 8);
        }

        $userOptions = self::userCurlOptions($options);
        $httpHeaders = self::httpHeaderLines($options);
        $progressCallback = $options[CurlHandle::class][CURLOPT_PROGRESSFUNCTION] ?? null;

        $handles = [];
        foreach ($ranges as $i => $chunk) {
            // -- open the part file up front so every part has a stream, even one that receives no data.
            $tmpName = sys_get_temp_dir() . DIRECTORY_SEPARATOR . "cr-{$random}.part{$i}";
            $part = fopen($tmpName, 'wb+');
            if (false === $part) {
                throw new RuntimeException(
                    sprintf('Unable to create temporary file (%s) for part %s.', $tmpName, $i + 1)
                );
            }
            $this->streams[$i] = $part;

            $ch = curl_init($url);
            if (false === $ch) {
                throw new RuntimeException(sprintf('Unable to initialize curl for part %s.', $i + 1));
            }

            $opts = [
                CURLOPT_RETURNTRANSFER => true,
                CURLOPT_HEADER => false,
                // -- lets download() map finished transfers back to their part index.
                CURLOPT_PRIVATE => $i,
                CURLOPT_FOLLOWLOCATION => true,
                CURLOPT_MAXREDIRS => 10,
                CURLOPT_CONNECTTIMEOUT => 120,
                CURLOPT_RANGE => "{$chunk['start']}-{$chunk['end']}",
                CURLOPT_WRITEFUNCTION => static function ($curl, string $data) use ($part): int {
                    $written = fwrite($part, $data);
                    // -- returning fewer bytes than received makes curl abort this transfer with a write error.
                    return false === $written ? 0 : $written;
                },
                CURLOPT_HEADERFUNCTION => function ($curl, string $header) use ($i): int {
                    $len = strlen($header);

                    // -- a status line starts a new response (e.g. after a redirect); forget the previous one.
                    if (str_starts_with($header, 'HTTP/')) {
                        $this->headers[$i] = [];
                        return $len;
                    }

                    $parts = explode(':', $header, 2);
                    if (count($parts) < 2) {
                        return $len;
                    }

                    $key = strtolower(trim($parts[0]));
                    $val = trim($parts[1]);
                    $this->headers[$i][$key][] = $val;

                    $this->log(sprintf('Part %s [%s: %s]', $i + 1, $key, $val));

                    return $len;
                },
            ];

            // -- add curl options from user input (array_replace keeps the integer CURLOPT_* keys intact).
            $opts = array_replace($opts, $userOptions);

            if (null !== $progressCallback) {
                $opts[CURLOPT_NOPROGRESS] = false;
                $opts[CURLOPT_PROGRESSFUNCTION] = function ($curl, $total, $downloaded) use ($i, $progressCallback): int {
                    $this->progress[$i] = [$total, $downloaded];
                    $this->onProgress($progressCallback);

                    // -- a non-zero return value would abort the transfer.
                    return 0;
                };
            }

            if (null !== $httpHeaders) {
                $opts[CURLOPT_HTTPHEADER] = $httpHeaders;
            }

            curl_setopt_array($ch, $opts);
            $handles[$i] = $ch;
        }

        return $handles;
    }

    /**
     * Run all handles concurrently until they finish, then verify that every transfer succeeded.
     *
     * @param array<int,CurlHandle|resource> $handles The handles, keyed by part index.
     *
     * @throws RuntimeException On a curl multi error, a failed transfer or a non-2xx HTTP response.
     */
    private function download(array $handles): void
    {
        $mh = curl_multi_init();

        try {
            foreach ($handles as $ch) {
                $status = curl_multi_add_handle($mh, $ch);
                if (CURLM_OK !== $status) {
                    throw new RuntimeException(curl_multi_strerror($status) ?? sprintf('curl multi error %d.', $status));
                }
            }

            /** @var array<int, int> $results curl result code of each finished transfer, keyed by part index. */
            $results = [];
            do {
                $status = curl_multi_exec($mh, $active);

                // -- collect per-transfer results; curl_errno()/curl_error() are only populated through this call.
                while (false !== ($info = curl_multi_info_read($mh))) {
                    if (CURLMSG_DONE === $info['msg']) {
                        $results[(int)curl_getinfo($info['handle'], CURLINFO_PRIVATE)] = (int)$info['result'];
                    }
                }

                if (CURLM_OK !== $status) {
                    throw new RuntimeException(curl_multi_strerror($status) ?? sprintf('curl multi error %d.', $status));
                }

                if ($active > 0) {
                    // wait for data to arrive no sense in wasting cpu cycles.
                    curl_multi_select($mh);
                }
            } while ($active > 0);

            foreach ($handles as $i => $ch) {
                $status = curl_multi_remove_handle($mh, $ch);
                if (CURLM_OK !== $status) {
                    throw new RuntimeException(
                        sprintf('Unable to remove handle %s - %s.', $i, curl_multi_strerror($status))
                    );
                }
            }

            foreach ($handles as $i => $ch) {
                $result = $results[$i] ?? CURLE_OK;
                if (CURLE_OK !== $result) {
                    $message = curl_error($ch);
                    if ('' === $message) {
                        $message = curl_strerror($result) ?? 'unknown error';
                    }
                    throw new RuntimeException(sprintf('Part %s failed: %s', $i + 1, $message), $result);
                }

                // -- 0 means a non-HTTP protocol; anything outside 2xx means the body is not file data.
                $code = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
                if (0 !== $code && ($code < 200 || $code >= 300)) {
                    throw new RuntimeException(sprintf('Part %s failed with unexpected HTTP code: %d', $i + 1, $code));
                }
            }
        } finally {
            curl_multi_close($mh);
        }
    }

    /**
     * Append every part, in part-index order, to $stream.
     *
     * @param resource $stream The stream to write the data to.
     * @param array<int, resource> $streams The part streams to combine, keyed by part index.
     * @param int $readSize Number of bytes copied per read (must be >= 1).
     *
     * @return resource Return the final combined stream.
     *
     * @throws RuntimeException If a part cannot be read or the target cannot be written.
     */
    private function combineParts($stream, array $streams, int $readSize = 1024)
    {
        // -- sort streams to write them in order, sometimes they are not in order.
        ksort($streams);

        $multipart = count($streams) > 1;

        foreach ($streams as $i => $part) {
            if ($multipart) {
                $this->log(sprintf('Merging part %s...', $i + 1));
            }

            if (true === (bool)(stream_get_meta_data($part)['seekable'] ?? false)) {
                rewind($part);
            }

            while (!feof($part)) {
                $data = fread($part, $readSize);
                if (false === $data) {
                    throw new RuntimeException(sprintf('Unable to read part %s.', $i + 1));
                }

                if ('' !== $data && false === fwrite($stream, $data)) {
                    throw new RuntimeException(sprintf('Unable to write part %s to the target stream.', $i + 1));
                }
            }
        }

        if ($multipart) {
            $this->log('Merging complete!');
        }

        return $stream;
    }

    /**
     * Call the progress callback with totals summed over all parts.
     *
     * @param callable(int, int):void $callback The callback, called as ($total, $downloaded).
     */
    private function onProgress(callable $callback): void
    {
        $total = $downloaded = 0;

        foreach ($this->progress as [$partTotal, $partDownloaded]) {
            $total += $partTotal;
            $downloaded += $partDownloaded;
        }

        $callback($total, $downloaded);
    }

    /**
     * Forward a message to the logger, if one was configured.
     *
     * @param string $message The message.
     * @param int $level A LOG_* level.
     */
    private function log(string $message, int $level = LOG_DEBUG): void
    {
        if (null !== $this->logger) {
            ($this->logger)($message, $level);
        }
    }

    /**
     * User supplied curl options with the blacklisted ones removed.
     *
     * @param array $options The options passed to start()/getHeaders().
     *
     * @return array<int, mixed>
     */
    private static function userCurlOptions(array $options): array
    {
        $curlOptions = $options[CurlHandle::class] ?? [];
        if (!is_array($curlOptions)) {
            return [];
        }

        return array_filter(
            $curlOptions,
            static fn($key): bool => !in_array($key, self::CURL_BLACKLIST, true),
            ARRAY_FILTER_USE_KEY
        );
    }

    /**
     * Build "Name: value" request header lines from the `headers` option.
     *
     * @param array $options The options passed to start()/getHeaders().
     *
     * @return array<int, string>|null null when no `headers` option was given.
     */
    private static function httpHeaderLines(array $options): array|null
    {
        if (null === ($options['headers'] ?? null)) {
            return null;
        }

        return array_map(
            static fn($k, $v): string => "{$k}: {$v}",
            array_keys($options['headers']),
            array_values($options['headers'])
        );
    }

    /**
     * Normalize headers: lower-case/trim names, turn every value into a list, merge duplicate names.
     *
     * @param array<string, string|array<int, string>> $headers The headers to normalize.
     *
     * @return array<string, array<int, string>> The normalized headers.
     */
    private static function normalizeHeaders(array $headers): array
    {
        $normalized = [];

        foreach ($headers as $key => $value) {
            $name = strtolower(trim((string)$key));
            $values = is_array($value) ? array_values($value) : [(string)$value];
            $normalized[$name] = array_merge($normalized[$name] ?? [], $values);
        }

        return $normalized;
    }

    /**
     * The last (most recent) trimmed value of a normalized header, or null when absent.
     *
     * @param array<string, array<int, string>> $headers Normalized headers.
     * @param string $name Lower-cased header name.
     */
    private static function lastHeader(array $headers, string $name): string|null
    {
        $values = $headers[$name] ?? [];
        if ([] === $values) {
            return null;
        }

        return trim((string)end($values));
    }

    /**
     * Check every part's content-range matches the requested range and its content-length matches
     * the number of bytes written to its part file.
     *
     * @throws RuntimeException On the first part that fails a check.
     */
    private function sanityCheck(): void
    {
        // -- iterate the requested ranges (not the received headers) so a part that got no headers is not skipped.
        foreach ($this->ranges as $i => $range) {
            $header = $this->headers[$i] ?? [];

            $contentRange = self::lastHeader($header, 'content-range');
            if (null === $contentRange) {
                throw new RuntimeException(
                    sprintf('Part %s does not have a content-range header.', $i + 1)
                );
            }

            $rawLength = self::lastHeader($header, 'content-length');
            if (null === $rawLength) {
                throw new RuntimeException(
                    sprintf('Part %s does not have a content-length header.', $i + 1)
                );
            }

            $rx = '/(?<start>\d+)-(?<end>\d+)\/(?<total>\d+)/';
            if (1 !== preg_match($rx, $contentRange, $matches)) {
                throw new RuntimeException(
                    sprintf(
                        'Part %s content-range [%s] header is invalid.',
                        $i + 1,
                        $contentRange,
                    )
                );
            }

            $start = (int)$matches['start'];
            $end = (int)$matches['end'];
            if ($start !== $range['start'] || $end !== $range['end']) {
                throw new RuntimeException(
                    sprintf(
                        'Part %s content-range header (start: %s, end: %s) does not match the sent range request (start: %s, end: %s).',
                        $i + 1,
                        $start,
                        $end,
                        $range['start'],
                        $range['end']
                    )
                );
            }

            $contentLength = (int)$rawLength;
            $stat = isset($this->streams[$i]) ? fstat($this->streams[$i]) : ['size' => 0];
            $streamSize = false === $stat ? null : (int)$stat['size'];
            if (null !== $streamSize && $streamSize !== $contentLength) {
                throw new RuntimeException(
                    sprintf(
                        'Part %s content-length header (%s) does not match the downloaded data size (%s).',
                        $i + 1,
                        $contentLength,
                        $streamSize,
                    )
                );
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment