Open ryancrawcour opened 3 years ago
Details of dead-letter queue support and configurations: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
The problem with this sample message is that all 3 values are numbers, so I am not sure whether the non-string id is causing the issue or whether a non-string PK value is.
If the id field is causing the problem, we should ensure that id is always a string.
If the PK path is NOT /id, then the PK can be any data type; hence I believe this is actually happening because the id field is not a string.
Proposed Solution
Add a new config option that lets the user choose whether id should be transformed to a string if it exists: transformIdToString = true|false (default: false).
If true, do a toString transform on the id field; otherwise fail and reject the document.
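The proposed option could behave roughly as follows. This is only a sketch under stated assumptions: the class and method names are hypothetical, the document is modelled as a plain `Map` rather than a Connect `Struct`, and nothing here is taken from the connector's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the proposed transformIdToString option.
public class IdToStringSketch {

    /**
     * Returns a document whose "id" is a string.
     * A missing or already-string id is passed through unchanged.
     * A non-string id is converted when transformIdToString is true,
     * and rejected with an exception when it is false.
     */
    public static Map<String, Object> normalizeId(Map<String, Object> doc,
                                                  boolean transformIdToString) {
        Object id = doc.get("id");
        if (id == null || id instanceof String) {
            return doc; // nothing to convert
        }
        if (!transformIdToString) {
            throw new IllegalArgumentException(
                "id must be a string but was " + id.getClass().getSimpleName());
        }
        // Copy so the caller's document is left untouched.
        Map<String, Object> copy = new LinkedHashMap<>(doc);
        copy.put("id", String.valueOf(id));
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("id", 2804L);
        doc.put("product_number", 2804L);
        doc.put("product_name", "2804");
        System.out.println(normalizeId(doc, true));
    }
}
```

Keeping the default at false means existing deployments would keep rejecting such documents unless the user opts in.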
Alternate Solution
Look into whether an SMT exists to do data type transforms. If it does, it is easier to have the user configure an SMT to do the data type conversion.
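For the SMT route, Kafka Connect ships a built-in `Cast` transform that can cast a single field of the record value to a string. A possible connector config fragment is sketched below; the alias `castId` is an arbitrary name chosen for illustration, and whether this fully resolves the failure with this connector has not been verified here.

```properties
# Cast the "id" field of the record value to a string before the sink sees it.
# "castId" is a hypothetical alias; any name works.
transforms=castId
transforms.castId.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castId.spec=id:string
```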
See if #390 fixes this issue.
Description
The sink connector fails when the PK value in the topic is a long (the long -> string data type conversion is not happening correctly).
Message: { "id": 2804, "product_number": 2804, "product_name": "2804" }
Error: [2021-03-05 14:06:31,844] ERROR WorkerSinkTask{id=cosmosdb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187) org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: com.azure.cosmos.kafka.connect.sink.CosmosDBWriteException: Unable to write record to CosmosDB: null (value schema:Schema{datagen.product:STRUCT} at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:86) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
Caused by: {"ClassName":"BadRequestException","userAgent":"azsdk-java-cosmos/4.11.0 MacOSX/10.15.7 JRE/11.0.10","statusCode":400,"resourceAddress":"rntbd://cdb-ms-prod-centralindia1-fd0.documents.azure.com:14092/apps/bbb89637-58d3-4009-a8d0-7803963a3701/services/4f9e883b-fd5b-42cf-8297-6d60f0f2f74d/partitions/cd221f07-7f90-4c7e-b614-bc9ecfa2b727/replicas/132593325004050143p/","error":"{\"Errors\":[\"The input name '2804' is invalid. Ensure to provide a unique non-empty string less than '1024' characters.\"]}","innerErrorMessage":"[\"The input name '2804' is invalid. Ensure to provide a unique non-empty string less than '1024' characters.\"]","causeInfo":null,"responseHeaders":"{x-ms-current-replica-set-size=4, x-ms-last-state-change-utc=Thu, 04 Mar 2021 11:55:23.110 GMT, x-ms-session-token=0:-1#1, lsn=1, x-ms-request-charge=1.24, x-ms-schemaversion=1.11, x-ms-transport-request-id=3, x-ms-number-of-read-regions=0, x-ms-current-write-quorum=3, x-ms-cosmos-quorum-acked-llsn=1, x-ms-quorum-acked-lsn=1, x-ms-activity-id=e1ed2d86-7d8d-11eb-a4e9-131f486068f5, x-ms-xp-role=1, x-ms-global-Committed-lsn=1, x-ms-cosmos-llsn=1, x-ms-serviceversion= version=2.11.0.0}","cosmosDiagnostics":{"userAgent":"azsdk-java-cosmos/4.11.0 MacOSX/10.15.7 JRE/11.0.10","requestLatencyInMs":1865,"requestStartTimeUTC":"2021-03-05T08:36:29.916147Z","requestEndTimeUTC":"2021-03-05T08:36:31.781581Z","connectionMode":"DIRECT","responseStatisticsList":[{"storeResult":{"storePhysicalAddress":"rntbd://cdb-ms-prod-centralindia1-fd0.documents.azure.com:14092/apps/bbb89637-58d3-4009-a8d0-7803963a3701/services/4f9e883b-fd5b-42cf-8297-6d60f0f2f74d/partitions/cd221f07-7f90-4c7e-b614-bc9ecfa2b727/replicas/132593325004050143p/","lsn":1,"globalCommittedLsn":1,"partitionKeyRangeId":"0","isValid":true,"statusCode":400,"subStatusCode":0,"isGone":false,"isNotFound":false,"isInvalidPartition":false,"requestCharge":1.24,"itemLSN":-1,"sessionToken":"-1#1","exception":"[\"The input name '2804' is invalid. 
Ensure to provide a unique non-empty string less than '1024' characters.\"]","transportRequestTimeline":[{"eventName":"created","startTimeUTC":"2021-03-05T08:36:30.987993Z","durationInMicroSec":386},{"eventName":"queued","startTimeUTC":"2021-03-05T08:36:30.988379Z","durationInMicroSec":3},{"eventName":"channelAcquisitionStarted","startTimeUTC":"2021-03-05T08:36:30.988382Z","durationInMicroSec":658516},{"eventName":"pipelined","startTimeUTC":"2021-03-05T08:36:31.646898Z","durationInMicroSec":1832},{"eventName":"transitTime","startTimeUTC":"2021-03-05T08:36:31.648730Z","durationInMicroSec":124787},{"eventName":"received","startTimeUTC":"2021-03-05T08:36:31.773517Z","durationInMicroSec":7567},{"eventName":"completed","startTimeUTC":"2021-03-05T08:36:31.781084Z","durationInMicroSec":1982}],"rntbdRequestLengthInBytes":468,"rntbdResponseLengthInBytes":325,"requestPayloadLengthInBytes":55,"responsePayloadLengthInBytes":null,"channelTaskQueueSize":1,"pendingRequestsCount":1,"serviceEndpointStatistics":{"availableChannels":0,"acquiredChannels":0,"executorTaskQueueSize":0,"inflightRequests":1,"lastSuccessfulRequestTime":"2021-03-05T08:36:30.983Z","lastRequestTime":"2021-03-05T08:36:30.983Z","createdTime":"2021-03-05T08:36:30.988044Z","isClosed":false}},"requestResponseTimeUTC":"2021-03-05T08:36:31.781581Z","requestResourceType":"Document","requestOperationType":"Create"}],"supplementalResponseStatisticsList":[],"addressResolutionStatistics":{"e281f37a-7d8d-11eb-a4e9-131f486068f5":{"startTimeUTC":"2021-03-05T08:36:30.896711Z","endTimeUTC":"2021-03-05T08:36:30.985504Z","targetEndpoint":"https://rankesh-partner-demo-centralindia.documents.azure.com:443/addresses/?$resolveFor=dbs%2FLYY4AA%3D%3D%2Fcolls%2FLYY4ANz1SXE%3D%2Fdocs&$filter=protocol%20eq%20rntbd&$partitionKeyRangeIds=0","errorMessage":null,"inflightRequest":false}},"regionsContacted":["https://rankesh-partner-demo-centralindia.documents.azure.com:443/"],"retryContext":{"retryCount":0,"statusAndSubStatusCodes":null,"retr
yLatency":0},"metadataDiagnosticsContext":{"metadataDiagnosticList":[{"metaDataName":"CONTAINER_LOOK_UP","startTimeUTC":"2021-03-05T08:36:29.925804Z","endTimeUTC":"2021-03-05T08:36:30.503833Z","durationinMS":578},{"metaDataName":"PARTITION_KEY_RANGE_LOOK_UP","startTimeUTC":"2021-03-05T08:36:30.520209Z","endTimeUTC":"2021-03-05T08:36:30.890028Z","durationinMS":369},{"metaDataName":"SERVER_ADDRESS_LOOKUP","startTimeUTC":"2021-03-05T08:36:30.896720Z","endTimeUTC":"2021-03-05T08:36:30.985493Z","durationinMS":88}]},"serializationDiagnosticsContext":{"serializationDiagnosticsList":[{"serializationType":"ITEM_SERIALIZATION","startTimeUTC":"2021-03-05T08:36:29.918371Z","endTimeUTC":"2021-03-05T08:36:29.921127Z","durationInMicroSec":2756},{"serializationType":"PARTITION_KEY_FETCH_SERIALIZATION","startTimeUTC":"2021-03-05T08:36:30.505873Z","endTimeUTC":"2021-03-05T08:36:30.506922Z","durationInMicroSec":1049}]},"gatewayStatistics":null,"systemInformation":{"usedMemory":"662544 KB","availableMemory":"1434608 KB","systemCpuLoad":"(2021-03-05T08:36:02.928443Z 5.4%), (2021-03-05T08:36:07.928489Z 4.3%), (2021-03-05T08:36:12.930572Z 5.4%), (2021-03-05T08:36:17.928052Z 6.3%), (2021-03-05T08:36:22.930060Z 3.9%), (2021-03-05T08:36:27.925883Z 4.7%)"},"clientCfgs":{"id":0,"numberOfClients":1,"connCfg":{"rntbd":"(cto:PT5S, rto:PT5S, icto:PT0S, ieto:PT1H, mcpe:130, mrpc:30, cer:false)","gw":"(cps:1000, rto:PT5S, icto:null, p:false)","other":"(ed: true, cs: false)"},"consistencyCfg":"(consistency: null, mm: true, prgns: [])"}}} at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.messageReceived(RntbdRequestManager.java:768) at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.channelRead(RntbdRequestManager.java:181) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533) at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282) at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329) at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440) at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ... 
1 more Suppressed: java.lang.Exception: #block terminated with an error at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99) at reactor.core.publisher.Mono.block(Mono.java:1685) at com.azure.cosmos.CosmosContainer.blockItemResponse(CosmosContainer.java:232) at com.azure.cosmos.CosmosContainer.createItem(CosmosContainer.java:153) at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.addItemToContainer(CosmosDBSinkTask.java:96) at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:84) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 1 more [2021-03-05 14:06:31,846] INFO Attempting to close client 0 (com.azure.cosmos.implementation.RxDocumentClientImpl:3714) [2021-03-05 14:06:31,846] INFO Shutting down ... (com.azure.cosmos.implementation.RxDocumentClientImpl:3716) [2021-03-05 14:06:31,846] INFO Closing Global Endpoint Manager ... (com.azure.cosmos.implementation.RxDocumentClientImpl:3717) [2021-03-05 14:06:31,846] INFO Closing StoreClientFactory ... (com.azure.cosmos.implementation.RxDocumentClientImpl:3719) [2021-03-05 14:06:31,846] INFO Shutting down RntbdClientChannelPoolMonitoringProvider ... 
(com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint$RntbdEndpointMonitoringProvider:591) [2021-03-05 14:06:31,851] INFO Shutting down reactorHttpClient ... (com.azure.cosmos.implementation.RxDocumentClientImpl:3721) [2021-03-05 14:06:31,852] INFO Shutting down CpuMonitor ... (com.azure.cosmos.implementation.RxDocumentClientImpl:3723) [2021-03-05 14:06:31,852] INFO Shutting down completed. (com.azure.cosmos.implementation.RxDocumentClientImpl:3725) [2021-03-05 14:06:31,853] INFO [Consumer clientId=connector-consumer-cosmosdb-sink-0, groupId=connect-cosmosdb-sink] Revoke previously assigned partitions products-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:307) [2021-03-05 14:06:31,853] INFO [Consumer clientId=connector-consumer-cosmosdb-sink-0, groupId=connect-cosmosdb-sink] Member connector-consumer-cosmosdb-sink-0-60127476-9318-4d62-bfe3-e6db62da93a8 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1029) [2021-03-0
Expected Behavior
The long value should be converted into a string before being used as the id field.
Reproduce
Additional Context
Consider rejecting any bad messages (due to data type conversions, etc.) and using the dead-letter APIs to push these bad messages to the dead-letter topic. Awaiting further documentation and/or examples on how to push to the dead-letter topic from within a connector.
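As a starting point, the framework-level dead-letter settings for a sink connector look roughly like the fragment below. The topic name `cosmosdb-sink-dlq` is a placeholder, and the replication factor of 1 assumes a single-broker development setup. Note that these settings cover failures in the converter and transform stages; for a failure raised inside the task's `put()` to reach the dead-letter topic, the connector itself would have to report the record, for example through the `ErrantRecordReporter` exposed by `SinkTaskContext.errantRecordReporter()` in Kafka 2.6 and later.

```properties
# Tolerate bad records instead of killing the task.
errors.tolerance=all
# Route failed records to a dead-letter topic (placeholder name).
errors.deadletterqueue.topic.name=cosmosdb-sink-dlq
# Assumes a single-broker dev cluster; raise this in production.
errors.deadletterqueue.topic.replication.factor=1
# Attach the failure reason as record headers.
errors.deadletterqueue.context.headers.enable=true
```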