Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/bt-stream-factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const hat = require('hat');
const {Swarm} = require('./swarm');
const BTStream = require('./bt-stream');


const createBTStream = ({dhtPort, hash: _hash}) => {
const dht = new DHT();
const id = '-FRIDGE-' + hat(48);
Expand Down
44 changes: 22 additions & 22 deletions lib/bt-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const log = require('./logger')('BTStream');

const createEmptyReadableStream = () => {
const stream = new Readable();
stream._read = () => null;
setImmediate(() => stream.emit('error', new Error('File not found')));
return stream;
};
Expand Down Expand Up @@ -38,50 +39,49 @@ module.exports = class BTStream {
* @param {number} from Offset from file start position
* @returns {*}
*/
downloadFile({torrent, file, from}) {
const torrentHash = torrent.infoHash;
downloadFile({torrent, file, from, to}) {
const pieces = torrent.pieces;
const pieceSize = torrent.pieceLength;

const result = slicePieces({pieces, pieceSize, file, from});

const result = slicePieces({pieces, pieceSize, file, from, to});
// TODO: lastLength какой-то некорректный.
// Нужно вообще его переосмыслить, потому что дальше должен передаваться реальный последний кусок, а не тот, который мы отрезаем.
// Возможно нужно взять из прошлой реализации
const {firstOffset, lastLength, pieces: targetPieces} = result;

const {length, pieceLength, lastPieceLength} = torrent;
log('download file::', 'torrent', {length, pieceLength, lastPieceLength});
log('download file::', `result`, {firstOffset, lastLength});

const bytesOffset = file.offset;
const pieceOffset = pieces.indexOf(targetPieces[0]) + Math.floor(bytesOffset / pieceSize);

// TODO: создание pie происходит в двух местах. Сократить до одного.
this._pie = new Pie({
swarm: this._swarm,
torrentHash,
pieces: targetPieces,
pieceSize,
pieceIndexOffset: pieces.findIndex((it) => it === targetPieces[0]),
firstOffset,
lastLength,
});
swarm: this._swarm, pieces: targetPieces, pieceSize, pieceOffset, firstOffset, lastLength
}, {from: firstOffset, to: lastLength});

return this._pie.getReadStream();
}

downloadFileByName({torrent, filename, offset}) {
downloadFileByName({torrent, filename, from, to}) {
const file = torrent.files.find((file) => file.name === filename);

if (file) {
return this.downloadFile({torrent, file, from: offset});
return this.downloadFile({torrent, file, from, to});
} else {
// TODO: Зачем эта функциональность? Тут же стоит бросить исключение.
return createEmptyReadableStream();
}
}

/**
* @param {{files: [],}} torrent
* @param {string} filePath
* @param {number} offset Offset from file start position
* @returns {*}
*/
downloadFileByPath({torrent, filePath, offset}) {
log('download file by path::0', {offset});
downloadFileByPath({torrent, filePath, from, to}) {
console.log('download file by path::0', {from, to});
const file = torrent.files.find((file) => file.path === filePath);

if (file) {
return this.downloadFile({torrent, file, from: offset});
return this.downloadFile({torrent, file, from, to});
} else {
// TODO: Зачем эта функциональность? Тут же стоит бросить исключение.
return createEmptyReadableStream();
Expand Down
11 changes: 9 additions & 2 deletions lib/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ const transports = {

const send = transports.console;

const createLogger = (moduleName) => {
const createLogger = (moduleName, enable) => {
let prefix = null;

const log = (...args) => {
if (!enable) {
return;
}

if (prefix) {
send(moduleName, prefix(), ...args);
} else {
Expand Down Expand Up @@ -43,4 +47,7 @@ const createLogger = (moduleName) => {
return log;
};

module.exports = (moduleName) => createLogger(moduleName);
module.exports = (moduleName, enable) => {
return createLogger(moduleName, enable);
};

171 changes: 124 additions & 47 deletions lib/pie.js
Original file line number Diff line number Diff line change
@@ -1,49 +1,25 @@
const crypto = require('crypto');
const { Readable } = require('stream');
const PieceLoader = require('./piece-loader');
const log = require('./logger')('Pie');
const {createBuffer, verifyPiece} = require('./pie.utils');

const CHUNK_SIZE = 16384;
const READ_STREAM_ITERATION_COUNT = 8;

const createBuffer = (piece, size, hash) => {
const buffersPieces = Object
.keys(piece)
.sort((a, b) => parseInt(a, 10) - parseInt(b, 10))
.map((key) => piece[key]);

const buffer = Buffer.concat(buffersPieces, buffersPieces.length * size);

const isGood = verifyPiece(buffer, hash);
log('verify is', isGood.toString().toUpperCase());

return buffer;
};

/**
* @return {string}
*/
const sha1 = (data) => {
return crypto.createHash('sha1')
.update(data)
.digest('hex');
};

const verifyPiece = (buffer, referenceHash) => {
const pieceHash = sha1(buffer);
return pieceHash === referenceHash;
};


let _buffer = null;
let _buffer1 = null;
module.exports = class Pie {
constructor({pieces, pieceSize, firstOffset, lastLength, swarm, pieceIndexOffset}) {
constructor({pieces, pieceSize, pieceOffset, firstOffset, lastLength, swarm}, {from, to}) {
console.log('Pie::args', {pieces: pieces.length, pieceSize, pieceOffset, firstOffset, lastLength}, {from, to})
this._pieces = pieces;
this._pieceSize = pieceSize;
this._pieceIndexOffset = pieceIndexOffset;
this._firstOffest = firstOffset;
this._pieceOffset = pieceOffset;
this._firstBytesOffset = firstOffset;
this._lastLength = lastLength;
this._swarm = swarm;

this._from = from;
this._to = to;

/**
* @type {PassThrough}
* @private
Expand All @@ -64,13 +40,15 @@ module.exports = class Pie {
*/
this._pieceLoader = null;

// TODO: Remove onWire, because all use `swarm.getFreeWires()`
this._swarm.getWires().forEach((it) => this._onWire(it));
this._swarm.on(this._swarm.EVENT_ADDED_WIRE, (wire) => this._onWire(wire));

this.init();
this._init();
}

async init() {
_init() {
console.log('pie::_init')
this._readStream = Readable.from(this._generatorPieLoader(), {
highWaterMark: READ_STREAM_ITERATION_COUNT,
})
Expand All @@ -97,41 +75,140 @@ module.exports = class Pie {
}

async *_generatorPieLoader() {
const length = this._pieces.length;
const loadedPieces = [];

const length = this._pieces.length;
if (!_buffer) {
for (let i = length - 1; i < length && length > 1; i++) {
const index = i;


// TODO: Иногда загрузка висит на месте.
// TODO: скорее всего нужно убивать его спустя какое-то время.
const pieceSize = index === length - 1 ? this._lastLength : this._pieceSize;
const lastSize = index === length - 1 ? pieceSize % CHUNK_SIZE || CHUNK_SIZE : CHUNK_SIZE;

// console.log('Pie::lastSize', lastSize)
this._pieceLoader = new PieceLoader({
swarm: this._swarm,
chunkSize: CHUNK_SIZE,
pieceIndex: index + this._pieceOffset,
pieceSize,
lastSize: lastSize === 2047 ? 2048 : lastSize,
isDebug: length === 1,
});

const hash = this._pieces[index];
const now = Date.now();

// console.log(`####### start load`);
console.log(`####### Load ${index + 1} of ${length}`)
const piece = await this._pieceLoader.load();
// console.log(`####### finish load`);

// const time = Date.now() - now;
// console.log({time, speed: CHUNK_SIZE / time / 1000});

this._pieceLoader.destroy();
this._pieceLoader = null;

let buffer = createBuffer(piece);
const isGood = verifyPiece(buffer, hash);
console.log(`####### Loaded ${index + 1} of ${length}`, `TARGET`, {i, pieceSize, lastSize}, 'verify is', isGood)
!isGood && console.log('verify is BAD!!!', isGood);

loadedPieces.push(1);

console.log(`*********** buffer size before`, buffer.length);
if (index === 0 && length > 1) {
buffer = Buffer.from(buffer.buffer, this._firstBytesOffset % pieceSize).slice(this._firstBytesOffset % pieceSize);
// buffer = Buffer.from(buffer.buffer, this._from);
} else if (index === length - 1) {
console.log(`!!!!!!!!!!!!`, `create buffer from last`);
buffer = Buffer.from(buffer.buffer, 0, this._lastLength);
// buffer = Buffer.from(buffer.buffer, 0, this._to);
}

console.log(`to cache`)
console.log(`*********** buffer size after `, buffer.length);

_buffer = buffer;
break;
}
}
for (let i = 0; i < length; i++) {
if (length === 1 && _buffer) {
console.log(`from cache`)
// *********** buffer size before 7392962 offset 7343931
// *********** buffer size after 7392962
const pieceSize = this._lastLength;
console.log(`*********** buffer size before`, _buffer.length, `offset`, this._firstBytesOffset % pieceSize);
const buffer = Buffer.from(_buffer, this._firstBytesOffset % pieceSize).slice(this._firstBytesOffset % pieceSize);
console.log(`*********** buffer size after `, buffer.length);

yield buffer;
continue;
}
const index = i;
log(`Load piece ${index + 1} of ${length}`);
if (i === 0 && _buffer1 && length === 660) {
console.log(`####### Loaded ${index + 1} of ${length} from cache`);
yield _buffer1;
continue;
}

// console.log(`####### Load ${index + 1} of ${length}`);

// TODO: Иногда загрузка висит на месте.
// TODO: скорее всего нужно убивать его спустя какое-то время.
const pieceSize = index === length - 1 ? this._lastLength : this._pieceSize;
const lastSize = index === length - 1 ? pieceSize % CHUNK_SIZE : CHUNK_SIZE;
const lastSize = index === length - 1 ? pieceSize % CHUNK_SIZE || CHUNK_SIZE : CHUNK_SIZE;

// console.log('Pie::lastSize', lastSize)
this._pieceLoader = new PieceLoader({
swarm: this._swarm,
wires: this._wires,
chunkSize: CHUNK_SIZE,
pieceIndex: index + this._pieceIndexOffset,
pieceIndex: index + this._pieceOffset,
pieceSize,
lastSize,
lastSize: lastSize === 2047 ? 2048 : lastSize,
});

const hash = this._pieces[index];
const now = Date.now();

// console.log(`####### start load`);
console.log(`####### Load ${index + 1} of ${length}`)
const piece = await this._pieceLoader.load();
// console.log(`####### finish load`);

// const time = Date.now() - now;
// console.log({time, speed: CHUNK_SIZE / time / 1000});

this._pieceLoader.destroy();
this._pieceLoader = null;

const hash = this._pieces[index];
let buffer = createBuffer(piece, CHUNK_SIZE, hash);
let buffer = createBuffer(piece);
const isGood = verifyPiece(buffer, hash);
console.log(`####### Loaded ${index + 1} of ${length}`, `TARGET`, {i, pieceSize, lastSize}, 'verify is', isGood)
!isGood && console.log('verify is BAD!!!', isGood);

loadedPieces.push(1);

if (index === 0) {
buffer = Buffer.from(buffer.buffer, this._firstOffest % pieceSize);
console.log(`*********** buffer size before`, buffer.length);
if (index === 0 && length > 1) {
buffer = Buffer.from(buffer.buffer, this._from % pieceSize).slice(this._from % pieceSize);
// buffer = Buffer.from(buffer.buffer, this._from);
} else if (index === 0) {
buffer = Buffer.from(buffer.buffer, this._firstBytesOffset % pieceSize).slice(this._firstBytesOffset % pieceSize);
} else if (index === length - 1) {
log(`Create buffer from last piece`);
console.log(`!!!!!!!!!!!!`, `create buffer from last`);
buffer = Buffer.from(buffer.buffer, 0, this._lastLength);
// buffer = Buffer.from(buffer.buffer, 0, this._to);
}
console.log(`*********** buffer size after `, buffer.length);

if (i === 1 && length > 2) {
_buffer1 = buffer;
}
yield buffer;
}
}
Expand Down
49 changes: 49 additions & 0 deletions lib/pie.utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const crypto = require('crypto');

/**
* @param piece
* @returns {Buffer}
*/
const createBuffer = (piece) => {
const buffersPieces = Object
.keys(piece)
.sort((a, b) => parseInt(a, 10) - parseInt(b, 10))
.map((key) => piece[key]);

// const length = buffersPieces.reduce((acc, cur) => {
// // console.log('cur.length', cur.length)
// return acc + cur.length;
// }, 0);

// console.log('size', size);
// console.log('length', length);
// console.log('length::after', length - buffersPieces[buffersPieces.length - 1].length);

return Buffer.concat(buffersPieces);
};

/**
* @returns {string}
*/
const sha1 = (data) => {
return crypto.createHash('sha1')
.update(data)
.digest('hex');
};


/**
* @param buffer
* @param referenceHash
* @returns {boolean}
*/
const verifyPiece = (buffer, referenceHash) => {
const pieceHash = sha1(buffer);
return pieceHash === referenceHash;
};


module.exports = {
createBuffer,
verifyPiece,
};
Loading