developertrinidad08 7e6cf29479 first commit
2023-01-16 18:11:14 -03:00

252 lines
7.2 KiB
JavaScript

// if node
var Stream = require('stream').Stream;
var EventEmitter = require('events').EventEmitter;
var BufferReadStream = require('streamers').BufferReadStream;
var WebSocket = require('streamws');
var util = require('./util');
var BinaryStream = require('./stream').BinaryStream;
// end node
/**
 * Client endpoint that multiplexes binary streams over a single WebSocket.
 *
 * May be called with or without `new`.
 *
 * Events emitted on the client:
 *   'open'   - the socket's 'open' listener fired
 *   'error'  - socket error, or a malformed / unroutable incoming message
 *   'close'  - forwarded with the two arguments the socket's close listener received
 *   'stream' - (stream, meta) when the remote side opens a new stream
 *
 * @param {string|Object} socket - A URL string (a new WebSocket is opened to
 *   it) or an already constructed socket object, which is used as is.
 * @param {Object} [options] - Merged over the defaults; `chunkSize` defaults
 *   to 40960.
 */
function BinaryClient(socket, options) {
  if (!(this instanceof BinaryClient)) return new BinaryClient(socket, options);
  EventEmitter.call(this);
  var self = this;

  this._options = util.extend({
    chunkSize: 40960
  }, options);
  this.streams = {};

  if (typeof socket === 'string') {
    this._nextId = 0;
    this._socket = new WebSocket(socket);
  } else {
    // Use odd numbered ids for server originated streams
    this._nextId = 1;
    this._socket = socket;
  }
  this._socket.binaryType = 'arraybuffer';

  // Calls `handlerName` with `args` on every stream registered at the moment
  // of the call (ids are snapshotted so handlers may unregister themselves).
  function notifyStreams(handlerName, args) {
    var ids = Object.keys(self.streams);
    for (var i = 0, ii = ids.length; i < ii; i++) {
      var stream = self.streams[ids[i]];
      stream[handlerName].apply(stream, args);
    }
  }

  // Routes one control/data message to the stream it addresses, or emits an
  // 'error' when no such stream is registered. The lookup is restricted to own
  // properties: `streamId` comes from the remote peer, and an id such as
  // "constructor" must not resolve to something inherited from Object.prototype.
  function deliver(streamId, label, handlerName, args) {
    var known = Object.prototype.hasOwnProperty.call(self.streams, streamId);
    var stream = known ? self.streams[streamId] : undefined;
    if (stream) {
      stream[handlerName].apply(stream, args);
    } else {
      self.emit('error', new Error('Received `' + label + '` message for unknown stream: ' + streamId));
    }
  }

  this._socket.addEventListener('open', function(){
    self.emit('open');
  });
  // if node
  this._socket.on('drain', function(){
    notifyStreams('_onDrain', []);
  });
  // end node
  this._socket.addEventListener('error', function(error){
    notifyStreams('_onError', [error]);
    self.emit('error', error);
  });
  this._socket.addEventListener('close', function(code, message){
    notifyStreams('_onClose', []);
    self.emit('close', code, message);
  });
  this._socket.addEventListener('message', function(event){
    util.setZeroTimeout(function(){
      // Message format
      // [type, payload, bonus ]
      //
      // Reserved    [ 0 , X    , X            ]
      // New stream  [ 1 , Meta , new streamId ]
      // Data        [ 2 , Data , streamId     ]
      // Pause       [ 3 , null , streamId     ]
      // Resume      [ 4 , null , streamId     ]
      // End         [ 5 , null , streamId     ]
      // Close       [ 6 , null , streamId     ]
      var message = event.data;
      try {
        message = util.unpack(message);
      } catch (ex) {
        return self.emit('error', new Error('Received unparsable message: ' + ex));
      }
      if (!(message instanceof Array))
        return self.emit('error', new Error('Received non-array message'));
      if (message.length !== 3)
        return self.emit('error', new Error('Received message with wrong part count: ' + message.length));
      if (typeof message[0] !== 'number')
        return self.emit('error', new Error('Received message with non-number type: ' + message[0]));

      var type = message[0];
      var payload = message[1];
      var streamId = message[2];
      switch (type) {
        case 0:
          // Reserved
          break;
        case 1:
          self.emit('stream', self._receiveStream(streamId), payload);
          break;
        case 2:
          deliver(streamId, 'data', '_onData', [payload]);
          break;
        case 3:
          deliver(streamId, 'pause', '_onPause', []);
          break;
        case 4:
          deliver(streamId, 'resume', '_onResume', []);
          break;
        case 5:
          deliver(streamId, 'end', '_onEnd', []);
          break;
        case 6:
          deliver(streamId, 'close', '_onClose', []);
          break;
        default:
          self.emit('error', new Error('Unrecognized message type received: ' + type));
      }
    });
  });
}
// Link BinaryClient to EventEmitter through the project's util.inherits helper.
// NOTE(review): presumably sets up prototype inheritance (the constructor
// relies on `emit`) — confirm against ./util.
util.inherits(BinaryClient, EventEmitter);
/**
 * Sends `data` to the remote peer over a newly created stream.
 *
 * How the data is fed into the stream depends on its type:
 *   - a Stream is piped into it;
 *   - under node, a Buffer is chunked through BufferReadStream and anything
 *     else is written directly;
 *   - otherwise a Blob/File is chunked through BlobReadStream, an ArrayBuffer
 *     or typed array is wrapped in a Blob first, and anything else (including
 *     null/undefined) is written directly.
 *
 * @param {*} data - The value to send.
 * @param {*} [meta] - Passed through to `createStream`.
 * @returns {Object} The stream created for this transfer.
 * @throws Whatever `createStream` throws (e.g. when the socket is not open).
 */
