From 63a1f078f4cbc36ed0c1b808e04ea714b7420d27 Mon Sep 17 00:00:00 2001 From: Richard Hansen Date: Mon, 29 Mar 2021 02:50:32 -0400 Subject: [PATCH] collab_client: Redo server message queueing Move server message queue processing out of `handleUserChanges()` for the following reasons: * Fix a race condition: Before this change the client would stop processing incoming messages and stop sending changes to the server if a `NEW_CHANGES` message arrived while the user was composing a character and waiting for an `ACCEPT_COMMIT` message. * Improve readability: The `handleUserChanges()` function is for handling changes from the local user, not for handling changes from other users. * Simplify the code. --- src/static/js/ace.js | 2 +- src/static/js/ace2_inner.js | 22 ++--- src/static/js/collab_client.js | 166 +++++++++++++-------------------- 3 files changed, 75 insertions(+), 115 deletions(-) diff --git a/src/static/js/ace.js b/src/static/js/ace.js index 059bac76e..0772648af 100644 --- a/src/static/js/ace.js +++ b/src/static/js/ace.js @@ -141,7 +141,7 @@ const Ace2Editor = function () { this.getDebugProperty = (prop) => info.ace_getDebugProperty(prop); this.getInInternationalComposition = - () => loaded ? info.ace_getInInternationalComposition() : false; + () => loaded ? info.ace_getInInternationalComposition() : null; // prepareUserChangeset: // Returns null if no new changes or ACE not ready. Otherwise, bundles up all user changes diff --git a/src/static/js/ace2_inner.js b/src/static/js/ace2_inner.js index d57773196..a8cc3cb98 100644 --- a/src/static/js/ace2_inner.js +++ b/src/static/js/ace2_inner.js @@ -3504,16 +3504,7 @@ function Ace2Inner(editorInfo, cssManagers) { const teardown = () => _teardownActions.forEach((a) => a()); - let inInternationalComposition = false; - const handleCompositionEvent = (evt) => { - // international input events, fired in FF3, at least; allow e.g. Japanese input - if (evt.type === 'compositionstart') { - inInternationalComposition = true; - } else if (evt.type === 'compositionend') { - inInternationalComposition = false; - } - }; - + let inInternationalComposition = null; editorInfo.ace_getInInternationalComposition = () => inInternationalComposition; const bindTheEventHandlers = () => { @@ -3602,8 +3593,15 @@ function Ace2Inner(editorInfo, cssManagers) { }); }); - $(document.documentElement).on('compositionstart', handleCompositionEvent); - $(document.documentElement).on('compositionend', handleCompositionEvent); + $(document.documentElement).on('compositionstart', () => { + if (inInternationalComposition) return; + inInternationalComposition = new Promise((resolve) => { + $(document.documentElement).one('compositionend', () => { + inInternationalComposition = null; + resolve(); + }); + }); + }); }; const topLevel = (n) => { diff --git a/src/static/js/collab_client.js b/src/static/js/collab_client.js index 680c293b9..a9c289ee3 100644 --- a/src/static/js/collab_client.js +++ b/src/static/js/collab_client.js @@ -50,8 +50,6 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad) const userSet = {}; // userId -> userInfo userSet[userId] = initialUserInfo; - const msgQueue = []; - let isPendingRevision = false; const callbacks = { @@ -75,7 +73,11 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad) } const handleUserChanges = () => { - if (editor.getInInternationalComposition()) return; + if (editor.getInInternationalComposition()) { + // handleUserChanges() will be called again once composition ends so there's no need to set up + // a future call before returning. + return; + } const now = Date.now(); if ((!getSocket()) || channelState === 'CONNECTING') { if (channelState === 'CONNECTING' && (now - initialStartConnectTime) > 20000) { @@ -88,10 +90,10 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad) } if (committing) { - if (msgQueue.length === 0 && (now - lastCommitTime) > 20000) { + if (now - lastCommitTime > 20000) { // a commit is taking too long setChannelState('DISCONNECTED', 'slowcommit'); - } else if (msgQueue.length === 0 && (now - lastCommitTime) > 5000) { + } else if (now - lastCommitTime > 5000) { callbacks.onConnectionTrouble('SLOW'); } else { // run again in a few seconds, to detect a disconnect @@ -106,27 +108,6 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad) return; } - // apply msgQueue changeset. - if (msgQueue.length !== 0) { - let msg; - while ((msg = msgQueue.shift())) { - const newRev = msg.newRev; - rev = newRev; - if (msg.type === 'ACCEPT_COMMIT') { - acceptCommit(); - } else if (msg.type === 'NEW_CHANGES') { - const changeset = msg.changeset; - const author = (msg.author || ''); - const apool = msg.apool; - - editor.applyChangesToBase(changeset, author, apool); - } - } - if (isPendingRevision) { - setIsPendingRevision(false); - } - } - let sentMessage = false; // Check if there are any pending revisions to be received from server. // Allow only if there are no pending revisions to be received from server @@ -182,6 +163,21 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad) }); }; + const serverMessageTaskQueue = new class { + constructor() { + this._promiseChain = Promise.resolve(); + } + + async enqueue(fn) { + const taskPromise = this._promiseChain.then(fn); + // Use .catch() to prevent rejections from halting the queue. + this._promiseChain = taskPromise.catch(() => {}); + // Do NOT do `return await this._promiseChain;` because the caller would not see an error if + // fn() throws/rejects (due to the .catch() added above). + return await taskPromise; + } + }(); + const handleMessageFromServer = (evt) => { if (!getSocket()) return; if (!evt.data) return; @@ -190,95 +186,61 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad) const msg = wrapper.data; if (msg.type === 'NEW_CHANGES') { - const newRev = msg.newRev; - const changeset = msg.changeset; - const author = (msg.author || ''); - const apool = msg.apool; - - // When inInternationalComposition, msg pushed msgQueue. - if (msgQueue.length > 0 || editor.getInInternationalComposition()) { - const oldRev = msgQueue.length > 0 ? msgQueue[msgQueue.length - 1].newRev : rev; - if (newRev !== (oldRev + 1)) { - window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${oldRev + 1}`); + serverMessageTaskQueue.enqueue(async () => { + // Avoid updating the DOM while the user is composing a character. Notes about this `await`: + // * `await null;` is equivalent to `await Promise.resolve(null);`, so if the user is not + // currently composing a character then execution will continue without error. + // * We assume that it is not possible for a new 'compositionstart' event to fire after + // the `await` but before the next line of code after the `await` (or, if it is + // possible, that the chances are so small or the consequences so minor that it's not + // worth addressing). + await editor.getInInternationalComposition(); + const {newRev, changeset, author = '', apool} = msg; + if (newRev !== (rev + 1)) { + window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${rev + 1}`); // setChannelState("DISCONNECTED", "badmessage_newchanges"); return; } - msgQueue.push(msg); - return; - } - - if (newRev !== (rev + 1)) { - window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${rev + 1}`); - // setChannelState("DISCONNECTED", "badmessage_newchanges"); - return; - } - rev = newRev; - - editor.applyChangesToBase(changeset, author, apool); + rev = newRev; + editor.applyChangesToBase(changeset, author, apool); + }); } else if (msg.type === 'ACCEPT_COMMIT') { - const newRev = msg.newRev; - if (msgQueue.length > 0) { - if (newRev !== (msgQueue[msgQueue.length - 1].newRev + 1)) { - window.console.warn('bad message revision on ACCEPT_COMMIT: ' + - `${newRev} not ${msgQueue[msgQueue.length - 1][0] + 1}`); + serverMessageTaskQueue.enqueue(() => { + const newRev = msg.newRev; + if (newRev !== (rev + 1)) { + window.console.warn(`bad message revision on ACCEPT_COMMIT: ${newRev} not ${rev + 1}`); // setChannelState("DISCONNECTED", "badmessage_acceptcommit"); return; } - msgQueue.push(msg); - return; - } - - if (newRev !== (rev + 1)) { - window.console.warn(`bad message revision on ACCEPT_COMMIT: ${newRev} not ${rev + 1}`); - // setChannelState("DISCONNECTED", "badmessage_acceptcommit"); - return; - } - rev = newRev; - acceptCommit(); + rev = newRev; + acceptCommit(); + }); } else if (msg.type === 'CLIENT_RECONNECT') { // Server sends a CLIENT_RECONNECT message when there is a client reconnect. // Server also returns all pending revisions along with this CLIENT_RECONNECT message - if (msg.noChanges) { - // If no revisions are pending, just make everything normal - setIsPendingRevision(false); - return; - } - - const headRev = msg.headRev; - const newRev = msg.newRev; - const changeset = msg.changeset; - const author = (msg.author || ''); - const apool = msg.apool; - - if (msgQueue.length > 0) { - if (newRev !== (msgQueue[msgQueue.length - 1].newRev + 1)) { - window.console.warn('bad message revision on CLIENT_RECONNECT: ' + - `${newRev} not ${msgQueue[msgQueue.length - 1][0] + 1}`); + serverMessageTaskQueue.enqueue(() => { + if (msg.noChanges) { + // If no revisions are pending, just make everything normal + setIsPendingRevision(false); + return; + } + const {headRev, newRev, changeset, author = '', apool} = msg; + if (newRev !== (rev + 1)) { + window.console.warn(`bad message revision on CLIENT_RECONNECT: ${newRev} not ${rev + 1}`); // setChannelState("DISCONNECTED", "badmessage_acceptcommit"); return; } - msg.type = 'NEW_CHANGES'; - msgQueue.push(msg); - return; - } - - if (newRev !== (rev + 1)) { - window.console.warn(`bad message revision on CLIENT_RECONNECT: ${newRev} not ${rev + 1}`); - // setChannelState("DISCONNECTED", "badmessage_acceptcommit"); - return; - } - - rev = newRev; - if (author === pad.getUserId()) { - acceptCommit(); - } else { - editor.applyChangesToBase(changeset, author, apool); - } - - if (newRev === headRev) { - // Once we have applied all pending revisions, make everything normal - setIsPendingRevision(false); - } + rev = newRev; + if (author === pad.getUserId()) { + acceptCommit(); + } else { + editor.applyChangesToBase(changeset, author, apool); + } + if (newRev === headRev) { + // Once we have applied all pending revisions, make everything normal + setIsPendingRevision(false); + } + }); } else if (msg.type === 'NO_COMMIT_PENDING') { if (committing) { // server missed our commit message; abort that commit