Spaces:
Paused
Paused
| ; | |
| // connection mixins | |
| // implementation of http://dev.mysql.com/doc/internals/en/compression.html | |
| const zlib = require('zlib'); | |
| const PacketParser = require('./packet_parser.js'); | |
/**
 * Handles one packet of the compressed protocol.
 *
 * Must be invoked with `this` bound to the connection. Reads the 3-byte
 * "length before compression" field and the remaining body from `packet`,
 * then queues a task on `connection.inflateQueue` that feeds the (inflated)
 * payload into `connection._inflatedPacketsParser`.
 *
 * A "length before compression" of 0 means the body is carried as-is and is
 * passed to the parser without calling zlib.
 *
 * @param {object} packet - exposes `readInt24()`, `readBuffer()` and `numPackets`.
 */
function handleCompressedPacket(packet) {
  // eslint-disable-next-line consistent-this, no-invalid-this
  const conn = this;
  const lengthBeforeCompression = packet.readInt24();
  const payload = packet.readBuffer();
  const needsInflate = lengthBeforeCompression !== 0;

  conn.inflateQueue.push(job => {
    // Shared tail of both paths: advance the sequence id, hand the plain
    // bytes to the inner parser, then release the queue slot.
    const deliver = bytes => {
      conn._bumpCompressedSequenceId(packet.numPackets);
      conn._inflatedPacketsParser.execute(bytes);
      job.done();
    };

    if (!needsInflate) {
      deliver(payload);
      return;
    }

    zlib.inflate(payload, (err, inflated) => {
      if (err) {
        // The job is intentionally left unfinished on failure; the error is
        // reported to the connection instead.
        conn._handleNetworkError(err);
        return;
      }
      deliver(inflated);
    });
  });
}
/**
 * Replacement for `connection.write` once compression is enabled.
 *
 * Must be invoked with `this` bound to the connection. Deflates `buffer`
 * through `connection.deflateQueue` and emits it via
 * `connection.writeUncompressed` as a 7-byte compression header followed by
 * the payload. Header layout as written here:
 *   bytes 0-2  payload length on the wire (little endian)
 *   byte  3    compressed sequence id captured when this call was made
 *   bytes 4-6  length before compression (little endian), 0 if sent as-is
 *
 * Buffers longer than MAX_COMPRESSED_LENGTH are split and each slice is
 * written by a recursive call.
 *
 * @param {Buffer} buffer - bytes to send.
 */
function writeCompressed(buffer) {
  // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
  // note: sending a MySQL Packet of the size 2^24-5 to 2^24-1 via compression
  // leads to at least one extra compressed packet.
  // (this is because "length of the packet before compression" needs to fit
  // into a 3 byte unsigned int. "length of the packet before compression"
  // includes the 4 byte packet header, hence 2^24-5)
  const MAX_COMPRESSED_LENGTH = 16777210;
  // eslint-disable-next-line no-invalid-this, consistent-this
  const conn = this;

  if (buffer.length > MAX_COMPRESSED_LENGTH) {
    let offset = 0;
    while (offset < buffer.length) {
      writeCompressed.call(
        conn,
        buffer.slice(offset, offset + MAX_COMPRESSED_LENGTH)
      );
      offset += MAX_COMPRESSED_LENGTH;
    }
    return;
  }

  const rawLength = buffer.length;
  // Captured now, before the bump below, so the asynchronous deflate callback
  // stamps the frame with the id that was current at call time.
  const seqId = conn.compressedSequenceId;

  // Writes the 7-byte header followed by `payload`.
  const sendFrame = (payload, wireLength, lengthBeforeCompression) => {
    const header = Buffer.allocUnsafe(7);
    header.writeUIntLE(wireLength, 0, 3);
    header.writeUInt8(seqId, 3);
    header.writeUIntLE(lengthBeforeCompression, 4, 3);
    conn.writeUncompressed(header);
    conn.writeUncompressed(payload);
  };

  // The queue is used because zlib's async execution is routed via a thread
  // pool internally; with several writes in flight the frames still have to
  // go out sequentially.
  conn.deflateQueue.push(job => {
    zlib.deflate(buffer, (err, deflated) => {
      if (err) {
        // The job is left unfinished on failure; the error goes to the connection.
        conn._handleFatalError(err);
        return;
      }
      if (deflated.length < rawLength) {
        sendFrame(deflated, deflated.length, rawLength);
      } else {
        // http://dev.mysql.com/doc/internals/en/uncompressed-payload.html
        // To send an uncompressed payload:
        //  - set length of payload before compression to 0
        //  - the compressed payload contains the uncompressed payload instead.
        sendFrame(buffer, rawLength, 0);
      }
      job.done();
    });
  });

  conn._bumpCompressedSequenceId(1);
}
| function enableCompression(connection) { | |
| connection._lastWrittenPacketId = 0; | |
| connection._lastReceivedPacketId = 0; | |
| connection._handleCompressedPacket = handleCompressedPacket; | |
| connection._inflatedPacketsParser = new PacketParser(p => { | |
| connection.handlePacket(p); | |
| }, 4); | |
| connection._inflatedPacketsParser._lastPacket = 0; | |
| connection.packetParser = new PacketParser(packet => { | |
| connection._handleCompressedPacket(packet); | |
| }, 7); | |
| connection.writeUncompressed = connection.write; | |
| connection.write = writeCompressed; | |
| const seqqueue = require('seq-queue'); | |
| connection.inflateQueue = seqqueue.createQueue(); | |
| connection.deflateQueue = seqqueue.createQueue(); | |
| } | |
// Public surface of this module: only the function that enables compression.
module.exports = { enableCompression };