/* eslint-disable class-methods-use-this */
const { logging, information } = require('@proceed/machine');
const distribution = require('@proceed/distribution');
const NeoEngine = require('neo-bpmn-engine');
const { setupNeoEngine } = require('./neoEngineSetup.js');
const { getNewInstanceHandler } = require('./hookCallbacks.js');
const { getShouldPassToken } = require('./shouldPassToken.js');
const { getShouldActivateFlowNode } = require('./shouldActivateFlowNode.js');
const { getProcessIds } = require('@proceed/bpmn-helper');
// const Separator = require('./separator.js').default;
setupNeoEngine();
/**
 * @memberof module:@proceed/core
 * @class
 * Every instance is associated with exactly one BPMN process definition and can
 * contain multiple instances of said process.
 */
class Engine {
  /**
   * Creates a new Engine instance.
   */
  constructor() {
    /**
     * Boolean indicating whether this Engine instance has already been started or not.
     * @private
     * @type {boolean}
     */
    this._started = false;
    /**
     * Boolean indicating whether this Engine is currently pausing
     * @private
     * @type {boolean}
     */
    this._pausing = false;
    /**
     * The user tasks that were encountered in a process instance and are awaiting handling.
     * @type {object[]}
     */
    this.userTasks = [];
    /**
     * The engines in which we intend to execute the call activities, maps from the id of the
     * call activity to the corresponding engine
     * @type {Object}
     */
    this.callActivityExecutors = {};
    /**
     * The globally known IDs for the instances created within this BPMN process
     * @type {string[]}
     */
    this.instanceIDs = [];
    /**
     * The ID of the process for this Engine instance
     * @type {string}
     */
    this.processID = null;
    /**
     * The name of the file the process description is taken from before being executed in this module
     * @type {string}
     */
    this.definitionId = null;
    /**
     * The definitionId for the process description of the imported process executed in this module
     */
    this.importDefinitionId = null;
    /**
     * The NEOBPMNEngine process object containing all process instances.
     * @private
     */
    this._process = null;
    /**
     * The logging instance configured with the provided processID.
     * Only created by `deployProcess` for non-imported processes.
     * @private
     */
    this._log = null;
    /**
     * The description of the process that is deployed to this engine instance
     */
    this._bpmn = '';
    /**
     * The timestamp of the start of this process, at which it could init new
     * instances. They might be created later though.
     * @private
     * @type {number}
     */
    this._processStartTime = null;
    /**
     * id, ip, name and port of this machine
     * @type {object}
     */
    this.machineInformation = null;
  }

  /**
   * Forwards the given script executor to the NeoBPMN engine.
   * Does nothing when no executor is given.
   *
   * @param {object} scriptExecutor the executor that is handed to `NeoEngine.provideScriptExecutor`
   */
  static provideScriptExecutor(scriptExecutor) {
    if (scriptExecutor) {
      NeoEngine.provideScriptExecutor(scriptExecutor);
    }
  }

  /**
   * Deploys the process to the NeoBPMN Engine making it ready to start instances
   *
   * @param {string} definitionId The name of the file of the (main) process (as stored in the `data`)
   * @param {string} [importDefinitionId] the definitionId under which we can find the imported
   * process definition we want to start here
   * @throws {Error} if a main process (no importDefinitionId given) is marked as invalid
   */
  async deployProcess(definitionId, importDefinitionId) {
    // Fetch the stored BPMN
    let bpmn;
    if (!importDefinitionId) {
      bpmn = await distribution.db.getProcess(definitionId);
    } else {
      bpmn = await distribution.db.getImportedProcess(definitionId, importDefinitionId);
      this.importDefinitionId = importDefinitionId;
    }

    const [processId] = await getProcessIds(bpmn);

    // validate imports and user tasks on first deploy || assumes validity for imported processes
    // since we expect to have a fully valid main process
    if (!importDefinitionId && !(await distribution.db.isProcessValid(definitionId))) {
      throw new Error(
        `Process ${processId} with definitionId ${definitionId} is invalid. It can't be deployed.`
      );
    }

    if (!importDefinitionId) {
      this._log = logging.getLogger({
        moduleName: 'CORE',
        processID: processId,
        definitionId,
      });
    }

    // Every Engine instance is only allowed to be associated with one process
    // (possibly multiple instances of that process though)
    this.processID = processId;

    const { id, name, hostname, port } = await information.getMachineInformation([
      'id',
      'name',
      'hostname',
      'port',
    ]);

    // The own machine might not be part of the list of available machines; the ip stays
    // undefined in that case instead of failing the whole deployment with a TypeError.
    const ownMachine = distribution.communication
      .getAvailableMachines()
      .find((machine) => machine.id === id);
    const ip = ownMachine ? ownMachine.ip : undefined;

    this.machineInformation = { id, name: name || hostname, ip, port };

    const bpmnProcess = await NeoEngine.BpmnProcess.fromXml(processId, bpmn, {
      shouldPassTokenHook: getShouldPassToken(this),
      shouldActivateFlowNodeHook: getShouldActivateFlowNode(this, Engine),
    });
    bpmnProcess.deploy();

    this._process = bpmnProcess;
    this.definitionId = definitionId;
    this._bpmn = bpmn;
  }

