websoket/node_modules/streamers/lib/proactive_read_stream.js
developertrinidad08 7e6cf29479 first commit
2023-01-16 18:11:14 -03:00

155 lines
3.6 KiB
JavaScript

var _ = require("underscore");
var bufferStreamModule = require("./buffer_stream");
var BufferWriteStream = bufferStreamModule.BufferWriteStream;
function ProactiveReadStream(source, writeStreamOptions){
var self = this;
function consumer(callback){
if(self._readCompletionCondition){
self._completeReadOperation();
}
callback(consumer);
}
function onEnd(){
self.cancel("end of stream");
}
this._writeStream = new BufferWriteStream(_.extend(writeStreamOptions || {},{
onWrite: consumer,
onEnd: onEnd
}));
source.pipe(this._writeStream);
}
ProactiveReadStream.prototype._completeReadOperation = function(){
if(!this._readCompletionCondition){
return;
}
var consume = 0;
var committedSlice = this._writeStream.getCommittedSlice();
consume = this._readCompletionCondition(committedSlice);
if(consume){
var consumedSlice = committedSlice.slice(0, consume);
this._writeStream.consume(consume);
this._readCompletionCondition = null;
if(this._readCompletionCallback){
var completionCallback = this._readCompletionCallback;
this._readCompletionCallback = null;
completionCallback(consumedSlice);
}
}
};
ProactiveReadStream.prototype.read = function(condition, callback){
if(condition === null){
condition = everything();
}
else if(condition.constructor === Number){
condition = bytes(condition);
}
else if(condition.constructor === String){
condition = until(condition);
}
if(this._readCompletionCondition){
throw new Error("read operation in progress");
}
this._readCompletionCondition = condition;
this._readCompletionCallback = callback;
var self = this;
process.nextTick(function(){
self._completeReadOperation();
});
};
ProactiveReadStream.prototype.cancel = function(message){
message = message || "operation cancelled";
this._readCompletionCondition = null;
if(this._readCompletionCallback){
var callback = this._readCompletionCallback;
this._readCompletionCallback = null;
callback(null, new Error(message));
}
};
function everything(){
return function(buffer){
return buffer.length;
};
}
function bytes(n){
return function(buffer){
if(buffer.length >= n){
return n;
}
else{
return 0;
}
};
}
function until(delimiter, encoding){
if(!Buffer.isBuffer(delimiter)){
delimiter = new Buffer(delimiter, encoding);
}
var delimiterLength = delimiter.length;
var bufferIndex = 0;
return function(buffer){
var end = buffer.length - delimiterLength + 1;
var match;
for(; bufferIndex < end; bufferIndex++){
match = true;
for(var i = 0; i < delimiterLength; i++){
if(delimiter[i] !== buffer[bufferIndex + i]){
match = false;
break;
}
}
if(match){
var consume = bufferIndex + delimiterLength;
bufferIndex = 0;
return consume;
}
}
return 0;
};
}
exports.ProactiveReadStream = ProactiveReadStream;
exports.bytes = bytes;
exports.until = until;
exports.everything = everything;