sontalra opened this issue 4 years ago (status: Open).
Here's what I get when I decode the base64 data and interpret it as a UTF-8 string:
qldbStreamArnrecordTypepayloadblockAddressstrandIdsequenceNotransactionIdblockTimestampblockHashentriesHashpreviousBlockHashentriesHashListtransactionInfostatementsstatementstartTimestatementDigestarn:aws:qldb:ap-southeast-2:xxxxxx:stream/poc-ledger-poc-2405-ledger/59NaiXV1vXM2kZOsuauKrxBLOCK_SUMMARY9xSxw9hMRXaIdt0OYtT4Yx"6rq59bBL7F22iPiC7txCX3k^wv~OK^bbN2@uQGesgU1>6XI>M^zdC
2g-6~ aoU/7eT2 A&qJZn7R
>$0VEk=UbpK|q?i x\Vo0eSELECT * FROM information_schema.user_tablesk<8" lt8qldbStreamArnrecordTypepayloadblockAddressstrandIdsequenceNotransactionIdblockTimestampblockHashentriesHashpreviousBlockHashentriesHashListtransactionInfostatementsstatementstartTimestatementDigestdocuments0bGI3vhFFpbEZJXFislprCtableNametableIdrevisionSummarieshashdocumentIdarn:aws:qldb:ap-southeast-2:xxxxxx:stream/poc-ledger-poc-2405-ledger/59NaiXV1vXM2kZOsuauKrxBLOCK_SUMMARY9xSxw9hMRXaIdt0OYtT4Yx"6rq59bBL3vqFUHWf224rr8k,qr}C!*Z-k[NhDYE|O_YP+0}zY522Kwv~OK^bbN2@uQGes|Ar~ e@X(fKlNfy:vWG7s
n3_F.FJ)q<J1%pyzN?~.\u&[
INSERT INTO VirtualAccountTransactions { 'VirtualAccountId': 'id5', 'Amount': 10, 'TransactionDate': '2020-06-02T16:30:55.054Z' }
k=FKpN,@tI5S$rxVirtualAccountTransactions2Zgr3v47S9GDsAd8stDLQ9 y:vWG7s
n3_F.0bGI3vhFFpbEZJXFislprCqldbStreamArnrecordTypepayloadtableInfotableNametableIdrevisionblockAddressstrandIdsequenceNohashdataVirtualAccountIdAmountTransactionDatemetadataidtxTimetxIdarn:aws:qldb:ap-southeast-2:xxxxxx:stream/poc-ledger-poc-2405-ledger/59NaiXV1vXM2kZOsuauKrxREVISION_DETAILSVirtualAccountTransactions2Zgr3v47S9GDsAd8stDLQ99xSxw9hMRXaIdt0OYtT4Yx"y:vWG7s
n3_F.id5!
This is not Ion text or Ion binary, hence the failures raised by the Ion parser.
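One quick way to confirm this claim without an Ion library is to check the first four bytes after base64 decoding. Every Ion 1.0 binary stream begins with the Binary Version Marker (BVM) `E0 01 00 EA`. The sketch below checks only that prefix; it does not validate the rest of the stream. `looks_like_ion_binary` is a hypothetical helper name. The sample is the first 24 base64 characters of the `data` field quoted later in this issue.

```python
import base64

# Every Ion 1.0 binary stream starts with this 4-byte Binary Version Marker (BVM).
ION_BVM = b"\xe0\x01\x00\xea"


def looks_like_ion_binary(payload: bytes) -> bool:
    """Hypothetical helper: True if the raw bytes start with the Ion 1.0 BVM."""
    return payload[:4] == ION_BVM


# First 24 base64 characters of the Kinesis "data" field quoted in this issue.
sample = base64.b64decode("CkA3N2I1MTNlYzc2N2U0ZjRi")
print(sample[:4])                     # b'\n@77'
print(looks_like_ion_binary(sample))  # False
```

The decoded sample starts with `0A 40` rather than the BVM. That is consistent with the stack trace in the original post, where `ReaderFormat.Detect` ends up in `RawTextReader`, the text parser.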
Hi @tgregg, thanks for your reply.
The data is received as-is (copied from the logs) when the QLDB stream (changes to a QLDB table) raises a Kinesis event. According to the AWS documentation, the data format is Amazon Ion: https://docs.aws.amazon.com/qldb/latest/developerguide/streams.records.html
I created the stream in the AWS Management Console and expect the data to be in Ion format. The data is not manipulated anywhere.
Unfortunately, this blob of data is not in the Ion format.
I recommend reaching out to the QLDB support team. They'll be better able to answer questions about the data emitted by QLDB.
@sontalra Were you able to fix this issue? I am working on a similar pipeline and facing the exact same issue.
Hey @sontalra, QLDB delivers stream records to Kinesis as binary-encoded Ion objects, so you need to pass the Kinesis record data directly to the Load function, similar to this:
var ionDatagram = IonLoader.Default.Load(record.Kinesis.Data);
var ionData = ionDatagram.GetElementAt(0);
// read the recordType field from the record body
Console.WriteLine($"record Type: {ionData.GetField("recordType").StringValue}");
Let me know if this works for you.
Was a resolution ever found for this? We are facing a similar issue. In our case, the stream data is parsed directly into Ion values as @juniortads suggested, but our Lambda behaves intermittently: the IonLoader succeeds for some records and fails for others. Our Kinesis stream receives data only from the QLDB stream, so every record in it should be a QLDB stream record.
It seems that the QLDB stream is sending non-Ion data through the Kinesis stream. The QLDB documentation states that stream records are Ion binary data, but for some reason that doesn't appear to be the case.
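One thing worth ruling out is Kinesis Producer Library (KPL) record aggregation. This is an assumption on my part and is not confirmed anywhere in this thread. As far as I know, the QLDB stream's Kinesis configuration has an aggregation option that packs several stream records into one Kinesis record. An aggregated record is not itself an Ion value, and that could explain failures that come and go.

In the KPL aggregation format, a record is laid out as follows:
- the magic bytes `F3 89 9A C2`
- a protobuf message
- a 16-byte MD5 digest of that message

The sketch below only detects this framing. Unpacking the inner records would need a protobuf-aware deaggregation library, and the sketch does not attempt that. `classify_kinesis_payload` is a hypothetical helper, and `fake_body` is stand-in bytes, not a real aggregated record.

```python
import hashlib

ION_BVM = b"\xe0\x01\x00\xea"      # Ion 1.0 Binary Version Marker
KPL_MAGIC = b"\xf3\x89\x9a\xc2"    # prefix of a KPL-aggregated Kinesis record
MD5_LEN = 16                       # trailing MD5 digest length


def classify_kinesis_payload(payload: bytes) -> str:
    """Hypothetical helper: guess how a decoded Kinesis record is framed."""
    if payload.startswith(ION_BVM):
        return "ion-binary"
    if payload.startswith(KPL_MAGIC) and len(payload) >= len(KPL_MAGIC) + MD5_LEN:
        body = payload[len(KPL_MAGIC):-MD5_LEN]
        checksum = payload[-MD5_LEN:]
        # The trailer should be the MD5 digest of the protobuf bytes in between.
        if hashlib.md5(body).digest() == checksum:
            return "kpl-aggregated"
        return "kpl-bad-checksum"
    return "unknown"


# Stand-in protobuf bytes, wrapped in KPL framing for illustration only.
fake_body = b"\x0a\x03abc"
aggregated = KPL_MAGIC + fake_body + hashlib.md5(fake_body).digest()
print(classify_kinesis_payload(aggregated))          # kpl-aggregated
print(classify_kinesis_payload(ION_BVM + b"\x0f"))   # ion-binary
```

Logging this classification for every failing record should show whether the failures line up with aggregated records.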
@ashritbista, the following worked for me in Spark:
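from io import BytesIO
import json

import amazon.ion.simpleion as ion
from amazon.ion.json_encoder import IonToJSONEncoder
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

kinesis = (
    spark.readStream.format("kinesis")
    .option("streamName", "your kinesis stream name")
    .option("region", "your AWS region")
    .option("roleArn", "your role arn")
    .option("initialPosition", "earliest")
    .load()
)

# Parse the binary Ion in each record's "data" column and re-serialize it as JSON.
def binary_json_encode(payload):
    ion_record = ion.load(BytesIO(payload))
    return json.dumps(ion_record, cls=IonToJSONEncoder)

convert_UDF_bin2json = udf(binary_json_encode, StringType())

json_df = kinesis.withColumn("json_data", convert_UDF_bin2json(kinesis["data"]))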
Hi Team,
I'm writing a Lambda function that processes Kinesis events for changes to a QLDB table, similar to https://github.com/aws-samples/amazon-qldb-streams-dmv-sample-lambda-python
I'm using "Amazon.QLDB.Driver" Version="1.0.0-rc.1", which has a dependency on Amazon.IonDotnet 1.0.0.
My Lambda receives the Kinesis event as:
{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "xxxxx",
        "sequenceNumber": "xxxxx",
        "data": "CkA3N2I1MTNlYzc2N2U0ZjRiMWM4Y2QzZWE1ZTg1NjI2MmViNGUzMjQwZGExMDE1YWM3NWRhYTE1MTQ3Yzk2NTczCkAyY2Q0NzE3MjdkNDMyMTJhZDdiZjVhMmQ4MjZiZTI4ODViYzVhZWI2NGU2ODQ0ZTE1OTkxY2EwMTkwOGJkODQ1CkAwNmQ4MGNiOTc5M2E3NjE0NTcwNTdmZjM0NzM3YTUwNjczMGRiNmY4ZDBhNTZlMzNkZGMzMWE1ZjQ2ODEyZWE2GgUIGgUBAQEBcWxkYlN0cmVhbUFybnJlY29yZFR5cGVwYXlsb2FkYmxvY2tBZGRyZXNzc3RyYW5kSWRzZXF1ZW5jZU5vdHJhbnNhY3Rpb25JZGJsb2NrVGltZXN0YW1wYmxvY2tIYXNoZW50cmllc0hhc2hwcmV2aW91c0Jsb2NrSGFzaGVudHJpZXNIYXNoTGlzdHRyYW5zYWN0aW9uSW5mb3N0YXRlbWVudHNzdGF0ZW1lbnRzdGFydFRpbWVzdGF0ZW1lbnREaWdlc3QDYXJuOmF3czpxbGRiOmFwLXNvdXRoZWFzdC0yOnh4eHh4eDpzdHJlYW0vcG9jLWxlZGdlci1wb2MtMjQwNS1sZWRnZXIvNTlOYWlYVjF2WE0ya1pPc3VhdUtyeEJMT0NLX1NVTU1BUlkCOXhTeHc5aE1SWGFJZHQwT1l0VDRZeCIBFzZycTU5YkJMN0YyMmlQaUM3dHhDWDNrDwJedxN2fk9LHF5iYk4yQBAVdVFHZXNnVTE+NlhJBhY+TV56ZEMKMmctNhh+CWFvVS83ZVQyCQNBJnFKHlpuN1IMCAoPPh8kMFZFawM9VWJwS3xxFT9pIHhcVn9vMGVTRUxFQ1QgKiBGUk9NIGluZm9ybWF0aW9uX3NjaGVtYS51c2VyX3RhYmxlc2sPAjw4IgkRbBIHdBIXOEoaCAgBGggBAgICcWxkYlN0cmVhbUFybnJlY29yZFR5cGVwYXlsb2FkYmxvY2tBZGRyZXNzc3RyYW5kSWRzZXF1ZW5jZU5vdHJhbnNhY3Rpb25JZGJsb2NrVGltZXN0YW1wYmxvY2tIYXNoZW50cmllc0hhc2hwcmV2aW91c0Jsb2NrSGFzaGVudHJpZXNIYXNoTGlzdHRyYW5zYWN0aW9uSW5mb3N0YXRlbWVudHNzdGF0ZW1lbnRzdGFydFRpbWVzdGF0ZW1lbnREaWdlc3Rkb2N1bWVudHMwYkdJM3ZoRkZwYkVaSlhGaXNscHJDdGFibGVOYW1ldGFibGVJZHJldmlzaW9uU3VtbWFyaWVzaGFzaGRvY3VtZW50SWQFYXJuOmF3czpxbGRiOmFwLXNvdXRoZWFzdC0yOnh4eHh4eDpzdHJlYW0vcG9jLWxlZGdlci1wb2MtMjQwNS1sZWRnZXIvNTlOYWlYVjF2WE0ya1pPc3VhdUtyeEJMT0NLX1NVTU1BUlkEOXhTeHc5aE1SWGFJZHQwT1l0VDRZeCIBGDZycTU5YkJMM3ZxRlVIV2YyMjRycjhrDwIscXJ9QyEqWi1rW05oRFkBRXxPX1lQKzB9elkdNRcyMkt3E3Z+T0scXmJiTjJAEBV1UUdlcwEeHHxBHxtyfiAbZUBYKGZLD2xOZgYMeTp2FFcFf0c3BnMKbjMaX0YuRkoHKXE8SjElcAN5elIIEgdOFBY/fi5cdSZbAQEBAQpJTlNFUlQgSU5UTyBWaXJ0dWFsQWNjb3VudFRyYW5zYWN0aW9ucyB7ICdWaXJ0dWFsQWNjb3VudElkJzogJ2lkNScsICdBbW91bnQnOiAxMCwgJ1RyYW5zYWN0aW9uRGF0ZSc6ICcyMDIwLTA2LTAyVDE2OjMwOjU1LjA1NFonIH0Kaw8CPUZLFHBOLB9AdEk1UyRyeFZpcnR1YWxBY2NvdW50VHJhbnNhY3Rpb25zMlpncjN2NDdTOUdEc0FkOHN0RExROSAGDHk6dhRXBX9HNwZzCm4zGl9GLjBiR0kzdmhGRnBiRVpKWEZpc2xwckMaBAgCGgQBAQEBcWxkYlN0cmVhbUFybnJlY29yZFR5cGVwYXlsb2FkdGFibGVJbmZvdGFibGVOYW1ldGFibGVJZHJldmlzaW9uYmxvY2tBZGRyZXNzc3RyYW5kSWRzZXF1ZW5jZU5vaGFzaGRhdGFWaXJ0dWFsQWNjb3VudElkQW1vdW50VHJhbnNhY3Rpb25EYXRlbWV0YWRhdGFpZHR4VGltZXR4SWQCYXJuOmF3czpxbGRiOmFwLXNvdXRoZWFzdC0yOnh4eHh4eDpzdHJlYW0vcG9jLWxlZGdlci1wb2MtMjQwNS1sZWRnZXIvNTlOYWlYVjF2WE0ya1pPc3VhdUtyeFJFVklTSU9OX0RFVEFJTFMBVmlydHVhbEFjY291bnRUcmFuc2FjdGlvbnMyWmdyM3Y0N1M5R0RzQWQ4c3RETFE5ATl4U3h3OWhNUlhhSWR0ME9ZdFQ0WXgiARgGDHk6dhRXBX9HNwZzCm4zGl9GLmlkNSEKMjAyMC0wNi0wMlQxNjozMDo1NS4wNTRaMGJHSTN2aEZGcGJFWkpYRmlzbHByQyBrDwI2cnE1OWJCTDN2cUZVSFdmMjI0cnI4MhpIFXF+OQ==",
        "approximateArrivalTimestamp": 1592877325.432
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-xxxx",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::xxx:role/TransactionNotifierLambdaRole",
      "awsRegion": "ap-southeast-2",
      "eventSourceARN": "arn:aws:kinesis:ap-southeast-2:xxx:stream/TransactionNotificationStream"
    }
  ]
}
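In the raw event JSON above, each record's payload is base64 text at `Records[].kinesis.data`. It should be base64-decoded exactly once, and the result should stay as bytes from then on.

If the handler uses the Amazon.Lambda.KinesisEvents types rather than a raw `Stream`, my understanding is that `Record.Data` is already exposed as a decoded `MemoryStream`. That is what @juniortads' snippet passes to `Load`.

The Python sketch below shows the bytes-only extraction against a trimmed-down event. `kinesis_payloads` is a hypothetical helper. The `data` value is a made-up placeholder (the base64 of the Ion BVM), not a real QLDB record.

```python
import base64
import json


def kinesis_payloads(event: dict) -> list:
    """Hypothetical helper: raw bytes of each Kinesis record in a Lambda event.

    The "data" field is base64 text; it is decoded exactly once and the
    result is kept as bytes (never converted to a str).
    """
    return [base64.b64decode(r["kinesis"]["data"]) for r in event["Records"]]


# Same shape as the event above, trimmed; "4AEA6g==" is base64 for E0 01 00 EA.
event = json.loads(
    '{"Records": [{"kinesis": {"data": "4AEA6g=="}, "eventSource": "aws:kinesis"}]}'
)
print(kinesis_payloads(event))  # [b'\xe0\x01\x00\xea']
```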
When I try to load the datagram with the IonLoader after base64-decoding Kinesis.Data, I get the following error:
Invalid state: IonException
   at Amazon.IonDotnet.Internals.Text.RawTextReader.ParseNext()
   at Amazon.IonDotnet.Internals.Text.RawTextReader.HasNext()
   at Amazon.IonDotnet.Internals.Text.UserTextReader.HasNext()
   at Amazon.IonDotnet.Internals.Text.RawTextReader.MoveNext()
   at Amazon.IonDotnet.Internals.PrivateIonWriterBase.WriteValues(IIonReader reader)
   at Amazon.IonDotnet.Builders.IonLoader.WriteDatagram(IIonReader reader)
   at Amazon.IonDotnet.Builders.IonLoader.Load(Byte[] data)
   at Serverless.Handlers.TransactionEventHandler.Add(Stream stream, ILambdaContext contexts) in /home/runner/work/poc-ledger/poc-ledger/Serverless/Handlers/TransactionEventHandler.cs:line 52
   at lambda_method(Closure , Stream , Stream , LambdaContextInternal )
The code is as follows:
var decodedData = Convert.FromBase64String(record.Kinesis.Data);
var datagram = IonLoader.WithReaderOptions(new ReaderOptions { Format = ReaderFormat.Detect, Encoding = Encoding.ASCII }).Load(decodedData);
If I don't decode Kinesis.Data and instead pass the string directly to the loader, I get the following error:
Numeric value [84] followed by invalid character: 109: FormatException
   at Amazon.IonDotnet.Internals.Text.TextScanner.FinishLoadNumber(StringBuilder numericText, Int32 c, Int32 token)
   at Amazon.IonDotnet.Internals.Text.TextScanner.LoadNumber(StringBuilder valueBuffer)
   at Amazon.IonDotnet.Internals.Text.RawTextReader.LoadTokenContents(Int32 scannerToken)
   at Amazon.IonDotnet.Internals.Text.RawTextReader.MoveNext()
   at Amazon.IonDotnet.Internals.PrivateIonWriterBase.WriteValues(IIonReader reader)
   at Amazon.IonDotnet.Builders.IonLoader.WriteDatagram(IIonReader reader)
   at Amazon.IonDotnet.Builders.IonLoader.Load(String ionText)
   at Serverless.Handlers.TransactionEventHandler.Add(Stream stream, ILambdaContext contexts) in /home/runner/work/poc-ledger/poc-ledger/Serverless/Handlers/TransactionEventHandler.cs:line 49
   at lambda_method(Closure , Stream , Stream , LambdaContextInternal )
I'm not sure how I should decode the data from the Kinesis stream using this SDK. Any help would be really appreciated.
Note: I have tried UTF-8 decoding as well.
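On the UTF-8 and ASCII attempts: Ion binary can only be recognized from the original bytes, and converting those bytes to a string at any point is lossy. The bytes `0xE0` and `0xEA` in the Ion BVM are not valid ASCII. Nor are they valid UTF-8 in that position. A text decoder therefore replaces them, and encoding the string back does not reproduce the input. As far as I know, .NET's `Encoding.ASCII` substitutes `?` for such bytes.

The sketch below is a Python analog, not the .NET API. `raw` is a hypothetical payload: the BVM followed by `0x0F`, which encodes a null in Ion binary. `survives_text_round_trip` is a hypothetical helper.

```python
# Hypothetical Ion binary payload: the BVM followed by 0x0F (an Ion null).
raw = b"\xe0\x01\x00\xea\x0f"


def survives_text_round_trip(data: bytes, codec: str) -> bool:
    """True if decoding to text and re-encoding gives back identical bytes."""
    text = data.decode(codec, errors="replace")   # invalid bytes become U+FFFD
    return text.encode(codec, errors="replace") == data


for codec in ("ascii", "utf-8"):
    print(codec, survives_text_round_trip(raw, codec))
# ascii False
# utf-8 False
```

Plain ASCII text such as an Ion text document does survive the round trip. This is why a string-based path can appear to work for Ion text and yet fail for Ion binary.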