  /**
   * Starts the execution of a BPMN process. This can involve the creation of
   * multiple instances of the process, if the process contains such events.
   * When encountering User Tasks in the ongoing execution, they are added to
   * the `userTasks` array property.
   *
   * The `instance` argument may be omitted, i.e. the callbacks may be passed starting at the
   * second position.
   *
   * @param {object} processVariables The process variables in the init state
   * @param {object|string} instance contains the instance object that came from another engine to be continued here (might contain only an id of an activity to start)
   * @param {function} onStarted function that is executed when the new instance starts
   * @param {function} onEnded function that is executed when the new instance ends
   * @param {function} onTokenEnded function that is executed when a token ends its execution
   * @throws {Error} if this engine was already started
   */
  startProcess(processVariables, instance, onStarted, onEnded, onTokenEnded) {
    // Check this before subscribing: otherwise a rejected second call would still leave an
    // additional handler attached to the instance stream of the running process.
    if (this._started) {
      throw new Error('This Engine instance was already started with a process!');
    }

    let instanceToContinue = instance;
    let startedHandler = onStarted;
    let endedHandler = onEnded;
    let tokenEndedHandler = onTokenEnded;

    // the instance argument was omitted => shift the callbacks
    if (typeof instance === 'function') {
      tokenEndedHandler = onEnded;
      endedHandler = onStarted;
      startedHandler = instance;
      instanceToContinue = undefined;
    }

    // we want to start a new instance at a specific node
    let activityId;
    if (typeof instanceToContinue === 'string') {
      activityId = instanceToContinue;
      instanceToContinue = undefined;
    }

    // Subscribe to the new process instances stream before we start the execution
    this._process
      .getInstance$()
      .subscribe(
        getNewInstanceHandler(
          this,
          instanceToContinue,
          startedHandler,
          endedHandler,
          tokenEndedHandler
        )
      );

    this._started = true;

    try {
      if (activityId !== undefined) {
        // start at the specified activity
        this._process.startAt({
          tokens: [
            {
              currentFlowElementId: activityId,
              machineHops: 0,
              deciderStorageTime: 0,
              deciderStorageRounds: 0,
            },
          ],
        });
      } else if (instanceToContinue !== undefined) {
        // continue the given instance
        this._process.startAt({
          globalStartTime: instanceToContinue.globalStartTime,
          tokens: instanceToContinue.tokens,
          instanceId: instanceToContinue.processInstanceId,
          variables: processVariables,
          log: instanceToContinue.log,
        });
      } else {
        // start the process at its start event
        this._process.start({
          variables: processVariables,
          token: { machineHops: 0, deciderStorageTime: 0, deciderStorageRounds: 0 },
        });
      }
    } catch (error) {
      // without a logger we would only mask the actual problem with a TypeError
      if (!this._log) {
        throw error;
      }
      this._log.error(error);
    }
  }

  /**
   * Continues a token coming from another machine by inserting the token in the running instance on this engine
   *
   * @param {Object} instance the instance object coming from another machine we want to continue
   * @throws {Error} if the instance is not known to this engine
   */
  insertToken(instance) {
    // the instance is already running => place token at desired location
    const localInstance = this._process.getInstanceById(instance.processInstanceId);
    if (!localInstance) {
      throw new Error(`Instance with id ${instance.processInstanceId} does not exist!`);
    }

    const [token] = instance.tokens;
    // from/to only describe the sequence flow to use and are not part of the placed token
    const { from, to, ...placingToken } = token;
    const sequenceFlowId = localInstance.getSequenceFlowId(from, to);

    localInstance.updateVariables(instance.variables);
    localInstance.mergeFlowNodeLog(instance.log);
    localInstance.placeTokenAt(sequenceFlowId, placingToken);
  }

