Source: engine/universal/core/src/engine/shouldPassToken.js

const distribution = require('@proceed/distribution');
const decider = require('@proceed/decider');
const { config } = require('@proceed/machine');
const Parser = require('@proceed/constraint-parser-xml-json/parser.js');
const {
  forwardProcess,
  forwardInstance,
  forwardHTML,
  forwardImports,
  getMachineInfo,
} = require('./processForwarding.js');
const { getAllBpmnFlowElements } = require('@proceed/bpmn-helper');

/**
 * Evaluates if the next activity is supposed to be executed on the current engine for static deployment
 *
 * The target machine is taken from the machine mapping of the activity: a `machineId` is looked up in the list of
 * available machines of the distribution module, a `machineAddress` (`ip:port` or `[ipv6]:port`) is resolved by
 * requesting the machine information from that address
 *
 * @param {object} engine instance of the engine class containing all information about the current process and its instances
 * @param {object} nextActivity information object about the activity that the token is supposed to be moved to
 * @returns {Promise<object>} staticDeploymentResult with
 *  - `nextMachine`: `{ id, ip, port, name }` of the machine the token has to be sent to, null otherwise
 *  - `continueExecution`: false if the target machine is unknown or not reachable, or if the token would stay on
 *    this engine while `processes.deactivateProcessExecution` is set in the config; true otherwise
 */
async function evaluateStaticDeployment(engine, nextActivity) {
  const staticDeploymentResult = {
    continueExecution: true,
    nextMachine: null,
  };

  // machine mapping of the next activity: either a machine id or an address
  const nextMachineId = nextActivity.machineId;
  const nextMachineAddr = nextActivity.machineAddress || '';

  // converts address with IPv4: (ip:port) or IPv6: ([ip]:port) to (ip+port)
  const address = nextMachineAddr.replace(/\[?((?:(?:\d|\w)|:|\.)*)\]?:(\d*)/g, '$1+$2');
  const [nextIp, nextPortString] = address.split('+');
  // ports are decimal numbers: pass the radix so parseInt never guesses another base
  // (yields NaN if the address contained no port)
  const nextPort = Number.parseInt(nextPortString, 10);

  // get information for current machine
  const { id: currId, ip: currIp, port: currPort } = engine.machineInformation;
  const { processes } = await config.readConfig();

  let nextMachine;

  if (nextMachineId && nextMachineId !== currId) {
    // the next activity is mapped to a specific machine id of another machine
    nextMachine = distribution.communication
      .getAvailableMachines()
      .find((machine) => machine.id === nextMachineId);
  } else if (!nextMachineId && (nextIp !== currIp || nextPort !== currPort)) {
    // the address of the next engine differs from the one of the current engine => get necessary machine information
    try {
      // request info about next machine from the machine itself
      const machineInfo = await getMachineInfo(nextIp, nextPort);

      if (machineInfo.id === currId) {
        // the address belongs to this machine after all => execute locally (if local execution is allowed)
        staticDeploymentResult.continueExecution = !processes.deactivateProcessExecution;
        return staticDeploymentResult;
      }

      // keep the address from the mapping, not the one the machine reports about itself
      nextMachine = { ...machineInfo, ip: nextIp, port: nextPort };
    } catch (err) {
      // if the request fails we can't send the token to next machine => fail
      staticDeploymentResult.continueExecution = false;
      return staticDeploymentResult;
    }
  } else {
    // the activity is mapped to this machine => execute locally (if local execution is allowed)
    staticDeploymentResult.continueExecution = !processes.deactivateProcessExecution;
    return staticDeploymentResult;
  }

  if (!nextMachine) {
    // the mapped machine id is not among the available machines
    staticDeploymentResult.continueExecution = false;
    return staticDeploymentResult;
  }

  // remove unnecessary information about next machine; set hostname as name if there is no name
  const { id, ip, port, name, hostname } = nextMachine;
  staticDeploymentResult.nextMachine = { id, ip, port, name: name || hostname };

  return staticDeploymentResult;
}
/**
 * Sends everything another machine needs to continue the given token and hands the token over to it
 *
 * The process bpmn, the user task html, the imported processes and finally the instance state (containing only
 * the given token) are sent one after another; the first failing step is logged and aborts the forwarding
 *
 * @param {object} engine instance of the engine class containing all information about the current process and its instances
 * @param {string} processInstanceId id of the specific instance the token is running in
 * @param {string} tokenId the id of the token for which we have to decide where to execute it next
 * @param {string} from the previous executed element
 * @param {string} to the next element to be executed
 * @param {object} machine machine to forward the instance to
 * @returns {boolean} - true if instance was successfully forwarded, false if not
 */
