// AxiosTransformStream.js
  'use strict';

  import stream from 'stream';
  import utils from '../utils.js';
  // Private key under which each instance stores its bookkeeping state.
  const kInternals = Symbol('internals');

  /**
   * Pass-through Transform stream that can cap throughput and report progress.
   *
   * Data is forwarded unchanged. Incoming chunks may be sliced into smaller
   * pieces so that no piece exceeds the readable high-water mark or the bytes
   * still allowed in the current rate-limit window.
   *
   * Emits `'progress'` with the cumulative number of bytes pushed so far, but
   * only once at least one `'progress'` listener has been registered.
   */
  class AxiosTransformStream extends stream.Transform {
    /**
     * @param {Object} [options]
     * @param {number} [options.maxRate=0] Upper bound in bytes per second;
     *   a falsy value disables throttling.
     * @param {number} [options.chunkSize=65536] Used as the readable
     *   high-water mark and as the largest piece pushed at once.
     * @param {number|false} [options.minChunkSize=100] A chunk is not split
     *   when the part left over would be this small or smaller; `false`
     *   removes that floor.
     * @param {number} [options.timeWindow=500] Length of one rate-limit
     *   window in milliseconds.
     * @param {number} [options.ticksRate=2] Given a default here but not read
     *   by this class.
     * @param {number} [options.samplesCount=15] Given a default here but not
     *   read by this class.
     */
    constructor(options) {
      // NOTE(review): presumably merges the caller's defined properties over
      // the defaults below; confirm against utils.toFlatObject.
      options = utils.toFlatObject(
        options,
        {
          maxRate: 0,
          chunkSize: 64 * 1024,
          minChunkSize: 100,
          timeWindow: 500,
          ticksRate: 2,
          samplesCount: 15,
        },
        null,
        (prop, source) => !utils.isUndefined(source[prop])
      );

      super({
        readableHighWaterMark: options.chunkSize,
      });

      const internals = (this[kInternals] = {
        timeWindow: options.timeWindow,
        chunkSize: options.chunkSize,
        maxRate: options.maxRate,
        minChunkSize: options.minChunkSize,
        bytesSeen: 0, // total bytes pushed since construction
        isCaptured: false, // true once a 'progress' listener exists
        notifiedBytesLoaded: 0,
        ts: Date.now(), // start of the current rate-limit window
        bytes: 0, // bytes charged against the current window
        onReadCallback: null, // continuation parked while the reader is full
      });

      // Progress events are only emitted after somebody subscribes to them.
      this.on('newListener', (event) => {
        if (event === 'progress') {
          internals.isCaptured = true;
        }
      });
    }

    /**
     * Resumes a transform that was parked because `push()` reported a full
     * readable buffer, then defers to the stock implementation.
     *
     * @param {number} size Forwarded to `Transform.prototype._read`.
     */
    _read(size) {
      const internals = this[kInternals];

      if (internals.onReadCallback) {
        internals.onReadCallback();
      }

      return super._read(size);
    }

    /**
     * Forwards `chunk` downstream, slicing and delaying it as required by the
     * high-water mark and the configured rate limit.
     *
     * @param {Buffer|string} chunk Data to forward.
     * @param {string} encoding Unused.
     * @param {Function} callback Invoked once the whole chunk has been pushed.
     */
    _transform(chunk, encoding, callback) {
      const internals = this[kInternals];
      const maxRate = internals.maxRate;
      const readableHighWaterMark = this.readableHighWaterMark;
      const timeWindow = internals.timeWindow;

      // Bytes allowed per window: maxRate is per 1000 ms, the window is
      // timeWindow ms long.
      const divider = 1000 / timeWindow;
      const bytesThreshold = maxRate / divider;

      const minChunkSize =
        internals.minChunkSize !== false
          ? Math.max(internals.minChunkSize, bytesThreshold * 0.01)
          : 0;

      // Pushes one piece, updates the counters, and continues either on the
      // next tick or, if the readable side is full, on the next _read().
      const pushChunk = (_chunk, _callback) => {
        const bytes = Buffer.byteLength(_chunk);
        internals.bytesSeen += bytes;
        internals.bytes += bytes;

        if (internals.isCaptured) {
          this.emit('progress', internals.bytesSeen);
        }

        if (this.push(_chunk)) {
          process.nextTick(_callback);
        } else {
          internals.onReadCallback = () => {
            internals.onReadCallback = null;
            process.nextTick(_callback);
          };
        }
      };

      // Pushes as much of `_chunk` as is currently allowed and hands any
      // remainder back through `_callback(null, remainder)`.
      const transformChunk = (_chunk, _callback) => {
        const chunkSize = Buffer.byteLength(_chunk);
        let chunkRemainder = null;
        let maxChunkSize = readableHighWaterMark;

        if (maxRate) {
          const now = Date.now();
          let passed = 0;
          let bytesLeft;

          if (!internals.ts || (passed = now - internals.ts) >= timeWindow) {
            // New window: carry over whatever the previous window overshot.
            internals.ts = now;
            bytesLeft = bytesThreshold - internals.bytes;
            internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
            passed = 0;
          }

          bytesLeft = bytesThreshold - internals.bytes;

          if (bytesLeft <= 0) {
            // Budget exhausted: retry the same chunk in the next time window.
            return setTimeout(() => {
              _callback(null, _chunk);
            }, timeWindow - passed);
          }

          if (bytesLeft < maxChunkSize) {
            // The budget may be fractional. Slice on a whole byte count and
            // never below one byte: a zero-length slice would make no
            // progress and re-queue the untouched remainder on every tick.
            // Any overshoot is repaid by the carry-over above.
            maxChunkSize = Math.max(1, Math.floor(bytesLeft));
          }
        }

        if (maxChunkSize && chunkSize > maxChunkSize && chunkSize - maxChunkSize > minChunkSize) {
          chunkRemainder = _chunk.subarray(maxChunkSize);
          _chunk = _chunk.subarray(0, maxChunkSize);
        }

        pushChunk(
          _chunk,
          chunkRemainder
            ? () => {
                process.nextTick(_callback, null, chunkRemainder);
              }
            : _callback
        );
      };

      transformChunk(chunk, function transformNextChunk(err, _chunk) {
        if (err) {
          return callback(err);
        }

        if (_chunk) {
          transformChunk(_chunk, transformNextChunk);
        } else {
          callback(null);
        }
      });
    }
  }

  export default AxiosTransformStream;