databricks / spark-avro

Avro Data Source for Apache Spark
http://databricks.com/
Apache License 2.0

Row to AVRO for Streaming Use Case #274

Open sunny1978 opened 6 years ago

sunny1978 commented 6 years ago

Hi, I have a use case where I need to convert a Row to Avro and write the resulting bytes to a Kafka sink. Kindly help me. This is what I tried:

    // Typed as RecordBuilder<Schema> so that convertStructToAvro returns a Schema (a raw RecordBuilder makes it return Object)
    SchemaBuilder.RecordBuilder<Schema> recordBuilder = SchemaBuilder.record(kafkaSinkConfig.getRecordName())
            .namespace(kafkaSinkConfig.getRecordNamespace());
    Schema schema = SchemaConverters.convertStructToAvro(row.schema(), recordBuilder, kafkaSinkConfig.getRecordNamespace());
    GenericData.Record avroRecord = (GenericData.Record) GenericData.get().newRecord(null, schema);
    // NOTE: avroRecord is still empty at this point; its fields must be filled from row before serializing
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
    byte[] data = recordInjection.apply(avroRecord);
    kafkaProducerNew.send(new ProducerRecord<String, byte[]>(topicName, data));

This works for flat Avro schemas, but it fails when the schema is nested (structs, or arrays of structs).
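If the record is filled by copying the Row's fields one by one, nested values stay Spark `Row` objects (and Scala collections), which Avro's generic writer cannot serialize as nested records or arrays; that is a likely cause of the nested failure. Below is a minimal sketch of a recursive converter, assuming the Avro schema was generated from the same `StructType` (for example with `SchemaConverters.convertStructToAvro`, so field names match and nullable fields may be unions with `null`). `RowToAvro` is a hypothetical helper, not a spark-avro API; map types, arrays of arrays, and types such as dates, decimals and binary are deliberately not handled.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/** Hypothetical helper (not part of spark-avro): converts a Spark Row into an Avro GenericRecord, recursively. */
public final class RowToAvro {

  private RowToAvro() {}

  /** Converts a Row whose layout is described by sparkType into a record of avroSchema. */
  public static GenericRecord convert(Row row, StructType sparkType, Schema avroSchema) {
    Schema recordSchema = nonNull(avroSchema);
    GenericData.Record record = new GenericData.Record(recordSchema);
    StructField[] fields = sparkType.fields();
    for (int i = 0; i < fields.length; i++) {
      String name = fields[i].name();
      Schema.Field avroField = recordSchema.getField(name);
      if (avroField == null) {
        throw new IllegalArgumentException("No Avro field named " + name);
      }
      record.put(name, row.isNullAt(i) ? null : convertField(row, i, fields[i].dataType(), avroField.schema()));
    }
    return record;
  }

  // Reads field i of the row and converts it according to its Spark type.
  private static Object convertField(Row row, int i, DataType type, Schema schema) {
    if (type instanceof StructType) {
      // Nested struct: Spark returns a Row, Avro needs a GenericRecord.
      return convert(row.getStruct(i), (StructType) type, schema);
    }
    if (type instanceof ArrayType) {
      // getList exposes the underlying Scala Seq as a java.util.List.
      List<Object> elements = row.getList(i);
      DataType elementType = ((ArrayType) type).elementType();
      Schema elementSchema = nonNull(schema).getElementType();
      List<Object> out = new ArrayList<>(elements.size());
      for (Object element : elements) {
        out.add(convertElement(element, elementType, elementSchema));
      }
      return out;
    }
    if (type instanceof MapType) {
      throw new UnsupportedOperationException("MapType is not handled in this sketch");
    }
    // StringType, LongType, IntegerType, DoubleType, BooleanType values pass through as-is.
    return row.get(i);
  }

  // Converts one array element (struct elements become nested records).
  private static Object convertElement(Object element, DataType elementType, Schema elementSchema) {
    if (element == null) {
      return null;
    }
    if (elementType instanceof StructType) {
      return convert((Row) element, (StructType) elementType, elementSchema);
    }
    if (elementType instanceof ArrayType || elementType instanceof MapType) {
      throw new UnsupportedOperationException("Nested arrays/maps are not handled in this sketch");
    }
    return element;
  }

  // Nullable fields are often encoded as a union with "null"; pick the non-null branch.
  private static Schema nonNull(Schema schema) {
    if (schema.getType() != Schema.Type.UNION) {
      return schema;
    }
    for (Schema branch : schema.getTypes()) {
      if (branch.getType() != Schema.Type.NULL) {
        return branch;
      }
    }
    throw new IllegalArgumentException("Union without a non-null branch: " + schema);
  }
}
```

In the snippet above, `RowToAvro.convert(row, row.schema(), schema)` could take the place of the empty `GenericData.get().newRecord(null, schema)` before calling `recordInjection.apply(...)`. Walking the Spark type and the Avro schema side by side keeps the recursion driven by the data's own structure, so the same code covers every nesting level of a schema like the one below.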

Write to Kafka (need help): I am hoping for an API like the one below. Exposing the private converter used in AvroOutputWriter (`converter(row).asInstanceOf[GenericRecord]` in https://github.com/databricks/spark-avro/blob/branch-4.0/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala) would probably be enough:

    // Proposed API: AvroOutputWriter.convertToAvro does not exist, this is the requested shape
    GenericRecord avroRecord = AvroOutputWriter.convertToAvro(row);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
    byte[] data = recordInjection.apply(avroRecord);
    kafkaProducerNew.send(new ProducerRecord<String, byte[]>(topicName, data));
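Once a `GenericRecord` exists, turning it into Kafka-ready bytes needs only the Avro library itself, as an alternative to Bijection's `GenericAvroCodecs`. A minimal sketch, assuming consumers expect raw Avro binary (no container-file header and no schema-registry prefix); `AvroBytes` is a hypothetical helper name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

/** Hypothetical helper: serializes a GenericRecord to raw Avro binary (no header). */
public final class AvroBytes {

  private AvroBytes() {}

  public static byte[] toBytes(GenericRecord record) {
    Schema schema = record.getSchema();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    try {
      writer.write(record, encoder);
      encoder.flush(); // the binary encoder buffers internally; flush before reading the stream
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }
}
```

The result can be sent exactly as in the snippet above: `kafkaProducerNew.send(new ProducerRecord<String, byte[]>(topicName, AvroBytes.toBytes(avroRecord)));`.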

Read from Kafka:

    JavaInputDStream<ConsumerRecord<String, byte[]>> messages = KafkaUtils.createDirectStream(
            jsc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParamsD));
    JavaDStream<byte[]> dStream = messages.map(new Function<ConsumerRecord<String, byte[]>, byte[]>() {....}

I cannot find `RddUtils.rddToDataFrame`. Where is this API? Basically, I want to pass in the `byte[]` and the Avro schema as a JSON string. Given these two, can I get a GenericRecord back?
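Given only the bytes and the schema JSON, plain Avro can return a `GenericRecord` without any Spark-specific utility. A minimal sketch, assuming the bytes are raw Avro binary as written by `GenericAvroCodecs.toBinary` (no container-file header, no schema-registry prefix); `AvroDecode` is a hypothetical helper name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

/** Hypothetical helper: decodes raw Avro binary into a GenericRecord using a JSON schema. */
public final class AvroDecode {

  private AvroDecode() {}

  public static GenericRecord fromBytes(byte[] data, String schemaJson) {
    Schema schema = new Schema.Parser().parse(schemaJson);
    DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    try {
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

String fields come back as `org.apache.avro.util.Utf8`, so call `toString()` where a `java.lang.String` is needed. Inside the DStream, parsing the schema for every message is wasteful; building the `Schema` and reader once per partition (for example in `mapPartitions`) would avoid that.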

sunny1978 commented 6 years ago

@gsolasab, @squito, @gengliangwang, @rxin, can someone look into this request? We have a use case that writes Avro records to Kafka. We read data from Kafka as String/ASCII, perform some ETL operations (extract zipped content, join against lookup data) and finally build a DataFrame of nested rows, which we write back to Kafka as Avro. For this I am looking for a generic way to convert a Row to an Avro byte[] so that I can write it to Kafka.

sunny1978 commented 6 years ago

My Row Schema:

    Row.Schema StructType(
      StructField(masterReservation,StructType(
        StructField(cancellationLocalTimestamp,StringType,true),
        StructField(cancellationNumber,StringType,true),
        StructField(confirmationNumber,StringType,true),
        StructField(confirmationUtcDate,StringType,true),
        StructField(createdTimestamp,StringType,true),
        StructField(guestProfile,ArrayType(StructType(
          StructField(guestFirstName,StringType,true),
          StructField(guestIdentifier,LongType,true),
          StructField(guestLastName,StringType,true),
          StructField(guestMember,StructType(
            StructField(memberIdentifier,StringType,true)),true)),true),true),
        StructField(imageCommittedSequenceNumber,LongType,true),
        StructField(imageIdentifier,LongType,true),
        StructField(imageStatusCode,StringType,true),
        StructField(imageUtcTimestamp,StringType,true),
        StructField(pmsConfirmationNumber,StringType,true),
        StructField(ratePlan,ArrayType(StructType(
          StructField(priceGridCode,StringType,true),
          StructField(ratePlanCode,StringType,true),
          StructField(ratePlanName,StringType,true)),true),true),
        StructField(reservationActivityType,StringType,true),
        StructField(reservationChannel,ArrayType(StructType(
          StructField(channelCode,StringType,true),
          StructField(channelRoleCode,StringType,true),
          StructField(channelSubCode,StringType,true),
          StructField(iataNumber,StringType,true),
          StructField(vendorCode,StringType,true),
          StructField(vendorType,ArrayType(LongType,true),true)),true),true),
        StructField(reservationProduct,StructType(
          StructField(prod_desc,StringType,true),
          StructField(rms_prod_class_cd,StringType,true)),true),
        StructField(reservationProperty,StructType(
          StructField(brandName,StringType,true),
          StructField(crsBrandCode,StringType,true),
          StructField(hotelFactText,ArrayType(StringType,true),true),
          StructField(propertyChainCode,StringType,true),
          StructField(propertyCode,StringType,true),
          StructField(propertyName,StringType,true)),true),
        StructField(reservationSegment,ArrayType(StructType(
          StructField(averageBaseRateAmount,StringType,true),
          StructField(checkInDate,StringType,true),
          StructField(checkOutDate,StringType,true),
          StructField(localCurrencyCode,StringType,true),
          StructField(loyaltyGuestIdentifier,LongType,true),
          StructField(propertyCode,StringType,true),
          StructField(ratePlanCode,StringType,true),
          StructField(reservationHolderGuestIdentifier,LongType,true),
          StructField(reservationProductOffer,StructType(
            StructField(businessProductCode,StringType,true),
            StructField(guestAdultQuantity,LongType,true),
            StructField(guestChildQuantity,LongType,true),
            StructField(productQuantity,LongType,true)),true),
          StructField(segConsolidatedStatusCode,StringType,true),
          StructField(segmentActionStatusCode,StringType,true),
          StructField(segmentDailyRate,ArrayType(StructType(
            StructField(dailyRateEndDate,StringType,true),
            StructField(dailyRateStartDate,StringType,true),
            StructField(extraPersonAfterTaxAmount,StringType,true),
            StructField(occupancyBaseAfterTaxAmount,StringType,true),
            StructField(occupancyBaseRateAmount,StringType,true),
            StructField(totalServiceChargeAmount,StringType,true),
            StructField(totalTaxAmount,StringType,true)),true),true),
          StructField(segmentGroupCode,StringType,true),
          StructField(segmentIdentifier,LongType,true),
          StructField(segmentPmsStatusCode,StringType,true),
          StructField(segmentSequenceNumber,LongType,true)),true),true)),true))