jcustenborder / kafka-connect-solr

Kafka Connect connector for writing to Solr.
Apache License 2.0

What is the JSON format used to add new rows to Solr #30

Open KabDeveloper opened 4 years ago

KabDeveloper commented 4 years ago

Hi,

Can you give an example of the schema/payload used in JSON to add new rows to Solr, please?

Thank you

cwsusa commented 4 years ago

Do you want a schema or a payload? I use the connector in schemaless mode. More information about Solr and schemaless adds/updates can be found in the Solr reference guide.

Using this supported mechanism, I can add one or more records, each an ordered JSON representation of a Solr document, just by placing it in the "value" field of a "record".

If you were to curl a topic to Kafka, it would look like: '{ "records": [ { "value": SOLR_DOC_PLACEHOLDER } ] }'

where SOLR_DOC_PLACEHOLDER is a properly validated JSON object; as the JSON shows, you can batch multiple documents in this manner.
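
That '{ "records": ... }' envelope is the Confluent REST Proxy v2 format; a hedged sketch of what the full curl might look like (the proxy address, topic name, and document fields are placeholders, not values from this thread):

curl -X POST http://localhost:8082/topics/my-topic \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  -d '{"records": [{"value": {"id": "doc-1", "title_en": "example title"}}]}'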

If you use the Apache Kafka producer, it would look like:

ProducerRecord<String, String> prec = new ProducerRecord<String, String>(topicName, postData);

where postData is a properly serialized JSON representation of your SolrDocument.
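
To make that concrete, here is a minimal runnable sketch of a schemaless producer; the broker address, topic name, and document fields below are placeholder assumptions, not values from this thread:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SolrDocProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address; adjust for your cluster.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // postData is the Solr document serialized as plain JSON (schemaless mode).
        String postData = "{\"id\":\"doc-1\",\"title_en\":\"example title\"}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The record key doubles as the document id in this sketch.
            producer.send(new ProducerRecord<>("my-topic", "doc-1", postData));
            producer.flush();
        }
    }
}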

KabDeveloper commented 4 years ago

@cwsusa Thank you for your answer.

Unfortunately, my question is about the format of the message that must be sent (the payload).

moisesrc13 commented 4 years ago

@cwsusa Has this connector been tested using a schema for the topic? If so, could you please share a topic schema example (for value and key) and an example of a producer message that meets the schema? Thanks.

moisesrc13 commented 4 years ago

@cwsusa @jcustenborder My collection name and topic are both moises-test. I'm using the HTTP connector with no auth, and this is the Avro schema for my topic value:

{
  "type": "record",
  "name": "value_moises_test",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "url",
      "type": "string"
    },
    {
      "name": "title_en",
      "type": "string"
    },
    {
      "name": "uid",
      "type": "string"
    }
  ]
}

These fields are the only ones required in my Solr schema (using the classic schema).

However, I'm getting the following error when sending a message to my topic. What is the value converter class I should be using? Thanks.

[2019-12-18 19:21:49,947] ERROR Error encountered in task w3sSolrConnector-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='moises-test', partition=0, offset=9, timestamp=1576696909909, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic moises-test to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2019-12-18 19:21:50,448] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-w3sSolrConnector-0] Cluster ID: X_y19GYvRgCZ2XnQ2LrROQ (org.apache.kafka.clients.Metadata)
jcustenborder commented 4 years ago

@moisesrc13 I think there is some confusion about how data is stored in Kafka. If you are using Avro, use Avro end to end; if you are using JSON, do the same. Kafka Connect uses something called a converter, which is a serializer and deserializer for data going into and out of Kafka Connect. The exception above occurred because you wrote data to Kafka as JSON and then configured Kafka Connect as if that data were Avro. You should be using the JSON converter. There are a couple of considerations to keep in mind when using Kafka Connect with JSON: you basically pick a format and stick with it. I personally use Schema Registry because it gives me strongly typed records. Kafka Connect has its own idea of a schema, and that is different from the schema Solr uses. Using schemaless JSON with Kafka Connect, you do not have schema information tied to a field. Take this for example:

{
  "first_name": "jeremy",
  "date": "2019-10-12"
}

I have no real way of knowing that the date field is a date when it's stored as plain JSON. The JSON converter reads the JSON object, and the value comes back as a string. Schemaless JSON has no way to say what type a field is; this is what Schema Registry does for you. Formatted properly, we would know that the date field is a date.
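
For contrast, if you run the JsonConverter with value.converter.schemas.enable=true, each message must carry an explicit schema/payload envelope. A sketch of the example above in that format (the struct name is made up, and using the Connect Date logical type, which encodes the date as days since the epoch, is my assumption):

{
  "schema": {
    "type": "struct",
    "name": "com.example.Person",
    "optional": false,
    "fields": [
      { "field": "first_name", "type": "string", "optional": false },
      { "field": "date", "type": "int32", "name": "org.apache.kafka.connect.data.Date", "version": 1, "optional": false }
    ]
  },
  "payload": {
    "first_name": "jeremy",
    "date": 18181
  }
}

Here 18181 is 2019-10-12 expressed as days since 1970-01-01.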

Now, all this connector does is loop through the records, create a SolrInputDocument for each record it's given, and then post that batch to the target system.

Let's assume that your Solr schema is this:

id: text (key)
url: text
title_en: text
uid: text

Using JSON, you would want to write this:

Key: asdfwasdfasd
Value:

{
  "id": "asdfwasdfasd",
  "url": "http://asdfasd",
  "title_en": "asdfasdf asdfasdf",
  "uid": "asdfas"
}

The key is that you keep the same format everywhere.
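
If you want to test this by hand, one way is the console producer that ships with Kafka; a hedged sketch (the broker address and the | separator are arbitrary choices, not from this thread):

kafka-console-producer --broker-list localhost:9092 --topic my-topic \
  --property parse.key=true \
  --property "key.separator=|"

Then type the key, the separator, and the JSON document on one line:

asdfwasdfasd|{"id": "asdfwasdfasd", "url": "http://asdfasd", "title_en": "asdfasdf asdfasdf", "uid": "asdfas"}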

moisesrc13 commented 4 years ago

@jcustenborder thanks for the reply. Yes, I had left the Avro schema in place while sending a JSON object to the Kafka topic. I'm testing first with JSON, as this is my input for a Solr document, so I changed the value converter class to org.apache.kafka.connect.json.JsonConverter and added the property value.converter.schemas.enable=false. My connector config now looks like:

{
  "value.converter.schemas.enable": "false",
  "name": "mySolrConnector",
  "connector.class": "com.github.jcustenborder.kafka.connect.solr.HttpSolrSinkConnector",
  "tasks.max": "1",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "moises-test"
  ],
  "solr.commit.within": "10000",
  "solr.delete.documents.enabled": "true",
  "solr.url": "http://MY-SERVER:8000/",
  "solr.thread.count": "1"
}

Then, based on the log, I got a version compatibility error (Invalid version (expected 2, but 60)), which I fixed by using solr-solrj-7.7.0.jar in the connector so it matches the version on my Solr server. I also had to add noggit-0.5.jar. After all that, I'm now stuck with this error:

[2019-12-19 03:53:25,119] ERROR error (org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient)
java.lang.IllegalStateException: Connection pool shut down
    at org.apache.http.util.Asserts.check(Asserts.java:34)
    at org.apache.http.pool.AbstractConnPool.lease(AbstractConnPool.java:191)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:267)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient$Runner.sendUpdateStream(ConcurrentUpdateSolrClient.java:349)
    at org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient$Runner.run(ConcurrentUpdateSolrClient.java:183)
    at org.apache.solr.common.util.ExecutorUtil$MDCAwareThreadPoolExecutor.lambda$execute$0(ExecutorUtil.java:209)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I hope you have seen this before and know how to fix it.

jcustenborder commented 4 years ago

@moisesrc13 Is there anything else in the logs? What about on the Solr server?

moisesrc13 commented 4 years ago

@jcustenborder this is the complete error:

value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2019-12-19 04:26:17,726] ERROR error (org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient)
java.lang.IllegalStateException: Connection pool shut down
    at org.apache.http.util.Asserts.check(Asserts.java:34)
    at org.apache.http.pool.AbstractConnPool.lease(AbstractConnPool.java:191)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:267)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient$Runner.sendUpdateStream(ConcurrentUpdateSolrClient.java:349)
    at org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient$Runner.run(ConcurrentUpdateSolrClient.java:183)
    at org.apache.solr.common.util.ExecutorUtil$MDCAwareThreadPoolExecutor.lambda$execute$0(ExecutorUtil.java:209)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2019-12-19 04:26:19,201] INFO WorkerSinkTask{id=w3sSolrConnector-0} Committing offsets asynchronously using sequence number 17: {moises-test-0=OffsetAndMetadata{offset=21, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask)

I don't think this is even reaching my Solr server, because I don't see any error on the server side.