Source: engine/universal/core/src/engine/hookCallbacks.js

const NeoEngine = require('neo-bpmn-engine');
const distribution = require('@proceed/distribution');
const {
  getProcessIds,
  getDefinitionsAndProcessIdForEveryCallActivity,
} = require('@proceed/bpmn-helper');
const { abortInstanceOnNetwork } = require('./processForwarding.js');

/**
 * Builds the subscriber for the log stream of a neo engine process instance.
 *
 * Every entry emitted by the stream is handed to the logger of the proceed engine
 * (engine._log.log), tagged with the module name BPMN-ENGINE and the id of the instance.
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Object} instance the process instance the log stream belongs to
 * @returns {Function} callback that takes a single log entry ({ level, message }) of the neo engine
 */
function getLogHandler(engine, instance) {
  function forwardToEngineLog(bpmnLog) {
    // NeoEngine.LogLevel maps the level of the entry to its name, the logger gets the lower-case name
    const level = NeoEngine.LogLevel[bpmnLog.level].toLowerCase();
    const entry = {
      level,
      msg: bpmnLog.message,
      moduleName: 'BPMN-ENGINE',
      instanceId: instance.id,
    };

    engine._log.log(entry);
  }

  return forwardToEngineLog;
}

/**
 * Builds the callback for the onEnded hook of a neo engine process instance.
 *
 * The callback logs the end of the instance, archives the instance state through
 * distribution.db.archiveInstance unless engine.importDefinitionId is set, and finally
 * notifies the optional onEnded listener.
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Function} onEnded function that is supposed to be called when the instance ends (ignored if it is not a function)
 * @param {Object} instance the process instance the callback is created for
 * @returns {Function} parameterless callback for the onEnded hook
 */
function getOnEndedHandler(engine, onEnded, instance) {
  return function handleInstanceEnded() {
    const instanceId = instance.id;

    engine._log.info({
      msg: `Process instance ended. Id = ${instanceId}`,
      instanceId,
    });

    // only engines without an importDefinitionId archive the finalized instance
    // NOTE(review): importDefinitionId presumably marks an engine that executes an imported process - confirm
    const skipArchiving = Boolean(engine.importDefinitionId);
    if (!skipArchiving) {
      distribution.db.archiveInstance(engine.definitionId, instanceId, instance.getState());
    }

    if (typeof onEnded !== 'function') {
      return;
    }

    onEnded(instance);
  };
}

/**
 * Builds the callback for the onAborted hook of a neo engine process instance.
 *
 * The callback writes two info entries (instance aborted, abort signal is broadcast) and
 * then passes the definition id and the instance id to abortInstanceOnNetwork
 * (imported from ./processForwarding.js).
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Object} instance the process instance that was aborted
 * @returns {Function} parameterless callback for the onAborted hook
 */
function getOnAbortedHandler(engine, instance) {
  return function handleInstanceAborted() {
    const messages = [
      `Process instance aborted. Id = ${instance.id}`,
      `Broadcasting instance abort signal into network. Id = ${instance.id}`,
    ];

    messages.forEach((msg) => {
      engine._log.info({ msg, instanceId: instance.id });
    });

    abortInstanceOnNetwork(engine.definitionId, instance.id);
  };
}

/**
 * Builds the callback for the onTokenEnded hook of a neo engine process instance.
 *
 * The callback logs which token ended and passes the token on to the optional
 * onTokenEnded listener.
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Function} onTokenEnded function that is supposed to be called when the execution of a token ends (ignored if it is not a function)
 * @param {Object} instance the process instance the token is in
 * @returns {Function} callback that takes the ended token
 */
function getOnTokenEndedHandler(engine, onTokenEnded, instance) {
  // the listener argument never changes, so its type has to be checked only once
  const hasListener = typeof onTokenEnded === 'function';

  return function handleTokenEnded(token) {
    const instanceId = instance.id;

    engine._log.info({
      msg: `Token with id ${token.tokenId} ended. InstanceId = ${instanceId} `,
      instanceId,
    });

    if (hasListener) {
      onTokenEnded(token);
    }
  };
}

/**
 * Builds the callback for the onScriptTaskError hook of a neo engine process instance.
 *
 * The returned callback currently does nothing: the log call it once contained is
 * commented out (kept below for reference).
 *
 * @param {Object} engine proceed engine instance that contains the process information (currently unused)
 * @param {Object} instance the process instance the token is in (currently unused)
 * @returns {Function} callback that takes the execution information of the failed script task and ignores it
 */
function getOnScriptTaskErrorHandler(engine, instance) {
  // disabled logging:
  // engine._log.info({
  //   msg: `Technical Error in Script Task with id ${execution.flowElementId} on token ${execution.tokenId}. InstanceId = ${instance.id} `,
  //   instanceId: instance.id,
  // });
  const ignoreScriptTaskError = (execution) => {};

  return ignoreScriptTaskError;
}

/**
 * Builds the callback for the onUserTaskInterrupted hook of a neo engine process instance.
 *
 * The callback only writes an info entry that names the user task, the token and the instance.
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Object} instance the process instance the token is in
 * @returns {Function} callback that takes the execution information ({ flowElementId, tokenId }) of the user task
 */
function getOnUserTaskInterruptedHandler(engine, instance) {
  return function handleUserTaskInterrupted(execution) {
    const { flowElementId, tokenId } = execution;
    const instanceId = instance.id;
    const msg = `User Task with id ${flowElementId} on token ${tokenId} ended. InstanceId = ${instanceId} `;

    engine._log.info({ msg, instanceId });
  };
}

