Open kymr opened 6 years ago
// Dependencies of this module. Every version is taken from a rev* property that is defined outside this block.
dependencies {
// Sibling project in the same build.
compile project(':conductor-common')
// Reactive streams (RxJava).
compile "io.reactivex:rxjava:${revRxJava}"
// Dependency injection: Guice plus its multibindings extension.
compile "com.google.inject:guice:${revGuice}"
compile "com.google.inject.extensions:guice-multibindings:${revGuiceMultiBindings}"
// Netflix Servo and Spectator libraries.
compile "com.netflix.servo:servo-core:${revServo}"
compile "com.netflix.spectator:spectator-api:${revSpectator}"
// JSON handling: Jackson and JsonPath.
compile "com.fasterxml.jackson.core:jackson-databind:${revJacksonDatabind}"
compile "com.fasterxml.jackson.core:jackson-core:${revJacksonCore}"
compile "com.jayway.jsonpath:json-path:${revJsonPath}"
// General utilities.
compile "org.apache.commons:commons-lang3:${revCommonsLang3}"
compile "com.spotify:completable-futures:${revSpotifyCompletableFutures}"
// Test classpath only: the SLF4J log4j12 binding.
testCompile "org.slf4j:slf4j-log4j12:${revSlf4jlog4j}"
}
/**
 * Registers this module's bindings: installs the module returned by
 * {@code MultibindingsScanner.asModule()}, requests static injection for {@code EventQueues},
 * and binds the event-processing and system-task classes as eager singletons.
 */
@Override
protected void configure() {
install(MultibindingsScanner.asModule());
// EventQueues is accessed through static calls (e.g. EventQueues.getQueue), hence static injection.
requestStaticInjection(EventQueues.class);
// Event processing components.
bind(ActionProcessor.class).asEagerSingleton();
bind(EventProcessor.class).asEagerSingleton();
bind(SystemTaskWorkerCoordinator.class).asEagerSingleton();
// NOTE(review): SubWorkflow, Wait and Event look like system task implementations — confirm against their declarations.
bind(SubWorkflow.class).asEagerSingleton();
bind(Wait.class).asEagerSingleton();
bind(Event.class).asEagerSingleton();
}
Weird comments.
/**
* @author Viren
* Action Processor subscribes to the Event Actions queue and processes the actions (e.g. start workflow etc)
* <p><b>Warning:</b> This is a work in progress and may be changed in future. Not ready for production yet.
*/
Injection
/**
 * Creates the action processor from its injected collaborators.
 *
 * @param executor        the workflow executor kept for later use by this processor
 * @param metadataService the metadata service kept for later use by this processor
 */
