Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
39.57% |
93 / 235 |
|
0.00% |
0 / 6 |
CRAP | |
0.00% |
0 / 1 |
Websocket | |
39.57% |
93 / 235 |
|
0.00% |
0 / 6 |
1282.16 | |
0.00% |
0 / 1 |
input | |
34.31% |
35 / 102 |
|
0.00% |
0 / 1 |
341.64 | |||
encode | |
31.03% |
9 / 29 |
|
0.00% |
0 / 1 |
78.29 | |||
decode | |
59.26% |
16 / 27 |
|
0.00% |
0 / 1 |
10.31 | |||
inflate | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
12 | |||
deflate | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
dealHandshake | |
62.26% |
33 / 53 |
|
0.00% |
0 / 1 |
27.09 |
1 | <?php |
2 | /** |
3 | * This file is part of workerman. |
4 | * |
5 | * Licensed under The MIT License |
6 | * For full copyright and license information, please see the MIT-LICENSE.txt |
7 | * Redistributions of files must retain the above copyright notice. |
8 | * |
9 | * @author walkor<walkor@workerman.net> |
10 | * @copyright walkor<walkor@workerman.net> |
11 | * @link http://www.workerman.net/ |
12 | * @license http://www.opensource.org/licenses/mit-license.php MIT License |
13 | */ |
14 | |
15 | declare(strict_types=1); |
16 | |
17 | namespace Workerman\Protocols; |
18 | |
19 | use Throwable; |
20 | use Workerman\Connection\ConnectionInterface; |
21 | use Workerman\Connection\TcpConnection; |
22 | use Workerman\Protocols\Http\Request; |
23 | use Workerman\Worker; |
24 | use function base64_encode; |
25 | use function chr; |
26 | use function deflate_add; |
27 | use function deflate_init; |
28 | use function floor; |
29 | use function inflate_add; |
30 | use function inflate_init; |
31 | use function is_scalar; |
32 | use function ord; |
33 | use function pack; |
34 | use function preg_match; |
35 | use function sha1; |
36 | use function str_repeat; |
37 | use function stripos; |
38 | use function strlen; |
39 | use function strpos; |
40 | use function substr; |
41 | use function unpack; |
42 | use const ZLIB_DEFAULT_STRATEGY; |
43 | use const ZLIB_ENCODING_RAW; |
44 | |
45 | /** |
46 | * WebSocket protocol. |
47 | */ |
48 | class Websocket |
49 | { |
50 | /** |
51 | * Websocket blob type. |
52 | * |
53 | * @var string |
54 | */ |
55 | public const BINARY_TYPE_BLOB = "\x81"; |
56 | |
57 | /** |
58 | * Websocket blob type. |
59 | * |
60 | * @var string |
61 | */ |
62 | const BINARY_TYPE_BLOB_DEFLATE = "\xc1"; |
63 | |
64 | /** |
65 | * Websocket arraybuffer type. |
66 | * |
67 | * @var string |
68 | */ |
69 | public const BINARY_TYPE_ARRAYBUFFER = "\x82"; |
70 | |
71 | /** |
72 | * Websocket arraybuffer type. |
73 | * |
74 | * @var string |
75 | */ |
76 | const BINARY_TYPE_ARRAYBUFFER_DEFLATE = "\xc2"; |
77 | |
78 | /** |
79 | * Check the integrity of the package. |
80 | * |
81 | * @param string $buffer |
82 | * @param TcpConnection $connection |
83 | * @return int |
84 | */ |
85 | public static function input(string $buffer, TcpConnection $connection): int |
86 | { |
87 | $connection->websocketOrigin = $connection->websocketOrigin ?? null; |
88 | $connection->websocketClientProtocol = $connection->websocketClientProtocol ?? null; |
89 | // Receive length. |
90 | $recvLen = strlen($buffer); |
91 | // We need more data. |
92 | if ($recvLen < 6) { |
93 | return 0; |
94 | } |
95 | |
96 | // Has not yet completed the handshake. |
97 | if (empty($connection->context->websocketHandshake)) { |
98 | return static::dealHandshake($buffer, $connection); |
99 | } |
100 | |
101 | // Buffer websocket frame data. |
102 | if ($connection->context->websocketCurrentFrameLength) { |
103 | // We need more frame data. |
104 | if ($connection->context->websocketCurrentFrameLength > $recvLen) { |
105 | // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1. |
106 | return 0; |
107 | } |
108 | } else { |
109 | $firstByte = ord($buffer[0]); |
110 | $secondByte = ord($buffer[1]); |
111 | $dataLen = $secondByte & 127; |
112 | $isFinFrame = $firstByte >> 7; |
113 | $masked = $secondByte >> 7; |
114 | |
115 | if (!$masked) { |
116 | Worker::safeEcho("frame not masked so close the connection\n"); |
117 | $connection->close(); |
118 | return 0; |
119 | } |
120 | |
121 | $opcode = $firstByte & 0xf; |
122 | switch ($opcode) { |
123 | case 0x0: |
124 | // Blob type. |
125 | case 0x1: |
126 | // Arraybuffer type. |
127 | case 0x2: |
128 | // Ping package. |
129 | case 0x9: |
130 | // Pong package. |
131 | case 0xa: |
132 | break; |
133 | // Close package. |
134 | case 0x8: |
135 | // Try to emit onWebSocketClose callback. |
136 | $closeCb = $connection->onWebSocketClose ?? $connection->worker->onWebSocketClose ?? false; |
137 | if ($closeCb) { |
138 | try { |
139 | $closeCb($connection); |
140 | } catch (Throwable $e) { |
141 | Worker::stopAll(250, $e); |
142 | } |
143 | } // Close connection. |
144 | else { |
145 | $connection->close("\x88\x02\x03\xe8", true); |
146 | } |
147 | return 0; |
148 | // Wrong opcode. |
149 | default : |
150 | Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n"); |
151 | $connection->close(); |
152 | return 0; |
153 | } |
154 | |
155 | // Calculate packet length. |
156 | $headLen = 6; |
157 | if ($dataLen === 126) { |
158 | $headLen = 8; |
159 | if ($headLen > $recvLen) { |
160 | return 0; |
161 | } |
162 | $pack = unpack('nn/ntotal_len', $buffer); |
163 | $dataLen = $pack['total_len']; |
164 | } else { |
165 | if ($dataLen === 127) { |
166 | $headLen = 14; |
167 | if ($headLen > $recvLen) { |
168 | return 0; |
169 | } |
170 | $arr = unpack('n/N2c', $buffer); |
171 | $dataLen = $arr['c1'] * 4294967296 + $arr['c2']; |
172 | } |
173 | } |
174 | $currentFrameLength = $headLen + $dataLen; |
175 | |
176 | $totalPackageSize = strlen($connection->context->websocketDataBuffer) + $currentFrameLength; |
177 | if ($totalPackageSize > $connection->maxPackageSize) { |
178 | Worker::safeEcho("error package. package_length=$totalPackageSize\n"); |
179 | $connection->close(); |
180 | return 0; |
181 | } |
182 | |
183 | if ($isFinFrame) { |
184 | if ($opcode === 0x9) { |
185 | if ($recvLen >= $currentFrameLength) { |
186 | $pingData = static::decode(substr($buffer, 0, $currentFrameLength), $connection); |
187 | $connection->consumeRecvBuffer($currentFrameLength); |
188 | $tmpConnectionType = $connection->websocketType ?? static::BINARY_TYPE_BLOB; |
189 | $connection->websocketType = "\x8a"; |
190 | $pingCb = $connection->onWebSocketPing ?? $connection->worker->onWebSocketPing ?? false; |
191 | if ($pingCb) { |
192 | try { |
193 | $pingCb($connection, $pingData); |
194 | } catch (Throwable $e) { |
195 | Worker::stopAll(250, $e); |
196 | } |
197 | } else { |
198 | $connection->send($pingData); |
199 | } |
200 | $connection->websocketType = $tmpConnectionType; |
201 | if ($recvLen > $currentFrameLength) { |
202 | return static::input(substr($buffer, $currentFrameLength), $connection); |
203 | } |
204 | } |
205 | return 0; |
206 | } |
207 | |
208 | if ($opcode === 0xa) { |
209 | if ($recvLen >= $currentFrameLength) { |
210 | $pongData = static::decode(substr($buffer, 0, $currentFrameLength), $connection); |
211 | $connection->consumeRecvBuffer($currentFrameLength); |
212 | $tmpConnectionType = $connection->websocketType ?? static::BINARY_TYPE_BLOB; |
213 | $connection->websocketType = "\x8a"; |
214 | // Try to emit onWebSocketPong callback. |
215 | $pongCb = $connection->onWebSocketPong ?? $connection->worker->onWebSocketPong ?? false; |
216 | if ($pongCb) { |
217 | try { |
218 | $pongCb($connection, $pongData); |
219 | } catch (Throwable $e) { |
220 | Worker::stopAll(250, $e); |
221 | } |
222 | } |
223 | $connection->websocketType = $tmpConnectionType; |
224 | if ($recvLen > $currentFrameLength) { |
225 | return static::input(substr($buffer, $currentFrameLength), $connection); |
226 | } |
227 | } |
228 | return 0; |
229 | } |
230 | return $currentFrameLength; |
231 | } |
232 | |
233 | $connection->context->websocketCurrentFrameLength = $currentFrameLength; |
234 | } |
235 | |
236 | // Received just a frame length data. |
237 | if ($connection->context->websocketCurrentFrameLength === $recvLen) { |
238 | static::decode($buffer, $connection); |
239 | $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength); |
240 | $connection->context->websocketCurrentFrameLength = 0; |
241 | return 0; |
242 | } |
243 | |
244 | // The length of the received data is greater than the length of a frame. |
245 | if ($connection->context->websocketCurrentFrameLength < $recvLen) { |
246 | static::decode(substr($buffer, 0, $connection->context->websocketCurrentFrameLength), $connection); |
247 | $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength); |
248 | $currentFrameLength = $connection->context->websocketCurrentFrameLength; |
249 | $connection->context->websocketCurrentFrameLength = 0; |
250 | // Continue to read next frame. |
251 | return static::input(substr($buffer, $currentFrameLength), $connection); |
252 | } |
253 | |
254 | // The length of the received data is less than the length of a frame. |
255 | return 0; |
256 | } |
257 | |
258 | /** |
259 | * Websocket encode. |
260 | * |
261 | * @param mixed $buffer |
262 | * @param TcpConnection $connection |
263 | * @return string |
264 | */ |
265 | public static function encode(mixed $buffer, TcpConnection $connection): string |
266 | { |
267 | if (!is_scalar($buffer)) { |
268 | $buffer = json_encode($buffer, JSON_UNESCAPED_UNICODE); |
269 | } |
270 | |
271 | if (empty($connection->websocketType)) { |
272 | $connection->websocketType = static::BINARY_TYPE_BLOB; |
273 | } |
274 | |
275 | if (ord($connection->websocketType) & 64) { |
276 | $buffer = static::deflate($connection, $buffer); |
277 | } |
278 | |
279 | $firstByte = $connection->websocketType; |
280 | $len = strlen($buffer); |
281 | |
282 | if ($len <= 125) { |
283 | $encodeBuffer = $firstByte . chr($len) . $buffer; |
284 | } else { |
285 | if ($len <= 65535) { |
286 | $encodeBuffer = $firstByte . chr(126) . pack("n", $len) . $buffer; |
287 | } else { |
288 | $encodeBuffer = $firstByte . chr(127) . pack("xxxxN", $len) . $buffer; |
289 | } |
290 | } |
291 | |
292 | // Handshake not completed so temporary buffer websocket data waiting for send. |
293 | if (empty($connection->context->websocketHandshake)) { |
294 | if (empty($connection->context->tmpWebsocketData)) { |
295 | $connection->context->tmpWebsocketData = ''; |
296 | } |
297 | // If buffer has already full then discard the current package. |
298 | if (strlen($connection->context->tmpWebsocketData) > $connection->maxSendBufferSize) { |
299 | if ($connection->onError) { |
300 | try { |
301 | ($connection->onError)($connection, ConnectionInterface::SEND_FAIL, 'send buffer full and drop package'); |
302 | } catch (Throwable $e) { |
303 | Worker::stopAll(250, $e); |
304 | } |
305 | } |
306 | return ''; |
307 | } |
308 | $connection->context->tmpWebsocketData .= $encodeBuffer; |
309 | // Check buffer is full. |
310 | if ($connection->onBufferFull && $connection->maxSendBufferSize <= strlen($connection->context->tmpWebsocketData)) { |
311 | try { |
312 | ($connection->onBufferFull)($connection); |
313 | } catch (Throwable $e) { |
314 | Worker::stopAll(250, $e); |
315 | } |
316 | } |
317 | // Return empty string. |
318 | return ''; |
319 | } |
320 | |
321 | return $encodeBuffer; |
322 | } |
323 | |
324 | /** |
325 | * Websocket decode. |
326 | * |
327 | * @param string $buffer |
328 | * @param TcpConnection $connection |
329 | * @return string |
330 | */ |
331 | public static function decode(string $buffer, TcpConnection $connection): string |
332 | { |
333 | $firstByte = ord($buffer[0]); |
334 | $secondByte = ord($buffer[1]); |
335 | $len = $secondByte & 127; |
336 | $isFinFrame = (bool)($firstByte >> 7); |
337 | $rsv1 = 64 === ($firstByte & 64); |
338 | |
339 | if ($len === 126) { |
340 | $masks = substr($buffer, 4, 4); |
341 | $data = substr($buffer, 8); |
342 | } else { |
343 | if ($len === 127) { |
344 | $masks = substr($buffer, 10, 4); |
345 | $data = substr($buffer, 14); |
346 | } else { |
347 | $masks = substr($buffer, 2, 4); |
348 | $data = substr($buffer, 6); |
349 | } |
350 | } |
351 | $dataLength = strlen($data); |
352 | $masks = str_repeat($masks, (int)floor($dataLength / 4)) . substr($masks, 0, $dataLength % 4); |
353 | $decoded = $data ^ $masks; |
354 | if ($connection->context->websocketCurrentFrameLength) { |
355 | $connection->context->websocketDataBuffer .= $decoded; |
356 | if ($rsv1) { |
357 | return static::inflate($connection, $connection->context->websocketDataBuffer, $isFinFrame); |
358 | } |
359 | return $connection->context->websocketDataBuffer; |
360 | } |
361 | if ($connection->context->websocketDataBuffer !== '') { |
362 | $decoded = $connection->context->websocketDataBuffer . $decoded; |
363 | $connection->context->websocketDataBuffer = ''; |
364 | } |
365 | if ($rsv1) { |
366 | return static::inflate($connection, $decoded, $isFinFrame); |
367 | } |
368 | return $decoded; |
369 | } |
370 | |
371 | /** |
372 | * Inflate. |
373 | * |
374 | * @param TcpConnection $connection |
375 | * @param string $buffer |
376 | * @param bool $isFinFrame |
377 | * @return false|string |
378 | */ |
379 | protected static function inflate(TcpConnection $connection, string $buffer, bool $isFinFrame): bool|string |
380 | { |
381 | if (!isset($connection->context->inflator)) { |
382 | $connection->context->inflator = inflate_init( |
383 | ZLIB_ENCODING_RAW, |
384 | [ |
385 | 'level' => -1, |
386 | 'memory' => 8, |
387 | 'window' => 15, |
388 | 'strategy' => ZLIB_DEFAULT_STRATEGY |
389 | ] |
390 | ); |
391 | } |
392 | if ($isFinFrame) { |
393 | $buffer .= "\x00\x00\xff\xff"; |
394 | } |
395 | return inflate_add($connection->context->inflator, $buffer); |
396 | } |
397 | |
398 | /** |
399 | * Deflate. |
400 | * |
401 | * @param TcpConnection $connection |
402 | * @param string $buffer |
403 | * @return false|string |
404 | */ |
405 | protected static function deflate(TcpConnection $connection, string $buffer): bool|string |
406 | { |
407 | if (!isset($connection->context->deflator)) { |
408 | $connection->context->deflator = deflate_init( |
409 | ZLIB_ENCODING_RAW, |
410 | [ |
411 | 'level' => -1, |
412 | 'memory' => 8, |
413 | 'window' => 15, |
414 | 'strategy' => ZLIB_DEFAULT_STRATEGY |
415 | ] |
416 | ); |
417 | } |
418 | return substr(deflate_add($connection->context->deflator, $buffer), 0, -4); |
419 | } |
420 | |
421 | /** |
422 | * Websocket handshake. |
423 | * |
424 | * @param string $buffer |
425 | * @param TcpConnection $connection |
426 | * @return int |
427 | */ |
428 | public static function dealHandshake(string $buffer, TcpConnection $connection): int |
429 | { |
430 | // HTTP protocol. |
431 | if (str_starts_with($buffer, 'GET')) { |
432 | // Find \r\n\r\n. |
433 | $headerEndPos = strpos($buffer, "\r\n\r\n"); |
434 | if (!$headerEndPos) { |
435 | return 0; |
436 | } |
437 | $headerLength = $headerEndPos + 4; |
438 | |
439 | // Get Sec-WebSocket-Key. |
440 | if (preg_match("/Sec-WebSocket-Key: *(.*?)\r\n/i", $buffer, $match)) { |
441 | $SecWebSocketKey = $match[1]; |
442 | } else { |
443 | $connection->close( |
444 | "HTTP/1.0 400 Bad Request\r\nServer: workerman\r\n\r\n<div style=\"text-align:center\"><h1>WebSocket</h1><hr>workerman</div>", true); |
445 | return 0; |
446 | } |
447 | // Calculation websocket key. |
448 | $newKey = base64_encode(sha1($SecWebSocketKey . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true)); |
449 | // Handshake response data. |
450 | $handshakeMessage = "HTTP/1.1 101 Switching Protocol\r\n" |
451 | . "Upgrade: websocket\r\n" |
452 | . "Sec-WebSocket-Version: 13\r\n" |
453 | . "Connection: Upgrade\r\n" |
454 | . "Sec-WebSocket-Accept: " . $newKey . "\r\n"; |
455 | |
456 | // Websocket data buffer. |
457 | $connection->context->websocketDataBuffer = ''; |
458 | // Current websocket frame length. |
459 | $connection->context->websocketCurrentFrameLength = 0; |
460 | // Current websocket frame data. |
461 | $connection->context->websocketCurrentFrameBuffer = ''; |
462 | // Consume handshake data. |
463 | $connection->consumeRecvBuffer($headerLength); |
464 | // Request from buffer |
465 | $request = new Request($buffer); |
466 | |
467 | // Try to emit onWebSocketConnect callback. |
468 | $onWebsocketConnect = $connection->onWebSocketConnect ?? $connection->worker->onWebSocketConnect ?? false; |
469 | if ($onWebsocketConnect) { |
470 | try { |
471 | $onWebsocketConnect($connection, $request); |
472 | } catch (Throwable $e) { |
473 | Worker::stopAll(250, $e); |
474 | } |
475 | } |
476 | |
477 | // blob or arraybuffer |
478 | if (empty($connection->websocketType)) { |
479 | $connection->websocketType = static::BINARY_TYPE_BLOB; |
480 | } |
481 | |
482 | $hasServerHeader = false; |
483 | |
484 | if ($connection->headers) { |
485 | foreach ($connection->headers as $header) { |
486 | if (stripos($header, 'Server:') === 0) { |
487 | $hasServerHeader = true; |
488 | } |
489 | $handshakeMessage .= "$header\r\n"; |
490 | } |
491 | } |
492 | if (!$hasServerHeader) { |
493 | $handshakeMessage .= "Server: workerman\r\n"; |
494 | } |
495 | $handshakeMessage .= "\r\n"; |
496 | // Send handshake response. |
497 | $connection->send($handshakeMessage, true); |
498 | // Mark handshake complete. |
499 | $connection->context->websocketHandshake = true; |
500 | |
501 | // Try to emit onWebSocketConnected callback. |
502 | $onWebsocketConnected = $connection->onWebSocketConnected ?? $connection->worker->onWebSocketConnected ?? false; |
503 | if ($onWebsocketConnected) { |
504 | try { |
505 | $onWebsocketConnected($connection, $request); |
506 | } catch (Throwable $e) { |
507 | Worker::stopAll(250, $e); |
508 | } |
509 | } |
510 | |
511 | // There are data waiting to be sent. |
512 | if (!empty($connection->context->tmpWebsocketData)) { |
513 | $connection->send($connection->context->tmpWebsocketData, true); |
514 | $connection->context->tmpWebsocketData = ''; |
515 | } |
516 | if (strlen($buffer) > $headerLength) { |
517 | return static::input(substr($buffer, $headerLength), $connection); |
518 | } |
519 | return 0; |
520 | } |
521 | // Bad websocket handshake request. |
522 | $connection->close( |
523 | "HTTP/1.0 400 Bad Request\r\nServer: workerman\r\n\r\n<div style=\"text-align:center\"><h1>400 Bad Request</h1><hr>workerman</div>", true); |
524 | return 0; |
525 | } |
526 | } |