Implement new status 'connecting', automatic reconnect on disconnect and auto resume of transfer + sending of queued messages. (fixes #260 and #247)

This commit is contained in:
schlagmichdoch 2024-02-04 18:02:10 +01:00
parent b36105b1cf
commit f22abca783
3 changed files with 589 additions and 316 deletions

View file

@ -88,7 +88,9 @@ class ServerConnection {
_onOpen() {
console.log('WS: server connected');
Events.fire('ws-connected');
if (this._isReconnect) Events.fire('notify-user', Localization.getTranslation("notifications.connected"));
if (this._isReconnect) {
Events.fire('notify-user', Localization.getTranslation("notifications.connected"));
}
}
_onPairDeviceInitiate() {
@ -101,6 +103,7 @@ class ServerConnection {
_onPairDeviceJoin(pairKey) {
if (!this._isConnected()) {
// Todo: instead use pending outbound ws queue
setTimeout(() => this._onPairDeviceJoin(pairKey), 1000);
return;
}
@ -336,6 +339,10 @@ class Peer {
this._evaluateAutoAccept();
}
_setIsCaller(isCaller) {
this._isCaller = isCaller;
}
sendJSON(message) {
this._send(JSON.stringify(message));
}
@ -433,6 +440,14 @@ class Peer {
: false;
}
_onPeerConnected() {
if (this._digester) {
// Reconnection during receiving of file. Send request for restart
const offset = this._digester._bytesReceived;
this._requestResendFromOffset(offset);
}
}
async requestFileTransfer(files) {
let header = [];
let totalSize = 0;
@ -472,8 +487,8 @@ class Peer {
Events.fire('set-progress', {peerId: this._peerId, progress: 0, status: 'wait'})
}
async sendFiles() {
for (let i=0; i<this._filesRequested.length; i++) {
sendFiles() {
for (let i = 0; i < this._filesRequested.length; i++) {
this._filesQueue.push(this._filesRequested[i]);
}
this._filesRequested = null
@ -487,7 +502,7 @@ class Peer {
this._sendFile(file);
}
async _sendFile(file) {
_sendFile(file) {
this.sendJSON({
type: 'header',
size: file.size,
@ -508,11 +523,21 @@ class Peer {
this.sendJSON({ type: 'partition-received', offset: offset });
}
_requestResendFromOffset(offset) {
this.sendJSON({ type: 'request-resend-from-offset', offset: offset });
}
_sendNextPartition() {
if (!this._chunker || this._chunker.isFileEnd()) return;
this._chunker.nextPartition();
}
_onRequestResendFromOffset(offset) {
console.log("Restart requested from offset:", offset)
if (!this._chunker) return;
this._chunker._restartFromOffset(offset);
}
_sendProgress(progress) {
this.sendJSON({ type: 'progress', progress: progress });
}
@ -522,25 +547,35 @@ class Peer {
this._onChunkReceived(message);
return;
}
const messageJSON = JSON.parse(message);
switch (messageJSON.type) {
try {
message = JSON.parse(message);
} catch (e) {
console.warn("Peer: Received JSON is malformed");
return;
}
switch (message.type) {
case 'request':
this._onFilesTransferRequest(messageJSON);
this._onFilesTransferRequest(message);
break;
case 'header':
this._onFileHeader(messageJSON);
this._onFileHeader(message);
break;
case 'partition':
this._onReceivedPartitionEnd(messageJSON);
this._onReceivedPartitionEnd(message);
break;
case 'partition-received':
this._sendNextPartition();
break;
case 'progress':
this._onDownloadProgress(messageJSON.progress);
this._onProgress(message.progress);
break;
case 'request-resend-from-offset':
this._onRequestResendFromOffset(message.offset);
break;
case 'files-transfer-response':
this._onFileTransferRequestResponded(messageJSON);
this._onFileTransferRequestResponded(message);
break;
case 'file-transfer-complete':
this._onFileTransferCompleted();
@ -549,10 +584,10 @@ class Peer {
this._onMessageTransferCompleted();
break;
case 'text':
this._onTextReceived(messageJSON);
this._onTextReceived(message);
break;
case 'display-name-changed':
this._onDisplayNameChanged(messageJSON);
this._onDisplayNameChanged(message);
break;
}
}
@ -620,21 +655,28 @@ class Peer {
if(!this._digester || !(chunk.byteLength || chunk.size)) return;
this._digester.unchunk(chunk);
const progress = this._digester.progress;
if (progress > 1) {
this._abortTransfer();
return;
}
this._onDownloadProgress(progress);
if (progress === 1) {
this._digester = null;
}
Events.fire('set-progress', {peerId: this._peerId, progress: progress, status: 'receive'});
// occasionally notify sender about our progress
if (progress - this._lastProgress < 0.005 && progress !== 1) return;
this._lastProgress = progress;
this._sendProgress(progress);
if (progress - this._lastProgress >= 0.005 || progress === 1) {
this._lastProgress = progress;
this._sendProgress(progress);
}
}
_onDownloadProgress(progress) {
_onProgress(progress) {
Events.fire('set-progress', {peerId: this._peerId, progress: progress, status: 'transfer'});
}
@ -654,25 +696,33 @@ class Peer {
Events.fire('file-received', fileBlob);
this._filesReceived.push(fileBlob);
if (!this._requestAccepted.header.length) {
this._busy = false;
Events.fire('set-progress', {peerId: this._peerId, progress: 0, status: 'process'});
Events.fire('files-received', {peerId: this._peerId, files: this._filesReceived, imagesOnly: this._requestAccepted.imagesOnly, totalSize: this._requestAccepted.totalSize});
this._filesReceived = [];
this._requestAccepted = null;
}
if (this._requestAccepted.header.length) return;
// We are done receiving
this._busy = false;
Events.fire('set-progress', {peerId: this._peerId, progress: 0, status: 'process'});
Events.fire('files-received', {
peerId: this._peerId,
files: this._filesReceived,
imagesOnly: this._requestAccepted.imagesOnly,
totalSize: this._requestAccepted.totalSize
});
this._filesReceived = [];
this._requestAccepted = null;
}
_onFileTransferCompleted() {
this._chunker = null;
if (!this._filesQueue.length) {
this._busy = false;
Events.fire('notify-user', Localization.getTranslation("notifications.file-transfer-completed"));
Events.fire('files-sent'); // used by 'Snapdrop & PairDrop for Android' app
}
else {
if (this._filesQueue.length) {
this._dequeueFile();
return;
}
// No more files in queue. Transfer is complete
this._busy = false;
Events.fire('notify-user', Localization.getTranslation("notifications.file-transfer-completed"));
Events.fire('files-sent'); // used by 'Snapdrop & PairDrop for Android' app
}
_onFileTransferRequestResponded(message) {
@ -725,99 +775,293 @@ class RTCPeer extends Peer {
super(serverConnection, isCaller, peerId, roomType, roomId);
this.rtcSupported = true;
this.rtcConfig = rtcConfig
this.rtcConfig = rtcConfig;
this.pendingInboundMessages = [];
this.pendingOutboundMessages = [];
Events.on('beforeunload', e => this._onBeforeUnload(e));
Events.on('pagehide', _ => this._onPageHide());
if (!this._isCaller) return; // we will listen for a caller
this._connect();
}
_connect() {
if (!this._conn || this._conn.signalingState === "closed") this._openConnection();
_isConnected() {
return this._conn && this._conn.connectionState === 'connected';
}
if (this._isCaller) {
this._openChannel();
}
else {
this._conn.ondatachannel = e => this._onChannelOpened(e);
}
_isConnecting() {
return this._conn
&& (
this._conn.connectionState === 'new'
|| this._conn.connectionState === 'connecting'
);
}
_isChannelOpen() {
return this._channel && this._channel.readyState === 'open';
}
_isChannelConnecting() {
return this._channel && this._channel.readyState === 'connecting';
}
_isStable() {
return this._isChannelOpen() && this._isConnected();
}
_connect() {
if (this._isStable()) return;
Events.fire('peer-connecting', this._peerId);
this._openConnection();
// TOdo: one channel for messages - one for data?
this._openChannel();
}
_openConnection() {
this._conn = new RTCPeerConnection(this.rtcConfig);
this._conn.onicecandidate = e => this._onIceCandidate(e);
this._conn.onicecandidateerror = e => this._onError(e);
this._conn.onconnectionstatechange = _ => this._onConnectionStateChange();
this._conn.oniceconnectionstatechange = e => this._onIceConnectionStateChange(e);
const conn = new RTCPeerConnection(this.rtcConfig);
conn.onnegotiationneeded = _ => this._onNegotiationNeeded();
conn.onsignalingstatechange = _ => this._onSignalingStateChanged();
conn.oniceconnectionstatechange = _ => this._onIceConnectionStateChange();
conn.onicegatheringstatechange = _ => this._onIceGatheringStateChanged();
conn.onconnectionstatechange = _ => this._onConnectionStateChange();
conn.onicecandidate = e => this._onIceCandidate(e);
conn.onicecandidateerror = e => this._onIceCandidateError(e);
this._conn = conn;
this._evaluatePendingInboundMessages()
.then((count) => {
if (count) {
console.log("Pending inbound messages evaluated.");
}
});
}
_openChannel() {
if (!this._conn) return;
async _onNegotiationNeeded() {
console.log('RTC: Negotiation needed');
const channel = this._conn.createDataChannel('data-channel', {
ordered: true,
reliable: true // Obsolete. See https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/reliable
});
channel.onopen = e => this._onChannelOpened(e);
channel.onerror = e => this._onError(e);
this._conn
.createOffer()
.then(d => this._onDescription(d))
.catch(e => this._onError(e));
if (this._isCaller) {
// Creating offer if required
console.log('RTC: Creating offer');
const description = await this._conn.createOffer();
await this._handleLocalDescription(description);
}
}
_onDescription(description) {
// description.sdp = description.sdp.replace('b=AS:30', 'b=AS:1638400');
this._conn
.setLocalDescription(description)
.then(_ => this._sendSignal({ sdp: description }))
.catch(e => this._onError(e));
_onSignalingStateChanged() {
console.log('RTC: Signaling state changed:', this._conn.signalingState);
}
_onIceConnectionStateChange() {
console.log('RTC: ICE connection state changed:', this._conn.iceConnectionState);
}
_onIceGatheringStateChanged() {
console.log('RTC: ICE gathering state changed:', this._conn.iceConnectionState);
}
_onConnectionStateChange() {
console.log('RTC: Connection state changed:', this._conn.connectionState);
switch (this._conn.connectionState) {
case 'disconnected':
this._refresh();
break;
case 'failed':
console.warn('RTC connection failed');
// TOdo: implement ws fallback as real fallback
this._refresh();
}
}
_onIceCandidate(event) {
if (!event.candidate) return;
this._sendSignal({ ice: event.candidate });
this._handleLocalCandidate(event.candidate);
}
onServerMessage(message) {
if (!this._conn) this._connect();
if (message.sdp) {
this._conn
.setRemoteDescription(message.sdp)
.then(_ => {
if (message.sdp.type === 'offer') {
return this._conn
.createAnswer()
.then(d => this._onDescription(d));
}
})
.catch(e => this._onError(e));
}
else if (message.ice) {
this._conn
.addIceCandidate(new RTCIceCandidate(message.ice))
.catch(e => this._onError(e));
}
_onIceCandidateError(error) {
console.error(error);
}
_onChannelOpened(event) {
console.log('RTC: channel opened with', this._peerId);
const channel = event.channel || event.target;
_openChannel() {
const channel = this._conn.createDataChannel('data-channel', {
ordered: true,
negotiated: true,
id: 0
});
channel.binaryType = 'arraybuffer';
channel.onmessage = e => this._onMessage(e.data);
channel.onopen = _ => this._onChannelOpened();
channel.onclose = _ => this._onChannelClosed();
channel.onerror = e => this._onChannelError(e);
channel.onmessage = e => this._onMessage(e.data);
this._channel = channel;
Events.on('beforeunload', e => this._onBeforeUnload(e));
Events.on('pagehide', _ => this._onPageHide());
Events.fire('peer-connected', {peerId: this._peerId, connectionHash: this.getConnectionHash()});
}
_onMessage(message) {
if (typeof message === 'string') {
console.log('RTC:', JSON.parse(message));
_onChannelOpened() {
console.log('RTC: Channel opened with', this._peerId);
console.debug(this.getConnectionHash())
console.debug(this._conn)
console.debug(this._channel)
Events.fire('peer-connected', {peerId: this._peerId, connectionHash: this.getConnectionHash()});
super._onPeerConnected();
while (this._isChannelOpen() && this.pendingOutboundMessages.length > 0) {
this._sendViaChannel(this.pendingOutboundMessages.shift());
}
super._onMessage(message);
}
_onChannelClosed() {
console.log('RTC: Channel closed', this._peerId);
this._refresh();
}
_onChannelError(error) {
console.error(error);
}
async _handleLocalDescription(localDescription) {
await this._conn.setLocalDescription(localDescription);
console.log("RTC: Sending local description");
this._sendSignal({ signalType: 'description', description: localDescription });
}
async _handleRemoteDescription(remoteDescription) {
console.log("RTC: Received remote description");
await this._conn.setRemoteDescription(remoteDescription);
if (!this._isCaller) {
// Creating answer if required
console.log('RTC: Creating answer');
const localDescription = await this._conn.createAnswer();
await this._handleLocalDescription(localDescription);
}
}
_handleLocalCandidate(candidate) {
console.log("RTC: Sending local candidate");
this._sendSignal({ signalType: 'candidate', candidate: candidate });
if (candidate === null) {
this.localIceCandidatesSent = true;
}
}
async _handleRemoteCandidate(candidate) {
console.log("RTC: Received remote candidate");
if (candidate !== null) {
await this._conn.addIceCandidate(candidate);
}
else {
this.remoteIceCandidatesReceived = true;
}
}
async _evaluatePendingInboundMessages() {
let inboundMessagesEvaluatedCount = 0;
while (this.pendingInboundMessages.length > 0) {
const message = this.pendingInboundMessages.shift();
console.log("Evaluate pending inbound message:", message);
await this.onServerMessage(message);
inboundMessagesEvaluatedCount++;
}
return inboundMessagesEvaluatedCount;
}
async onServerMessage(message) {
if (this._conn === null) {
console.debug("Not ready yet. Pending needed indeed?")
this.pendingInboundMessages.push(message);
return;
}
switch (message.signalType) {
case 'description':
await this._handleRemoteDescription(message.description);
break;
case 'candidate':
await this._handleRemoteCandidate(message.candidate);
break;
default:
console.warn(this.name, 'Unknown message type:', message.type);
break;
}
}
_disconnect() {
Events.fire('peer-disconnected', this._peerId);
}
_refresh() {
Events.fire('peer-connecting', this._peerId);
this._closeChannelAndConnection();
this._connect(); // reopen the channel
}
_closeChannelAndConnection() {
if (this._channel) {
this._channel.onopen = null;
this._channel.onclose = null;
this._channel.onerror = null;
this._channel.onmessage = null;
this._channel.close();
this._channel = null;
}
if (this._conn) {
this._conn.onnegotiationneeded = null;
this._conn.onsignalingstatechange = null;
this._conn.oniceconnectionstatechange = null;
this._conn.onicegatheringstatechange = null;
this._conn.onconnectionstatechange = null;
this._conn.onicecandidate = null;
this._conn.onicecandidateerror = null;
this._conn.close();
this._conn = null;
}
this.localIceCandidatesSent = false;
this.remoteIceCandidatesReceived = false;
}
_onBeforeUnload(e) {
if (this._busy) {
e.preventDefault();
return Localization.getTranslation("notifications.unfinished-transfers-warning");
}
}
_onPageHide() {
this._disconnect();
}
_send(message) {
// Todo: if channel or connection is closed or disconnected: do not send
// put messages in queue and send after reconnection.
// this._pendingMessages[];
if (!this._isStable() || this.pendingOutboundMessages.length > 0) {
// queue messages if not connected OR if connected AND queue is not empty
this.pendingOutboundMessages.push(message);
return;
}
this._sendViaChannel(message);
}
_sendViaChannel(message) {
this._channel.send(message);
}
_sendSignal(message) {
message.type = 'signal';
message.to = this._peerId;
message.roomType = this._getRoomTypes()[0];
message.roomId = this._roomIds[this._getRoomTypes()[0]];
this._server.send(message);
}
sendDisplayName(displayName) {
super.sendDisplayName(displayName);
}
getConnectionHash() {
@ -886,54 +1130,12 @@ class RTCPeer extends Peer {
}
}
_onIceConnectionStateChange() {
switch (this._conn.iceConnectionState) {
case 'failed':
this._onError('ICE Gathering failed');
break;
default:
console.log('ICE Gathering', this._conn.iceConnectionState);
_onMessage(message) {
if (typeof message === 'string') {
// Todo: Test speed increase without prints? --> print only on debug mode via URL argument `?debug_mode=true`
console.log('RTC:', JSON.parse(message));
}
}
_onError(error) {
console.error(error);
}
_send(message) {
if (!this._channel) this.refresh();
this._channel.send(message);
}
_sendSignal(signal) {
signal.type = 'signal';
signal.to = this._peerId;
signal.roomType = this._getRoomTypes()[0];
signal.roomId = this._roomIds[this._getRoomTypes()[0]];
this._server.send(signal);
}
refresh() {
// check if channel is open. otherwise create one
if (this._isConnected() || this._isConnecting()) return;
// only reconnect if peer is caller
if (!this._isCaller) return;
this._connect();
}
_isConnected() {
return this._channel && this._channel.readyState === 'open';
}
_isConnecting() {
return this._channel && this._channel.readyState === 'connecting';
}
sendDisplayName(displayName) {
if (!this._isConnected()) return;
super.sendDisplayName(displayName);
super._onMessage(message);
}
}
@ -1020,9 +1222,7 @@ class PeersManager {
this.peers[peerId].onServerMessage(message);
}
_refreshPeer(peerId, roomType, roomId) {
if (!this._peerExists(peerId)) return false;
_refreshPeer(isCaller, peerId, roomType, roomId) {
const peer = this.peers[peerId];
const roomTypesDiffer = Object.keys(peer._roomIds)[0] !== roomType;
const roomIdsDiffer = peer._roomIds[roomType] !== roomId;
@ -1036,17 +1236,22 @@ class PeersManager {
return true;
}
peer.refresh();
// reconnect peer - caller/waiter might be switched
peer._setIsCaller(isCaller);
peer._refresh();
return true;
}
_createOrRefreshPeer(isCaller, peerId, roomType, roomId, rtcSupported) {
if (this._peerExists(peerId)) {
this._refreshPeer(peerId, roomType, roomId);
return;
this._refreshPeer(isCaller, peerId, roomType, roomId);
} else {
this.createPeer(isCaller, peerId, roomType, roomId, rtcSupported);
}
}
createPeer(isCaller, peerId, roomType, roomId, rtcSupported) {
if (window.isRtcSupported && rtcSupported) {
this.peers[peerId] = new RTCPeer(this._server, isCaller, peerId, roomType, roomId, this._wsConfig.rtcConfig);
}
@ -1091,7 +1296,7 @@ class PeersManager {
}
_onPeerLeft(message) {
if (this._peerExists(message.peerId) && this._webRtcSupported(message.peerId)) {
if (this._peerExists(message.peerId) && !this._webRtcSupported(message.peerId)) {
console.log('WSPeer left:', message.peerId);
}
if (message.disconnect === true) {
@ -1136,11 +1341,10 @@ class PeersManager {
_onPeerDisconnected(peerId) {
const peer = this.peers[peerId];
delete this.peers[peerId];
if (!peer || !peer._conn) return;
if (peer._channel) peer._channel.onclose = null;
peer._conn.close();
peer._busy = false;
peer._roomIds = {};
if (!peer) return;
peer._closeChannelAndConnection();
}
_onRoomSecretsDeleted(roomSecrets) {
@ -1268,6 +1472,11 @@ class FileChunker {
this._readChunk();
}
_restartFromOffset(offset) {
this._offset = offset;
this.nextPartition();
}
repeatPartition() {
this._offset -= this._partitionSize;
this.nextPartition();