awslabs / aws-athena-query-federation

The Amazon Athena Query Federation SDK allows you to customize Amazon Athena with your own data sources and code.
Apache License 2.0
560 stars 297 forks source link

[QUESTION] Implementing Athena Dynamodb connector in Javascript #754

Open sanatdeshpande1 opened 2 years ago

sanatdeshpande1 commented 2 years ago

I am trying to implement my custom Athena Dynamodb connector in Javascript using Apache Arrow JS for building the schema and tables instead of using the Glue tables. The reason behind this is to allow custom data types such as a Map<Struct> to be supported by the connector, which currently is not supported by the dynamodb connector offered by Athena.

So far, I have been going through the Dynamodb Java connector code and understanding and implementing the same in Javascript without using Glue to map the tables and fields for the schema that I have created. This helps me solve the problem of having custom data types. I am now able to have a field of a custom data type which can be as complex as a Map<Struct>. I have understood from the existing Java connector that I need to have Metadata handler and Records handler in order to have a working connector. Out of which, I am currently working on handling all the six (PingRequest/Response, ListSchemasRequest/Response, ListTablesRequest/Response, GetTableRequest/Response, GetTableLayoutRequest/Response, GetSplitsRequest/Response) metadata request types.

The sample GetTableLayoutResponse looks something like this:

{
  "@type" : "GetTableLayoutResponse",
  "catalogName" : "test-catalog",
  "tableName" : {
    "schemaName" : "test-schema",
    "tableName" : "test-table"
  },
  "partitions" : {
    "aId" : "test-allocator-id",
    "schema" : "PAEAABAAAAAAAAoADgAGAA0ACAAKAAAAAAADABAAAAAAAQoADAAAAAgABAAKAAAACAAAAAgAAAAAAAAABAAAAMAAAAB0AAAAPAAAAAQAAABi////FAAAABQAAAAYAAAAAAAFARQAAAAAAAAAAAAAAAQABAAEAAAABAAAAGNvbDMAAAAAlv///xQAAAAUAAAAFAAAAAAAAgEYAAAAAAAAAAAAAACE////AAAAASAAAAADAAAAZGF5AMr///8UAAAAFAAAABQAAAAAAAIBGAAAAAAAAAAAAAAAuP///wAAAAEgAAAABQAAAG1vbnRoABIAGAAUABMAEgAMAAAACAAEABIAAAAUAAAAFAAAABwAAAAAAAIBIAAAAAAAAAAAAAAACAAMAAgABwAIAAAAAAAAASAAAAAEAAAAeWVhcgAAAAA=",
    "records" : "LAEAABQAAAAAAAAADAAWAA4AFQAQAAQADAAAAMgAAAAAAAAAAAADABAAAAAAAwoAGAAMAAgABAAKAAAAFAAAAKgAAAAKAAAAAAAAAAAAAAAJAAAAAAAAAAAAAAACAAAAAAAAAAgAAAAAAAAAKAAAAAAAAAAwAAAAAAAAAAIAAAAAAAAAOAAAAAAAAAAoAAAAAAAAAGAAAAAAAAAAAgAAAAAAAABoAAAAAAAAACgAAAAAAAAAkAAAAAAAAAACAAAAAAAAAJgAAAAAAAAALAAAAAAAAADIAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAKAAAAAAAAAAAAAAAAAAAACgAAAAAAAAAAAAAAAAAAAAoAAAAAAAAAAAAAAAAAAAAKAAAAAAAAAAoAAAAAAAAAAAAAAP8DAAAAAAAA4AcAAOEHAADiBwAA4wcAAOQHAADlBwAA5gcAAOcHAADoBwAA6QcAAP8DAAAAAAAAAQAAAAIAAAADAAAABAAAAAUAAAAGAAAABwAAAAgAAAAJAAAACgAAAP8DAAAAAAAAAQAAAAIAAAADAAAABAAAAAUAAAAGAAAABwAAAAgAAAAJAAAACgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
  }
}

I need help with implementing this response as I am trying to figure out how the schema and records in the partitions property in the sample response are getting serialized. I also wanted to know how I can get Athena to trigger SplitsRequest and ReadRecordsRequest. My understanding regarding this is that GET_TABLE_LAYOUT, GET_SPLITS and READ_RECORDS are getting triggered in that respective order when I run a query on the Athena console. However, so far I am only getting a table layout request in the CloudWatch logs and I am handling the request and generating a response (incorrectly I guess) which is returned and logged on to the Cloudwatch.

cadnce commented 2 years ago

Answering this not as a contributor, but based upon personal experience.

The data in schema and records are apache-arrow encoded.

Excuse the coding style as i'm not an expert in js. But hopefully the following helps you out and you get the gist

import {  tableFromIPC } from 'apache-arrow';

