import { processEndpoint, _5thIndustryEndpoint } from './ms-engine-communication/module.js';
import {
asyncForEach,
asyncMap,
} from '../../../shared-frontend-backend/helpers/javascriptHelpers.js';
import { toListString } from '../../../shared-frontend-backend/helpers/arrayHelpers.js';
import { requestDeploymentInformation } from './machines/machineInfo.js';
import { getMachines } from '../data/machines.js';
import loggingEx from '@proceed/machine';
const { logging } = loggingEx;
import bpmnEx from '@proceed/bpmn-helper';
const {
toBpmnObject,
toBpmnXml,
getElementsByTagName,
getElementMachineMapping,
setDeploymentMethod,
getStartEvents,
getProcessIds,
getDefinitionsAndProcessIdForEveryCallActivity,
validateCalledProcess,
getAllUserTaskFileNamesAndUserTaskIdsMapping,
getTaskConstraintMapping,
getProcessConstraints,
getMetaDataFromElement,
} = bpmnEx;
import {
updateProcess,
getProcessUserTasksHtml,
getProcesses,
getProcessBpmn,
} from '../data/process.js';
import decider from '@proceed/decider';
import {
start5thIndustryPlan,
stop5thIndustryPlan,
get5thIndustryServiceAccountData,
get5thIndustryAuthorization,
} from './5thIndustry/5thIndustry.js';
/**
 * Forwards a request for the information of one process instance to the process endpoint
 *
 * @param {Object} machine passed through to processEndpoint.getInstanceInformation
 * @param {String} definitionId passed through to processEndpoint.getInstanceInformation
 * @param {String} instanceId passed through to processEndpoint.getInstanceInformation
 * @returns {Promise<*>} the value the process endpoint resolves to
 */
export async function getInstanceInformation(machine, definitionId, instanceId) {
  const instanceInformation = await processEndpoint.getInstanceInformation(
    machine,
    definitionId,
    instanceId
  );
  return instanceInformation;
}
/**
 * Forwards a request for the active user tasks of a machine to the process endpoint
 *
 * @param {Object} machine passed through to processEndpoint.getActiveUserTasks
 * @returns {Promise<*>} the value the process endpoint resolves to
 */
export async function getActiveUserTasks(machine) {
  const activeUserTasks = await processEndpoint.getActiveUserTasks(machine);
  return activeUserTasks;
}
/**
 * Forwards a request for the html of an active user task to the process endpoint
 *
 * All arguments are passed through unchanged and in the same order
 *
 * @param {Object} machine
 * @param {String} instanceId
 * @param {String} userTaskId
 * @param {*} processChain
 * @param {*} callChain
 * @returns {Promise<*>} the value the process endpoint resolves to
 */
export async function getActiveUserTaskHTML(
  machine,
  instanceId,
  userTaskId,
  processChain,
  callChain
) {
  const userTaskHtml = await processEndpoint.getActiveUserTaskHTML(
    machine,
    instanceId,
    userTaskId,
    processChain,
    callChain
  );
  return userTaskHtml;
}
/**
 * Forwards the completion of a user task to the process endpoint and waits for it to settle
 *
 * All arguments are passed through unchanged and in the same order; nothing is returned
 *
 * @param {Object} machine
 * @param {String} instanceId
 * @param {String} userTaskId
 * @param {*} processChain
 * @param {*} callChain
 */
export async function completeUserTask(machine, instanceId, userTaskId, processChain, callChain) {
  const pendingCompletion = processEndpoint.completeUserTask(
    machine,
    instanceId,
    userTaskId,
    processChain,
    callChain
  );
  await pendingCompletion;
}
/**
 * Forwards a milestone update of a user task to the process endpoint and waits for it to settle
 *
 * All arguments are passed through unchanged and in the same order; nothing is returned
 *
 * @param {Object} machine
 * @param {String} instanceId
 * @param {String} userTaskId
 * @param {*} processChain
 * @param {*} callChain
 * @param {*} milestone
 */
