Source: engine/universal/core/src/engine/engine.js

/* eslint-disable class-methods-use-this */
const { logging, information } = require('@proceed/machine');
const distribution = require('@proceed/distribution');
const NeoEngine = require('neo-bpmn-engine');
const { setupNeoEngine } = require('./neoEngineSetup.js');
const { getNewInstanceHandler } = require('./hookCallbacks.js');
const { getShouldPassToken } = require('./shouldPassToken.js');
const { getShouldActivateFlowNode } = require('./shouldActivateFlowNode.js');
const { getProcessIds } = require('@proceed/bpmn-helper');
// const Separator = require('./separator.js').default;

// Run the setup imported from ./neoEngineSetup.js once, at module load time,
// i.e. before any Engine instance can be created from this module.
setupNeoEngine();

/**
 * @memberof module:@proceed/core
 * @class
 * Every instance is associated with exactly one BPMN process definition and can
 * contain multiple instances of said process.
 */
class Engine {
  /**
   * Creates a new Engine instance.
   */
  constructor() {
    /**
     * Boolean indicating whether this Engine instance has already been started or not.
     * @private
     * @type {boolean}
     */
    this._started = false;
    /**
     * Boolean indicating whether this Engine is currently pausing
     * @private
     * @type {boolean}
     */
    this._pausing = false;
    /**
     * The user tasks that were encountered in a process instance and are awaiting handling.
     * @type {object[]}
     */
    this.userTasks = [];
    /**
     * The engines in which we intend to execute the call activities, maps from the id of the call activity to the corresponding engine
     * @type {Object}
     */
    this.callActivityExecutors = {};
    /**
     * The globally known IDs for the instances created within this BPMN process
     * @type {string[]}
     */
    this.instanceIDs = [];
    /**
     * The ID of the process for this Engine instance
     * @type {string}
     */
    this.processID = null;

    /**
     * The name of the file the process description is taken from before being executed in this module
     * @type {string}
     */
    this.definitionId = null;

    /**
     * The definitionId for the process description of the imported process executed in this module
     */
    this.importDefinitionId = null;

    /**
     * The NEOBPMNEngine process object containing all process instances.
     * @private
     */
    this._process = null;

    /**
     * The logging instance configured with the provided processID.
     * @private
     */
    this._log = null;

    /**
     * The description of the process that is deployed to this engine instance
     */
    this._bpmn = '';

    /** The timestamp of the start of this process, at which it could init new
     * instances. They might be created later though.
     * @private
     * @type {number}
     */
    this._processStartTime = null;

    /** id, ip and name of this machine
     * @type {object}
     */
    this.machineInformation = null;
  }

  /**
   * Forwards the given script executor to the NeoBPMN Engine.
   * Does nothing when no (or a falsy) executor is given.
   *
   * @param {object} scriptExecutor the executor to hand over to the NeoBPMN Engine
   */
  static provideScriptExecutor(scriptExecutor) {
    if (scriptExecutor) {
      NeoEngine.provideScriptExecutor(scriptExecutor);
    }
  }

  /**
   * Deploys the process to the NeoBPMN Engine making it ready to start instances
   *
   * @param {string} definitionId The name of the file of the (main) process (as stored in the `data`)
   * @param {string} [importDefinitionId] the definitionId under which we can find the imported process definition we want to start here
   * @throws {Error} if a main process (no importDefinitionId) is reported as invalid by the database
   */
  async deployProcess(definitionId, importDefinitionId) {
    // Fetch the stored BPMN
    let bpmn;

    if (!importDefinitionId) {
      bpmn = await distribution.db.getProcess(definitionId);
    } else {
      bpmn = await distribution.db.getImportedProcess(definitionId, importDefinitionId);
      this.importDefinitionId = importDefinitionId;
    }

    const [processId] = await getProcessIds(bpmn);
    // validate imports and user tasks on first deploy || assumes validity for imported processes since we expect to have a fully valid main process
    if (!importDefinitionId && !(await distribution.db.isProcessValid(definitionId))) {
      throw new Error(
        `Process ${processId} with definitionId ${definitionId} is invalid. It can't be deployed.`
      );
    }

    // NOTE(review): only main processes get a logger here; for imported processes
    // `_log` is presumably assigned by whoever creates this engine - confirm.
    if (!importDefinitionId) {
      this._log = logging.getLogger({
        moduleName: 'CORE',
        processID: processId,
        definitionId,
      });
    }

    // Every Engine instance is only allowed to be associated with one process
    // (possibly multiple instances of that process though)
    this.processID = processId;

    const { id, name, hostname, port } = await information.getMachineInformation([
      'id',
      'name',
      'hostname',
      'port',
    ]);

    // The list of available machines does not necessarily contain this machine;
    // fall back to an undefined ip instead of failing the whole deployment.
    const localMachine = distribution.communication
      .getAvailableMachines()
      .find((machine) => machine.id === id);
    const ip = localMachine ? localMachine.ip : undefined;

    this.machineInformation = { id, name: name || hostname, ip, port };

    const process = await NeoEngine.BpmnProcess.fromXml(processId, bpmn, {
      shouldPassTokenHook: getShouldPassToken(this),
      shouldActivateFlowNodeHook: getShouldActivateFlowNode(this, Engine),
    });

    process.deploy();

    this._process = process;
    this.definitionId = definitionId;
    this._bpmn = bpmn;
  }