let sample_message = {
    "@type": "GetTableLayoutResponse",
    "catalogName": "test-catalog",
    "tableName": {
        "schemaName": "test-schema",
        "tableName": "test-table"
    },
    "partitions": {
        "aId": "test-allocator-id",
        "schema": "PAEAABAAAAAAAAoADgAGAA0ACAAKAAAAAAADABAAAAAAAQoADAAAAAgABAAKAAAACAAAAAgAAAAAAAAABAAAAMAAAAB0AAAAPAAAAAQAAABi////FAAAABQAAAAYAAAAAAAFARQAAAAAAAAAAAAAAAQABAAEAAAABAAAAGNvbDMAAAAAlv///xQAAAAUAAAAFAAAAAAAAgEYAAAAAAAAAAAAAACE////AAAAASAAAAADAAAAZGF5AMr///8UAAAAFAAAABQAAAAAAAIBGAAAAAAAAAAAAAAAuP///wAAAAEgAAAABQAAAG1vbnRoABIAGAAUABMAEgAMAAAACAAEABIAAAAUAAAAFAAAABwAAAAAAAIBIAAAAAAAAAAAAAAACAAMAAgABwAIAAAAAAAAASAAAAAEAAAAeWVhcgAAAAA=",
        "records": "LAEAABQAAAAAAAAADAAWAA4AFQAQAAQADAAAAMgAAAAAAAAAAAADABAAAAAAAwoAGAAMAAgABAAKAAAAFAAAAKgAAAAKAAAAAAAAAAAAAAAJAAAAAAAAAAAAAAACAAAAAAAAAAgAAAAAAAAAKAAAAAAAAAAwAAAAAAAAAAIAAAAAAAAAOAAAAAAAAAAoAAAAAAAAAGAAAAAAAAAAAgAAAAAAAABoAAAAAAAAACgAAAAAAAAAkAAAAAAAAAACAAAAAAAAAJgAAAAAAAAALAAAAAAAAADIAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAKAAAAAAAAAAAAAAAAAAAACgAAAAAAAAAAAAAAAAAAAAoAAAAAAAAAAAAAAAAAAAAKAAAAAAAAAAoAAAAAAAAAAAAAAP8DAAAAAAAA4AcAAOEHAADiBwAA4wcAAOQHAADlBwAA5gcAAOcHAADoBwAA6QcAAP8DAAAAAAAAAQAAAAIAAAADAAAABAAAAAUAAAAGAAAABwAAAAgAAAAJAAAACgAAAP8DAAAAAAAAAQAAAAIAAAADAAAABAAAAAUAAAAGAAAABwAAAAgAAAAJAAAACgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
    }
}

let apache_arrow_messge =[
    sample_message.partitions.schema,
    sample_message.partitions.records
].map((buf) => Buffer.from(buf, 'base64'))

let table = tableFromIPC(apache_arrow_messge)

console.table([...table]);

Outputs

┌─────────┬──────┬───────┬─────┬──────┐
│ (index) │ year │ month │ day │ col3 │
├─────────┼──────┼───────┼─────┼──────┤
│    0    │ 2016 │   1   │  1  │ null │
│    1    │ 2017 │   2   │  2  │ null │
│    2    │ 2018 │   3   │  3  │ null │
│    3    │ 2019 │   4   │  4  │ null │
│    4    │ 2020 │   5   │  5  │ null │
│    5    │ 2021 │   6   │  6  │ null │
│    6    │ 2022 │   7   │  7  │ null │
│    7    │ 2023 │   8   │  8  │ null │
│    8    │ 2024 │   9   │  9  │ null │
│    9    │ 2025 │  10   │ 10  │ null │
└─────────┴──────┴───────┴─────┴──────┘
cadnce commented 2 years ago

Oh, and your question about splits

I also wanted to know how I can get Athena to trigger SplitsRequest and ReadRecordsRequest. My understanding regarding this is that GET_TABLE_LAYOUT, GET_SPLITS and READ_RECORDS are getting triggered in that respective order when I run a query on the Athena console.

As explained in the wiki page https://github.com/awslabs/aws-athena-query-federation/wiki/MetadataHandler that ordering is correct. If you're not seeing the requested this means you've not told athena that your table has partitions, or it has deemed that the partitions you've said you have are not relevant.

Order Java Function Call Message Type Why
1 doListSchemas(...) ListSchemaRequest lists available schemas
2 doListTables(...) ListTablesRequest lists available tables in a schema.
3 doGetTable(...) GetTableRequest get the definition of a Table.
4 doGetTableLayout(...) GetTableLayoutRequest provides partition information and optionally performs partition pruning.
5 doGetSplits(...) GetSplitsRequest tells Athena how it can split up and parallelize reads of a Partition.

You should tell it you have paritions in response to the GetTableRequest message type

sanatdeshpande1 commented 2 years ago

Hi, thanks for the reply! I was able to get this working along with all the other required functions.

let apache_arrow_messge =[ sample_message.partitions.schema, sample_message.partitions.records ].map((buf) => Buffer.from(buf, 'base64'))

let table = tableFromIPC(apache_arrow_messge)

My question was how I can do the reverse? I have a table which I need to split into schema and records and then serialize them separately to form the partitions block. I did not find a direct way to do this using Apache Arrow JS and so I had to override some methods in a class to get the desired output.