async function forwardDynamicInstance(engine, processInstanceId, tokenId, from, to, machine) {
  if (!machine) {
    return false;
  }

  const { ip: targetIp, port: targetPort, name: targetName } = machine;

  // copy of the instance state that contains only the token that is moved
  const instanceState = engine._process.getInstanceById(processInstanceId).getState();
  const movedToken = { ...engine.getToken(processInstanceId, tokenId), from, to };
  const instanceInfo = { ...instanceState, tokens: [movedToken] };

  // everything the next machine needs before it can accept the instance, in the order it is sent
  const preparations = [
    {
      // send process bpmn to next machine
      send: () => forwardProcess(targetIp, targetPort, engine.definitionId, engine._bpmn),
      describeError: (err) =>
        `Error sending process bpmn to next machine: ${err}. InstanceId = ${processInstanceId}`,
    },
    {
      // send user task html to next machine
      send: () => forwardHTML(targetIp, targetPort, engine.definitionId),
      describeError: (err) =>
        `Error sending user task html to next machine: ${err}. InstanceId = ${processInstanceId}`,
    },
    {
      // send imported processes to next machine
      send: () => forwardImports(targetIp, targetPort, engine.definitionId),
      describeError: (err) =>
        `Error sending imported processes to next machine: ${err}. InstanceId = ${processInstanceId}`,
    },
  ];

  for (const { send, describeError } of preparations) {
    try {
      await send();
    } catch (err) {
      engine._log.info({ msg: describeError(err), instanceId: processInstanceId });
      return false;
    }
  }

  // hand over the instance itself
  try {
    await forwardInstance(
      targetIp,
      targetPort,
      engine.definitionId,
      processInstanceId,
      instanceInfo
    );

    engine._log.info({
      msg: `Forwarding token execution to another machine ${targetName}. TokenId = ${instanceInfo.tokens[0].tokenId}. InstanceId = ${processInstanceId}`,
      instanceId: processInstanceId,
    });

    return true;
  } catch (err) {
    engine._log.info({
      msg: `Unable to forward process to next machine ${targetName}. TokenId = ${instanceInfo.tokens[0].tokenId}. InstanceId = ${processInstanceId}`,
      instanceId: processInstanceId,
    });

    return false;
  }
}

/**
 * Reevaluate after timer expired to find the optimal next machine to continue execution of the current token
 *
 * Waits for the time configured in `router.reEvaluateTimer`, increases the decider storage rounds/time of the
 * token and asks the decider again. This is repeated until the decider either returns at least one engine or
 * requests an abort (`abortCheck.stopProcess !== null`)
 *
 * Errors thrown while re-evaluating (e.g. by the decider) reject the returned promise
 *
 * @param {object} engine instance of the engine class containing all information about the current process and its instances
 * @param {string} processInstanceId id of the specific instance the token is running in
 * @param {object} processInfo object with information about the executed process
 * @param {string} tokenId the id of the token for which we have to decide where to execute it next
 * @param {string} from the previous executed element
 * @param {string} to the next element to be executed
 * @returns {dynamicDeploymentResult} holding information about continuation of execution
 */
async function reEvaluateDynamicDeployment(
  engine,
  processInstanceId,
  processInfo,
  tokenId,
  from,
  to
) {
  const reEvaluateTimer = await config.readConfig('router.reEvaluateTimer');

  const parser = new Parser();
  const bpmn = engine._bpmn;

  const processConstraints = parser.getConstraints(bpmn);
  const taskConstraints = parser.getConstraints(bpmn, to);

  // wait before asking the decider again; only the timer lives inside the promise executor so that every
  // error thrown by the code below rejects the promise of this function instead of leaving it pending forever
  await new Promise((resolve) => {
    setTimeout(resolve, reEvaluateTimer);
  });

  const processInstanceInfo = engine.getInstanceInformation(processInstanceId);
  const token = engine.getToken(processInstanceId, tokenId);

  // the token spent another round waiting for a fitting machine
  const storageRounds = token.deciderStorageRounds + 1;
  const storageTime = token.deciderStorageTime + reEvaluateTimer;

  const tokenInfo = {
    globalStartTime: processInstanceInfo.globalStartTime,
    localStartTime: token.localStartTime,
    localExecutionTime: token.localExecutionTime,
    machineHops: token.machineHops,
    storageRounds,
    storageTime,
  };

  engine.updateToken(processInstanceId, tokenId, {
    deciderStorageRounds: storageRounds,
    deciderStorageTime: storageTime,
  });

  const reEvaluationResult = await decider.findOptimalNextMachine(
    processInfo,
    tokenInfo,
    taskConstraints.processConstraints || {},
    processConstraints.processConstraints || {}
  );

  // recursively call reEvaluation again if still no engines found
  if (
    reEvaluationResult.engineList.length === 0 &&
    reEvaluationResult.abortCheck.stopProcess === null
  ) {
    return reEvaluateDynamicDeployment(engine, processInstanceId, processInfo, tokenId, from, to);
  }

  return reEvaluationResult;
}
/**
 * Uses the decider module to find the optimal next machine to continue execution of the current token
 *
 * First the last execution of the `from` element is checked against the hard constraints; if the decider
 * requests an abort the result contains this abort information and an empty engine list. Otherwise the decider
 * is asked for the next machines, falling back to the timed re-evaluation if it returns none
 *
 * @param {object} engine instance of the engine class containing all information about the current process and its instances
 * @param {string} processInstanceId id of the specific instance the token is running in
 * @param {object} processInfo - information about the running process
 * @param {string} tokenId the id of the token for which we have to decide where to execute it next
 * @param {string} from the previous executed element
 * @param {string} to the next element to be executed
 * @returns {dynamicDeploymentResult} holding information about continuation of execution
 */
