sanjuthomas / kafka-connect-gcp-bigtable

Kafka Sink Connect to GCP Bigtable - https://www.confluent.io/hub/sanjuthomas/kafka-connect-gcp-bigtable
http://sanjuthomas.com
MIT License

Handling column not present in data #22

Open ee07dazn opened 2 years ago

ee07dazn commented 2 years ago

Hi @sanjuthomas

Does the transform handle a column that is not present in the map? For example, say my config looks like this:

    families:
    - data
    familyQualifiers:
    - data:
      - anonymousId
      - context

and a test message looks like this:

    {
       "anonymousId" : "random123"
    }

Will the transform you have built in automatically set context to null, or will it throw an error? I guess I will find out pretty soon, but if it throws an error, is there a good way to handle it?

sanjuthomas commented 2 years ago

https://github.com/sanjuthomas/kafka-connect-gcp-bigtable/blob/master/src/main/java/com/sanjuthomas/gcp/bigtable/transform/TypeUtils.java#L22

Maybe we have to handle that null in that method. Do you want me to make that fix? or will you be able to send a PR?
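One way the null handling could work is to skip absent qualifiers instead of converting them. A minimal sketch, assuming the message has already been parsed into a `Map<String, Object>`; `NullSafeCells.toCells` and its skip-on-null policy are hypothetical and are not the connector's actual `TypeUtils` code:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NullSafeCells {

    // Hypothetical helper: turns the configured qualifiers of one column family into
    // cell values, skipping any qualifier that is absent (or null) in the message
    // instead of failing on it.
    static Map<String, byte[]> toCells(Map<String, Object> message, List<String> qualifiers) {
        Map<String, byte[]> cells = new LinkedHashMap<>();
        for (String qualifier : qualifiers) {
            Object value = message.get(qualifier);
            if (value == null) {
                continue; // column not present in the data: write no cell for it
            }
            cells.put(qualifier, String.valueOf(value).getBytes(StandardCharsets.UTF_8));
        }
        return cells;
    }

    public static void main(String[] args) {
        // Same shape as the test message above: "context" is configured but missing.
        Map<String, Object> message = Map.of("anonymousId", "random123");
        Map<String, byte[]> cells = toCells(message, List.of("anonymousId", "context"));
        System.out.println(cells.keySet()); // prints [anonymousId]
    }
}
```

Skipping the qualifier (rather than writing an empty value) keeps the row sparse; which behaviour is preferable depends on what downstream readers expect.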

ee07dazn commented 2 years ago

Hi @sanjuthomas

Thanks for your quick reply.

Unfortunately, I am not a Java guy (:python:), so I would ask you to make the change if you can. In the meantime, I am testing the latest tag to see how it behaves.

I would be very grateful if you could help out here (assuming this takes little time on your end).

Kind Regards Karan 🙏

sanjuthomas commented 2 years ago

OK, I am working on it: https://github.com/sanjuthomas/kafka-connect-gcp-bigtable/pulls

I will release the next version with that fix. Thanks!

sanjuthomas commented 2 years ago

The fix is part of https://github.com/sanjuthomas/kafka-connect-gcp-bigtable/releases/tag/kafka-connect-gcp-bigtable-1.0.10

ee07dazn commented 2 years ago

Hi @sanjuthomas

Thanks for this.

While testing (with 1.0.7, not 1.0.10), I have noticed some weird behaviour and wonder whether you have come across it.

Using a setup like the example above, where my column family is data and has two columns, name and age, I am noticing the following for messages that contain both values, e.g.:

    {
       "name" : "Karan",
       "age" : 22
    }

Randomly, the data getting written to Bigtable is missing attributes; in some cases age is missing even though the value is present in the message. From my investigation so far, this only happens when keys are the same.

For example, with row_key = 1:

    {
       "name" : "Karan",
       "age" : 22
    }

After a while, another message arrives with the same key, row_key = 1:

    {
       "name" : "Karan",
       "age" : 24
    }

In this case, the age from the second message is randomly missing.

Have you come across this kind of issue?

Also, I think adding offset info to the metadata might be useful to debug such issues.

It feels to me that something may be going wrong in the transform 🤔?
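The suggestion of adding offset info to the metadata could look something like this, assuming the extra values are written as additional cells next to the data. In Kafka Connect, a `SinkRecord` exposes `topic()`, `kafkaPartition()` and `kafkaOffset()`; the `offsetCells` helper and the `kafka_*` qualifier names are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OffsetMetadata {

    // Hypothetical helper: builds extra cells recording where a message came from,
    // so each version stored in Bigtable can be traced back to one Kafka record.
    // In a sink task the arguments would come from record.topic(),
    // record.kafkaPartition() and record.kafkaOffset().
    static Map<String, String> offsetCells(String topic, int partition, long offset) {
        Map<String, String> cells = new LinkedHashMap<>();
        cells.put("kafka_topic", topic);
        cells.put("kafka_partition", Integer.toString(partition));
        cells.put("kafka_offset", Long.toString(offset));
        return cells;
    }

    public static void main(String[] args) {
        // "users", 0 and 42 are made-up example values.
        System.out.println(offsetCells("users", 0, 42L));
        // prints {kafka_topic=users, kafka_partition=0, kafka_offset=42}
    }
}
```

With these cells written on every version, a version that looks wrong in Bigtable can be matched against the exact offset in the topic.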

ee07dazn commented 2 years ago

The behaviour is random: when I ran the same messages into another table, different row keys had missing data.

sanjuthomas commented 2 years ago

I have not seen any issues like that, nor has anyone reported one. It's a simple transform that converts a SinkRecord to a Bigtable row. As far as I can remember, Bigtable maintains all versions of the data. Can you check and confirm that the data elements are produced correctly by the source?

ee07dazn commented 2 years ago

Yes, I can confirm that all elements of the data are produced correctly: I can see them in the topic, but by the time the data reaches Bigtable, some of them are randomly missing.

Just FYI: my message is JSON with 18 attributes, each of which is either a string or a nested JSON object. Apart from the randomly missing attributes, I haven't seen any other issue.

I wonder whether the transform is somehow returning the row too early, or 🤔 whether there is some multithreading in place where the transform acts on the message more than once... Sorry, I'm just throwing out my thoughts here; maybe this is the wrong direction.

ee07dazn commented 2 years ago

[Screenshots: 2022-11-23 at 18:02:11, 18:02:28 and 18:03:17]

As you will notice in the screenshots, the topic reports 4 messages with that key, 82363d5c-93b8-416a-bc03-a4fda8935952 (which is the userId in the data), but the Bigtable screenshot shows only 3.

sanjuthomas commented 2 years ago

First, I suggest you upgrade to the latest version. Second, please enable debug logging: https://github.com/sanjuthomas/kafka-connect-gcp-bigtable/blob/master/src/main/java/com/sanjuthomas/gcp/bigtable/sink/BigtableSinkTask.java#L52 will print the transformed row. That will tell you whether there is an issue with the transformation.

It's a shared-nothing design, so I don't see how threads could overwrite data.

ee07dazn commented 2 years ago

Cool, I will give this a go. Just to confirm, the latest tag is kafka-connect-**-1.0.10?

sanjuthomas commented 2 years ago

yes

sanjuthomas commented 2 years ago

Please let me know. If there is a bug, I am happy to fix it.

ee07dazn commented 2 years ago

Thanks @sanjuthomas. I followed your advice, and the transformed-row debug log shows the right values, but the records ultimately written to Bigtable are missing data :( Any thoughts or suggestions on where to look?

sanjuthomas commented 2 years ago

Did you find any pattern about the missing data element? Any specific data type?
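One way to look for a pattern is to diff the transformed row printed by the debug log against the row read back from Bigtable. A minimal sketch, assuming both rows have been collected into qualifier-to-value maps for the same message; `RowDiff.missingOrChanged` is a hypothetical helper, not part of the connector:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class RowDiff {

    // Hypothetical debugging helper: returns the qualifiers whose value in the
    // transformed row is absent from, or different in, the row read back from Bigtable.
    static List<String> missingOrChanged(Map<String, byte[]> transformed, Map<String, byte[]> stored) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, byte[]> entry : transformed.entrySet()) {
            byte[] actual = stored.get(entry.getKey());
            if (actual == null || !Arrays.equals(entry.getValue(), actual)) {
                problems.add(entry.getKey());
            }
        }
        return problems;
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up example mirroring the thread: "age" was logged but not stored.
        Map<String, byte[]> transformed = Map.of("name", utf8("Karan"), "age", utf8("24"));
        Map<String, byte[]> stored = Map.of("name", utf8("Karan"));
        System.out.println(missingOrChanged(transformed, stored)); // prints [age]
    }
}
```

Running it per message and grouping the results by qualifier or data type would show whether the same attributes keep going missing.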

sanjuthomas commented 2 years ago

I have added some more debug logging and released a new version - https://github.com/sanjuthomas/kafka-connect-gcp-bigtable/releases/tag/kafka-connect-gcp-bigtable-1.0.12

Can you deploy that version and see what is logged here: https://github.com/sanjuthomas/kafka-connect-gcp-bigtable/blob/master/src/main/java/com/sanjuthomas/gcp/bigtable/writer/BigtableWriter.java#L146

I will test it myself tomorrow if we can't find the reason. I would need your table definition, though.

ee07dazn commented 2 years ago

> Did you find any pattern about the missing data element? Any specific data type?

The only pattern I have found so far is that the issue does not happen when each record has a unique key.

However, I am just getting started on looking into the ☝️ suggestion and trying out the new version.

Once I have tested this, I'll update you. Thanks for the quick turnaround here.

ee07dazn commented 2 years ago

[Screenshot: 2022-11-24 at 15:21:04] I am getting a build failure :(

sanjuthomas commented 2 years ago

If you are building locally, you need to use Java 11. Instead of building locally, you can always use https://search.maven.org/artifact/com.sanjuthomas/kafka-connect-gcp-bigtable/1.0.12/jar

sanjuthomas commented 2 years ago

> The only pattern I have found so far is that the issue is not happening when each record has a unique key.

Does that mean when you send the first version of the record, every cell is saved, but when you send the second version of the same record, one or more cells are not saved?

ee07dazn commented 2 years ago

> The only pattern I have found so far is that the issue is not happening when each record has a unique key.
>
> Does that mean when you send the first version of the record, every cell is saved, but when you send the second version of the same record, one or more cells are not saved?

I can't confirm that yet, because I can't see the offset info in the metadata and the record is fairly big, but I will check. Given that records with unique keys work fine, my guess is that the problem occurs when a new record is written to an existing row key.

> If you are building locally, you need to use Java 11. Instead of building locally, you can always use https://search.maven.org/artifact/com.sanjuthomas/kafka-connect-gcp-bigtable/1.0.12/jar

Should I use the jar or the shaded jar?

sanjuthomas commented 2 years ago

shaded.jar (kafka-connect-gcp-bigtable-1.0.12-shaded.jar)

ee07dazn commented 2 years ago

[Screenshot: 2022-11-24 at 16:35:31] I am getting this issue with 1.0.12 :(

sanjuthomas commented 2 years ago

Let me take a look. It looks like some library upgrade caused this. Let me also test the use case you mentioned.

sanjuthomas commented 2 years ago

I have fixed the jar signing issue. The next version will show up in Maven in a couple of hours.

I have tested the use case that you mentioned: same row key, but with different values for the cell. I have not seen any issues. The only thing I can think of is the Java version you are using for the connector. I have tested it with Java 11.

Can you confirm the Java version that you are using for the connector?

By the way, I have a utility program that you can use to test the use case you mentioned here:

https://github.com/sanjuthomas/kafka-connect-gcp-bigtable/blob/master/src/test/java/com/sanjuthomas/bigtable/utils/UpsertData.java

You can run it repeatedly against your instance and table to see whether you can reproduce the issue. I suggest you use Java 11.

sanjuthomas commented 2 years ago

@ee07dazn If you are in a hurry, you can pick it up from https://tmpfiles.org/dl/313349/kafka-connect-gcp-bigtable-1.0.13-shaded.jar

Please note that it will be available for only an hour.

sanjuthomas commented 2 years ago

https://repo1.maven.org/maven2/com/sanjuthomas/kafka-connect-gcp-bigtable/1.0.13/

ee07dazn commented 2 years ago

Thanks for this, @sanjuthomas. I will give this a go in the morning and update you. I am wondering whether the issue may be because:

  1. We are using BatchMutation.
  2. In a given batch, two records with the same row key exist.
  3. For the same row key, the value of a particular column is the same.
  4. Since a mutation involves a timestamp, which will be the same across the batch, and since the column value is the same for the row key, maybe that causes only one value to appear.

I need to do a few checks, but I will test it tomorrow morning and update you.
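Point 4 can be illustrated with a simplified in-memory model of a single cell. It assumes, as in Bigtable's data model, that a cell version is identified by row key, column and timestamp, so a second write with an identical timestamp replaces the first instead of adding a version. `SameTimestampModel` is a model of the hypothesis only, not Bigtable's client API:

```java
import java.util.TreeMap;

public class SameTimestampModel {

    // Simplified model of one cell (row key + family:qualifier). Versions are keyed by
    // timestamp, so writing again with an existing timestamp overwrites that version.
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void write(long timestampMicros, String value) {
        versions.put(timestampMicros, value);
    }

    int versionCount() {
        return versions.size();
    }

    String latestValue() {
        return versions.isEmpty() ? null : versions.lastEntry().getValue();
    }

    public static void main(String[] args) {
        SameTimestampModel age = new SameTimestampModel();
        long batchTimestamp = 1_669_226_531_000_000L; // made-up timestamp shared by a whole batch
        age.write(batchTimestamp, "22"); // first record for row_key = 1
        age.write(batchTimestamp, "24"); // second record for row_key = 1, same batch
        System.out.println(age.versionCount() + " " + age.latestValue()); // prints 1 24
    }
}
```

In this model, two writes in one batch collapse into a single version, and which value survives depends only on the order in which they are applied; distinct timestamps keep both versions. That would fit seeing 3 versions in Bigtable for 4 messages in the topic, though it remains a hypothesis.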

sanjuthomas commented 2 years ago

I tested the same use case using the connector and could not reproduce the issue. If you suspect that is the cause, you could use the configuration bulkMutateRowsMaxSize: 1 to write one record at a time and see.

What is your bulkMutateRowsMaxSize now?

https://github.com/sanjuthomas/kafka-connect-gcp-bigtable/blob/master/src/test/resources/demo-topic.yml#L6
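A minimal sketch of how a size-capped batch groups records, assuming bulkMutateRowsMaxSize simply caps the number of rows sent in one bulk mutation. `BatchingWriter` is hypothetical, not the connector's `BigtableWriter`, and the flush callback stands in for the actual bulk mutation request:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchingWriter {

    private final int maxBatchSize;               // plays the role of bulkMutateRowsMaxSize
    private final Consumer<List<String>> flusher; // stands in for one bulk mutation request
    private final List<String> pending = new ArrayList<>();

    BatchingWriter(int maxBatchSize, Consumer<List<String>> flusher) {
        this.maxBatchSize = maxBatchSize;
        this.flusher = flusher;
    }

    // Buffers one row key and sends the batch as soon as it reaches the cap.
    void add(String rowKey) {
        pending.add(rowKey);
        if (pending.size() >= maxBatchSize) {
            flush();
        }
    }

    // Sends whatever is still buffered, e.g. once the current records are processed.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(pending));
        pending.clear();
    }

    // Returns the batches produced for the given row keys under a given cap.
    static List<List<String>> batchesFor(int maxBatchSize, List<String> rowKeys) {
        List<List<String>> batches = new ArrayList<>();
        BatchingWriter writer = new BatchingWriter(maxBatchSize, batches::add);
        rowKeys.forEach(writer::add);
        writer.flush();
        return batches;
    }

    public static void main(String[] args) {
        List<String> rowKeys = List.of("1", "1", "2"); // two updates to row_key = 1
        System.out.println(batchesFor(1024, rowKeys)); // prints [[1, 1, 2]]
        System.out.println(batchesFor(1, rowKeys));    // prints [[1], [1], [2]]
    }
}
```

With a cap of 1024, both updates to row key 1 can land in the same batch; with a cap of 1 they never do. The cost is one request per record instead of one per batch, which matters mostly at high message rates.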

ee07dazn commented 1 year ago

Apologies for the delay.

Initially my bulkMutateRowsMaxSize was the default of 1024. Changing it to 1 actually seems to have sorted out the issue, and I think that logic makes sense, doesn't it? I need to test a bit more to be 100% sure, but initial results suggest the same. Did you check the same thing on your end?

For example, two records with the same row key and the same value in the same batch?

Regarding the new 1.0.13, it also had some issues, basically crashing the pod. I will paste the output here soon. But I could test the above on 1.0.7.

sanjuthomas commented 1 year ago

> For example, two records with the same row key and the same value in the same batch?

I tested this last week. I can test it once again.

> Regarding the new 1.0.13, it also had some issues, basically crashing the pod.

I tested the connector in standalone mode using Java 11 before I published the .13 version. Can you tell me your Java version and other configs?

Do you have a stack trace or heap dump for the crash?

ee07dazn commented 1 year ago

    #
    # A fatal error has been detected by the Java Runtime Environment:
    #
    #  SIGSEGV (0xb) at pc=0x00007f6ce07bf450, pid=43, tid=602
    #
    # JRE version: OpenJDK Runtime Environment 18.9 (11.0.15+10) (build 11.0.15+10-LTS)
    # Java VM: OpenJDK 64-Bit Server VM 18.9 (11.0.15+10-LTS, mixed mode, sharing, tiered, compressed oops, g1 gc, linux-amd64)
    # Problematic frame:
    # C  0x00007f6ce07bf450
    #
    # Core dump will be written. Default location: /core.%e.43.%t
    #
    # An error report file with more information is saved as:
    # /tmp/hs_err_pid43.log
    #
    # If you would like to submit a bug report, please visit:
    #   https://bugzilla.redhat.com/enter_bug.cgi?product=Red%20Hat%20Enterprise%20Linux%208&component=java-11-openjdk
    # The crash happened outside the Java Virtual Machine in native code.
    # See problematic frame for where to report the bug.
    #

ee07dazn commented 1 year ago

[Screenshot: 2022-11-28 at 19:24:29]

sanjuthomas commented 1 year ago

Looking at the error message, I believe you are using OpenJDK 11.0.15+10 (https://mail.openjdk.org/pipermail/jdk-updates-dev/2022-April/014104.html). Can you check what is inside /tmp/hs_err_pid43.log?

It appears to me that some C libraries are crashing.

ee07dazn commented 1 year ago

Internal exceptions (20 events):
Event: 9.992 Thread 0x00007f47d0a53800 Exception <a 'java/io/IOException'{0x00000007fb215298}> (0x00000007fb215298) thrown at [src/hotspot/share/prims/jni.cpp, line 617]
Event: 12.547 Thread 0x00007f48270ca000 Exception <a 'java/lang/NoSuchMethodError'{0x00000007fc9c5360}: 'long java.lang.invoke.DirectMethodHandle$Holder.invokeVirtual(java.lang.Object, java.lang.Object)'> (0x00000007fc9c5360) thrown at [src/hotspot/share/interpreter/linkResolver.cpp, line 773]
Event: 12.552 Thread 0x00007f48270ca000 Exception <a 'java/lang/NoSuchMethodError'{0x00000007fb0b8148}: 'int java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java.lang.Object, java.lang.Object, int, int)'> (0x00000007fb0b8148) thrown at [src/hotspot/share/interpreter/linkResolver.cpp, line 773]
Event: 12.584 Thread 0x00007f4826c3a800 Exception <a 'java/lang/NoSuchMethodError'{0x00000007fae69630}: 'java.lang.Object java.lang.invoke.DirectMethodHandle$Holder.invokeSpecial(java.lang.Object, java.lang.Object, java.lang.Object, java.lang.Object, long)'> (0x00000007fae69630) thrown at [src/hotspot/share/interpreter/linkResolver.cpp, line 773]
Event: 12.732 Thread 0x00007f4827247000 Exception <a 'java/lang/NoSuchMethodError'{0x00000007fa6af4f8}: 'java.lang.Object java.lang.invoke.Invokers$Holder.linkToTargetMethod(java.lang.Object, java.lang.Object, long, java.lang.Object)'> (0x00000007fa6af4f8) thrown at [src/hotspot/share/interpreter/linkResolver.cpp, line 773]
Event: 12.773 Thread 0x00007f47f8017800 Exception <a 'java/lang/ClassCastException'{0x00000007f9f32a18}: class com.fasterxml.jackson.databind.ext.Java7SupportImpl cannot be cast to class com.fasterxml.jackson.databind.ext.Java7Support (com.fasterxml.jackson.databind.ext.Java7SupportImpl is in unnamed module of loader 'app'; com.fasterxml.jackson.databind.ext.Java7Support is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @49601f82)> (0x00000007f9f32a18) thrown at [src/hotspot/share/interpreter/interpreter
Event: 12.794 Thread 0x00007f482735e800 Exception <a 'java/lang/NoSuchMethodError'{0x00000007f9fb5968}: 'java.lang.Object java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java.lang.Object, java.lang.Object, java.lang.Object, java.lang.Object, int, long)'> (0x00000007f9fb5968) thrown at [src/hotspot/share/interpreter/linkResolver.cpp, line 773]
Event: 12.796 Thread 0x00007f4827247000 Exception <a 'java/lang/NoSuchMethodError'{0x00000007f9e654b0}: 'java.lang.Object java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java.lang.Object, java.lang.Object, java.lang.Object, java.lang.Object, int, long)'> (0x00000007f9e654b0) thrown at [src/hotspot/share/interpreter/linkResolver.cpp, line 773]
Event: 12.906 Thread 0x00007f48270ca000 Exception <a 'java/lang/NoSuchMethodError'{0x00000007ffd5b950}: 'java.lang.Object java.lang.invoke.Invokers$Holder.linkToTargetMethod(java.lang.Object, long, java.lang.Object, java.lang.Object)'> (0x00000007ffd5b950) thrown at [src/hotspot/share/interpreter/linkResolver.cpp, line 773]
Event: 12.959 Thread 0x00007f47ec00c800 Exception <a 'java/lang/ClassCastException'{0x00000007ffa49d18}: class com.fasterxml.jackson.databind.ext.Java7HandlersImpl cannot be cast to class com.fasterxml.jackson.databind.ext.Java7Handlers (com.fasterxml.jackson.databind.ext.Java7HandlersImpl is in unnamed module of loader 'app'; com.fasterxml.jackson.databind.ext.Java7Handlers is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @49601f82)> (0x00000007ffa49d18) thrown at [src/hotspot/share/interpreter/interpr
Event: 13.018 Thread 0x00007f47f0019800 Exception <a 'java/lang/NoSuchMethodError'{0x00000007ff864ef0}: 'void java.lang.invoke.DirectMethodHandle$Holder.invokeSpecial(java.lang.Object, java.lang.Object)'> (0x00000007ff864ef0) thrown at [src/hotspot/share/interpreter/linkResolver.cpp, line 773]
Event: 22.839 Thread 0x00007f47ec00c800 Exception <a 'java/lang/reflect/InvocationTargetException'{0x00000007fd284c50}> (0x00000007fd284c50) thrown at [src/hotspot/share/runtime/reflection.cpp, line 1247]
Event: 22.840 Thread 0x00007f47ec00c800 Exception <a 'java/lang/reflect/InvocationTargetException'{0x00000007fd298840}> (0x00000007fd298840) thrown at [src/hotspot/share/runtime/reflection.cpp, line 1247]
Event: 22.861 Thread 0x00007f47ec00c800 Exception <a 'java/lang/NoClassDefFoundError'{0x00000007fd2fd978}: io/grpc/netty/shaded/io/netty/internal/tcnative/SSLTask> (0x00000007fd2fd978) thrown at [src/hotspot/share/classfile/systemDictionary.cpp, line 219]
Event: 22.861 Thread 0x00007f47ec00c800 Exception <a 'java/lang/UnsatisfiedLinkError'{0x00000007fd2fdf98}: unsupported JNI version 0xFFFFFFFF required by /tmp/libio_grpc_netty_shaded_netty_tcnative_linux_x86_64173526158786210554.so> (0x00000007fd2fdf98) thrown at [src/hotspot/share/prims/jni.cpp, line 638]
Event: 22.862 Thread 0x00007f47ec00c800 Exception <a 'java/lang/reflect/InvocationTargetException'{0x00000007fd2fe510}> (0x00000007fd2fe510) thrown at [src/hotspot/share/runtime/reflection.cpp, line 1247]
Event: 22.862 Thread 0x00007f47ec00c800 Exception <a 'java/lang/NoClassDefFoundError'{0x00000007fd10a5e0}: io/grpc/netty/shaded/io/netty/internal/tcnative/SSLTask> (0x00000007fd10a5e0) thrown at [src/hotspot/share/classfile/systemDictionary.cpp, line 219]
Event: 22.862 Thread 0x00007f47ec00c800 Exception <a 'java/lang/UnsatisfiedLinkError'{0x00000007fd10ac00}: unsupported JNI version 0xFFFFFFFF required by /tmp/libio_grpc_netty_shaded_netty_tcnative_linux_x86_64173526158786210554.so> (0x00000007fd10ac00) thrown at [src/hotspot/share/prims/jni.cpp, line 638]
Event: 22.864 Thread 0x00007f47ec00c800 Exception <a 'java/lang/reflect/InvocationTargetException'{0x00000007fd111b48}> (0x00000007fd111b48) thrown at [src/hotspot/share/runtime/reflection.cpp, line 1247]
Event: 22.864 Thread 0x00007f47ec00c800 Exception <a 'java/lang/reflect/InvocationTargetException'{0x00000007fd121a48}> (0x00000007fd121a48) thrown at [src/hotspot/share/runtime/reflection.cpp, line 1247]
ee07dazn commented 1 year ago

Any thoughts?

sanjuthomas commented 1 year ago

I haven't had time yet to test the JVM that you are using. I will keep you posted.

ee07dazn commented 1 year ago

Are there any side effects of using bulkMutateRowsMaxSize = 1, especially when the rate of messages coming into the topic is not that high? Also, I believe the connector currently does batch writes, and setting bulkMutateRowsMaxSize = 1 effectively turns them into single writes?

sanjuthomas commented 1 year ago

I don't see any side effects due to that. I will get some time this weekend to test out the JVM version and the issue you mentioned.

sanjuthomas commented 1 year ago

Do you have a public docker image of the OS and JVM?

ee07dazn commented 1 year ago

https://hub.docker.com/r/ee07dazn/kafkaconnect-connectors

It has the 1.0.10 jar installed in opt/kafka/plugins.