Source: management-system/src/backend/shared-electron-server/network/deployment.js

import { processEndpoint, _5thIndustryEndpoint } from './ms-engine-communication/module.js';

import {
  asyncForEach,
  asyncMap,
} from '../../../shared-frontend-backend/helpers/javascriptHelpers.js';

import { toListString } from '../../../shared-frontend-backend/helpers/arrayHelpers.js';

import { requestDeploymentInformation } from './machines/machineInfo.js';
import { getMachines } from '../data/machines.js';
import loggingEx from '@proceed/machine';
const { logging } = loggingEx;

import bpmnEx from '@proceed/bpmn-helper';
const {
  toBpmnObject,
  toBpmnXml,
  getElementsByTagName,
  getElementMachineMapping,
  setDeploymentMethod,
  getStartEvents,
  getProcessIds,
  getDefinitionsAndProcessIdForEveryCallActivity,
  validateCalledProcess,
  getAllUserTaskFileNamesAndUserTaskIdsMapping,
  getTaskConstraintMapping,
  getProcessConstraints,
  getMetaDataFromElement,
} = bpmnEx;

import {
  updateProcess,
  getProcessUserTasksHtml,
  getProcesses,
  getProcessBpmn,
} from '../data/process.js';

import decider from '@proceed/decider';

import {
  start5thIndustryPlan,
  stop5thIndustryPlan,
  get5thIndustryServiceAccountData,
  get5thIndustryAuthorization,
} from './5thIndustry/5thIndustry.js';

export async function getInstanceInformation(machine, definitionId, instanceId) {
  return await processEndpoint.getInstanceInformation(machine, definitionId, instanceId);
}
export async function getActiveUserTasks(machine) {
  return await processEndpoint.getActiveUserTasks(machine);
}
export async function getActiveUserTaskHTML(
  machine,
  instanceId,
  userTaskId,
  processChain,
  callChain
) {
  return await processEndpoint.getActiveUserTaskHTML(
    machine,
    instanceId,
    userTaskId,
    processChain,
    callChain
  );
}
export async function completeUserTask(machine, instanceId, userTaskId, processChain, callChain) {
  await processEndpoint.completeUserTask(machine, instanceId, userTaskId, processChain, callChain);
}
export async function updateUserTaskMilestone(
  machine,
  instanceId,
  userTaskId,
  processChain,
  callChain,
  milestone
) {
  await processEndpoint.updateUserTaskMilestone(
    machine,
    instanceId,
    userTaskId,
    processChain,
    callChain,
    milestone
  );
}
export async function getProcessInstances(machine, definitionId) {
  return await processEndpoint.getProcessInstances(machine, definitionId);
}
export async function getDeployedProcesses(machine) {
  return await processEndpoint.getDeployedProcesses(machine);
}

const configObject = {
  moduleName: 'MS',
  consoleOnly: true,
};

let logger = null;

async function getLogger() {
  logger = await logging.getLogger(configObject);
}

getLogger();

/**
 * Waits for request to resolve or reject and returns the result and information if it succeded
 *
 * @param {promise} request the request we await to resolve
 *
 * @returns {object} object containing a result and a status member
 */
async function settleRequest(request) {
  let result;
  let status;
  try {
    result = await request;
    status = 'Succeeded';
  } catch (err) {
    result = err;
    status = 'Failed';
  }

  return { result, status };
}

/**
 * Returns an array with all machines the process with the given id is deployed to
 *
 * @param {String} processDefinitionsId the id of the deployed process
 */
function getDeployedToMachines(processDefinitionsId) {
  const machines = getMachines();

  const deployedTo = machines.filter((machine) => {
    if (
      machine.deployedProcesses &&
      Object.values(machine.deployedProcesses).some(
        (deployment) => deployment.definitionId === processDefinitionsId
      )
    ) {
      return true;
    }

    return false;
  });

  return deployedTo;
}

/**
 * Removes deployed process with corresponding definitionid from all given machines
 *
 * @param {String} definitionId name of the file the process is stored under
 * @param {Array} machines contains information about all machines the process is supposed to be removed from
 */
async function removeDeploymentFromMachines(definitionId, machines) {
  // makes all requests executable and awaitable at the same time even when some may fail
  const settledRequests = await asyncMap(machines, async (machine) => {
    const result = await settleRequest(processEndpoint.removeDeployment(machine, definitionId));
    return { request: result, machine };
  });

  settledRequests.forEach(({ request, machine }) => {
    if (request.status === 'Failed') {
      if (logger) {
        logger.error(`Failed to delete from ${machine.name || machine.ip}: ${request.result}`);
      }
    }
  });
}