@Inject
public ActionProcessor(WorkflowExecutor executor, MetadataService metadataService) {
    this.metadataService = metadataService;
    this.executor = executor;
}
execute
// Body excerpt: dispatches an event action to the handler that matches its action type.
logger.debug("Executing action: {} for event: {} with messageId:{}", action.getAction(), event, messageId);
// The payload is used as-is unless the action requests inline JSON expansion,
// in which case the result of jsonUtils.expand(...) is used instead.
Object jsonObject = payloadObject;
if (action.isExpandInlineJSON()) {
jsonObject = jsonUtils.expand(payloadObject);
}
switch (action.getAction()) {
case start_workflow:
return startWorkflow(action, jsonObject, event, messageId);
case complete_task:
// complete_task and fail_task share completeTask(...); only the task details and target status differ.
return completeTask(action, jsonObject, action.getComplete_task(), Status.COMPLETED, event, messageId);
case fail_task:
return completeTask(action, jsonObject, action.getFail_task(), Status.FAILED, event, messageId);
default:
// Any other action type leaves the switch and hits the throw below.
break;
}
// Reached only when none of the cases above returned.
throw new UnsupportedOperationException("Action not supported " + action.getAction() + " for event " + event);
completeTask
// Body excerpt: resolves the target workflow and task from the action's task details and the
// event payload, then updates that task with the requested status.
// Seed the substitution input with the task identifiers plus the declared output, and let
// parametersUtils.replace(...) process it against the payload.
Map<String, Object> input = new HashMap<>();
input.put("workflowId", taskDetails.getWorkflowId());
input.put("taskRefName", taskDetails.getTaskRefName());
input.putAll(taskDetails.getOutput());
Map<String, Object> replaced = parametersUtils.replace(input, payload);
// String concatenation turns a missing (null) entry into the literal "null" instead of throwing.
String workflowId = "" + replaced.get("workflowId");
String taskRefName = "" + replaced.get("taskRefName");
Workflow found = executor.getWorkflow(workflowId, true);
// Lookup failures are reported through an "error" entry in the returned map, not by throwing.
if (found == null) {
replaced.put("error", "No workflow found with ID: " + workflowId);
return replaced;
}
Task task = found.getTaskByRefName(taskRefName);
if (task == null) {
replaced.put("error", "No task found with reference name: " + taskRefName + ", workflowId: " + workflowId);
return replaced;
}
task.setStatus(status);
// The replaced map is handed to the task as its output data; the triggering message id and
// event name are then added through task.getOutputData().
task.setOutputData(replaced);
task.getOutputData().put("conductor.event.messageId", messageId);
task.getOutputData().put("conductor.event.name", event);
try {
executor.updateTask(new TaskResult(task));
} catch (RuntimeException e) {
logger.error("Error updating task: {} in workflow: {} in action: {} for event: {} for message: {}", taskDetails.getTaskRefName(), taskDetails.getWorkflowId(), action.getAction(), event, messageId, e);
// NOTE(review): the map is not returned on this path because the exception is rethrown;
// confirm whether anything still observes this "error" entry.
replaced.put("error", e.getMessage());
throw e;
}
return replaced;
startWorkflow (private)
// Body excerpt: starts the workflow named by the action's start_workflow parameters and
// returns a map holding the new workflow id under "workflowId".
StartWorkflow params = action.getStart_workflow();
Map<String, Object> output = new HashMap<>();
try {
// The definition is looked up first; its name and version are what get passed to startWorkflow.
WorkflowDef def = metadataService.getWorkflowDef(params.getName(), params.getVersion());
Map<String, Object> inputParams = params.getInput();
// The configured input is processed by parametersUtils.replace(...) against the event payload.
Map<String, Object> workflowInput = parametersUtils.replace(inputParams, payload);
// Record which message and event triggered the workflow.
workflowInput.put("conductor.event.messageId", messageId);
workflowInput.put("conductor.event.name", event);
String id = executor.startWorkflow(def.getName(), def.getVersion(), params.getCorrelationId(), workflowInput, event);
output.put("workflowId", id);
} catch (RuntimeException e) {
logger.error("Error starting workflow: {}, version: {}, for event: {} for message: {}", params.getName(), params.getVersion(), event, messageId, e);
// NOTE(review): output is not returned on this path because the exception is rethrown;
// confirm whether this "error" entry is ever observed.
output.put("error", e.getMessage());
throw e;
}
return output;
member variables
// Retry count handed to RetryUtil.retryOnException when an action is executed.
private static final int RETRY_COUNT = 3;
// Pool that runs event actions; assigned in the constructor only when the configured
// thread count is positive, otherwise it stays null.
private ExecutorService executorService;
// Queues already obtained per event name; refresh() adds entries for events not seen before.
private final Map<String, ObservableQueue> eventToQueueMap = new ConcurrentHashMap<>();
Injection
/**
 * Wires up the event processor and, when enabled, starts discovering event queues.
 * <p>
 * The action pool size is read from {@code workflow.event.processor.thread.count} (default 2).
 * A non-positive value disables event processing: no pool is created and no queues are refreshed.
 *
 * @param executionService service used to record messages and event executions
 * @param metadataService  source of the registered event handlers
 * @param actionProcessor  processor that executes the actions of an event handler
 * @param config           configuration the thread count is read from
 */
@Inject
public EventProcessor(ExecutionService executionService, MetadataService metadataService,
                      ActionProcessor actionProcessor, Configuration config) {
    this.executionService = executionService;
    this.metadataService = metadataService;
    this.actionProcessor = actionProcessor;

    final int executorThreadCount = config.getIntProperty("workflow.event.processor.thread.count", 2);
    if (executorThreadCount <= 0) {
        logger.warn("Event processing is DISABLED. executorThreadCount set to {}", executorThreadCount);
        return;
    }

    executorService = Executors.newFixedThreadPool(executorThreadCount);
    // Pick up the handlers registered so far, then re-scan every 60 seconds.
    refresh();
    Executors.newSingleThreadScheduledExecutor()
            .scheduleAtFixedRate(this::refresh, 60, 60, TimeUnit.SECONDS);
}
refresh
/**
 * Discovers the events that have registered handlers and starts listening on the queue of
 * every event that is not tracked yet.
 * <p>
 * Queue creation is isolated per event. Previously a single event whose queue could not be
 * created aborted the whole pass <em>after</em> earlier queues had been stored in
 * {@code eventToQueueMap} but <em>before</em> {@code listen} was called on them; later passes
 * then found those queues already present and never listened to them. A failing event is now
 * logged and left out of the map, so it is retried on the next pass while the remaining
 * events proceed.
 */
