shelfio / dynamodb-parallel-scan

Scan large DynamoDB tables faster with parallelism
MIT License

Example for parallelScanAsStream #266

Open robertkossendey opened 3 weeks ago

robertkossendey commented 3 weeks ago

Hello,

I am running parallelScanAsStream to dump a whole DynamoDB table to S3. It is awfully slow, though: slower than running a sequential scan. Running it locally, I get results like:

Config: {"concurrency":16,"chunkSize":1000,"highWaterMark":1000}
Total items: 167744, Duration: 92.73s
Throughput: 1808.91 items/second

Config: {"concurrency":32,"chunkSize":2000,"highWaterMark":2000}
Total items: 167744, Duration: 108.71s
Throughput: 1543.01 items/second

Config: {"concurrency":64,"chunkSize":5000,"highWaterMark":5000}
Total items: 167744, Duration: 109.73s
Throughput: 1528.67 items/second

My code looks like this:

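import https from 'node:https';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { NodeHttpHandler } from '@smithy/node-http-handler';
import { parallelScanAsStream } from '@shelf/dynamodb-parallel-scan';

// Allow up to 100 concurrent HTTPS connections to DynamoDB
const agent = new https.Agent({
    maxSockets: 100
});

const dynamodbClient = new DynamoDBClient({
    requestHandler: new NodeHttpHandler({
        httpsAgent: agent
    })
});

// The rest runs inside an async handler that receives `event`
// (its opening line is not shown in the post)
    const stream = await parallelScanAsStream(
        { TableName: event.tableName },
        {
            concurrency: 100,
            chunkSize: 200,
            client: dynamodbClient
        }
    );

    for await (const items of stream) {
        // Serialize each item as one JSON line; BigInt values become strings
        const records = items.map((item: any) => {
            return {
                Data: Buffer.from(
                    JSON.stringify(item,
                        (_, v) => (typeof v === 'bigint' ? v.toString() : v)
                    ) + '\n'
                )
            };
        });

        await asyncProcessingFunction(records);
    }
}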

The asyncProcessingFunction itself is not the problem. Does anyone see something obvious that I am doing wrong? Or can someone provide some examples?

vladholubiev commented 3 weeks ago

  1. Is your partition key unique enough?

  2. If it's for S3 export, why not use the native DDB -> S3 export AWS feature?

robertkossendey commented 3 weeks ago

Thanks for the quick answer!

  1. Our Partition Keys and Range Keys contain UUIDs. Each Partition Key has roughly 10 Range Key variations.

  2. That is not really an option, because the export only supports Ion or DynamoDB JSON notation, and we want regular unmarshalled JSON.
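
To illustrate the difference between DynamoDB JSON and "regular unmarshalled JSON", here is a minimal sketch of the conversion. The `unmarshallItem` and `unmarshallValue` helpers and the sample item are hypothetical and cover only a few type descriptors (S, N, BOOL, NULL, L, M); in a real project the `unmarshall` function from `@aws-sdk/util-dynamodb` would be the usual choice.

```typescript
// Simplified, hypothetical stand-in for `unmarshall` from @aws-sdk/util-dynamodb.
// Handles only the S, N, BOOL, NULL, L and M type descriptors.
type AttributeValue =
  | { S: string }
  | { N: string }
  | { BOOL: boolean }
  | { NULL: true }
  | { L: AttributeValue[] }
  | { M: Record<string, AttributeValue> };

type Plain = string | number | boolean | null | Plain[] | { [key: string]: Plain };

function unmarshallValue(value: AttributeValue): Plain {
  if ('S' in value) return value.S;
  if ('N' in value) return Number(value.N); // loses precision for very large numbers
  if ('BOOL' in value) return value.BOOL;
  if ('NULL' in value) return null;
  if ('L' in value) return value.L.map(unmarshallValue);
  return unmarshallItem(value.M);
}

function unmarshallItem(item: Record<string, AttributeValue>): { [key: string]: Plain } {
  const result: { [key: string]: Plain } = {};
  for (const [key, value] of Object.entries(item)) {
    result[key] = unmarshallValue(value);
  }
  return result;
}

// Made-up item in DynamoDB JSON notation
const exported: Record<string, AttributeValue> = {
  id: { S: 'a1' },
  size: { N: '3' },
  tags: { L: [{ S: 'x' }, { S: 'y' }] },
  meta: { M: { active: { BOOL: true } } },
};

console.log(JSON.stringify(unmarshallItem(exported)));
// {"id":"a1","size":3,"tags":["x","y"],"meta":{"active":true}}
```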

vladholubiev commented 3 weeks ago

Another helpful thing to do is running the script with the DEBUG=* environment variable set. It will print more detailed statistics about the scanning progress, allowing you to analyze the results yourself or share them here if you're comfortable doing so.

robertkossendey commented 3 weeks ago

Log excerpt:

  ddb-parallel-scan (93%) [0/16] [time:3555ms] [fetched:341] [total (fetched/scanned/table-size):149194/155765/167708] +31ms
  ddb-parallel-scan (93%) [10/16] [time:3613ms] [fetched:342] [total (fetched/scanned/table-size):150228/156107/167708] +28ms
  ddb-parallel-scan (93%) [12/16] [time:3271ms] [fetched:346] [total (fetched/scanned/table-size):150228/156453/167708] +518ms
  ddb-parallel-scan (93%) [8/16] [time:2714ms] [fetched:241] [total (fetched/scanned/table-size):150228/156694/167708] +23ms
  ddb-parallel-scan (94%) [14/16] [time:3537ms] [fetched:352] [total (fetched/scanned/table-size):151163/157046/167708] +159ms
  ddb-parallel-scan (94%) [6/16] [time:3840ms] [fetched:337] [total (fetched/scanned/table-size):152204/157383/167708] +46ms
  ddb-parallel-scan (94%) [9/16] [time:2025ms] [fetched:182] [total (fetched/scanned/table-size):153235/157565/167708] +69ms
  ddb-parallel-scan (94%) [15/16] [time:3279ms] [fetched:337] [total (fetched/scanned/table-size):153417/157902/167708] +193ms
  ddb-parallel-scan (94%) [0/16] [time:1176ms] [fetched:107] [total (fetched/scanned/table-size):153417/158009/167708] +140ms
  ddb-parallel-scan (94%) [11/16] [time:3548ms] [fetched:343] [total (fetched/scanned/table-size):153524/158352/167708] +54ms
  ddb-parallel-scan (95%) [2/16] [time:3083ms] [fetched:347] [total (fetched/scanned/table-size):154567/158699/167708] +47ms
  ddb-parallel-scan (95%) [13/16] [time:3665ms] [fetched:353] [total (fetched/scanned/table-size):154567/159052/167708] +101ms
  ddb-parallel-scan (95%) [4/16] [time:3580ms] [fetched:339] [total (fetched/scanned/table-size):154567/159391/167708] +28ms
  ddb-parallel-scan (95%) [14/16] [time:921ms] [fetched:94] [total (fetched/scanned/table-size):154567/159485/167708] +244ms
  ddb-parallel-scan (95%) [12/16] [time:1625ms] [fetched:180] [total (fetched/scanned/table-size):154661/159665/167708] +521ms
  ddb-parallel-scan (95%) [3/16] [time:3315ms] [fetched:345] [total (fetched/scanned/table-size):155530/160010/167708] +155ms
  ddb-parallel-scan (96%) [5/16] [time:2884ms] [fetched:337] [total (fetched/scanned/table-size):155530/160347/167708] +106ms
  ddb-parallel-scan (96%) [1/16] [time:3062ms] [fetched:344] [total (fetched/scanned/table-size):156558/160691/167708] +124ms
  ddb-parallel-scan (96%) [10/16] [time:2573ms] [fetched:344] [total (fetched/scanned/table-size):157593/161035/167708] +45ms
  ddb-parallel-scan (96%) [11/16] [time:1419ms] [fetched:172] [total (fetched/scanned/table-size):157593/161207/167708] +48ms
  ddb-parallel-scan (96%) [7/16] [time:2709ms] [fetched:349] [total (fetched/scanned/table-size):157765/161556/167708] +29ms
  ddb-parallel-scan (97%) [6/16] [time:2541ms] [fetched:347] [total (fetched/scanned/table-size):157765/161903/167708] +638ms
  ddb-parallel-scan (97%) [5/16] [time:1018ms] [fetched:157] [total (fetched/scanned/table-size):157765/162060/167708] +134ms
  ddb-parallel-scan (97%) [15/16] [time:2447ms] [fetched:338] [total (fetched/scanned/table-size):157922/162398/167708] +33ms
  ddb-parallel-scan (97%) [4/16] [time:2290ms] [fetched:345] [total (fetched/scanned/table-size):157922/162743/167708] +214ms
  ddb-parallel-scan (97%) [2/16] [time:2451ms] [fetched:342] [total (fetched/scanned/table-size):157922/163085/167708] +31ms
  ddb-parallel-scan (97%) [13/16] [time:2417ms] [fetched:351] [total (fetched/scanned/table-size):158954/163436/167708] +68ms
  ddb-parallel-scan (98%) [1/16] [time:1662ms] [fetched:259] [total (fetched/scanned/table-size):158954/163695/167708] +422ms
  ddb-parallel-scan (98%) [3/16] [time:2109ms] [fetched:351] [total (fetched/scanned/table-size):159213/164046/167708] +217ms
  ddb-parallel-scan (98%) [3/16] [time:300ms] [fetched:64] [total (fetched/scanned/table-size):160253/164110/167708] +300ms
  ddb-parallel-scan (98%) [7/16] [time:2179ms] [fetched:348] [total (fetched/scanned/table-size):160317/164458/167708] +122ms
  ddb-parallel-scan (98%) [2/16] [time:1221ms] [fetched:210] [total (fetched/scanned/table-size):161362/164668/167708] +92ms
  ddb-parallel-scan (98%) [15/16] [time:1494ms] [fetched:337] [total (fetched/scanned/table-size):161572/165005/167708] +28ms
  ddb-parallel-scan (99%) [6/16] [time:1708ms] [fetched:341] [total (fetched/scanned/table-size):162584/165346/167708] +47ms
  ddb-parallel-scan (99%) [10/16] [time:2497ms] [fetched:354] [total (fetched/scanned/table-size):162584/165700/167708] +74ms
  ddb-parallel-scan (99%) [4/16] [time:1536ms] [fetched:351] [total (fetched/scanned/table-size):163624/166051/167708] +135ms
  ddb-parallel-scan (99%) [15/16] [time:263ms] [fetched:47] [total (fetched/scanned/table-size):164659/166098/167708] +7ms
  ddb-parallel-scan (99%) [7/16] [time:536ms] [fetched:225] [total (fetched/scanned/table-size):164706/166323/167708] +153ms
  ddb-parallel-scan (99%) [13/16] [time:1705ms] [fetched:350] [total (fetched/scanned/table-size):164931/166673/167708] +108ms
  ddb-parallel-scan (99%) [10/16] [time:664ms] [fetched:195] [total (fetched/scanned/table-size):165985/166868/167708] +261ms
  ddb-parallel-scan (100%) [4/16] [time:547ms] [fetched:168] [total (fetched/scanned/table-size):166180/167036/167708] +19ms
  ddb-parallel-scan (100%) [13/16] [time:407ms] [fetched:168] [total (fetched/scanned/table-size):166348/167204/167708] +127ms
  ddb-parallel-scan (100%) [6/16] [time:916ms] [fetched:348] [total (fetched/scanned/table-size):166516/167552/167708] +32ms
  ddb-parallel-scan (100%) [6/16] [time:304ms] [fetched:194] [total (fetched/scanned/table-size):167552/167746/167708] +304ms

Note: this is running locally, so some latency is expected. But it is also really slow when I run it in a Lambda.
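
For anyone who wants to analyze such debug output themselves, here is a rough sketch that parses lines in the format shown above and summarizes request time and per-request throughput. The `parseLogLine` and `summarize` helpers are hypothetical, and reading the first bracket as "segment index / total segments" is an assumption about the log format, not something the library documents here. The three sample lines are copied from the excerpt.

```typescript
interface ScanLogEntry {
  segment: number;
  totalSegments: number;
  timeMs: number;
  fetched: number;
}

// Matches e.g. "[0/16] [time:3555ms] [fetched:341]"
const LOG_PATTERN = /\[(\d+)\/(\d+)\] \[time:(\d+)ms\] \[fetched:(\d+)\]/;

function parseLogLine(line: string): ScanLogEntry | null {
  const match = LOG_PATTERN.exec(line);
  if (match === null) return null;
  return {
    segment: Number(match[1]),
    totalSegments: Number(match[2]),
    timeMs: Number(match[3]),
    fetched: Number(match[4]),
  };
}

function summarize(lines: string[]): {
  requests: number;
  avgTimeMs: number;
  itemsPerSecondPerRequest: number;
} {
  // Lines that do not match the pattern are skipped
  const entries = lines
    .map(parseLogLine)
    .filter((entry): entry is ScanLogEntry => entry !== null);
  const totalTimeMs = entries.reduce((sum, entry) => sum + entry.timeMs, 0);
  const totalFetched = entries.reduce((sum, entry) => sum + entry.fetched, 0);
  return {
    requests: entries.length,
    avgTimeMs: totalTimeMs / entries.length,
    itemsPerSecondPerRequest: totalFetched / (totalTimeMs / 1000),
  };
}

const sampleLines = [
  '  ddb-parallel-scan (93%) [0/16] [time:3555ms] [fetched:341] [total (fetched/scanned/table-size):149194/155765/167708] +31ms',
  '  ddb-parallel-scan (93%) [10/16] [time:3613ms] [fetched:342] [total (fetched/scanned/table-size):150228/156107/167708] +28ms',
  '  ddb-parallel-scan (93%) [8/16] [time:2714ms] [fetched:241] [total (fetched/scanned/table-size):150228/156694/167708] +23ms',
];

const summary = summarize(sampleLines);
console.log(summary.requests);  // 3
console.log(summary.avgTimeMs); // 3294
console.log(summary.itemsPerSecondPerRequest.toFixed(1));
```

Feeding the full debug output into `summarize` instead of three lines would give a more representative average.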

vladholubiev commented 3 weeks ago

Is your table using provisioned or on-demand capacity mode?

vladholubiev commented 3 weeks ago

Have you tried running it without the asyncProcessingFunction? Even if it's fast, it still blocks the stream consumption, as the implementation respects the backpressure mechanism and won't resume fetching until you've consumed everything serially.

robertkossendey commented 3 weeks ago

On-demand, but I am not being throttled. This is already without the function; the loop just runs Promise.resolve().

When is a batch considered consumed? And is it possible to consume batches in parallel while still respecting the high-water mark? I want to make sure that the process does not run out of memory (OOM).
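
One possible way to process batches in parallel while keeping memory bounded is to cap the number of in-flight handlers and only pull the next batch once a slot is free. The sketch below is not part of the library; `consumeWithLimit`, `fakeBatches` and `runDemo` are hypothetical names, and it assumes the source only fetches more data when the iterator is advanced, as described earlier in the thread.

```typescript
// Runs at most `limit` handlers at once and stops pulling from `source` while full.
async function consumeWithLimit<T>(
  source: AsyncIterable<T>,
  limit: number,
  handler: (batch: T) => Promise<void>,
): Promise<void> {
  const inFlight = new Set<Promise<void>>();
  const failures: unknown[] = [];

  for await (const batch of source) {
    // Each task removes itself when done and never rejects; errors are collected.
    const task: Promise<void> = handler(batch).then(
      () => { inFlight.delete(task); },
      (error: unknown) => { inFlight.delete(task); failures.push(error); },
    );
    inFlight.add(task);

    // Wait for a free slot before asking the source for the next batch.
    if (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }
    if (failures.length > 0) break;
  }

  await Promise.all(inFlight);
  if (failures.length > 0) throw failures[0];
}

// Stand-in for the scan stream: yields `count` single-item batches.
async function* fakeBatches(count: number): AsyncGenerator<number[]> {
  for (let i = 0; i < count; i++) yield [i];
}

async function runDemo(): Promise<{ processed: number; maxActive: number }> {
  let active = 0;
  let maxActive = 0;
  let processed = 0;
  await consumeWithLimit(fakeBatches(6), 3, async (batch) => {
    active++;
    maxActive = Math.max(maxActive, active);
    await new Promise<void>((resolve) => setTimeout(resolve, 10)); // simulated upload
    processed += batch.length;
    active--;
  });
  return { processed, maxActive };
}

runDemo().then((result) => console.log(result)); // { processed: 6, maxActive: 3 }
```

With the real stream, the call would look something like `await consumeWithLimit(stream, 4, asyncProcessingFunction)`, where the limit of 4 is an arbitrary illustration. At most `limit` batches are being processed at any time, in addition to whatever the stream buffers internally up to its highWaterMark.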

vladholubiev commented 2 weeks ago

> When is a batch considered consumed?

A batch is considered consumed as soon as the async iterator in the for await loop yields the current iteration.

Based on the logs, I suspect the issue might be related to the partition key. Have you tried querying other tables with more uniform distribution?

I've tested this on my tables, and each logged request takes less than 100 ms, while yours take 2-3 seconds each.

Is it possible that the item size is quite large, approaching the 400 KB limit?

robertkossendey commented 2 weeks ago

The table that I used for the logs only has a Partition Key, which is a UUIDv4. That should not be the problem.

The average item size on the table is 3 KB.
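
As a back-of-the-envelope check of these numbers, the sketch below combines the 3 KB average item size with DynamoDB's limit of 1 MB of data per Scan request. The 3-second request time is an assumption taken from the upper end of the "2-3 seconds" mentioned above, and the whole calculation is only a rough estimate, not a diagnosis.

```typescript
const SCAN_PAGE_LIMIT_BYTES = 1024 * 1024; // a single Scan request returns at most 1 MB
const averageItemBytes = 3 * 1024;         // 3 KB average item size from the thread
const tableItems = 167708;                 // table-size value from the debug log
const segments = 16;                       // concurrency used for the logged run
const assumedRequestMs = 3000;             // assumption: ~3 s per request, as seen in the log

const itemsPerPage = Math.floor(SCAN_PAGE_LIMIT_BYTES / averageItemBytes);
const totalPages = Math.ceil(tableItems / itemsPerPage);
const pagesPerSegment = Math.ceil(totalPages / segments);
const estimatedSeconds = (pagesPerSegment * assumedRequestMs) / 1000;

console.log(itemsPerPage);     // 341
console.log(totalPages);       // 492
console.log(pagesPerSegment);  // 31
console.log(estimatedSeconds); // 93
```

About 341 items per page is close to the `fetched` counts of roughly 340-350 in the log, which suggests most requests return a full 1 MB page. Under these assumptions the total duration is driven mainly by the time each request takes.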