  /**
   * Places a copy of an existing token of an instance at another position
   *
   * @param {string} instanceId id of the instance the token belongs to
   * @param {string} tokenId id of the token to move
   * @param {string} targetId id handed to `placeTokenAt` as the new position of the token
   * @throws {Error} if the instance or the token does not exist
   */
  moveToken(instanceId, tokenId, targetId) {
    const localInstance = this._process.getInstanceById(instanceId);
    if (!localInstance) {
      throw new Error(`Instance with id ${instanceId} does not exist!`);
    }

    const localToken = localInstance.getState().tokens.find((token) => token.tokenId === tokenId);
    if (!localToken) {
      throw new Error(`Token with id ${tokenId} does not exist!`);
    }

    localInstance.placeTokenAt(targetId, { ...localToken });
  }

  /**
   * Looks up a user task that is awaiting handling in this engine
   *
   * @private
   * @param {string} instanceID The id of the process instance the user task belongs to
   * @param {string} userTaskID The id of the user task
   * @returns {object} the stored user task
   * @throws {Error} if no matching user task is stored in `userTasks`
   */
  _findUserTaskOrThrow(instanceID, userTaskID) {
    const userTask = this.userTasks.find(
      (uT) => uT.processInstance.id === instanceID && uT.id === userTaskID
    );
    if (!userTask) {
      throw new Error(
        `User task with id ${userTaskID} does not exist in instance ${instanceID}!`
      );
    }
    return userTask;
  }

  /**
   * Signals the user task as completed to the corresponding process instance,
   * which is responsible.
   * @param {string} instanceID The id of the process instance to be notified
   * @param {string} userTaskID The id of the user task
   * @param {object} variables The updated process variables
   * @throws {Error} if the user task is unknown
   */
  completeUserTask(instanceID, userTaskID, variables) {
    const userTask = this._findUserTaskOrThrow(instanceID, userTaskID);
    userTask.processInstance.completeActivity(userTask.id, userTask.tokenId, variables);
  }

  /**
   * Signals the user task as aborted to the corresponding process instance,
   * which is responsible.
   * @param {string} instanceID The id of the process instance to be notified
   * @param {string} userTaskID The id of the user task
   * @throws {Error} if the user task is unknown
   */
  abortUserTask(instanceID, userTaskID) {
    const userTask = this._findUserTaskOrThrow(instanceID, userTaskID);
    userTask.processInstance.failActivity(userTask.id, userTask.tokenId);
  }

  /**
   * Sets the current progress of a flowNode running at given token (mainly used for usertasks)
   * @param {string} instanceID The id of the process instance to be notified
   * @param {string} tokenId The id of the token
   * @param {number} progress The current progress of a flow node
   */
  setFlowNodeProgress(instanceID, tokenId, progress) {
    this.getInstance(instanceID).setFlowNodeProgress(tokenId, progress);
  }

  /**
   * Returns the instance with the given id
   *
   * @param {string} instanceID id of the instance we want to get
   * @returns {object} - the requested process instance
   */
  getInstance(instanceID) {
    return this._process.getInstanceById(instanceID);
  }

  /**
   * Deletes the instance with the given id
   *
   * @param {string} instanceID id of the instance to be deleted
   */
  deleteInstance(instanceID) {
    // indexOf returns -1 for unknown ids and splice(-1, 1) would remove the LAST known id
    const index = this.instanceIDs.indexOf(instanceID);
    if (index !== -1) {
      this.instanceIDs.splice(index, 1);
    }
    this._process.deleteInstanceById(instanceID);
  }

  /**
   * Returns the current state object of the instance with the given id
   *
   * @param {string} instanceID id of the instance
   * @returns {object} the result of the instance's `getState()`
   */
  getInstanceInformation(instanceID) {
    return this.getInstance(instanceID).getState();
  }

  /**
   * Returns all tokens in the current state of the instance with the given id
   *
   * @param {string} instanceID id of the instance
   * @returns {object[]} the tokens of the instance
   */
  getAllInstanceTokens(instanceID) {
    return this.getInstance(instanceID).getState().tokens;
  }