private void refresh() {
    try {
        Set<String> events = metadataService.getEventHandlers().stream()
                .map(EventHandler::getEvent)
                .collect(Collectors.toSet());

        List<ObservableQueue> createdQueues = new LinkedList<>();
        for (String event : events) {
            eventToQueueMap.computeIfAbsent(event, key -> {
                try {
                    ObservableQueue queue = EventQueues.getQueue(key);
                    if (queue != null) {
                        createdQueues.add(queue);
                    }
                    return queue;
                } catch (RuntimeException e) {
                    Monitors.error(className, "refresh");
                    logger.error("Unable to create queue for event: {}", key, e);
                    // Returning null records no mapping, so this event is retried on the next refresh.
                    return null;
                }
            });
        }

        // start listening on all of the created queues
        createdQueues.forEach(this::listen);
    } catch (Exception e) {
        Monitors.error(className, "refresh");
        logger.error("refresh event queues failed", e);
    }
}
handle messages
/**
 * Processes a single message received from a queue: records it, runs the handlers registered
 * for the queue's event, and then either acknowledges the message or re-publishes it.
 * Any exception is logged and not propagated.
 *
 * @param queue the queue the message was received from
 * @param msg   the received message
 */
@SuppressWarnings({"unchecked", "ToArrayCallWithZeroLengthArrayArgument"})
private void handle(ObservableQueue queue, Message msg) {
    try {
        executionService.addMessage(queue.getName(), msg);

        final String event = queue.getType() + ":" + queue.getName();
        logger.debug("Evaluating message: {} for event: {}", msg.getId(), event);

        final boolean allSucceeded = executeEvent(event, msg).isEmpty();
        if (allSucceeded) {
            queue.ack(Collections.singletonList(msg));
            return;
        }

        if (queue.rePublishIfNoAck()) {
            // re-submit this message to the queue, to be retried later
            // This is needed for queues with no unack timeout, since messages are removed from the queue
            queue.publish(Collections.singletonList(msg));
        }
    } catch (Exception e) {
        logger.error("Error handling message: {} on queue:{}", msg, queue.getName(), e);
    }
}
executeEvent
/**
 * Runs every handler registered for the event against the message and collects the executions
 * that should be retried.
 * <p>
 * A handler whose condition evaluates to false is recorded as {@code SKIPPED} and not executed.
 * For the remaining handlers the actions are executed and awaited; an execution still
 * {@code IN_PROGRESS} afterwards is removed and reported as a transient failure, any other
 * execution is persisted through {@code updateEventExecution}.
 * <p>
 * The results are processed on the calling thread after {@code get()} returns. The earlier
 * {@code whenComplete} callback dereferenced {@code result}, which is {@code null} when the
 * future completes exceptionally, so its {@code error != null} branch could never run and an
 * NPE was raised inside the callback instead.
 *
 * @param event the event name, in {@code type:queueName} form as built by the caller
 * @param msg   the message that triggered the event
 * @return the executions that failed transiently; empty when everything completed
 * @throws Exception if the condition script fails, or if waiting for the actions fails
 */
private List<EventExecution> executeEvent(String event, Message msg) throws Exception {
    List<EventHandler> eventHandlerList = metadataService.getEventHandlersForEvent(event, true);
    Object payloadObject = getPayloadObject(msg.getPayload());

    List<EventExecution> transientFailures = new ArrayList<>();
    for (EventHandler eventHandler : eventHandlerList) {
        String condition = eventHandler.getCondition();
        if (StringUtils.isNotEmpty(condition)) {
            logger.debug("Checking condition: {} for event: {}", condition, event);
            boolean conditionMet = ScriptEvaluator.evalBool(condition, jsonUtils.expand(payloadObject));
            if (!conditionMet) {
                recordSkippedExecution(eventHandler, msg, condition);
                continue;
            }
        }

        // get() rethrows a failure of any action future as an ExecutionException.
        List<EventExecution> executions = executeActionsForEventHandler(eventHandler, msg).get();
        for (EventExecution eventExecution : executions) {
            if (eventExecution.getStatus() == Status.IN_PROGRESS) {
                // Still in progress after execute(...): treat as transient so the message is retried.
                executionService.removeEventExecution(eventExecution);
                transientFailures.add(eventExecution);
            } else {
                executionService.updateEventExecution(eventExecution);
            }
        }
    }
    return transientFailures;
}

