/
ProtocolStreamReader.php
146 lines (130 loc) · 4.68 KB
/
ProtocolStreamReader.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
<?php
declare(strict_types = 1);
namespace Psalm\Internal\LanguageServer;
use AdvancedJsonRpc\Message as MessageBody;
use Amp\ByteStream\ResourceInputStream;
use Amp\Promise;
use Exception;
use Generator;
use function Amp\asyncCall;
use function explode;
use function strlen;
use function substr;
use function trim;
/**
 * Reads framed protocol messages ("Header: value\r\n" lines, a blank "\r\n" line, then a body of
 * Content-Length bytes) from an input stream and re-emits them as events.
 *
 * Events emitted by this class:
 *  - 'message'          with a single Message argument, once per successfully parsed body;
 *  - 'readMessageGroup' with no arguments, after a chunk that yielded at least one message;
 *  - 'close'            with no arguments, at most once, when reading stops.
 *
 * Source: https://github.com/felixfbecker/php-language-server/tree/master/src/ProtocolStreamReader.php
 */
class ProtocolStreamReader implements ProtocolReader
{
    use EmitterTrait;

    /** Parser state: accumulating header lines up to the blank separator line. */
    private const PARSE_HEADERS = 1;

    /** Parser state: accumulating body bytes until Content-Length bytes have been read. */
    private const PARSE_BODY = 2;

    /**
     * This is checked by ProtocolStreamReader so that it will stop reading from streams in the forked process.
     * There could be buffered bytes in stdin/over TCP; without this check they would still be processed.
     *
     * @var bool
     */
    private $is_accepting_new_requests = true;

    /**
     * Current parser state, one of the PARSE_* constants.
     *
     * @var int
     */
    private $parsing_mode = self::PARSE_HEADERS;

    /**
     * Bytes of the header line or message body currently being accumulated.
     *
     * @var string
     */
    private $buffer = '';

    /**
     * Headers of the message currently being read, keyed by header name.
     *
     * @var string[]
     */
    private $headers = [];

    /**
     * Body length announced by the Content-Length header of the current message.
     *
     * @var ?int
     */
    private $content_length;

    /**
     * Guards against emitting 'close' more than once.
     *
     * @var bool
     */
    private $did_emit_close = false;

    /**
     * Wraps the resource in an Amp input stream and starts a coroutine that feeds every chunk
     * read from it into the message parser.
     *
     * @param resource $input
     */
    public function __construct($input)
    {
        $input = new ResourceInputStream($input);

        asyncCall(
            /**
             * @return Generator<int, Promise<?string>, ?string, void>
             */
            function () use ($input): Generator {
                while ($this->is_accepting_new_requests) {
                    $read_promise = $input->read();
                    $chunk = yield $read_promise;

                    // A null chunk ends the read loop (treated as end of input).
                    if ($chunk === null) {
                        break;
                    }

                    if ($this->readMessages($chunk) > 0) {
                        $this->emit('readMessageGroup');
                    }
                }

                $this->emitClose();
            }
        );

        // Release the underlying stream as soon as the reader announces that it is closed.
        $this->on(
            'close',
            static function () use ($input): void {
                $input->close();
            }
        );
    }

    /**
     * Feeds a chunk of raw input into the parser, one byte at a time, emitting a 'message' event
     * for every complete message whose body parses successfully.
     *
     * Parser state is kept on the instance, so a message may span any number of chunks.
     *
     * @param string $buffer Raw bytes read from the input stream.
     *
     * @return int Number of 'message' events emitted while processing this chunk.
     */
    private function readMessages(string $buffer): int
    {
        $emitted_messages = 0;
        $chunk_length = strlen($buffer);

        for ($i = 0; $i < $chunk_length; ++$i) {
            $this->buffer .= $buffer[$i];

            switch ($this->parsing_mode) {
                case self::PARSE_HEADERS:
                    if ($this->buffer === "\r\n") {
                        // Blank line: the header section is complete.
                        $this->content_length = (int) ($this->headers['Content-Length'] ?? 0);

                        if ($this->content_length > 0) {
                            $this->parsing_mode = self::PARSE_BODY;
                        } else {
                            // Missing, zero or negative Content-Length: there is no body to wait for.
                            // Entering body mode here would never terminate, because a byte is always
                            // appended before the length comparison, so the buffer could never have
                            // length 0. Drop the headers and resynchronise on the next header section.
                            $this->headers = [];
                        }

                        $this->buffer = '';
                    } elseif (substr($this->buffer, -2) === "\r\n") {
                        // Split on the first ':' only, so values that contain ':' are kept intact.
                        $parts = explode(':', $this->buffer, 2);

                        // Ignore lines without a separator instead of failing on a missing value.
                        if (isset($parts[1])) {
                            $this->headers[$parts[0]] = trim($parts[1]);
                        }

                        $this->buffer = '';
                    }
                    break;

                case self::PARSE_BODY:
                    if (strlen($this->buffer) === $this->content_length) {
                        if (!$this->is_accepting_new_requests) {
                            // If we fork, don't read any bytes in the input buffer from the worker process.
                            $this->emitClose();

                            return $emitted_messages;
                        }

                        // MessageBody::parse can throw an Error, maybe log an error?
                        try {
                            $msg = new Message(MessageBody::parse($this->buffer), $this->headers);
                        } catch (Exception $_) {
                            // Unparseable bodies are dropped; the parser moves on to the next message.
                            $msg = null;
                        }

                        if ($msg !== null) {
                            ++$emitted_messages;
                            $this->emit('message', [$msg]);

                            /**
                             * @psalm-suppress DocblockTypeContradiction
                             */
                            if (!$this->is_accepting_new_requests) {
                                // If we fork, don't read any bytes in the input buffer from the worker process.
                                $this->emitClose();

                                return $emitted_messages;
                            }
                        }

                        // Reset the per-message state for the next header section.
                        $this->parsing_mode = self::PARSE_HEADERS;
                        $this->headers = [];
                        $this->buffer = '';
                    }
                    break;
            }
        }

        return $emitted_messages;
    }

    /**
     * Emits the 'close' event the first time it is called; later calls do nothing.
     */
    private function emitClose(): void
    {
        if ($this->did_emit_close) {
            return;
        }

        $this->did_emit_close = true;
        $this->emit('close');
    }
}