/**
 * Removes the deployment of the process with the given id from all currently connected machines
 *
 * @param {String} processDefinitionsId the id of the deployed process
 */
export async function removeDeployment(processDefinitionsId) {
  const deployedTo = getDeployedToMachines(processDefinitionsId);

  removeDeploymentFromMachines(processDefinitionsId, deployedTo);
}

/**
 * Starts an instance of the deployed process with the given id
 *
 *
 * @param {String} processDefinitionsId id of the deployed process
 * @throws Will throw an error if starting the instance fails
 */
// TODO: use versioning to find the correct deployment to start
export async function startInstance(processDefinitionsId) {
  const deployedTo = getDeployedToMachines(processDefinitionsId);

  const deployment = deployedTo[0].deployedProcesses[processDefinitionsId];

  let startMachineInfo;
  const bpmnObj = await toBpmnObject(deployment.bpmn);
  if (deployment.deploymentMethod === 'static') {
    const machineMapping = await getElementMachineMapping(bpmnObj);
    const [startEventId] = await getStartEvents(bpmnObj);

    startMachineInfo = machineMapping[startEventId];
  } else {
    startMachineInfo = { machineId: deployedTo[0].id };
  }
  let startMachine;
  if (startMachineInfo.machineId) {
    startMachine = deployedTo.find((m) => m.id === startMachineInfo.machineId);
  } else {
    const [ip, port] = startMachineInfo.machineAddress
      .replace(/\[?((?:(?:\d|\w)|:|\.)*)\]?:(\d*)/g, '$1+$2')
      .split('+');
    startMachine = { ip, port };
  }

  // if a 5thIndustry Plan is linked to the Process => make sure it is set to being in progress when the instance is started
  let started5thIndustryPlan = false;
  const [process] = getElementsByTagName(bpmnObj, 'bpmn:Process');
  const metaData = getMetaDataFromElement(process);
  try {
    if (metaData['_5i-Inspection-Plan-ID']) {
      await start5thIndustryPlan(metaData['_5i-Inspection-Plan-ID']);
      started5thIndustryPlan = true;

      const serviceAccountData = get5thIndustryServiceAccountData();

      // give the engine a way to authenticate itself in the 5thIndustry Application
      if (serviceAccountData) {
        // Pass the currently used service account data
        await _5thIndustryEndpoint.send5thIndustryServiceAccountData(
          startMachine,
          serviceAccountData
        );
      } else {
        // Pass the currently used authorization token
        // Warning: when this token becomes invalid the engine won't be able to communicate with the 5thIndustry Application
        await _5thIndustryEndpoint.send5thIndustryAuthorization(
          startMachine,
          get5thIndustryAuthorization()
        );
      }
    }

    await processEndpoint.startProcessInstance(startMachine, processDefinitionsId, {});
  } catch (e) {
    if (logger) {
      logger.debug(`EXECUTION Error starting the process instance: ${e}`);
      throw e;
    }
    // make sure to rollback in 5thIndustry App if there was a plan set to in progress but starting the instance failed
    if (started5thIndustryPlan) {
      await stop5thIndustryPlan(metaData['_5i-Inspection-Plan-ID']);
    }
  }
}

const runningStates = ['PAUSED', 'RUNNING', 'READY', 'DEPLOYMENT-WAITING'];

/**
 * Sends request to stop an instance of a process on a machine
 */
export async function stopInstance(processDefinitionsId, instanceId) {
  await requestDeploymentInformation();
  const deployedTo = getDeployedToMachines(processDefinitionsId);

  const deployment = deployedTo[0].deployedProcesses[processDefinitionsId];

  const bpmnObj = await toBpmnObject(deployment.bpmn);
  const [process] = getElementsByTagName(bpmnObj, 'bpmn:Process');
  const metaData = getMetaDataFromElement(process);

  if (metaData['_5i-Inspection-Plan-ID']) {
    await stop5thIndustryPlan(metaData['_5i-Inspection-Plan-ID']);
  }

  const stillRunning = deployedTo.filter((machine) => {
    const instance = Object.values(machine.deployedProcesses)
      .find((deployment) =>
        deployment.instances.some((instance) => instance.processInstanceId === instanceId)
      )
      .instances.find((instance) => instance.processInstanceId === instanceId);

    if (instance && instance.instanceState.some((state) => runningStates.includes(state))) {
      return true;
    }

    return false;
  });

  stillRunning.forEach((machine) => {
    if (!machine) {
      if (logger) {
        logger.error('Unable to stop instance: Machine not found.');
      }
      throw new Error('Unable to stop instance: Machine not found.');
    }

    try {
      processEndpoint.stopProcessInstance(machine, processDefinitionsId, instanceId);
    } catch (err) {
      if (logger) {
        logger.error(`Failed to stop instance on ${machine.name}: ${err}.`);
        throw err;
      }
    }
  });
}

