const NeoEngine = require('neo-bpmn-engine');
const distribution = require('@proceed/distribution');
const {
getProcessIds,
getDefinitionsAndProcessIdForEveryCallActivity,
} = require('@proceed/bpmn-helper');
const { abortInstanceOnNetwork } = require('./processForwarding.js');
/**
 * Builds the subscriber for the log stream of a neo engine process instance.
 *
 * Every entry passed to the returned callback is handed to the logger of the proceed engine
 * (engine._log.log), tagged with the module name 'BPMN-ENGINE' and the id of the instance.
 * The numeric/enum level of the entry is translated through NeoEngine.LogLevel and lower-cased.
 *
 * @param {Object} engine proceed engine instance whose logger receives the entries
 * @param {Object} instance the process instance the log stream belongs to
 * @returns {Function} callback taking a single log entry ({ level, message }) of the neo engine
 */
function getLogHandler(engine, instance) {
  return function forwardBpmnLog(bpmnLog) {
    const levelName = NeoEngine.LogLevel[bpmnLog.level];
    const entry = {
      level: levelName.toLowerCase(),
      msg: bpmnLog.message,
      moduleName: 'BPMN-ENGINE',
      instanceId: instance.id,
    };
    engine._log.log(entry);
  };
}
/**
 * Builds the callback for the onEnded hook of the neo engine.
 *
 * When invoked, the callback
 *  1. writes an info log entry for the ended instance,
 *  2. archives the state of the instance through distribution.db.archiveInstance, unless
 *     engine.importDefinitionId is set,
 *  3. forwards the instance to onEnded, if onEnded is a function.
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Function} onEnded optional function that is called with the instance once it has ended
 * @param {Object} instance the process instance that ended
 * @returns {Function} parameterless callback for the onEnded hook
 */
function getOnEndedHandler(engine, onEnded, instance) {
  return function handleInstanceEnded() {
    const instanceId = instance.id;

    engine._log.info({
      msg: `Process instance ended. Id = ${instanceId}`,
      instanceId,
    });

    // only engines without an importDefinitionId archive the finalized instance
    const skipArchiving = Boolean(engine.importDefinitionId);
    if (!skipArchiving) {
      distribution.db.archiveInstance(engine.definitionId, instanceId, instance.getState());
    }

    if (typeof onEnded !== 'function') {
      return;
    }
    onEnded(instance);
  };
}
/**
 * Builds the callback for the onAborted hook of the neo engine.
 *
 * When invoked, the callback writes two info log entries (instance aborted, abort signal being
 * broadcast) and then calls abortInstanceOnNetwork with the definition id of the engine and the
 * id of the instance.
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Object} instance the process instance that was aborted
 * @returns {Function} parameterless callback for the onAborted hook
 */
function getOnAbortedHandler(engine, instance) {
  return function handleInstanceAborted() {
    const instanceId = instance.id;
    const messages = [
      `Process instance aborted. Id = ${instanceId}`,
      `Broadcasting instance abort signal into network. Id = ${instanceId}`,
    ];

    for (const msg of messages) {
      engine._log.info({ msg, instanceId });
    }

    abortInstanceOnNetwork(engine.definitionId, instanceId);
  };
}
/**
 * Builds the callback for the onTokenEnded hook of the neo engine.
 *
 * When invoked with a token, the callback writes an info log entry naming the token and the
 * instance and afterwards forwards the token to onTokenEnded, if onTokenEnded is a function.
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Function} onTokenEnded optional function that is called with the token that ended
 * @param {Object} instance the process instance the token is in
 * @returns {Function} callback taking the token that ended
 */
function getOnTokenEndedHandler(engine, onTokenEnded, instance) {
  return function handleTokenEnded(token) {
    const instanceId = instance.id;

    engine._log.info({
      msg: `Token with id ${token.tokenId} ended. InstanceId = ${instanceId} `,
      instanceId,
    });

    if (typeof onTokenEnded !== 'function') {
      return;
    }
    onTokenEnded(token);
  };
}
/**
 * Builds the callback for the onScriptTaskError hook of the neo engine.
 *
 * The returned callback currently does nothing: the info log entry it used to write is disabled
 * (kept below for reference), so neither engine nor instance are accessed.
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Object} instance the process instance the token is in
 * @returns {Function} callback taking the execution info of the failed script task
 */
function getOnScriptTaskErrorHandler(engine, instance) {
  return function handleScriptTaskError(execution) {
    // Disabled logging:
    // engine._log.info({
    //   msg: `Technical Error in Script Task with id ${execution.flowElementId} on token ${execution.tokenId}. InstanceId = ${instance.id} `,
    //   instanceId: instance.id,
    // });
  };
}
/**
 * Builds the callback for the onUserTaskInterrupted hook of the neo engine.
 *
 * When invoked, the callback writes a single info log entry that names the user task
 * (execution.flowElementId), the token (execution.tokenId) and the instance.
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Object} instance the process instance the token is in
 * @returns {Function} callback taking the execution info of the interrupted user task
 */
function getOnUserTaskInterruptedHandler(engine, instance) {
  return function handleUserTaskInterrupted(execution) {
    const { flowElementId, tokenId } = execution;
    const instanceId = instance.id;
    const msg = `User Task with id ${flowElementId} on token ${tokenId} ended. InstanceId = ${instanceId} `;

    engine._log.info({ msg, instanceId });
  };
}
/**
 * Creates a callback function that can be used to handle calls from the onCallActivityInterrupted
 * hook of the neo engine.
 *
 * The callback resolves the imported process that is referenced by the interrupted call activity
 * and stops every instance tracked by the executor that the engine registered for that process
 * (engine.callActivityExecutors[processId]).
 *
 * If the call activity has no entry in the call-activity mapping, an info log entry is written and
 * nothing is stopped. If no executor is registered for the imported process, nothing is stopped.
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Object} instance the process instance the token is in
 * @returns {Function} async callback taking the execution info of the interrupted call activity
 */
