305 lines
7.1 KiB
JavaScript
305 lines
7.1 KiB
JavaScript
function BlobReadStream(source, options){
|
|
Stream.call(this);
|
|
|
|
options = util.extend({
|
|
readDelay: 0,
|
|
paused: false
|
|
}, options);
|
|
|
|
this._source = source;
|
|
this._start = 0;
|
|
this._readChunkSize = options.chunkSize || source.size;
|
|
this._readDelay = options.readDelay;
|
|
|
|
this.readable = true;
|
|
this.paused = options.paused;
|
|
|
|
this._read();
|
|
}
|
|
|
|
util.inherits(BlobReadStream, Stream);
|
|
|
|
|
|
BlobReadStream.prototype.pause = function(){
|
|
this.paused = true;
|
|
};
|
|
|
|
BlobReadStream.prototype.resume = function(){
|
|
this.paused = false;
|
|
this._read();
|
|
};
|
|
|
|
BlobReadStream.prototype.destroy = function(){
|
|
this.readable = false;
|
|
clearTimeout(this._timeoutId);
|
|
};
|
|
|
|
BlobReadStream.prototype._read = function(){
|
|
|
|
var self = this;
|
|
|
|
function emitReadChunk(){
|
|
self._emitReadChunk();
|
|
}
|
|
|
|
var readDelay = this._readDelay;
|
|
if (readDelay !== 0){
|
|
this._timeoutId = setTimeout(emitReadChunk, readDelay);
|
|
} else {
|
|
util.setZeroTimeout(emitReadChunk);
|
|
}
|
|
|
|
};
|
|
|
|
BlobReadStream.prototype._emitReadChunk = function(){
|
|
|
|
if(this.paused || !this.readable) return;
|
|
|
|
var chunkSize = Math.min(this._source.size - this._start, this._readChunkSize);
|
|
|
|
if(chunkSize === 0){
|
|
this.readable = false;
|
|
this.emit("end");
|
|
return;
|
|
}
|
|
|
|
var sourceEnd = this._start + chunkSize;
|
|
var chunk = (this._source.slice || this._source.webkitSlice || this._source.mozSlice).call(this._source, this._start, sourceEnd);
|
|
|
|
this._start = sourceEnd;
|
|
this._read();
|
|
|
|
this.emit("data", chunk);
|
|
|
|
};
|
|
|
|
/*
|
|
|
|
|
|
|
|
|
|
function BlobWriteStream(options){
|
|
|
|
stream.Stream.call(this);
|
|
|
|
options = _.extend({
|
|
onFull: onFull,
|
|
onEnd: function(){},
|
|
minBlockAllocSize: 0,
|
|
drainDelay:0
|
|
}, options);
|
|
|
|
this._onFull = options.onFull;
|
|
this._onEnd = options.onEnd;
|
|
this._onWrite = options.onWrite;
|
|
|
|
this._minBlockAllocSize = options.minBlockAllocSize;
|
|
this._maxBlockAllocSize = options.maxBlockAllocSize;
|
|
this._drainDelay = options.drainDelay;
|
|
|
|
this._buffer = new Buffer(options.minBlockAllocSize);
|
|
this._destination = this._buffer;
|
|
this._destinationPos = 0;
|
|
|
|
this._writeQueue = [];
|
|
this._pendingOnFull = false;
|
|
this._pendingQueueDrain = false;
|
|
|
|
this.writable = true;
|
|
this.bytesWritten = 0;
|
|
}
|
|
|
|
util.inherits(BlobWriteStream, stream.Stream);
|
|
|
|
BlobWriteStream.prototype.getBuffer = function(){
|
|
return this._buffer;
|
|
};
|
|
|
|
BlobWriteStream.prototype.write = function(data, encoding){
|
|
|
|
if(!this.writable){
|
|
throw new Error("stream is not writable");
|
|
}
|
|
|
|
if(!Buffer.isBuffer(data)){
|
|
data = new Buffer(data, encoding);
|
|
}
|
|
|
|
if(data.length){
|
|
this._writeQueue.push(data);
|
|
}
|
|
|
|
this._commit();
|
|
|
|
return this._writeQueue.length === 0;
|
|
};
|
|
|
|
BlobWriteStream.prototype._commit = function(){
|
|
|
|
var self = this;
|
|
|
|
var destination = this._destination;
|
|
var writeQueue = this._writeQueue;
|
|
|
|
var startDestinationPos = this._destinationPos;
|
|
|
|
while(writeQueue.length && destination.length){
|
|
|
|
var head = writeQueue[0];
|
|
|
|
var copySize = Math.min(destination.length, head.length);
|
|
|
|
head.copy(destination, 0, 0, copySize);
|
|
|
|
head = head.slice(copySize);
|
|
destination = destination.slice(copySize);
|
|
|
|
this.bytesWritten += copySize;
|
|
this._destinationPos += copySize;
|
|
|
|
if(head.length === 0){
|
|
writeQueue.shift();
|
|
}
|
|
else{
|
|
writeQueue[0] = head;
|
|
}
|
|
}
|
|
|
|
this._destination = destination;
|
|
|
|
bytesCommitted = this._destinationPos - startDestinationPos;
|
|
if(bytesCommitted){
|
|
if(this._onWrite){
|
|
|
|
if(writeQueue.length){
|
|
this._pendingQueueDrain = true;
|
|
}
|
|
|
|
// By locking destination the buffer is frozen and the onWrite
|
|
// callback cannot miss any write commits
|
|
this._destination = emptyBuffer;
|
|
|
|
var consumer = this._onWrite;
|
|
this._onWrite = null;
|
|
|
|
consumer.call(this, function(nextCallback){
|
|
util.setZeroTimeout(function(){
|
|
self._destination = destination;
|
|
self._onWrite = nextCallback;
|
|
self._commit();
|
|
});
|
|
}, consumer);
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
if(writeQueue.length){
|
|
|
|
this._pendingQueueDrain = true;
|
|
this._growBuffer();
|
|
}
|
|
else if(this._pendingQueueDrain){
|
|
|
|
this._pendingQueueDrain = false;
|
|
|
|
if(this._drainDelay !== 0){
|
|
setTimeout(function(){
|
|
self.emit("drain");
|
|
}, this._drainDelay);
|
|
}
|
|
else{
|
|
util.setZeroTimeout(function(){
|
|
self.emit("drain");
|
|
});
|
|
}
|
|
}
|
|
};
|
|
|
|
BlobWriteStream.prototype._growBuffer = function(){
|
|
|
|
var self = this;
|
|
var writeQueue = this._writeQueue;
|
|
|
|
var requestSize = this._minBlockAllocSize;
|
|
|
|
var maxBlockAllocSize = this._maxBlockAllocSize;
|
|
var add = (maxBlockAllocSize === undefined ? function(a, b){return a + b;} : function(a, b){return Math.min(a + b, maxBlockAllocSize);});
|
|
|
|
for(var i = 0, queueLength = writeQueue.length; i < queueLength; i++){
|
|
requestSize = add(requestSize, writeQueue[i].length);
|
|
}
|
|
|
|
// Prevent concurrent onFull callbacks
|
|
if(this._pendingOnFull){
|
|
return;
|
|
}
|
|
this._pendingOnFull = true;
|
|
|
|
this._onFull(this._buffer, requestSize, function(buffer, destination){
|
|
util.setZeroTimeout(function(){
|
|
|
|
self._pendingOnFull = false;
|
|
|
|
if(!destination){
|
|
if(self.writable){
|
|
self.emit("error", new Error("buffer is full"));
|
|
}
|
|
self.destroy();
|
|
return;
|
|
}
|
|
|
|
self._buffer = buffer;
|
|
self._destination = destination;
|
|
|
|
self._commit();
|
|
});
|
|
});
|
|
};
|
|
|
|
BlobWriteStream.prototype.end = function(data, encoding){
|
|
|
|
var self = this;
|
|
|
|
function _end(){
|
|
self.writable = false;
|
|
self._onEnd();
|
|
}
|
|
|
|
if(data){
|
|
if(this.write(data, encoding)){
|
|
_end();
|
|
}else{
|
|
self.writable = false;
|
|
this.once("drain", _end);
|
|
}
|
|
}
|
|
else{
|
|
_end();
|
|
}
|
|
};
|
|
|
|
BlobWriteStream.prototype.destroy = function(){
|
|
this.writable = false;
|
|
this._pendingQueueDrain = false;
|
|
this._writeQueue = [];
|
|
};
|
|
|
|
BlobWriteStream.prototype.consume = function(consume){
|
|
|
|
this._buffer = this._buffer.slice(consume);
|
|
this._destinationPos -= consume;
|
|
};
|
|
|
|
BlobWriteStream.prototype.getCommittedSlice = function(){
|
|
return this._buffer.slice(0, this._destinationPos);
|
|
};
|
|
|
|
function onFull(buffer, extraSize, callback){
|
|
var newBuffer = new Buffer(buffer.length + extraSize);
|
|
buffer.copy(newBuffer);
|
|
callback(newBuffer, newBuffer.slice(buffer.length));
|
|
}
|
|
*/
|
|
exports.BlobReadStream = BlobReadStream;
|
|
exports.BlobWriteStream = BlobWriteStream; |