/**
 * Sends request to pause an instance of a process on a machine
 */
export async function pauseInstance(processDefinitionsId, instanceId) {
  await requestDeploymentInformation();
  const deployedTo = getDeployedToMachines(processDefinitionsId);

  const running = deployedTo.filter((machine) => {
    const instance = Object.values(machine.deployedProcesses)
      .find((deployment) =>
        deployment.instances.some((instance) => instance.processInstanceId === instanceId)
      )
      .instances.find((instance) => instance.processInstanceId === instanceId);

    if (
      instance &&
      instance.instanceState.some((state) => state !== 'PAUSED' && runningStates.includes(state))
    ) {
      return true;
    }

    return false;
  });

  running.forEach((machine) => {
    if (!machine) {
      if (logger) {
        logger.error('Unable to pause instance: Machine not found.');
      }
      throw new Error('Unable to pause instance: Machine not found.');
    }

    try {
      processEndpoint.pauseProcessInstance(machine, processDefinitionsId, instanceId);
    } catch (err) {
      if (logger) {
        logger.error(`Failed to pause instance on ${machine.name}: ${err}.`);
        throw err;
      }
    }
  });
}

/**
 * Sends request to resume an instance of a process on a machine
 */
export async function resumeInstance(processDefinitionsId, instanceId) {
  await requestDeploymentInformation();
  const deployedTo = getDeployedToMachines(processDefinitionsId);

  const paused = deployedTo.filter((machine) => {
    const instance = Object.values(machine.deployedProcesses)
      .find((deployment) =>
        deployment.instances.some((instance) => instance.processInstanceId === instanceId)
      )
      .instances.find((instance) => instance.processInstanceId === instanceId);

    if (
      instance &&
      instance.instanceState.some((state) => state === 'PAUSING' || state === 'PAUSED')
    ) {
      return true;
    }

    return false;
  });

  paused.forEach((machine) => {
    if (!machine) {
      if (logger) {
        logger.error('Unable to resume instance: Machine not found.');
      }
      throw new Error('Unable to resume instance: Machine not found.');
    }

    try {
      processEndpoint.resumeProcessInstance(machine, processDefinitionsId, instanceId);
    } catch (err) {
      if (logger) {
        logger.error(`Failed to resume instance on ${machine.name}: ${err}.`);
        throw err;
      }
    }
  });
}

/**
 * Moves token inside an instance to a new flow element
 *
 * @param {Object} machine
 * @param {String} machine.ip the ip address of the machine
 * @param {Number} machine.port the port of the machine
 * @param {String} processDefinitionsId
 * @param {String} instanceId
 * @param {String} tokenId
 * @param {String} flowElementId
 */
export async function moveToken(machine, processDefinitionsId, instanceId, tokenId, flowElementId) {
  await processEndpoint.moveToken(
    machine,
    processDefinitionsId,
    instanceId,
    tokenId,
    flowElementId
  );
}

/**
 * Tries to find an optimal machine to deploy process to and sends all necessary process information on success
 *
 * @param {Object} process an object containing inforamtion about the process to deploy
 * @param {String} bpmn the process description in xml
 */
