briggleman closed this issue 7 years ago
Which version of the Sink are you on? On master we took out the flush call. There's still a pending issue with it.
kafka-connect-kudu-0.2-3.0.0-all.jar. Taken from here. Should I build from source?
Yes, try building from source.
Will do. I'll keep you posted. Thanks!
The issue still persists. Additional note: I am running Kudu 1.0.0. Could that be part of the problem?
@briggleman in the kuduWriter.flush do this:
Option(session.flush()).map(_.asScala).foreach { responses =>
  responses
    .filter(r => (r == null) || r.hasRowError)
    .foreach(e => {
      throw new Throwable(s"Failed to flush one or more changes: ${e.getRowError.toString}")
    })
}
I will push this change to avoid the NPE.
@andrewstevenson: I thought we had taken the flush out.
Will do. I'll keep you posted.
@stheppi I thought so too.
The flush error appears to be resolved. I'm still encountering errors with any data not sent through the CLI, but the flushing error is gone.
Trace below, in case it's related.
[2016-11-15 18:37:16,423] INFO Connecting to Kudu Master at 10.12.225.75:7051 (com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$:44)
[2016-11-15 18:37:16,425] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:769)
[2016-11-15 18:37:16,428] INFO Initialising Kudu writer (com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter:52)
[2016-11-15 18:37:16,456] INFO Sink task WorkerSinkTask{id=test.python.msg-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
[2016-11-15 18:37:16,559] INFO Discovered coordinator 192.168.233.3:9092 (id: 2147483646 rack: null) for group connect-test.python.msg. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:528)
[2016-11-15 18:37:16,562] INFO Revoking previously assigned partitions [] for group connect-test.python.msg (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:292)
[2016-11-15 18:37:16,563] INFO (Re-)joining group connect-test.python.msg (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:349)
[2016-11-15 18:37:16,566] INFO Successfully joined group connect-test.python.msg with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:457)
[2016-11-15 18:37:16,567] INFO Setting newly assigned partitions [test.python.msg-0] for group connect-test.python.msg (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:231)
[2016-11-15 18:37:16,616] INFO WorkerSinkTask{id=test.python.msg-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
[2016-11-15 18:37:16,632] ERROR Task test.python.msg-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found io.confluent.rest.exceptions.RestNotFoundException: Schema not found
io.confluent.rest.exceptions.RestNotFoundException: Schema not found
at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.schemaNotFoundException(Errors.java:58)
at io.confluent.kafka.schemaregistry.rest.resources.SchemasResource.getSchema(SchemasResource.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:205)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:308)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:291)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1140)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:403)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:386)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:334)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:221)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:808)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.Server.handle(Server.java:499)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:745)
; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:164)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:181)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:317)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:310)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:62)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndID(CachedSchemaRegistryClient.java:117)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:190)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2016-11-15 18:37:16,641] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
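The 40403 code in the trace is the Schema Registry's "Schema not found" response to a lookup of schema ID 1. One way to confirm that independently of Connect is to repeat the lookup by hand. The following is a minimal sketch, assuming the registry's `GET /schemas/ids/{id}` endpoint and its JSON error body; the base URL `http://localhost:8081` and both helper names are hypothetical and should be replaced with whatever the Connect worker is actually configured to use.

```python
import json


def schema_by_id_url(registry_url: str, schema_id: int) -> str:
    """Build the lookup URL for a schema ID (hypothetical helper)."""
    return f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"


def explain_registry_error(body: str) -> str:
    """Turn a Schema Registry JSON error body into a short message."""
    err = json.loads(body)
    code = err.get("error_code")
    message = err.get("message", "")
    if code == 40403:
        # Same error code that appears in the trace above.
        return f"schema id unknown to this registry ({message})"
    return f"registry error {code}: {message}"


# Assumed registry address; fetch this URL with curl or a browser.
print(schema_by_id_url("http://localhost:8081", 1))
```

If the URL returns a schema for the registry the Python producer uses but not for the one the Connect worker points at, the two are probably talking to different registries.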
I'm generating messages from Python to Kafka and having issues when running the Kudu connector. Specifically, I can see that the output from Python is being written properly by running ./kafka-avro-console-consumer; however, when I run the Kudu sink I receive the following error.
kafka-avro-console-consumer output:
kudu-sink.properties file:
Can confirm the schema is being set properly. Also, the kudu-sink example works fine when I generate data from the cli. What would be causing an issue with processing this stream?
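One way to narrow this down is to check which schema ID the Python producer actually embeds in each message. The following is a minimal sketch, assuming the messages use the Confluent wire format (a zero magic byte, a 4-byte big-endian schema ID, then the Avro payload), which is what the AvroConverter in the trace expects. The function name and the sample bytes are hypothetical.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed Avro message


def read_confluent_header(raw: bytes):
    """Return (schema_id, avro_payload) for a Confluent-framed message."""
    if len(raw) < 5:
        raise ValueError("message shorter than the 5-byte header")
    # ">bI" = big-endian: one signed byte (magic) + one unsigned 32-bit int (schema ID)
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not Confluent-framed Avro")
    return schema_id, raw[5:]


# Hypothetical message value: magic byte 0, schema ID 1, then two payload bytes.
sample = b"\x00\x00\x00\x00\x01\x02\x41"
schema_id, payload = read_confluent_header(sample)
print(schema_id, payload)
```

If the ID read from the raw message value does not exist in the registry that Connect queries, the sink would fail with the "Error retrieving Avro schema for id" exception shown in the trace, even though the console consumer can still decode the data against its own registry.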