apache / openwhisk-package-kafka

Apache OpenWhisk package for communicating with Kafka or Message Hub
https://openwhisk.apache.org/
Apache License 2.0

Should decode not encode UTF-8 messages? #277

Open ScottChapman opened 6 years ago

ScottChapman commented 6 years ago

Discovered that messages containing unicode text appear to be getting corrupted.

I suspect it is this: https://github.com/apache/incubator-openwhisk-package-kafka/blob/449bbae13e813ba4dcd11dc33f47ab29d5e3541a/provider/consumer.py#L455

From the kafka-python docs image

abaruni commented 6 years ago

@ScottChapman

we run

value = value.encode('utf-8')

merely to ensure that the data is valid unicode. The motivation is that in the past we have received corrupted data from Message Hub, and that message is passed as part of the payload to the request, which itself attempts to encode the incoming data as part of the json module. In fact, in Python 2.7, calling encode on a byte string runs an implicit ASCII decode before attempting to actually encode. Likewise, calling decode on a unicode string runs an implicit ASCII encode before attempting to actually decode.

>>> '\xb6'.encode('utf-8')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
UnicodeDecodeError: 'ascii' codec can't decode byte 0xb6 in position 0: ordinal not in range(128)
>>> u'\xb6'.decode('utf-8')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeEncodeError: 'ascii' codec can't encode character u'\xb6' in position 0: ordinal not in range(128)

As you can see, the call to '\xb6'.encode('utf-8') results in a UnicodeDecodeError and the call to u'\xb6'.decode('utf-8') results in a UnicodeEncodeError.

But the ultimate point of running value.encode('utf-8') is merely to ensure that we are working with valid unicode before passing it down to other modules such as json and requests, as those modules will surface errors if we don't verify beforehand.

The data is therefore arriving corrupt; it is not being corrupted by this function call.
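For comparison, here is a minimal Python 3 sketch of a validity check that does not modify the value. It is not the provider's code: the helper name `is_valid_utf8` is hypothetical, and it assumes the message value arrives as `bytes`.

```python
def is_valid_utf8(raw: bytes) -> bool:
    """Return True if raw decodes cleanly as UTF-8. raw itself is left untouched."""
    try:
        raw.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True


# A lone 0xb6 byte is a UTF-8 continuation byte, so it is not valid on its own.
lone_byte_ok = is_valid_utf8(b"\xb6")
# Properly encoded non-ASCII text passes the check.
encoded_text_ok = is_valid_utf8("Malalažbeta".encode("utf-8"))

print(lone_byte_ok, encoded_text_ok)
```

Because the result of `decode` is discarded, the check only reports whether the bytes are well formed and never replaces the original value.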

ScottChapman commented 6 years ago

Geez, that is super confusing ( ;-) ). All I could tell from the docs (and my understanding of Kafka) is that the messages are always bytes which need to be properly converted to text (assuming it is text of course). I think that looks like a destructive test though since it assigns value to the result.
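To illustrate what "properly converted" means, the Python 3 sketch below decodes the same UTF-8 bytes with two different codecs. It only shows one common way text ends up garbled (decoding with the wrong codec) and is not a confirmed diagnosis of this issue; the variable names are made up for the example.

```python
original = "Malalažbeta"

# What a producer would put on the wire when it encodes text as UTF-8.
wire_bytes = original.encode("utf-8")

# Decoding with the codec that was used to encode recovers the text.
correct = wire_bytes.decode("utf-8")

# Decoding the same bytes as Latin-1 turns the two-byte sequence for "ž"
# (0xC5 0xBE) into two separate characters.
garbled = wire_bytes.decode("latin-1")

print(correct)
print(garbled)
```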

There is some discussion in the #whisk-users channel about some data corruption. It was confirmed that the data is fine through Kafka (consumer shows right data) and obviously the action can be passed proper data and it looks fine. So this looked suspicious since it is the "middleman" here.

The discussion is here: https://ibm-ics.slack.com/archives/C0BUS3JE8/p1534022249000007

maneeshmehra commented 6 years ago

I agree with Scott that there is no issue with the IBM Message Hub instance (Kafka) WRT corruption of messages. To confirm that was indeed the case, I created:

  1. A Java-based Kafka Producer class (com.ibm.kafka.KafkaProducerClient)
  2. A Java-based Kafka Consumer class(com.ibm.kafka.KafkaConsumerClient)

The producer posts a message with some English and some non-English characters to a topic called Greeter. The consumer polls the same topic and prints out the message.

Here is the output I am receiving from each:

Producer:

diamond:target mmehra$ java -cp KafkaClient-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.ibm.kafka.KafkaProducerClient
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Producer: Posting message to topic: Greeter ==> {"greeting":"Howdy","user":"Malalažbeta"}
Producer: Message posted to Partition: 0, Offset: 21
diamond:target mmehra$

Consumer:

diamond:target mmehra$ java -cp KafkaClient-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.ibm.kafka.KafkaConsumerClient
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Consumer: Polling For New Messages on Greeter topic...
Consumer: Received from topic => Partition: 0, Offset: 21, Message: {"greeting":"Howdy","user":"Malalažbeta"}

As you can see, the message is being received with the correct encoding from the IBM Message Hub topic. However, if I post the same message using the producer class and consume it via the IBM Cloud Function, the message is received as corrupted as described in the Slack conversation that Scott pointed to in his earlier comment.

maneeshmehra commented 6 years ago

Also note that I am not specifying any encoding in any of my implementations. I am relying on the default of UTF-8 on both the producer and the consumer side, and assuming that the underlying transport is bytes.
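The same round trip can be sketched in Python 3 with only the standard library. This is an illustration under assumptions, not the Java clients above or the provider code: `serialize` and `deserialize` are hypothetical helpers, and no Kafka connection is involved.

```python
import json


def serialize(payload: dict) -> bytes:
    """Producer side: JSON text encoded as UTF-8 bytes."""
    return json.dumps(payload, ensure_ascii=False).encode("utf-8")


def deserialize(raw: bytes) -> dict:
    """Consumer side: decode the UTF-8 bytes back to text, then parse the JSON."""
    return json.loads(raw.decode("utf-8"))


message = {"greeting": "Howdy", "user": "Malalažbeta"}
wire = serialize(message)
received = deserialize(wire)

print(received == message)
```

As long as both sides agree on UTF-8, the non-English characters survive the trip through bytes unchanged.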

ScottChapman commented 6 years ago

And we know OpenWhisk Actions can receive parameters without corrupting the text. Should be pretty simple to validate.

maneeshmehra commented 6 years ago

Yes, I have already verified that and ruled it out: when the OpenWhisk function/trigger is called directly (using curl) with the same payload, there is no corruption of data. It's only when the data is posted to the Message Hub topic and received by the whisk function (via this handler) that we see corruption of data.

maneeshmehra commented 6 years ago

Confirming that there is no issue with either the function or the user-provided IBM Message Hub trigger (calling the function) when invoked directly.

Here is the payload being sent each time:

diamond:kafka mmehra$ cat data.json
{ "greeting": "Howdy", "user": "Malalažbeta" }

  1. Direct curl call to the function using its endpoint API (take a note of the activationId returned by the call):

diamond:kafka mmehra$ curl -u [XXXX:YYYYY] -d @data.json --header "Content-Type: application/json" -X POST https://openwhisk.ng.bluemix.net/api/v1/namespaces/XXXXX_dev/actions/greeting_package/say_hello?blocking=true
{"duration":238,"name":"say_hello","subject":"XXXXX","activationId":"34c60d32186040c4860d321860a0c4e1","publish":false,"annotations":[{"key":"path","value":"XXXXX_dev/greeting_package/say_hello"},{"key":"waitTime","value":592},{"key":"kind","value":"nodejs:8"},{"key":"limits","value":{"timeout":60000,"memory":256,"logs":10}},{"key":"initTime","value":232}],"version":"0.0.1","response":{"result":{"Message":"Howdy, Malalažbeta","Status":"Success","Code":200},"success":true,"status":"success"},"end":1534427678991,"logs":[],"start":1534427678753,"namespace":"XXXXX_dev"}
diamond:kafka mmehra$

  2. Direct curl call to the user provided IBM Message Hub trigger using its endpoint API (again, note the activationId returned by the call):

diamond:kafka mmehra$ curl -u XXXXX:YYYYY -d @data.json --header "Content-Type: application/json" -X POST https://openwhisk.ng.bluemix.net/api/v1/namespaces/XXXX_dev/triggers/say_hello_trigger?blocking=true
{"activationId":"318092b6ce344d718092b6ce34fd7158"}
diamond:kafka mmehra$

Attached below is a screenshot of the curl call made to the user-provided trigger: ibm_cloud_function_trigger

Attached below is a screenshot of the console log generated by the IBM Cloud function, showing what parameters it received in both cases: ibm_cloud_log_analysis

abaruni commented 6 years ago

@ScottChapman yes i agree. it is very confusing. especially the way python 2.7 handles strings and bytes.

@maneeshmehra thanks for providing that info. i'll do some testing on the provider end with your input using encode vs decode. my understanding of these builtin python functions is quite limited. i was going off of my understanding of the behavior, but as i said my understanding is quite limited and the way python handles things is rather confusing. a little testing should be able to clear things up though

thank you both for digging into this and providing feedback

maneeshmehra commented 6 years ago

You are most welcome. Please keep us posted via this issue on your findings.

rabbah commented 6 years ago

Any reason not to run as python 3? Could normalize utf8 string handling.
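As a rough illustration of what that would change: in Python 3, text (`str`) and raw data (`bytes`) are separate types with no implicit ASCII conversion between them, so the encode/decode mix-up shown earlier in the thread cannot be written by accident. The sketch below uses only built-ins and is not taken from the provider.

```python
text = "Malalažbeta"
raw = text.encode("utf-8")

# str only offers encode, bytes only offers decode.
str_has_decode = hasattr(text, "decode")
bytes_has_encode = hasattr(raw, "encode")

# Mixing the two types fails immediately instead of converting silently.
try:
    text + raw
    mixing_failed = False
except TypeError:
    mixing_failed = True

print(str_has_decode, bytes_has_encode, mixing_failed)
```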

tnakajo commented 6 years ago

The fix for this encoding issue was deployed to our production environment on Sep 4th through a support ticket.