  /**
   * Starts the execution of a BPMN process. This can involve the creation of
   * multiple instances of the process, if the process contains such events.
   * When encountering User Tasks in the ongoing execution, they are added to
   * the `userTasks` array property.
   *
   * The `instance` parameter may be omitted, in which case the callbacks move
   * one position to the front: `startProcess(variables, onStarted, onEnded, onTokenEnded)`.
   *
   * @param {object} processVariables The process variables in the init state
   * @param {object|string} instance contains the instance object that came from another engine to be continued here (might contain only an id of an activity to start)
   * @param {function} onStarted function that is executed when the new instance starts
   * @param {function} onEnded function that is executed when the new instance ends
   * @param {function} onTokenEnded function that is executed when a token ends its execution
   * @throws {Error} if this engine was already started
   */
  startProcess(processVariables, instance, onStarted, onEnded, onTokenEnded) {
    if (typeof instance === 'function') {
      onTokenEnded = onEnded;
      onEnded = onStarted;
      onStarted = instance;
      instance = undefined;
    }
    // we want to start a new instance at a specific node
    let activityId;
    if (typeof instance === 'string') {
      activityId = instance;
      instance = undefined;
    }

    // Reject a second start BEFORE subscribing, otherwise every rejected call
    // would leave an additional new-instance handler registered on the process.
    if (this._started) {
      throw new Error('This Engine instance was already started with a process!');
    }

    // Subscribe to the new process instances stream before we start the execution
    this._process
      .getInstance$()
      .subscribe(getNewInstanceHandler(this, instance, onStarted, onEnded, onTokenEnded));

    this._started = true;

    try {
      if (activityId !== undefined) {
        // start at the specified activity
        this._process.startAt({
          tokens: [
            {
              currentFlowElementId: activityId,
              machineHops: 0,
              deciderStorageTime: 0,
              deciderStorageRounds: 0,
            },
          ],
        });
      } else if (instance !== undefined) {
        // continue the given instance
        this._process.startAt({
          globalStartTime: instance.globalStartTime,
          tokens: instance.tokens,
          instanceId: instance.processInstanceId,
          variables: processVariables,
          log: instance.log,
        });
      } else {
        // start the process at its start event
        this._process.start({
          variables: processVariables,
          token: { machineHops: 0, deciderStorageTime: 0, deciderStorageRounds: 0 },
        });
      }
    } catch (error) {
      this._log.error(error);
    }
  }

  /**
   * Continues a token coming from another machine by inserting the token in the running instance on this engine
   *
   * @param {Object} instance the instance object coming from another machine we want to continue
   */
  insertToken(instance) {
    // the instance is already running => place token at desired location
    const localInstance = this._process.getInstanceById(instance.processInstanceId);
    const [token] = instance.tokens;
    const sequenceFlowId = localInstance.getSequenceFlowId(token.from, token.to);
    localInstance.updateVariables(instance.variables);
    localInstance.mergeFlowNodeLog(instance.log);

    // `from` and `to` were only needed to look up the sequence flow, they are not placed with the token
    const placingToken = { ...token };
    delete placingToken.from;
    delete placingToken.to;

    localInstance.placeTokenAt(sequenceFlowId, placingToken);
  }

