ghdna / athena-express

Athena-Express can simplify executing SQL queries in Amazon Athena AND fetching cleaned-up JSON results in the same synchronous or asynchronous request - well suited for web applications.
https://www.npmjs.com/package/athena-express
MIT License
181 stars 70 forks source link

Concurrently run multiple queries #40

Closed csilzen closed 3 years ago

csilzen commented 4 years ago

Is there a way to run multiple queries concurrently? I'm working with AWS Lambda and trying to run the following code seems to take exponentially longer than when I was simply using just one latitude/longitude point, so I'm unsure of how else to set it up.


const AthenaExpress = require("athena-express");
const aws = require("aws-sdk");

const awsCredentials = {
     region: "us-east-1",
     apiVersion: '2017-05-18'
};
aws.config.update(awsCredentials);
const athenaExpressConfig = {
    aws,
    db: 'geo_db',
    getStats: true
}

const athenaExpress = new AthenaExpress(athenaExpressConfig);
const athena = new aws.Athena(awsCredentials);

// get address loc results from geocoder
// let lat = 40.705220
// let long = -74.006801

let distInDeg = 0.0444 // TODO how to calculate distance FROM mi to deg accurately

let array = [ [ '32.5737769', '116.01714366' ],
  [ '25.40980023', '118.15779794' ],
  [ '68.2425492', '80.14816641' ],
  [ '60.65997057', '150.80291157' ],
  [ '14.93777512', '103.29848751' ],
  [ '24.74481687', '57.31339769' ],
  [ '17.60162305', '104.75987097' ],
  [ '31.94638762', '82.73833911' ],
  [ '32.04597091', '98.85868219' ],
  [ '22.02738675', '83.87266246' ],
  [ '36.66230105', '99.70797053' ],
  [ '38.07005703', '136.95255446' ],
  [ '36.45583181', '110.92034934' ],
  [ '6.27120166', '97.79602605' ],
  [ '32.86491109', '127.92750869' ],
  [ '9.46995947', '111.93653337' ],
  [ '8.89614985', '106.65708803' ],
  [ '27.76790032', '114.46990861' ],
  [ '23.11023319', '81.40803655' ],
  [ '44.75239327', '93.53746377' ],
  [ '34.57401632', '119.75627289' ],
  [ '39.40482007', '86.24075917' ],
  [ '35.01868142', '109.06331215' ],
  [ '37.79967845', '104.80379403' ],
  [ '45.06082768', '146.65664048' ],
  [ '36.71050513', '147.79287045' ],
  [ '52.51063104', '94.85995256' ],
  [ '27.89160903', '119.00969158' ],
  [ '5.44637921', '115.77533073' ],
  [ '30.88716498', '57.88946846' ],
  [ '51.68224382', '57.8178426' ],
  [ '68.67037526', '58.11614626' ],
  [ '62.70190445', '113.66472191' ],
  [ '6.04868292', '84.51616843' ],
  [ '42.02524891', '68.90058733' ],
  [ '20.91679046', '79.39269098' ],
  [ '19.45258365', '79.86085812' ],
  [ '53.93058685', '46.72946506' ],
  [ '65.83600016', '101.24783803' ],
  [ '26.71669512', '84.26723162' ],
  [ '25.79181528', '119.61066606' ],
  [ '36.62770405', '137.27113857' ] ]

exports.handler = event => {
    async function getAllQueries() {
        try {

            let data = await Promise.all (

                array.map((pointArr) => {

                    [`select count(*) from db1
                        WHERE ST_CONTAINS(ST_BUFFER(ST_POINT(${pointArr[0]}, ${pointArr[1]}), ${distInDeg}), ST_POINT(latitude, longitude))`,

                    `SELECT tags['amenity'], count(*) from "default"."planet"
                        WHERE type = 'node'
                        and tags['amenity'] in ('restaurant', 'bar', 'bank', 'cafe', 'fast_food')
                        and ST_Contains(ST_BUFFER(ST_POINT(${pointArr[0]}, ${pointArr[1]}), ${distInDeg}), st_point(lat, lon))
                        group by tags['amenity']`
                    ].map((query)=> {
                        athenaExpress
                        .query(query)
                        .then(results => {
                            console.log('RESULTS', results);
                            results.json()
                        })
                        .catch(error => {
                            console.log(error);
                        });
                    })
                })
            )

            console.log('DATA', data);
            return (data);
        } catch (error) {
            console.log(error)
            throw (error)
        }
    }

    getAllQueries();
};
ghdna commented 4 years ago

Yeah athena often runs into its own throttling limits. One thing you could try is setting the waitForResults flag as false in the constructor. This will return the QueryExecutionId for each query executed in Athena, which will be much faster than waiting for query results. You can then pass this QueryExecutionId back into athena-express after waiting a few arbitrary seconds. The overall time will likely remain the same as before, but you can manage this better and as a bonus - your Lambda costs will be lower as you won't be waiting for queries to finish.