  /**
   * Returns a specific token of an instance
   *
   * @param {string} instanceID id of the instance
   * @param {string} tokenId id of the token
   * @returns {object|undefined} the token or undefined if the instance has no such token
   */
  getToken(instanceID, tokenId) {
    return this.getAllInstanceTokens(instanceID).find((token) => token.tokenId === tokenId);
  }

  /**
   * Forwards a token update to the instance with the given id
   *
   * @param {string} instanceID id of the instance
   * @param {string} tokenId id of the token to update
   * @param {object} attributes the attributes handed to the instance's `updateToken`
   */
  updateToken(instanceID, tokenId, attributes) {
    this.getInstance(instanceID).updateToken(tokenId, attributes);
  }

  /**
   * Forwards flow node executions to the instance's `mergeFlowNodeLog`
   *
   * @param {string} instanceID id of the instance
   * @param {object} executions the executions to merge into the log of the instance
   */
  mergeFlowNodeLog(instanceID, executions) {
    this.getInstance(instanceID).mergeFlowNodeLog(executions);
  }

  /**
   * Forwards an execution log entry to the instance's `logExecution`
   *
   * @param {string} instanceID id of the instance
   * @param {string} elementId id of the flow element the entry is for
   * @param {string} tokenId id of the token the entry is for
   * @param {object} attributes the attributes of the entry
   */
  logExecution(instanceID, elementId, tokenId, attributes) {
    this.getInstance(instanceID).logExecution(elementId, tokenId, attributes);
  }

  /**
   * Removes a token from the instance with the given id
   *
   * @param {string} instanceID id of the instance
   * @param {string} tokenId id of the token to remove
   */
  removeToken(instanceID, tokenId) {
    this.getInstance(instanceID).removeToken(tokenId);
  }

  /**
   * Forwards a log update to the instance's `updateLog`
   *
   * @param {string} instanceID id of the instance
   * @param {string} elementId id of the flow element the entry is for
   * @param {string} tokenId id of the token the entry is for
   * @param {object} attributes the attributes to update
   */
  updateLog(instanceID, elementId, tokenId, attributes) {
    this.getInstance(instanceID).updateLog(elementId, tokenId, attributes);
  }

  /**
   * Forwards a variable update to the instance with the given id
   *
   * @param {string} instanceID id of the instance
   * @param {object} variables the variables handed to the instance's `updateVariables`
   */
  updateVariables(instanceID, variables) {
    this.getInstance(instanceID).updateVariables(variables);
  }

  /**
   * Returns a coarse state of the instance with the given id
   *
   * @param {string} instanceID id of the instance
   * @returns {string} 'ended', 'paused' or 'running'
   */
  getInstanceState(instanceID) {
    const instance = this.getInstance(instanceID);
    if (instance.isEnded()) {
      return 'ended';
    }
    if (instance.isPaused()) {
      return 'paused';
    }
    return 'running';
    // TODO: get state from instance
    // return instance.getState().instanceState;
    // -> returns array with states of all tokens
  }

  /**
   * Archives the current state of an instance and removes the instance and its user tasks
   * from this engine
   *
   * @private
   * @param {object} instance the process instance to archive and remove
   */
  async _archiveAndRemoveInstance(instance) {
    await distribution.db.archiveInstance(this.definitionId, instance.id, instance.getState());
    this.deleteInstance(instance.id);
    this.userTasks = this.userTasks.filter((userTask) => userTask.processInstance !== instance);
  }

  /**
   * Stops instance
   *
   * Does nothing if the instance has already ended.
   *
   * @param {string} instanceID id of the instance we want to stop
   */
  async stopInstance(instanceID) {
    const instance = this.getInstance(instanceID);
    if (instance.isEnded()) {
      return;
    }

    this._log.info({
      msg: `Stopping process instance. Id = ${instanceID}`,
      instanceId: instanceID,
    });

    const interruptibleStates = ['RUNNING', 'DEPLOYMENT-WAITING', 'READY'];
    this.getAllInstanceTokens(instanceID).forEach((token) => {
      if (!interruptibleStates.includes(token.state)) {
        return;
      }
      instance.interruptToken(token.tokenId);
      instance.logExecution(token.currentFlowElementId, token.tokenId, {
        executionState: 'STOPPED',
        startTime: token.currentFlowElementStartTime,
        endTime: +new Date(),
        machine: this.machineInformation,
      });
    });

    instance.stop();

    // archive the information for the stopped instance
    await this._archiveAndRemoveInstance(instance);
  }