  /**
   * Places a copy of an existing token of a local instance at another element
   *
   * @param {string} instanceId id of the instance the token belongs to
   * @param {string} tokenId id of the token to move
   * @param {string} targetId id of the element the token is placed at
   * @throws {Error} if the instance or the token does not exist
   */
  moveToken(instanceId, tokenId, targetId) {
    const localInstance = this._process.getInstanceById(instanceId);

    if (!localInstance) {
      throw new Error(`Instance with id ${instanceId} does not exist!`);
    }

    const localToken = localInstance.getState().tokens.find((token) => token.tokenId === tokenId);

    if (!localToken) {
      throw new Error(`Token with id ${tokenId} does not exist!`);
    }

    localInstance.placeTokenAt(targetId, { ...localToken });
  }

  /**
   * Looks up an open user task of this engine
   *
   * @private
   * @param {string} instanceID The id of the process instance the user task belongs to
   * @param {string} userTaskID The id of the user task
   * @returns {object} the matching entry of `userTasks`
   * @throws {Error} if no matching user task is known to this engine
   */
  _findUserTask(instanceID, userTaskID) {
    const userTask = this.userTasks.find(
      (uT) => uT.processInstance.id === instanceID && uT.id === userTaskID
    );

    if (!userTask) {
      throw new Error(
        `User task with id ${userTaskID} does not exist in instance ${instanceID}!`
      );
    }

    return userTask;
  }

  /**
   * Signals the user task as completed to the corresponding process instance,
   * which is responsible.
   * @param {string} instanceID The id of the process instance to be notified
   * @param {string} userTaskID The id of the user task
   * @param {object} variables The updated process variables
   * @throws {Error} if the user task is unknown
   */
  completeUserTask(instanceID, userTaskID, variables) {
    const userTask = this._findUserTask(instanceID, userTaskID);

    userTask.processInstance.completeActivity(userTask.id, userTask.tokenId, variables);
  }

  /**
   * Signals the user task as aborted to the corresponding process instance,
   * which is responsible.
   * @param {string} instanceID The id of the process instance to be notified
   * @param {string} userTaskID The id of the user task
   * @throws {Error} if the user task is unknown
   */
  abortUserTask(instanceID, userTaskID) {
    const userTask = this._findUserTask(instanceID, userTaskID);

    userTask.processInstance.failActivity(userTask.id, userTask.tokenId);
  }

  /**
   * Sets the current progress of a flowNode running at given token (mainly used for usertasks)
   * @param {string} instanceID The id of the process instance to be notified
   * @param {string} tokenId  The id of the token
   * @param {number} progress The current progress of a flow node
   */
  setFlowNodeProgress(instanceID, tokenId, progress) {
    const instance = this.getInstance(instanceID);
    instance.setFlowNodeProgress(tokenId, progress);
  }

  /**
   * Returns the instance with the given id
   *
   * @param {string} instanceID id of the instance we want to get
   * @returns {object} - the requested process instance
   */
  getInstance(instanceID) {
    return this._process.getInstanceById(instanceID);
  }

  /**
   * Deletes the instance with the given id
   *
   * @param {string} instanceID id of the instance to be deleted
   */
  deleteInstance(instanceID) {
    // indexOf returns -1 for unknown ids and splice(-1, 1) would drop the LAST
    // tracked id, so only splice when the id is really tracked.
    const index = this.instanceIDs.indexOf(instanceID);
    if (index !== -1) {
      this.instanceIDs.splice(index, 1);
    }

    this._process.deleteInstanceById(instanceID);
  }

  /**
   * Returns the current state object of the instance with the given id
   *
   * @param {string} instanceID id of the instance
   * @returns {object} the result of the instance's `getState()`
   */
  getInstanceInformation(instanceID) {
    return this.getInstance(instanceID).getState();
  }

  /**
   * Returns all tokens in the current state of the instance with the given id
   *
   * @param {string} instanceID id of the instance
   * @returns {object[]} the `tokens` of the instance state
   */
  getAllInstanceTokens(instanceID) {
    return this.getInstance(instanceID).getState().tokens;
  }