export async function updateUserTaskMilestone(
  machine,
  instanceId,
  userTaskId,
  processChain,
  callChain,
  milestone
) {
  const pendingUpdate = processEndpoint.updateUserTaskMilestone(
    machine,
    instanceId,
    userTaskId,
    processChain,
    callChain,
    milestone
  );
  await pendingUpdate;
}
/**
 * Forwards a request for the instances of a deployed process to the process endpoint
 *
 * @param {Object} machine passed through to processEndpoint.getProcessInstances
 * @param {String} definitionId passed through to processEndpoint.getProcessInstances
 * @returns {Promise<*>} the value the process endpoint resolves to
 */
export async function getProcessInstances(machine, definitionId) {
  const instances = await processEndpoint.getProcessInstances(machine, definitionId);
  return instances;
}
/**
 * Forwards a request for the processes deployed on a machine to the process endpoint
 *
 * @param {Object} machine passed through to processEndpoint.getDeployedProcesses
 * @returns {Promise<*>} the value the process endpoint resolves to
 */
export async function getDeployedProcesses(machine) {
  const deployedProcesses = await processEndpoint.getDeployedProcesses(machine);
  return deployedProcesses;
}
// configuration handed to logging.getLogger when the logger for this file is requested
const configObject = {
  moduleName: 'MS',
  consoleOnly: true,
};
// stays null until the asynchronous logger request below has resolved,
// which is why every use of the logger in this file is guarded by a null check
let logger = null;
/**
 * Requests a logger with the config above and stores it in the module-level logger variable
 */
async function getLogger() {
  const createdLogger = await logging.getLogger(configObject);
  logger = createdLogger;
}
// kicked off at module load; the result is not awaited here
getLogger();
/**
 * Awaits the given request and reports its outcome instead of throwing
 *
 * @param {promise} request the request we wait on
 *
 * @returns {object} { result, status }: status is 'Succeeded' with the resolved value as result
 * or 'Failed' with the rejection reason as result
 */
async function settleRequest(request) {
  try {
    const value = await request;
    return { result: value, status: 'Succeeded' };
  } catch (reason) {
    return { result: reason, status: 'Failed' };
  }
}
/**
 * Collects every known machine that lists a deployment with the given definitions id
 *
 * @param {String} processDefinitionsId the id of the deployed process
 * @returns {Array} the machines (as returned by getMachines) whose deployedProcesses contain
 * an entry with a matching definitionId; machines without deployedProcesses are skipped
 */
function getDeployedToMachines(processDefinitionsId) {
  const hasDeployment = ({ deployedProcesses }) => {
    if (!deployedProcesses) {
      return false;
    }
    const deployments = Object.values(deployedProcesses);
    return deployments.some(({ definitionId }) => definitionId === processDefinitionsId);
  };
  return getMachines().filter(hasDeployment);
}
/**
 * Asks every given machine to remove the deployment with the given definition id
 *
 * Failing requests do not abort the others: each request is settled individually and
 * failures are only reported through the logger (if one is available)
 *
 * @param {String} definitionId name of the file the process is stored under
 * @param {Array} machines contains information about all machines the process is supposed to be removed from
 */
async function removeDeploymentFromMachines(definitionId, machines) {
  // wrapping every request in settleRequest keeps a single failure from rejecting the whole batch
  const outcomes = await asyncMap(machines, async (machine) => ({
    request: await settleRequest(processEndpoint.removeDeployment(machine, definitionId)),
    machine,
  }));
  for (const { request, machine } of outcomes) {
    if (request.status === 'Failed' && logger) {
      logger.error(`Failed to delete from ${machine.name || machine.ip}: ${request.result}`);
    }
  }
}
/**
 * Removes the deployment of the process with the given id from all machines it is known to be deployed to
 *
 * Resolves only after all removal requests have settled, so callers awaiting this function
 * can rely on the removal having been attempted on every machine
 *
 * @param {String} processDefinitionsId the id of the deployed process
 */
