Islandora / documentation

Contains islandora's documentation and main issue queue.
MIT License

Create a camel 'Pipeline' #449

Closed dannylamb closed 7 years ago

dannylamb commented 7 years ago

Similar to #448

Create a small camel route that sends a message through a dynamic list of recipients, with the output of each being passed as input to the next. This can be accomplished using the Routing Slip and Request / Reply patterns. The list should be read from a header, which you can assume comes with the message.
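For illustration, the intended behaviour can be modelled outside Camel. The following is a minimal plain-Java sketch, not Camel code and not the gist's implementation: the recipient list is read from a header as a comma-separated string, and each recipient's reply becomes the input of the next. The header name `PipelineRecipients`, the `direct:` URIs, and the in-memory endpoint registry are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Plain-Java model of the pipeline semantics; this is not Camel code.
final class PipelineSketch {

    // Hypothetical header name, used only for this illustration.
    static final String RECIPIENTS_HEADER = "PipelineRecipients";

    // Stand-in for a registry of request/reply endpoints, keyed by URI.
    private final Map<String, UnaryOperator<String>> endpoints = new HashMap<>();

    void register(String uri, UnaryOperator<String> endpoint) {
        endpoints.put(uri, endpoint);
    }

    // Reads the comma-separated recipient list from the header and sends the
    // body through each recipient in order; each reply becomes the next input.
    String send(Map<String, String> headers, String body) {
        String slip = headers.get(RECIPIENTS_HEADER);
        if (slip == null || slip.trim().isEmpty()) {
            return body; // no recipients: the message passes through unchanged
        }
        String current = body;
        for (String uri : slip.split(",")) {
            String key = uri.trim();
            UnaryOperator<String> endpoint = endpoints.get(key);
            if (endpoint == null) {
                throw new IllegalArgumentException("Unknown recipient: " + key);
            }
            current = endpoint.apply(current); // request, then wait for the reply
        }
        return current;
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch();
        pipeline.register("direct:upper", s -> s.toUpperCase());
        pipeline.register("direct:exclaim", s -> s + "!");

        Map<String, String> headers = new HashMap<>();
        headers.put(RECIPIENTS_HEADER, "direct:upper,direct:exclaim");
        System.out.println(pipeline.send(headers, "hello")); // prints HELLO!
    }
}
```

In a real route the loop would be replaced by Camel's Routing Slip with an InOut exchange pattern; the sketch only shows the ordering and data flow.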

Check out this gist for a POC of this concept.

dannylamb commented 7 years ago

FYI, this is how we solve the "what derivatives go where" conundrum for everybody. Making a derivative would become a list of operations, and the user could configure the last one to ingest into Fedora, Drupal, or both. We're gonna get some insane flexibility from this.

whikloj commented 7 years ago

Couple confirmation questions:

  1. Java DSL or Spring DSL?
  2. Gradle
  3. Final list of routes supplied as a header, which header?

ajs6f commented 7 years ago

This seems like it's entirely overlapping with fcrepo-camel-toolbox. Is there a danger of duplicated effort here? If it's a question of synchronous workflow as opposed to asynchronous, @acoburn and I have already begun exploring thoughts about synchronous repository-driven workflows, but using the standardized repo services and not anything new.

whikloj commented 7 years ago

@ajs6f I think this is far too simple to be part of fcrepo-camel-toolbox. This would be used to have end-users in Drupal decide which endpoints (out of a curated list) to send a request to. Those endpoints could be fcrepo-camel-toolbox, but more likely the API-X derivative service @acoburn is (apparently) working on.

ajs6f commented 7 years ago

I'm not asking about this exact piece of technology-- I'm asking about its uses. IOW, it's hard for me to imagine a use for this piece of work that wouldn't be better served as part of -camel-toolbox, but maybe I am misunderstanding the expected kinds of requests it would receive. Is the work here not going to fall under the general heading of "make and store derivatives"?

dannylamb commented 7 years ago

@whikloj

  1. Java
  2. Gradle
  3. With the broadcaster we used IslandoraBroadcastRecipients, so IslandoraPipelineRecipients?

@ajs6f Yes, the end result here could be used to create and store derivatives, or any other sequence of well defined asynchronous operations. The long term goal for me (post MVP) is to provide config entities per asynchronous operation with nice descriptions and labels. That way we can work with them in a UI so repository admins can control their asynchronous workflows outside of karaf config files.

If you think this core piece would be better suited for fcrepo-camel-toolbox, that's a conversation I'll gladly have.

whikloj commented 7 years ago

@ajs6f Oh I see what you mean, yes I guess this could fit in fcrepo-camel-toolbox. We could have the header name be configurable. What does @acoburn think of that...or is there maybe a feature already in existence?

ajs6f commented 7 years ago

Yeah, that's where I was going, and to be clear, I'm not claiming that this is a sure thing for -camel-toolbox, just that it might be and we should have the conversation to which @dannylamb refers lest anyone do unneedful work. Not enough hours in the day to let that happen.

dannylamb commented 7 years ago

@ajs6f At the end of the day, this is nothing more than a question of where do we deposit the work. I'm not expecting the greater community to do that work for us.

ajs6f commented 7 years ago

Sure, and where you deposit the work has a significant impact on how expensive it is and its future maintenance costs. This ticket might be developing new Camel gear, or it might be merely configuring Camel gear the maintenance of which is a shared expense. It's worth asking that question. I think we might easily be overestimating the novelty of this need.

dannylamb commented 7 years ago

@ajs6f Fair enough.

ajs6f commented 7 years ago

And having shoved my neck out, I'll bring the axe down by taking this ticket through the exploratory stage that I just vociferously demanded. I'll take responsibility for talking with @acoburn and determining what, if anything, we can get done as part of the larger -camel-toolbox context and recording that (with any design notes) here. I don't mean by that to slow anyone down or confuse any scheduling or logistics, and if that seems inappropriate, I can take my hands off and leave it be.

ajs6f commented 7 years ago

I just don't want to basically come in here and say @whikloj, I demand that you work this ticket in the way that I think is best. That's not cool.

acoburn commented 7 years ago

Let me preface this by saying that I haven't thought this through in any sort of rigorous manner, but let me argue the other side for now, namely that such a pipeline should not go into fcrepo-camel-toolbox.

ajs6f commented 7 years ago

@acoburn, this all makes sense-- but it's exactly your third point/recommendation that is what I am suggesting here. I'm suggesting that either we find the generic pattern in -camel-toolbox or we offer it as a contribution to -camel-toolbox of the ilk of fcrepo-ldpath. (Like fcrepo-simple-chained-configurable-workflow or... maybe a better name.) Then the CLAW-specific stuff (e.g. dealing with Drupal) would stay CLAW-side, but some non-CLAW site might use the basic recipe for another purpose (e.g. maybe for dropping preservation copies into some other store or that sort of thing).

Does that make sense to you, @acoburn, or am I missing your point? Thanks for checking in with this discussion!

acoburn commented 7 years ago

@ajs6f if your suggestion here follows what I describe in the third point, then, yes, I would completely support that.

ajs6f commented 7 years ago

@acoburn I think that it does. The crucial question is: can we extract the generic pattern in this ticket as a contribution to -camel-toolbox? I believe that we can. I suspect that it is something like:

"Configure a chained series of simple (i.e. one Camel endpoint) steps, each feeding the next. This pipeline is fed by resources in the repository as they change, in the usual -camel-toolbox way. The output of the last step goes to some configurable Camel endpoint, by default back into the repository to some location related to the location of the original resource in some simple, well-known way."

@dannylamb @whikloj Does that sound like it captures a generic form of what you are trying to do here? Now that I read it back, it sounds almost like a generic version of several of the -camel-toolbox recipes. I certainly am not trying to drag us down the road of building a generic workflow engine, but I do want to spot any architectural wins we pass on whatever road we are all traveling together.
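The generic pattern quoted above could be modelled roughly as follows. This is a plain-Java sketch under stated assumptions, not fcrepo-camel-toolbox code: the repository is a `Map`, each step is a function, and the "simple, well-known" output location is assumed to be the source location plus `/derivative`. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;

// Plain-Java model of the generic pattern; this is not fcrepo-camel-toolbox code.
final class ChainedWorkflowSketch {

    // Stand-in for the repository: resource location -> content.
    final Map<String, String> repository = new HashMap<>();

    // The configured chain of simple steps, applied in order.
    private final List<UnaryOperator<String>> steps = new ArrayList<>();

    // Final destination, called with (location, content).
    // By default the result is written back into the repository.
    private BiConsumer<String, String> sink = repository::put;

    ChainedWorkflowSketch step(UnaryOperator<String> step) {
        steps.add(step);
        return this;
    }

    ChainedWorkflowSketch sink(BiConsumer<String, String> sink) {
        this.sink = sink;
        return this;
    }

    // Invoked when a resource changes; feeds it through every step in turn.
    void onChange(String location, String content) {
        String current = content;
        for (UnaryOperator<String> step : steps) {
            current = step.apply(current);
        }
        // Assumed convention: the output lives at a location derived from the source.
        sink.accept(location + "/derivative", current);
    }

    public static void main(String[] args) {
        ChainedWorkflowSketch workflow = new ChainedWorkflowSketch()
                .step(String::trim)
                .step(s -> "thumbnail(" + s + ")");
        workflow.onChange("/objects/1", "  image  ");
        System.out.println(workflow.repository); // prints {/objects/1/derivative=thumbnail(image)}
    }
}
```

Swapping the final destination is then a one-line change, for example `workflow.sink(otherStore::put)` for some other `Map<String, String>`.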

dannylamb commented 7 years ago

To go through @acoburn's points:

@ajs6f You should also see https://github.com/Islandora-CLAW/Alpaca/tree/master/islandora-connector-broadcast and https://gitlab.amherst.edu/acdc/repository-extension-services/tree/master/acrepo-connector-broadcast, which are pretty similar and differ in the manner described above.

dannylamb commented 7 years ago

@ajs6f To more directly answer the question you posed, that is pretty much exactly the long-term goal, except there are folks in the Islandora audience who will want the derivative stored just in Drupal, and not in Fedora. That usually comes from people with large binaries like video.

So that's a curveball from our end. But then again, if we can just layer in our step at the end, or swap something out, etc...

acoburn commented 7 years ago

Here's a suggestion for moving forward, which is pretty much what @bseeger and I do at Amherst:

  1. Implement whatever you need to implement yourself in a CLAW-based repo. Our corollary at Amherst is https://gitlab.amherst.edu/acdc/repository-extension-services
  2. If there is a nugget in that implementation that could be extracted into a more general bit of code, push that into fcrepo-camel-toolbox.

That has been the process for much of what is currently in fcrepo-camel-toolbox.

ajs6f commented 7 years ago

@acoburn Yes, that's the concrete MO. The reason I wanted to start this discussion earlier is to clarify whether there might be such a nugget. It sounds like there is, so whoever is doing this ticket should have that in mind. First things first, the functionality has to work for CLAW, but it would be good to keep the longer term in mind.

whikloj commented 7 years ago

I am going to un-assign myself from this. I think there are others that could benefit from this small, well-defined need for some Camel. @Natkeeran??

Also based on @acoburn's statement above, I think we go ahead and implement this for CLAW and if we discover some generalities we can open a discussion at that point.

Natkeeran commented 7 years ago

@dannylamb

I am trying out your POC.

While implementing it in the Java DSL, I am getting the following exception. Any suggestions as to the cause?

package com.lab.hellocamel;

import java.io.File;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.FileComponent;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.pollconsumer.quartz2.QuartzScheduledPollConsumerScheduler;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class App 
{

    public static final String URL        = "tcp://localhost:61616";

    public static void main( String[] args ) throws Exception
    {

        System.out.println( "Testing Apache Camel");

        Logger logger = Logger.getLogger(App.class);

        String log4JPropertyFile = "/home/nat/workspace/hellocamel/resources/log4j.properties";
        PropertyConfigurator.configure(log4JPropertyFile);

        ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory( URL );

        CamelContext context = new DefaultCamelContext();
        context.addComponent("jms",
                JmsComponent.jmsComponentAutoAcknowledge(connFactory));

        context.addRoutes(new RouteBuilder() {

            public void configure() {
                from("timer:foo?period=1s")
                .setHeader("slip", simple("jms:queue:activemq/queue/testA,jms:queue:activemq/queue/testB"))
                .setBody(simple("Message222 at ${date:now:yyyy-MM-dd HH:mm:ss}"))
                .setExchangePattern(ExchangePattern.InOut)
                .log("FINISHED: ${body}")
                .routingSlip(header("slip"), ",")
                .to("jms:queue:activemq/queue/testA")
                .transform(simple("DEFP"))
                .to("jms:queue:activemq/queue/testB")
                .log("FROM A: ${body}")
                .transform(simple("HERP"));

            }
        });

        context.start();
        Thread.sleep(5000);
        context.stop();

    }
}

Testing Apache Camel
15:34| INFO | DefaultCamelContext.java 2418 | Apache Camel 2.15.1 (CamelContext: camel-1) is starting
15:34| INFO | ManagedManagementStrategy.java 187 | JMX is enabled
15:34| INFO | DefaultTypeConverter.java 56 | Loaded 183 type converters
15:34| INFO | DefaultCamelContext.java 2633 | AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
15:34| INFO | DefaultCamelContext.java 2643 | StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
15:34| INFO | DefaultCamelContext.java 3164 | Route: route1 started and consuming from: Endpoint[timer://foo?period=1s]
15:34| INFO | DefaultCamelContext.java 2453 | Total 1 routes, of which 1 is started.
15:34| INFO | DefaultCamelContext.java 2454 | Apache Camel 2.15.1 (CamelContext: camel-1) started in 0.242 seconds
15:34| INFO | MarkerIgnoringBase.java 95 | FINISHED: Message222 at 2017-03-03 15:34:30
15:34| ERROR | MarkerIgnoringBase.java 159 | Failed delivery for (MessageId: ID-natubuntu-33252-1488573268679-0-1 on ExchangeId: ID-natubuntu-33252-1488573268679-0-2). Exhausted after delivery attempt: 1 caught: org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[Message: Message222 at 2017-03-03 15:34:30]

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [timer://foo?period=1s                                                         ] [       196]
[route1            ] [setHeader1        ] [setHeader[slip]                                                               ] [         2]
[route1            ] [setBody1          ] [setBody[simple{Simple: Message222 at ${date:now:yyyy-MM-dd HH:mm:ss}}]        ] [         1]
[route1            ] [setExchangePattern] [setExchangePattern[InOut]                                                     ] [         0]
[route1            ] [log1              ] [log                                                                           ] [         1]
[route1            ] [routingSlip1      ] [routingSlip[header{header(slip)}]                                             ] [       190]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
    Id                  ID-natubuntu-33252-1488573268679-0-2
    ExchangePattern     InOut
    Headers             {breadcrumbId=ID-natubuntu-33252-1488573268679-0-1, CamelRedelivered=false, CamelRedeliveryCounter=0, firedTime=Fri Mar 03 15:34:30 EST 2017, JMSCorrelationID=Camel-ID-natubuntu-33252-1488573268679-0-3, slip=jms:queue:activemq/queue/testA,jms:queue:activemq/queue/testB}
    BodyType            String
    Body                Message222 at 2017-03-03 15:34:30
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[Message: Message222 at 2017-03-03 15:34:30]
    at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1635)
    at org.apache.camel.impl.DefaultExchange.setException(DefaultExchange.java:308)
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:158)
    at org.apache.camel.processor.RoutingSlip$2.doInAsyncProducer(RoutingSlip.java:301)
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:304)
    at org.apache.camel.processor.RoutingSlip.processExchange(RoutingSlip.java:294)
    at org.apache.camel.processor.RoutingSlip.doRoutingSlip(RoutingSlip.java:215)
    at org.apache.camel.processor.RoutingSlip.process(RoutingSlip.java:146)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:165)
    at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:73)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
Caused by: java.lang.NoSuchMethodError: org.apache.camel.support.DefaultTimeoutMap.put(Ljava/lang/Object;Ljava/lang/Object;J)V
    at org.apache.camel.component.jms.reply.CorrelationTimeoutMap.put(CorrelationTimeoutMap.java:86)
    at org.apache.camel.component.jms.reply.TemporaryQueueReplyManager.registerReply(TemporaryQueueReplyManager.java:64)
    at org.apache.camel.component.jms.JmsProducer$1.createMessage(JmsProducer.java:221)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:274)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:217)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:231)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:228)
    at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:427)
    at org.apache.camel.component.jms.JmsProducer.processInOut(JmsProducer.java:233)
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:150)
    ... 15 more
15:34| WARN | MarkerIgnoringBase.java 135 | Error processing exchange. Exchange[Message: Message222 at 2017-03-03 15:34:30]. Caused by: [org.apache.camel.CamelExecutionException - Exception occurred during execution on the exchange: Exchange[Message: Message222 at 2017-03-03 15:34:30]]
org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[Message: Message222 at 2017-03-03 15:34:30]
    at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1635)
    at org.apache.camel.impl.DefaultExchange.setException(DefaultExchange.java:308)
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:158)
    at org.apache.camel.processor.RoutingSlip$2.doInAsyncProducer(RoutingSlip.java:301)
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:304)
    at org.apache.camel.processor.RoutingSlip.processExchange(RoutingSlip.java:294)
    at org.apache.camel.processor.RoutingSlip.doRoutingSlip(RoutingSlip.java:215)
    at org.apache.camel.processor.RoutingSlip.process(RoutingSlip.java:146)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:165)
    at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:73)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
Caused by: java.lang.NoSuchMethodError: org.apache.camel.support.DefaultTimeoutMap.put(Ljava/lang/Object;Ljava/lang/Object;J)V
    at org.apache.camel.component.jms.reply.CorrelationTimeoutMap.put(CorrelationTimeoutMap.java:86)
    at org.apache.camel.component.jms.reply.TemporaryQueueReplyManager.registerReply(TemporaryQueueReplyManager.java:64)
    at org.apache.camel.component.jms.JmsProducer$1.createMessage(JmsProducer.java:221)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:274)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:217)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:231)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:228)
    at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:427)
    at org.apache.camel.component.jms.JmsProducer.processInOut(JmsProducer.java:233)
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:150)
    ... 15 more

whikloj commented 7 years ago

@Natkeeran Not sure if this is helpful at all, but here is a Java DSL Camel route I wrote. This is a WAR webapp though. https://github.com/uml-digitalinitiatives/derivative-route/blob/master/src/main/java/ca/umanitoba/dam/DerivativeRoute.java

dannylamb commented 7 years ago

@Natkeeran I'd try and set this up using blueprint within Alpaca and its build process. You may be down a sinkhole you wouldn't encounter in that context.

If you'd like, I can help you get that set up with a feature and all.

dannylamb commented 7 years ago

And just after posting that I see the problem: .routingSlip(header("slip"), ",") should just be .routingSlip("slip", ",")

dannylamb commented 7 years ago

@Natkeeran Offer still stands if you want help getting that set up all the way with a feature. Just tag me.

dannylamb commented 7 years ago

@Natkeeran I've done some exploration after our last few rounds of discussion, and I think we can merge the broadcaster and pipeline code if we make the exchange pattern come from a header. Tag me or Skype me if you want to discuss this.

Basically just take the work you've already done and replace the broadcaster with it, hacking out the ExchangePattern (InOnly, InOut) from a header.
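To make the idea concrete, here is a minimal plain-Java sketch of a single dispatcher whose behaviour is selected by a header. It is a model only, not the Alpaca code: the header names `Recipients` and `Pattern`, the in-memory endpoint registry, and the fallback to InOnly when the header is absent are all assumptions made for this illustration. With InOnly every recipient receives the original message and replies are discarded (broadcast); with InOut each reply feeds the next recipient (pipeline).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Plain-Java model of one dispatcher serving both behaviours; not Alpaca code.
final class PatternDispatchSketch {

    enum Pattern { InOnly, InOut }

    // Hypothetical header names, chosen only for this illustration.
    static final String RECIPIENTS_HEADER = "Recipients";
    static final String PATTERN_HEADER = "Pattern";

    private final Map<String, UnaryOperator<String>> endpoints = new HashMap<>();

    void register(String uri, UnaryOperator<String> endpoint) {
        endpoints.put(uri, endpoint);
    }

    String dispatch(Map<String, String> headers, String body) {
        // Assumption: fall back to InOnly (broadcast) when the header is absent.
        Pattern pattern = Pattern.valueOf(headers.getOrDefault(PATTERN_HEADER, "InOnly"));
        String current = body;
        for (String uri : headers.getOrDefault(RECIPIENTS_HEADER, "").split(",")) {
            String key = uri.trim();
            if (key.isEmpty()) {
                continue; // tolerate an empty or missing recipient list
            }
            UnaryOperator<String> endpoint = endpoints.get(key);
            if (endpoint == null) {
                throw new IllegalArgumentException("Unknown recipient: " + key);
            }
            if (pattern == Pattern.InOut) {
                current = endpoint.apply(current); // pipeline: reply feeds the next step
            } else {
                endpoint.apply(body);              // broadcast: same input, reply discarded
            }
        }
        return current;
    }

    public static void main(String[] args) {
        PatternDispatchSketch dispatcher = new PatternDispatchSketch();
        dispatcher.register("a", s -> s + "-A");
        dispatcher.register("b", s -> s + "-B");

        Map<String, String> headers = new HashMap<>();
        headers.put(RECIPIENTS_HEADER, "a,b");
        headers.put(PATTERN_HEADER, "InOut");
        System.out.println(dispatcher.dispatch(headers, "msg")); // prints msg-A-B

        headers.put(PATTERN_HEADER, "InOnly");
        System.out.println(dispatcher.dispatch(headers, "msg")); // prints msg
    }
}
```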

dannylamb commented 7 years ago

Resolved with https://github.com/Islandora-CLAW/Alpaca/commit/558ef69ea9a9b166443f3764cec0671813d0f623

Additional changes to be made in a subsequent ticket.