  /**
   * Returns a single token of an instance
   *
   * @param {string} instanceID id of the instance
   * @param {string} tokenId id of the token
   * @returns {object|undefined} the token or undefined if the instance has no such token
   */
  getToken(instanceID, tokenId) {
    return this.getAllInstanceTokens(instanceID).find((token) => token.tokenId === tokenId);
  }

  /**
   * Forwards a token update to the instance with the given id
   *
   * @param {string} instanceID id of the instance
   * @param {string} tokenId id of the token
   * @param {object} attributes the attributes handed to the instance's `updateToken`
   */
  updateToken(instanceID, tokenId, attributes) {
    this.getInstance(instanceID).updateToken(tokenId, attributes);
  }

  /**
   * Forwards the given executions to the instance's `mergeFlowNodeLog`
   *
   * @param {string} instanceID id of the instance
   * @param {object} executions the log information to merge
   */
  mergeFlowNodeLog(instanceID, executions) {
    this.getInstance(instanceID).mergeFlowNodeLog(executions);
  }

  /**
   * Forwards an execution log entry to the instance's `logExecution`
   *
   * @param {string} instanceID id of the instance
   * @param {string} elementId id of the flow element
   * @param {string} tokenId id of the token
   * @param {object} attributes the attributes of the log entry
   */
  logExecution(instanceID, elementId, tokenId, attributes) {
    this.getInstance(instanceID).logExecution(elementId, tokenId, attributes);
  }

  /**
   * Forwards the removal of a token to the instance's `removeToken`
   *
   * @param {string} instanceID id of the instance
   * @param {string} tokenId id of the token to remove
   */
  removeToken(instanceID, tokenId) {
    this.getInstance(instanceID).removeToken(tokenId);
  }

  /**
   * Forwards a log update to the instance's `updateLog`
   *
   * @param {string} instanceID id of the instance
   * @param {string} elementId id of the flow element
   * @param {string} tokenId id of the token
   * @param {object} attributes the attributes to update in the log
   */
  updateLog(instanceID, elementId, tokenId, attributes) {
    this.getInstance(instanceID).updateLog(elementId, tokenId, attributes);
  }

  /**
   * Forwards a variable update to the instance's `updateVariables`
   *
   * @param {string} instanceID id of the instance
   * @param {object} variables the variables handed to the instance
   */
  updateVariables(instanceID, variables) {
    this.getInstance(instanceID).updateVariables(variables);
  }

  /**
   * Returns a coarse state of the instance with the given id
   *
   * @param {string} instanceID id of the instance
   * @returns {string} 'ended', 'paused' or 'running'
   */
  getInstanceState(instanceID) {
    // TODO: get state from instance
    // return instance.getState().instanceState;
    // -> returns array with states of all tokens
    const instance = this.getInstance(instanceID);

    if (instance.isEnded()) {
      return 'ended';
    }

    if (instance.isPaused()) {
      return 'paused';
    }

    return 'running';
  }

  /**
   * Archives the current state of the instance in the database, removes the
   * instance from this engine and drops its open user tasks
   *
   * @private
   * @param {object} instance the process instance to archive and remove
   */
  async _archiveAndRemoveInstance(instance) {
    await distribution.db.archiveInstance(this.definitionId, instance.id, instance.getState());
    this.deleteInstance(instance.id);

    this.userTasks = this.userTasks.filter((userTask) => userTask.processInstance !== instance);
  }

  /**
   * Stops instance
   *
   * Does nothing if the instance has already ended.
   *
   * @param {string} instanceID id of the instance we want to stop
   */
  async stopInstance(instanceID) {
    const instance = this.getInstance(instanceID);

    if (instance.isEnded()) {
      return;
    }

    this._log.info({
      msg: `Stopping process instance. Id = ${instanceID}`,
      instanceId: instanceID,
    });

    const stoppableStates = ['RUNNING', 'DEPLOYMENT-WAITING', 'READY'];

    this.getAllInstanceTokens(instanceID).forEach((token) => {
      if (!stoppableStates.includes(token.state)) {
        return;
      }

      instance.interruptToken(token.tokenId);
      instance.logExecution(token.currentFlowElementId, token.tokenId, {
        executionState: 'STOPPED',
        startTime: token.currentFlowElementStartTime,
        endTime: +new Date(),
        machine: this.machineInformation,
      });
    });

    instance.stop();

    // archive the information for the stopped instance
    await this._archiveAndRemoveInstance(instance);
  }

