22Acacia / angled-dream

PubSub -> Cloud Dataflow Pipeline Composition
Apache License 2.0

Pass in more than one Pub/Sub input topic #12

Closed: viktort closed this issue 7 years ago

viktort commented 7 years ago

Hi chaps, @LusciousPear @coffeepac

Not sure this works out of the box at the moment?

I have defined two input topics for a pipeline, but it seems to fail in angled-dream's Main.java, as it looks like it reads from only one topic:

...ss]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
Mar 20, 2017 11:45:18 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 2 files. Enable logging at DEBUG level to see which files will be staged.
Exception in thread "main" java.lang.IllegalArgumentException: Illegal Pubsub object name specified: server_ecommerce,projects/hx-trial/topics/hxtrialorionappengine Please see Javadoc for naming rules.
    at com.google.cloud.dataflow.sdk.io.PubsubIO.validatePubsubName(PubsubIO.java:135)
    at com.google.cloud.dataflow.sdk.io.PubsubIO.access$400(PubsubIO.java:74)
    at com.google.cloud.dataflow.sdk.io.PubsubIO$PubsubTopic.fromPath(PubsubIO.java:376)
    at com.google.cloud.dataflow.sdk.io.PubsubIO$Read$Bound.topic(PubsubIO.java:667)
    at com.google.cloud.dataflow.sdk.io.PubsubIO$Read.topic(PubsubIO.java:461)
    at com.acacia.angleddream.Main.main(Main.java:137)

and stdout of:
Jar file hashes: 8a45e4c3a0bf5c76ef58501ce2901544,2ff5804f6d1b0d1d0c758564e28fe681

I can alter the code to fix this, but in case I have missed something, let me know :).

All the best,

Viktor

LusciousPear commented 7 years ago

It should definitely read from multiple topics. It looks like they were not fully qualified in the command: server_ecommerce,projects/hx-trial/topics/hxtrialorionappengine
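
A quick way to see the qualification point: a Pub/Sub topic path is expected to look like projects/<project>/topics/<topic>. The Java sketch below is a rough check of that shape only. The topic-ID part approximates Google's published naming rules (starts with a letter, 3 to 255 characters drawn from letters, digits and - . _ ~ + %), and isQualifiedTopic is a hypothetical helper, not the SDK's validatePubsubName. Under those assumptions, each topic is valid on its own, while a bare name and the comma-joined value are not a single valid topic path.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical, approximate check that a string is ONE fully qualified
 * Pub/Sub topic path: projects/<project>/topics/<topic>.
 * The topic-ID rule is an approximation of Google's naming rules,
 * not the Dataflow SDK's own validation code.
 */
class TopicPathCheck {
    private static final Pattern TOPIC_PATH =
            Pattern.compile("^projects/[^/]+/topics/[A-Za-z][A-Za-z0-9._~+%-]{2,254}$");

    static boolean isQualifiedTopic(String path) {
        return TOPIC_PATH.matcher(path).matches();
    }

    public static void main(String[] args) {
        String joined = "projects/hx-trial/topics/server_ecommerce,"
                + "projects/hx-trial/topics/hxtrialorionappengine";
        // Each topic on its own is a fully qualified path.
        System.out.println(isQualifiedTopic("projects/hx-trial/topics/server_ecommerce"));      // true
        System.out.println(isQualifiedTopic("projects/hx-trial/topics/hxtrialorionappengine")); // true
        // A bare topic name is not fully qualified.
        System.out.println(isQualifiedTopic("server_ecommerce"));                                // false
        // The comma-joined value contains ',' and extra '/', so it is not one topic path.
        System.out.println(isQualifiedTopic(joined));                                            // false
    }
}
```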

viktort commented 7 years ago

Hey @LusciousPear

So, in the terraform plan:

+ googlecli_dataflow.prod-responsys-pipe
    class:                              "com.acacia.angleddream.Main"
    classpath:                          "/usr/local/lib/angleddream-bundled-0.4-ALPHA.jar:/usr/local/lib/responsyspipeline-bundled-0.2-ALPHA.jar"
    job_states.#:                       "<computed>"
    jobids.#:                           "<computed>"
    name:                               "prod-responsys-pipe"
    optional_args.%:                    "12"
    optional_args.autoscalingAlgorithm: "THROUGHPUT_BASED"
    optional_args.containerDeps:        "foooooobarrrrrr.com"
    optional_args.errorPipelineName:    "projects/ha-proj/topics/prod-responsys-pipe-error-out"
    optional_args.maxNumWorkers:        "1"
    optional_args.numWorkers:           "1"
    optional_args.outputTopics:         "projects/ha-proj/topics/prod-responsys-pipe-out"
    optional_args.pipelineName:         "prod-responsys-pipe"
    optional_args.pubsubTopic:          "projects/ha-proj/topics/server_ecommerce,projects/ha-proj/topics/hxtrialorionappengine"
    optional_args.stagingLocation:      "gs://hx-trial/staging-eu"
    optional_args.stringHashes:         "2ff5804f6d1b......"
    optional_args.workerMachineType:    "n1-standard-1"
    optional_args.zone:                 "europe-west1-c"
    resource_hashes.#:                  "1"
    resource_hashes.0:                  "2ff5804f6............"

I think the error is only printing a subset of that value.
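
That reading looks plausible: the name reported in the error is exactly the hx-trial value used in the failing runs with its first projects/hx-trial/topics/ prefix removed. The sketch below reproduces this under an assumption: that the path is split into project and topic name with a pattern along the lines of projects/([^/]+)/topics/(.+). Both the pattern and topicNamePart are hypothetical, not the Dataflow SDK's actual parsing code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical illustration of why the error shows only part of the value.
 * ASSUMPTION: the path is split with a pattern like this one; this is
 * not the SDK's real implementation.
 */
class TopicNameSplit {
    // [^/]+ stops at the first '/', while (.+) greedily takes the rest of the string.
    private static final Pattern PATH = Pattern.compile("^projects/([^/]+)/topics/(.+)$");

    /** Returns the "topic name" portion of a path, or null if the prefix is missing. */
    static String topicNamePart(String path) {
        Matcher m = PATH.matcher(path);
        return m.matches() ? m.group(2) : null;
    }

    public static void main(String[] args) {
        String value = "projects/hx-trial/topics/server_ecommerce,"
                + "projects/hx-trial/topics/hxtrialorionappengine";
        // Prints: server_ecommerce,projects/hx-trial/topics/hxtrialorionappengine
        System.out.println(topicNamePart(value));
    }
}
```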

This is what I have in a test config.clj:

 :pipelines {"prod-responsys-pipe"
             {:container-deps ["foooo"]
              :transform-jar "responsyspipeline-bundled-0.2-ALPHA.jar"
              :pail "ha-proj-build-artifacts"
              :key "responsys-pipeline"
              :workerMachineType "n1-standard-1"
              :error-out true}}
 :sources {"haprojappengine" {:type "gce"}}
 :sinks {"new-orion-identity-bucket" {:type "gcs" :bucket "new-orion-identity-bucket" :batch_size 250 :replicas 3}
         "prod-responsys-bucket" {:type "gcs" :bucket "prod-responsys-bucket" :batch_size 250 :replicas 2}
         "prod-responsys-sink" {:type "gcs" :sink_type "responsys" :rsys_user "******" :rsys_pass "********" :rsys_table "LAST_SEARCH_INFO" :merge_insert true :error-out true :batch_size 500 :replicas 2}}
 :edges [{:origin "haprojappengine" :targets ["prod-responsys-pipe" "new-orion-identity-bucket" "bigquery-sink-test"]}
         {:origin "server_ecommerce" :targets ["prod-responsys-pipe"]}
         {:origin "prod-responsys-pipe" :targets ["prod-responsys-bucket" "prod-responsys-sink"]}]}

viktort commented 7 years ago

Hey @LusciousPear - how can I define multiple topics for a pipeline? Does the topic need to be defined as a source like haprojappengine above?

If a topic exists outside of this config, can it not simply be referenced in the edges section, as it is above?

LusciousPear commented 7 years ago

It should just be another edge; I'll check the sample config. It should all "just work", since I tested that scenario explicitly.
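
To illustrate "it should just be another edge": in the config above, both haprojappengine and server_ecommerce list prod-responsys-pipe as a target, so that pipeline should end up with two input topics. The Java sketch below is a hypothetical model, not sossity's actual code. It groups edges by target and assumes the projects/<project>/topics/<origin> naming seen in the terraform plan; Edge and inputTopics are illustrative names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not sossity's code): every edge whose targets include
 * a pipeline contributes one input topic, named after the edge's origin.
 */
class EdgeInputs {
    static final class Edge {
        final String origin;
        final List<String> targets;

        Edge(String origin, List<String> targets) {
            this.origin = origin;
            this.targets = targets;
        }
    }

    /** Collects, per target, the fully qualified topics of all origins feeding it. */
    static Map<String, List<String>> inputTopics(String project, List<Edge> edges) {
        Map<String, List<String>> inputs = new LinkedHashMap<>();
        for (Edge e : edges) {
            // ASSUMED naming scheme, inferred from the terraform plan.
            String topic = "projects/" + project + "/topics/" + e.origin;
            for (String target : e.targets) {
                inputs.computeIfAbsent(target, k -> new ArrayList<>()).add(topic);
            }
        }
        return inputs;
    }

    public static void main(String[] args) {
        List<Edge> edges = List.of(
                new Edge("haprojappengine",
                        List.of("prod-responsys-pipe", "new-orion-identity-bucket", "bigquery-sink-test")),
                new Edge("server_ecommerce", List.of("prod-responsys-pipe")),
                new Edge("prod-responsys-pipe", List.of("prod-responsys-bucket", "prod-responsys-sink")));

        List<String> topics = inputTopics("ha-proj", edges).get("prod-responsys-pipe");
        // Prints: projects/ha-proj/topics/haprojappengine,projects/ha-proj/topics/server_ecommerce
        System.out.println(String.join(",", topics));
    }
}
```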


viktort commented 7 years ago

@LusciousPear, is there a bug here? https://github.com/22Acacia/sossity/blob/master/src/sossity/core.clj#L279

LusciousPear commented 7 years ago

Nope, that line simply puts a comma between every item in the list and then turns it into a single string.
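
Joining with commas is only half of the contract: whatever reads the value has to split it back into individual topics rather than treat it as one topic name, and the stack trace (Main.java:137 calling PubsubIO.Read.topic with the whole string) suggests the split was missing. A minimal round-trip sketch in Java, where splitTopics is a hypothetical helper rather than angled-dream code:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Round trip implied by the thread: the config side joins topics with
 * commas, so the reader must split them again. splitTopics is hypothetical.
 */
class TopicListRoundTrip {
    static List<String> splitTopics(String value) {
        List<String> topics = new ArrayList<>();
        for (String part : value.split(",")) {
            String t = part.trim();
            if (!t.isEmpty()) {   // tolerate spaces and stray or trailing commas
                topics.add(t);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "projects/hx-trial/topics/server_ecommerce",
                "projects/hx-trial/topics/hxtrialorionappengine");
        String joined = String.join(",", topics);   // what the config side produces
        List<String> parsed = splitTopics(joined);  // what the reading side would need to do
        System.out.println(parsed.equals(topics));  // true
        // In Dataflow SDK 1.x terms, each entry would then get its own
        // PubsubIO.Read.topic(...) and the results would be merged with Flatten,
        // instead of passing the whole joined string to a single read.
    }
}
```

Trimming and skipping empty entries is a defensive choice; it accepts values such as "a, b" or a trailing comma without producing an empty topic name.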


-- Bradford Stephens roboticprofit.com Data for Driving Revenue

viktort commented 7 years ago

java -classpath ../angleddream-bundled-0.ALPHA.jar:target/responsyspipeline-bundled-0.2-ALPHA.jar com.acacia.angleddream.Main --stagingLocation=gs://hx-trial/staging-eu --project=hx-trial --pubsubTopics=projects/hx-trial/topics/server_ecommerce,projects/hx-trial/topics/hxtrialorionappengine --outputTopics=projects/hx-trial/topics/prod-responsys-pipe-out --maxNumWorkers=1 --numWorkers=1 --zone=europe-west1-c --workerMachineType=n1-standard-1 --errorPipelineName=projects/hx-trial/topics/prod-responsys-pipe-error-out
HX2049T1:responsys-pipeline viktor.trako$ java -classpath ../hx-sossity-and-angled-dream/angled-dream/target/angleddream-bundled-0.2-ALPHA.jar:target/responsyspipeline-bundled-0.2-ALPHA.jar com.acacia.angleddream.Main --stagingLocation=gs://hx-trial/staging-eu --project=hx-trial --pubsubTopic=projects/hx-trial/topics/server_ecommerce,projects/hx-trial/topics/hxtrialorionappengine --outputTopics=projects/hx-trial/topics/prod-responsys-pipe-out --maxNumWorkers=1 --numWorkers=1 --zone=europe-west1-c --workerMachineType=n1-standard-1 --errorPipelineName=projects/hx-trial/topics/prod-responsys-pipe-error-out
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/viktor.trako/developer/github/holidayextras/googleCloudDataPipelining/hx-sossity-and-angled-dream/angled-dream/target/angleddream-bundled-0.2-ALPHA.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/viktor.trako/developer/github/holidayextras/googleCloudDataPipelining/responsys-pipeline/target/responsyspipeline-bundled-0.2-ALPHA.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
Jar file hashes: no-hashes
Mar 21, 2017 3:36:45 PM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 2 files. Enable logging at DEBUG level to see which files will be staged.
Exception in thread "main" java.lang.IllegalArgumentException: Illegal Pubsub object name specified: server_ecommerce,projects/hx-trial/topics/hxtrialorionappengine Please see Javadoc for naming rules.
    at com.google.cloud.dataflow.sdk.io.PubsubIO.validatePubsubName(PubsubIO.java:135)
    at com.google.cloud.dataflow.sdk.io.PubsubIO.access$400(PubsubIO.java:74)
    at com.google.cloud.dataflow.sdk.io.PubsubIO$PubsubTopic.fromPath(PubsubIO.java:376)
    at com.google.cloud.dataflow.sdk.io.PubsubIO$Read$Bound.topic(PubsubIO.java:667)
    at com.google.cloud.dataflow.sdk.io.PubsubIO$Read.topic(PubsubIO.java:461)
    at com.acacia.angleddream.Main.main(Main.java:137)

LusciousPear commented 7 years ago

Interesting. I wonder if the Google format changed...


viktort commented 7 years ago

Good point.

viktort commented 7 years ago

@LusciousPear, it seems to work with the pull request I opened (#13). I need to test it a little further, as I'm seeing some errors in the Dataflow logs.

viktort commented 7 years ago

It does not like multiple Pub/Sub topics being passed in.

LusciousPear commented 7 years ago

Dataflow should definitely work with multiple topics.


viktort commented 7 years ago

@LusciousPear, Google support came back with the same result. I tried out my changes to angled-dream in PR #13 and they appear to be working. It would be great if you could take a look and give feedback.

All the best, Viktor

viktort commented 7 years ago

I think this is potentially resolved by #14 and #13.