// if node var Stream = require('stream').Stream; var EventEmitter = require('events').EventEmitter; var BufferReadStream = require('streamers').BufferReadStream; var WebSocket = require('streamws'); var util = require('./util'); var BinaryStream = require('./stream').BinaryStream; // end node function BinaryClient(socket, options) { if (!(this instanceof BinaryClient)) return new BinaryClient(socket, options); EventEmitter.call(this); var self = this; this._options = util.extend({ chunkSize: 40960 }, options); this.streams = {}; if(typeof socket === 'string') { this._nextId = 0; this._socket = new WebSocket(socket); } else { // Use odd numbered ids for server originated streams this._nextId = 1; this._socket = socket; } this._socket.binaryType = 'arraybuffer'; this._socket.addEventListener('open', function(){ self.emit('open'); }); // if node this._socket.on('drain', function(){ var ids = Object.keys(self.streams); for (var i = 0, ii = ids.length; i < ii; i++) { self.streams[ids[i]]._onDrain(); } }); // end node this._socket.addEventListener('error', function(error){ var ids = Object.keys(self.streams); for (var i = 0, ii = ids.length; i < ii; i++) { self.streams[ids[i]]._onError(error); } self.emit('error', error); }); this._socket.addEventListener('close', function(code, message){ var ids = Object.keys(self.streams); for (var i = 0, ii = ids.length; i < ii; i++) { self.streams[ids[i]]._onClose(); } self.emit('close', code, message); }); this._socket.addEventListener('message', function(data, flags){ util.setZeroTimeout(function(){ // Message format // [type, payload, bonus ] // // Reserved // [ 0 , X , X ] // // // New stream // [ 1 , Meta , new streamId ] // // // Data // [ 2 , Data , streamId ] // // // Pause // [ 3 , null , streamId ] // // // Resume // [ 4 , null , streamId ] // // // End // [ 5 , null , streamId ] // // // Close // [ 6 , null , streamId ] // data = data.data; try { data = util.unpack(data); } catch (ex) { return self.emit('error', new Error('Received unparsable message: ' + ex)); } if (!(data instanceof Array)) return self.emit('error', new Error('Received non-array message')); if (data.length != 3) return self.emit('error', new Error('Received message with wrong part count: ' + data.length)); if ('number' != typeof data[0]) return self.emit('error', new Error('Received message with non-number type: ' + data[0])); switch(data[0]) { case 0: // Reserved break; case 1: var meta = data[1]; var streamId = data[2]; var binaryStream = self._receiveStream(streamId); self.emit('stream', binaryStream, meta); break; case 2: var payload = data[1]; var streamId = data[2]; var binaryStream = self.streams[streamId]; if(binaryStream) { binaryStream._onData(payload); } else { self.emit('error', new Error('Received `data` message for unknown stream: ' + streamId)); } break; case 3: var streamId = data[2]; var binaryStream = self.streams[streamId]; if(binaryStream) { binaryStream._onPause(); } else { self.emit('error', new Error('Received `pause` message for unknown stream: ' + streamId)); } break; case 4: var streamId = data[2]; var binaryStream = self.streams[streamId]; if(binaryStream) { binaryStream._onResume(); } else { self.emit('error', new Error('Received `resume` message for unknown stream: ' + streamId)); } break; case 5: var streamId = data[2]; var binaryStream = self.streams[streamId]; if(binaryStream) { binaryStream._onEnd(); } else { self.emit('error', new Error('Received `end` message for unknown stream: ' + streamId)); } break; case 6: var streamId = data[2]; var binaryStream = self.streams[streamId]; if(binaryStream) { binaryStream._onClose(); } else { self.emit('error', new Error('Received `close` message for unknown stream: ' + streamId)); } break; default: self.emit('error', new Error('Unrecognized message type received: ' + data[0])); } }); }); } util.inherits(BinaryClient, EventEmitter); BinaryClient.prototype.send = function(data, meta){ var stream = this.createStream(meta); if(data instanceof Stream) { data.pipe(stream); } else if (util.isNode === true) { if(Buffer.isBuffer(data)) { (new BufferReadStream(data, {chunkSize: this._options.chunkSize})).pipe(stream); } else { stream.write(data); } } else if (util.isNode !== true) { if(data.constructor == Blob || data.constructor == File) { (new BlobReadStream(data, {chunkSize: this._options.chunkSize})).pipe(stream); } else if (data.constructor == ArrayBuffer) { var blob; if(binaryFeatures.useArrayBufferView) { data = new Uint8Array(data); } if(binaryFeatures.useBlobBuilder) { var builder = new BlobBuilder(); builder.append(data); blob = builder.getBlob() } else { blob = new Blob([data]); } (new BlobReadStream(blob, {chunkSize: this._options.chunkSize})).pipe(stream); } else if (typeof data === 'object' && 'BYTES_PER_ELEMENT' in data) { var blob; if(!binaryFeatures.useArrayBufferView) { // Warn data = data.buffer; } if(binaryFeatures.useBlobBuilder) { var builder = new BlobBuilder(); builder.append(data); blob = builder.getBlob() } else { blob = new Blob([data]); } (new BlobReadStream(blob, {chunkSize: this._options.chunkSize})).pipe(stream); } else { stream.write(data); } } return stream; }; BinaryClient.prototype._receiveStream = function(streamId){ var self = this; var binaryStream = new BinaryStream(this._socket, streamId, false); binaryStream.on('close', function(){ delete self.streams[streamId]; }); this.streams[streamId] = binaryStream; return binaryStream; }; BinaryClient.prototype.createStream = function(meta){ if(this._socket.readyState !== WebSocket.OPEN) { throw new Error('Client is not yet connected or has closed'); return; } var self = this; var streamId = this._nextId; this._nextId += 2; var binaryStream = new BinaryStream(this._socket, streamId, true, meta); binaryStream.on('close', function(){ delete self.streams[streamId]; }); this.streams[streamId] = binaryStream; return binaryStream; }; BinaryClient.prototype.close = BinaryClient.prototype.destroy = function() { this._socket.close(); }; exports.BinaryClient = BinaryClient;