  /**
   * Stop every token of this instance due to unfulfilled constraints
   *
   * The instance is archived and removed from this engine afterwards.
   *
   * @param {String} instanceID - ID of process instance
   * @param {Array} unfulfilledConstraints - List of unfulfilled constraints
   */
  async stopUnfulfilledInstance(instanceID, unfulfilledConstraints) {
    const instance = this.getInstance(instanceID);

    if (!instance.isEnded()) {
      this._log.info({
        msg: `Stopping process instance due to unfulfilled constraints. Id =${instanceID}`,
        instanceId: instanceID,
      });
    }

    this.getAllInstanceTokens(instanceID).forEach((token) => {
      const isWaiting = token.state === 'DEPLOYMENT-WAITING';
      if (!isWaiting && token.state !== 'READY' && token.state !== 'RUNNING') {
        return;
      }

      if (isWaiting) {
        instance.interruptToken(token.tokenId); // will cancel shouldPassTokenHook
      }

      instance.endToken(token.tokenId, {
        state: 'ERROR-CONSTRAINT-UNFULFILLED',
        errorMessage: `Instance stopped execution because of: ${unfulfilledConstraints.join(
          ', '
        )}`,
        endTime: +new Date(),
      });
      instance.updateLog(token.currentFlowElementId, token.tokenId, {
        machine: this.machineInformation,
      });
    });

    await this._archiveAndRemoveInstance(instance);
  }

  /**
   * Aborts every not-ended token of the instance, then archives the instance
   * and removes it from this engine
   *
   * @param {string} instanceID id of the instance we want to abort
   * @param {string} [msg] the message to log (only logged if the instance has not ended yet)
   */
  async abortInstance(
    instanceID,
    msg = `Aborting process instance due to signal from another machine. Id =${instanceID}`
  ) {
    const instance = this.getInstance(instanceID);

    if (!instance.isEnded()) {
      this._log.info({
        msg,
        instanceId: instanceID,
      });
    }

    // abort all not-ended tokens on instance
    this.getAllInstanceTokens(instanceID).forEach((token) => {
      const isWaiting = token.state === 'DEPLOYMENT-WAITING';
      if (!isWaiting && token.state !== 'READY' && token.state !== 'RUNNING') {
        return;
      }

      if (isWaiting) {
        instance.interruptToken(token.tokenId); // will cancel shouldPassTokenHook
      }

      instance.endToken(token.tokenId, { state: 'ABORTED', endTime: +new Date() });
    });

    // archive the information for the aborted instance
    await this._archiveAndRemoveInstance(instance);
  }

  /**
   * Pauses an instance
   *
   * Tokens that are READY or DEPLOYMENT-WAITING are paused immediately. If there
   * are RUNNING tokens the instance is only paused once no token is running
   * anymore; pausing is given up if the instance reports a STOPPED or PAUSED
   * state in the meantime. After pausing, the instance is archived and removed
   * from this engine. Does nothing if the instance has ended or is already paused.
   *
   * @param {string} instanceID id of the instance we want to pause
   */
  async pauseInstance(instanceID) {
    const instance = this.getInstance(instanceID);
    if (instance.isEnded() || instance.isPaused()) {
      return;
    }

    instance.updateProcessStatus('PAUSING');
    this._pausing = true; // set pausing true to prevent further transition (in shouldPassTokenHook) of currently running tokens
    this._log.info({
      msg: `Pausing process instance. Id = ${instanceID}`,
      instanceId: instanceID,
    });

    let tokensRunning = false;
    // pause flowNode execution of tokens with state READY and DEPLOYMENT-WAITING
    this.getAllInstanceTokens(instanceID).forEach((token) => {
      if (token.state === 'DEPLOYMENT-WAITING' || token.state === 'READY') {
        instance.pauseToken(token.tokenId);
        this.updateToken(instanceID, token.tokenId, { state: 'PAUSED' });
      }
      if (token.state === 'RUNNING') {
        tokensRunning = true;
      }
    });

    if (tokensRunning) {
      // wait for running tokens to end execution before setting instance state to PAUSED
      const canPause = await new Promise((resolve) => {
        instance.onInstanceStateChange((newInstanceState) => {
          const instanceInactive = newInstanceState.some(
            (tokenState) => tokenState === 'STOPPED' || tokenState === 'PAUSED'
          );
          if (instanceInactive) {
            resolve(false);
          } else if (!newInstanceState.includes('RUNNING')) {
            resolve(true);
          }
        });
      });

      if (!canPause) {
        // the instance became inactive on its own, nothing left to pause
        return;
      }
    }

    try {
      instance.pause();
      // wait for the archive to be written before the instance is removed
      await distribution.db.archiveInstance(this.definitionId, instance.id, instance.getState());
      this.deleteInstance(instance.id);
    } catch (error) {
      // pausing stays best effort for the caller, but failures must not vanish silently
      this._log.error(error);
    }
  }