/** Persists a SKIPPED execution for a handler whose condition did not hold for the message. */
private void recordSkippedExecution(EventHandler eventHandler, Message msg, String condition) {
    String id = msg.getId() + "_" + 0;
    EventExecution eventExecution = new EventExecution(id, msg.getId());
    eventExecution.setCreated(System.currentTimeMillis());
    eventExecution.setEvent(eventHandler.getEvent());
    eventExecution.setName(eventHandler.getName());
    eventExecution.setStatus(Status.SKIPPED);
    eventExecution.getOutput().put("msg", msg.getPayload());
    eventExecution.getOutput().put("condition", condition);
    executionService.addEventExecution(eventExecution);
    logger.debug("Condition: {} not successful for event: {} with payload: {}", condition, eventHandler.getEvent(), msg.getPayload());
}
// Excerpt repeated from executeEvent: the handler's condition script is evaluated with the
// result of jsonUtils.expand(payloadObject) as its input.
Boolean success = ScriptEvaluator.evalBool(condition, jsonUtils.expand(payloadObject));
executeActionsForEventHandler
/**
 * Submits every action of the event handler for asynchronous execution.
 * <p>
 * Each action gets its own {@link EventExecution}, identified by the message id followed by the
 * action's position. An action is only submitted when its execution record could be added;
 * otherwise the delivery is logged as a duplicate and the action is left out.
 *
 * @param eventHandler the {@link EventHandler} for which the actions are to be executed
 * @param msg          the {@link Message} that triggered the event
 * @return a {@link CompletableFuture} holding a list of {@link EventExecution}s for the {@link Action}s executed in the event handler
 */
private CompletableFuture<List<EventExecution>> executeActionsForEventHandler(EventHandler eventHandler, Message msg) {
    List<CompletableFuture<EventExecution>> pending = new ArrayList<>();
    int actionIndex = 0;
    for (Action action : eventHandler.getActions()) {
        String executionId = msg.getId() + "_" + actionIndex;
        actionIndex++;

        EventExecution execution = new EventExecution(executionId, msg.getId());
        execution.setCreated(System.currentTimeMillis());
        execution.setEvent(eventHandler.getEvent());
        execution.setName(eventHandler.getName());
        execution.setAction(action.getAction());
        execution.setStatus(Status.IN_PROGRESS);

        if (!executionService.addEventExecution(execution)) {
            logger.warn("Duplicate delivery/execution of message: {}", msg.getId());
            continue;
        }

        // The payload is read from the message inside the task, i.e. when the action actually runs.
        pending.add(CompletableFuture.supplyAsync(
                () -> execute(execution, action, getPayloadObject(msg.getPayload())),
                executorService));
    }
    return CompletableFutures.allAsList(pending);
}
execute
/**
 * Executes one action through the action processor, retrying on transient exceptions, and
 * records the outcome on the event execution.
 * <p>
 * On success the action's output is merged into the execution and its status becomes
 * {@code COMPLETED}. On a runtime failure whose cause is not transient the status becomes
 * {@code FAILED} and the exception message is stored under {@code "exception"}. On a transient
 * failure the execution is returned unchanged.
 *
 * @param eventExecution the instance of {@link EventExecution}
 * @param action         the {@link Action} to be executed for the event
 * @param payload        the {@link Message#payload}
 * @return the event execution updated with execution output, if the execution is completed/failed with non-transient error
 * the input event execution, if the execution failed due to transient error
 */
