From a5dc8b6da29bc1f9e8b304f285222ee2d1b044d0 Mon Sep 17 00:00:00 2001 From: schlagmichdoch Date: Mon, 5 Feb 2024 02:06:53 +0100 Subject: [PATCH] Split transfer into message and data transfer and rewrite FileChunkerLogic completely. Condense all Relaying of the Websocket Fallback into one message type 'ws-relay' --- public/scripts/network.js | 662 +++++++++++++++++++++++--------------- server/ws-server.js | 18 +- 2 files changed, 410 insertions(+), 270 deletions(-) diff --git a/public/scripts/network.js b/public/scripts/network.js index 02e379b..1d72781 100644 --- a/public/scripts/network.js +++ b/public/scripts/network.js @@ -134,88 +134,78 @@ class ServerConnection { this.send({ type: 'leave-public-room' }); } - _onMessage(msg) { - msg = JSON.parse(msg); - if (msg.type !== 'ping') console.log('WS receive:', msg); - switch (msg.type) { + _onMessage(message) { + const messageJSON = JSON.parse(message); + if (messageJSON.type !== 'ping' && messageJSON.type !== 'ws-relay') console.log('WS receive:', messageJSON); + switch (messageJSON.type) { case 'ws-config': - this._setWsConfig(msg.wsConfig); + this._setWsConfig(messageJSON.wsConfig); break; case 'peers': - this._onPeers(msg); + this._onPeers(messageJSON); break; case 'peer-joined': - Events.fire('peer-joined', msg); + Events.fire('peer-joined', messageJSON); break; case 'peer-left': - Events.fire('peer-left', msg); + Events.fire('peer-left', messageJSON); break; case 'signal': - Events.fire('signal', msg); + Events.fire('signal', messageJSON); break; case 'ping': this.send({ type: 'pong' }); break; case 'display-name': - this._onDisplayName(msg); + this._onDisplayName(messageJSON); break; case 'pair-device-initiated': - Events.fire('pair-device-initiated', msg); + Events.fire('pair-device-initiated', messageJSON); break; case 'pair-device-joined': - Events.fire('pair-device-joined', msg); + Events.fire('pair-device-joined', messageJSON); break; case 'pair-device-join-key-invalid': Events.fire('pair-device-join-key-invalid'); break; case 'pair-device-canceled': - Events.fire('pair-device-canceled', msg.pairKey); + Events.fire('pair-device-canceled', messageJSON.pairKey); break; case 'join-key-rate-limit': Events.fire('notify-user', Localization.getTranslation("notifications.rate-limit-join-key")); break; case 'secret-room-deleted': - Events.fire('secret-room-deleted', msg.roomSecret); + Events.fire('secret-room-deleted', messageJSON.roomSecret); break; case 'room-secret-regenerated': - Events.fire('room-secret-regenerated', msg); + Events.fire('room-secret-regenerated', messageJSON); break; case 'public-room-id-invalid': - Events.fire('public-room-id-invalid', msg.publicRoomId); + Events.fire('public-room-id-invalid', messageJSON.publicRoomId); break; case 'public-room-created': - Events.fire('public-room-created', msg.roomId); + Events.fire('public-room-created', messageJSON.roomId); break; case 'public-room-left': Events.fire('public-room-left'); break; - case 'request': - case 'header': - case 'partition': - case 'partition-received': - case 'progress': - case 'files-transfer-response': - case 'file-transfer-complete': - case 'message-transfer-complete': - case 'text': - case 'display-name-changed': - case 'ws-chunk': + case 'ws-relay': // ws-fallback if (this._wsConfig.wsFallback) { - Events.fire('ws-relay', JSON.stringify(msg)); + Events.fire('ws-relay', {peerId: messageJSON.sender.id, message: message}); } else { console.log("WS receive: message type is for websocket fallback only but websocket fallback is not activated on this instance.") } break; default: - console.error('WS receive: unknown message type', msg); + console.error('WS receive: unknown message type', messageJSON); } } send(msg) { if (!this._isConnected()) return; - if (msg.type !== 'pong') console.log("WS send:", msg) + if (msg.type !== 'pong' && msg.type !== 'ws-relay') console.log("WS send:", msg) this._socket.send(JSON.stringify(msg)); } @@ -339,19 +329,26 @@ class Peer { this._evaluateAutoAccept(); } + // Is overwritten in expanding classes + _onServerSignalMessage(message) {} + + // Is overwritten in expanding classes + _refresh() {} + + _onDisconnected() {} + _setIsCaller(isCaller) { this._isCaller = isCaller; } - sendJSON(message) { - this._send(JSON.stringify(message)); - } + // Is overwritten in expanding classes + _sendMessage(message) {} // Is overwritten in expanding classes - _send(message) {} + _sendData(data) {} _sendDisplayName(displayName) { - this.sendJSON({type: 'display-name-changed', displayName: displayName}); + this._sendMessage({type: 'display-name-changed', displayName: displayName}); } _isSameBrowser() { @@ -478,7 +475,7 @@ class Peer { this._filesRequested = files; - this.sendJSON({type: 'request', + this._sendMessage({type: 'request', header: header, totalSize: totalSize, imagesOnly: imagesOnly, @@ -502,34 +499,20 @@ class Peer { this._sendFile(file); } - _sendFile(file) { - this.sendJSON({ + _sendHeader(file) { + this._sendMessage({ type: 'header', size: file.size, name: file.name, mime: file.type }); - this._chunker = new FileChunker(file, - chunk => this._send(chunk), - offset => this._onPartitionEnd(offset)); - this._chunker.nextPartition(); } - _onPartitionEnd(offset) { - this.sendJSON({ type: 'partition', offset: offset }); - } - - _onReceivedPartitionEnd(offset) { - this.sendJSON({ type: 'partition-received', offset: offset }); - } + // Is overwritten in expanding classes + _sendFile(file) {} _requestResendFromOffset(offset) { - this.sendJSON({ type: 'request-resend-from-offset', offset: offset }); - } - - _sendNextPartition() { - if (!this._chunker || this._chunker.isFileEnd()) return; - this._chunker.nextPartition(); + this._sendMessage({ type: 'request-resend-from-offset', offset: offset }); } _onRequestResendFromOffset(offset) { @@ -539,22 +522,14 @@ class Peer { } _sendProgress(progress) { - this.sendJSON({ type: 'progress', progress: progress }); + this._sendMessage({ type: 'progress', progress: progress }); + } + + _onData(data) { + this._onChunkReceived(data); } _onMessage(message) { - if (typeof message !== 'string') { - this._onChunkReceived(message); - return; - } - - try { - message = JSON.parse(message); - } catch (e) { - console.warn("Peer: Received JSON is malformed"); - return; - } - switch (message.type) { case 'request': this._onFilesTransferRequest(message); @@ -562,15 +537,12 @@ class Peer { case 'header': this._onFileHeader(message); break; - case 'partition': - this._onReceivedPartitionEnd(message); - break; - case 'partition-received': - this._sendNextPartition(); - break; case 'progress': this._onProgress(message.progress); break; + case 'bytes-received-confirmation': + this._onBytesReceivedConfirmation(message.bytesReceived); + break; case 'request-resend-from-offset': this._onRequestResendFromOffset(message.offset); break; @@ -589,19 +561,21 @@ class Peer { case 'display-name-changed': this._onDisplayNameChanged(message); break; + default: + console.warn('RTC: Unknown message type:', message.type); } } _onFilesTransferRequest(request) { if (this._requestPending) { // Only accept one request at a time per peer - this.sendJSON({type: 'files-transfer-response', accepted: false}); + this._sendMessage({type: 'files-transfer-response', accepted: false}); return; } if (window.iOS && request.totalSize >= 200*1024*1024) { // iOS Safari can only put 400MB at once to memory. // Request to send them in chunks of 200MB instead: - this.sendJSON({type: 'files-transfer-response', accepted: false, reason: 'ios-memory-limit'}); + this._sendMessage({type: 'files-transfer-response', accepted: false, reason: 'ios-memory-limit'}); return; } @@ -621,7 +595,7 @@ class Peer { } _respondToFileTransferRequest(accepted) { - this.sendJSON({type: 'files-transfer-response', accepted: accepted}); + this._sendMessage({type: 'files-transfer-response', accepted: accepted}); if (accepted) { this._requestAccepted = this._requestPending; this._totalBytesReceived = 0; @@ -634,14 +608,16 @@ class Peer { _onFileHeader(header) { if (this._requestAccepted && this._requestAccepted.header.length) { this._lastProgress = 0; - this._digester = new FileDigester({size: header.size, name: header.name, mime: header.mime}, - this._requestAccepted.totalSize, - this._totalBytesReceived, - fileBlob => this._onFileReceived(fileBlob) - ); + this._addFileDigester(header); } } + _addFileDigester(header) {} + + _sendBytesReceivedConfirmation(bytesReceived) { + this._sendMessage({type: 'bytes-received-confirmation', bytesReceived: bytesReceived}); + } + _abortTransfer() { Events.fire('set-progress', {peerId: this._peerId, progress: 1, status: 'wait'}); Events.fire('notify-user', Localization.getTranslation("notifications.files-incorrect")); @@ -680,11 +656,16 @@ class Peer { Events.fire('set-progress', {peerId: this._peerId, progress: progress, status: 'transfer'}); } + _onBytesReceivedConfirmation(bytesReceived) { + if (!this._chunker) return; + this._chunker._onBytesReceived(bytesReceived); + } + async _onFileReceived(fileBlob) { const acceptedHeader = this._requestAccepted.header.shift(); this._totalBytesReceived += fileBlob.size; - this.sendJSON({type: 'file-transfer-complete'}); + this._sendMessage({type: 'file-transfer-complete'}); const sameSize = fileBlob.size === acceptedHeader.size; const sameName = fileBlob.name === acceptedHeader.name @@ -745,14 +726,14 @@ class Peer { sendText(text) { const unescaped = btoa(unescape(encodeURIComponent(text))); - this.sendJSON({ type: 'text', text: unescaped }); + this._sendMessage({ type: 'text', text: unescaped }); } _onTextReceived(message) { if (!message.text) return; const escaped = decodeURIComponent(escape(atob(message.text))); Events.fire('text-received', { text: escaped, peerId: this._peerId }); - this.sendJSON({ type: 'message-transfer-complete' }); + this._sendMessage({ type: 'message-transfer-complete' }); } _onDisplayNameChanged(message) { @@ -794,11 +775,11 @@ class RTCPeer extends Peer { this._connect(); } - _isConnected() { + _connected() { return this._conn && this._conn.connectionState === 'connected'; } - _isConnecting() { + _connecting() { return this._conn && ( this._conn.connectionState === 'new' @@ -806,26 +787,50 @@ class RTCPeer extends Peer { ); } - _isChannelOpen() { - return this._channel && this._channel.readyState === 'open'; + _messageChannelOpen() { + return this._messageChannel && this._messageChannel.readyState === 'open'; } - _isChannelConnecting() { - return this._channel && this._channel.readyState === 'connecting'; + _dataChannelOpen() { + return this._dataChannel && this._dataChannel.readyState === 'open'; } - _isStable() { - return this._isChannelOpen() && this._isConnected(); + _messageChannelConnecting() { + return this._messageChannel && this._messageChannel.readyState === 'connecting'; + } + + _dataChannelConnecting() { + return this._dataChannel && this._dataChannel.readyState === 'connecting'; + } + + _channelOpen() { + return this._messageChannelOpen() && this._dataChannelOpen(); + } + + _channelConnecting() { + return (this._dataChannelConnecting() || this._dataChannelOpen()) + && (this._messageChannelConnecting() || this._messageChannelOpen()); + } + + _stable() { + return this._connected() && this._channelOpen(); } _connect() { - if (this._isStable()) return; + if (this._stable()) return; Events.fire('peer-connecting', this._peerId); this._openConnection(); - // TOdo: one channel for messages - one for data? - this._openChannel(); + this._openMessageChannel(); + this._openDataChannel(); + + this._evaluatePendingInboundMessages() + .then((count) => { + if (count) { + console.log("Pending inbound messages evaluated."); + } + }); } _openConnection() { @@ -839,13 +844,6 @@ class RTCPeer extends Peer { conn.onicecandidateerror = e => this._onIceCandidateError(e); this._conn = conn; - - this._evaluatePendingInboundMessages() - .then((count) => { - if (count) { - console.log("Pending inbound messages evaluated."); - } - }); } async _onNegotiationNeeded() { @@ -879,7 +877,7 @@ class RTCPeer extends Peer { break; case 'failed': console.warn('RTC connection failed'); - // TOdo: implement ws fallback as real fallback + // Todo: if error is "TURN server needed" -> fallback to WS if activated this._refresh(); } } @@ -892,40 +890,58 @@ class RTCPeer extends Peer { console.error(error); } - _openChannel() { - const channel = this._conn.createDataChannel('data-channel', { - ordered: true, - negotiated: true, - id: 0 - }); - channel.binaryType = 'arraybuffer'; - channel.onopen = _ => this._onChannelOpened(); - channel.onclose = _ => this._onChannelClosed(); - channel.onerror = e => this._onChannelError(e); - channel.onmessage = e => this._onMessage(e.data); - - this._channel = channel; + _openMessageChannel() { + const messageCallback = e => this._onMessage(e.data); + this._messageChannel = this._openChannel("message-channel", 1, "json", messageCallback); } - _onChannelOpened() { - console.log('RTC: Channel opened with', this._peerId); - console.debug(this.getConnectionHash()) - console.debug(this._conn) - console.debug(this._channel) + _openDataChannel() { + const messageCallback = e => this._onData(e.data); + this._dataChannel = this._openChannel("data-channel", 0, "raw", messageCallback); + } + + _openChannel(label, id, protocol, messageCallback) { + const channel = this._conn.createDataChannel(label, { + ordered: true, + negotiated: true, + id: id, + protocol: protocol + }); + channel.binaryType = "arraybuffer"; + channel.onopen = e => this._onChannelOpened(e); + channel.onclose = e => this._onChannelClosed(e); + channel.onerror = e => this._onChannelError(e); + channel.onmessage = messageCallback; + + return channel; + } + + _onChannelOpened(e) { + console.log(`RTC: Channel ${e.target.label} opened with`, this._peerId); + + // wait until all channels are open + if (!this._stable()) return; + Events.fire('peer-connected', {peerId: this._peerId, connectionHash: this.getConnectionHash()}); super._onPeerConnected(); - while (this._isChannelOpen() && this.pendingOutboundMessages.length > 0) { - this._sendViaChannel(this.pendingOutboundMessages.shift()); + + this._sendPendingOutboundMessaged(); + } + + _sendPendingOutboundMessaged() { + while (this._stable() && this.pendingOutboundMessages.length > 0) { + this._sendViaMessageChannel(this.pendingOutboundMessages.shift()); } } - _onChannelClosed() { - console.log('RTC: Channel closed', this._peerId); + _onChannelClosed(e) { + console.log(`RTC: Channel ${e.target.label} closed`, this._peerId); this._refresh(); } - _onChannelError(error) { - console.error(error); + _onChannelError(e) { + console.warn(`RTC: Channel ${e.target.label} error`, this._peerId); + console.error(e.error); } @@ -949,22 +965,29 @@ class RTCPeer extends Peer { } _handleLocalCandidate(candidate) { - console.log("RTC: Sending local candidate"); - this._sendSignal({ signalType: 'candidate', candidate: candidate }); + if (this.localIceCandidatesSent) return; + + console.log("RTC: Local candidate created", candidate); if (candidate === null) { this.localIceCandidatesSent = true; + return; } + + this._sendSignal({ signalType: 'candidate', candidate: candidate }); } async _handleRemoteCandidate(candidate) { - console.log("RTC: Received remote candidate"); - if (candidate !== null) { - await this._conn.addIceCandidate(candidate); - } - else { + if (this.remoteIceCandidatesReceived) return; + + console.log("RTC: Received remote candidate", candidate); + + if (candidate === null) { this.remoteIceCandidatesReceived = true; + return; } + + await this._conn.addIceCandidate(candidate); } async _evaluatePendingInboundMessages() { @@ -972,15 +995,14 @@ class RTCPeer extends Peer { while (this.pendingInboundMessages.length > 0) { const message = this.pendingInboundMessages.shift(); console.log("Evaluate pending inbound message:", message); - await this.onServerMessage(message); + await this._onServerSignalMessage(message); inboundMessagesEvaluatedCount++; } return inboundMessagesEvaluatedCount; } - async onServerMessage(message) { + async _onServerSignalMessage(message) { if (this._conn === null) { - console.debug("Not ready yet. Pending needed indeed?") this.pendingInboundMessages.push(message); return; } @@ -1009,14 +1031,26 @@ class RTCPeer extends Peer { this._connect(); // reopen the channel } + _onDisconnected() { + this._closeChannelAndConnection(); + } + _closeChannelAndConnection() { - if (this._channel) { - this._channel.onopen = null; - this._channel.onclose = null; - this._channel.onerror = null; - this._channel.onmessage = null; - this._channel.close(); - this._channel = null; + if (this._dataChannel) { + this._dataChannel.onopen = null; + this._dataChannel.onclose = null; + this._dataChannel.onerror = null; + this._dataChannel.onmessage = null; + this._dataChannel.close(); + this._dataChannel = null; + } + if (this._messageChannel) { + this._messageChannel.onopen = null; + this._messageChannel.onclose = null; + this._messageChannel.onerror = null; + this._messageChannel.onmessage = null; + this._messageChannel.close(); + this._messageChannel = null; } if (this._conn) { this._conn.onnegotiationneeded = null; @@ -1044,20 +1078,26 @@ class RTCPeer extends Peer { this._disconnect(); } - _send(message) { - // Todo: if channel or connection is closed or disconnected: do not send - // put messages in queue and send after reconnection. - // this._pendingMessages[]; - if (!this._isStable() || this.pendingOutboundMessages.length > 0) { + _sendMessage(message) { + if (!this._stable() || this.pendingOutboundMessages.length > 0) { // queue messages if not connected OR if connected AND queue is not empty this.pendingOutboundMessages.push(message); return; } - this._sendViaChannel(message); + this._sendViaMessageChannel(message); } - _sendViaChannel(message) { - this._channel.send(message); + _sendViaMessageChannel(message) { + console.log('RTC Send:', message); + this._messageChannel.send(JSON.stringify(message)); + } + + _sendData(data) { + this._sendViaDataChannel(data) + } + + _sendViaDataChannel(data) { + this._dataChannel.send(data); } _sendSignal(message) { @@ -1068,8 +1108,35 @@ class RTCPeer extends Peer { this._server.send(message); } - _sendDisplayName(displayName) { - super._sendDisplayName(displayName); + async _sendFile(file) { + this._sendHeader(file); + this._chunker = new FileChunkerRTC( + file, + chunk => this._sendData(chunk), + this._conn, + this._dataChannel + ); + this._chunker._readChunk(); + } + + _onMessage(message) { + // Todo: Test speed increase without prints? --> print only on debug mode via URL argument `?debug_mode=true` + console.log('RTC Receive:', JSON.parse(message)); + try { + message = JSON.parse(message); + } catch (e) { + console.warn("RTCPeer: Received JSON is malformed"); + return; + } + super._onMessage(message); + } + + _addFileDigester(header) { + this._digester = new FileDigester({size: header.size, name: header.name, mime: header.mime}, + this._requestAccepted.totalSize, + this._totalBytesReceived, + fileBlob => this._onFileReceived(fileBlob) + ); } getConnectionHash() { @@ -1097,54 +1164,6 @@ class RTCPeer extends Peer { } return hash; } - - _onBeforeUnload(e) { - if (this._busy) { - e.preventDefault(); - return Localization.getTranslation("notifications.unfinished-transfers-warning"); - } - } - - _onPageHide() { - this._disconnect(); - } - - _disconnect() { - if (this._conn && this._channel) { - this._channel.onclose = null; - this._channel.close(); - } - Events.fire('peer-disconnected', this._peerId); - } - - _onChannelClosed() { - console.log('RTC: channel closed', this._peerId); - Events.fire('peer-disconnected', this._peerId); - if (!this._isCaller) return; - this._connect(); // reopen the channel - } - - _onConnectionStateChange() { - console.log('RTC: state changed:', this._conn.connectionState); - switch (this._conn.connectionState) { - case 'disconnected': - Events.fire('peer-disconnected', this._peerId); - this._onError('rtc connection disconnected'); - break; - case 'failed': - Events.fire('peer-disconnected', this._peerId); - this._onError('rtc connection failed'); - break; - } - } - - _onMessage(message) { - if (typeof message === 'string') { - // Todo: Test speed increase without prints? --> print only on debug mode via URL argument `?debug_mode=true` - console.log('RTC:', JSON.parse(message)); - } - super._onMessage(message); - } } class WSPeer extends Peer { @@ -1153,19 +1172,38 @@ class WSPeer extends Peer { super(serverConnection, isCaller, peerId, roomType, roomId); this.rtcSupported = false; + this.signalSuccessful = false; if (!this._isCaller) return; // we will listen for a caller + this._sendSignal(); } - _send(chunk) { - this.sendJSON({ - type: 'ws-chunk', - chunk: arrayBufferToBase64(chunk) + _sendFile(file) { + this._sendHeader(file); + this._chunker = new FileChunkerWS( + file, + chunk => this._sendData(chunk) + ); + this._chunker._readChunk(); + } + + _sendData(data) { + this._sendMessage({ + type: 'chunk', + chunk: arrayBufferToBase64(data) }); } - sendJSON(message) { + _sendMessage(message) { + message = { + type: 'ws-relay', + message: message + }; + this._sendMessageViaServer(message); + } + + _sendMessageViaServer(message) { message.to = this._peerId; message.roomType = this._getRoomTypes()[0]; message.roomId = this._roomIds[this._getRoomTypes()[0]]; @@ -1173,16 +1211,67 @@ class WSPeer extends Peer { } _sendSignal(connected = false) { - this.sendJSON({type: 'signal', connected: connected}); + this._sendMessageViaServer({type: 'signal', connected: connected}); } - onServerMessage(message) { + _onServerSignalMessage(message) { this._peerId = message.sender.id; - Events.fire('peer-connected', {peerId: message.sender.id, connectionHash: this.getConnectionHash()}) - if (message.connected) return; + + Events.fire('peer-connected', {peerId: this._peerId, connectionHash: this.getConnectionHash()}) + + if (message.connected) { + this.signalSuccessful = true; + return; + } + this._sendSignal(true); } + _onMessage(message) { + console.log('WS Receive:', message); + super._onMessage(message); + } + + _addFileDigester(header) { + this._digester = new FileDigester({size: header.size, name: header.name, mime: header.mime}, + this._requestAccepted.totalSize, + this._totalBytesReceived, + fileBlob => this._onFileReceived(fileBlob), + bytesReceived => this._sendBytesReceivedConfirmation(bytesReceived) + ); + } + + _onWsRelay(message) { + try { + message = JSON.parse(message).message; + } + catch (e) { + console.warn("WSPeer: Received JSON is malformed"); + return; + } + + if (message.type === 'chunk') { + const data = base64ToArrayBuffer(message.chunk); + this._onData(data); + + } + else { + this._onMessage(message); + } + } + + _refresh() { + this.signalSuccessful = true; + + if (!this._isCaller) return; // we will listen for a caller + + this._sendSignal(); + } + + _onDisconnected() { + this.signalSuccessful = false; + } + getConnectionHash() { // Todo: implement SubtleCrypto asymmetric encryption and create connectionHash from public keys return ""; @@ -1194,7 +1283,7 @@ class PeersManager { constructor(serverConnection) { this.peers = {}; this._server = serverConnection; - Events.on('signal', e => this._onMessage(e.detail)); + Events.on('signal', e => this._onSignal(e.detail)); Events.on('peers', e => this._onPeers(e.detail)); Events.on('files-selected', e => this._onFilesSelected(e.detail)); Events.on('respond-to-files-transfer-request', e => this._onRespondToFileTransferRequest(e.detail)) @@ -1217,7 +1306,7 @@ class PeersManager { Events.on('notify-peer-display-name-changed', e => this._notifyPeerDisplayNameChanged(e.detail)); Events.on('auto-accept-updated', e => this._onAutoAcceptUpdated(e.detail.roomSecret, e.detail.autoAccept)); Events.on('ws-disconnected', _ => this._onWsDisconnected()); - Events.on('ws-relay', e => this._onWsRelay(e.detail)); + Events.on('ws-relay', e => this._onWsRelay(e.detail.peerId, e.detail.message)); Events.on('ws-config', e => this._onWsConfig(e.detail)); } @@ -1225,9 +1314,9 @@ class PeersManager { this._wsConfig = wsConfig; } - _onMessage(message) { + _onSignal(message) { const peerId = message.sender.id; - this.peers[peerId].onServerMessage(message); + this.peers[peerId]._onServerSignalMessage(message); } _refreshPeer(isCaller, peerId, roomType, roomId) { @@ -1282,12 +1371,14 @@ class PeersManager { }) } - _onWsRelay(message) { + _onWsRelay(peerId, message) { if (!this._wsConfig.wsFallback) return; - const messageJSON = JSON.parse(message); - if (messageJSON.type === 'ws-chunk') message = base64ToArrayBuffer(messageJSON.chunk); - this.peers[messageJSON.sender.id]._onMessage(message); + const peer = this.peers[peerId]; + + if (!peer || peer.rtcSupported) return; + + peer._onWsRelay(message); } _onRespondToFileTransferRequest(detail) { @@ -1352,7 +1443,7 @@ class PeersManager { if (!peer) return; - peer._closeChannelAndConnection(); + peer._onDisconnected(); } _onRoomSecretsDeleted(roomSecrets) { @@ -1446,83 +1537,142 @@ class PeersManager { class FileChunker { - constructor(file, onChunk, onPartitionEnd) { - this._chunkSize = 64000; // 64 KB - this._maxPartitionSize = 1e6; // 1 MB - this._offset = 0; - this._partitionSize = 0; + constructor(file, onChunkCallback) { + this._chunkSize = 65536; // 64 KB + this._maxBytesSentWithoutConfirmation = 1048576; // 1 MB + + this._bytesSent = 0; + this._bytesReceived = 0; + this._file = file; - this._onChunk = onChunk; - this._onPartitionEnd = onPartitionEnd; + this._onChunk = onChunkCallback; + this._reader = new FileReader(); this._reader.addEventListener('load', e => this._onChunkRead(e.target.result)); - } - nextPartition() { - this._partitionSize = 0; - this._readChunk(); + this._currentlySending = false; } _readChunk() { - const chunk = this._file.slice(this._offset, this._offset + this._chunkSize); + if (this._currentlySending) return; + + this._currentlySending = true; + const chunk = this._file.slice(this._bytesSent, this._bytesSent + this._chunkSize); this._reader.readAsArrayBuffer(chunk); } - _onChunkRead(chunk) { - this._offset += chunk.byteLength; - this._partitionSize += chunk.byteLength; - this._onChunk(chunk); - if (this.isFileEnd()) return; - if (this._isPartitionEnd()) { - this._onPartitionEnd(this._offset); - return; - } + _onChunkRead(chunk) {} + + _onBytesReceived(bytesReceived) {} + + _restartFromOffset(offset) { + this._bytesSent = offset; this._readChunk(); } - _restartFromOffset(offset) { - this._offset = offset; - this.nextPartition(); + _isFileEnd() { + return this._bytesSent >= this._file.size; + } +} + +class FileChunkerRTC extends FileChunker { + + constructor(file, onChunkCallback, peerConnection, dataChannel) { + super(file, onChunkCallback); + + this._chunkSize = peerConnection && peerConnection.sctp + ? Math.min(peerConnection.sctp.maxMessageSize, 1048576) // 1 MB max + : 262144; // 256 KB + + this._peerConnection = peerConnection; + this._dataChannel = dataChannel; + + this._highWatermark = 4194304; // 4 MB + this._lowWatermark = 1048576; // 1 MB + + // Set buffer threshold + this._dataChannel.bufferedAmountLowThreshold = this._lowWatermark; + this._dataChannel.addEventListener('bufferedamountlow', _ => this._readChunk()); } - repeatPartition() { - this._offset -= this._partitionSize; - this.nextPartition(); + _onChunkRead(chunk) { + this._currentlySending = false; + + this._onChunk(chunk); + this._bytesSent += chunk.byteLength; + + // Pause sending when reaching the high watermark or file end + if (this._dataChannel.bufferedAmount > this._highWatermark || this._isFileEnd()) return; + + this._readChunk(); } - _isPartitionEnd() { - return this._partitionSize >= this._maxPartitionSize; + _onBytesReceived(bytesReceived) { + this._bytesReceived = bytesReceived; + } +} + +class FileChunkerWS extends FileChunker { + + constructor(file, onChunkCallback) { + super(file, onChunkCallback); } - isFileEnd() { - return this._offset >= this._file.size; + _onChunkRead(chunk) { + this._currentlySending = false; + + this._onChunk(chunk); + this._bytesSent += chunk.byteLength; + + // if too many bytes sent without confirmation by receiver or if end of file -> abort + const bytesCurrentlySent = this._bytesSent - this._bytesReceived; + if (bytesCurrentlySent > this._maxBytesSentWithoutConfirmation - this._chunkSize || this._isFileEnd()) return; + + this._readChunk(); + } + + _onBytesReceived(bytesReceived) { + this._bytesReceived = bytesReceived; + this._readChunk(); } } class FileDigester { - constructor(meta, totalSize, totalBytesReceived, callback) { + constructor(meta, totalSize, totalBytesReceived, fileCompleteCallback, bytesReceivedCallback = null) { this._buffer = []; this._bytesReceived = 0; + this._bytesReceivedSinceLastTime = 0; + this._maxBytesWithoutConfirmation = 1048576; // 1 MB + this._bytesReceivedCallback = bytesReceivedCallback this._size = meta.size; this._name = meta.name; this._mime = meta.mime; this._totalSize = totalSize; this._totalBytesReceived = totalBytesReceived; - this._callback = callback; + this._onFileCompleteCallback = fileCompleteCallback; } unchunk(chunk) { this._buffer.push(chunk); this._bytesReceived += chunk.byteLength || chunk.size; + this._bytesReceivedSinceLastTime += chunk.byteLength || chunk.size; + + // If more than half of maxBytesWithoutConfirmation received -> request more + if (this._bytesReceivedCallback && 2 * this._bytesReceivedSinceLastTime > this._maxBytesWithoutConfirmation) { + this._bytesReceivedCallback(this._bytesReceived); + this._bytesReceivedSinceLastTime = 0; + } + this.progress = (this._totalBytesReceived + this._bytesReceived) / this._totalSize; if (isNaN(this.progress)) this.progress = 1 if (this._bytesReceived < this._size) return; + // we are done const blob = new Blob(this._buffer) this._buffer = null; - this._callback(new File([blob], this._name, { + this._onFileCompleteCallback(new File([blob], this._name, { type: this._mime, lastModified: new Date().getTime() })); diff --git a/server/ws-server.js b/server/ws-server.js index da24375..589c424 100644 --- a/server/ws-server.js +++ b/server/ws-server.js @@ -89,22 +89,12 @@ export default class PairDropWsServer { this._onLeavePublicRoom(sender); break; case 'signal': - this._signalAndRelay(sender, message); + this._signalAndWsRelay(sender, message); break; - case 'request': - case 'header': - case 'partition': - case 'partition-received': - case 'progress': - case 'files-transfer-response': - case 'file-transfer-complete': - case 'message-transfer-complete': - case 'text': - case 'display-name-changed': - case 'ws-chunk': + case 'ws-relay': // relay ws-fallback if (this._conf.wsFallback) { - this._signalAndRelay(sender, message); + this._signalAndWsRelay(sender, message); } else { console.log("Websocket fallback is not activated on this instance.") @@ -112,7 +102,7 @@ export default class PairDropWsServer { } } - _signalAndRelay(sender, message) { + _signalAndWsRelay(sender, message) { const room = message.roomType === 'ip' ? sender.ip : message.roomId;