confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
122 stars 1.04k forks source link

Insert into values throws an error on a table without a key #3021

Closed MichaelDrogalis closed 5 years ago

MichaelDrogalis commented 5 years ago

KSQL recently relaxed the need to specify a key for a table. This seems to cause insert into values to throw a strange error:

ksql> create table movies (id INT, title VARCHAR, release_year INT) with (kafka_topic='movies', partitions=1, value_format='avro');
i
 Message       
---------------
 Table created 
---------------

ksql> insert into movies (id, title, release_year) values (10, 'anything', 1998);
Failed to insert values into stream/table: MOVIES
Caused by: Producer is closed forcefully.

Notably, this works when I specify a key for the table:

ksql> create table movies_2 (id INT, title VARCHAR, release_year INT) with (kafka_topic='movies_2', partitions=1, value_format='avro', key='id');

 Message       
---------------
 Table created 
---------------
ksql> insert into movies_2 (id, title, release_year) values (10, 'anything', 1998);
ksql> 
big-andy-coates commented 5 years ago

@purplefox can you also take a look at this related issue please?

purplefox commented 5 years ago

I've tried to reproduce this. I don't get the error message as reported above, but I do get a different one. Will investigate further:

ksql> create table movies (id INT, title VARCHAR, release_year INT) with (kafka_topic='test1', partitions=1, value_format='avro');
create table movies (id INT, title VARCHAR, release_year INT) with (kafka_topic='test1', partitions=1, value_format='avro')
 Message       

 Table created 

ksql> 
ksql> insert into movies (id, title, release_year) values (10, 'anything', 1998);
insert into movies (id, title, release_year) values (10, 'anything', 1998)
Failed to insert values into 'MOVIES'. Could not serialize row: [ 10 | 'anything' | 1998 ]
big-andy-coates commented 5 years ago

As the CT statement doesn't define a key field in the WITH clause the user must supply a value for ROWKEY in the INSERT INTO statement. If they don't, as is the case above, then the statement should fail and a suitable error returned to the user.

In that sense this is related to https://github.com/confluentinc/ksql/issues/3349

For tables we probably shouldn't allow users to insert with null keys until KSQL itself can handle null keys. For streams a null key is fine.

purplefox commented 5 years ago

The previous error message I was getting because the schema registry was not running. When I run with confluent platform, I cannot reproduce this error, and the insert seem to proceed fine:

ksql> create table movies4 (id INT, title VARCHAR, release_year INT) with (kafka_topic='test3', partitions=1, value_format='avro');
create table movies4 (id INT, title VARCHAR, release_year INT) with (kafka_topic='test3', partitions=1, value_format='avro')
 Message       

 Table created 

ksql> insert into movies4 (id, title, release_year) values (10, 'anything', 1998);
insert into movies4 (id, title, release_year) values (10, 'anything', 1998)ksql> 
ksql> print test3 from beginning;
Format:AVRO
18/09/19 13:30:15 BST, null, {"ID": 10, "TITLE": "anything", "RELEASE_YEAR": 1998}

test3 is a newly created topic in the above.

purplefox commented 5 years ago

I'm going to close this as cannot reproduce. I assume the underlying issue has been fixed by another change recently.

If anyone can reproduce it, please re-open.

big-andy-coates commented 5 years ago

Reopening as this is definitely still an issue - we should not allow producing data to tables with null keys, as we do not support null keys in tables.

big-andy-coates commented 5 years ago

Steps to reproduce, (running against 5.1 CP started with confluent start followed by confluent stop ksql-server and running version of KSQL server + cli from master branch).

-- look mum, no KEY set in WITH clause:
ksql> create table Trades(TradeId string, AccountId string, Amount double) with (KAFKA_TOPIC = 'TradeHistory', VALUE_FORMAT = 'JSON', PARTITIONS=1);

 Table created 

-- When we:
ksql> INSERT INTO Trades (RowKey, AccountId, Amount) VALUES ('t1', 'acc1', 106.0);
Failed to insert values into 'TRADES'.
Caused by: Producer is closed forcefully.

Logs server side say:

[2019-10-17 09:45:38,817] INFO Received: KsqlRequest{ksql='INSERT INTO Trades (TradeId, AccountId, Amount) VALUES ('t1', 'acc1', 106.0);', streamsProperties={}, commandSequenceNumber=Optional[3]} (io.confluent.ksql.rest.server.resources.KsqlResource:198)
[2019-10-17 09:45:38,822] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.avro.maps.named = true
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.functions.substring.legacy.args = false
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.named.internal.topics = on
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.query.fields.key.legacy = false
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.stream.groupby.rowkey.repartition = false
    ksql.schema.registry.url = http://localhost:8081
    ksql.security.extension.class = null
    ksql.service.id = default_
    ksql.sink.partitions = null
    ksql.sink.replicas = null
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ksql.windowed.session.key.legacy = false
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2019-10-17 09:45:38,823] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.avro.maps.named = true
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.functions.substring.legacy.args = false
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.named.internal.topics = on
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.query.fields.key.legacy = false
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.stream.groupby.rowkey.repartition = false
    ksql.schema.registry.url = http://localhost:8081
    ksql.security.extension.class = null
    ksql.service.id = default_
    ksql.sink.partitions = null
    ksql.sink.replicas = null
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ksql.windowed.session.key.legacy = false
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2019-10-17 09:45:38,879] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.avro.maps.named = true
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.functions.substring.legacy.args = false
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.named.internal.topics = on
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.query.fields.key.legacy = false
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.stream.groupby.rowkey.repartition = false
    ksql.schema.registry.url = http://localhost:8081
    ksql.security.extension.class = null
    ksql.service.id = default_
    ksql.sink.partitions = null
    ksql.sink.replicas = null
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ksql.windowed.session.key.legacy = false
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2019-10-17 09:45:38,880] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.avro.maps.named = true
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.functions.substring.legacy.args = false
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.named.internal.topics = on
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.query.fields.key.legacy = false
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.stream.groupby.rowkey.repartition = false
    ksql.schema.registry.url = http://localhost:8081
    ksql.security.extension.class = null
    ksql.service.id = default_
    ksql.sink.partitions = null
    ksql.sink.replicas = null
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ksql.windowed.session.key.legacy = false
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2019-10-17 09:45:38,895] WARN [Producer clientId=producer-2] Got error produce response with correlation id 5 on topic-partition TradeHistory-0, retrying (2147483646 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:39,001] WARN [Producer clientId=producer-2] Got error produce response with correlation id 6 on topic-partition TradeHistory-0, retrying (2147483645 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:39,105] WARN [Producer clientId=producer-2] Got error produce response with correlation id 7 on topic-partition TradeHistory-0, retrying (2147483644 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:39,210] WARN [Producer clientId=producer-2] Got error produce response with correlation id 8 on topic-partition TradeHistory-0, retrying (2147483643 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:39,316] WARN [Producer clientId=producer-2] Got error produce response with correlation id 9 on topic-partition TradeHistory-0, retrying (2147483642 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:39,418] WARN [Producer clientId=producer-2] Got error produce response with correlation id 10 on topic-partition TradeHistory-0, retrying (2147483641 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:39,524] WARN [Producer clientId=producer-2] Got error produce response with correlation id 11 on topic-partition TradeHistory-0, retrying (2147483640 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:39,629] WARN [Producer clientId=producer-2] Got error produce response with correlation id 12 on topic-partition TradeHistory-0, retrying (2147483639 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:39,733] WARN [Producer clientId=producer-2] Got error produce response with correlation id 13 on topic-partition TradeHistory-0, retrying (2147483638 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:39,839] WARN [Producer clientId=producer-2] Got error produce response with correlation id 14 on topic-partition TradeHistory-0, retrying (2147483637 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:39,942] WARN [Producer clientId=producer-2] Got error produce response with correlation id 15 on topic-partition TradeHistory-0, retrying (2147483636 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:40,046] WARN [Producer clientId=producer-2] Got error produce response with correlation id 16 on topic-partition TradeHistory-0, retrying (2147483635 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:40,151] WARN [Producer clientId=producer-2] Got error produce response with correlation id 17 on topic-partition TradeHistory-0, retrying (2147483634 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:40,255] WARN [Producer clientId=producer-2] Got error produce response with correlation id 18 on topic-partition TradeHistory-0, retrying (2147483633 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:40,360] WARN [Producer clientId=producer-2] Got error produce response with correlation id 19 on topic-partition TradeHistory-0, retrying (2147483632 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:40,464] WARN [Producer clientId=producer-2] Got error produce response with correlation id 20 on topic-partition TradeHistory-0, retrying (2147483631 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:40,569] WARN [Producer clientId=producer-2] Got error produce response with correlation id 21 on topic-partition TradeHistory-0, retrying (2147483630 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:40,674] WARN [Producer clientId=producer-2] Got error produce response with correlation id 22 on topic-partition TradeHistory-0, retrying (2147483629 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:40,778] WARN [Producer clientId=producer-2] Got error produce response with correlation id 23 on topic-partition TradeHistory-0, retrying (2147483628 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:40,882] WARN [Producer clientId=producer-2] Got error produce response with correlation id 24 on topic-partition TradeHistory-0, retrying (2147483627 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:40,987] WARN [Producer clientId=producer-2] Got error produce response with correlation id 25 on topic-partition TradeHistory-0, retrying (2147483626 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:41,091] WARN [Producer clientId=producer-2] Got error produce response with correlation id 26 on topic-partition TradeHistory-0, retrying (2147483625 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:41,198] WARN [Producer clientId=producer-2] Got error produce response with correlation id 27 on topic-partition TradeHistory-0, retrying (2147483624 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:41,303] WARN [Producer clientId=producer-2] Got error produce response with correlation id 28 on topic-partition TradeHistory-0, retrying (2147483623 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:41,405] WARN [Producer clientId=producer-2] Got error produce response with correlation id 29 on topic-partition TradeHistory-0, retrying (2147483622 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:41,509] WARN [Producer clientId=producer-2] Got error produce response with correlation id 30 on topic-partition TradeHistory-0, retrying (2147483621 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:41,616] WARN [Producer clientId=producer-2] Got error produce response with correlation id 31 on topic-partition TradeHistory-0, retrying (2147483620 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:41,722] WARN [Producer clientId=producer-2] Got error produce response with correlation id 32 on topic-partition TradeHistory-0, retrying (2147483619 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:41,824] WARN [Producer clientId=producer-2] Got error produce response with correlation id 33 on topic-partition TradeHistory-0, retrying (2147483618 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:41,927] WARN [Producer clientId=producer-2] Got error produce response with correlation id 34 on topic-partition TradeHistory-0, retrying (2147483617 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:42,033] WARN [Producer clientId=producer-2] Got error produce response with correlation id 35 on topic-partition TradeHistory-0, retrying (2147483616 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:42,139] WARN [Producer clientId=producer-2] Got error produce response with correlation id 36 on topic-partition TradeHistory-0, retrying (2147483615 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:42,243] WARN [Producer clientId=producer-2] Got error produce response with correlation id 37 on topic-partition TradeHistory-0, retrying (2147483614 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:42,345] WARN [Producer clientId=producer-2] Got error produce response with correlation id 38 on topic-partition TradeHistory-0, retrying (2147483613 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:42,449] WARN [Producer clientId=producer-2] Got error produce response with correlation id 39 on topic-partition TradeHistory-0, retrying (2147483612 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:42,555] WARN [Producer clientId=producer-2] Got error produce response with correlation id 40 on topic-partition TradeHistory-0, retrying (2147483611 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:42,658] WARN [Producer clientId=producer-2] Got error produce response with correlation id 41 on topic-partition TradeHistory-0, retrying (2147483610 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:42,763] WARN [Producer clientId=producer-2] Got error produce response with correlation id 42 on topic-partition TradeHistory-0, retrying (2147483609 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:42,868] WARN [Producer clientId=producer-2] Got error produce response with correlation id 43 on topic-partition TradeHistory-0, retrying (2147483608 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:42,973] WARN [Producer clientId=producer-2] Got error produce response with correlation id 44 on topic-partition TradeHistory-0, retrying (2147483607 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:43,076] WARN [Producer clientId=producer-2] Got error produce response with correlation id 45 on topic-partition TradeHistory-0, retrying (2147483606 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:43,181] WARN [Producer clientId=producer-2] Got error produce response with correlation id 46 on topic-partition TradeHistory-0, retrying (2147483605 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:43,286] WARN [Producer clientId=producer-2] Got error produce response with correlation id 47 on topic-partition TradeHistory-0, retrying (2147483604 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:43,392] WARN [Producer clientId=producer-2] Got error produce response with correlation id 48 on topic-partition TradeHistory-0, retrying (2147483603 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:43,498] WARN [Producer clientId=producer-2] Got error produce response with correlation id 49 on topic-partition TradeHistory-0, retrying (2147483602 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:43,603] WARN [Producer clientId=producer-2] Got error produce response with correlation id 50 on topic-partition TradeHistory-0, retrying (2147483601 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:43,706] WARN [Producer clientId=producer-2] Got error produce response with correlation id 51 on topic-partition TradeHistory-0, retrying (2147483600 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)
[2019-10-17 09:45:43,808] WARN [Producer clientId=producer-2] Got error produce response with correlation id 52 on topic-partition TradeHistory-0, retrying (2147483599 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender:637)