async function dynamicDeployment(process, bpmn) {
  const bpmnObj = await toBpmnObject(bpmn);
  const startEventIds = await getStartEvents(bpmnObj);
  const processConstraints = await getProcessConstraints(bpmnObj);
  const taskConstraintMapping = await getTaskConstraintMapping(bpmnObj);

  await setDeploymentMethod(bpmnObj, 'dynamic');

  bpmn = await toBpmnXml(bpmnObj);

  await updateProcess(process.id, { bpmn });

  const addedMachines = getMachines().filter(
    (machine) => !machine.discovered && machine.status === 'CONNECTED'
  );

  // use decider to get sorted list of viable engines
  const { engineList } = await decider.findOptimalExternalMachine(
    { id: process.id, name: process.name, nextFlowNode: startEventIds[0] },
    taskConstraintMapping[startEventIds[0]] || {},
    processConstraints || {},
    addedMachines
  );

  // try to get the best engine
  let [preferredMachine] = engineList;

  // there is no deployable machine known to the MS
  if (!preferredMachine) {
    if (logger) {
      logger.error(`Unable to find machine to deploy process: ${process.name} to.`);
    }

    throw new Error('There is no machine the process can be deployed to.');
  }

  // deploying process to selected engine
  try {
    await processEndpoint.deployProcess(preferredMachine, process.id, bpmn);
    await sendUserTaskHTML(process, bpmn, preferredMachine, true, false);
    await sendImportedProcesses(process, bpmn, preferredMachine, true);
  } catch (error) {
    removeDeploymentFromMachines(process.id, [preferredMachine]);

    if (logger) {
      logger.error(`Failed to send process ${process.name} to selected machine. ${error}`);
    }
    throw error;
  }
}

/**
 * Function that given a machineMapping will return an array of unique machine addresses of machines in the mapping
 *
 * @param {object} machineMapping the mapping that contains the machines we want to know the addresses of
 */
function getUniqueMappedMachineAddresses(machineMapping) {
  return Object.keys(machineMapping)
    .map((key) => {
      const entry = machineMapping[key];

      if (entry.machineId) {
        const machine = getMachines().find((curMachine) => curMachine.id === entry.machineId);
        if (!machine) {
          throw new Error("Can't find machine with given id to resolve address");
        }
        return { ip: machine.ip, port: machine.port };
      }

      const [ip, port] = entry.machineAddress
        .replace(/\[?((?:(?:\d|\w)|:|\.)*)\]?:(\d*)/g, '$1+$2')
        .split('+');
      if (!ip || !port) {
        throw new Error('Unable to get ip and port from given address.');
      }
      return { ip, port: parseInt(port, 10) };
    })
    .reduce((currEntries, entry) => {
      if (!currEntries.some((el) => el.ip === entry.ip && el.port === entry.port)) {
        return [...currEntries, entry];
      }

      return currEntries;
    }, []);
}

/**
 * Sends process bpmn and user task html to all machines that were mapped to flowNodes in the process
 *
 * @param {Object} process object containing all information about the process to deploy
 * @param {String} bpmn the process description in xml
 */
async function staticDeployment(process, bpmn) {
  bpmn = await setDeploymentMethod(bpmn, 'static');

  await updateProcess(process.id, { bpmn });

  const machineMapping = await getElementMachineMapping(bpmn);

  let mappedMachinesAdresses;
  try {
    mappedMachinesAdresses = getUniqueMappedMachineAddresses(machineMapping);
  } catch (err) {
    if (logger) {
      logger.debug(err);
    }
    throw new Error('check if all machines are available!');
  }

  // sends request and gets the answer and information if sending succeded for every request
  const settledRequests = await asyncMap(mappedMachinesAdresses, async (address) => {
    const result = await settleRequest(processEndpoint.deployProcess(address, process.id, bpmn));
    return {
      result,
      address,
    };
  });

  try {
    if (settledRequests.some(({ result }) => result.status === 'Failed')) {
      throw new Error('check if all machines are available!');
    }

    await sendUserTaskHTML(process, bpmn, machineMapping, false, false);
    await sendImportedProcesses(process, bpmn, machineMapping, false);
  } catch (error) {
    removeDeploymentFromMachines(
      process.id,
      settledRequests
        .filter((request) => request.result.status === 'Succeeded')
        .map((request) => request.address)
    );

    if (logger) {
      logger.info(`Failed to send process ${process.name} to at least one machine.`);
    }
    throw error;
  }
}

/**
 * Sends user task html to all machines that need them
 *
 * @param {object} process used to determine the endpoint we want to send the data to (actual process/importing process)
 * @param {object} machineInfo either a map mapping taskId to machine or a machine
 * @param {bool} dynamic indicates if the html is to be send to a singular machine or multiple ones
 */
