justonedb / kafka-sink-pg-json

Kafka sink connector for streaming messages to PostgreSQL
MIT License

Exception when running twice 'No current assignment for partition xxx-0' #2

Open killfill opened 8 years ago

killfill commented 8 years ago

Hi there!

I was trying to use sink-pg-json and noticed something.

When I run Connect in standalone mode with pretty much the default options:

name=prueba-connector-postgresql-standalone_v3
connector.class=com.justone.kafka.sink.pg.json.PostgreSQLSinkConnector
tasks.max=1
topics=hola
db.host=localhost
db.database=kafka
db.username=postgres
db.schema=web
db.table=event
db.columns=hola
db.json.parse=/@hola
db.buffer.size=1
db.delivery=synchronized

It works well the first time:

INFO Created connector prueba-connector-postgresql-standalone_v3 (org.apache.kafka.connect.cli.ConnectStandalone:91)
INFO Sink task WorkerSinkTask{id=prueba-connector-postgresql-standalone_v3-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)

INFO Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group connect-prueba-connector-postgresql-standalone_v3. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:505)
INFO Revoking previously assigned partitions [] for group connect-prueba-connector-postgresql-standalone_v3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
INFO (Re-)joining group connect-prueba-connector-postgresql-standalone_v3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
INFO Successfully joined group connect-prueba-connector-postgresql-standalone_v3 with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
INFO Setting newly assigned partitions [hola-0] for group connect-prueba-connector-postgresql-standalone_v3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)

But then, when I press Ctrl+C and run exactly the same thing again, I see the following message:

INFO Created connector prueba-connector-postgresql-standalone_v3 (org.apache.kafka.connect.cli.ConnectStandalone:91)
INFO Sink task WorkerSinkTask{id=prueba-connector-postgresql-standalone_v3-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
ERROR Task prueba-connector-postgresql-standalone_v3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.IllegalStateException: No current assignment for partition hola-0
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:256)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1134)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:405)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:214)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)

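The exception comes from here: https://www.codatlas.com/github.com/apache/kafka/0.10.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java?line=142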

So, when I delete the state in the database like this:

DELETE from "$justone$kafka$connect$sink"."web.event";

Then, it runs smoothly again.

I'm pretty new to Kafka and Kafka Connect, so there is a high probability I'm doing something wrong.

Is there something I'm missing, or is this expected behaviour?

Thanks!!
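
One possible reading of the trace above, offered as an assumption rather than a confirmed diagnosis: the consumer refuses to seek on a partition it has not been assigned yet, and on the second run a stored offset causes a rewind before the group join has assigned hola-0. The Java sketch below models only that check. MiniSubscriptionState and RewindDemo are hypothetical stand-ins written for illustration, not Kafka classes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the consumer's assignment bookkeeping; not Kafka's real class.
class MiniSubscriptionState {
    private final Set<String> assigned = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();

    // Called once the group coordinator has handed out a partition.
    void assign(String partition) {
        assigned.add(partition);
    }

    // Mirrors the check seen in the stack trace: seeking an unassigned partition is an error.
    void seek(String partition, long offset) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        positions.put(partition, offset);
    }

    // Returns the last offset set by seek, or null if none was set.
    Long position(String partition) {
        return positions.get(partition);
    }
}

class RewindDemo {
    // Returns the error message, or null when the seek succeeds.
    static String trySeek(MiniSubscriptionState state, String partition, long offset) {
        try {
            state.seek(partition, offset);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        MiniSubscriptionState state = new MiniSubscriptionState();
        // Assumed second-run situation: a rewind is requested before the group join completes.
        System.out.println(trySeek(state, "hola-0", 42L));
        // After assignment the same seek is accepted.
        state.assign("hola-0");
        System.out.println(trySeek(state, "hola-0", 42L));
    }
}
```

If this reading is right, it would also be consistent with the workaround: with the state table emptied there is no stored offset to rewind to, so no early seek happens.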

duncanpauly commented 8 years ago

Hi Philip,

Thanks for the note. It's probably a bug. I will take a look when I am back at work (next week).

Thanks Duncan

graphaelli commented 8 years ago

5 up for this

mashannon168 commented 7 years ago

Hi,

I have the same issue, but I don't have this table:

"$justone$kafka$connect$sink"."web.event";

I only have one temp table in this schema (the copy from myschema.mytable).

What other workaround can I use?

Thanks Shannon

duncanpauly commented 7 years ago

Hi Shannon,

The state table "$justone$kafka$connect$sink"."web.event" is only created if db.delivery is set to "synchronized" in the connector properties file. Is it set to "synchronized"?

If you can send me your property files and table schema I can take a look and see if I can re-create the problem.

Thanks Duncan
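
The check described in the comment above can be sketched in Java as follows: read the connector properties and report which state table, if any, should exist. StateTableCheck is a hypothetical helper, not part of the connector. The naming rule (db.schema, a dot, then db.table) is an assumption inferred from the single "web.event" example in this thread.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of kafka-sink-pg-json.
class StateTableCheck {
    // Returns the quoted state table name, or null when db.delivery is not
    // "synchronized" (in which case no state table is created).
    static String expectedStateTable(Properties props) {
        if (!"synchronized".equals(props.getProperty("db.delivery"))) {
            return null;
        }
        // Assumption: the table is named "<db.schema>.<db.table>", as in "web.event".
        String name = props.getProperty("db.schema") + "." + props.getProperty("db.table");
        return "\"$justone$kafka$connect$sink\".\"" + name + "\"";
    }

    // Parses connector properties from text in the standard .properties format.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("db.schema=web\ndb.table=event\ndb.delivery=synchronized\n");
        System.out.println(expectedStateTable(props));
    }
}
```

A null result would mean the missing table is expected for that configuration, so deleting its rows is not an available workaround.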

pronvis commented 7 years ago

Sometimes after a rebalance I see the same exception:

[2017-03-23 14:43:04,313] ERROR Task ${task-name}-1 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.IllegalStateException: No current assignment for partition ${topic-name}-0
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
        at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:416)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:214)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2017-03-23 14:43:04,313] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)

Can you advise? (I use the HDFS connector.)

P.S. I saw the task for this partition move to another worker, but the status (via a REST call) shows the task in state FAILED.