  /**
   * Returns the open user tasks of this engine together with the user tasks of
   * all engines executing call activities for it
   *
   * @returns {object[]} user tasks; those of call activities get this engine's
   * processID prepended to their `processChain`
   */
  getUserTasks() {
    const callActivityUserTasks = Object.values(this.callActivityExecutors)
      .flatMap((engine) => engine.getUserTasks())
      .map((userTask) => ({
        ...userTask,
        processChain: `${this.processID}|${userTask.processChain}`,
      }));

    return this.userTasks.concat(callActivityUserTasks);
  }

  /**
   * Returns the milestones stored on the token of the given user task
   *
   * @param {string} instanceID The id of the process instance the user task belongs to
   * @param {string} userTaskID The id of the user task
   * @returns {object} the milestones of the token (empty object if there are none)
   * @throws {Error} if the user task is unknown
   */
  getMilestones(instanceID, userTaskID) {
    const userTask = this._findUserTask(instanceID, userTaskID);

    const token = this.getToken(instanceID, userTask.tokenId);

    return token.milestones || {};
  }

  /**
   * Merges the given milestones into the milestones of the user task's token
   * and sets the flow node progress to the average of all milestone values
   *
   * @param {string} instanceID The id of the process instance the user task belongs to
   * @param {string} userTaskID The id of the user task
   * @param {object} milestones maps milestone ids to their (numeric) values
   * @throws {Error} if the user task is unknown
   */
  updateMilestones(instanceID, userTaskID, milestones) {
    const userTask = this._findUserTask(instanceID, userTaskID);

    const token = this.getToken(instanceID, userTask.tokenId);
    const newMilestones = { ...token.milestones, ...milestones };

    userTask.processInstance.updateToken(token.tokenId, {
      milestones: newMilestones,
    });

    // arithmetic mean: sum up first, divide once (0 if there are no milestones)
    const milestoneValues = Object.values(newMilestones);
    const currentProgress =
      milestoneValues.length > 0
        ? milestoneValues.reduce((sum, value) => sum + value, 0) / milestoneValues.length
        : 0;

    this.setFlowNodeProgress(instanceID, token.tokenId, currentProgress);
  }

  /**
   * Applies an externally requested state to the flow node the given token is currently on
   *
   * @param {string} instanceId id of the instance
   * @param {string} tokenId id of the token
   * @param {string} state one of 'ACTIVE', 'EXTERNAL', 'COMPLETED', 'TERMINATED', 'FAILED'
   * @param {object} variables process variables handed over when completing the activity
   * @throws {Error} if the given state is not one of the supported ones
   */
  setFlowNodeState(instanceId, tokenId, state, variables) {
    const instance = this.getInstance(instanceId);
    const token = this.getToken(instanceId, tokenId);
    const activityId = token.currentFlowElementId;
    switch (state) {
      case 'ACTIVE':
      case 'EXTERNAL':
        instance.setFlowNodeState(tokenId, 'EXTERNAL');
        break;
      case 'COMPLETED':
        instance.completeActivity(activityId, tokenId, variables);
        break;
      case 'TERMINATED':
        instance.terminateActivity(activityId, tokenId);
        break;
      case 'FAILED':
        instance.failActivity(activityId, tokenId);
        break;
      default:
        throw new Error('Invalid state');
    }
  }
}

// The Engine class itself is this module's (CommonJS) export.
module.exports = Engine;