function getOnCallActivityInterruptedHandler(engine, instance) {
  return async (execution) => {
    const callActivityId = execution.flowElementId;

    const callActivityDefinitionIdMapping = await getDefinitionsAndProcessIdForEveryCallActivity(
      engine._bpmn
    );
    const callActivityInfo =
      callActivityDefinitionIdMapping && callActivityDefinitionIdMapping[callActivityId];

    // Without this guard an unknown call activity id raises a TypeError inside the async hook,
    // which surfaces as a rejected promise of this callback.
    if (!callActivityInfo) {
      engine._log.info({
        msg: `Could not resolve the process referenced by call activity ${callActivityId}. InstanceId = ${instance.id}`,
        instanceId: instance.id,
      });
      return;
    }

    const importBPMN = await distribution.db.getImportedProcess(
      engine.definitionId,
      callActivityInfo.definitionId
    );
    const [importProcessId] = await getProcessIds(importBPMN);

    const CAExecutionEngine = engine.callActivityExecutors[importProcessId];
    if (!CAExecutionEngine) {
      return;
    }

    // Iterate over a snapshot: should stopInstance remove the id from instanceIDs, iterating the
    // live array would skip entries and leave instances running.
    [...CAExecutionEngine.instanceIDs].forEach((instanceID) => {
      CAExecutionEngine.stopInstance(instanceID);
    });
  };
}
module.exports = {
/**
* Returns a callBack function that is used for the instance stream of the neo engine
* this callBack registers callBack functions for the different lifecycle hooks of a newly created process
*
* @param {Object} engine proceed engine instance that contains the process information
* @param {Class} Engine the class we use to store information about a specific process and its instances
* @param {Object} preexistingInstance an optional object containing information about an instance we want to continue on this machine
* @param {Function} onStarted a callback function that is supposed to be called when a new instance starts
* @param {Function} onEnded a callback function that is supposed to be called when an instance ends
* @param {Function} onTokenEnded a callback function that is supposed to be called a token inside an instance reaches a finished state
*/
getNewInstanceHandler(engine, preexistingInstance, onStarted, onEnded, onTokenEnded) {
return (newInstance) => {
if (!preexistingInstance) {
// we are starting a new instance
engine._log.info({
msg: `A new process instance was created. Id = ${newInstance.id}`,
instanceId: newInstance.id,
});
engine.instanceIDs.push(newInstance.id);
} else {
engine._log.info({
msg: `Process instance started. Id = ${newInstance.id}`,
instanceId: newInstance.id,
});
// we are starting a new local instance to continue an instance started on another machine
engine.instanceIDs.push(preexistingInstance.processInstanceId);
}
newInstance.getLog$().subscribe(getLogHandler(engine, newInstance)); // subscribe to log-stream of bpmn processinstance
// Set up lifecycle listeners
if (typeof onStarted === 'function') {
onStarted(newInstance);
}
newInstance.onEnded(getOnEndedHandler(engine, onEnded, newInstance));
newInstance.onScriptTaskError(getOnScriptTaskErrorHandler(engine, newInstance));
newInstance.onAborted(getOnAbortedHandler(engine, newInstance));
newInstance.onUserTaskInterrupted(getOnUserTaskInterruptedHandler(engine, newInstance));
newInstance.onCallActivityInterrupted(
getOnCallActivityInterruptedHandler(engine, newInstance)
);
newInstance.onTokenEnded(getOnTokenEndedHandler(engine, onTokenEnded, newInstance));
newInstance.onFlowNodeExecuted((execution) => {
const token = engine.getToken(newInstance.id, execution.tokenId);
// move information about milestones to log and delete from token
if (token) {
if (token.milestones) {
newInstance.updateLog(execution.flowElementId, execution.tokenId, {
milestones: token.milestones,
});
newInstance.updateToken(execution.tokenId, { milestones: undefined });
}
if (token.currentFlowNodeIsExternal) {
newInstance.updateLog(execution.flowElementId, execution.tokenId, {
external: true,
});
newInstance.updateToken(execution.tokenId, { currentFlowNodeIsExternal: undefined });
}
const flowElement = newInstance.getFlowElement(execution.flowElementId);
if (flowElement && flowElement.$type === 'bpmn:UserTask') {
// remove user task from list
const index = engine.userTasks.findIndex(
(uT) => uT.processInstance.id === newInstance.id && uT.id === flowElement.id
);
if (index > -1) {
engine.userTasks.splice(index, 1);
}
}
}
if (!execution.machine) {
newInstance.updateLog(execution.flowElementId, execution.tokenId, {
machine: engine.machineInformation,
});
engine._log.info({
msg: `Finished execution of flowNode ${execution.flowElementId}. InstanceId = ${newInstance.id}`,
instanceId: newInstance.id,
});
}
});
newInstance.onInstanceStateChange((instanceState) => {
//instanceState = array of token states
const instanceEnded = instanceState.every(
(s) =>
s === 'ENDED' ||
s === 'FAILED' ||
s === 'TERMINATED' ||
s === 'ABORTED' ||
s === 'ERROR-TECHNICAL' ||
s === 'ERROR-SEMANTIC' ||
s === 'FORWARDED' ||
s === 'ERROR-CONSTRAINT-UNFULFILLED'
);
if (instanceEnded) {
// TODO: save instance data, delete instance
}
});
};
},
};