export async function removeDeployment(processDefinitionsId) {
  const deployedTo = getDeployedToMachines(processDefinitionsId);
  // awaited on purpose: without it the function resolved before any removal request was finished
  await removeDeploymentFromMachines(processDefinitionsId, deployedTo);
}
/**
 * Starts an instance of the deployed process with the given id
 *
 * For a statically deployed process the instance is started on the machine mapped to the
 * (first) start event, otherwise on the first machine the process is deployed to.
 * If the process references a 5thIndustry plan the plan is started first and the engine is
 * given credentials for the 5thIndustry application; the plan is stopped again if starting
 * the instance fails afterwards.
 *
 * @param {String} processDefinitionsId id of the deployed process
 * @throws Will throw an error if the process is not deployed, the start machine can't be
 * determined or starting the instance fails
 */
// TODO: use versioning to find the correct deployment to start
export async function startInstance(processDefinitionsId) {
  const deployedTo = getDeployedToMachines(processDefinitionsId);
  if (!deployedTo.length) {
    throw new Error(
      `Unable to start instance: the process ${processDefinitionsId} is not deployed to any known machine.`
    );
  }
  const [firstMachine] = deployedTo;
  const deployment = firstMachine.deployedProcesses[processDefinitionsId];
  const bpmnObj = await toBpmnObject(deployment.bpmn);

  let startMachineInfo;
  if (deployment.deploymentMethod === 'static') {
    const machineMapping = await getElementMachineMapping(bpmnObj);
    const [startEventId] = await getStartEvents(bpmnObj);
    startMachineInfo = machineMapping[startEventId];
  } else {
    startMachineInfo = { machineId: firstMachine.id };
  }
  if (!startMachineInfo) {
    throw new Error('Unable to start instance: no machine is mapped to the start event.');
  }

  let startMachine;
  if (startMachineInfo.machineId) {
    startMachine = deployedTo.find((m) => m.id === startMachineInfo.machineId);
  } else {
    // split "<ip>:<port>" (ip optionally wrapped in [] for IPv6) into its two parts
    const [ip, port] = startMachineInfo.machineAddress
      .replace(/\[?((?:(?:\d|\w)|:|\.)*)\]?:(\d*)/g, '$1+$2')
      .split('+');
    startMachine = { ip, port };
  }
  if (!startMachine) {
    throw new Error(
      'Unable to start instance: the machine mapped to the start event does not have the process deployed.'
    );
  }

  // if a 5thIndustry Plan is linked to the Process => make sure it is set to being in progress when the instance is started
  let started5thIndustryPlan = false;
  const [processElement] = getElementsByTagName(bpmnObj, 'bpmn:Process');
  const metaData = getMetaDataFromElement(processElement);
  const inspectionPlanId = metaData['_5i-Inspection-Plan-ID'];
  try {
    if (inspectionPlanId) {
      await start5thIndustryPlan(inspectionPlanId);
      started5thIndustryPlan = true;
      const serviceAccountData = get5thIndustryServiceAccountData();
      // give the engine a way to authenticate itself in the 5thIndustry Application
      if (serviceAccountData) {
        // Pass the currently used service account data
        await _5thIndustryEndpoint.send5thIndustryServiceAccountData(
          startMachine,
          serviceAccountData
        );
      } else {
        // Pass the currently used authorization token
        // Warning: when this token becomes invalid the engine won't be able to communicate with the 5thIndustry Application
        await _5thIndustryEndpoint.send5thIndustryAuthorization(
          startMachine,
          get5thIndustryAuthorization()
        );
      }
    }
    await processEndpoint.startProcessInstance(startMachine, processDefinitionsId, {});
  } catch (e) {
    if (logger) {
      logger.debug(`EXECUTION Error starting the process instance: ${e}`);
    }
    // make sure to rollback in 5thIndustry App if there was a plan set to in progress but starting the instance failed
    // (this has to happen before rethrowing, otherwise the rollback is never reached)
    if (started5thIndustryPlan) {
      try {
        await stop5thIndustryPlan(inspectionPlanId);
      } catch (rollbackError) {
        // a failing rollback must not hide the original error
        if (logger) {
          logger.debug(`EXECUTION Error rolling back the 5thIndustry plan: ${rollbackError}`);
        }
      }
    }
    // always propagate the failure, independent of a logger being available
    throw e;
  }
}
// instance states for which stopInstance and pauseInstance treat an instance as still active on a machine
const runningStates = ['PAUSED', 'RUNNING', 'READY', 'DEPLOYMENT-WAITING'];
/**
 * Sends a request to stop an instance of a process to every machine it is still running on
 *
 * The deployment information is refreshed first. If the process references a 5thIndustry plan
 * stop5thIndustryPlan is called for it before the machines are contacted.
 *
 * @param {String} processDefinitionsId id of the deployed process
 * @param {String} instanceId id of the instance that should be stopped
 * @throws Will throw an error if the process is not deployed to a known machine or if one of the stop requests fails
 */
