keedio / flume-ng-sql-source

Flume Source to import data from SQL Databases
Apache License 2.0

[ERROR - org.keedio.flume.source.SQLSourceHelper.getStatusFileIndex(SQLSourceHelper.java:231)] #50

Open randidwiputra opened 6 years ago

randidwiputra commented 6 years ago
```
Exception reading status file, doing back up and creating new status file
Unexpected exception at position -1: null
    at org.keedio.flume.source.SQLSourceHelper.checkJsonValues(SQLSourceHelper.java:244)
    at org.keedio.flume.source.SQLSourceHelper.getStatusFileIndex(SQLSourceHelper.java:227)
    at org.keedio.flume.source.SQLSourceHelper.<init>(SQLSourceHelper.java:105)
    at org.keedio.flume.source.SQLSource.configure(SQLSource.java:66)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:331)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
lazaromedina commented 6 years ago

Hi randidwiputra, the exception seems to be thrown while parsing the JSON status file, in checkJsonValues(). There should be a log.error trace with information that helps identify the problem, but I can't see it in the error you pasted. So please, could you share the complete log output and your configuration?

best, Luis

randidwiputra commented 6 years ago

Hi Luis

Sorry for late reply. I'm following this tutorial https://www.toadworld.com/platforms/oracle/w/wiki/11121.collecting-indexing-and-searching-mysql-database-table-data-in-apache-solr

**flume-mysql.conf:**

```
agent.sources = sql-source
agent.sinks = sink1
agent.channels = ch1

agent.channels.ch1.type = memory

agent.sources.sql-source.type = org.apache.flume.source.SQLSource
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent.sources.sql-source.channels = ch1

agent.sources.sql-source.connection.url = jdbc:mysql://localhost:3306/wlslog

# Database connection properties
agent.sources.sql-source.user = root
agent.sources.sql-source.password = 12345
agent.sources.sql-source.table = wlslog
agent.sources.sql-source.database = wlslog

agent.sources.sql-source.columns.to.select = *

# Increment column properties
agent.sources.sql-source.incremental.column.name = id

# The increment value is where you want to start taking data from the table (0 will import the entire table)
agent.sources.sql-source.incremental.value = 0

# Query delay: the query is sent every configured number of milliseconds
agent.sources.sql-source.run.query.delay = 10000

# The status file is used to save the last read row
agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status

agent.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent.sinks.sink1.channel = ch1
agent.sinks.sink1.morphlineFile = /opt/flume/conf/morphlines.conf
agent.sinks.sink1.morphlineId = morphline1
agent.sinks.sink1.batchSize = 1
agent.sinks.sink1.batchDurationMillis = 10000

agent.channels.ch1.capacity = 100000
```
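One thing worth noting in the file above is that `agent.sources.sql-source.type` is set twice. Flume loads its configuration as Java-style properties, where a later assignment of the same key overrides an earlier one, so the effective source type here should be `org.keedio.flume.source.SQLSource`. A minimal sketch of that last-one-wins behavior (plain Python, purely to illustrate the semantics, not Flume's actual loader):

```python
def load_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments.
    Later assignments of the same key override earlier ones, as with
    java.util.Properties."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()  # last write wins
    return props

conf = """
agent.sources.sql-source.type=org.apache.flume.source.SQLSource
agent.sources.sql-source.type=org.keedio.flume.source.SQLSource
"""
props = load_properties(conf)
print(props["agent.sources.sql-source.type"])
# -> org.keedio.flume.source.SQLSource
```

So the first `type` line is harmless but redundant; only the keedio class is actually instantiated.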

```
SOLR_LOCATOR : {
  collection : collection1
  solrUrl : "http://localhost:8983/solr/"
  solrHomeDir : "/opt/solr/example/solr/collection1"
}

morphlines : [
  {
    id : morphline1
    importCommands : ["com.cloudera.**", "org.apache.solr.**", "org.kitesdk.**"]
    commands : [
      {
        readLine {
          charset : UTF-8
        }
      }
      {
        generateUUID {
          field : id
        }
      }
      {
        sanitizeUnknownSolrFields {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
      { logDebug { format : "output record: {}", args : ["@{}"] } }
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]
```
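The command chain above processes each Flume event in order: readLine turns the event body into a record, generateUUID adds an id field, sanitizeUnknownSolrFields drops fields the Solr schema doesn't know, and loadSolr indexes the result. A rough Python sketch of that data flow (the function names are hypothetical stand-ins, not the Kite Morphlines API):

```python
import uuid

def read_line(event_body):
    # readLine: turn the raw event body into a record with a "message" field
    return {"message": event_body.decode("utf-8")}

def generate_uuid(record):
    # generateUUID: add a unique "id" field
    record["id"] = str(uuid.uuid4())
    return record

def sanitize_unknown_solr_fields(record, schema_fields):
    # sanitizeUnknownSolrFields: keep only fields the Solr schema declares
    return {k: v for k, v in record.items() if k in schema_fields}

schema = {"id", "message"}
record = read_line(b"sample log line")
record = generate_uuid(record)
record = sanitize_unknown_solr_fields(record, schema)
# record now holds only schema fields and would be handed to loadSolr
```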

**sql-source.status:**

```
jdbc:mysql://localhost:3306/test wlslog id 0
```

**sql-source.status.bak.1518407398120:**

```
{"LastIndex":"5"}
```
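These two files may explain the stack trace above: the current status file holds the old space-separated format, while the backup holds the JSON format that checkJsonValues() parses. That suggests the status file was written by a different version of the plugin than the one now reading it, and feeding the non-JSON line to a JSON parser fails before any token is consumed, matching the `Unexpected exception at position -1` message. A small illustration, with Python's `json` module standing in for the Java parser:

```python
import json

legacy_status = "jdbc:mysql://localhost:3306/test wlslog id 0"  # old plain-text format
json_status = '{"LastIndex":"5"}'                               # JSON format

try:
    json.loads(legacy_status)
except json.JSONDecodeError as err:
    # the old format is not valid JSON, so parsing fails immediately
    print("legacy status file is not JSON:", err)

state = json.loads(json_status)  # the backup parses cleanly
print(state["LastIndex"])
# -> 5
```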

Thanks for your attention, Luis.

lazaromedina commented 6 years ago

Hi randidwiputra,

this is a basic configuration file for flume-sql sinking to file. Maybe it can help you reduce the complexity of your config file in your use case:

```
agent.sinks = k1
agent.channels = ch1
agent.sources = sql1

# For each one of the sources, the type is defined
agent.sources.sql1.type = org.keedio.flume.source.SQLSource
agent.sources.sql1.hibernate.connection.url = jdbc:mysql://127.0.0.1:3306/testdb

# Hibernate Database connection properties
agent.sources.sql1.hibernate.connection.user = root
agent.sources.sql1.hibernate.connection.password = root
agent.sources.sql1.hibernate.connection.autocommit = true
agent.sources.sql1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent.sources.sql1.hibernate.connection.driver_class = com.mysql.jdbc.Driver

agent.sources.sql1.table = customers
# Columns to import (default * imports the entire row)
agent.sources.sql1.columns.to.select = *
# Query delay: the query is sent every configured number of milliseconds
agent.sources.sql1.run.query.delay = 10000

# The status file is used to save the last read row
agent.sources.sql1.status.file.path = /var/log/sqlflume/flume-out
agent.sources.sql1.status.file.name = sql1.status

agent.sources.sql1.batch.size = 1000
agent.sources.sql1.delimiter.entry = ~
agent.sources.sql1.enclose.by.quotes = false

agent.sources.sql1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sql1.hibernate.c3p0.min_size = 1
agent.sources.sql1.hibernate.c3p0.max_size = 10

# The channel can be defined as follows.
agent.sources.sql1.channels = ch1

agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/log
agent.sinks.k1.sink.rollInterval = 7200

agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000

agent.sinks.k1.channel = ch1
```
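The `delimiter.entry` and `enclose.by.quotes` properties in this config control how each result-set row is serialized into a Flume event body: columns are joined with the delimiter and optionally wrapped in double quotes. A rough sketch of that serialization, purely illustrative and not the plugin's actual code:

```python
def serialize_row(columns, delimiter="~", enclose_by_quotes=False):
    """Join one result-set row into a single event-body string,
    mirroring the delimiter.entry / enclose.by.quotes settings."""
    cells = [str(c) for c in columns]
    if enclose_by_quotes:
        cells = ['"%s"' % c for c in cells]
    return delimiter.join(cells)

print(serialize_row([1, "NOTIFICATION", "WebLogicServer"]))
# -> 1~NOTIFICATION~WebLogicServer
```

With `enclose.by.quotes = false` and a `~` delimiter, downstream consumers must only ensure the data itself contains no `~` characters.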
randidwiputra commented 6 years ago

Hi Luis

Thanks for your reply.

My Flume version is apache-flume-1.4.0-bin, my Solr version is solr-4.10.3, and my MySQL connector is mysql-connector-java-5.1.45.

> this is a basic configuration file for flume-sql sinking to file. Maybe it can help you reduce the complexity of your config file in your use case:

Thanks, I'll try it. Yes, I'm just trying to sink MySQL -> Flume -> Solr, but I hit this error while the Flume agent was running.

Thanks, Sir