async function evaluateDynamicDeployment(
  engine,
  processInstanceId,
  processInfo,
  tokenId,
  from,
  to
) {
  // parse constraints from process definition: whole process, next element, previous element
  const constraintParser = new Parser();
  const definition = engine._bpmn;
  const processLevel = constraintParser.getConstraints(definition);
  const targetLevel = constraintParser.getConstraints(definition, to);
  const sourceLevel = constraintParser.getConstraints(definition, from);

  const instanceInformation = engine.getInstanceInformation(processInstanceId);
  const token = engine.getToken(processInstanceId, tokenId);

  // last log entry of the previous element whose token id contains the current token id or is contained in it
  const belongsToToken = (entry) =>
    entry.flowElementId === from &&
    (entry.tokenId.includes(tokenId) || tokenId.includes(entry.tokenId));
  const lastRun = instanceInformation.log.reduce(
    (latest, entry) => (belongsToToken(entry) ? entry : latest),
    undefined
  );

  const tokenInfo = {
    globalStartTime: instanceInformation.globalStartTime,
    localStartTime: token.localStartTime,
    localExecutionTime: token.localExecutionTime,
    machineHops: token.machineHops,
    storageRounds: token.deciderStorageRounds,
    storageTime: token.deciderStorageTime,
  };

  // first check if previous execution was valid -> abort instance or token if not
  const flowNodeTime = lastRun ? lastRun.endTime - lastRun.startTime : null;
  const abortCheck = await decider.preCheckAbort(
    processInfo,
    { ...tokenInfo, flowNodeTime },
    sourceLevel.hardConstraints || [],
    processLevel.hardConstraints || []
  );

  if (abortCheck.stopProcess !== null) {
    // stop process because previous execution was not valid
    return { engineList: [], abortCheck };
  }

  // decide where to move the execution using the decider
  const decision = await decider.findOptimalNextMachine(
    processInfo,
    tokenInfo,
    targetLevel.processConstraints || {},
    processLevel.processConstraints || {}
  );

  const noEngineFound =
    decision.abortCheck.stopProcess === null && decision.engineList.length === 0;
  if (!noEngineFound) {
    return decision;
  }

  // reEvaluation because no fitting engine found
  return reEvaluateDynamicDeployment(engine, processInstanceId, processInfo, tokenId, from, to);
}