export async function stopInstance(processDefinitionsId, instanceId) {
  await requestDeploymentInformation();
  const deployedTo = getDeployedToMachines(processDefinitionsId);
  if (!deployedTo.length) {
    if (logger) {
      logger.error('Unable to stop instance: Machine not found.');
    }
    throw new Error('Unable to stop instance: Machine not found.');
  }
  const deployment = deployedTo[0].deployedProcesses[processDefinitionsId];
  const bpmnObj = await toBpmnObject(deployment.bpmn);
  const [processElement] = getElementsByTagName(bpmnObj, 'bpmn:Process');
  const metaData = getMetaDataFromElement(processElement);
  if (metaData['_5i-Inspection-Plan-ID']) {
    await stop5thIndustryPlan(metaData['_5i-Inspection-Plan-ID']);
  }

  // looks the instance up in all deployments of a machine; a machine that doesn't know the instance yields undefined
  const findInstance = (machine) => {
    for (const machineDeployment of Object.values(machine.deployedProcesses)) {
      const match = (machineDeployment.instances || []).find(
        (instance) => instance.processInstanceId === instanceId
      );
      if (match) {
        return match;
      }
    }
    return undefined;
  };

  const stillRunning = deployedTo.filter((machine) => {
    const instance = findInstance(machine);
    return Boolean(
      instance && instance.instanceState.some((state) => runningStates.includes(state))
    );
  });

  // the requests are sent concurrently but awaited so failures are logged and reach the caller
  await Promise.all(
    stillRunning.map(async (machine) => {
      try {
        await processEndpoint.stopProcessInstance(machine, processDefinitionsId, instanceId);
      } catch (err) {
        if (logger) {
          logger.error(`Failed to stop instance on ${machine.name}: ${err}.`);
        }
        throw err;
      }
    })
  );
}
/**
 * Sends a request to pause an instance of a process to every machine it is running on
 *
 * The deployment information is refreshed first. A machine is contacted if it reports the
 * instance with at least one state that is in runningStates and is not 'PAUSED'.
 * Nothing happens if no machine matches.
 *
 * @param {String} processDefinitionsId id of the deployed process
 * @param {String} instanceId id of the instance that should be paused
 * @throws Will throw an error if one of the pause requests fails
 */
export async function pauseInstance(processDefinitionsId, instanceId) {
  await requestDeploymentInformation();
  const deployedTo = getDeployedToMachines(processDefinitionsId);

  // looks the instance up in all deployments of a machine; a machine that doesn't know the instance yields undefined
  const findInstance = (machine) => {
    for (const machineDeployment of Object.values(machine.deployedProcesses)) {
      const match = (machineDeployment.instances || []).find(
        (instance) => instance.processInstanceId === instanceId
      );
      if (match) {
        return match;
      }
    }
    return undefined;
  };

  const running = deployedTo.filter((machine) => {
    const instance = findInstance(machine);
    return Boolean(
      instance &&
        instance.instanceState.some(
          (state) => state !== 'PAUSED' && runningStates.includes(state)
        )
    );
  });

  // the requests are sent concurrently but awaited so failures are logged and reach the caller
  await Promise.all(
    running.map(async (machine) => {
      try {
        await processEndpoint.pauseProcessInstance(machine, processDefinitionsId, instanceId);
      } catch (err) {
        if (logger) {
          logger.error(`Failed to pause instance on ${machine.name}: ${err}.`);
        }
        throw err;
      }
    })
  );
}
/**
 * Sends a request to resume an instance of a process to every machine it is paused on
 *
 * The deployment information is refreshed first. A machine is contacted if it reports the
 * instance with at least one 'PAUSING' or 'PAUSED' state. Nothing happens if no machine matches.
 *
 * @param {String} processDefinitionsId id of the deployed process
 * @param {String} instanceId id of the instance that should be resumed
 * @throws Will throw an error if one of the resume requests fails
 */
