socket.js 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743
  1. /**
  2. * Module dependencies.
  3. */
  4. var transports = require('./transports/index');
  5. var Emitter = require('component-emitter');
  6. var debug = require('debug')('engine.io-client:socket');
  7. var index = require('indexof');
  8. var parser = require('engine.io-parser');
  9. var parseuri = require('parseuri');
  10. var parseqs = require('parseqs');
  11. /**
  12. * Module exports.
  13. */
  14. module.exports = Socket;
  /**
   * Socket constructor.
   *
   * Normalizes the connection target and options, copies them onto the
   * instance and immediately starts connecting by calling `open()`.
   * May be called without `new`.
   *
   * Note: the options object that is passed in is written to
   * (`hostname`, `secure`, `port`, `query` may be set on it).
   *
   * @param {String|Object} uri URI string, or the options object itself
   * @param {Object} opts options; replaced by `uri` when `uri` is an object
   * @api public
   */
  function Socket (uri, opts) {
    if (!(this instanceof Socket)) return new Socket(uri, opts);

    opts = opts || {};

    // single-argument form: Socket(opts)
    if (uri && 'object' === typeof uri) {
      opts = uri;
      uri = null;
    }

    if (uri) {
      // values parsed from the URI take precedence over the same options
      uri = parseuri(uri);
      opts.hostname = uri.host;
      opts.secure = uri.protocol === 'https' || uri.protocol === 'wss';
      opts.port = uri.port;
      if (uri.query) opts.query = uri.query;
    } else if (opts.host) {
      opts.hostname = parseuri(opts.host).host;
    }

    // an explicit `secure` option wins; otherwise follow the page protocol
    this.secure = null != opts.secure ? opts.secure
      : (global.location && 'https:' === location.protocol);

    if (opts.hostname && !opts.port) {
      // if no port is specified manually, use the protocol default
      opts.port = this.secure ? '443' : '80';
    }

    this.agent = opts.agent || false;
    this.hostname = opts.hostname ||
      (global.location ? location.hostname : 'localhost');
    this.port = opts.port || (global.location && location.port
      ? location.port
      : (this.secure ? 443 : 80));
    this.query = opts.query || {};
    if ('string' === typeof this.query) this.query = parseqs.decode(this.query);
    this.upgrade = false !== opts.upgrade;
    // strip one trailing slash (if any), then always end with exactly one
    this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/';
    this.forceJSONP = !!opts.forceJSONP;
    this.jsonp = false !== opts.jsonp;
    this.forceBase64 = !!opts.forceBase64;
    this.enablesXDR = !!opts.enablesXDR;
    this.timestampParam = opts.timestampParam || 't';
    this.timestampRequests = opts.timestampRequests;
    this.transports = opts.transports || ['polling', 'websocket'];
    this.transportOptions = opts.transportOptions || {};
    this.readyState = '';
    this.writeBuffer = [];
    this.prevBufferLen = 0;
    this.policyPort = opts.policyPort || 843;
    this.rememberUpgrade = opts.rememberUpgrade || false;
    this.binaryType = null;
    this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;
    // `createTransport` forwards `this.requestTimeout` to every transport;
    // without this assignment a socket-level `requestTimeout` was dropped
    this.requestTimeout = opts.requestTimeout;

    // `false` disables compression; anything else becomes an options object
    this.perMessageDeflate = false !== opts.perMessageDeflate ? (opts.perMessageDeflate || {}) : false;
    if (true === this.perMessageDeflate) this.perMessageDeflate = {};
    if (this.perMessageDeflate && null == this.perMessageDeflate.threshold) {
      this.perMessageDeflate.threshold = 1024;
    }

    // SSL options for Node.js client
    this.pfx = opts.pfx || null;
    this.key = opts.key || null;
    this.passphrase = opts.passphrase || null;
    this.cert = opts.cert || null;
    this.ca = opts.ca || null;
    this.ciphers = opts.ciphers || null;
    this.rejectUnauthorized = opts.rejectUnauthorized === undefined ? true : opts.rejectUnauthorized;
    this.forceNode = !!opts.forceNode;

    // other options for Node.js client
    // (only applied when `global.global === global`, presumably a Node.js check)
    var freeGlobal = typeof global === 'object' && global;
    if (freeGlobal.global === freeGlobal) {
      if (opts.extraHeaders && Object.keys(opts.extraHeaders).length > 0) {
        this.extraHeaders = opts.extraHeaders;
      }

      if (opts.localAddress) {
        this.localAddress = opts.localAddress;
      }
    }

    // set on handshake
    this.id = null;
    this.upgrades = null;
    this.pingInterval = null;
    this.pingTimeout = null;

    // set on heartbeat
    this.pingIntervalTimer = null;
    this.pingTimeoutTimer = null;

    this.open();
  }
  /**
   * Flag stored on the constructor, so it is shared by every Socket.
   * It is set to whether the transport in use is named 'websocket' when a
   * socket opens or a probe gets its pong, and reset to `false` when a probe
   * starts or a transport error occurs. `open` consults it when
   * `rememberUpgrade` is enabled.
   */
  Socket.priorWebsocketSuccess = false;

  /**
   * Mix in `Emitter`.
   */
  Emitter(Socket.prototype);

  /**
   * Protocol version.
   *
   * @api public
   */
  Socket.protocol = parser.protocol; // this is an int

  /**
   * Expose deps for legacy compatibility
   * and standalone browser access.
   */
  Socket.Socket = Socket;
  Socket.Transport = require('./transport');
  Socket.transports = require('./transports/index');
  Socket.parser = require('engine.io-parser');
  /**
   * Creates transport of the given type.
   *
   * The transport receives a copy of the socket's query (plus `EIO`,
   * `transport` and, once known, `sid`) together with the socket's options.
   * Entries in `this.transportOptions[name]` override the socket-level value.
   *
   * Flag-like and numeric options (`agent`, `secure`, `forceJSONP`, `jsonp`,
   * `forceBase64`, `enablesXDR`, `timestampRequests`, `rejectUnauthorized`,
   * `perMessageDeflate`, `forceNode`, `requestTimeout`) are overridden
   * whenever the per-transport value is not null/undefined, so an explicit
   * `false` or `0` is honoured. The remaining options keep the
   * "first truthy value" rule.
   *
   * @param {String} transport name
   * @return {Transport}
   * @api private
   */
  Socket.prototype.createTransport = function (name) {
    debug('creating transport "%s"', name);
    var query = clone(this.query);

    // append engine.io protocol identifier
    query.EIO = parser.protocol;

    // transport name
    query.transport = name;

    // per-transport options
    var options = this.transportOptions[name] || {};
    var self = this;

    // per-transport value if one was provided (even `false`/`0`),
    // otherwise the socket-level value
    function override (key) {
      return null != options[key] ? options[key] : self[key];
    }

    // session id if we already have one
    if (this.id) query.sid = this.id;

    var transport = new transports[name]({
      query: query,
      socket: this,
      agent: override('agent'),
      hostname: options.hostname || this.hostname,
      port: options.port || this.port,
      secure: override('secure'),
      path: options.path || this.path,
      forceJSONP: override('forceJSONP'),
      jsonp: override('jsonp'),
      forceBase64: override('forceBase64'),
      enablesXDR: override('enablesXDR'),
      timestampRequests: override('timestampRequests'),
      timestampParam: options.timestampParam || this.timestampParam,
      policyPort: options.policyPort || this.policyPort,
      pfx: options.pfx || this.pfx,
      key: options.key || this.key,
      passphrase: options.passphrase || this.passphrase,
      cert: options.cert || this.cert,
      ca: options.ca || this.ca,
      ciphers: options.ciphers || this.ciphers,
      rejectUnauthorized: override('rejectUnauthorized'),
      perMessageDeflate: override('perMessageDeflate'),
      extraHeaders: options.extraHeaders || this.extraHeaders,
      forceNode: override('forceNode'),
      localAddress: options.localAddress || this.localAddress,
      requestTimeout: override('requestTimeout'),
      // there is no socket-level fallback for `protocols`
      protocols: options.protocols || void (0)
    });

    return transport;
  };
  /**
   * Returns a shallow copy of `obj` containing only its own enumerable
   * properties.
   *
   * `hasOwnProperty` is taken from `Object.prototype` rather than from `obj`,
   * so objects without a prototype (`Object.create(null)`) and objects that
   * carry their own `hasOwnProperty` key are copied correctly.
   *
   * @param {Object} obj source object (null/undefined yields `{}`)
   * @return {Object} new plain object
   * @api private
   */
  function clone (obj) {
    var copy = {};
    var hasOwn = Object.prototype.hasOwnProperty;
    for (var key in obj) {
      if (hasOwn.call(obj, key)) {
        copy[key] = obj[key];
      }
    }
    return copy;
  }
  /**
   * Picks the transport to start with, creates it and opens it.
   *
   * 'websocket' is chosen directly when `rememberUpgrade` is set, a previous
   * websocket connection succeeded and 'websocket' is among the configured
   * transports; otherwise the first configured transport is used. With no
   * transports left, an 'error' event is emitted asynchronously.
   *
   * @api private
   */
  Socket.prototype.open = function () {
    var socket = this;
    var name;

    var reuseWebsocket = socket.rememberUpgrade &&
      Socket.priorWebsocketSuccess &&
      socket.transports.indexOf('websocket') !== -1;

    if (reuseWebsocket) {
      name = 'websocket';
    } else {
      if (0 === socket.transports.length) {
        // Deferred so that a listener attached right after construction
        // still receives the error
        setTimeout(function () {
          socket.emit('error', 'No transports available');
        }, 0);
        return;
      }
      name = socket.transports[0];
    }

    socket.readyState = 'opening';

    // If the transport cannot be created (e.g. disabled with jsonp: false),
    // drop the first configured transport and start over
    var transport;
    try {
      transport = socket.createTransport(name);
    } catch (e) {
      socket.transports.shift();
      socket.open();
      return;
    }

    transport.open();
    socket.setTransport(transport);
  };
  /**
   * Makes `transport` the socket's current transport.
   *
   * All listeners are removed from the previous transport (if there is one)
   * and the socket's drain/packet/error/close handlers are attached to the
   * new one.
   *
   * @param {Transport} transport
   * @api private
   */
  Socket.prototype.setTransport = function (transport) {
    debug('setting transport %s', transport.name);
    var socket = this;

    function handleDrain () {
      socket.onDrain();
    }

    function handlePacket (packet) {
      socket.onPacket(packet);
    }

    function handleError (e) {
      socket.onError(e);
    }

    function handleClose () {
      socket.onClose('transport close');
    }

    var previous = socket.transport;
    if (previous) {
      debug('clearing existing transport %s', previous.name);
      previous.removeAllListeners();
    }

    // the new transport becomes current before its listeners are wired up
    socket.transport = transport;

    transport
      .on('drain', handleDrain)
      .on('packet', handlePacket)
      .on('error', handleError)
      .on('close', handleClose);
  };
  /**
   * Probes a transport to see whether the connection can be moved onto it.
   *
   * A second transport of the given type is created and opened next to the
   * current one. Once it opens, a `ping`/`probe` packet is sent over it; when
   * a `pong`/`probe` packet comes back the current transport is paused, the
   * new transport is installed with `setTransport`, an `upgrade` packet is
   * sent and the write buffer is flushed. A transport error, a transport or
   * socket close, or an unexpected reply emits `upgradeError`. If a different
   * transport starts upgrading first, this probe is abandoned without
   * emitting an event.
   *
   * @param {String} transport name
   * @api private
   */
  Socket.prototype.probe = function (name) {
    debug('probing transport "%s"', name);
    // NOTE(review): `createTransport` in this file declares only `name`, so
    // the second argument looks unused; confirm before relying on it.
    var transport = this.createTransport(name, { probe: 1 });
    // once true, every callback below becomes a no-op
    var failed = false;
    var self = this;

    Socket.priorWebsocketSuccess = false;

    // The probe transport has opened: send the probe ping and wait for a reply
    function onTransportOpen () {
      if (self.onlyBinaryUpgrades) {
        // `this` is whatever the emitter binds for listeners, presumably the
        // probe transport (TODO confirm). Refuse an upgrade that would lose
        // binary support the current transport has.
        var upgradeLosesBinary = !this.supportsBinary && self.transport.supportsBinary;
        failed = failed || upgradeLosesBinary;
      }
      if (failed) return;

      debug('probe transport "%s" opened', name);
      transport.send([{ type: 'ping', data: 'probe' }]);
      transport.once('packet', function (msg) {
        if (failed) return;
        if ('pong' === msg.type && 'probe' === msg.data) {
          debug('probe transport "%s" pong', name);
          self.upgrading = true;
          self.emit('upgrading', transport);
          // the 'upgrading' listeners (see `onupgrade`) may have nulled
          // `transport` through `freezeTransport`
          if (!transport) return;
          Socket.priorWebsocketSuccess = 'websocket' === transport.name;

          debug('pausing current transport "%s"', self.transport.name);
          self.transport.pause(function () {
            if (failed) return;
            if ('closed' === self.readyState) return;
            debug('changing transport and sending upgrade packet');

            // detach the probe listeners before the transport goes live
            cleanup();

            self.setTransport(transport);
            transport.send([{ type: 'upgrade' }]);
            self.emit('upgrade', transport);
            transport = null;
            self.upgrading = false;
            self.flush();
          });
        } else {
          debug('probe transport "%s" failed', name);
          var err = new Error('probe error');
          err.transport = transport.name;
          self.emit('upgradeError', err);
        }
      });
    }

    // Abandons the probe: detaches listeners, closes and drops the transport
    function freezeTransport () {
      if (failed) return;

      // Any callback called by transport should be ignored since now
      failed = true;

      cleanup();

      transport.close();
      transport = null;
    }

    // Handle any error that happens while probing
    function onerror (err) {
      var error = new Error('probe error: ' + err);
      // read the name before `freezeTransport` sets `transport` to null
      error.transport = transport.name;

      freezeTransport();

      debug('probe transport "%s" failed because of error: %s', name, err);

      self.emit('upgradeError', error);
    }

    // The probe transport closed before the upgrade completed
    function onTransportClose () {
      onerror('transport closed');
    }

    // When the socket is closed while we're probing
    function onclose () {
      onerror('socket closed');
    }

    // When the socket is upgraded while we're probing
    function onupgrade (to) {
      if (transport && to.name !== transport.name) {
        debug('"%s" works - aborting "%s"', to.name, transport.name);
        freezeTransport();
      }
    }

    // Remove all listeners on the transport and on self
    function cleanup () {
      transport.removeListener('open', onTransportOpen);
      transport.removeListener('error', onerror);
      transport.removeListener('close', onTransportClose);
      self.removeListener('close', onclose);
      self.removeListener('upgrading', onupgrade);
    }

    transport.once('open', onTransportOpen);
    transport.once('error', onerror);
    transport.once('close', onTransportClose);

    this.once('close', onclose);
    this.once('upgrading', onupgrade);

    transport.open();
  };
  /**
   * Called when connection is deemed open.
   *
   * Marks the socket as open, emits 'open', flushes buffered packets and
   * then probes every transport listed in `this.upgrades`.
   *
   * @api public
   */
  Socket.prototype.onOpen = function () {
    debug('socket open');
    this.readyState = 'open';
    Socket.priorWebsocketSuccess = 'websocket' === this.transport.name;
    this.emit('open');
    this.flush();

    // An `open` listener may already have closed the socket; upgrades also
    // need to be enabled and the current transport must be pausable
    if ('open' !== this.readyState || !this.upgrade || !this.transport.pause) {
      return;
    }

    debug('starting upgrade probes');
    var total = this.upgrades.length;
    for (var idx = 0; idx < total; idx++) {
      this.probe(this.upgrades[idx]);
    }
  };
  /**
   * Handles a packet received from the current transport.
   *
   * Packets are processed only while the socket is opening, open or closing.
   * Each processed packet is re-emitted as 'packet', followed by a
   * 'heartbeat' event, and is then dispatched on its type.
   *
   * @param {Object} packet object with `type` and `data`
   * @api private
   */
  Socket.prototype.onPacket = function (packet) {
    var accepting = 'opening' === this.readyState ||
      'open' === this.readyState ||
      'closing' === this.readyState;

    if (!accepting) {
      debug('packet received with socket readyState "%s"', this.readyState);
      return;
    }

    debug('socket receive: type "%s", data "%s"', packet.type, packet.data);

    this.emit('packet', packet);

    // Socket is live - any packet counts
    this.emit('heartbeat');

    var kind = packet.type;
    if ('open' === kind) {
      // handshake payload arrives as a JSON string
      this.onHandshake(JSON.parse(packet.data));
    } else if ('pong' === kind) {
      this.setPing();
      this.emit('pong');
    } else if ('error' === kind) {
      var err = new Error('server error');
      err.code = packet.data;
      this.onError(err);
    } else if ('message' === kind) {
      this.emit('data', packet.data);
      this.emit('message', packet.data);
    }
  };
  /**
   * Called upon handshake completion.
   *
   * Stores the session id, the usable upgrades and the ping settings sent by
   * the server, opens the socket and starts the ping cycle.
   *
   * @param {Object} data handshake obj (`sid`, `upgrades`, `pingInterval`, `pingTimeout`)
   * @api private
   */
  Socket.prototype.onHandshake = function (data) {
    var socket = this;

    socket.emit('handshake', data);

    var sessionId = data.sid;
    socket.id = sessionId;
    socket.transport.query.sid = sessionId;

    socket.upgrades = socket.filterUpgrades(data.upgrades);
    socket.pingInterval = data.pingInterval;
    socket.pingTimeout = data.pingTimeout;

    socket.onOpen();

    // In case open handler closes socket
    if ('closed' === socket.readyState) {
      return;
    }

    socket.setPing();

    // Prolong liveness of socket on heartbeat
    // (removed first so the listener is never registered twice)
    socket.removeListener('heartbeat', socket.onHeartbeat);
    socket.on('heartbeat', socket.onHeartbeat);
  };
  /**
   * Resets ping timeout.
   *
   * Restarts the timer that closes the socket with reason 'ping timeout'.
   *
   * @param {Number} timeout delay in ms; when falsy,
   *   `pingInterval + pingTimeout` is used
   * @api private
   */
  Socket.prototype.onHeartbeat = function (timeout) {
    var socket = this;
    var delay = timeout || (socket.pingInterval + socket.pingTimeout);

    function expire () {
      if ('closed' !== socket.readyState) {
        socket.onClose('ping timeout');
      }
    }

    clearTimeout(socket.pingTimeoutTimer);
    socket.pingTimeoutTimer = setTimeout(expire, delay);
  };
  /**
   * Schedules the next ping `this.pingInterval` ms from now, replacing any
   * ping that was already scheduled. When it fires, a ping is sent and the
   * ping-timeout timer is restarted with `this.pingTimeout`.
   *
   * @api private
   */
  Socket.prototype.setPing = function () {
    var socket = this;

    function pingNow () {
      debug('writing ping packet - expecting pong within %sms', socket.pingTimeout);
      socket.ping();
      socket.onHeartbeat(socket.pingTimeout);
    }

    clearTimeout(socket.pingIntervalTimer);
    socket.pingIntervalTimer = setTimeout(pingNow, socket.pingInterval);
  };
  /**
   * Sends a ping packet and emits 'ping' once `sendPacket` invokes the
   * supplied callback.
   *
   * @api private
   */
  Socket.prototype.ping = function () {
    var socket = this;

    function announce () {
      socket.emit('ping');
    }

    socket.sendPacket('ping', announce);
  };
  /**
   * Called on `drain` event
   *
   * Drops the packets handed to the transport by the last flush, then either
   * emits 'drain' (buffer empty) or flushes what is still queued.
   *
   * @api private
   */
  Socket.prototype.onDrain = function () {
    var flushedCount = this.prevBufferLen;
    this.writeBuffer.splice(0, flushedCount);

    // setting prevBufferLen = 0 is very important
    // for example, when upgrading, upgrade packet is sent over,
    // and a nonzero prevBufferLen could cause problems on `drain`
    this.prevBufferLen = 0;

    if (0 !== this.writeBuffer.length) {
      this.flush();
      return;
    }

    this.emit('drain');
  };
  /**
   * Flush write buffers.
   *
   * Hands the whole write buffer to the transport, provided the socket is
   * not closed, the transport is writable, no upgrade is in progress and
   * there is something to send.
   *
   * @api private
   */
  Socket.prototype.flush = function () {
    if ('closed' === this.readyState) return;
    if (!this.transport.writable) return;
    if (this.upgrading) return;
    if (!this.writeBuffer.length) return;

    debug('flushing %d packets in socket', this.writeBuffer.length);
    this.transport.send(this.writeBuffer);

    // keep track of current length of writeBuffer
    // splice writeBuffer and callbackBuffer on `drain`
    this.prevBufferLen = this.writeBuffer.length;
    this.emit('flush');
  };
  /**
   * Sends a message. Also available as `write`.
   *
   * @param {String} msg message.
   * @param {Object} options options.
   * @param {Function} fn callback function.
   * @return {Socket} for chaining.
   * @api public
   */
  Socket.prototype.send = function (msg, options, fn) {
    this.sendPacket('message', msg, options, fn);
    return this;
  };

  // `write` is the very same function as `send`
  Socket.prototype.write = Socket.prototype.send;
  /**
   * Sends a packet.
   *
   * `data` and `options` may each be omitted, in which case the callback
   * moves up into their position. Nothing is queued while the socket is
   * closing or closed. The `options` object that is passed in receives a
   * boolean `compress` property.
   *
   * @param {String} type packet type.
   * @param {String} data data.
   * @param {Object} options options.
   * @param {Function} fn callback function, run on the next 'flush' event.
   * @api private
   */
  Socket.prototype.sendPacket = function (type, data, options, fn) {
    // sendPacket(type, fn)
    if ('function' === typeof data) {
      fn = data;
      data = undefined;
    }

    // sendPacket(type, data, fn)
    if ('function' === typeof options) {
      fn = options;
      options = null;
    }

    var state = this.readyState;
    if ('closing' === state || 'closed' === state) {
      return;
    }

    options = options || {};
    // compression stays on unless it was switched off with `false`
    options.compress = !(false === options.compress);

    var packet = {};
    packet.type = type;
    packet.data = data;
    packet.options = options;

    this.emit('packetCreate', packet);
    this.writeBuffer.push(packet);
    if (fn) {
      this.once('flush', fn);
    }
    this.flush();
  };
  /**
   * Closes the connection.
   *
   * Only acts while the socket is opening or open. Closing is postponed
   * until buffered packets have drained and until an upgrade in progress
   * has finished or failed.
   *
   * @return {Socket} for chaining.
   * @api private
   */
  Socket.prototype.close = function () {
    var self = this;

    function close () {
      self.onClose('forced close');
      debug('socket closing - telling transport to close');
      self.transport.close();
    }

    function cleanupAndClose () {
      self.removeListener('upgrade', cleanupAndClose);
      self.removeListener('upgradeError', cleanupAndClose);
      close();
    }

    function waitForUpgrade () {
      // wait for upgrade to finish since we can't send packets while pausing a transport
      self.once('upgrade', cleanupAndClose);
      self.once('upgradeError', cleanupAndClose);
    }

    // Close right away unless an upgrade is still running
    function closeOrWait () {
      if (this.upgrading) {
        waitForUpgrade();
      } else {
        close();
      }
    }

    if ('opening' !== this.readyState && 'open' !== this.readyState) {
      return this;
    }

    this.readyState = 'closing';

    if (this.writeBuffer.length) {
      // let the pending packets go out first
      this.once('drain', closeOrWait);
    } else {
      closeOrWait.call(this);
    }

    return this;
  };
  /**
   * Called upon transport error
   *
   * Clears the websocket success flag, emits 'error' and then closes the
   * socket with reason 'transport error'.
   *
   * @param {Error} err
   * @api private
   */
  Socket.prototype.onError = function (err) {
    var socket = this;
    debug('socket error %j', err);
    Socket.priorWebsocketSuccess = false;
    socket.emit('error', err);
    socket.onClose('transport error', err);
  };
  /**
   * Called upon transport close.
   *
   * Does nothing unless the socket is opening, open or closing. Otherwise
   * stops the ping timers, shuts the transport down, marks the socket closed
   * and emits 'close'.
   *
   * @param {String} reason
   * @param {*} desc extra detail passed along with the 'close' event
   * @api private
   */
  Socket.prototype.onClose = function (reason, desc) {
    var isActive = 'opening' === this.readyState ||
      'open' === this.readyState ||
      'closing' === this.readyState;
    if (!isActive) return;

    debug('socket close with reason: "%s"', reason);
    var transport = this.transport;

    // clear timers
    clearTimeout(this.pingIntervalTimer);
    clearTimeout(this.pingTimeoutTimer);

    // stop event from firing again for transport
    transport.removeAllListeners('close');

    // ensure transport won't stay open
    transport.close();

    // ignore further transport communication
    transport.removeAllListeners();

    // set ready state
    this.readyState = 'closed';

    // clear session id
    this.id = null;

    // emit close event
    this.emit('close', reason, desc);

    // clean buffers after, so users can still
    // grab the buffers on `close` event
    this.writeBuffer = [];
    this.prevBufferLen = 0;
  };
  /**
   * Filters upgrades, returning only those matching client transports.
   *
   * @param {Array} upgrades server upgrades
   * @return {Array} the entries of `upgrades` found in `this.transports`,
   *   in their original order
   * @api private
   *
   */
  Socket.prototype.filterUpgrades = function (upgrades) {
    var supported = [];
    var total = upgrades.length;

    for (var pos = 0; pos < total; pos++) {
      var candidate = upgrades[pos];
      if (-1 !== index(this.transports, candidate)) {
        supported.push(candidate);
      }
    }

    return supported;
  };