module.exports = {
  getShouldPassToken(engine) {
    // should return true if token should be executed on same engine, false if not
    // update token state respectively
    return async function shouldPassToken(processId, processInstanceId, from, to, tokenId, state) {
      // do not continue execution if process is pausing
      if (engine._pausing) {
        engine.updateToken(processInstanceId, tokenId, { state: 'PAUSED' });
        return false;
      }

      engine.updateToken(processInstanceId, tokenId, { state: 'DEPLOYMENT-WAITING' });

      const processInstance = engine._process.getInstanceById(processInstanceId);
      const processDefinition = processInstance.moddleDefinitions.get('rootElements')[0];
      const allFlowElements = await getAllBpmnFlowElements(engine._bpmn);
      const nextActivity = allFlowElements.find((element) => element.id === to);
      const deploymentMethod = processDefinition.$attrs['proceed:deploymentMethod'];

      // get information about the token for which are evaluating
      const currentFlowNodeToken = engine.getToken(processInstanceId, tokenId);

      if (deploymentMethod === 'static') {
        const staticDeploymentResult = await evaluateStaticDeployment(engine, nextActivity);

        const { nextMachine, continueExecution } = staticDeploymentResult;

        if (!continueExecution) {
          engine._log.info({
            msg: `Can't forward process. Next machine is unknown. TokenId = ${tokenId}. InstanceId = ${processInstanceId}`,
            instanceId: processInstanceId,
          });
          processInstance.endToken(tokenId, {
            state: 'ERROR-CONSTRAINT-UNFULFILLED',
            endTime: +new Date(),
            errorMessage: 'Token stopped execution',
          });
          processInstance.updateLog(currentFlowNodeToken.currentFlowElementId, tokenId, {
            machine: engine.machineInformation,
          });
          return false;
        }

        if (nextMachine) {
          const { ip, port, name } = nextMachine;

          // forwarding the instance
          const instanceInfo = { ...processInstance.getState() };
          currentFlowNodeToken.from = from;
          currentFlowNodeToken.to = to;
          instanceInfo.tokens = [currentFlowNodeToken];
          try {
            await forwardInstance(ip, port, engine.definitionId, processInstanceId, instanceInfo);

            engine._log.info({
              msg: `Forwarding token execution to another machine ${name}. TokenId = ${tokenId}. InstanceId = ${processInstanceId}`,
              instanceId: processInstanceId,
            });
          } catch (err) {
            engine._log.info({
              msg: `Unable to forward process to next machine ${name}. TokenId = ${tokenId}. InstanceId = ${processInstanceId}`,
              instanceId: processInstanceId,
            });
          }

          engine._log.info({
            msg: `Ending execution for current token. TokenId = ${tokenId}. InstanceId = ${processInstanceId}`,
            instanceId: processInstanceId,
          });

          processInstance.endToken(tokenId, {
            state: 'FORWARDED',
            nextMachine,
            endTime: +new Date(),
          });
          processInstance.updateLog(currentFlowNodeToken.currentFlowElementId, tokenId, {
            machine: engine.machineInformation,
            nextMachine,
          });
          return false;
        }
      } else if (deploymentMethod === 'dynamic') {
        const allUserTaskIds = await distribution.db.getAllUserTasks(
          engine.definitionId,
          undefined
        );

        const processInfo = {
          id: processId,

          nextFlowNode: {
            id: to,
            isUserTask: !!allUserTaskIds.find((userTaskId) => userTaskId === to),
          },
        };

        // get evaluation result of decider
        let dynamicDeploymentResult = await evaluateDynamicDeployment(
          engine,
          processInstanceId,
          processInfo,
          tokenId,
          from,
          to
        );

        // try to forward instance (or continue locally) until token/instance has to be aborted
        while (dynamicDeploymentResult.abortCheck.stopProcess === null) {
          let nextMachine = dynamicDeploymentResult.engineList.shift();

          // continue locally
          if (nextMachine.id === 'local-engine') {
            return true;
          }

          if (!nextMachine) {
            // re-evaluate if instance could not be forwarded to another machine
            dynamicDeploymentResult = await reEvaluateDynamicDeployment(
              engine,
              processInstanceId,
              processInfo,
              tokenId,
              from,
              to
            );

            continue;
          }

          const instanceForwarded = await forwardDynamicInstance(
            engine,
            processInstanceId,
            tokenId,
            from,
            to,
            nextMachine
          );

          if (instanceForwarded) {
            // mark token as forwarded
            processInstance.endToken(tokenId, {
              state: 'FORWARDED',
              nextMachine,
              endTime: +new Date(),
            });

            processInstance.updateLog(currentFlowNodeToken.currentFlowElementId, tokenId, {
              machine: engine.machineInformation,
              nextMachine,
            });

            return false;
          }
        }

        // stop instance
        if (dynamicDeploymentResult.abortCheck.stopProcess === 'instance') {
          engine._log.info({
            msg: `Can't forward process. Process will be stopped. InstanceId = ${processInstanceId}`,
            instanceId: processInstanceId,
          });

          // end every token locally with state ERROR-CONSTRAINT-UNFULFILLED
          await engine.stopUnfulfilledInstance(
            processInstanceId,
            dynamicDeploymentResult.abortCheck.unfulfilledConstraints
          );
          return false;
        }

        // stop token
        if (dynamicDeploymentResult.abortCheck.stopProcess === 'token') {
          engine._log.info({
            msg: `Can't forward process. Token will be stopped. TokenId = ${tokenId}. InstanceId = ${processInstanceId}`,
            instanceId: processInstanceId,
          });
          processInstance.endToken(tokenId, {
            state: 'ERROR-CONSTRAINT-UNFULFILLED',
            endTime: +new Date(),
            errorMessage: `Token stopped execution because of: ${dynamicDeploymentResult.abortCheck.unfulfilledConstraints.join(
              ', '
            )}`,
          });
          processInstance.updateLog(currentFlowNodeToken.currentFlowElementId, tokenId, {
            machine: engine.machineInformation,
          });
          return false;
        }
      }

      return true;
    };
  },
};