/**
 * Builds the callback for the onCallActivityInterrupted hook of a neo engine process instance
 * -> stops the execution of the process that was started for the call activity
 *
 * The callback resolves the definition id that belongs to the interrupted call activity,
 * loads the imported process with that id, determines its first process id and stops every
 * instance of the engine registered for that process id in engine.callActivityExecutors.
 *
 * @param {Object} engine proceed engine instance that contains the process information
 * @param {Object} instance the process instance the token is in (currently unused)
 * @returns {Function} async callback that takes the execution information ({ flowElementId }) of the call activity
 */
function getOnCallActivityInterruptedHandler(engine, instance) {
  return async function handleCallActivityInterrupted(execution) {
    const { flowElementId } = execution;

    // mapping: call activity id -> info about the called process
    const definitionMapping = await getDefinitionsAndProcessIdForEveryCallActivity(engine._bpmn);
    const { definitionId: importedDefinitionId } = definitionMapping[flowElementId];

    const importedBpmn = await distribution.db.getImportedProcess(
      engine.definitionId,
      importedDefinitionId
    );
    const importedProcessIds = await getProcessIds(importedBpmn);
    const executor = engine.callActivityExecutors[importedProcessIds[0]];

    // nothing to stop if no engine was created for the called process
    if (!executor) {
      return;
    }

    executor.instanceIDs.forEach((runningInstanceId) => {
      executor.stopInstance(runningInstanceId);
    });
  };
}

module.exports = {
  /**
   * Returns a callBack function that is used for the instance stream of the neo engine
   * this callBack registers callBack functions for the different lifecycle hooks of a newly created process
   *
   * @param {Object} engine proceed engine instance that contains the process information
   * @param {Class} Engine the class we use to store information about a specific process and its instances
   * @param {Object} preexistingInstance an optional object containing information about an instance we want to continue on this machine
   * @param {Function} onStarted a callback function that is supposed to be called when a new instance starts
   * @param {Function} onEnded a callback function that is supposed to be called when an instance ends
   * @param {Function} onTokenEnded a callback function that is supposed to be called a token inside an instance reaches a finished state
   */
  getNewInstanceHandler(engine, preexistingInstance, onStarted, onEnded, onTokenEnded) {
    return (newInstance) => {
      if (!preexistingInstance) {
        // we are starting a new instance
        engine._log.info({
          msg: `A new process instance was created. Id = ${newInstance.id}`,
          instanceId: newInstance.id,
        });
        engine.instanceIDs.push(newInstance.id);
      } else {
        engine._log.info({
          msg: `Process instance started. Id = ${newInstance.id}`,
          instanceId: newInstance.id,
        });
        // we are starting a new local instance to continue an instance started on another machine
        engine.instanceIDs.push(preexistingInstance.processInstanceId);
      }

      newInstance.getLog$().subscribe(getLogHandler(engine, newInstance)); // subscribe to log-stream of bpmn processinstance

      // Set up lifecycle listeners
      if (typeof onStarted === 'function') {
        onStarted(newInstance);
      }

      newInstance.onEnded(getOnEndedHandler(engine, onEnded, newInstance));

      newInstance.onScriptTaskError(getOnScriptTaskErrorHandler(engine, newInstance));

      newInstance.onAborted(getOnAbortedHandler(engine, newInstance));

      newInstance.onUserTaskInterrupted(getOnUserTaskInterruptedHandler(engine, newInstance));

      newInstance.onCallActivityInterrupted(
        getOnCallActivityInterruptedHandler(engine, newInstance)
      );

      newInstance.onTokenEnded(getOnTokenEndedHandler(engine, onTokenEnded, newInstance));

      newInstance.onFlowNodeExecuted((execution) => {
        const token = engine.getToken(newInstance.id, execution.tokenId);
        // move information about milestones to log and delete from token
        if (token) {
          if (token.milestones) {
            newInstance.updateLog(execution.flowElementId, execution.tokenId, {
              milestones: token.milestones,
            });

            newInstance.updateToken(execution.tokenId, { milestones: undefined });
          }
          if (token.currentFlowNodeIsExternal) {
            newInstance.updateLog(execution.flowElementId, execution.tokenId, {
              external: true,
            });
            newInstance.updateToken(execution.tokenId, { currentFlowNodeIsExternal: undefined });
          }

          const flowElement = newInstance.getFlowElement(execution.flowElementId);
          if (flowElement && flowElement.$type === 'bpmn:UserTask') {
            // remove user task from list
            const index = engine.userTasks.findIndex(
              (uT) => uT.processInstance.id === newInstance.id && uT.id === flowElement.id
            );

            if (index > -1) {
              engine.userTasks.splice(index, 1);
            }
          }
        }

        if (!execution.machine) {
          newInstance.updateLog(execution.flowElementId, execution.tokenId, {
            machine: engine.machineInformation,
          });
          engine._log.info({
            msg: `Finished execution of flowNode ${execution.flowElementId}. InstanceId = ${newInstance.id}`,
            instanceId: newInstance.id,
          });
        }
      });

      newInstance.onInstanceStateChange((instanceState) => {
        //instanceState = array of token states
        const instanceEnded = instanceState.every(
          (s) =>
            s === 'ENDED' ||
            s === 'FAILED' ||
            s === 'TERMINATED' ||
            s === 'ABORTED' ||
            s === 'ERROR-TECHNICAL' ||
            s === 'ERROR-SEMANTIC' ||
            s === 'FORWARDED' ||
            s === 'ERROR-CONSTRAINT-UNFULFILLED'
        );

        if (instanceEnded) {
          // TODO: save instance data, delete instance
        }
      });
    };
  },
};