collab_client: Use class syntax to improve readability

This commit is contained in:
Richard Hansen 2021-04-01 02:40:06 -04:00
parent 5f80c3f3c9
commit e5375fe425

View file

@ -53,150 +53,160 @@ class TaskQueue {
} }
} }
/** Call this when the document is ready, and a new Ace2Editor() has been created and inited. class CollabClient {
ACE's ready callback does not need to have fired yet. constructor(ace2editor, serverVars, initialUserInfo, pad) {
"serverVars" are from calling doc.getCollabClientVars() on the server. */ this.commitDelay = 500; // public so that tests can be sped up
const getCollabClient = (ace2editor, serverVars, initialUserInfo, pad) => {
const editor = ace2editor;
let rev = serverVars.rev; this._editor = ace2editor;
let committing = false; this._initialUserInfo = initialUserInfo;
let stateMessage; this._pad = pad;
let channelState = 'CONNECTING';
let lastCommitTime = 0;
let startConnectTime = Date.now();
let commitDelay = 500;
const userId = initialUserInfo.userId; this._rev = serverVars.rev;
// var socket; this._committing = false;
const userSet = {}; // userId -> userInfo this._stateMessage;
userSet[userId] = initialUserInfo; this._channelState = 'CONNECTING';
this._lastCommitTime = 0;
this._startConnectTime = Date.now();
let isPendingRevision = false; this._userId = this._initialUserInfo.userId;
this._userSet = {}; // userId -> userInfo
this._userSet[this._userId] = this._initialUserInfo;
const callbacks = { this._isPendingRevision = false;
onUserJoin: () => {},
onUserLeave: () => {},
onUpdateUserInfo: () => {},
onChannelStateChange: () => {},
onClientMessage: () => {},
onInternalAction: () => {},
onConnectionTrouble: () => {},
onServerMessage: () => {},
};
// We need to present a working interface even before the socket is connected for the first time. this._callbacks = {
// Use a Gate to block actions until connected. Once connected, the Gate is opened which causes onUserJoin: () => {},
// post-connect actions to start running. onUserLeave: () => {},
let connectedGate = new Gate(); onUpdateUserInfo: () => {},
onChannelStateChange: () => {},
onClientMessage: () => {},
onInternalAction: () => {},
onConnectionTrouble: () => {},
onServerMessage: () => {},
};
if (browser.firefox) { // We need to present a working interface even before the socket is connected for the first
// Prevent "escape" from taking effect and canceling a comet connection; // time. Use a Gate to block actions until connected. Once connected, the Gate is opened which
// doesn't work if focus is on an iframe. // causes post-connect actions to start running.
$(window).bind('keydown', (evt) => { this._connectedGate = new Gate();
if (evt.which === 27) {
evt.preventDefault(); if (browser.firefox) {
} // Prevent "escape" from taking effect and canceling a comet connection;
}); // doesn't work if focus is on an iframe.
$(window).bind('keydown', (evt) => {
if (evt.which === 27) {
evt.preventDefault();
}
});
}
this._serverMessageTaskQueue = new TaskQueue();
this._idleFuncs = [];
this.addHistoricalAuthors(serverVars.historicalAuthorData);
this._tellAceActiveAuthorInfo(this._initialUserInfo);
this._editor.setProperty('userAuthor', this._userId);
this._editor.setBaseAttributedText(serverVars.initialAttributedText, serverVars.apool);
this._editor.setUserChangeNotificationCallback(() => this._handleUserChanges());
} }
const handleUserChanges = () => { _handleUserChanges() {
if (editor.getInInternationalComposition()) { if (this._editor.getInInternationalComposition()) {
// handleUserChanges() will be called again once composition ends so there's no need to set up // _handleUserChanges() will be called again once composition ends so there's no need to set
// a future call before returning. // up a future call before returning.
return; return;
} }
const now = Date.now(); const now = Date.now();
const connecting = ['CONNECTING', 'RECONNECTING'].includes(channelState); const connecting = ['CONNECTING', 'RECONNECTING'].includes(this._channelState);
if (!pad.socket || connecting) { if (!this._pad.socket || connecting) {
if (connecting && now - startConnectTime > 20000) { if (connecting && now - this._startConnectTime > 20000) {
setChannelState('DISCONNECTED', 'initsocketfail'); this.setChannelState('DISCONNECTED', 'initsocketfail');
} else { } else {
// check again in a bit // check again in a bit
setTimeout(handleUserChanges, 1000); setTimeout(() => this._handleUserChanges(), 1000);
} }
return; return;
} }
if (committing) { if (this._committing) {
if (now - lastCommitTime > 20000) { if (now - this._lastCommitTime > 20000) {
// a commit is taking too long // a commit is taking too long
setChannelState('DISCONNECTED', 'slowcommit'); this.setChannelState('DISCONNECTED', 'slowcommit');
} else if (now - lastCommitTime > 5000) { } else if (now - this._lastCommitTime > 5000) {
callbacks.onConnectionTrouble('SLOW'); this._callbacks.onConnectionTrouble('SLOW');
} else { } else {
// run again in a few seconds, to detect a disconnect // run again in a few seconds, to detect a disconnect
setTimeout(handleUserChanges, 3000); setTimeout(() => this._handleUserChanges(), 3000);
} }
return; return;
} }
const earliestCommit = lastCommitTime + commitDelay; const earliestCommit = this._lastCommitTime + this.commitDelay;
if (now < earliestCommit) { if (now < earliestCommit) {
setTimeout(handleUserChanges, earliestCommit - now); setTimeout(() => this._handleUserChanges(), earliestCommit - now);
return; return;
} }
let sentMessage = false; let sentMessage = false;
// Check if there are any pending revisions to be received from server. // Check if there are any pending revisions to be received from server.
// Allow only if there are no pending revisions to be received from server // Allow only if there are no pending revisions to be received from server
if (!isPendingRevision) { if (!this._isPendingRevision) {
const userChangesData = editor.prepareUserChangeset(); const userChangesData = this._editor.prepareUserChangeset();
if (userChangesData.changeset) { if (userChangesData.changeset) {
lastCommitTime = now; this._lastCommitTime = now;
committing = true; this._committing = true;
stateMessage = { this._stateMessage = {
type: 'USER_CHANGES', type: 'USER_CHANGES',
baseRev: rev, baseRev: this._rev,
changeset: userChangesData.changeset, changeset: userChangesData.changeset,
apool: userChangesData.apool, apool: userChangesData.apool,
}; };
sendMessage(stateMessage); this.sendMessage(this._stateMessage);
sentMessage = true; sentMessage = true;
callbacks.onInternalAction('commitPerformed'); this._callbacks.onInternalAction('commitPerformed');
} }
} else { } else {
// run again in a few seconds, to check if there was a reconnection attempt // run again in a few seconds, to check if there was a reconnection attempt
setTimeout(handleUserChanges, 3000); setTimeout(() => this._handleUserChanges(), 3000);
} }
if (sentMessage) { if (sentMessage) {
// run again in a few seconds, to detect a disconnect // run again in a few seconds, to detect a disconnect
setTimeout(handleUserChanges, 3000); setTimeout(() => this._handleUserChanges(), 3000);
} }
}; }
const acceptCommit = () => { _acceptCommit() {
editor.applyPreparedChangesetToBase(); this._editor.applyPreparedChangesetToBase();
setStateIdle(); this.setStateIdle();
try { try {
callbacks.onInternalAction('commitAcceptedByServer'); this._callbacks.onInternalAction('commitAcceptedByServer');
callbacks.onConnectionTrouble('OK'); this._callbacks.onConnectionTrouble('OK');
} catch (err) { /* intentionally ignored */ } } catch (err) { /* intentionally ignored */ }
handleUserChanges(); this._handleUserChanges();
}; }
const sendMessage = async (msg) => { async sendMessage(msg) {
await connectedGate; await this._connectedGate;
pad.socket.json.send( this._pad.socket.json.send(
{ {
type: 'COLLABROOM', type: 'COLLABROOM',
component: 'pad', component: 'pad',
data: msg, data: msg,
}); });
}; }
const serverMessageTaskQueue = new TaskQueue(); handleMessageFromServer(evt) {
if (!this._pad.socket) return;
const handleMessageFromServer = (evt) => {
if (!pad.socket) return;
if (!evt.data) return; if (!evt.data) return;
const wrapper = evt; const wrapper = evt;
if (wrapper.type !== 'COLLABROOM' && wrapper.type !== 'CUSTOM') return; if (wrapper.type !== 'COLLABROOM' && wrapper.type !== 'CUSTOM') return;
const msg = wrapper.data; const msg = wrapper.data;
if (msg.type === 'NEW_CHANGES') { if (msg.type === 'NEW_CHANGES') {
serverMessageTaskQueue.enqueue(async () => { this._serverMessageTaskQueue.enqueue(async () => {
// Avoid updating the DOM while the user is composing a character. Notes about this `await`: // Avoid updating the DOM while the user is composing a character. Notes about this `await`:
// * `await null;` is equivalent to `await Promise.resolve(null);`, so if the user is not // * `await null;` is equivalent to `await Promise.resolve(null);`, so if the user is not
// currently composing a character then execution will continue without error. // currently composing a character then execution will continue without error.
@ -204,51 +214,54 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, pad) => {
// the `await` but before the next line of code after the `await` (or, if it is // the `await` but before the next line of code after the `await` (or, if it is
// possible, that the chances are so small or the consequences so minor that it's not // possible, that the chances are so small or the consequences so minor that it's not
// worth addressing). // worth addressing).
await editor.getInInternationalComposition(); await this._editor.getInInternationalComposition();
const {newRev, changeset, author = '', apool} = msg; const {newRev, changeset, author = '', apool} = msg;
if (newRev !== (rev + 1)) { const nextRev = this._rev + 1;
window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${rev + 1}`); if (newRev !== nextRev) {
window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${nextRev}`);
// setChannelState("DISCONNECTED", "badmessage_newchanges"); // setChannelState("DISCONNECTED", "badmessage_newchanges");
return; return;
} }
rev = newRev; this._rev = newRev;
editor.applyChangesToBase(changeset, author, apool); this._editor.applyChangesToBase(changeset, author, apool);
}); });
} else if (msg.type === 'ACCEPT_COMMIT') { } else if (msg.type === 'ACCEPT_COMMIT') {
serverMessageTaskQueue.enqueue(() => { this._serverMessageTaskQueue.enqueue(() => {
const newRev = msg.newRev; const newRev = msg.newRev;
if (newRev !== (rev + 1)) { const nextRev = this._rev + 1;
window.console.warn(`bad message revision on ACCEPT_COMMIT: ${newRev} not ${rev + 1}`); if (newRev !== nextRev) {
window.console.warn(`bad message revision on ACCEPT_COMMIT: ${newRev} not ${nextRev}`);
// setChannelState("DISCONNECTED", "badmessage_acceptcommit"); // setChannelState("DISCONNECTED", "badmessage_acceptcommit");
return; return;
} }
rev = newRev; this._rev = newRev;
acceptCommit(); this._acceptCommit();
}); });
} else if (msg.type === 'CLIENT_RECONNECT') { } else if (msg.type === 'CLIENT_RECONNECT') {
// Server sends a CLIENT_RECONNECT message when there is a client reconnect. // Server sends a CLIENT_RECONNECT message when there is a client reconnect.
// Server also returns all pending revisions along with this CLIENT_RECONNECT message // Server also returns all pending revisions along with this CLIENT_RECONNECT message
serverMessageTaskQueue.enqueue(() => { this._serverMessageTaskQueue.enqueue(() => {
if (msg.noChanges) { if (msg.noChanges) {
// If no revisions are pending, just make everything normal // If no revisions are pending, just make everything normal
setIsPendingRevision(false); this.setIsPendingRevision(false);
return; return;
} }
const {headRev, newRev, changeset, author = '', apool} = msg; const {headRev, newRev, changeset, author = '', apool} = msg;
if (newRev !== (rev + 1)) { const nextRev = this._rev + 1;
window.console.warn(`bad message revision on CLIENT_RECONNECT: ${newRev} not ${rev + 1}`); if (newRev !== nextRev) {
window.console.warn(`bad message revision on CLIENT_RECONNECT: ${newRev} not ${nextRev}`);
// setChannelState("DISCONNECTED", "badmessage_acceptcommit"); // setChannelState("DISCONNECTED", "badmessage_acceptcommit");
return; return;
} }
rev = newRev; this._rev = newRev;
if (author === pad.getUserId()) { if (author === this._pad.getUserId()) {
acceptCommit(); this._acceptCommit();
} else { } else {
editor.applyChangesToBase(changeset, author, apool); this._editor.applyChangesToBase(changeset, author, apool);
} }
if (newRev === headRev) { if (newRev === headRev) {
// Once we have applied all pending revisions, make everything normal // Once we have applied all pending revisions, make everything normal
setIsPendingRevision(false); this.setIsPendingRevision(false);
} }
}); });
} else if (msg.type === 'USER_NEWINFO') { } else if (msg.type === 'USER_NEWINFO') {
@ -257,29 +270,29 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, pad) => {
// Avoid a race condition when setting colors. If our color was set by a // Avoid a race condition when setting colors. If our color was set by a
// query param, ignore our own "new user" message's color value. // query param, ignore our own "new user" message's color value.
if (id === initialUserInfo.userId && initialUserInfo.globalUserColor) { if (id === this._initialUserInfo.userId && this._initialUserInfo.globalUserColor) {
msg.userInfo.colorId = initialUserInfo.globalUserColor; msg.userInfo.colorId = this._initialUserInfo.globalUserColor;
} }
if (userSet[id]) { if (this._userSet[id]) {
userSet[id] = userInfo; this._userSet[id] = userInfo;
callbacks.onUpdateUserInfo(userInfo); this._callbacks.onUpdateUserInfo(userInfo);
} else { } else {
userSet[id] = userInfo; this._userSet[id] = userInfo;
callbacks.onUserJoin(userInfo); this._callbacks.onUserJoin(userInfo);
} }
tellAceActiveAuthorInfo(userInfo); this._tellAceActiveAuthorInfo(userInfo);
} else if (msg.type === 'USER_LEAVE') { } else if (msg.type === 'USER_LEAVE') {
const userInfo = msg.userInfo; const userInfo = msg.userInfo;
const id = userInfo.userId; const id = userInfo.userId;
if (userSet[id]) { if (this._userSet[id]) {
delete userSet[userInfo.userId]; delete this._userSet[userInfo.userId];
fadeAceAuthorInfo(userInfo); this._fadeAceAuthorInfo(userInfo);
callbacks.onUserLeave(userInfo); this._callbacks.onUserLeave(userInfo);
} }
} else if (msg.type === 'CLIENT_MESSAGE') { } else if (msg.type === 'CLIENT_MESSAGE') {
callbacks.onClientMessage(msg.payload); this._callbacks.onClientMessage(msg.payload);
} else if (msg.type === 'CHAT_MESSAGE') { } else if (msg.type === 'CHAT_MESSAGE') {
chat.addMessage(msg, true, false); chat.addMessage(msg, true, false);
} else if (msg.type === 'CHAT_MESSAGES') { } else if (msg.type === 'CHAT_MESSAGES') {
@ -315,179 +328,167 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, pad) => {
if (msg.type === 'NEW_CHANGES') msg.payload = msg; if (msg.type === 'NEW_CHANGES') msg.payload = msg;
hooks.callAll(`handleClientMessage_${msg.type}`, {payload: msg.payload}); hooks.callAll(`handleClientMessage_${msg.type}`, {payload: msg.payload});
}; }
const updateUserInfo = async (userInfo) => { async updateUserInfo(userInfo) {
await connectedGate; await this._connectedGate;
userInfo.userId = userId; userInfo.userId = this._userId;
userSet[userId] = userInfo; this._userSet[this._userId] = userInfo;
tellAceActiveAuthorInfo(userInfo); this._tellAceActiveAuthorInfo(userInfo);
if (!pad.socket) return; if (!this._pad.socket) return;
await sendMessage( await this.sendMessage(
{ {
type: 'USERINFO_UPDATE', type: 'USERINFO_UPDATE',
userInfo, userInfo,
}); });
}; }
const tellAceActiveAuthorInfo = (userInfo) => { _tellAceActiveAuthorInfo(userInfo) {
tellAceAuthorInfo(userInfo.userId, userInfo.colorId); this._tellAceAuthorInfo(userInfo.userId, userInfo.colorId);
}; }
const tellAceAuthorInfo = (userId, colorId, inactive) => { _tellAceAuthorInfo(userId, colorId, inactive) {
if (typeof colorId === 'number') { if (typeof colorId === 'number') {
colorId = clientVars.colorPalette[colorId]; colorId = clientVars.colorPalette[colorId];
} }
const cssColor = colorId; const cssColor = colorId;
if (inactive) { if (inactive) {
editor.setAuthorInfo(userId, { this._editor.setAuthorInfo(userId, {
bgcolor: cssColor, bgcolor: cssColor,
fade: 0.5, fade: 0.5,
}); });
} else { } else {
editor.setAuthorInfo(userId, { this._editor.setAuthorInfo(userId, {
bgcolor: cssColor, bgcolor: cssColor,
}); });
} }
}; }
const fadeAceAuthorInfo = (userInfo) => { _fadeAceAuthorInfo(userInfo) {
tellAceAuthorInfo(userInfo.userId, userInfo.colorId, true); this._tellAceAuthorInfo(userInfo.userId, userInfo.colorId, true);
}; }
const getConnectedUsers = () => Object.values(userSet); getConnectedUsers() {
return Object.values(this._userSet);
}
const addHistoricalAuthors = (hadata) => { addHistoricalAuthors(hadata) {
for (const [author, data] of Object.entries(hadata)) { for (const [author, data] of Object.entries(hadata)) {
if (!userSet[author]) { if (!this._userSet[author]) {
tellAceAuthorInfo(author, data.colorId, true); this._tellAceAuthorInfo(author, data.colorId, true);
} }
} }
}; }
const setChannelState = (newChannelState, moreInfo) => { setChannelState(newChannelState, moreInfo) {
if (newChannelState === channelState) return; if (newChannelState === this._channelState) return;
if (channelState === 'CONNECTED') { if (this._channelState === 'CONNECTED') {
// The old channel state is CONNECTED, which means we have just disconnected. Re-initialize // The old channel state is CONNECTED, which means we have just disconnected. Re-initialize
// connectedGate so that actions are deferred until connected again. Do this before calling // this._connectedGate so that actions are deferred until connected again. Do this before
// onChannelStateChange() so that the event handler can create deferred actions if desired. // calling onChannelStateChange() so that the event handler can create deferred actions if
connectedGate = new Gate(); // desired.
this._connectedGate = new Gate();
} }
channelState = newChannelState; this._channelState = newChannelState;
callbacks.onChannelStateChange(channelState, moreInfo); this._callbacks.onChannelStateChange(this._channelState, moreInfo);
switch (channelState) { switch (this._channelState) {
case 'CONNECTING': case 'CONNECTING':
case 'RECONNECTING': case 'RECONNECTING':
startConnectTime = Date.now(); this._startConnectTime = Date.now();
break; break;
case 'CONNECTED': case 'CONNECTED':
connectedGate.open(); this._connectedGate.open();
break; break;
} }
}; }
const sendClientMessage = async (msg) => { async sendClientMessage(msg) {
await sendMessage( await this.sendMessage(
{ {
type: 'CLIENT_MESSAGE', type: 'CLIENT_MESSAGE',
payload: msg, payload: msg,
}); });
}; }
const getCurrentRevisionNumber = () => rev; getCurrentRevisionNumber() {
return this._rev;
}
const getMissedChanges = () => { getMissedChanges() {
const obj = {}; const obj = {};
obj.userInfo = userSet[userId]; obj.userInfo = this._userSet[this._userId];
obj.baseRev = rev; obj.baseRev = this._rev;
if (committing && stateMessage) { if (this._committing && this._stateMessage) {
obj.committedChangeset = stateMessage.changeset; obj.committedChangeset = this._stateMessage.changeset;
obj.committedChangesetAPool = stateMessage.apool; obj.committedChangesetAPool = this._stateMessage.apool;
editor.applyPreparedChangesetToBase(); this._editor.applyPreparedChangesetToBase();
} }
const userChangesData = editor.prepareUserChangeset(); const userChangesData = this._editor.prepareUserChangeset();
if (userChangesData.changeset) { if (userChangesData.changeset) {
obj.furtherChangeset = userChangesData.changeset; obj.furtherChangeset = userChangesData.changeset;
obj.furtherChangesetAPool = userChangesData.apool; obj.furtherChangesetAPool = userChangesData.apool;
} }
return obj; return obj;
}; }
const setStateIdle = () => { setStateIdle() {
committing = false; this._committing = false;
callbacks.onInternalAction('newlyIdle'); this._callbacks.onInternalAction('newlyIdle');
schedulePerhapsCallIdleFuncs(); this._schedulePerhapsCallIdleFuncs();
}; }
const setIsPendingRevision = (value) => { setIsPendingRevision(value) {
isPendingRevision = value; this._isPendingRevision = value;
}; }
const idleFuncs = []; callWhenNotCommitting(func) {
this._idleFuncs.push(func);
this._schedulePerhapsCallIdleFuncs();
}
const callWhenNotCommitting = (func) => { _schedulePerhapsCallIdleFuncs() {
idleFuncs.push(func);
schedulePerhapsCallIdleFuncs();
};
const schedulePerhapsCallIdleFuncs = () => {
setTimeout(() => { setTimeout(() => {
if (!committing) { if (!this._committing) {
while (idleFuncs.length > 0) { while (this._idleFuncs.length > 0) {
const f = idleFuncs.shift(); const f = this._idleFuncs.shift();
f(); f();
} }
} }
}, 0); }, 0);
}; }
const self = { setOnUserJoin(cb) {
setOnUserJoin: (cb) => { this._callbacks.onUserJoin = cb;
callbacks.onUserJoin = cb; }
},
setOnUserLeave: (cb) => {
callbacks.onUserLeave = cb;
},
setOnUpdateUserInfo: (cb) => {
callbacks.onUpdateUserInfo = cb;
},
setOnChannelStateChange: (cb) => {
callbacks.onChannelStateChange = cb;
},
setOnClientMessage: (cb) => {
callbacks.onClientMessage = cb;
},
setOnInternalAction: (cb) => {
callbacks.onInternalAction = cb;
},
setOnConnectionTrouble: (cb) => {
callbacks.onConnectionTrouble = cb;
},
updateUserInfo,
handleMessageFromServer,
getConnectedUsers,
sendClientMessage,
sendMessage,
getCurrentRevisionNumber,
getMissedChanges,
callWhenNotCommitting,
addHistoricalAuthors,
setChannelState,
setStateIdle,
setIsPendingRevision,
set commitDelay(ms) { commitDelay = ms; },
get commitDelay() { return commitDelay; },
};
addHistoricalAuthors(serverVars.historicalAuthorData); setOnUserLeave(cb) {
tellAceActiveAuthorInfo(initialUserInfo); this._callbacks.onUserLeave = cb;
}
editor.setProperty('userAuthor', userId); setOnUpdateUserInfo(cb) {
editor.setBaseAttributedText(serverVars.initialAttributedText, serverVars.apool); this._callbacks.onUpdateUserInfo = cb;
editor.setUserChangeNotificationCallback(handleUserChanges); }
return self; setOnChannelStateChange(cb) {
}; this._callbacks.onChannelStateChange = cb;
}
exports.getCollabClient = getCollabClient; setOnClientMessage(cb) {
this._callbacks.onClientMessage = cb;
}
setOnInternalAction(cb) {
this._callbacks.onInternalAction = cb;
}
setOnConnectionTrouble(cb) {
this._callbacks.onConnectionTrouble = cb;
}
}
/** Call this when the document is ready, and a new Ace2Editor() has been created and inited.
ACE's ready callback does not need to have fired yet.
"serverVars" are from calling doc.getCollabClientVars() on the server. */
exports.getCollabClient = (ace2editor, serverVars, initialUserInfo, pad) => (
new CollabClient(ace2editor, serverVars, initialUserInfo, pad));