BinaryClient.prototype.send = function(data, meta){
  var stream = this.createStream(meta);
  var readOptions = {chunkSize: this._options.chunkSize};

  // Wraps binary data in a Blob, going through BlobBuilder when the
  // environment's feature flags ask for it.
  function toBlob(binary) {
    if (binaryFeatures.useBlobBuilder) {
      var builder = new BlobBuilder();
      builder.append(binary);
      return builder.getBlob();
    }
    return new Blob([binary]);
  }

  if (data instanceof Stream) {
    data.pipe(stream);
  } else if (util.isNode === true) {
    if (Buffer.isBuffer(data)) {
      (new BufferReadStream(data, readOptions)).pipe(stream);
    } else {
      stream.write(data);
    }
  } else if (data === null || data === undefined) {
    // Same treatment as the node branch; reading `.constructor` here would
    // throw after the stream has already been created.
    stream.write(data);
  } else if (data.constructor === Blob || data.constructor === File) {
    (new BlobReadStream(data, readOptions)).pipe(stream);
  } else if (data.constructor === ArrayBuffer) {
    if (binaryFeatures.useArrayBufferView) {
      data = new Uint8Array(data);
    }
    (new BlobReadStream(toBlob(data), readOptions)).pipe(stream);
  } else if (typeof data === 'object' && 'BYTES_PER_ELEMENT' in data) {
    if (!binaryFeatures.useArrayBufferView) {
      if (data.byteOffset === 0 && data.byteLength === data.buffer.byteLength) {
        data = data.buffer;
      } else {
        // The view covers only part of its backing buffer: copy just the
        // viewed bytes so the bytes outside the view are not sent as well.
        var viewed = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
        data = new Uint8Array(viewed).buffer;
      }
    }
    (new BlobReadStream(toBlob(data), readOptions)).pipe(stream);
  } else {
    stream.write(data);
  }
  return stream;
};
/**
 * Builds the local end of a stream identified by `streamId` and registers it
 * in `this.streams` until the stream emits 'close'.
 *
 * @param {*} streamId - Identifier used as the key in `this.streams`.
 * @returns {BinaryStream} The newly registered stream.
 */
BinaryClient.prototype._receiveStream = function(streamId){
  var client = this;
  var incoming = new BinaryStream(client._socket, streamId, false);

  // Drop the registry entry once the stream reports it has closed.
  function unregister() {
    delete client.streams[streamId];
  }

  incoming.on('close', unregister);
  client.streams[streamId] = incoming;
  return incoming;
};
/**
 * Opens a new locally originated stream on this client's socket and registers
 * it in `this.streams` until the stream emits 'close'.
 *
 * @param {*} [meta] - Passed through to the BinaryStream constructor.
 * @returns {BinaryStream} The newly registered stream.
 * @throws {Error} If the socket's readyState is not WebSocket.OPEN.
 */
BinaryClient.prototype.createStream = function(meta){
  if (this._socket.readyState !== WebSocket.OPEN) {
    throw new Error('Client is not yet connected or has closed');
  }
  var self = this;
  var streamId = this._nextId;
  // Step by two so locally created ids keep the parity chosen in the constructor.
  this._nextId += 2;
  var binaryStream = new BinaryStream(this._socket, streamId, true, meta);
  binaryStream.on('close', function(){
    delete self.streams[streamId];
  });
  this.streams[streamId] = binaryStream;
  return binaryStream;
};
/**
 * Closes the underlying socket. `close` and `destroy` are the same function.
 */
BinaryClient.prototype.destroy = function() {
  var socket = this._socket;
  socket.close();
};
BinaryClient.prototype.close = BinaryClient.prototype.destroy;
// Expose the BinaryClient constructor as this module's `BinaryClient` export.
exports.BinaryClient = BinaryClient;