  /**
   * Stop every token of this instance due to unfulfilled constraints
   *
   * The instance is archived and removed from this engine afterwards.
   *
   * @param {String} instanceID - ID of process instance
   * @param {Array} unfulfilledConstraints - List of unfulfilled constraints
   */
  async stopUnfulfilledInstance(instanceID, unfulfilledConstraints) {
    const instance = this.getInstance(instanceID);
    if (!instance.isEnded()) {
      this._log.info({
        msg: `Stopping process instance due to unfulfilled constraints. Id =${instanceID}`,
        instanceId: instanceID,
      });
    }

    const errorMessage = `Instance stopped execution because of: ${(
      unfulfilledConstraints || []
    ).join(', ')}`;

    this.getAllInstanceTokens(instanceID).forEach((token) => {
      const waitingForDeployment = token.state === 'DEPLOYMENT-WAITING';
      if (!waitingForDeployment && token.state !== 'READY' && token.state !== 'RUNNING') {
        return;
      }
      if (waitingForDeployment) {
        instance.interruptToken(token.tokenId); // will cancel shouldPassTokenHook
      }
      instance.endToken(token.tokenId, {
        state: 'ERROR-CONSTRAINT-UNFULFILLED',
        errorMessage,
        endTime: +new Date(),
      });
      instance.updateLog(token.currentFlowElementId, token.tokenId, {
        machine: this.machineInformation,
      });
    });

    await this._archiveAndRemoveInstance(instance);
  }

  /**
   * Aborts every not-ended token of an instance, then archives the instance and removes it
   * from this engine
   *
   * @param {string} instanceID id of the instance we want to abort
   * @param {string} [msg] the message to log (only logged if the instance has not ended yet)
   */
  async abortInstance(
    instanceID,
    msg = `Aborting process instance due to signal from another machine. Id =${instanceID}`
  ) {
    const instance = this.getInstance(instanceID);
    if (!instance.isEnded()) {
      this._log.info({
        msg,
        instanceId: instanceID,
      });
    }

    // abort all not-ended tokens on instance
    this.getAllInstanceTokens(instanceID).forEach((token) => {
      const waitingForDeployment = token.state === 'DEPLOYMENT-WAITING';
      if (!waitingForDeployment && token.state !== 'READY' && token.state !== 'RUNNING') {
        return;
      }
      if (waitingForDeployment) {
        instance.interruptToken(token.tokenId); // will cancel shouldPassTokenHook
      }
      instance.endToken(token.tokenId, { state: 'ABORTED', endTime: +new Date() });
    });

    // archive the information for the aborted instance
    await this._archiveAndRemoveInstance(instance);
  }

  /**
   * Pauses an instance
   *
   * Tokens that are READY or DEPLOYMENT-WAITING are paused immediately. If there are RUNNING
   * tokens the instance is only paused after a state change reports no more RUNNING tokens.
   * If a state change reports a STOPPED or PAUSED token while waiting, the pausing is given up.
   * Afterwards the instance is archived and removed from this engine.
   * Does nothing if the instance has already ended or is already paused.
   *
   * NOTE(review): `_pausing` is set here but never reset in this class - presumably done by
   * the code resuming the instance; confirm.
   *
   * @param {string} instanceID id of the instance we want to pause
   */
  async pauseInstance(instanceID) {
    const instance = this.getInstance(instanceID);
    if (instance.isEnded() || instance.isPaused()) {
      return;
    }

    instance.updateProcessStatus('PAUSING');
    // set pausing true to prevent further transition (in shouldPassTokenHook) of currently running tokens
    this._pausing = true;

    this._log.info({
      msg: `Pausing process instance. Id = ${instanceID}`,
      instanceId: instanceID,
    });

    // pause flowNode execution of tokens with state READY and DEPLOYMENT-WAITING
    let tokensRunning = false;
    this.getAllInstanceTokens(instanceID).forEach((token) => {
      if (token.state === 'DEPLOYMENT-WAITING' || token.state === 'READY') {
        instance.pauseToken(token.tokenId);
        this.updateToken(instanceID, token.tokenId, { state: 'PAUSED' });
      }
      if (token.state === 'RUNNING') {
        tokensRunning = true;
      }
    });

    if (tokensRunning) {
      // wait for running tokens to end execution before setting instance state to PAUSED
      try {
        await new Promise((resolve, reject) => {
          instance.onInstanceStateChange((newInstanceState) => {
            const instanceInactive = newInstanceState.some(
              (tokenState) => tokenState === 'STOPPED' || tokenState === 'PAUSED'
            );
            if (instanceInactive) {
              reject(new Error('Instance became inactive while waiting for running tokens'));
            } else if (!newInstanceState.includes('RUNNING')) {
              resolve();
            }
          });
        });
      } catch (error) {
        // the instance was stopped or paused in the meantime => nothing left to do here
        return;
      }
    }

    instance.pause();
    // wait for the archiving so a failure is reported to the caller instead of being lost
    await distribution.db.archiveInstance(this.definitionId, instance.id, instance.getState());
    this.deleteInstance(instance.id);
  }

