kite-sdk / kite-examples


Question about kite.repo.uri #27

Closed: nanounanue closed this issue 9 years ago

nanounanue commented 9 years ago

Hi everyone

I am trying to adapt the provided JSON example, but I get this error:

15/05/20 08:09:47 INFO conf.FlumeConfiguration: Processing:UFOKiteDS
15/05/20 08:09:47 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [UFOAgent]
15/05/20 08:09:47 INFO node.AbstractConfigurationProvider: Creating channels
15/05/20 08:09:47 INFO channel.DefaultChannelFactory: Creating instance of channel archivo type file
15/05/20 08:09:47 INFO node.AbstractConfigurationProvider: Created channel archivo
15/05/20 08:09:47 INFO source.DefaultSourceFactory: Creating instance of source UFODir, type spooldir
15/05/20 08:09:47 INFO interceptor.StaticInterceptor: Creating StaticInterceptor: preserveExisting=true,key=flume.avro.schema.url,value=file:/home/itam/schemas/ufos.avsc
15/05/20 08:09:47 INFO api.MorphlineContext: Importing commands
15/05/20 08:09:52 INFO api.MorphlineContext: Done importing commands
15/05/20 08:09:52 INFO sink.DefaultSinkFactory: Creating instance of sink: UFOKiteDS, type: org.apache.flume.sink.kite.DatasetSink
15/05/20 08:09:52 ERROR node.AbstractConfigurationProvider: Sink UFOKiteDS has been removed due to an error during configuration
java.lang.IllegalArgumentException
        at org.kitesdk.shaded.com.google.common.base.Preconditions.checkArgument(Preconditions.java:72)
        at org.kitesdk.data.URIBuilder.<init>(URIBuilder.java:106)
        at org.kitesdk.data.URIBuilder.<init>(URIBuilder.java:90)
        at org.apache.flume.sink.kite.DatasetSink.configure(DatasetSink.java:188)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/05/20 08:09:52 INFO node.AbstractConfigurationProvider: Channel archivo connected to [UFODir]
15/05/20 08:09:52 INFO node.Application: Starting new configuration:{ sourceRunners:{UFODir=EventDrivenSourceRunner: { source:Spool Directory source UFODir: { spoolDir: /opt/ufos } }} sinkRunners:{} channels:{archivo=FileChannel archivo { dataDirs: [/opt/ufos/log/data] }} }
15/05/20 08:09:52 INFO node.Application: Starting Channel archivo
15/05/20 08:09:52 INFO file.FileChannel: Starting FileChannel archivo { dataDirs: [/opt/ufos/log/data] }...
15/05/20 08:09:52 INFO file.Log: Encryption is not enabled

I ran the Flume agent with:

flume-ng agent -n UFOAgent -Xmx100m --conf ingestion -f ingestion/spooldir_example.conf

The spooldir_example.conf is:


# Components
UFOAgent.sources = UFODir
UFOAgent.channels = archivo
UFOAgent.sinks = UFOKiteDS

# Channel
UFOAgent.channels.archivo.type = file
UFOAgent.channels.archivo.checkpointDir = /opt/ufos/log/checkpoint/
UFOAgent.channels.archivo.dataDirs = /opt/ufos/log/data/

# Source
UFOAgent.sources.UFODir.type = spooldir
UFOAgent.sources.UFODir.channels = archivo
UFOAgent.sources.UFODir.spoolDir = /opt/ufos
UFOAgent.sources.UFODir.fileHeader = true
UFOAgent.sources.UFODir.deletePolicy = immediate

# Interceptor
UFOAgent.sources.UFODir.interceptors = attach-schema morphline

UFOAgent.sources.UFODir.interceptors.attach-schema.type = static
UFOAgent.sources.UFODir.interceptors.attach-schema.key = flume.avro.schema.url
UFOAgent.sources.UFODir.interceptors.attach-schema.value = file:/home/itam/schemas/ufos.avsc

UFOAgent.sources.UFODir.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
UFOAgent.sources.UFODir.interceptors.morphline.morphlineFile = /home/itam/ingestion/morphline.conf
UFOAgent.sources.UFODir.interceptors.morphline.morphlineId = convertUFOFileToAvro

# Sink
UFOAgent.sinks.UFOKiteDS.type = org.apache.flume.sink.kite.DatasetSink
UFOAgent.sinks.UFOKiteDS.channel = archivo
UFOAgent.sinks.UFOKiteDS.kite.repo.uri = dataset:hive
UFOAgent.sinks.UFOKiteDS.kite.dataset.name = ufos
UFOAgent.sinks.UFOKiteDS.kite.batchSize = 10

I created the dataset as follows:

kite-dataset create ufos --schema /home/itam/schemas/ufos.avsc --format avro

Finally, the morphline.conf is:

morphlines: [
  {
    id: convertUFOFileToAvro
    importCommands: ["com.cloudera.**", "org.kitesdk.**"]
    commands: [
      { tryRules {
          catchExceptions: false
          throwExceptionIfAllRulesFailed: true
          rules: [
            # next rule of tryRules cmd:
            {
              commands: [
                { readCSV: {
                    separator: "\t"
                    columns: [Timestamp, City, State, Shape, Duration, Summary, Posted]
                    trim: true
                    charset: UTF-8
                    quoteChar: "\""
                  }
                }
                { toAvro {
                    schemaFile: /home/itam/schemas/ufos.avsc
                  }
                }
                { writeAvroToByteArray: {
                    format: containerlessBinary
                  }
                }
              ]
            }
            # next rule of tryRules cmd:
            {
              commands: [
                { dropRecord {} }
              ]
            }
          ]
        }
      }
      { logTrace { format: "output record: {}", args: ["@{}"] } }
    ]
  }
]

What am I doing wrong?

rdblue commented 9 years ago

Hi @nanounanue, this should be an easy fix. Kite used to expose a "repository" for datasets, which used URIs that started with "repo:". Then we added dataset URIs that incorporate that information, which is why a normal dataset URI contains pointers to how the dataset should be managed. In the shuffle, we added kite.dataset.uri to the Flume sink, but needed to keep kite.repo.uri and kite.dataset.name for backward compatibility.

To fix your problem, you should switch to using kite.dataset.uri with your normal dataset URI. The error here, which I'll add a better message for, is that your repo URI starts with "dataset:" instead of "repo:". You could fix that instead, but I suggest setting the dataset URI and ignoring the repository settings.
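
For example, the sink section would end up looking something like this (an untested sketch based on your config above; dataset:hive:ufos assumes your ufos dataset lives in Hive's default namespace):

# Sink
UFOAgent.sinks.UFOKiteDS.type = org.apache.flume.sink.kite.DatasetSink
UFOAgent.sinks.UFOKiteDS.channel = archivo
UFOAgent.sinks.UFOKiteDS.kite.dataset.uri = dataset:hive:ufos
UFOAgent.sinks.UFOKiteDS.kite.batchSize = 10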

Thanks for using Kite!

rdblue commented 9 years ago

I've fixed the bug (CDK-1003) in https://github.com/kite-sdk/kite/commit/07da28e2. Is it okay if I close this?

nanounanue commented 9 years ago

@rdblue, thank you for the quick answer...

One more question: in my case, what is my "normal dataset URI"?

I ask because I changed that line to:

UFOAgent.sinks.UFOKiteDS.kite.dataset.uri = dataset:hive

or

UFOAgent.sinks.UFOKiteDS.kite.dataset.uri = dataset:hive:ufos

and I am getting the following error:

15/05/20 21:06:58 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Error trying to open a new writer for dataset dataset:hive
        at org.apache.flume.sink.kite.DatasetSink.createWriter(DatasetSink.java:442)
        at org.apache.flume.sink.kite.DatasetSink.process(DatasetSink.java:282)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: Dataset name cannot be null
        at org.kitesdk.shaded.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
        at org.kitesdk.data.spi.filesystem.FileSystemDatasetRepository.load(FileSystemDatasetRepository.java:188)
        at org.kitesdk.data.Datasets.load(Datasets.java:108)
        at org.kitesdk.data.Datasets.load(Datasets.java:140)
        at org.apache.flume.sink.kite.DatasetSink$1.run(DatasetSink.java:403)
        at org.apache.flume.sink.kite.DatasetSink$1.run(DatasetSink.java:400)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:55)
        at org.apache.flume.sink.kite.DatasetSink.createWriter(DatasetSink.java:399)
        ... 4 more
15/05/20 21:07:03 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Error trying to open a new writer for dataset dataset:hive
...

rdblue commented 9 years ago

@nanounanue, you want something like the second: dataset:hive:ufos. See the URI pattern docs.
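
Roughly, the two forms in play here are (see the URI pattern docs for the full list and for namespace handling):

dataset:hive:<dataset-name>                             # e.g. dataset:hive:ufos; the metastore location comes from the environment config
dataset:hive://<metastore-host>:<port>/<dataset-name>   # the metastore location is embedded in the URI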

nanounanue commented 9 years ago

@rdblue I did as you suggested, and now the stack trace is slightly different:

15/05/20 21:57:53 WARN hive.MetaStoreUtil: Aborting use of local MetaStore. Allow local MetaStore by setting kite.hive.allow-local-metastore=true in HiveConf
15/05/20 21:57:53 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Error trying to open a new writer for dataset dataset:hive:ufos
        at org.apache.flume.sink.kite.DatasetSink.createWriter(DatasetSink.java:442)
        at org.apache.flume.sink.kite.DatasetSink.process(DatasetSink.java:282)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Missing Hive MetaStore connection URI
        at org.kitesdk.data.spi.hive.MetaStoreUtil.<init>(MetaStoreUtil.java:78)
        at org.kitesdk.data.spi.hive.HiveAbstractMetadataProvider.getMetaStoreUtil(HiveAbstractMetadataProvider.java:63)
        at org.kitesdk.data.spi.hive.HiveAbstractMetadataProvider.resolveNamespace(HiveAbstractMetadataProvider.java:270)
        at org.kitesdk.data.spi.hive.HiveAbstractMetadataProvider.resolveNamespace(HiveAbstractMetadataProvider.java:255)
        at org.kitesdk.data.spi.hive.HiveAbstractMetadataProvider.load(HiveAbstractMetadataProvider.java:102)
        at org.kitesdk.data.spi.filesystem.FileSystemDatasetRepository.load(FileSystemDatasetRepository.java:192)
        at org.kitesdk.data.Datasets.load(Datasets.java:108)
        at org.kitesdk.data.Datasets.load(Datasets.java:140)
        at org.apache.flume.sink.kite.DatasetSink$1.run(DatasetSink.java:403)
        at org.apache.flume.sink.kite.DatasetSink$1.run(DatasetSink.java:400)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:55)
        at org.apache.flume.sink.kite.DatasetSink.createWriter(DatasetSink.java:399)
        ... 4 more
15/05/20 21:57:58 WARN hive.MetaStoreUtil: Aborting use of local MetaStore. Allow local MetaStore by setting kite.hive.allow-local-metastore=true in HiveConf
...

Where do I have to set that?

rdblue commented 9 years ago

Kite looks for the metastore URI in two places:

  1. In the environment configuration at JVM start-up (usually gets it from hive-site.xml)
  2. From the Hive URI. You can embed metastore info like this: dataset:hive://ms-host:port/dataset-name

The first option is preferred. We generally assume you're running with the environment configured to talk with your cluster.
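
In your Flume config, option 2 would look something like this (a sketch; substitute your real metastore host and port):

UFOAgent.sinks.UFOKiteDS.kite.dataset.uri = dataset:hive://<metastore-host>:<metastore-port>/ufos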

nanounanue commented 9 years ago

I think I have it configured correctly (otherwise the kite-dataset examples wouldn't work). Here is the relevant fragment of my hive-site.xml:

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost/metastore</value>
</property>

...

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://0.0.0.0:9083</value>
  <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
</property>

I am running in pseudo-distributed mode, by the way.

nanounanue commented 9 years ago

If I use the second option that you gave me:

UFOAgent.sinks.UFOKiteDS.kite.dataset.uri = dataset:hive://0.0.0.0:9083/ufos

everything works smoothly

But the question is: why doesn't the first one work?

rdblue commented 9 years ago

The first one depends on how you're configuring the program where you're using the API. You need to have the configuration files on the classpath for them to be picked up automatically when calling new Configuration().
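
As a rough illustration (this is not code from the thread), here is the kind of standalone program where that matters. It assumes the Kite and Avro jars are available and that your cluster's *-site.xml files, including the hive-site.xml that carries hive.metastore.uris, are on the JVM classpath; otherwise you would have to pass a full URI such as dataset:hive://host:9083/ufos instead.

import org.apache.avro.generic.GenericRecord;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.Datasets;

public class LoadUfos {
  public static void main(String[] args) {
    // The short Hive URI only resolves when the Hive/Hadoop config files are
    // on the classpath, so Kite can discover the metastore location itself.
    Dataset<GenericRecord> ufos =
        Datasets.load("dataset:hive:ufos", GenericRecord.class);
    System.out.println(ufos.getDescriptor().getSchema());
  }
}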

rdblue commented 9 years ago

@nanounanue, I don't know what I was thinking with my last response since you already said you're using Kite inside Flume. Oops. I don't think you should be required to set up the Flume classpath so it can see the Hive config. You should use the full dataset URI for now and hopefully the next version of CDH will fix this for you.

nanounanue commented 9 years ago

Thank you, @rdblue!

rdblue commented 9 years ago

No problem, let us know if you have any more issues. I'm going to close this, since I think you're able to move on.