// RollingFileWriteStream.js (14 KB)

const debug = require("debug")("streamroller:RollingFileWriteStream");
const _ = require("lodash");
const async = require("async");
const fs = require("fs-extra");
const zlib = require("zlib");
const path = require("path");
const newNow = require("./now");
const format = require("date-format");
const { Writable } = require("stream");
  10. const FILENAME_SEP = ".";
  11. const ZIP_EXT = ".gz";
  12. const moveAndMaybeCompressFile = (
  13. sourceFilePath,
  14. targetFilePath,
  15. needCompress,
  16. done
  17. ) => {
  18. if (sourceFilePath === targetFilePath) {
  19. debug(
  20. `moveAndMaybeCompressFile: source and target are the same, not doing anything`
  21. );
  22. return done();
  23. }
  24. fs.access(sourceFilePath, fs.constants.W_OK | fs.constants.R_OK, e => {
  25. if (e) {
  26. debug(
  27. `moveAndMaybeCompressFile: source file path does not exist. not moving. sourceFilePath=${sourceFilePath}`
  28. );
  29. return done();
  30. }
  31. debug(
  32. `moveAndMaybeCompressFile: moving file from ${sourceFilePath} to ${targetFilePath} ${
  33. needCompress ? "with" : "without"
  34. } compress`
  35. );
  36. if (needCompress) {
  37. fs.createReadStream(sourceFilePath)
  38. .pipe(zlib.createGzip())
  39. .pipe(fs.createWriteStream(targetFilePath))
  40. .on("finish", () => {
  41. debug(
  42. `moveAndMaybeCompressFile: finished compressing ${targetFilePath}, deleting ${sourceFilePath}`
  43. );
  44. fs.unlink(sourceFilePath, done);
  45. });
  46. } else {
  47. debug(
  48. `moveAndMaybeCompressFile: deleting file=${targetFilePath}, renaming ${sourceFilePath} to ${targetFilePath}`
  49. );
  50. fs.unlink(targetFilePath, () => {
  51. fs.rename(sourceFilePath, targetFilePath, done);
  52. });
  53. }
  54. });
  55. };
  56. /**
  57. * RollingFileWriteStream is mainly used when writing to a file rolling by date or size.
  58. * RollingFileWriteStream inhebites from stream.Writable
  59. */
  60. class RollingFileWriteStream extends Writable {
  61. /**
  62. * Create a RollingFileWriteStream
  63. * @constructor
  64. * @param {string} filePath - The file path to write.
  65. * @param {object} options - The extra options
  66. * @param {number} options.numToKeep - The max numbers of files to keep.
  67. * @param {number} options.maxSize - The maxSize one file can reach. Unit is Byte.
  68. * This should be more than 1024. The default is Number.MAX_SAFE_INTEGER.
  69. * @param {string} options.mode - The mode of the files. The default is '0644'. Refer to stream.writable for more.
  70. * @param {string} options.flags - The default is 'a'. Refer to stream.flags for more.
  71. * @param {boolean} options.compress - Whether to compress backup files.
  72. * @param {boolean} options.keepFileExt - Whether to keep the file extension.
  73. * @param {string} options.pattern - The date string pattern in the file name.
  74. * @param {boolean} options.alwaysIncludePattern - Whether to add date to the name of the first file.
  75. */
  76. constructor(filePath, options) {
  77. debug(`creating RollingFileWriteStream. path=${filePath}`);
  78. super(options);
  79. this.options = this._parseOption(options);
  80. this.fileObject = path.parse(filePath);
  81. if (this.fileObject.dir === "") {
  82. this.fileObject = path.parse(path.join(process.cwd(), filePath));
  83. }
  84. this.justTheFile = this._formatFileName({ isHotFile: true });
  85. this.filename = path.join(this.fileObject.dir, this.justTheFile);
  86. this.state = {
  87. currentSize: 0
  88. };
  89. if (this.options.pattern) {
  90. this.state.currentDate = format(this.options.pattern, newNow());
  91. }
  92. if (this.options.flags === "a") {
  93. this._setExistingSizeAndDate();
  94. }
  95. debug(
  96. `create new file with no hot file. name=${
  97. this.justTheFile
  98. }, state=${JSON.stringify(this.state)}`
  99. );
  100. this._renewWriteStream();
  101. }
  102. _setExistingSizeAndDate() {
  103. try {
  104. const stats = fs.statSync(this.filename);
  105. this.state.currentSize = stats.size;
  106. if (this.options.pattern) {
  107. this.state.currentDate = format(this.options.pattern, stats.birthtime);
  108. }
  109. } catch (e) {
  110. //file does not exist, that's fine - move along
  111. return;
  112. }
  113. }
  114. _parseOption(rawOptions) {
  115. const defaultOptions = {
  116. maxSize: Number.MAX_SAFE_INTEGER,
  117. numToKeep: Number.MAX_SAFE_INTEGER,
  118. encoding: "utf8",
  119. mode: parseInt("0644", 8),
  120. flags: "a",
  121. compress: false,
  122. keepFileExt: false,
  123. alwaysIncludePattern: false
  124. };
  125. const options = _.defaults({}, rawOptions, defaultOptions);
  126. if (options.maxSize <= 0) {
  127. throw new Error(`options.maxSize (${options.maxSize}) should be > 0`);
  128. }
  129. if (options.numToKeep <= 0) {
  130. throw new Error(`options.numToKeep (${options.numToKeep}) should be > 0`);
  131. }
  132. debug(`creating stream with option=${JSON.stringify(options)}`);
  133. return options;
  134. }
  135. _shouldRoll(callback) {
  136. if (
  137. this.state.currentDate &&
  138. this.state.currentDate !== format(this.options.pattern, newNow())
  139. ) {
  140. debug(
  141. `_shouldRoll: rolling by date because ${
  142. this.state.currentDate
  143. } !== ${format(this.options.pattern, newNow())}`
  144. );
  145. this._roll({ isNextPeriod: true }, callback);
  146. return;
  147. }
  148. if (this.state.currentSize >= this.options.maxSize) {
  149. debug(
  150. `_shouldRoll: rolling by size because ${this.state.currentSize} >= ${this.options.maxSize}`
  151. );
  152. this._roll({ isNextPeriod: false }, callback);
  153. return;
  154. }
  155. callback();
  156. }
  157. _write(chunk, encoding, callback) {
  158. this._shouldRoll(() => {
  159. debug(
  160. `writing chunk. ` +
  161. `file=${this.currentFileStream.path} ` +
  162. `state=${JSON.stringify(this.state)} ` +
  163. `chunk=${chunk}`
  164. );
  165. this.currentFileStream.write(chunk, encoding, e => {
  166. this.state.currentSize += chunk.length;
  167. callback(e);
  168. });
  169. });
  170. }
  171. // Sorted from the oldest to the latest
  172. _getExistingFiles(cb) {
  173. fs.readdir(this.fileObject.dir, (e, files) => {
  174. debug(`_getExistingFiles: files=${files}`);
  175. const existingFileDetails = _.compact(
  176. _.map(files, n => {
  177. const parseResult = this._parseFileName(n);
  178. debug(`_getExistingFiles: parsed ${n} as `, parseResult);
  179. if (!parseResult) {
  180. return;
  181. }
  182. return _.assign({ fileName: n }, parseResult);
  183. })
  184. );
  185. cb(
  186. null,
  187. _.sortBy(
  188. existingFileDetails,
  189. n =>
  190. (n.date
  191. ? format.parse(this.options.pattern, n.date).valueOf()
  192. : newNow().valueOf()) - n.index
  193. )
  194. );
  195. });
  196. }
  197. // need file name instead of file abs path.
  198. _parseFileName(fileName) {
  199. let isCompressed = false;
  200. if (fileName.endsWith(ZIP_EXT)) {
  201. fileName = fileName.slice(0, -1 * ZIP_EXT.length);
  202. isCompressed = true;
  203. }
  204. let metaStr;
  205. if (this.options.keepFileExt) {
  206. const prefix = this.fileObject.name + FILENAME_SEP;
  207. const suffix = this.fileObject.ext;
  208. if (!fileName.startsWith(prefix) || !fileName.endsWith(suffix)) {
  209. return;
  210. }
  211. metaStr = fileName.slice(prefix.length, -1 * suffix.length);
  212. debug(
  213. `metaStr=${metaStr}, fileName=${fileName}, prefix=${prefix}, suffix=${suffix}`
  214. );
  215. } else {
  216. const prefix = this.fileObject.base;
  217. if (!fileName.startsWith(prefix)) {
  218. return;
  219. }
  220. metaStr = fileName.slice(prefix.length + 1);
  221. debug(`metaStr=${metaStr}, fileName=${fileName}, prefix=${prefix}`);
  222. }
  223. if (!metaStr) {
  224. return {
  225. index: 0,
  226. isCompressed
  227. };
  228. }
  229. if (this.options.pattern) {
  230. const items = _.split(metaStr, FILENAME_SEP);
  231. const indexStr = items[items.length - 1];
  232. debug("items: ", items, ", indexStr: ", indexStr);
  233. if (indexStr !== undefined && indexStr.match(/^\d+$/)) {
  234. const dateStr = metaStr.slice(0, -1 * (indexStr.length + 1));
  235. debug(`dateStr is ${dateStr}`);
  236. if (dateStr) {
  237. return {
  238. index: parseInt(indexStr, 10),
  239. date: dateStr,
  240. isCompressed
  241. };
  242. }
  243. }
  244. debug(`metaStr is ${metaStr}`);
  245. return {
  246. index: 0,
  247. date: metaStr,
  248. isCompressed
  249. };
  250. } else {
  251. if (metaStr.match(/^\d+$/)) {
  252. return {
  253. index: parseInt(metaStr, 10),
  254. isCompressed
  255. };
  256. }
  257. }
  258. return;
  259. }
  260. _formatFileName({ date, index, isHotFile }) {
  261. debug(
  262. `_formatFileName: date=${date}, index=${index}, isHotFile=${isHotFile}`
  263. );
  264. const dateStr =
  265. date ||
  266. _.get(this, "state.currentDate") ||
  267. format(this.options.pattern, newNow());
  268. const indexOpt = index || _.get(this, "state.currentIndex");
  269. const oriFileName = this.fileObject.base;
  270. if (isHotFile) {
  271. debug(
  272. `_formatFileName: includePattern? ${this.options.alwaysIncludePattern}, pattern: ${this.options.pattern}`
  273. );
  274. if (this.options.alwaysIncludePattern && this.options.pattern) {
  275. debug(
  276. `_formatFileName: is hot file, and include pattern, so: ${oriFileName +
  277. FILENAME_SEP +
  278. dateStr}`
  279. );
  280. return this.options.keepFileExt
  281. ? this.fileObject.name + FILENAME_SEP + dateStr + this.fileObject.ext
  282. : oriFileName + FILENAME_SEP + dateStr;
  283. }
  284. debug(`_formatFileName: is hot file so, filename: ${oriFileName}`);
  285. return oriFileName;
  286. }
  287. let fileNameExtraItems = [];
  288. if (this.options.pattern) {
  289. fileNameExtraItems.push(dateStr);
  290. }
  291. if (indexOpt && this.options.maxSize < Number.MAX_SAFE_INTEGER) {
  292. fileNameExtraItems.push(indexOpt);
  293. }
  294. let fileName;
  295. if (this.options.keepFileExt) {
  296. const baseFileName =
  297. this.fileObject.name +
  298. FILENAME_SEP +
  299. fileNameExtraItems.join(FILENAME_SEP);
  300. fileName = baseFileName + this.fileObject.ext;
  301. } else {
  302. fileName =
  303. oriFileName + FILENAME_SEP + fileNameExtraItems.join(FILENAME_SEP);
  304. }
  305. if (this.options.compress) {
  306. fileName += ZIP_EXT;
  307. }
  308. debug(`_formatFileName: ${fileName}`);
  309. return fileName;
  310. }
  311. _moveOldFiles(isNextPeriod, cb) {
  312. const currentFilePath = this.currentFileStream.path;
  313. debug(`numToKeep = ${this.options.numToKeep}`);
  314. const finishedRolling = () => {
  315. if (isNextPeriod) {
  316. this.state.currentSize = 0;
  317. this.state.currentDate = format(this.options.pattern, newNow());
  318. debug(`rolling for next period. state=${JSON.stringify(this.state)}`);
  319. } else {
  320. this.state.currentSize = 0;
  321. debug(
  322. `rolling during the same period. state=${JSON.stringify(this.state)}`
  323. );
  324. }
  325. this._renewWriteStream();
  326. // wait for the file to be open before cleaning up old ones,
  327. // otherwise the daysToKeep calculations can be off
  328. this.currentFileStream.write("", "utf8", () => this._clean(cb));
  329. };
  330. this._getExistingFiles((e, files) => {
  331. const filesToMove = [];
  332. const todaysFiles = this.state.currentDate
  333. ? files.filter(f => f.date === this.state.currentDate)
  334. : files;
  335. for (let i = todaysFiles.length; i >= 0; i--) {
  336. debug(`i = ${i}`);
  337. const sourceFilePath =
  338. i === 0
  339. ? currentFilePath
  340. : path.format({
  341. dir: this.fileObject.dir,
  342. base: this._formatFileName({
  343. date: this.state.currentDate,
  344. index: i
  345. })
  346. });
  347. const targetFilePath = path.format({
  348. dir: this.fileObject.dir,
  349. base: this._formatFileName({
  350. date: this.state.currentDate,
  351. index: i + 1
  352. })
  353. });
  354. filesToMove.push({ sourceFilePath, targetFilePath });
  355. }
  356. debug(`filesToMove = `, filesToMove);
  357. async.eachOfSeries(
  358. filesToMove,
  359. (files, idx, cb1) => {
  360. debug(
  361. `src=${files.sourceFilePath}, tgt=${
  362. files.sourceFilePath
  363. }, idx=${idx}, pos=${filesToMove.length - 1 - idx}`
  364. );
  365. moveAndMaybeCompressFile(
  366. files.sourceFilePath,
  367. files.targetFilePath,
  368. this.options.compress && filesToMove.length - 1 - idx === 0,
  369. cb1
  370. );
  371. },
  372. finishedRolling
  373. );
  374. });
  375. }
  376. _roll({ isNextPeriod }, cb) {
  377. debug(`rolling, isNextPeriod ? ${isNextPeriod}`);
  378. debug(`_roll: closing the current stream`);
  379. this.currentFileStream.end("", this.options.encoding, () => {
  380. this._moveOldFiles(isNextPeriod, cb);
  381. });
  382. }
  383. _renewWriteStream() {
  384. fs.ensureDirSync(this.fileObject.dir);
  385. this.justTheFile = this._formatFileName({
  386. date: this.state.currentDate,
  387. index: 0,
  388. isHotFile: true
  389. });
  390. const filePath = path.format({
  391. dir: this.fileObject.dir,
  392. base: this.justTheFile
  393. });
  394. const ops = _.pick(this.options, ["flags", "encoding", "mode"]);
  395. this.currentFileStream = fs.createWriteStream(filePath, ops);
  396. this.currentFileStream.on("error", e => {
  397. this.emit("error", e);
  398. });
  399. }
  400. _clean(cb) {
  401. this._getExistingFiles((e, existingFileDetails) => {
  402. debug(
  403. `numToKeep = ${this.options.numToKeep}, existingFiles = ${existingFileDetails.length}`
  404. );
  405. debug("existing files are: ", existingFileDetails);
  406. if (
  407. this.options.numToKeep > 0 &&
  408. existingFileDetails.length > this.options.numToKeep
  409. ) {
  410. const fileNamesToRemove = _.slice(
  411. existingFileDetails.map(f => f.fileName),
  412. 0,
  413. existingFileDetails.length - this.options.numToKeep - 1
  414. );
  415. this._deleteFiles(fileNamesToRemove, cb);
  416. return;
  417. }
  418. cb();
  419. });
  420. }
  421. _deleteFiles(fileNames, done) {
  422. debug(`files to delete: ${fileNames}`);
  423. async.each(
  424. _.map(fileNames, f => path.format({ dir: this.fileObject.dir, base: f })),
  425. fs.unlink,
  426. done
  427. );
  428. return;
  429. }
  430. }
  431. module.exports = RollingFileWriteStream;