export async function resumeInstance(processDefinitionsId, instanceId) {
  await requestDeploymentInformation();
  const deployedTo = getDeployedToMachines(processDefinitionsId);

  // looks the instance up in all deployments of a machine; a machine that doesn't know the instance yields undefined
  const findInstance = (machine) => {
    for (const machineDeployment of Object.values(machine.deployedProcesses)) {
      const match = (machineDeployment.instances || []).find(
        (instance) => instance.processInstanceId === instanceId
      );
      if (match) {
        return match;
      }
    }
    return undefined;
  };

  const paused = deployedTo.filter((machine) => {
    const instance = findInstance(machine);
    return Boolean(
      instance &&
        instance.instanceState.some((state) => state === 'PAUSING' || state === 'PAUSED')
    );
  });

  // the requests are sent concurrently but awaited so failures are logged and reach the caller
  await Promise.all(
    paused.map(async (machine) => {
      try {
        await processEndpoint.resumeProcessInstance(machine, processDefinitionsId, instanceId);
      } catch (err) {
        if (logger) {
          logger.error(`Failed to resume instance on ${machine.name}: ${err}.`);
        }
        throw err;
      }
    })
  );
}
/**
 * Forwards the request to move a token of an instance to another flow element to the process endpoint
 *
 * All arguments are passed through unchanged and in the same order; nothing is returned
 *
 * @param {Object} machine
 * @param {String} machine.ip the ip address of the machine
 * @param {Number} machine.port the port of the machine
 * @param {String} processDefinitionsId
 * @param {String} instanceId
 * @param {String} tokenId
 * @param {String} flowElementId
 */
export async function moveToken(machine, processDefinitionsId, instanceId, tokenId, flowElementId) {
  const pendingMove = processEndpoint.moveToken(
    machine,
    processDefinitionsId,
    instanceId,
    tokenId,
    flowElementId
  );
  await pendingMove;
}
/**
 * Tries to find an optimal machine to deploy the process to and sends all necessary process information on success
 *
 * The bpmn is marked as dynamically deployed and stored before the decider is asked for
 * a sorted list of viable machines. Only added (not discovered) machines with status
 * 'CONNECTED' are considered. If sending the process, its user task html or its imported
 * processes fails, the deployment is removed from the selected machine again before the error is rethrown.
 *
 * @param {Object} process an object containing information about the process to deploy
 * @param {String} bpmn the process description in xml
 * @throws Will throw an error if no machine is found or if sending the process data fails
 */
