PadMessageHandler.js: convert handleUserChanges() to Promises

- the call site still expects a nodeback function, so also introduced the
  `nodeify` module to allow that function to work as expected.
This commit is contained in:
Ray Bellis 2019-01-30 10:43:01 +00:00
parent c499a08030
commit d543d5ae6a
2 changed files with 109 additions and 146 deletions

View file

@ -39,6 +39,7 @@ var channels = require("channels");
var stats = require('../stats'); var stats = require('../stats');
var remoteAddress = require("../utils/RemoteAddress").remoteAddress; var remoteAddress = require("../utils/RemoteAddress").remoteAddress;
const thenify = require("thenify").withCallback; const thenify = require("thenify").withCallback;
const nodeify = require("nodeify");
/** /**
* A associative array that saves informations about a session * A associative array that saves informations about a session
@ -61,7 +62,11 @@ stats.gauge('totalUsers', function() {
/** /**
* A changeset queue per pad that is processed by handleUserChanges() * A changeset queue per pad that is processed by handleUserChanges()
*/ */
var padChannels = new channels.channels(thenify(handleUserChanges)); var padChannels = new channels.channels(handleUserChangesCB);
function handleUserChangesCB(data, callback) {
return nodeify(handleUserChanges(data), callback);
}
/** /**
* Saves the Socket class we need to send and receive data from the client * Saves the Socket class we need to send and receive data from the client
@ -591,7 +596,7 @@ function handleUserInfoUpdate(client, message)
* @param client the client that send this message * @param client the client that send this message
* @param message the message from the client * @param message the message from the client
*/ */
function handleUserChanges(data, cb) async function handleUserChanges(data)
{ {
var client = data.client var client = data.client
, message = data.message , message = data.message
@ -602,17 +607,17 @@ function handleUserChanges(data, cb)
// Make sure all required fields are present // Make sure all required fields are present
if (message.data.baseRev == null) { if (message.data.baseRev == null) {
messageLogger.warn("Dropped message, USER_CHANGES Message has no baseRev!"); messageLogger.warn("Dropped message, USER_CHANGES Message has no baseRev!");
return cb(); return;
} }
if (message.data.apool == null) { if (message.data.apool == null) {
messageLogger.warn("Dropped message, USER_CHANGES Message has no apool!"); messageLogger.warn("Dropped message, USER_CHANGES Message has no apool!");
return cb(); return;
} }
if (message.data.changeset == null) { if (message.data.changeset == null) {
messageLogger.warn("Dropped message, USER_CHANGES Message has no changeset!"); messageLogger.warn("Dropped message, USER_CHANGES Message has no changeset!");
return cb(); return;
} }
// TODO: this might happen with other messages too => find one place to copy the session // TODO: this might happen with other messages too => find one place to copy the session
@ -620,7 +625,7 @@ function handleUserChanges(data, cb)
// if the session was valid when the message arrived in the first place // if the session was valid when the message arrived in the first place
if (!sessioninfos[client.id]) { if (!sessioninfos[client.id]) {
messageLogger.warn("Dropped message, disconnect happened in the mean time"); messageLogger.warn("Dropped message, disconnect happened in the mean time");
return cb(); return;
} }
// get all Vars we need // get all Vars we need
@ -632,26 +637,14 @@ function handleUserChanges(data, cb)
// finish processing the changeset, so keep a reference to the session. // finish processing the changeset, so keep a reference to the session.
var thisSession = sessioninfos[client.id]; var thisSession = sessioninfos[client.id];
var r, apool, pad;
// Measure time to process edit // Measure time to process edit
var stopWatch = stats.timer('edits').start(); var stopWatch = stats.timer('edits').start();
async.series([
// get the pad // get the pad
function(callback) { let pad = await padManager.getPad(thisSession.padId);
padManager.getPad(thisSession.padId, function(err, value) {
if (ERR(err, callback)) return;
pad = value;
callback();
});
},
// create the changeset // create the changeset
function(callback) { try {
// ex. _checkChangesetAndPool
try { try {
// Verify that the changeset has valid syntax and is in canonical form // Verify that the changeset has valid syntax and is in canonical form
Changeset.checkRep(changeset); Changeset.checkRep(changeset);
@ -666,7 +659,8 @@ function handleUserChanges(data, cb)
// Validate all added 'author' attribs to be the same value as the current user // Validate all added 'author' attribs to be the same value as the current user
var iterator = Changeset.opIterator(Changeset.unpack(changeset).ops) var iterator = Changeset.opIterator(Changeset.unpack(changeset).ops)
, op , op;
while (iterator.hasNext()) { while (iterator.hasNext()) {
op = iterator.next() op = iterator.next()
@ -677,7 +671,7 @@ function handleUserChanges(data, cb)
op.attribs.split('*').forEach(function(attr) { op.attribs.split('*').forEach(function(attr) {
if (!attr) return; if (!attr) return;
attr = wireApool.getAttrib(attr) attr = wireApool.getAttrib(attr);
if (!attr) return; if (!attr) return;
// the empty author is used in the clearAuthorship functionality so this should be the only exception // the empty author is used in the clearAuthorship functionality so this should be the only exception
@ -691,34 +685,31 @@ function handleUserChanges(data, cb)
// Afaik, it copies the new attributes from the changeset, to the global Attribute Pool // Afaik, it copies the new attributes from the changeset, to the global Attribute Pool
changeset = Changeset.moveOpsToNewPool(changeset, wireApool, pad.pool); changeset = Changeset.moveOpsToNewPool(changeset, wireApool, pad.pool);
} catch(e) { } catch(e) {
// There is an error in this changeset, so just refuse it // There is an error in this changeset, so just refuse it
client.json.send({ disconnect: "badChangeset" }); client.json.send({ disconnect: "badChangeset" });
stats.meter('failedChangesets').mark(); stats.meter('failedChangesets').mark();
return callback(new Error("Can't apply USER_CHANGES, because "+e.message)); throw new Error("Can't apply USER_CHANGES, because " + e.message);
} }
// ex. applyUserChanges // ex. applyUserChanges
apool = pad.pool; let apool = pad.pool;
r = baseRev; let r = baseRev;
// The client's changeset might not be based on the latest revision, // The client's changeset might not be based on the latest revision,
// since other clients are sending changes at the same time. // since other clients are sending changes at the same time.
// Update the changeset so that it can be applied to the latest revision. // Update the changeset so that it can be applied to the latest revision.
// https://github.com/caolan/async#whilst while (r < pad.getHeadRevisionNumber()) {
async.whilst(
function() { return r < pad.getHeadRevisionNumber(); },
function(callback)
{
r++; r++;
pad.getRevisionChangeset(r, function(err, c) { let c = await pad.getRevisionChangeset(r);
if (ERR(err, callback)) return;
// At this point, both "c" (from the pad) and "changeset" (from the // At this point, both "c" (from the pad) and "changeset" (from the
// client) are relative to revision r - 1. The follow function // client) are relative to revision r - 1. The follow function
// rebases "changeset" so that it is relative to revision r // rebases "changeset" so that it is relative to revision r
// and can be applied after "c". // and can be applied after "c".
try { try {
// a changeset can be based on an old revision with the same changes in it // a changeset can be based on an old revision with the same changes in it
// prevent eplite from accepting it TODO: better send the client a NEW_CHANGES // prevent eplite from accepting it TODO: better send the client a NEW_CHANGES
@ -726,41 +717,23 @@ function handleUserChanges(data, cb)
if (baseRev + 1 == r && c == changeset) { if (baseRev + 1 == r && c == changeset) {
client.json.send({disconnect:"badChangeset"}); client.json.send({disconnect:"badChangeset"});
stats.meter('failedChangesets').mark(); stats.meter('failedChangesets').mark();
throw new Error("Won't apply USER_CHANGES, because it contains an already accepted changeset");
return callback(new Error("Won't apply USER_CHANGES, because it contains an already accepted changeset"));
} }
changeset = Changeset.follow(c, changeset, false, apool); changeset = Changeset.follow(c, changeset, false, apool);
} catch(e) { } catch(e) {
client.json.send({disconnect:"badChangeset"}); client.json.send({disconnect:"badChangeset"});
stats.meter('failedChangesets').mark(); stats.meter('failedChangesets').mark();
throw new Error("Can't apply USER_CHANGES, because " + e.message);
return callback(new Error("Can't apply USER_CHANGES, because "+e.message)); }
} }
if ((r - baseRev) % 200 == 0) { let prevText = pad.text();
// don't let the stack get too deep
async.nextTick(callback);
} else {
callback(null);
}
});
},
// use the callback of the series function
callback
);
},
// do correction changesets, and send it to all users
function(callback) {
var prevText = pad.text();
if (Changeset.oldLen(changeset) != prevText.length) { if (Changeset.oldLen(changeset) != prevText.length) {
client.json.send({disconnect:"badChangeset"}); client.json.send({disconnect:"badChangeset"});
stats.meter('failedChangesets').mark(); stats.meter('failedChangesets').mark();
throw new Error("Can't apply USER_CHANGES "+changeset+" with oldLen " + Changeset.oldLen(changeset) + " to document of length " + prevText.length);
return callback(new Error("Can't apply USER_CHANGES "+changeset+" with oldLen " + Changeset.oldLen(changeset) + " to document of length " + prevText.length));
} }
try { try {
@ -768,36 +741,26 @@ function handleUserChanges(data, cb)
} catch(e) { } catch(e) {
client.json.send({ disconnect: "badChangeset" }); client.json.send({ disconnect: "badChangeset" });
stats.meter('failedChangesets').mark(); stats.meter('failedChangesets').mark();
throw e;
return callback(e)
} }
var correctionChangeset = _correctMarkersInPad(pad.atext, pad.pool); let correctionChangeset = _correctMarkersInPad(pad.atext, pad.pool);
if (correctionChangeset) { if (correctionChangeset) {
pad.appendRevision(correctionChangeset); pad.appendRevision(correctionChangeset);
} }
// Make sure the pad always ends with an empty line. // Make sure the pad always ends with an empty line.
if (pad.text().lastIndexOf("\n") != pad.text().length-1) { if (pad.text().lastIndexOf("\n") != pad.text().length-1) {
var nlChangeset = Changeset.makeSplice(pad.text(), pad.text().length-1, var nlChangeset = Changeset.makeSplice(pad.text(), pad.text().length - 1, 0, "\n");
0, "\n");
pad.appendRevision(nlChangeset); pad.appendRevision(nlChangeset);
} }
exports.updatePadClients(pad, function(er) { await exports.updatePadClients(pad);
ERR(er) } catch (err) {
});
callback();
}
],
function(err) {
stopWatch.end()
cb();
if(err) {
console.warn(err.stack || err); console.warn(err.stack || err);
} }
});
stopWatch.end();
} }
exports.updatePadClients = thenify(function(pad, callback) exports.updatePadClients = thenify(function(pad, callback)

View file

@ -48,6 +48,7 @@
"languages4translatewiki": "0.1.3", "languages4translatewiki": "0.1.3",
"log4js": "0.6.35", "log4js": "0.6.35",
"measured-core": "1.11.2", "measured-core": "1.11.2",
"nodeify": "^1.0.1",
"npm": "6.4.1", "npm": "6.4.1",
"object.values": "^1.0.4", "object.values": "^1.0.4",
"request": "2.88.0", "request": "2.88.0",
@ -87,4 +88,3 @@
"version": "1.7.5", "version": "1.7.5",
"license": "Apache-2.0" "license": "Apache-2.0"
} }