marklogic-community / marklogic-state-conductor

An event-based state-machine engine for manipulating MarkLogic database documents.
Other
1 star 4 forks source link

Implement DHF run flow action that utilizes the defined SourceQuery #153

Closed aclavio closed 3 years ago

aclavio commented 3 years ago

Implement an action module that runs a DHF step using the step's defined SourceQuery instead of the passed-in URI.

This will allow steps to be run in a fashion similar to running them with the Gradle hubRunFlow command.

aclavio commented 3 years ago

Sample implementation:

const DataHub = require('/data-hub/5/datahub.sjs');
const datahub = new DataHub();

const sc = require('/state-conductor/state-conductor.sjs');

/**
 * Builds the content passed to a DHF step by resolving the step's source query
 * and handing it to `datahub.hubUtils.queryToContentDescriptorArray`.
 *
 * Options are merged with increasing precedence: step definition options,
 * flow options, step reference options, then the `options` argument.
 *
 * @param {string} flowName - name of the DHF flow to look up
 * @param {string|number} stepNumber - key of the step within `flow.steps`
 * @param {Object} [options] - overrides applied on top of the configured options
 * @returns {*} whatever `queryToContentDescriptorArray` returns for the query
 * @throws raises via `fn.error` when the flow, the step reference or the
 *         step definition cannot be found
 */
function getContentDescriptorArray(flowName, stepNumber, options) {
  const flow = datahub.flow.getFlow(flowName);
  if (!flow) {
    fn.error(null, 'DHF-FLOW-NOT-FOUND', `DHF flow "${flowName}" was not found`);
  }

  const stepRef = flow.steps ? flow.steps[stepNumber] : undefined;
  if (!stepRef) {
    fn.error(
      null,
      'DHF-STEP-NOT-FOUND',
      `Step "${stepNumber}" was not found in DHF flow "${flowName}"`
    );
  }

  const stepDetails = datahub.flow.step.getStepByNameAndType(
    stepRef.stepDefinitionName,
    stepRef.stepDefinitionType
  );
  if (!stepDetails) {
    fn.error(
      null,
      'DHF-STEP-DEFINITION-NOT-FOUND',
      `Step definition "${stepRef.stepDefinitionName}" (${stepRef.stepDefinitionType}) was not found`
    );
  }

  // later sources win: step definition < flow < step reference < caller overrides
  const combinedOptions = Object.assign(
    {},
    stepDetails.options || {},
    flow.options || {},
    stepRef.options || {},
    options
  );

  const sourceDatabase =
    combinedOptions.sourceDatabase || datahub.flow.globalContext.sourceDatabase;
  const sourceQuery = combinedOptions.sourceQuery || flow.sourceQuery;

  // SECURITY NOTE: sourceQuery is evaluated as code. It can be supplied through
  // `options` as well as the flow/step configuration, so every source of these
  // options must be trusted.
  // When no source query is configured, null is passed through unchanged.
  const query = sourceQuery
    ? cts.query(fn.head(xdmp.eval(sourceQuery)).toObject())
    : null;

  return datahub.hubUtils.queryToContentDescriptorArray(query, combinedOptions, sourceDatabase);
}

/**
 * Action that runs a single DHF flow step against the content selected by the
 * step's source query (via `getContentDescriptorArray`) instead of `uri`.
 *
 * @param {string} uri - URI passed to the action; not used to select content
 * @param {Object} [options] - action options
 * @param {string} options.flowName - name of the DHF flow to run (required)
 * @param {string|number} options.step - key of the step to run (required)
 * @param {Object} [options.flowOptions] - options forwarded to the flow; copied, not mutated
 * @param {Object} [options.flowContext] - only written to the trace output
 * @param {Object} [context] - context object that receives the flow response
 * @returns {Object} the same `context` object, with the flow response stored
 *          at `context[flowName][step]`
 * @throws raises via `fn.error` when `flowName` or `step` is missing, or when
 *         the flow response reports errors
 */
function performAction(uri, options = {}, context = {}) {
  // find the dhf flow and step to execute
  const step = options.step || null;
  const flowName = options.flowName || null;
  const flowContext = options.flowContext || {};

  if (flowName === null) {
    fn.error(null, 'INVALID-OPTIONS', 'performAction requires options.flowName');
  }
  if (step === null) {
    fn.error(null, 'INVALID-OPTIONS', 'performAction requires options.step');
  }

  // work on a copy so the caller-owned options.flowOptions is not modified
  const flowOptions = Object.assign({}, options.flowOptions, {
    stateConductorContext: context,
  });

  // objects concatenated into a string render as "[object Object]"; serialize
  // them instead, falling back to plain coercion so tracing never fails the action
  const describe = (value) => {
    try {
      return JSON.stringify(value);
    } catch (err) {
      return String(value);
    }
  };

  // setup the dhf runFlow content - utilize the source query - not the passed in uri
  const contentObjs = getContentDescriptorArray(flowName, step, flowOptions);

  xdmp.trace(
    sc.TRACE_EVENT,
    Sequence.from([
      'Execute DHF flow using Source Query:',
      '  records:     ' + contentObjs.length,
      '  flowName:    ' + flowName,
      '  step:        ' + step,
      '  flowOptions: ' + describe(flowOptions),
      '  flowContext: ' + describe(flowContext),
    ])
  );

  // execute the dhf flow step
  // utilizing an invoke to avoid locking on the batched documents
  let flowResponse;
  xdmp.invokeFunction(() => {
    flowResponse = datahub.flow.runFlow(flowName, null, contentObjs, flowOptions, step);
  });

  // surface the first reported error as a failure of this action
  if (flowResponse.errors && flowResponse.errors.length) {
    const firstError = flowResponse.errors[0];
    datahub.debug.log(firstError);
    fn.error(null, firstError.message, firstError.stack);
  }

  // record the response under context[flowName][step]
  context[flowName] = context[flowName] || {};
  context[flowName][String(step)] = flowResponse;

  return context;
}

// Export the action entry point; getContentDescriptorArray is not exported here.
exports.performAction = performAction;