async function dynamicDeployment(process, bpmn) {
  const bpmnObj = await toBpmnObject(bpmn);
  const startEventIds = await getStartEvents(bpmnObj);
  const processConstraints = await getProcessConstraints(bpmnObj);
  const taskConstraintMapping = await getTaskConstraintMapping(bpmnObj);
  await setDeploymentMethod(bpmnObj, 'dynamic');
  const dynamicBpmn = await toBpmnXml(bpmnObj);
  await updateProcess(process.id, { bpmn: dynamicBpmn });
  const addedMachines = getMachines().filter(
    (machine) => !machine.discovered && machine.status === 'CONNECTED'
  );
  // use decider to get sorted list of viable engines
  const [firstStartEventId] = startEventIds;
  const { engineList } = await decider.findOptimalExternalMachine(
    { id: process.id, name: process.name, nextFlowNode: firstStartEventId },
    taskConstraintMapping[firstStartEventId] || {},
    processConstraints || {},
    addedMachines
  );
  // try to get the best engine
  const [preferredMachine] = engineList;
  // there is no deployable machine known to the MS
  if (!preferredMachine) {
    if (logger) {
      logger.error(`Unable to find machine to deploy process: ${process.name} to.`);
    }
    throw new Error('There is no machine the process can be deployed to.');
  }
  // deploying process to selected engine
  try {
    await processEndpoint.deployProcess(preferredMachine, process.id, dynamicBpmn);
    await sendUserTaskHTML(process, dynamicBpmn, preferredMachine, true, false);
    await sendImportedProcesses(process, dynamicBpmn, preferredMachine, true);
  } catch (error) {
    // wait for the rollback so a pending removal can't interfere with a following deployment attempt
    try {
      await removeDeploymentFromMachines(process.id, [preferredMachine]);
    } catch (cleanupError) {
      // a failing rollback must not hide the original error
      if (logger) {
        logger.error(`Failed to remove process ${process.name} after failed deployment. ${cleanupError}`);
      }
    }
    if (logger) {
      logger.error(`Failed to send process ${process.name} to selected machine. ${error}`);
    }
    throw error;
  }
}
/**
 * Resolves every entry of a machine mapping to an { ip, port } pair and drops duplicates
 *
 * Entries with a machineId are resolved through the list of known machines, all other
 * entries are expected to carry a machineAddress of the form "<ip>:<port>"
 * (the ip may be wrapped in square brackets).
 *
 * @param {object} machineMapping the mapping that contains the machines we want to know the addresses of
 * @returns {Array} the unique addresses in order of their first occurrence
 * @throws Will throw an error if a machine id is unknown or an address can't be split into ip and port
 */
function getUniqueMappedMachineAddresses(machineMapping) {
  // turns a single mapping entry into its { ip, port } representation
  const resolveAddress = ({ machineId, machineAddress }) => {
    if (machineId) {
      const knownMachine = getMachines().find(({ id }) => id === machineId);
      if (!knownMachine) {
        throw new Error("Can't find machine with given id to resolve address");
      }
      return { ip: knownMachine.ip, port: knownMachine.port };
    }
    const rewritten = machineAddress.replace(/\[?((?:(?:\d|\w)|:|\.)*)\]?:(\d*)/g, '$1+$2');
    const [ip, port] = rewritten.split('+');
    if (!ip || !port) {
      throw new Error('Unable to get ip and port from given address.');
    }
    return { ip, port: parseInt(port, 10) };
  };

  // resolve everything first so an invalid entry throws before any deduplication happens
  const addresses = Object.keys(machineMapping).map((key) => resolveAddress(machineMapping[key]));

  const unique = [];
  for (const address of addresses) {
    const alreadyKnown = unique.some(
      (known) => known.ip === address.ip && known.port === address.port
    );
    if (!alreadyKnown) {
      unique.push(address);
    }
  }
  return unique;
}
/**
 * Sends process bpmn and user task html to all machines that were mapped to flowNodes in the process
 *
 * The bpmn is marked as statically deployed and stored first. If any machine can't be reached
 * or sending the user task html or imported processes fails, the process is removed again from
 * all machines it was successfully sent to before the error is rethrown.
 *
 * @param {Object} process object containing all information about the process to deploy
 * @param {String} bpmn the process description in xml
 * @throws Will throw an error if a mapped machine can't be resolved or reached or if sending the process data fails
 */
