justonedb / kafka-sink-pg-json

Kafka sink connector for streaming messages to PostgreSQL
MIT License
91 stars 31 forks source link

Error creating .start/.flush/.state/.drop functions #16

Open servomac opened 7 years ago

servomac commented 7 years ago

During the installation of the connector, I get some NOTICE logs and the sql executions is not capable of creating the functions:

# PGPASSWORD=$DB_PASS psql -h $DB_HOST -U $DB_USER -c '\i /opt/sql/install-justone-kafka-sink-pg-1.0.sql' $DB_NAME
psql:/opt/sql/install-justone-kafka-sink-pg-1.0.sql:68: ERROR:  schema "$justone$kafka$connect$sink" does not exist
CREATE SCHEMA
psql:/opt/sql/install-justone-kafka-sink-pg-1.0.sql:81: NOTICE:  function $justone$kafka$connect$sink.start(pg_catalog.varchar,pg_catalog.varchar) does not exist, skipping
DROP FUNCTION
CREATE FUNCTION
psql:/opt/sql/install-justone-kafka-sink-pg-1.0.sql:109: NOTICE:  function $justone$kafka$connect$sink.flush(pg_catalog.varchar,pg_catalog.varchar,pg_catalog.varchar[],pg_catalog.int4[],pg_catalog.int8[]) does not exist, skipping
DROP FUNCTION
CREATE FUNCTION
psql:/opt/sql/install-justone-kafka-sink-pg-1.0.sql:150: NOTICE:  function $justone$kafka$connect$sink.state(pg_catalog.varchar,pg_catalog.varchar,pg_catalog.varchar[],pg_catalog.int4[],pg_catalog.int8[]) does not exist, skipping
DROP FUNCTION
CREATE FUNCTION
psql:/opt/sql/install-justone-kafka-sink-pg-1.0.sql:171: NOTICE:  function $justone$kafka$connect$sink.drop(pg_catalog.varchar,pg_catalog.varchar) does not exist, skipping
DROP FUNCTION
CREATE FUNCTION
GRANT
GRANT

And the functions actually do not exist:

# PGPASSWORD=$DB_PASS psql -h $DB_HOST -U $DB_USER $DB_NAME -c '\df'
                       List of functions
 Schema | Name | Result data type | Argument data types | Type 
--------+------+------------------+---------------------+------
(0 rows)

When running the standalone connector, I get the following error:

[2017-10-28 18:55:06,258] INFO Sink connector configuration: 
    db.host:apidb
    db.database:api
    db.username:api
    db.password:mysecretpassword
    db.schema:public
    db.table:user
    db.columns:id,name
    db.json.parse:/@user/@id,/@user/@name
    db.buffer.size:8000000
    db.delivery:synchronized
 (com.justone.kafka.sink.pg.json.PostgreSQLSinkTask:217)
[2017-10-28 18:55:06,330] ERROR Thread WorkerSinkTask-justone-kafka-sink-pg-json-0 exiting with uncaught exception:  (org.apache.kafka.connect.util.ShutdownableThread:84)
org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: ERROR: syntax error at or near "user"
  Where: PL/pgSQL function "$justone$kafka$connect$sink".start(character varying,character varying) line 5 at EXECUTE
    at com.justone.kafka.sink.pg.json.PostgreSQLSinkTask.start(PostgreSQLSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.joinConsumerGroupAndStart(WorkerSinkTask.java:154)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:54)
    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.postgresql.util.PSQLException: ERROR: syntax error at or near "user"
  Where: PL/pgSQL function "$justone$kafka$connect$sink".start(character varying,character varying) line 5 at EXECUTE
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2198)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1927)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:255)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:562)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:406)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:286)
    at com.justone.kafka.sink.pg.json.PostgreSQLSinkTask.start(PostgreSQLSinkTask.java:266)
    ... 3 more

I'm using postgres 10.0, but I have the same problem with postgres 9.3.

cmorent commented 6 years ago

Any update on this issue? I've got the exact same issue, did you find a workaround? Thanks !

EDIT 1: While investigating, I finally found my created functions by running \df $justone$kafka$connect$sink.* instead of simply use \df and it works properly.