Skip to content

Commit

Permalink
Support protocol packages larger than 16 MiB #47
Browse files Browse the repository at this point in the history
  • Loading branch information
Dejan Markic committed Apr 18, 2024
1 parent 3cf70c6 commit 7c479e2
Showing 1 changed file with 73 additions and 15 deletions.
88 changes: 73 additions & 15 deletions src/Io/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class Parser
* @var Executor
*/
protected $executor;
/**
* Current packet for split packet paring
*/
protected $packet = null;

public function __construct(DuplexStreamInterface $stream, Executor $executor)
{
Expand Down Expand Up @@ -165,22 +169,48 @@ public function handleData($data)
return;
}

$packet = $this->buffer->readBuffer($this->pctSize);
if ($this->packet !== null) {
/**
* We are in packet splitting
* Append data
*/
$packet = null;
$this->packet->append($this->buffer->read($this->pctSize));
if ($this->pctSize < 0xffffff) {
/**
* We're done
*/
$packet = $this->packet;
$this->packet = null;
}
} else {
$packet = $this->buffer->readBuffer($this->pctSize);
}
/**
* Remember last packet size as split packets may have ended with 0 length packet.
*/
$lastPctSize = $this->pctSize;
$this->state = self::STATE_STANDBY;
$this->pctSize = self::PACKET_SIZE_HEADER;
if ($this->packet === null && $packet->length() === 0xffffff && $lastPctSize > 0) {
/**
* Start reading split packets
*/
$this->packet = $packet;
} elseif ($packet !== null) {
try {
$this->parsePacket($packet);
} catch (\UnderflowException $e) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet: ' . $e->getMessage(), 0, $e));
$this->stream->close();
return;
}

try {
$this->parsePacket($packet);
} catch (\UnderflowException $e) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet: ' . $e->getMessage(), 0, $e));
$this->stream->close();
return;
}

if ($packet->length() !== 0) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet with ' . $packet->length() . ' unknown byte(s)'));
$this->stream->close();
return;
if ($packet->length() !== 0) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet with ' . $packet->length() . ' unknown byte(s)'));
$this->stream->close();
return;
}
}
}
}
Expand Down Expand Up @@ -262,7 +292,7 @@ private function parsePacket(Buffer $packet)
$this->debug(sprintf("AffectedRows: %d, InsertId: %d, WarningCount:%d", $this->affectedRows, $this->insertId, $this->warningCount));
$this->onSuccess();
$this->nextRequest();
} elseif ($fieldCount === 0xFE) {
} elseif ($fieldCount === 0xFE && $packet->length() < 0xfffffe) {
// EOF Packet
$packet->skip(4); // warn, status
if ($this->rsState === self::RS_STATE_ROW) {
Expand Down Expand Up @@ -388,7 +418,35 @@ public function onClose()

public function sendPacket($packet)
{
return $this->stream->write($this->buffer->buildInt3(\strlen($packet)) . $this->buffer->buildInt1($this->seq++) . $packet);
/**
* If packet is longer than 0xffffff (16M), we should split and send many packets
*
*/
$packet_len = \strlen($packet);
$this->debug('sendPacket: len: ' . $packet_len);

if ($packet_len >= 0xffffff) {
$this->debug('Packet split: packet_len: ' . $packet_len);
$ret = null;
while ($packet_len > 0) {
$part = \substr($packet, 0, 0xffffff);
$part_len = \strlen($part);
$ret = $this->stream->write($this->buffer->buildInt3($part_len) . $this->buffer->buildInt1($this->seq++) . $part);
$packet = \substr($packet, $part_len);
$packet_len = \strlen($packet);
// If last part was exactly 0xffffff in size, we need to send an empty packet to signal end
// of packet splitting.
if (\strlen($packet) === 0 && $part_len === 0xffffff) {
$ret = $this->stream->write($this->buffer->buildInt3(0) . $this->buffer->buildInt1($this->seq++));
}
}
$this->debug('Packet sent');
return $ret;
}
/**
* Packet is below 16M
*/
return $this->stream->write($this->buffer->buildInt3($packet_len) . $this->buffer->buildInt1($this->seq++) . $packet);
}

protected function nextRequest($isHandshake = false)
Expand Down

0 comments on commit 7c479e2

Please sign in to comment.