compressed_protocol.js 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. 'use strict';
  2. // connection mixins
  3. // implementation of http://dev.mysql.com/doc/internals/en/compression.html
  4. const zlib = require('zlib');
  5. const PacketParser = require('./packet_parser.js');
  /**
   * Minimal sequential task runner.
   *
   * Tasks are functions that receive a handle `{ done }`. They are started one
   * at a time, in the order they were pushed; the next task is started (on a
   * later `process.nextTick`) only after the current one calls `done()`.
   */
  class Queue {
    constructor() {
      // Pending tasks, oldest first.
      this._queue = [];
      // True from the moment the first task is scheduled until the queue drains.
      this._running = false;
    }

    /**
     * Appends a task and, if the queue is idle, schedules processing.
     *
     * @param {function({done: function(): void}): void} fn - task to run; it
     *   must call `done()` on the handle it receives to let the queue advance.
     */
    push(fn) {
      this._queue.push(fn);
      if (this._running) {
        // A task is already scheduled or in flight; its done() drives the queue.
        return;
      }
      this._running = true;
      process.nextTick(() => this._next());
    }

    /**
     * Starts the oldest pending task, or marks the queue idle when there is
     * nothing (or a falsy entry) at the head.
     */
    _next() {
      const job = this._queue.shift();
      if (job) {
        const handle = {
          done: () => process.nextTick(() => this._next()),
        };
        job(handle);
        return;
      }
      this._running = false;
    }
  }
  /**
   * Handles one packet of the compressed protocol; must be invoked with the
   * connection as `this`.
   *
   * Reads a 3-byte integer and the remaining body from `packet`. A non-zero
   * integer means the body is inflated with zlib first; zero means the body is
   * passed through as-is. Either way the work is pushed onto
   * `connection.inflateQueue`, and the resulting bytes are fed to
   * `connection._inflatedPacketsParser` after bumping the compressed sequence
   * id by `packet.numPackets`.
   *
   * If inflating fails, the error is passed to `_handleNetworkError` and the
   * queue task is not completed.
   *
   * @param {object} packet - packet exposing `readInt24()`, `readBuffer()` and
   *   `numPackets`.
   */
  function handleCompressedPacket(packet) {
    // eslint-disable-next-line consistent-this, no-invalid-this
    const connection = this;
    const lengthField = packet.readInt24();
    const payload = packet.readBuffer();
    const needsInflate = lengthField !== 0;

    connection.inflateQueue.push((task) => {
      // Hands plain bytes to the inner parser and releases the queue.
      const forward = (plainBytes) => {
        connection._bumpCompressedSequenceId(packet.numPackets);
        connection._inflatedPacketsParser.execute(plainBytes);
        task.done();
      };

      if (!needsInflate) {
        forward(payload);
        return;
      }

      zlib.inflate(payload, (err, inflated) => {
        if (err) {
          connection._handleNetworkError(err);
          return;
        }
        forward(inflated);
      });
    });
  }
  /**
   * Replacement for `connection.write` once compression is enabled; must be
   * invoked with the connection as `this`.
   *
   * Buffers longer than MAX_COMPRESSED_LENGTH are split into chunks and each
   * chunk is written as its own compressed-protocol packet. Every packet is a
   * 7-byte header followed by the payload:
   *   bytes 0-2  payload length as sent on the wire (little-endian)
   *   byte  3    compressed sequence id captured when the write was requested
   *   bytes 4-6  length before compression, or 0 if the payload is sent as-is
   *
   * The payload is sent deflated only when that is strictly smaller than the
   * input; otherwise the original bytes are sent with "length before
   * compression" set to 0.
   *
   * If deflating fails, the error is passed to `_handleFatalError` and the
   * queue task is not completed.
   *
   * @param {Buffer} buffer - uncompressed bytes to send.
   */
  function writeCompressed(buffer) {
    // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
    // note: sending a MySQL Packet of the size 2^24−5 to 2^24−1 via compression
    // leads to at least one extra compressed packet.
    // (this is because "length of the packet before compression" needs to fit
    // into a 3 byte unsigned int. "length of the packet before compression"
    // includes the 4 byte packet header, hence 2^24−5)
    const MAX_COMPRESSED_LENGTH = 16777210;
    // eslint-disable-next-line no-invalid-this, consistent-this
    const connection = this;

    if (buffer.length > MAX_COMPRESSED_LENGTH) {
      for (
        let start = 0;
        start < buffer.length;
        start += MAX_COMPRESSED_LENGTH
      ) {
        // subarray() yields the same zero-copy view as the deprecated
        // Buffer#slice().
        writeCompressed.call(
          connection,
          buffer.subarray(start, start + MAX_COMPRESSED_LENGTH)
        );
      }
      return;
    }

    // Capture the sequence id now: the deflate below completes asynchronously,
    // after the connection's counter has already been bumped.
    const seqId = connection.compressedSequenceId;

    // zlib runs asynchronously, so writes are funnelled through deflateQueue to
    // keep packets on the wire in the order they were requested.
    connection.deflateQueue.push((task) => {
      zlib.deflate(buffer, (err, compressed) => {
        if (err) {
          connection._handleFatalError(err);
          return;
        }

        // http://dev.mysql.com/doc/internals/en/uncompressed-payload.html
        // To send an uncompressed payload:
        // - set length of payload before compression to 0
        // - the compressed payload contains the uncompressed payload instead.
        const sendCompressed = compressed.length < buffer.length;
        const payload = sendCompressed ? compressed : buffer;
        const lengthBeforeCompression = sendCompressed ? buffer.length : 0;

        const compressHeader = Buffer.allocUnsafe(7);
        compressHeader.writeUIntLE(payload.length, 0, 3);
        compressHeader.writeUInt8(seqId, 3);
        compressHeader.writeUIntLE(lengthBeforeCompression, 4, 3);

        connection.writeUncompressed(compressHeader);
        connection.writeUncompressed(payload);
        task.done();
      });
    });
    connection._bumpCompressedSequenceId(1);
  }
  /**
   * Switches a connection over to the compressed protocol by installing the
   * compression-aware parser, writer and work queues on it.
   *
   * After this call:
   *  - `connection.packetParser` is a parser created with a header size of 7
   *    that passes each packet to `connection._handleCompressedPacket`;
   *  - `connection._inflatedPacketsParser` is a parser created with a header
   *    size of 4 that passes each packet to `connection.handlePacket`;
   *  - the previous `connection.write` is kept as `connection.writeUncompressed`
   *    and `connection.write` becomes `writeCompressed`;
   *  - `inflateQueue` / `deflateQueue` are fresh `Queue` instances.
   *
   * @param {object} connection - connection object to modify in place.
   */
  function enableCompression(connection) {
    const inflatedParser = new PacketParser((inflatedPacket) => {
      connection.handlePacket(inflatedPacket);
    }, 4);
    inflatedParser._lastPacket = 0;

    // Looked up on the connection at call time, so a later replacement of
    // _handleCompressedPacket is honoured.
    const compressedParser = new PacketParser((compressedPacket) => {
      connection._handleCompressedPacket(compressedPacket);
    }, 7);

    const plainWrite = connection.write;

    Object.assign(connection, {
      _lastWrittenPacketId: 0,
      _lastReceivedPacketId: 0,
      _handleCompressedPacket: handleCompressedPacket,
      _inflatedPacketsParser: inflatedParser,
      packetParser: compressedParser,
      writeUncompressed: plainWrite,
      write: writeCompressed,
      inflateQueue: new Queue(),
      deflateQueue: new Queue(),
    });
  }
  // Public surface of this module: the mixin installer and the task queue.
  module.exports = { enableCompression, Queue };