stream-buf.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. "use strict";
  2. function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { Promise.resolve(value).then(_next, _throw); } }
  3. function _asyncToGenerator(fn) { return function () { var self = this, args = arguments; return new Promise(function (resolve, reject) { var gen = fn.apply(self, args); function _next(value) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value); } function _throw(err) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err); } _next(undefined); }); }; }
  4. function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } }
  5. function _defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } }
  6. function _createClass(Constructor, protoProps, staticProps) { if (protoProps) _defineProperties(Constructor.prototype, protoProps); if (staticProps) _defineProperties(Constructor, staticProps); return Constructor; }
  7. /* eslint-disable max-classes-per-file */
  8. var Stream = require('readable-stream');
  9. var utils = require('./utils');
  10. var StringBuf = require('./string-buf'); // =============================================================================
  11. // data chunks - encapsulating incoming data
  12. var StringChunk = /*#__PURE__*/function () {
  13. function StringChunk(data, encoding) {
  14. _classCallCheck(this, StringChunk);
  15. this._data = data;
  16. this._encoding = encoding;
  17. }
  18. _createClass(StringChunk, [{
  19. key: "copy",
  20. // copy to target buffer
  21. value: function copy(target, targetOffset, offset, length) {
  22. return this.toBuffer().copy(target, targetOffset, offset, length);
  23. }
  24. }, {
  25. key: "toBuffer",
  26. value: function toBuffer() {
  27. if (!this._buffer) {
  28. this._buffer = Buffer.from(this._data, this._encoding);
  29. }
  30. return this._buffer;
  31. }
  32. }, {
  33. key: "length",
  34. get: function get() {
  35. return this.toBuffer().length;
  36. }
  37. }]);
  38. return StringChunk;
  39. }();
  40. var StringBufChunk = /*#__PURE__*/function () {
  41. function StringBufChunk(data) {
  42. _classCallCheck(this, StringBufChunk);
  43. this._data = data;
  44. }
  45. _createClass(StringBufChunk, [{
  46. key: "copy",
  47. // copy to target buffer
  48. value: function copy(target, targetOffset, offset, length) {
  49. // eslint-disable-next-line no-underscore-dangle
  50. return this._data._buf.copy(target, targetOffset, offset, length);
  51. }
  52. }, {
  53. key: "toBuffer",
  54. value: function toBuffer() {
  55. return this._data.toBuffer();
  56. }
  57. }, {
  58. key: "length",
  59. get: function get() {
  60. return this._data.length;
  61. }
  62. }]);
  63. return StringBufChunk;
  64. }();
  65. var BufferChunk = /*#__PURE__*/function () {
  66. function BufferChunk(data) {
  67. _classCallCheck(this, BufferChunk);
  68. this._data = data;
  69. }
  70. _createClass(BufferChunk, [{
  71. key: "copy",
  72. // copy to target buffer
  73. value: function copy(target, targetOffset, offset, length) {
  74. this._data.copy(target, targetOffset, offset, length);
  75. }
  76. }, {
  77. key: "toBuffer",
  78. value: function toBuffer() {
  79. return this._data;
  80. }
  81. }, {
  82. key: "length",
  83. get: function get() {
  84. return this._data.length;
  85. }
  86. }]);
  87. return BufferChunk;
  88. }(); // =============================================================================
  89. // ReadWriteBuf - a single buffer supporting simple read-write
  90. var ReadWriteBuf = /*#__PURE__*/function () {
  91. function ReadWriteBuf(size) {
  92. _classCallCheck(this, ReadWriteBuf);
  93. this.size = size; // the buffer
  94. this.buffer = Buffer.alloc(size); // read index
  95. this.iRead = 0; // write index
  96. this.iWrite = 0;
  97. }
  98. _createClass(ReadWriteBuf, [{
  99. key: "toBuffer",
  100. value: function toBuffer() {
  101. if (this.iRead === 0 && this.iWrite === this.size) {
  102. return this.buffer;
  103. }
  104. var buf = Buffer.alloc(this.iWrite - this.iRead);
  105. this.buffer.copy(buf, 0, this.iRead, this.iWrite);
  106. return buf;
  107. }
  108. }, {
  109. key: "read",
  110. value: function read(size) {
  111. var buf; // read size bytes from buffer and return buffer
  112. if (size === 0) {
  113. // special case - return null if no data requested
  114. return null;
  115. }
  116. if (size === undefined || size >= this.length) {
  117. // if no size specified or size is at least what we have then return all of the bytes
  118. buf = this.toBuffer();
  119. this.iRead = this.iWrite;
  120. return buf;
  121. } // otherwise return a chunk
  122. buf = Buffer.alloc(size);
  123. this.buffer.copy(buf, 0, this.iRead, size);
  124. this.iRead += size;
  125. return buf;
  126. }
  127. }, {
  128. key: "write",
  129. value: function write(chunk, offset, length) {
  130. // write as many bytes from data from optional source offset
  131. // and return number of bytes written
  132. var size = Math.min(length, this.size - this.iWrite);
  133. chunk.copy(this.buffer, this.iWrite, offset, offset + size);
  134. this.iWrite += size;
  135. return size;
  136. }
  137. }, {
  138. key: "length",
  139. get: function get() {
  140. return this.iWrite - this.iRead;
  141. }
  142. }, {
  143. key: "eod",
  144. get: function get() {
  145. return this.iRead === this.iWrite;
  146. }
  147. }, {
  148. key: "full",
  149. get: function get() {
  150. return this.iWrite === this.size;
  151. }
  152. }]);
  153. return ReadWriteBuf;
  154. }(); // =============================================================================
  155. // StreamBuf - a multi-purpose read-write stream
  156. // As MemBuf - write as much data as you like. Then call toBuffer() to consolidate
  157. // As StreamHub - pipe to multiple writables
  158. // As readable stream - feed data into the writable part and have some other code read from it.
  159. // Note: Not sure why but StreamBuf does not like JS "class" sugar. It fails the
  160. // integration tests
  161. var StreamBuf = function StreamBuf(options) {
  162. options = options || {};
  163. this.bufSize = options.bufSize || 1024 * 1024;
  164. this.buffers = []; // batch mode fills a buffer completely before passing the data on
  165. // to pipes or 'readable' event listeners
  166. this.batch = options.batch || false;
  167. this.corked = false; // where in the current writable buffer we're up to
  168. this.inPos = 0; // where in the current readable buffer we've read up to
  169. this.outPos = 0; // consuming pipe streams go here
  170. this.pipes = []; // controls emit('data')
  171. this.paused = false;
  172. this.encoding = null;
  173. };
  174. utils.inherits(StreamBuf, Stream.Duplex, {
  175. toBuffer: function toBuffer() {
  176. switch (this.buffers.length) {
  177. case 0:
  178. return null;
  179. case 1:
  180. return this.buffers[0].toBuffer();
  181. default:
  182. return Buffer.concat(this.buffers.map(function (rwBuf) {
  183. return rwBuf.toBuffer();
  184. }));
  185. }
  186. },
  187. // writable
  188. // event drain - if write returns false (which it won't), indicates when safe to write again.
  189. // finish - end() has been called
  190. // pipe(src) - pipe() has been called on readable
  191. // unpipe(src) - unpipe() has been called on readable
  192. // error - duh
  193. _getWritableBuffer: function _getWritableBuffer() {
  194. if (this.buffers.length) {
  195. var last = this.buffers[this.buffers.length - 1];
  196. if (!last.full) {
  197. return last;
  198. }
  199. }
  200. var buf = new ReadWriteBuf(this.bufSize);
  201. this.buffers.push(buf);
  202. return buf;
  203. },
  204. _pipe: function _pipe(chunk) {
  205. var _this = this;
  206. return _asyncToGenerator( /*#__PURE__*/regeneratorRuntime.mark(function _callee() {
  207. var write;
  208. return regeneratorRuntime.wrap(function _callee$(_context) {
  209. while (1) {
  210. switch (_context.prev = _context.next) {
  211. case 0:
  212. write = function write(pipe) {
  213. return new Promise(function (resolve) {
  214. pipe.write(chunk.toBuffer(), function () {
  215. resolve();
  216. });
  217. });
  218. };
  219. _context.next = 3;
  220. return Promise.all(_this.pipes.map(write));
  221. case 3:
  222. case "end":
  223. return _context.stop();
  224. }
  225. }
  226. }, _callee);
  227. }))();
  228. },
  229. _writeToBuffers: function _writeToBuffers(chunk) {
  230. var inPos = 0;
  231. var inLen = chunk.length;
  232. while (inPos < inLen) {
  233. // find writable buffer
  234. var buffer = this._getWritableBuffer(); // write some data
  235. inPos += buffer.write(chunk, inPos, inLen - inPos);
  236. }
  237. },
  238. write: function write(data, encoding, callback) {
  239. var _this2 = this;
  240. return _asyncToGenerator( /*#__PURE__*/regeneratorRuntime.mark(function _callee2() {
  241. var chunk;
  242. return regeneratorRuntime.wrap(function _callee2$(_context2) {
  243. while (1) {
  244. switch (_context2.prev = _context2.next) {
  245. case 0:
  246. if (encoding instanceof Function) {
  247. callback = encoding;
  248. encoding = 'utf8';
  249. }
  250. callback = callback || utils.nop; // encapsulate data into a chunk
  251. if (!(data instanceof StringBuf)) {
  252. _context2.next = 6;
  253. break;
  254. }
  255. chunk = new StringBufChunk(data);
  256. _context2.next = 15;
  257. break;
  258. case 6:
  259. if (!(data instanceof Buffer)) {
  260. _context2.next = 10;
  261. break;
  262. }
  263. chunk = new BufferChunk(data);
  264. _context2.next = 15;
  265. break;
  266. case 10:
  267. if (!(typeof data === 'string' || data instanceof String || data instanceof ArrayBuffer)) {
  268. _context2.next = 14;
  269. break;
  270. }
  271. chunk = new StringChunk(data, encoding);
  272. _context2.next = 15;
  273. break;
  274. case 14:
  275. throw new Error('Chunk must be one of type String, Buffer or StringBuf.');
  276. case 15:
  277. if (!_this2.pipes.length) {
  278. _context2.next = 31;
  279. break;
  280. }
  281. if (!_this2.batch) {
  282. _context2.next = 21;
  283. break;
  284. }
  285. _this2._writeToBuffers(chunk);
  286. while (!_this2.corked && _this2.buffers.length > 1) {
  287. _this2._pipe(_this2.buffers.shift());
  288. }
  289. _context2.next = 29;
  290. break;
  291. case 21:
  292. if (_this2.corked) {
  293. _context2.next = 27;
  294. break;
  295. }
  296. _context2.next = 24;
  297. return _this2._pipe(chunk);
  298. case 24:
  299. callback();
  300. _context2.next = 29;
  301. break;
  302. case 27:
  303. _this2._writeToBuffers(chunk);
  304. process.nextTick(callback);
  305. case 29:
  306. _context2.next = 34;
  307. break;
  308. case 31:
  309. if (!_this2.paused) {
  310. _this2.emit('data', chunk.toBuffer());
  311. }
  312. _this2._writeToBuffers(chunk);
  313. _this2.emit('readable');
  314. case 34:
  315. return _context2.abrupt("return", true);
  316. case 35:
  317. case "end":
  318. return _context2.stop();
  319. }
  320. }
  321. }, _callee2);
  322. }))();
  323. },
  324. cork: function cork() {
  325. this.corked = true;
  326. },
  327. _flush: function _flush()
  328. /* destination */
  329. {
  330. // if we have comsumers...
  331. if (this.pipes.length) {
  332. // and there's stuff not written
  333. while (this.buffers.length) {
  334. this._pipe(this.buffers.shift());
  335. }
  336. }
  337. },
  338. uncork: function uncork() {
  339. this.corked = false;
  340. this._flush();
  341. },
  342. end: function end(chunk, encoding, callback) {
  343. var _this3 = this;
  344. var writeComplete = function writeComplete(error) {
  345. if (error) {
  346. callback(error);
  347. } else {
  348. _this3._flush();
  349. _this3.pipes.forEach(function (pipe) {
  350. pipe.end();
  351. });
  352. _this3.emit('finish');
  353. }
  354. };
  355. if (chunk) {
  356. this.write(chunk, encoding, writeComplete);
  357. } else {
  358. writeComplete();
  359. }
  360. },
  361. // readable
  362. // event readable - some data is now available
  363. // event data - switch to flowing mode - feeds chunks to handler
  364. // event end - no more data
  365. // event close - optional, indicates upstream close
  366. // event error - duh
  367. read: function read(size) {
  368. var buffers; // read min(buffer, size || infinity)
  369. if (size) {
  370. buffers = [];
  371. while (size && this.buffers.length && !this.buffers[0].eod) {
  372. var first = this.buffers[0];
  373. var buffer = first.read(size);
  374. size -= buffer.length;
  375. buffers.push(buffer);
  376. if (first.eod && first.full) {
  377. this.buffers.shift();
  378. }
  379. }
  380. return Buffer.concat(buffers);
  381. }
  382. buffers = this.buffers.map(function (buf) {
  383. return buf.toBuffer();
  384. }).filter(Boolean);
  385. this.buffers = [];
  386. return Buffer.concat(buffers);
  387. },
  388. setEncoding: function setEncoding(encoding) {
  389. // causes stream.read or stream.on('data) to return strings of encoding instead of Buffer objects
  390. this.encoding = encoding;
  391. },
  392. pause: function pause() {
  393. this.paused = true;
  394. },
  395. resume: function resume() {
  396. this.paused = false;
  397. },
  398. isPaused: function isPaused() {
  399. return !!this.paused;
  400. },
  401. pipe: function pipe(destination) {
  402. // add destination to pipe list & write current buffer
  403. this.pipes.push(destination);
  404. if (!this.paused && this.buffers.length) {
  405. this.end();
  406. }
  407. },
  408. unpipe: function unpipe(destination) {
  409. // remove destination from pipe list
  410. this.pipes = this.pipes.filter(function (pipe) {
  411. return pipe !== destination;
  412. });
  413. },
  414. unshift: function unshift()
  415. /* chunk */
  416. {
  417. // some numpty has read some data that's not for them and they want to put it back!
  418. // Might implement this some day
  419. throw new Error('Not Implemented');
  420. },
  421. wrap: function wrap()
  422. /* stream */
  423. {
  424. // not implemented
  425. throw new Error('Not Implemented');
  426. }
  427. });
  428. module.exports = StreamBuf;
  429. //# sourceMappingURL=stream-buf.js.map