async function staticDeployment(process, bpmn) {
  const staticBpmn = await setDeploymentMethod(bpmn, 'static');
  await updateProcess(process.id, { bpmn: staticBpmn });
  const machineMapping = await getElementMachineMapping(staticBpmn);
  let mappedMachinesAdresses;
  try {
    mappedMachinesAdresses = getUniqueMappedMachineAddresses(machineMapping);
  } catch (err) {
    if (logger) {
      logger.debug(err);
    }
    throw new Error('check if all machines are available!');
  }
  // sends request and gets the answer and information if sending succeeded for every request
  const settledRequests = await asyncMap(mappedMachinesAdresses, async (address) => {
    const result = await settleRequest(
      processEndpoint.deployProcess(address, process.id, staticBpmn)
    );
    return {
      result,
      address,
    };
  });
  try {
    if (settledRequests.some(({ result }) => result.status === 'Failed')) {
      throw new Error('check if all machines are available!');
    }
    await sendUserTaskHTML(process, staticBpmn, machineMapping, false, false);
    await sendImportedProcesses(process, staticBpmn, machineMapping, false);
  } catch (error) {
    const reachedAddresses = settledRequests
      .filter((request) => request.result.status === 'Succeeded')
      .map((request) => request.address);
    // wait for the rollback so a pending removal can't interfere with a following deployment attempt
    try {
      await removeDeploymentFromMachines(process.id, reachedAddresses);
    } catch (cleanupError) {
      // a failing rollback must not hide the original error
      if (logger) {
        logger.info(`Failed to remove process ${process.name} after failed deployment. ${cleanupError}`);
      }
    }
    if (logger) {
      logger.info(`Failed to send process ${process.name} to at least one machine.`);
    }
    throw error;
  }
}
/**
 * Sends user task html to all machines that need them
 *
 * Nothing is sent if the process references a 5thIndustry plan. Otherwise every user task
 * file referenced in the bpmn has to have html stored for the process; html that is not
 * referenced by any user task is skipped.
 *
 * @param {object} process the process whose stored user task html is sent
 * @param {string} bpmn the process description in xml
 * @param {object} machineInfo either a map mapping taskId to machine or a machine
 * @param {bool} dynamic indicates if the html is to be send to a singular machine or multiple ones
 * @param {object|false} importer if truthy the html is sent under the id of this (importing) process
 * and flagged as belonging to an imported process
 * @throws Will throw an error if html for a referenced user task file is missing
 */
async function sendUserTaskHTML(process, bpmn, machineInfo, dynamic, importer) {
  // don't need to send html when 5thIndustry is used as the user task application
  const parsedBpmn = await toBpmnObject(bpmn);
  const [processElement] = getElementsByTagName(parsedBpmn, 'bpmn:Process');
  const elementMetaData = getMetaDataFromElement(processElement);
  if (elementMetaData['_5i-Inspection-Plan-ID']) {
    return;
  }

  const htmlByFileName = await getProcessUserTasksHtml(process.id);
  const taskIdsByFileName = await getAllUserTaskFileNamesAndUserTaskIdsMapping(bpmn);

  // check if each user task has user task data to send
  for (const [fileName, ids] of Object.entries(taskIdsByFileName)) {
    if (!htmlByFileName[fileName]) {
      throw new Error(`Missing user task data for tasks with ids ${toListString(ids)}!`);
    }
  }

  // resolves the machines that need the html of the given user tasks
  const selectTargets = (ids) => {
    if (dynamic) {
      return [machineInfo];
    }
    const relevantMapping = {};
    for (const id of ids) {
      relevantMapping[id] = machineInfo[id];
    }
    return getUniqueMappedMachineAddresses(relevantMapping);
  };

  // send all user task data
  await asyncForEach(Object.entries(htmlByFileName), async ([fileName, html]) => {
    const ids = taskIdsByFileName[fileName];
    // don't send user task data that isn't used
    if (!ids) {
      return;
    }
    await asyncForEach(selectTargets(ids), async (target) => {
      await processEndpoint.sendUserTaskHTML(
        target,
        importer ? importer.id : process.id,
        fileName,
        html,
        Boolean(importer)
      );
    });
  });
}
/**
 * Checks the process for imported processes and sends them to the correct endpoint if there are any
 *
 * For every call activity the referenced process is validated, sent to the target machine
 * together with its user task html and then searched for nested imports, which are sent to
 * the same machine. The returned promise settles only after everything was sent, so a
 * failure for any import rejects it.
 *
 * @param {Object} process the (root) process the imports are sent for
 * @param {String} bpmn the process description in xml that is searched for call activities
 * @param {Object} machineInfo either a map mapping flow node id to machine or a machine
 * @param {Boolean} dynamic if the process is deployed dynamically
 * @throws Will throw an error if an imported process is unknown, doesn't contain the expected
 * process, is invalid or can't be sent
 */
