const { config } = require('@proceed/machine');
const distribution = require('@proceed/distribution');
const { getProcessIds, getUserTaskFileNameMapping } = require('@proceed/bpmn-helper');
const decider = require('@proceed/decider');
const Parser = require('@proceed/constraint-parser-xml-json/parser.js');
const Engine = require('./engine/engine.js');
/**
* @memberof module:@proceed/core
* @class
*
* Object that manages the execution of **all** BPMN processes.
* It is a Singleton.
* @hideconstructor
*/
const Management = {
/**
* Array containing all currently running engine instances.
* @private
*/
_engines: [],
provideScriptExecutor(scriptExecutor) {
Engine.provideScriptExecutor(scriptExecutor);
},
/**
* Creates a new engine instance for execution of the given process.
* @param {string} definitionId The name of the file the process to start is stored in
* @param {object} variables The process variables for the execution
* @param {string} [activityID] The optional id of the activity
* to start execution at (if not at the beginning)
*/
async createInstance(definitionId, variables, activityID) {
const { processes } = await config.readConfig();
if (processes.deactivateProcessExecution) {
return null;
}
const { bpmn, deploymentMethod } = await distribution.db.getProcessInfo(definitionId);
if (deploymentMethod === 'dynamic') {
const parser = new Parser();
const processConstraints = parser.getConstraints(bpmn);
const taskConstraints = parser.getConstraints(bpmn, activityID);
const processId = await getProcessIds(bpmn);
const userTasks = await getUserTaskFileNameMapping(bpmn);
const processInfo = {
id: processId,
nextFlowNode: {
id: activityID,
isUserTask: !!userTasks[activityID],
},
};
const executionAllowed = await decider.allowedToExecuteLocally(
processInfo,
null,
taskConstraints,
processConstraints
);
if (!executionAllowed) {
return null;
}
}
// Start up a new engine
const engine = new Engine();
this._engines.push(engine);
await engine.deployProcess(definitionId);
engine.startProcess(variables, activityID);
return engine;
},
/**
* Continues running an instance of a process on this engine
* that was running on another machine
*
* @param {string} definitionId The name of the file the process to continue is stored in
*/
async continueInstance(definitionId, instance) {
const { processes } = await config.readConfig();
if (processes.deactivateProcessExecution) {
return null;
}
const { bpmn, deploymentMethod } = await distribution.db.getProcessInfo(definitionId);
if (deploymentMethod === 'dynamic') {
const parser = new Parser();
const processConstraints = parser.getConstraints(bpmn);
const taskConstraints = parser.getConstraints(bpmn, instance.to);
const processId = await getProcessIds(bpmn);
const userTasks = await getUserTaskFileNameMapping(bpmn);
const processInfo = {
id: processId,
nextFlowNode: {
id: instance.tokens[0].to,
isUserTask: !!userTasks[instance.tokens[0].to],
},
};
const tokenInfo = {
globalStartTime: instance.globalStartTime,
localStartTime: instance.tokens[0].localStartTime,
localExecutionTime: instance.tokens[0].localExecutionTime,
machineHops: instance.tokens[0].machineHops,
storageRounds: instance.tokens[0].deciderStorageRounds,
storageTime: instance.tokens[0].deciderStorageTime,
};
const executionAllowed = await decider.allowedToExecuteLocally(
processInfo,
tokenInfo,
taskConstraints,
processConstraints
);
if (!executionAllowed) {
return null;
}
}
let engine = this.getEngineWithID(instance.processInstanceId);
if (engine && engine.getInstanceState(instance.processInstanceId) === 'running') {
const placingTokens = instance.tokens.map((token) => {
return {
tokenId: token.tokenId,
from: token.from,
to: token.to,
machineHops: token.machineHops + 1,
nextMachine: undefined,
};
});
const continueInstanceInfo = { ...instance, tokens: placingTokens };
engine.insertToken(continueInstanceInfo);
return engine;
}
engine = new Engine();
this._engines.push(engine);
const startingTokens = instance.tokens.map((token) => {
return {
tokenId: token.tokenId,
currentFlowElementId: token.to,
machineHops: token.machineHops + 1,
deciderStorageTime: 0,
deciderStorageRounds: 0,
};
});
const startingInstanceInfo = { ...instance, tokens: startingTokens };
await engine.deployProcess(definitionId);
engine.startProcess(startingInstanceInfo.variables, startingInstanceInfo, (newInstance) => {
engine._log.info({
msg: `Continuing process instance. Id = ${startingInstanceInfo.processInstanceId}. TokenId = ${startingInstanceInfo.tokens[0].tokenId}`,
instanceId: startingInstanceInfo.processInstanceId,
});
});
return engine;
},
/**
* Resuming an instance of a process on this engine that was paused
*
* @param {string} definitionId The name of the file the process to continue is stored in
* @param {string} instanceId The id the process instance to resume
*/
async resumeInstance(definitionId, instanceId) {
let instanceInformation;
const existingEngine = this.getEngineWithID(instanceId);
if (existingEngine) {
instanceInformation = existingEngine.getInstanceInformation(instanceId);
this.removeInstance(existingEngine);
} else {
instanceInformation = (await distribution.db.getArchivedInstances(definitionId))[instanceId];
}
const resumedTokens = instanceInformation.tokens.map((token) => {
const tokenActive =
token.state === 'RUNNING' ||
token.state === 'READY' ||
token.state === 'DEPLOYMENT-WAITING' ||
token.state === 'PAUSED';
return {
tokenId: token.tokenId,
state: tokenActive ? 'READY' : token.state,
currentFlowElementId: token.currentFlowElementId,
deciderStorageRounds: token.deciderStorageRounds,
deciderStorageTime: token.deciderStorageTime,
machineHops: token.machineHops,
};
});
const resumedInstanceInformation = {
processInstanceId: instanceInformation.processInstanceId,
globalStartTime: instanceInformation.globalStartTime,
tokens: resumedTokens,
variables: instanceInformation.variables,
log: instanceInformation.log,
};
// Start up a new engine
const engine = new Engine();
this._engines.push(engine);
await engine.deployProcess(definitionId);
engine.startProcess(
resumedInstanceInformation.variables,
resumedInstanceInformation,
(newInstance) => {
engine._log.info({
msg: `Resuming process instance. Id = ${resumedInstanceInformation.processInstanceId}`,
instanceId: resumedInstanceInformation.instanceId,
});
}
);
return engine;
},
removeInstance(engine) {
this._engines.splice(this._engines.indexOf(engine), 1);
},
getAllEngines() {
return this._engines;
},
/**
* Return the engine with the given instance id.
* @param {string} instanceID The id of an instance the engine is executing
* @returns {module:@proceed/core.ProceedEngine}
*/
getEngineWithID(instanceID) {
return this._engines.find((engine) => engine.instanceIDs.includes(instanceID));
},
/**
* Return the engines running a process that is defined in the file with the given name
*
* @param {String} definitionId name of the file the process description is stored in
* @returns {Array} - all engines running instances of the process with the given id
*/
getEnginesWithDefinitionId(definitionId) {
return this._engines.filter((engine) => engine.definitionId === definitionId);
},
/**
* Return the engines running a process with the given id
* @param {string} processID the id of the process the engine is executing
* @returns {Array} - all engines running instances of the process with the given id
*/
getEnginesWithProcessID(processID) {
return this._engines.filter((engine) => engine.processID === processID);
},
/**
* Return all activities that currently wait for user input.
* @returns {object[]}
*/
getPendingUserTasks() {
const userTasks = this._engines.flatMap((engine) => engine.getUserTasks());
return userTasks;
},
};
module.exports = Management;