  /**
   * Returns the user tasks of this engine together with the user tasks of all engines that
   * execute call activities for this engine. The process chain of the latter is prefixed
   * with the id of the process of this engine.
   *
   * @returns {object[]} all user tasks awaiting handling
   */
  getUserTasks() {
    const callActivityUserTasks = Object.values(this.callActivityExecutors)
      .flatMap((engine) => engine.getUserTasks())
      .map((userTask) => ({
        ...userTask,
        processChain: `${this.processID}|${userTask.processChain}`,
      }));

    return this.userTasks.concat(callActivityUserTasks);
  }

  /**
   * Returns the milestones stored on the token of a user task
   *
   * @param {string} instanceID id of the instance the user task belongs to
   * @param {string} userTaskID id of the user task
   * @returns {object} the milestones of the token (empty object if there are none)
   * @throws {Error} if the user task is unknown
   */
  getMilestones(instanceID, userTaskID) {
    const userTask = this._findUserTaskOrThrow(instanceID, userTaskID);
    const token = this.getToken(instanceID, userTask.tokenId);
    return token.milestones || {};
  }

  /**
   * Merges the given milestones into the milestones stored on the token of a user task and
   * sets the progress of the flow node to the mean of all milestone values
   *
   * @param {string} instanceID id of the instance the user task belongs to
   * @param {string} userTaskID id of the user task
   * @param {object} milestones the new milestone values, keyed by milestone
   * @throws {Error} if the user task is unknown
   */
  updateMilestones(instanceID, userTaskID, milestones) {
    const userTask = this._findUserTaskOrThrow(instanceID, userTaskID);
    const token = this.getToken(instanceID, userTask.tokenId);

    const newMilestones = { ...token.milestones, ...milestones };
    userTask.processInstance.updateToken(token.tokenId, {
      milestones: newMilestones,
    });

    // mean of all milestone values; 0 if there are no milestones at all
    const milestoneValues = Object.values(newMilestones);
    const currentProgress =
      milestoneValues.length > 0
        ? milestoneValues.reduce((sum, value) => sum + value, 0) / milestoneValues.length
        : 0;

    this.setFlowNodeProgress(instanceID, token.tokenId, currentProgress);
  }

  /**
   * Changes the state of the flow node the given token currently resides on
   *
   * @param {string} instanceId id of the instance the token belongs to
   * @param {string} tokenId id of the token
   * @param {string} state one of 'ACTIVE', 'EXTERNAL', 'COMPLETED', 'TERMINATED', 'FAILED'
   * ('ACTIVE' is handled like 'EXTERNAL')
   * @param {object} [variables] the variables handed to the instance when the state is 'COMPLETED'
   * @throws {Error} if the token does not exist or the state is not supported
   */
  setFlowNodeState(instanceId, tokenId, state, variables) {
    const instance = this.getInstance(instanceId);
    const token = this.getToken(instanceId, tokenId);
    if (!token) {
      throw new Error(`Token with id ${tokenId} does not exist!`);
    }
    const activityId = token.currentFlowElementId;

    switch (state) {
      case 'ACTIVE':
      case 'EXTERNAL':
        instance.setFlowNodeState(tokenId, 'EXTERNAL');
        break;
      case 'COMPLETED':
        instance.completeActivity(activityId, tokenId, variables);
        break;
      case 'TERMINATED':
        instance.terminateActivity(activityId, tokenId);
        break;
      case 'FAILED':
        instance.failActivity(activityId, tokenId);
        break;
      default:
        throw new Error('Invalid state');
    }
  }
}
module.exports = Engine;