wepay / kafka-connect-bigquery

DEPRECATED. PLEASE USE https://github.com/confluentinc/kafka-connect-bigquery. A Kafka Connect BigQuery sink connector
Apache License 2.0

Changing avro schema in schemaRetriever #124

Open mukund26 opened 6 years ago

mukund26 commented 6 years ago

I am accessing Postgres data in Kafka Connect and I only want to send part of the data to BigQuery. I want to know where your code takes data from the Kafka topic and gives the schema to the sink connector. Can someone tell me which files I can modify to change the actual schema and send the updated schema to Google BigQuery?

mukund26 commented 6 years ago

Do I need to update the jar package of the Kafka connector and Confluent Kafka?

C0urante commented 6 years ago

@mukund26 you may be able to use an ExtractField SMT to alter the data that the connector writes to BigQuery. Can you give a concrete example of what you're trying to accomplish (i.e., what does the data look like in Kafka and what would you like to see in BigQuery)?

mukund26 commented 6 years ago

@C0urante data looks like this : {u'source': {u'name': u'full', u'lsn': 146716754490, u'last_snapshot_record': None, u'txId': 11059, u'version': u'0.7.5', u'snapshot': None, u'ts_usec': 1530220086573566000}, u'ts_ms': 1530220086599, u'after': {u'b_date': 51927, u'id': 23, u'name': u'dsd'}, u'op': u'c', u'before': None}

but I only want the contents of the u'after' field: {u'b_date': 51927, u'id': 23, u'name': u'dsd'}
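To illustrate what the ExtractField$Value SMT does to a record like this, here is a minimal Python sketch (not the connector's actual code; the record value is abbreviated from the example above):

```python
# Debezium-style change event, abbreviated from the example above
record_value = {
    "source": {"name": "full", "lsn": 146716754490, "txId": 11059},
    "ts_ms": 1530220086599,
    "after": {"b_date": 51927, "id": 23, "name": "dsd"},
    "op": "c",
    "before": None,
}

def extract_field(value, field):
    """Mimic ExtractField$Value semantics on a plain dict:
    replace the entire record value with the named field's value."""
    return value.get(field) if value is not None else None

after = extract_field(record_value, "after")
print(after)  # {'b_date': 51927, 'id': 23, 'name': 'dsd'}
```

With `field=after`, everything outside the envelope field (`source`, `op`, `before`, `ts_ms`) is dropped before the value reaches the sink.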

C0urante commented 6 years ago

@mukund26 hmmm... if you're only working with a single topic and the input/output schema for the data is unlikely to change, you could try an SMT like this (to be included in the config for the connector):

transforms=extractAfterField
transforms.extractAfterField.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractAfterField.field=after

Does that suit your needs?

mukund26 commented 6 years ago

@C0urante I tried SMTs but they are not working... is there an alternative by modifying the schemaRetriever class? I want to pass only the JSON of the "after" field to BigQuerySinkConnector.

mukund26 commented 6 years ago

I would also like to know which class serves as the main class if I want to create a jar file for each package inside the connectors directory.

C0urante commented 6 years ago

@mukund26 can you provide more information about what's not working with the SMT-based approach?

I'm not sure an approach involving a custom SchemaRetriever is actually feasible. As far as I can recall, that API is used for determining create/update behavior for schemas of BigQuery tables and won't actually affect how data from Kafka is converted into BigQuery format.

As far as main classes go: there is no main class in this project. The main class is located in the Kafka Connect framework, which in turn invokes this connector.
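To make that concrete, a connector is launched by pointing the Connect runtime at a properties file rather than by running a jar directly; a sketch of such a file (topic name and connector name are placeholders):

```properties
# started via: connect-standalone worker.properties bigquery-sink.properties
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=my-postgres-topic
```

The framework discovers the connector jar on its plugin path and instantiates the class named by `connector.class`, so no main class in the connector itself is needed.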

mukund26 commented 6 years ago

@C0urante I added the same SMT as you gave in the properties file, but the SMT did not pass only the required field; all fields still entered BigQuery.

Which class is used to deserialise the object, so that I can manipulate the object myself?