Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
27.62% covered (danger)
27.62%
87 / 315
3.45% covered (danger)
3.45%
1 / 29
CRAP
0.00% covered (danger)
0.00%
0 / 1
TcpConnection
27.62% covered (danger)
27.62%
87 / 315
3.45% covered (danger)
3.45%
1 / 29
7572.40
0.00% covered (danger)
0.00%
0 / 1
 __construct
92.86% covered (success)
92.86%
13 / 14
0.00% covered (danger)
0.00%
0 / 1
2.00
 getStatus
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 send
26.00% covered (danger)
26.00%
13 / 50
0.00% covered (danger)
0.00%
0 / 1
218.13
 getRemoteIp
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 getRemotePort
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getRemoteAddress
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getLocalIp
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 getLocalPort
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 getLocalAddress
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getSendBufferQueueSize
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getRecvBufferQueueSize
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 pauseRecv
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 resumeRecv
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 baseRead
37.97% covered (danger)
37.97%
30 / 79
0.00% covered (danger)
0.00%
0 / 1
401.94
 baseWrite
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
90
 doSslHandshake
0.00% covered (danger)
0.00%
0 / 20
0.00% covered (danger)
0.00%
0 / 1
42
 pipe
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
2
 consumeRecvBuffer
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 close
63.64% covered (warning)
63.64%
7 / 11
0.00% covered (danger)
0.00%
0 / 1
7.73
 isIpV4
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 isIpV6
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getSocket
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 checkBufferWillFull
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
20
 bufferIsFull
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 bufferIsEmpty
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 destroy
70.37% covered (warning)
70.37%
19 / 27
0.00% covered (danger)
0.00%
0 / 1
17.40
 jsonSerialize
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
2
 __wakeup
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 __destruct
36.36% covered (danger)
36.36%
4 / 11
0.00% covered (danger)
0.00%
0 / 1
15.28
1<?php
2/**
3 * This file is part of workerman.
4 *
5 * Licensed under The MIT License
6 * For full copyright and license information, please see the MIT-LICENSE.txt
7 * Redistributions of files must retain the above copyright notice.
8 *
9 * @author    walkor<walkor@workerman.net>
10 * @copyright walkor<walkor@workerman.net>
11 * @link      http://www.workerman.net/
12 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
13 */
14declare(strict_types=1);
15
16namespace Workerman\Connection;
17
18use JsonSerializable;
19use RuntimeException;
20use stdClass;
21use Throwable;
22use Workerman\Events\Ev;
23use Workerman\Events\Event;
24use Workerman\Events\EventInterface;
25use Workerman\Events\Select;
26use Workerman\Protocols\Http;
27use Workerman\Protocols\Http\Request;
28use Workerman\Protocols\ProtocolInterface;
29use Workerman\Worker;
30
31use function ceil;
32use function count;
33use function fclose;
34use function feof;
35use function fread;
36use function function_exists;
37use function fwrite;
38use function is_object;
39use function is_resource;
40use function key;
41use function method_exists;
42use function posix_getpid;
43use function restore_error_handler;
44use function set_error_handler;
45use function stream_set_blocking;
46use function stream_set_read_buffer;
47use function stream_socket_enable_crypto;
48use function stream_socket_get_name;
49use function strlen;
50use function strrchr;
51use function strrpos;
52use function substr;
53use function var_export;
54
55use const PHP_INT_MAX;
56use const STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
57use const STREAM_CRYPTO_METHOD_SSLv23_SERVER;
58use const STREAM_CRYPTO_METHOD_SSLv2_CLIENT;
59use const STREAM_CRYPTO_METHOD_SSLv2_SERVER;
60
61/**
62 * TcpConnection.
63 * @property string $websocketType
64 * @property string|null $websocketClientProtocol
65 * @property string|null $websocketOrigin
66 */
67class TcpConnection extends ConnectionInterface implements JsonSerializable
68{
69    /**
70     * Read buffer size.
71     *
72     * @var int
73     */
74    public const READ_BUFFER_SIZE = 87380;
75
76    /**
77     * Status initial.
78     *
79     * @var int
80     */
81    public const STATUS_INITIAL = 0;
82
83    /**
84     * Status connecting.
85     *
86     * @var int
87     */
88    public const STATUS_CONNECTING = 1;
89
90    /**
91     * Status connection established.
92     *
93     * @var int
94     */
95    public const STATUS_ESTABLISHED = 2;
96
97    /**
98     * Status closing.
99     *
100     * @var int
101     */
102    public const STATUS_CLOSING = 4;
103
104    /**
105     * Status closed.
106     *
107     * @var int
108     */
109    public const STATUS_CLOSED = 8;
110
111    /**
112     * Maximum string length for cache
113     *
114     * @var int
115     */
116    public const MAX_CACHE_STRING_LENGTH = 2048;
117
118    /**
119     * Maximum cache size.
120     *
121     * @var int
122     */
123    public const MAX_CACHE_SIZE = 512;
124
125    /**
126     * Tcp keepalive interval.
127     */
128    public const TCP_KEEPALIVE_INTERVAL = 55;
129
130    /**
131     * Emitted when socket connection is successfully established.
132     *
133     * @var ?callable
134     */
135    public $onConnect = null;
136
137    /**
138     * Emitted before websocket handshake (Only called when protocol is ws).
139     *
140     * @var ?callable
141     */
142    public $onWebSocketConnect = null;
143
144    /**
145     * Emitted after websocket handshake (Only called when protocol is ws).
146     *
147     * @var ?callable
148     */
149    public $onWebSocketConnected = null;
150
151    /**
152     * Emitted when websocket connection is closed (Only called when protocol is ws).
153     *
154     * @var ?callable
155     */
156    public $onWebSocketClose = null;
157
158    /**
159     * Emitted when data is received.
160     *
161     * @var ?callable
162     */
163    public $onMessage = null;
164
165    /**
166     * Emitted when the other end of the socket sends a FIN packet.
167     *
168     * @var ?callable
169     */
170    public $onClose = null;
171
172    /**
173     * Emitted when an error occurs with connection.
174     *
175     * @var ?callable
176     */
177    public $onError = null;
178
179    /**
180     * Emitted when the send buffer becomes full.
181     *
182     * @var ?callable
183     */
184    public $onBufferFull = null;
185
186    /**
187     * Emitted when send buffer becomes empty.
188     *
189     * @var ?callable
190     */
191    public $onBufferDrain = null;
192
193    /**
194     * Transport (tcp/udp/unix/ssl).
195     *
196     * @var string
197     */
198    public string $transport = 'tcp';
199
200    /**
201     * Which worker belong to.
202     *
203     * @var ?Worker
204     */
205    public ?Worker $worker = null;
206
207    /**
208     * Bytes read.
209     *
210     * @var int
211     */
212    public int $bytesRead = 0;
213
214    /**
215     * Bytes written.
216     *
217     * @var int
218     */
219    public int $bytesWritten = 0;
220
221    /**
222     * Connection->id.
223     *
224     * @var int
225     */
226    public int $id = 0;
227
228    /**
229     * A copy of $connection->id which is used to clean up the connection in worker->connections
230     *
231     * @var int
232     */
233    protected int $realId = 0;
234
235    /**
236     * Sets the maximum send buffer size for the current connection.
237     * The onBufferFull callback will be emitted when the send buffer is full.
238     *
239     * @var int
240     */
241    public int $maxSendBufferSize = 1048576;
242
243    /**
244     * Context.
245     *
246     * @var ?stdClass
247     */
248    public ?stdClass $context = null;
249
250    /**
251     * @var array
252     */
253    public array $headers = [];
254
255    /**
256     * @var ?Request
257     */
258    public ?Request $request = null;
259
260    /**
261     * Is safe.
262     *
263     * @var bool
264     */
265    protected bool $isSafe = true;
266
267    /**
268     * Default send buffer size.
269     *
270     * @var int
271     */
272    public static int $defaultMaxSendBufferSize = 1048576;
273
274    /**
275     * Sets the maximum acceptable packet size for the current connection.
276     *
277     * @var int
278     */
279    public int $maxPackageSize = 1048576;
280
281    /**
282     * Default maximum acceptable packet size.
283     *
284     * @var int
285     */
286    public static int $defaultMaxPackageSize = 10485760;
287
288    /**
289     * Id recorder.
290     *
291     * @var int
292     */
293    protected static int $idRecorder = 1;
294
295    /**
296     * Socket
297     *
298     * @var resource
299     */
300    protected $socket = null;
301
302    /**
303     * Send buffer.
304     *
305     * @var string
306     */
307    protected string $sendBuffer = '';
308
309    /**
310     * Receive buffer.
311     *
312     * @var string
313     */
314    protected string $recvBuffer = '';
315
316    /**
317     * Current package length.
318     *
319     * @var int
320     */
321    protected int $currentPackageLength = 0;
322
323    /**
324     * Connection status.
325     *
326     * @var int
327     */
328    protected int $status = self::STATUS_ESTABLISHED;
329
330    /**
331     * Remote address.
332     *
333     * @var string
334     */
335    protected string $remoteAddress = '';
336
337    /**
338     * Is paused.
339     *
340     * @var bool
341     */
342    protected bool $isPaused = false;
343
344    /**
345     * SSL handshake completed or not.
346     *
347     * @var bool|int
348     */
349    protected bool|int $sslHandshakeCompleted = false;
350
351    /**
352     * All connection instances.
353     *
354     * @var array
355     */
356    public static array $connections = [];
357
358    /**
359     * Status to string.
360     *
361     * @var array
362     */
363    public const STATUS_TO_STRING = [
364        self::STATUS_INITIAL => 'INITIAL',
365        self::STATUS_CONNECTING => 'CONNECTING',
366        self::STATUS_ESTABLISHED => 'ESTABLISHED',
367        self::STATUS_CLOSING => 'CLOSING',
368        self::STATUS_CLOSED => 'CLOSED',
369    ];
370
371    /**
372     * Construct.
373     *
374     * @param EventInterface $eventLoop
375     * @param resource $socket
376     * @param string $remoteAddress
377     */
378    public function __construct(EventInterface $eventLoop, $socket, string $remoteAddress = '')
379    {
380        ++self::$statistics['connection_count'];
381        $this->id = $this->realId = self::$idRecorder++;
382        if (self::$idRecorder === PHP_INT_MAX) {
383            self::$idRecorder = 0;
384        }
385        $this->socket = $socket;
386        stream_set_blocking($this->socket, false);
387        stream_set_read_buffer($this->socket, 0);
388        $this->eventLoop = $eventLoop;
389        $this->eventLoop->onReadable($this->socket, $this->baseRead(...));
390        $this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
391        $this->maxPackageSize = self::$defaultMaxPackageSize;
392        $this->remoteAddress = $remoteAddress;
393        static::$connections[$this->id] = $this;
394        $this->context = new stdClass();
395    }
396
397    /**
398     * Get status.
399     *
400     * @param bool $rawOutput
401     *
402     * @return int|string
403     */
404    public function getStatus(bool $rawOutput = true): int|string
405    {
406        if ($rawOutput) {
407            return $this->status;
408        }
409        return self::STATUS_TO_STRING[$this->status];
410    }
411
412    /**
413     * Sends data on the connection.
414     *
415     * @param mixed $sendBuffer
416     * @param bool $raw
417     * @return bool|null
418     */
419    public function send(mixed $sendBuffer, bool $raw = false): bool|null
420    {
421        if ($this->status === self::STATUS_CLOSING || $this->status === self::STATUS_CLOSED) {
422            return false;
423        }
424
425        // Try to call protocol::encode($sendBuffer) before sending.
426        if (false === $raw && $this->protocol !== null) {
427            try {
428                $sendBuffer = $this->protocol::encode($sendBuffer, $this);
429            } catch(Throwable $e) {
430                $this->error($e);
431            }
432            if ($sendBuffer === '') {
433                return null;
434            }
435        }
436
437        if ($this->status !== self::STATUS_ESTABLISHED ||
438            ($this->transport === 'ssl' && $this->sslHandshakeCompleted !== true)
439        ) {
440            if ($this->sendBuffer && $this->bufferIsFull()) {
441                ++self::$statistics['send_fail'];
442                return false;
443            }
444            $this->sendBuffer .= $sendBuffer;
445            $this->checkBufferWillFull();
446            return null;
447        }
448
449        // Attempt to send data directly.
450        if ($this->sendBuffer === '') {
451            if ($this->transport === 'ssl') {
452                $this->eventLoop->onWritable($this->socket, $this->baseWrite(...));
453                $this->sendBuffer = $sendBuffer;
454                $this->checkBufferWillFull();
455                return null;
456            }
457            $len = 0;
458            try {
459                $len = @fwrite($this->socket, $sendBuffer);
460            } catch (Throwable $e) {
461                Worker::log($e);
462            }
463            // send successful.
464            if ($len === strlen($sendBuffer)) {
465                $this->bytesWritten += $len;
466                return true;
467            }
468            // Send only part of the data.
469            if ($len > 0) {
470                $this->sendBuffer = substr($sendBuffer, $len);
471                $this->bytesWritten += $len;
472            } else {
473                // Connection closed?
474                if (!is_resource($this->socket) || feof($this->socket)) {
475                    ++self::$statistics['send_fail'];
476                    if ($this->onError) {
477                        try {
478                            ($this->onError)($this, static::SEND_FAIL, 'client closed');
479                        } catch (Throwable $e) {
480                            $this->error($e);
481                        }
482                    }
483                    $this->destroy();
484                    return false;
485                }
486                $this->sendBuffer = $sendBuffer;
487            }
488            $this->eventLoop->onWritable($this->socket, $this->baseWrite(...));
489            // Check if send buffer will be full.
490            $this->checkBufferWillFull();
491            return null;
492        }
493
494        if ($this->bufferIsFull()) {
495            ++self::$statistics['send_fail'];
496            return false;
497        }
498
499        $this->sendBuffer .= $sendBuffer;
500        // Check if send buffer is full.
501        $this->checkBufferWillFull();
502        return null;
503    }
504
505    /**
506     * Get remote IP.
507     *
508     * @return string
509     */
510    public function getRemoteIp(): string
511    {
512        $pos = strrpos($this->remoteAddress, ':');
513        if ($pos) {
514            return substr($this->remoteAddress, 0, $pos);
515        }
516        return '';
517    }
518
519    /**
520     * Get remote port.
521     *
522     * @return int
523     */
524    public function getRemotePort(): int
525    {
526        if ($this->remoteAddress) {
527            return (int)substr(strrchr($this->remoteAddress, ':'), 1);
528        }
529        return 0;
530    }
531
532    /**
533     * Get remote address.
534     *
535     * @return string
536     */
537    public function getRemoteAddress(): string
538    {
539        return $this->remoteAddress;
540    }
541
542    /**
543     * Get local IP.
544     *
545     * @return string
546     */
547    public function getLocalIp(): string
548    {
549        $address = $this->getLocalAddress();
550        $pos = strrpos($address, ':');
551        if (!$pos) {
552            return '';
553        }
554        return substr($address, 0, $pos);
555    }
556
557    /**
558     * Get local port.
559     *
560     * @return int
561     */
562    public function getLocalPort(): int
563    {
564        $address = $this->getLocalAddress();
565        $pos = strrpos($address, ':');
566        if (!$pos) {
567            return 0;
568        }
569        return (int)substr(strrchr($address, ':'), 1);
570    }
571
572    /**
573     * Get local address.
574     *
575     * @return string
576     */
577    public function getLocalAddress(): string
578    {
579        if (!is_resource($this->socket)) {
580            return '';
581        }
582        return (string)@stream_socket_get_name($this->socket, false);
583    }
584
585    /**
586     * Get send buffer queue size.
587     *
588     * @return int
589     */
590    public function getSendBufferQueueSize(): int
591    {
592        return strlen($this->sendBuffer);
593    }
594
595    /**
596     * Get receive buffer queue size.
597     *
598     * @return int
599     */
600    public function getRecvBufferQueueSize(): int
601    {
602        return strlen($this->recvBuffer);
603    }
604
605    /**
606     * Pauses the reading of data. That is onMessage will not be emitted. Useful to throttle back an upload.
607     *
608     * @return void
609     */
610    public function pauseRecv(): void
611    {
612        if($this->eventLoop !== null){
613            $this->eventLoop->offReadable($this->socket);
614        }
615        $this->isPaused = true;
616    }
617
618    /**
619     * Resumes reading after a call to pauseRecv.
620     *
621     * @return void
622     */
623    public function resumeRecv(): void
624    {
625        if ($this->isPaused === true) {
626            $this->eventLoop->onReadable($this->socket, $this->baseRead(...));
627            $this->isPaused = false;
628            $this->baseRead($this->socket, false);
629        }
630    }
631
632
633    /**
634     * Base read handler.
635     *
636     * @param resource $socket
637     * @param bool $checkEof
638     * @return void
639     */
640    public function baseRead($socket, bool $checkEof = true): void
641    {
642        static $requests = [];
643        // SSL handshake.
644        if ($this->transport === 'ssl' && $this->sslHandshakeCompleted !== true) {
645            if ($this->doSslHandshake($socket)) {
646                $this->sslHandshakeCompleted = true;
647                if ($this->sendBuffer) {
648                    $this->eventLoop->onWritable($socket, $this->baseWrite(...));
649                }
650            } else {
651                return;
652            }
653        }
654
655        $buffer = '';
656        try {
657            $buffer = @fread($socket, self::READ_BUFFER_SIZE);
658        } catch (Throwable) {
659            // do nothing
660        }
661
662        // Check connection closed.
663        if ($buffer === '' || $buffer === false) {
664            if ($checkEof && (feof($socket) || !is_resource($socket) || $buffer === false)) {
665                $this->destroy();
666                return;
667            }
668        } else {
669            $this->bytesRead += strlen($buffer);
670            if ($this->recvBuffer === '') {
671                if (!isset($buffer[static::MAX_CACHE_STRING_LENGTH]) && isset($requests[$buffer])) {
672                    ++self::$statistics['total_request'];
673                    if ($this->protocol === Http::class) {
674                        $request = clone $requests[$buffer];
675                        $request->destroy();
676                        $request->connection = $this;
677                        $this->request = $request;
678                        try {
679                            ($this->onMessage)($this, $request);
680                        } catch (Throwable $e) {
681                            $this->error($e);
682                        }
683                        return;
684                    }
685                    $request = $requests[$buffer];
686                    try {
687                        ($this->onMessage)($this, $request);
688                    } catch (Throwable $e) {
689                        $this->error($e);
690                    }
691                    return;
692                }
693                $this->recvBuffer = $buffer;
694            } else {
695                $this->recvBuffer .= $buffer;
696            }
697        }
698
699        // If the application layer protocol has been set up.
700        if ($this->protocol !== null) {
701            while ($this->recvBuffer !== '' && !$this->isPaused) {
702                // The current packet length is known.
703                if ($this->currentPackageLength) {
704                    // Data is not enough for a package.
705                    if ($this->currentPackageLength > strlen($this->recvBuffer)) {
706                        break;
707                    }
708                } else {
709                    // Get current package length.
710                    try {
711                        $this->currentPackageLength = $this->protocol::input($this->recvBuffer, $this);
712                    } catch (Throwable $e) {
713                        $this->currentPackageLength = -1;
714                        Worker::safeEcho((string)$e);
715                    }
716                    // The packet length is unknown.
717                    if ($this->currentPackageLength === 0) {
718                        break;
719                    } elseif ($this->currentPackageLength > 0 && $this->currentPackageLength <= $this->maxPackageSize) {
720                        // Data is not enough for a package.
721                        if ($this->currentPackageLength > strlen($this->recvBuffer)) {
722                            break;
723                        }
724                    } // Wrong package.
725                    else {
726                        Worker::safeEcho((string)(new RuntimeException("Protocol $this->protocol Error package. package_length=" . var_export($this->currentPackageLength, true))));
727                        $this->destroy();
728                        return;
729                    }
730                }
731
732                // The data is enough for a packet.
733                ++self::$statistics['total_request'];
734                // The current packet length is equal to the length of the buffer.
735                if ($one = (strlen($this->recvBuffer) === $this->currentPackageLength)) {
736                    $oneRequestBuffer = $this->recvBuffer;
737                    $this->recvBuffer = '';
738                } else {
739                    // Get a full package from the buffer.
740                    $oneRequestBuffer = substr($this->recvBuffer, 0, $this->currentPackageLength);
741                    // Remove the current package from receive buffer.
742                    $this->recvBuffer = substr($this->recvBuffer, $this->currentPackageLength);
743                }
744                // Reset the current packet length to 0.
745                $this->currentPackageLength = 0;
746                try {
747                    // Decode request buffer before Emitting onMessage callback.
748                    $request = $this->protocol::decode($oneRequestBuffer, $this);
749                    if ((!is_object($request) || $request instanceof Request) && $one && !isset($oneRequestBuffer[static::MAX_CACHE_STRING_LENGTH])) {
750                        ($this->onMessage)($this, $request);
751                        if ($request instanceof Request) {
752                            $requests[$oneRequestBuffer] = clone $request;
753                            $requests[$oneRequestBuffer]->destroy();
754                        } else {
755                            $requests[$oneRequestBuffer] = $request;
756                        }
757                        if (count($requests) > static::MAX_CACHE_SIZE) {
758                            unset($requests[key($requests)]);
759                        }
760                        return;
761                    }
762                    ($this->onMessage)($this, $request);
763                } catch (Throwable $e) {
764                    $this->error($e);
765                }
766            }
767            return;
768        }
769
770        if ($this->recvBuffer === '' || $this->isPaused) {
771            return;
772        }
773
774        // Applications protocol is not set.
775        ++self::$statistics['total_request'];
776        try {
777            ($this->onMessage)($this, $this->recvBuffer);
778        } catch (Throwable $e) {
779            $this->error($e);
780        }
781        // Clean receive buffer.
782        $this->recvBuffer = '';
783    }
784
785    /**
786     * Base write handler.
787     *
788     * @return void
789     */
790    public function baseWrite(): void
791    {
792        $len = 0;
793        try {
794            if ($this->transport === 'ssl') {
795                $len = @fwrite($this->socket, $this->sendBuffer, 8192);
796            } else {
797                $len = @fwrite($this->socket, $this->sendBuffer);
798            }
799        } catch (Throwable) {
800        }
801        if ($len === strlen($this->sendBuffer)) {
802            $this->bytesWritten += $len;
803            $this->eventLoop->offWritable($this->socket);
804            $this->sendBuffer = '';
805            // Try to emit onBufferDrain callback when send buffer becomes empty.
806            if ($this->onBufferDrain) {
807                try {
808                    ($this->onBufferDrain)($this);
809                } catch (Throwable $e) {
810                    $this->error($e);
811                }
812            }
813            if ($this->status === self::STATUS_CLOSING) {
814                if (!empty($this->context->streamSending)) {
815                    return;
816                }
817                $this->destroy();
818            }
819            return;
820        }
821        if ($len > 0) {
822            $this->bytesWritten += $len;
823            $this->sendBuffer = substr($this->sendBuffer, $len);
824        } else {
825            ++self::$statistics['send_fail'];
826            $this->destroy();
827        }
828    }
829
830    /**
831     * SSL handshake.
832     *
833     * @param resource $socket
834     * @return bool|int
835     */
836    public function doSslHandshake($socket): bool|int
837    {
838        if (feof($socket)) {
839            $this->destroy();
840            return false;
841        }
842        $async = $this instanceof AsyncTcpConnection;
843
844        /**
845         *  We disabled ssl3 because https://blog.qualys.com/ssllabs/2014/10/15/ssl-3-is-dead-killed-by-the-poodle-attack.
846         *  You can enable ssl3 by the codes below.
847         */
848        /*if($async){
849            $type = STREAM_CRYPTO_METHOD_SSLv2_CLIENT | STREAM_CRYPTO_METHOD_SSLv23_CLIENT | STREAM_CRYPTO_METHOD_SSLv3_CLIENT;
850        }else{
851            $type = STREAM_CRYPTO_METHOD_SSLv2_SERVER | STREAM_CRYPTO_METHOD_SSLv23_SERVER | STREAM_CRYPTO_METHOD_SSLv3_SERVER;
852        }*/
853
854        if ($async) {
855            $type = STREAM_CRYPTO_METHOD_SSLv2_CLIENT | STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
856        } else {
857            $type = STREAM_CRYPTO_METHOD_SSLv2_SERVER | STREAM_CRYPTO_METHOD_SSLv23_SERVER;
858        }
859
860        // Hidden error.
861        set_error_handler(static function (int $code, string $msg): bool {
862            if (!Worker::$daemonize) {
863                Worker::safeEcho(sprintf("SSL handshake error: %s\n", $msg));
864            }
865            return true;
866        });
867        $ret = stream_socket_enable_crypto($socket, true, $type);
868        restore_error_handler();
869        // Negotiation has failed.
870        if (false === $ret) {
871            $this->destroy();
872            return false;
873        }
874        if (0 === $ret) {
875            // There isn't enough data and should try again.
876            return 0;
877        }
878        return true;
879    }
880
881    /**
882     * This method pulls all the data out of a readable stream, and writes it to the supplied destination.
883     *
884     * @param self $dest
885     * @return void
886     */
887    public function pipe(self $dest): void
888    {
889        $source = $this;
890        $this->onMessage = function ($source, $data) use ($dest) {
891            $dest->send($data);
892        };
893        $this->onClose = function () use ($dest) {
894            $dest->close();
895        };
896        $dest->onBufferFull = function () use ($source) {
897            $source->pauseRecv();
898        };
899        $dest->onBufferDrain = function () use ($source) {
900            $source->resumeRecv();
901        };
902    }
903
904    /**
905     * Remove $length of data from receive buffer.
906     *
907     * @param int $length
908     * @return void
909     */
910    public function consumeRecvBuffer(int $length): void
911    {
912        $this->recvBuffer = substr($this->recvBuffer, $length);
913    }
914
915    /**
916     * Close connection.
917     *
918     * @param mixed $data
919     * @param bool $raw
920     * @return void
921     */
922    public function close(mixed $data = null, bool $raw = false): void
923    {
924        if ($this->status === self::STATUS_CONNECTING) {
925            $this->destroy();
926            return;
927        }
928
929        if ($this->status === self::STATUS_CLOSING || $this->status === self::STATUS_CLOSED) {
930            return;
931        }
932
933        if ($data !== null) {
934            $this->send($data, $raw);
935        }
936
937        $this->status = self::STATUS_CLOSING;
938
939        if ($this->sendBuffer === '') {
940            $this->destroy();
941        } else {
942            $this->pauseRecv();
943        }
944    }
945
946    /**
947     * Is ipv4.
948     *
949     * @return bool
950     */
951    public function isIpV4(): bool
952    {
953        if ($this->transport === 'unix') {
954            return false;
955        }
956        return !str_contains($this->getRemoteIp(), ':');
957    }
958
959    /**
960     * Is ipv6.
961     *
962     * @return bool
963     */
964    public function isIpV6(): bool
965    {
966        if ($this->transport === 'unix') {
967            return false;
968        }
969        return str_contains($this->getRemoteIp(), ':');
970    }
971
972    /**
973     * Get the real socket.
974     *
975     * @return resource
976     */
977    public function getSocket()
978    {
979        return $this->socket;
980    }
981
982    /**
983     * Check whether send buffer will be full.
984     *
985     * @return void
986     */
987    protected function checkBufferWillFull(): void
988    {
989        if ($this->onBufferFull && $this->maxSendBufferSize <= strlen($this->sendBuffer)) {
990            try {
991                ($this->onBufferFull)($this);
992            } catch (Throwable $e) {
993                $this->error($e);
994            }
995        }
996    }
997
998    /**
999     * Whether send buffer is full.
1000     *
1001     * @return bool
1002     */
1003    protected function bufferIsFull(): bool
1004    {
1005        // Buffer has been marked as full but still has data to send then the packet is discarded.
1006        if ($this->maxSendBufferSize <= strlen($this->sendBuffer)) {
1007            if ($this->onError) {
1008                try {
1009                    ($this->onError)($this, static::SEND_FAIL, 'send buffer full and drop package');
1010                } catch (Throwable $e) {
1011                    $this->error($e);
1012                }
1013            }
1014            return true;
1015        }
1016        return false;
1017    }
1018
1019    /**
1020     * Whether send buffer is Empty.
1021     *
1022     * @return bool
1023     */
1024    public function bufferIsEmpty(): bool
1025    {
1026        return empty($this->sendBuffer);
1027    }
1028
1029    /**
1030     * Destroy connection.
1031     *
1032     * @return void
1033     */
1034    public function destroy(): void
1035    {
1036        // Avoid repeated calls.
1037        if ($this->status === self::STATUS_CLOSED) {
1038            return;
1039        }
1040        // Remove event listener.
1041        if($this->eventLoop !== null){
1042            $this->eventLoop->offReadable($this->socket);
1043            $this->eventLoop->offWritable($this->socket);
1044            if (DIRECTORY_SEPARATOR === '\\' && method_exists($this->eventLoop, 'offExcept')) {
1045                $this->eventLoop->offExcept($this->socket);
1046            }
1047        }
1048
1049        // Close socket.
1050        try {
1051            @fclose($this->socket);
1052        } catch (Throwable) {
1053        }
1054
1055        $this->status = self::STATUS_CLOSED;
1056        // Try to emit onClose callback.
1057        if ($this->onClose) {
1058            try {
1059                ($this->onClose)($this);
1060            } catch (Throwable $e) {
1061                $this->error($e);
1062            }
1063        }
1064        // Try to emit protocol::onClose
1065        if ($this->protocol && method_exists($this->protocol, 'onClose')) {
1066            try {
1067                $this->protocol::onClose($this);
1068            } catch (Throwable $e) {
1069                $this->error($e);
1070            }
1071        }
1072        $this->sendBuffer = $this->recvBuffer = '';
1073        $this->currentPackageLength = 0;
1074        $this->isPaused = $this->sslHandshakeCompleted = false;
1075        if ($this->status === self::STATUS_CLOSED) {
1076            // Cleaning up the callback to avoid memory leaks.
1077            $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = $this->eventLoop = $this->errorHandler = null;
1078            // Remove from worker->connections.
1079            if ($this->worker) {
1080                unset($this->worker->connections[$this->realId]);
1081            }
1082            $this->worker = null;
1083            unset(static::$connections[$this->realId]);
1084        }
1085    }
1086
1087    /**
1088     * Get the json_encode information.
1089     *
1090     * @return array
1091     */
1092    public function jsonSerialize(): array
1093    {
1094        return [
1095            'id' => $this->id,
1096            'status' => $this->getStatus(),
1097            'transport' => $this->transport,
1098            'getRemoteIp' => $this->getRemoteIp(),
1099            'remotePort' => $this->getRemotePort(),
1100            'getRemoteAddress' => $this->getRemoteAddress(),
1101            'getLocalIp' => $this->getLocalIp(),
1102            'getLocalPort' => $this->getLocalPort(),
1103            'getLocalAddress' => $this->getLocalAddress(),
1104            'isIpV4' => $this->isIpV4(),
1105            'isIpV6' => $this->isIpV6(),
1106        ];
1107    }
1108
1109    /**
1110     * __wakeup.
1111     *
1112     * @return void
1113     */
1114    public function __wakeup()
1115    {
1116        $this->isSafe = false;
1117    }
1118
1119    /**
1120     * Destruct.
1121     *
1122     * @return void
1123     */
1124    public function __destruct()
1125    {
1126        static $mod;
1127        if (!$this->isSafe) {
1128            return;
1129        }
1130        self::$statistics['connection_count']--;
1131        if (Worker::getGracefulStop()) {
1132            $mod ??= ceil((self::$statistics['connection_count'] + 1) / 3);
1133
1134            if (0 === self::$statistics['connection_count'] % $mod) {
1135                $pid = function_exists('posix_getpid') ? posix_getpid() : 0;
1136                Worker::log('worker[' . $pid . '] remains ' . self::$statistics['connection_count'] . ' connection(s)');
1137            }
1138
1139            if (0 === self::$statistics['connection_count']) {
1140                Worker::stopAll();
1141            }
1142        }
1143    }
1144}