@SuppressWarnings("Guava")
@VisibleForTesting
EventExecution execute(EventExecution eventExecution, Action action, Object payload) {
    try {
        String methodName = "executeEventAction";
        String description = String.format("Executing action: %s for event: %s with messageId: %s with payload: %s", action.getAction(), eventExecution.getId(), eventExecution.getMessageId(), payload);
        logger.debug(description);

        Map<String, Object> output = new RetryUtil<Map<String, Object>>().retryOnException(
                () -> actionProcessor.execute(action, payload, eventExecution.getEvent(), eventExecution.getMessageId()),
                this::isTransientException, null, RETRY_COUNT, description, methodName);
        if (output != null) {
            eventExecution.getOutput().putAll(output);
        }
        eventExecution.setStatus(Status.COMPLETED);
        return eventExecution;
    } catch (RuntimeException e) {
        logger.error("Error executing action: {} for event: {} with messageId: {}", action.getAction(), eventExecution.getEvent(), eventExecution.getMessageId(), e);
        if (isTransientException(e.getCause())) {
            // transient error: hand the execution back untouched
            return eventExecution;
        }
        // not a transient error, fail the event execution
        eventExecution.setStatus(Status.FAILED);
        eventExecution.getOutput().put("exception", e.getMessage());
        return eventExecution;
    }
}
/**
 * Resolves the queue for an event of the form {@code type:queueURI}.
 * <p>
 * The event string is first passed through {@code parametersUtils.replace}. The part before the
 * first {@code ':'} selects the provider and the remainder is handed to that provider.
 *
 * @param eventType the event, expected to contain a {@code ':'} after replacement
 * @return the queue returned by the provider registered for the event's type
 * @throws IllegalArgumentException if the event has no {@code ':'} or no provider is registered for its type
 */
public static ObservableQueue getQueue(String eventType) {
    final String event = parametersUtils.replace(eventType).toString();

    final int separator = event.indexOf(':');
    if (separator < 0) {
        logger.error("Queue cannot be configured for illegal event: {}", event);
        throw new IllegalArgumentException("Illegal event " + event);
    }

    final String type = event.substring(0, separator);
    final String queueURI = event.substring(separator + 1);

    final EventQueueProvider provider = providers.get(type);
    if (provider == null) {
        logger.error("Queue {} is not configured for event:{}", type, eventType);
        throw new IllegalArgumentException("Unknown queue type " + type);
    }
    return provider.getQueue(queueURI);
}
This class evaluates JavaScript expressions dynamically.
/**
 * Evaluates JavaScript expressions dynamically through the {@code javax.script} API.
 * <p>
 * The input object is exposed to the script under the name {@code $}.
 */
public class ScriptEvaluator {

    private static final String ENGINE_NAME = "nashorn";

    /**
     * Engine shared by all evaluations. {@code getEngineByName} returns {@code null} when the
     * runtime has no engine registered under that name, so this may be {@code null}.
     */
    private static final ScriptEngine engine = new ScriptEngineManager().getEngineByName(ENGINE_NAME);

    private ScriptEvaluator() {
    }

    /**
     * Evaluates the script and interprets the result as a boolean.
     *
     * @param script the script to evaluate
     * @param input  the object bound to {@code $} in the script
     * @return the result itself when it is a {@link Boolean}; {@code true} for a {@link Number}
     *         greater than zero; {@code false} for anything else
     * @throws ScriptException if the script fails or no script engine is available
     */
    public static Boolean evalBool(String script, Object input) throws ScriptException {
        Object ret = eval(script, input);
        if (ret instanceof Boolean) {
            return (Boolean) ret;
        }
        if (ret instanceof Number) {
            return ((Number) ret).doubleValue() > 0;
        }
        return false;
    }

    /**
     * Evaluates the script with {@code input} bound to {@code $} in fresh bindings.
     *
     * @param script the script to evaluate
     * @param input  the object bound to {@code $} in the script
     * @return whatever the script evaluates to
     * @throws ScriptException if the script fails or no script engine is available
     */
    public static Object eval(String script, Object input) throws ScriptException {
        if (engine == null) {
            // Report a missing engine explicitly instead of failing with a NullPointerException below.
            throw new ScriptException("Script engine '" + ENGINE_NAME + "' is not available in this runtime");
        }
        Bindings bindings = engine.createBindings();
        bindings.put("$", input);
        return engine.eval(script, bindings);
    }
}
// NOTE(review): these look like the fields of Message (payload and id are read through
// getPayload()/getId() elsewhere in these notes) — confirm against the class declaration.
// Message body.
private String payload;
// Message identifier.
private String id;
// NOTE(review): presumably a queue-issued receipt for the delivery; its use is not shown here — confirm.
private String receipt;
// Queues created so far, keyed by queue URI; filled on demand by getQueue(...).
private final Map<String, ObservableQueue> queues = new ConcurrentHashMap<>();
// Collaborators passed on to every DynoObservableQueue this provider creates.
private final QueueDAO queueDAO;
private final Configuration config;
/**
 * Creates the provider with the collaborators that are passed on to each queue it creates.
 *
 * @param queueDAO the queue DAO given to created queues
 * @param config   the configuration given to created queues
 */