async function sendImportedProcesses(process, bpmn, machineInfo, dynamic) {
  const activityDefinitionIdMapping = await getDefinitionsAndProcessIdForEveryCallActivity(bpmn);
  // for...of instead of forEach: forEach doesn't wait for async callbacks, so failures were never
  // seen by the caller and the deployment was reported as done before the imports were sent
  for (const [activityId, { definitionId: importedDefinitionId, processId }] of Object.entries(
    activityDefinitionIdMapping
  )) {
    const importedProcess = getProcesses().find((p) => p.id === importedDefinitionId);
    if (!importedProcess) {
      throw new Error(
        `Can't find the process (${importedDefinitionId}) referenced in callActivity ${activityId}`
      );
    }
    const importedBpmn = await getProcessBpmn(importedDefinitionId);
    const processIds = await getProcessIds(importedBpmn);
    if (!processIds.includes(processId)) {
      throw new Error(
        `The file (${importedDefinitionId}) doesn't contain the expected process with id ${processId}`
      );
    }
    // check if there is only one non typed start event
    try {
      // awaited so a rejection is caught here as well in case the validation is asynchronous
      await validateCalledProcess(importedBpmn, processId);
    } catch (err) {
      throw new Error(`Invalid process referenced in callActivity ${activityId}: ${err}`);
    }
    let machine;
    if (dynamic) {
      machine = machineInfo;
    } else {
      [machine] = getUniqueMappedMachineAddresses({ [activityId]: machineInfo[activityId] });
    }
    await processEndpoint.sendImportedProcess(
      machine,
      process.id,
      importedDefinitionId,
      importedBpmn
    );
    // send the html data for the imported process to the same machine as the imported process
    // dynamic = true so we just send everything to the same machine
    await sendUserTaskHTML(importedProcess, importedBpmn, machine, true, process);
    // recursively send all nested imported processes (imported processes in an imported process)
    // to the same machine as the imported process
    // dynamic = true so we just send everything to the same machine
    await sendImportedProcesses(process, importedBpmn, machine, true);
  }
}
/**
 * Deploys the process with the given id either dynamically or statically
 *
 * The stored bpmn has to contain exactly one process. After a successful deployment the
 * deployment information is requested again.
 *
 * @param {String} processDefinitionsId id of the process to deploy
 * @param {Boolean} dynamic truthy to use dynamicDeployment, falsy to use staticDeployment
 * @throws Will throw an error if the process or its bpmn can't be found, the bpmn doesn't
 * contain exactly one process or the deployment itself fails
 */
export async function deployProcess(processDefinitionsId, dynamic) {
  const targetProcess = getProcesses().find((candidate) => candidate.id === processDefinitionsId);
  if (!targetProcess) {
    throw new Error(`No process found for given process id ${processDefinitionsId}!`);
  }

  const bpmn = await getProcessBpmn(targetProcess.id);
  if (!bpmn) {
    throw new Error(`Can't find bpmn for the process with id ${processDefinitionsId}`);
  }

  // the engine only allows for descriptions containing a single process
  const containedProcessIds = await getProcessIds(bpmn);
  const processCount = containedProcessIds.length;
  if (processCount > 1) {
    throw new Error('Process desciption contains more than one Process');
  }
  if (processCount === 0) {
    throw new Error("Process description doesn't contain a process");
  }

  const deploy = dynamic ? dynamicDeployment : staticDeployment;
  await deploy(targetProcess, bpmn);
  await requestDeploymentInformation();
}