async function sendUserTaskHTML(process, bpmn, machineInfo, dynamic, importer) {
  // don't need to send html when 5thIndustry is used as the user task application
  const bpmnObj = await toBpmnObject(bpmn);
  const [processElement] = getElementsByTagName(bpmnObj, 'bpmn:Process');
  const metaData = getMetaDataFromElement(processElement);

  if (metaData['_5i-Inspection-Plan-ID']) {
    // early exit
    return;
  }

  const taskFileNameHtmlMapping = await getProcessUserTasksHtml(process.id);
  const taskIdFileNameMapping = await getAllUserTaskFileNamesAndUserTaskIdsMapping(bpmn);

  // check if each user task has user task data to send
  Object.entries(taskIdFileNameMapping).forEach(([taskFileName, taskIds]) => {
    if (!taskFileNameHtmlMapping[taskFileName]) {
      throw new Error(`Missing user task data for tasks with ids ${toListString(taskIds)}!`);
    }
  });

  // send all user task data
  await asyncForEach(Object.entries(taskFileNameHtmlMapping), async ([taskFileName, html]) => {
    const taskIds = taskIdFileNameMapping[taskFileName];
    // don't send user task data that isn't used
    if (!taskIds) {
      return;
    }

    let machines = [];

    if (dynamic) {
      machines.push(machineInfo);
    } else {
      const machineMapping = {};

      taskIds.forEach((id) => (machineMapping[id] = machineInfo[id]));

      machines = getUniqueMappedMachineAddresses(machineMapping);
    }

    await asyncForEach(machines, async (machine) => {
      await processEndpoint.sendUserTaskHTML(
        machine,
        importer ? importer.id : process.id,
        taskFileName,
        html,
        !!importer
      );
    });
  });
}

/**
 * Checks the process for imported processes and sends them to the correct endpoint if there are any
 *
 * @param {*} process
 * @param {*} bpmn
 * @param {*} machineInfo
 * @param {Boolean} dynamic if the process is deployed dynamically
 */
async function sendImportedProcesses(process, bpmn, machineInfo, dynamic) {
  const activityDefinitionIdMapping = await getDefinitionsAndProcessIdForEveryCallActivity(bpmn);
  Object.entries(activityDefinitionIdMapping).forEach(
    async ([activityId, { definitionId: importedDefinitionId, processId }]) => {
      const importedProcess = getProcesses().find((p) => p.id === importedDefinitionId);

      const importedBpmn = await getProcessBpmn(importedDefinitionId);

      const processIds = await getProcessIds(importedBpmn);

      if (!processIds.includes(processId)) {
        throw new Error(
          `The file (${importedDefinitionId}) doesn't contain the expected process with id ${processId}`
        );
      }

      // check if there is only one non typed start event
      try {
        validateCalledProcess(importedBpmn, processId);
      } catch (err) {
        throw new Error(`Invalid process referenced in callActivity ${activityId}: ${err}`);
      }

      let machine;
      if (dynamic) {
        machine = machineInfo;
      } else {
        [machine] = getUniqueMappedMachineAddresses({ activityId: machineInfo[activityId] });
      }

      await processEndpoint.sendImportedProcess(
        machine,
        process.id,
        importedDefinitionId,
        importedBpmn
      );

      // send the html data for the imported process to the same machine as the imported process
      // dynamic = true so we just send everything to the same machine
      await sendUserTaskHTML(importedProcess, importedBpmn, machine, true, process);

      // recursively send all nested imported processes (imported processes in an imported process)
      // to the same machine as the imported process
      // dynamic = true so we just send everything to the same machine
      await sendImportedProcesses(process, importedBpmn, machine, true);
    }
  );
}

export async function deployProcess(processDefinitionsId, dynamic) {
  const process = getProcesses().find((process) => process.id === processDefinitionsId);

  if (!process) {
    throw new Error(`No process found for given process id ${processDefinitionsId}!`);
  }

  const bpmn = await getProcessBpmn(process.id);

  if (!bpmn) {
    throw new Error(`Can't find bpmn for the process with id ${processDefinitionsId}`);
  }

  const ids = await getProcessIds(bpmn);

  // the engine only allows for descriptions containing a single process
  if (ids.length > 1) {
    throw new Error('Process desciption contains more than one Process');
  }

  if (ids.length === 0) {
    throw new Error("Process description doesn't contain a process");
  }

  if (dynamic) {
    await dynamicDeployment(process, bpmn);
  } else {
    await staticDeployment(process, bpmn);
  }

  await requestDeploymentInformation();
}