justonedb / kafka-sink-pg-json

Kafka sink connector for streaming messages to PostgreSQL
MIT License

Connector Error #9

Open rob-01 opened 7 years ago

rob-01 commented 7 years ago

Hello, I'm getting the following error and don't know how to proceed. My setup: kafka 3.2.1, zookeeper 3.2.1, both of which are running fine. Any ideas would be much appreciated! Thanks!

Error Output:

[2017-05-17 23:47:13,987] INFO StandaloneConfig values: 
    access.control.allow.methods = 
    access.control.allow.origin = 
    bootstrap.servers = [localhost:29092]
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    key.converter = class org.apache.kafka.connect.storage.StringConverter
    offset.flush.interval.ms = 60000
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = /tmp/connect.offsets
    rest.advertised.host.name = null
    rest.advertised.port = null
    rest.host.name = null
    rest.port = 8083
    task.shutdown.graceful.timeout.ms = 5000
    value.converter = class org.apache.kafka.connect.storage.StringConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:180)
[2017-05-17 23:47:14,617] INFO Logging initialized @1287ms (org.eclipse.jetty.util.log:186)
[2017-05-17 23:47:15,200] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:50)
[2017-05-17 23:47:15,201] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:71)
[2017-05-17 23:47:15,201] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:119)
[2017-05-17 23:47:15,202] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2017-05-17 23:47:15,394] INFO Worker started (org.apache.kafka.connect.runtime.Worker:124)
[2017-05-17 23:47:15,395] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:73)
[2017-05-17 23:47:15,395] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-05-17 23:47:16,235] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
May 17, 2017 11:47:21 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2017-05-17 23:47:21,376] INFO Started o.e.j.s.ServletContextHandler@349c1daf{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-05-17 23:47:21,408] INFO Started ServerConnector@2b370e4b{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-05-17 23:47:21,412] INFO Started @8085ms (org.eclipse.jetty.server.Server:379)
[2017-05-17 23:47:21,415] INFO REST server listening at http://25.0.0.49:8083/, advertising URL http://25.0.0.49:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-05-17 23:47:21,415] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)
[2017-05-17 23:47:21,425] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:99)
java.lang.AbstractMethodError: org.apache.kafka.connect.connector.Connector.config()Lorg/apache/kafka/common/config/ConfigDef;
    at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:254)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:158)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93)
[2017-05-17 23:47:21,488] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:66)
[2017-05-17 23:47:21,489] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2017-05-17 23:47:21,525] INFO Stopped ServerConnector@2b370e4b{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2017-05-17 23:47:21,559] INFO Stopped o.e.j.s.ServletContextHandler@349c1daf{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2017-05-17 23:47:21,570] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:165)
[2017-05-17 23:47:21,572] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:77)
[2017-05-17 23:47:21,572] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:131)
[2017-05-17 23:47:21,573] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:68)
[2017-05-17 23:47:21,573] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:151)
[2017-05-17 23:47:26,644] INFO Reflections took 10991 ms to scan 96 urls, producing 3823 keys and 30812 values  (org.reflections.Reflections:229)
[2017-05-17 23:47:26,711] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:87)
[2017-05-17 23:47:26,711] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:71)
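The `AbstractMethodError` above is a binary-compatibility failure: the connector jar was compiled against an older `connect-api` in which `Connector.config()` did not yet exist, while the running Kafka Connect calls `config()` during validation, so the JVM finds no implementation at invocation time. As a quick way to check whether a class actually overrides a given method, here is a sketch using plain reflection — `Base` and `MyConnector` are stand-in classes for illustration, not the real Kafka types:

```java
import java.lang.reflect.Method;

// Stand-in for an API base class such as
// org.apache.kafka.connect.connector.Connector.
abstract class Base {
    public abstract void start();
}

// Stand-in for a connector compiled against the old API.
class MyConnector extends Base {
    @Override
    public void start() {}
}

public class OverrideCheck {
    // True if cls itself declares the public no-arg method,
    // i.e. overrides it rather than inheriting it.
    static boolean overrides(Class<?> cls, String name) throws Exception {
        Method m = cls.getMethod(name);
        return m.getDeclaringClass() == cls;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(overrides(MyConnector.class, "start"));    // true: declared on MyConnector
        System.out.println(overrides(MyConnector.class, "toString")); // false: inherited from Object
    }
}
```

Running the same check against the installed connector class for `config` would show whether the deployed jar provides an implementation.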

CONFIG

justone-kafka-sink-pg-json-connector.properties

# Name of the connector (do not change)
#
name=justone-kafka-sink-pg-json
#
# Connector class (do not change)
#
connector.class=com.justone.kafka.sink.pg.json.PostgreSQLSinkConnector
#
# Number of tasks to be assigned to the connector (mandatory)
#
tasks.max=1
#
# Topics to consume from (mandatory)
#
topics=test-topic
#
# Server address/name hosting the database (optional - default is localhost)
#
db.host=postgresql.cloud66.local
#
# Name of the database to connect to (mandatory)
#
db.database=testdb
#
# Name of the user to connect to the database with (mandatory)
#
db.username=user2390493
#
# Password to use for user authentication (optional - default is none)
#
db.password=PLACEHOLDER_PASSWORD
#
# Schema of the table (mandatory)
#
db.schema=schema1
#
# Table to receive rows (mandatory)
#
db.table=table1
#
# Comma separated list of columns to receive json elements (mandatory)
#
db.columns=item1,item2
#
# Comma separated list of parse paths to retrieve json elements by (mandatory)
#
db.json.parse=/@item1,/@item2
#
# Type of delivery (mandatory). Must be one of fastest, guaranteed, synchronized
#
db.delivery=synchronized
#
# Buffer size (bytes) used to cache writes
#
db.buffer.size=8000000
#

justone-kafka-sink-pg-json-standalone.properties

bootstrap.servers=sys-analytics-kafka.cloud66.local:29092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
#
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
#
key.converter.schemas.enable=false
value.converter.schemas.enable=false
#
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Copycat in this format.
#
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
#
# Location of Kafka offsets
#
offset.storage.file.filename=/tmp/connect.offsets

amimimor commented 7 years ago

I'm getting this too. Any fix coming? @rob-01 did you hack it somehow?

liangrui1988 commented 7 years ago

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigValue;

private static final ConfigDef CONFIG_DEF = new ConfigDef()
    .define("db.host", Type.STRING, Importance.LOW, "The db.host to publish data to");

@Override
public ConfigDef config() {
    return CONFIG_DEF;
}

@Override
public Config validate(Map<String, String> connectorConfigs) {
    ConfigDef configDef = config();
    List<ConfigValue> configValues = configDef.validate(connectorConfigs);
    return new Config(configValues);
}
```

This is a connect-api version problem: change the version in pom.xml to 0.11.0.0, then implement the code above and it works.
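The version change would go in the connector's pom.xml. A sketch of the dependency entry, assuming the standard `org.apache.kafka:connect-api` coordinates (`provided` scope is conventional for connector plugins, since the Connect runtime supplies the API at run time):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>0.11.0.0</version>
    <scope>provided</scope>
</dependency>
```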

agouz commented 7 years ago

To explain the above comment: the newer version of Kafka requires `PostgreSQLSinkConnector` to override `config()`, so to solve the problem, update the class by adding the `config()` and `validate()` methods shown in the previous comment.

You don't need the `.define("db.host", Type.STRING, Importance.LOW, "The db.host to publish data to")` part, though.

thanks @liangrui1988 for your comment 👍