Skip to content

Commit

Permalink
Only read up to Content-Length in stream wrapper
Browse files Browse the repository at this point in the history
This commit updates the stream wrapper to only read up to the number of
bytes returned in the Content-Length header when draining a stream
synchronously.
  • Loading branch information
mtdowling committed Jun 30, 2016
1 parent 79c6fbe commit baef4ac
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 4 deletions.
74 changes: 74 additions & 0 deletions _test.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

require 'vendor/autoload.php';

use GuzzleHttp\Pool;
use GuzzleHttp\Client;
use GuzzleHttp\Psr7\Request;
use GuzzleHttp\HandlerStack;
use GuzzleHttp\MessageFormatter;
use GuzzleHttp\Middleware;
use PHPHtmlParser\Dom;
use PHPHtmlParser\Exceptions\EmptyCollectionException;

// Assemble a Guzzle client: default auto-chosen handler at the bottom of
// the stack, with the http_errors middleware pushed on top so 4xx/5xx
// responses are turned into exceptions (delivered to the 'rejected' callback).
$formatter = new MessageFormatter();
$stack = new HandlerStack();
$stack->setHandler(\GuzzleHttp\choose_handler());
$stack->push(Middleware::httpErrors());
$client = new Client(['handler' => $stack]);

/**
 * Build a GET request for an overview (listing) page.
 *
 * @param int $page Page index interpolated into the URL (defaults to 1).
 *
 * @return Request
 */
function overviewRequestFactory($page = 1)
{
    $url = sprintf('https://kat.cr/movies/%d/', $page);

    return new Request('GET', $url);
}

/**
 * Build a GET request for a site-relative subpage path.
 *
 * @param string $url Path portion (expected to start with '/').
 *
 * @return Request
 */
function subpageRequestFactory($url)
{
    return new Request('GET', "https://kat.cr{$url}");
}

// Shared work queue of pending requests. push()/pop() operate on the same
// end of an SplQueue, so it behaves as a stack here; the 'fulfilled'
// callback pushes newly discovered subpage requests onto it while the pool
// is running.
$iterator = new SplQueue();

// Seed the queue with overview pages.
// (Original note: if the upper bound is set to a number > 5 it will work.)
// NOTE(review): seeding starts at page 0 — confirm pages aren't 1-based.
$i = 0;
while ($i < 2) {
    $iterator->push(overviewRequestFactory($i));
    ++$i;
}

// Generator that lazily drains the queue; it re-checks emptiness on every
// iteration, so items pushed mid-flight are still yielded.
$requestGenerator = function () use ($iterator) {
    while (!$iterator->isEmpty()) {
        yield $iterator->pop();
    }
};

// Run up to 5 requests concurrently from the generator. Because the
// generator re-reads the shared queue, requests pushed from the
// 'fulfilled' callback below are dispatched too — as long as the pool has
// not yet exhausted the generator.
$pool = new Pool($client, $requestGenerator(), [
    'concurrency' => 5,
    'fulfilled' => function ($response) use ($iterator) {
        //parse HTML response
        $dom = new Dom();
        $dom->load($response->getBody());
        // Distinguish overview pages from subpages by their <title> text.
        if ($dom->find('title')->innerHTML === 'Download Movie Torrents - Kickass Torrents') {
            //this is an overview response
            echo "received overview response\n";
            try {
                $rows = $dom->find('#mainSearchTable')->find('table')->find('tr');
                for ($i = 1;$i < count($rows); ++$i) { //skip first row (header)
                    $row = $rows[$i];
                    $url = $row->find('td')->find('.markeredBlock')->find('a')->getAttribute('href');
                    // Queue a follow-up request for each torrent detail page.
                    echo $url . "\n";
                    $iterator->push(subpageRequestFactory($url));
                }
            } catch (EmptyCollectionException $e) {
                // A selector matched nothing; report and continue with the
                // remaining responses instead of aborting the pool.
                echo $e;
            }
        } else {
            //this is a subpage response
            echo "Subpage response\n";
        }
    },
    'rejected' => function ($reason) {
        // HTTP errors (via the httpErrors middleware) and transport
        // failures land here.
        echo $reason;
    },
]);

// Block until every request — including those queued mid-flight — settles.
$pool->promise()->wait();
26 changes: 22 additions & 4 deletions src/Handler/StreamHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ private function createResponse(
}

if ($sink !== $stream) {
$this->drain($stream, $sink);
$this->drain(
$stream,
$sink,
$response->getHeaderLine('Content-Length')
);
}

$this->invokeStats($options, $request, $startTime, $response, null);
Expand Down Expand Up @@ -181,13 +185,27 @@ private function checkDecode(array $options, array $headers, $stream)
*
* @param StreamInterface $source
* @param StreamInterface $sink
* @param string $contentLength Header specifying the amount of
* data to read.
*
* @return StreamInterface
* @throws \RuntimeException when the sink option is invalid.
*/
private function drain(StreamInterface $source, StreamInterface $sink)
{
Psr7\copy_to_stream($source, $sink);
private function drain(
StreamInterface $source,
StreamInterface $sink,
$contentLength
) {
// If a content-length header is provided, then stop reading once
// that number of bytes has been read. This can prevent infinitely
// reading from a stream when dealing with servers that do not honor
// Connection: Close headers.
Psr7\copy_to_stream(
$source,
$sink,
strlen($contentLength) > 0 ? (int) $contentLength : -1
);

$sink->seek(0);
$source->close();

Expand Down
18 changes: 18 additions & 0 deletions tests/Handler/StreamHandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,24 @@ public function testDrainsResponseIntoSaveToBodyAtNonExistentPath()
unlink($tmpfname);
}

/**
 * When draining the response into a sink, the handler must stop after the
 * number of bytes advertised in the Content-Length header, even though the
 * server sends a longer body.
 */
public function testDrainsResponseAndReadsOnlyContentLengthBytes()
{
    Server::flush();
    Server::enqueue([
        new Response(200, [
            'Foo' => 'Bar',
            // Body below is deliberately longer than 8 bytes; only the
            // first 8 ("hi there") may be read.
            'Content-Length' => 8,
        ], 'hi there... This has way too much data!')
    ]);
    $handler = new StreamHandler();
    $request = new Request('GET', Server::$url);
    $response = $handler($request, [])->wait();
    $stream = $response->getBody()->detach();
    // assertSame: strict comparison — guards against type coercion that
    // assertEquals would allow.
    $this->assertSame('hi there', stream_get_contents($stream));
    fclose($stream);
}

public function testAutomaticallyDecompressGzip()
{
Server::flush();
Expand Down

0 comments on commit baef4ac

Please sign in to comment.