| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518 |
- "use strict";
- function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { Promise.resolve(value).then(_next, _throw); } }
- function _asyncToGenerator(fn) { return function () { var self = this, args = arguments; return new Promise(function (resolve, reject) { var gen = fn.apply(self, args); function _next(value) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value); } function _throw(err) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err); } _next(undefined); }); }; }
- function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } }
- function _defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } }
- function _createClass(Constructor, protoProps, staticProps) { if (protoProps) _defineProperties(Constructor.prototype, protoProps); if (staticProps) _defineProperties(Constructor, staticProps); return Constructor; }
- /* eslint-disable max-classes-per-file */
- var Stream = require('readable-stream');
- var utils = require('./utils');
- var StringBuf = require('./string-buf'); // =============================================================================
- // data chunks - encapsulating incoming data
- var StringChunk = /*#__PURE__*/function () {
- function StringChunk(data, encoding) {
- _classCallCheck(this, StringChunk);
- this._data = data;
- this._encoding = encoding;
- }
- _createClass(StringChunk, [{
- key: "copy",
- // copy to target buffer
- value: function copy(target, targetOffset, offset, length) {
- return this.toBuffer().copy(target, targetOffset, offset, length);
- }
- }, {
- key: "toBuffer",
- value: function toBuffer() {
- if (!this._buffer) {
- this._buffer = Buffer.from(this._data, this._encoding);
- }
- return this._buffer;
- }
- }, {
- key: "length",
- get: function get() {
- return this.toBuffer().length;
- }
- }]);
- return StringChunk;
- }();
- var StringBufChunk = /*#__PURE__*/function () {
- function StringBufChunk(data) {
- _classCallCheck(this, StringBufChunk);
- this._data = data;
- }
- _createClass(StringBufChunk, [{
- key: "copy",
- // copy to target buffer
- value: function copy(target, targetOffset, offset, length) {
- // eslint-disable-next-line no-underscore-dangle
- return this._data._buf.copy(target, targetOffset, offset, length);
- }
- }, {
- key: "toBuffer",
- value: function toBuffer() {
- return this._data.toBuffer();
- }
- }, {
- key: "length",
- get: function get() {
- return this._data.length;
- }
- }]);
- return StringBufChunk;
- }();
- var BufferChunk = /*#__PURE__*/function () {
- function BufferChunk(data) {
- _classCallCheck(this, BufferChunk);
- this._data = data;
- }
- _createClass(BufferChunk, [{
- key: "copy",
- // copy to target buffer
- value: function copy(target, targetOffset, offset, length) {
- this._data.copy(target, targetOffset, offset, length);
- }
- }, {
- key: "toBuffer",
- value: function toBuffer() {
- return this._data;
- }
- }, {
- key: "length",
- get: function get() {
- return this._data.length;
- }
- }]);
- return BufferChunk;
- }(); // =============================================================================
- // ReadWriteBuf - a single buffer supporting simple read-write
- var ReadWriteBuf = /*#__PURE__*/function () {
- function ReadWriteBuf(size) {
- _classCallCheck(this, ReadWriteBuf);
- this.size = size; // the buffer
- this.buffer = Buffer.alloc(size); // read index
- this.iRead = 0; // write index
- this.iWrite = 0;
- }
- _createClass(ReadWriteBuf, [{
- key: "toBuffer",
- value: function toBuffer() {
- if (this.iRead === 0 && this.iWrite === this.size) {
- return this.buffer;
- }
- var buf = Buffer.alloc(this.iWrite - this.iRead);
- this.buffer.copy(buf, 0, this.iRead, this.iWrite);
- return buf;
- }
- }, {
- key: "read",
- value: function read(size) {
- var buf; // read size bytes from buffer and return buffer
- if (size === 0) {
- // special case - return null if no data requested
- return null;
- }
- if (size === undefined || size >= this.length) {
- // if no size specified or size is at least what we have then return all of the bytes
- buf = this.toBuffer();
- this.iRead = this.iWrite;
- return buf;
- } // otherwise return a chunk
- buf = Buffer.alloc(size);
- this.buffer.copy(buf, 0, this.iRead, size);
- this.iRead += size;
- return buf;
- }
- }, {
- key: "write",
- value: function write(chunk, offset, length) {
- // write as many bytes from data from optional source offset
- // and return number of bytes written
- var size = Math.min(length, this.size - this.iWrite);
- chunk.copy(this.buffer, this.iWrite, offset, offset + size);
- this.iWrite += size;
- return size;
- }
- }, {
- key: "length",
- get: function get() {
- return this.iWrite - this.iRead;
- }
- }, {
- key: "eod",
- get: function get() {
- return this.iRead === this.iWrite;
- }
- }, {
- key: "full",
- get: function get() {
- return this.iWrite === this.size;
- }
- }]);
- return ReadWriteBuf;
- }(); // =============================================================================
- // StreamBuf - a multi-purpose read-write stream
- // As MemBuf - write as much data as you like. Then call toBuffer() to consolidate
- // As StreamHub - pipe to multiple writables
- // As readable stream - feed data into the writable part and have some other code read from it.
- // Note: Not sure why but StreamBuf does not like JS "class" sugar. It fails the
- // integration tests
- var StreamBuf = function StreamBuf(options) {
- options = options || {};
- this.bufSize = options.bufSize || 1024 * 1024;
- this.buffers = []; // batch mode fills a buffer completely before passing the data on
- // to pipes or 'readable' event listeners
- this.batch = options.batch || false;
- this.corked = false; // where in the current writable buffer we're up to
- this.inPos = 0; // where in the current readable buffer we've read up to
- this.outPos = 0; // consuming pipe streams go here
- this.pipes = []; // controls emit('data')
- this.paused = false;
- this.encoding = null;
- };
- utils.inherits(StreamBuf, Stream.Duplex, {
- toBuffer: function toBuffer() {
- switch (this.buffers.length) {
- case 0:
- return null;
- case 1:
- return this.buffers[0].toBuffer();
- default:
- return Buffer.concat(this.buffers.map(function (rwBuf) {
- return rwBuf.toBuffer();
- }));
- }
- },
- // writable
- // event drain - if write returns false (which it won't), indicates when safe to write again.
- // finish - end() has been called
- // pipe(src) - pipe() has been called on readable
- // unpipe(src) - unpipe() has been called on readable
- // error - duh
- _getWritableBuffer: function _getWritableBuffer() {
- if (this.buffers.length) {
- var last = this.buffers[this.buffers.length - 1];
- if (!last.full) {
- return last;
- }
- }
- var buf = new ReadWriteBuf(this.bufSize);
- this.buffers.push(buf);
- return buf;
- },
- _pipe: function _pipe(chunk) {
- var _this = this;
- return _asyncToGenerator( /*#__PURE__*/regeneratorRuntime.mark(function _callee() {
- var write;
- return regeneratorRuntime.wrap(function _callee$(_context) {
- while (1) {
- switch (_context.prev = _context.next) {
- case 0:
- write = function write(pipe) {
- return new Promise(function (resolve) {
- pipe.write(chunk.toBuffer(), function () {
- resolve();
- });
- });
- };
- _context.next = 3;
- return Promise.all(_this.pipes.map(write));
- case 3:
- case "end":
- return _context.stop();
- }
- }
- }, _callee);
- }))();
- },
- _writeToBuffers: function _writeToBuffers(chunk) {
- var inPos = 0;
- var inLen = chunk.length;
- while (inPos < inLen) {
- // find writable buffer
- var buffer = this._getWritableBuffer(); // write some data
- inPos += buffer.write(chunk, inPos, inLen - inPos);
- }
- },
- write: function write(data, encoding, callback) {
- var _this2 = this;
- return _asyncToGenerator( /*#__PURE__*/regeneratorRuntime.mark(function _callee2() {
- var chunk;
- return regeneratorRuntime.wrap(function _callee2$(_context2) {
- while (1) {
- switch (_context2.prev = _context2.next) {
- case 0:
- if (encoding instanceof Function) {
- callback = encoding;
- encoding = 'utf8';
- }
- callback = callback || utils.nop; // encapsulate data into a chunk
- if (!(data instanceof StringBuf)) {
- _context2.next = 6;
- break;
- }
- chunk = new StringBufChunk(data);
- _context2.next = 15;
- break;
- case 6:
- if (!(data instanceof Buffer)) {
- _context2.next = 10;
- break;
- }
- chunk = new BufferChunk(data);
- _context2.next = 15;
- break;
- case 10:
- if (!(typeof data === 'string' || data instanceof String || data instanceof ArrayBuffer)) {
- _context2.next = 14;
- break;
- }
- chunk = new StringChunk(data, encoding);
- _context2.next = 15;
- break;
- case 14:
- throw new Error('Chunk must be one of type String, Buffer or StringBuf.');
- case 15:
- if (!_this2.pipes.length) {
- _context2.next = 31;
- break;
- }
- if (!_this2.batch) {
- _context2.next = 21;
- break;
- }
- _this2._writeToBuffers(chunk);
- while (!_this2.corked && _this2.buffers.length > 1) {
- _this2._pipe(_this2.buffers.shift());
- }
- _context2.next = 29;
- break;
- case 21:
- if (_this2.corked) {
- _context2.next = 27;
- break;
- }
- _context2.next = 24;
- return _this2._pipe(chunk);
- case 24:
- callback();
- _context2.next = 29;
- break;
- case 27:
- _this2._writeToBuffers(chunk);
- process.nextTick(callback);
- case 29:
- _context2.next = 34;
- break;
- case 31:
- if (!_this2.paused) {
- _this2.emit('data', chunk.toBuffer());
- }
- _this2._writeToBuffers(chunk);
- _this2.emit('readable');
- case 34:
- return _context2.abrupt("return", true);
- case 35:
- case "end":
- return _context2.stop();
- }
- }
- }, _callee2);
- }))();
- },
- cork: function cork() {
- this.corked = true;
- },
- _flush: function _flush()
- /* destination */
- {
- // if we have comsumers...
- if (this.pipes.length) {
- // and there's stuff not written
- while (this.buffers.length) {
- this._pipe(this.buffers.shift());
- }
- }
- },
- uncork: function uncork() {
- this.corked = false;
- this._flush();
- },
- end: function end(chunk, encoding, callback) {
- var _this3 = this;
- var writeComplete = function writeComplete(error) {
- if (error) {
- callback(error);
- } else {
- _this3._flush();
- _this3.pipes.forEach(function (pipe) {
- pipe.end();
- });
- _this3.emit('finish');
- }
- };
- if (chunk) {
- this.write(chunk, encoding, writeComplete);
- } else {
- writeComplete();
- }
- },
- // readable
- // event readable - some data is now available
- // event data - switch to flowing mode - feeds chunks to handler
- // event end - no more data
- // event close - optional, indicates upstream close
- // event error - duh
- read: function read(size) {
- var buffers; // read min(buffer, size || infinity)
- if (size) {
- buffers = [];
- while (size && this.buffers.length && !this.buffers[0].eod) {
- var first = this.buffers[0];
- var buffer = first.read(size);
- size -= buffer.length;
- buffers.push(buffer);
- if (first.eod && first.full) {
- this.buffers.shift();
- }
- }
- return Buffer.concat(buffers);
- }
- buffers = this.buffers.map(function (buf) {
- return buf.toBuffer();
- }).filter(Boolean);
- this.buffers = [];
- return Buffer.concat(buffers);
- },
- setEncoding: function setEncoding(encoding) {
- // causes stream.read or stream.on('data) to return strings of encoding instead of Buffer objects
- this.encoding = encoding;
- },
- pause: function pause() {
- this.paused = true;
- },
- resume: function resume() {
- this.paused = false;
- },
- isPaused: function isPaused() {
- return !!this.paused;
- },
- pipe: function pipe(destination) {
- // add destination to pipe list & write current buffer
- this.pipes.push(destination);
- if (!this.paused && this.buffers.length) {
- this.end();
- }
- },
- unpipe: function unpipe(destination) {
- // remove destination from pipe list
- this.pipes = this.pipes.filter(function (pipe) {
- return pipe !== destination;
- });
- },
- unshift: function unshift()
- /* chunk */
- {
- // some numpty has read some data that's not for them and they want to put it back!
- // Might implement this some day
- throw new Error('Not Implemented');
- },
- wrap: function wrap()
- /* stream */
- {
- // not implemented
- throw new Error('Not Implemented');
- }
- });
- module.exports = StreamBuf;
- //# sourceMappingURL=stream-buf.js.map
|