@Inject
public DynoEventQueueProvider(QueueDAO queueDAO, Configuration config) {
    this.config = config;
    this.queueDAO = queueDAO;
}
/**
 * Returns the queue for the given URI, creating a {@code DynoObservableQueue} the first time
 * the URI is requested and returning the stored instance afterwards.
 *
 * @param queueURI the URI identifying the queue
 * @return the queue associated with the URI
 */
@Override
public ObservableQueue getQueue(String queueURI) {
    return queues.computeIfAbsent(
            queueURI,
            uri -> new DynoObservableQueue(uri, queueDAO, config));
}
member variables
// Queue type label passed to the Monitors calls for this queue implementation.
private static final String QUEUE_TYPE = "conductor";
// Name of the queue that is polled through the DAO.
private final String queueName;
private final QueueDAO queueDAO;
// Interval between polls, in milliseconds (workflow.dyno.queues.pollingInterval, default 100).
private final int pollTimeInMS;
// Timeout argument passed to queueDAO.pollMessages (workflow.dyno.queues.longPollTimeout, default 1000).
private final int longPollTimeout;
// Count argument passed to queueDAO.pollMessages (workflow.dyno.queues.pollCount, default 10).
private final int pollCount;
constructor
/**
 * Creates an observable queue over the named queue and reads its polling settings from the
 * configuration: {@code workflow.dyno.queues.pollingInterval} (default 100),
 * {@code workflow.dyno.queues.pollCount} (default 10) and
 * {@code workflow.dyno.queues.longPollTimeout} (default 1000).
 *
 * @param queueName the name of the queue to poll
 * @param queueDAO  the DAO used to poll messages
 * @param config    the configuration the polling settings are read from
 */
@Inject
DynoObservableQueue(String queueName, QueueDAO queueDAO, Configuration config) {
    this.queueDAO = queueDAO;
    this.queueName = queueName;

    this.pollTimeInMS = config.getIntProperty("workflow.dyno.queues.pollingInterval", 100);
    this.pollCount = config.getIntProperty("workflow.dyno.queues.pollCount", 10);
    this.longPollTimeout = config.getIntProperty("workflow.dyno.queues.longPollTimeout", 1000);
}
observe
/**
 * Builds the subscription logic for this queue: every {@code pollTimeInMS} milliseconds the
 * queue is polled and each received message is forwarded to the subscriber.
 * <p>
 * The internal polling subscription is registered with the downstream subscriber, so that
 * unsubscribing downstream also stops the polling. Previously the subscription returned by
 * {@code subscribe(...)} was discarded and the interval kept polling after the subscriber
 * had gone away.
 *
 * @return the {@code OnSubscribe} that drives polling for one subscriber
 */
private OnSubscribe<Message> getOnSubscribe() {
    return subscriber -> {
        Observable<Long> interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS);
        // Tie the lifetime of the poller to the downstream subscriber.
        subscriber.add(interval.flatMap((Long x) -> {
            List<Message> msgs = receiveMessages();
            return Observable.from(msgs);
        }).subscribe(subscriber::onNext, subscriber::onError));
    };
}
receiveMessages
/**
 * Polls the queue once through the DAO and records how many messages were received.
 * Failures are logged and counted, and an empty list is returned in their place.
 *
 * @return the polled messages, or a new empty list if polling failed
 */
private List<Message> receiveMessages() {
    List<Message> polled;
    try {
        polled = queueDAO.pollMessages(queueName, pollCount, longPollTimeout);
        Monitors.recordEventQueueMessagesProcessed(QUEUE_TYPE, queueName, polled.size());
    } catch (Exception exception) {
        logger.error("Exception while getting messages from queueDAO", exception);
        Monitors.recordObservableQMessageReceivedErrors(QUEUE_TYPE);
        polled = new ArrayList<>();
    }
    return polled;
}
내용 내용 내용 내용 내용
Summary
Dependency