nanoporetech / pod5-file-format

Pod5: a high performance file format for nanopore reads.
https://pod5-file-format.readthedocs.io/
Other
124 stars 17 forks source link

Question about iteratively getting a read subset #136

Open hasindu2008 opened 1 month ago

hasindu2008 commented 1 month ago

Hi pod5 developers,

I am trying to do some random access to a POD5 file, a small batch of around 1000 at a time. For instance, say we want to grab the reads for a small gene region at a time. Looking at the Dorado code, we have written the following piece of code that would be exploiting multiple threads.

We get like ~316 reads per second on a system with SSD. We are running with 40 threads and the CPU utilisation seems to be good.

Could you advise if this looks right interms of maximally getting the expected performance?

int read_and_process_pod5_file(const std::string& path, const char *rid_list_path, size_t num_thread, int batch_size) {

    int read_count = 0;

    //read id list
    FILE *fpr = fopen(rid_list_path,"r");
    if(fpr==NULL){
        fprintf(stderr,"Error in opening file %s for reading\n",rid_list_path);
        perror("perr: ");;
        exit(EXIT_FAILURE);
    }
    char tmp[1024];

    /**** Initialisation and opening of the file ***/

    pod5_init();
    Pod5FileReader_t* file = pod5_open_file(path.c_str());
    if (!file) {
        fprintf(stderr, "Failed to open file %s: %s\n", path.c_str(), pod5_get_error_string());
        exit(EXIT_FAILURE);
    }
    std::size_t batch_count = 0;
    if (pod5_get_read_batch_count(&batch_count, file) != POD5_OK) {
        fprintf(stderr, "Failed to query batch count: %s\n", pod5_get_error_string());
        exit(EXIT_FAILURE);
    }
    fprintf(stderr, "batch_count: %zu\n", batch_count);
    cxxpool::thread_pool pool{num_thread};

    /**** End of init ***/

    while (1){

        // reading a batch of read IDs from the list
        std::vector<ReadID> read_ids;
        int i_count = 0;
        for(i_count=0; i_count < batch_size; i_count++){
            if (fscanf(fpr,"%s",tmp) < 1) {
                break;
            }
            uint8_t  arr[16] = {0};
            int ret = sscanf(tmp, "%2hhx%2hhx%2hhx%2hhx-%2hhx%hhx-%2hhx%2hhx-%2hhx%2hhx-%2hhx%2hhx%2hhx%2hhx%2hhx%2hhx",
                             &arr[0], &arr[1], &arr[2], &arr[3], &arr[4], &arr[5], &arr[6], &arr[7],
                             &arr[8], &arr[9], &arr[10], &arr[11], &arr[12], &arr[13], &arr[14], &arr[15]);
            if(ret !=16){
                fprintf(stderr,"Parsing uuid failed. Return val %d\n",ret);
                exit(1);
            }

            ReadID read_id;
            memcpy(read_id.data(), arr, POD5_READ_ID_SIZE);
            read_ids.push_back(std::move(read_id));
        }

        if(i_count == 0) break;
        int ret = i_count;
        read_count += ret;

        /**** Plan traversal ***/

        std::vector<std::uint32_t> traversal_batch_counts(batch_count);
        std::vector<std::uint32_t> traversal_batch_rows(ret);
        std::size_t find_success_count = 0;
        std::vector<uint8_t> read_id_array(POD5_READ_ID_SIZE * read_ids.size());
        for (size_t i = 0; i < read_ids.size(); i++) {
            memcpy(read_id_array.data() + POD5_READ_ID_SIZE * i, read_ids[i].data(), POD5_READ_ID_SIZE);
        }
        pod5_error_t err = pod5_plan_traversal(file, read_id_array.data(), read_ids.size(),
                                               traversal_batch_counts.data(),
                                               traversal_batch_rows.data(), &find_success_count);
        if (err != POD5_OK) {
            fprintf(stderr,"Couldn't create plan for %s with reads %zu\n", path.c_str(), read_ids.size());
            return EXIT_FAILURE;
        }
        if (find_success_count != read_ids.size()) {
            fprintf(stderr,"Reads found by plan %zu, reads in input %zu\n", find_success_count, read_ids.size());
            fprintf(stderr,"Plan traveral didn't yield correct number of reads\n");
            return EXIT_FAILURE;
        }

        /**** Plan traversal done ***/

        uint32_t row_offset = 0;
        // pod5 batching
        for (std::size_t batch_index = 0; batch_index < batch_count; ++batch_index) {

            Pod5ReadRecordBatch_t* batch = nullptr;
            if (pod5_get_read_batch(&batch, file, batch_index) != POD5_OK) {
                fprintf(stderr, "Failed to get batch: %s\n", pod5_get_error_string());
                exit(EXIT_FAILURE);
            }
            int mini_batch_size = traversal_batch_counts[batch_index];
            rec_t *rec = (rec_t*)malloc(mini_batch_size * sizeof(rec_t));
            std::vector<std::future<int>> futures;
            for (std::size_t row_idx = 0; row_idx < traversal_batch_counts[batch_index]; row_idx++) {
                uint32_t row = traversal_batch_rows[row_idx + row_offset];
                futures.push_back(pool.push(load_pod5_read,row, batch, file, &rec[row_idx]));
            }
            for (auto& v : futures) {
                v.get();
            }

            /**** Batch fetched ***/

            //process
            process_read_batch(rec, mini_batch_size);

            /**** Deinit ***/
            if (pod5_free_read_batch(batch) != POD5_OK) {
                fprintf(stderr, "Failed to release batch\n");
                exit(EXIT_FAILURE);
            }
            /**** End of Deinit***/

            row_offset += traversal_batch_counts[batch_index];
        }

        if(ret<batch_size){ //this indicates nothing left to read
            break;
        }

    }

    /**** Deinit ***/
    if (pod5_close_and_free_reader(file) != POD5_OK) {
        fprintf(stderr, "Failed to close and free POD5 reader\n");
        exit(EXIT_FAILURE);
    }
    /**** End of Deinit***/

    fclose(fpr);

    return read_count;
}
0x55555555 commented 1 month ago

Hi @hasindu2008 ,

It looks reasonable, do you know the number of megabytes per second of signal you receive from the loading? This is a more interesting performance metric as any operation you run involving the sample data will bottleneck on decompression - and potentially the disk IO speed depending on the speed of your processor.

If you aren't hitting your disk's max read speed I would suggest it could be more optimal.

Depending on the size of the pod5 files and the size of the selection you are making, you could explore passing the reads from an entire batch as one async operation to reduce overhead of the threadpool - but you would need to configure this based on profiling of your use case.

Hope that helps,

hasindu2008 commented 1 month ago

Hi

Could you clarify what you meant by "megabytes per second of signal" - before decompression or after compression? effective speed for the subsets or what is observed through something like iostat?

The CPU is maxing out, so I guess the overhead of the threadpool is negllegable and optimising this would not make a difference I guess? image

hasindu2008 commented 1 month ago

@0x55555555 to add further:

As per iostats, it is reaching like 300 MB/s:

r/s     w/s     rMB/s     wMB/s   rrqm/s   wrqm/s  %rrqm  %wrqm r_await w_await aqu-sz rareq-sz wareq-sz  svctm  %util
3538.60    0.00    336.05      0.00     0.00     0.00   0.00   0.00    0.17    0.00   0.16    97.25     0.00   0.04  15.36

r/s     w/s     rMB/s     wMB/s   rrqm/s   wrqm/s  %rrqm  %wrqm r_await w_await aqu-sz rareq-sz wareq-sz  svctm  %util
3122.80    0.00    297.02      0.00     0.00     0.00   0.00   0.00    0.18    0.00   0.15    97.40     0.00   0.04  14.00

r/s     w/s     rMB/s     wMB/s   rrqm/s   wrqm/s  %rrqm  %wrqm r_await w_await aqu-sz rareq-sz wareq-sz  svctm  %util
3297.00    0.00    313.64      0.00     0.00     0.00   0.00   0.00    0.17    0.00   0.13    97.41     0.00   0.04  12.48

r/s     w/s     rMB/s     wMB/s   rrqm/s   wrqm/s  %rrqm  %wrqm r_await w_await aqu-sz rareq-sz wareq-sz  svctm  %util
3346.00    0.00    317.91      0.00     0.00     0.00   0.00   0.00    0.16    0.00   0.13    97.29     0.00   0.04  12.72

r/s     w/s     rMB/s     wMB/s   rrqm/s   wrqm/s  %rrqm  %wrqm r_await w_await aqu-sz rareq-sz wareq-sz  svctm  %util
3444.20    0.00    328.32      0.00     0.00     0.00   0.00   0.00    0.17    0.00   0.14    97.61     0.00   0.04  13.12

For a typical SSD, sequential read speed is around 500MB/s, so I guess for random access 300MB/s is likely to be closer to the max? I am not sure how random access in POD5 is implemented under the API and thus what kind of size each random access is, but you could answer this better I presume.

The size of my POD5 file is 741G which contains a whole PromethION sample. The size of the selection I make at a time is around 1000 reads, imagine getting reads belonging to a certain gene region. If you have any code examples of the suggestion you mentioned about "passing the reads from an entire batch as one async operation to reduce overhead of the threadpool", we could try that too.

Thanks.

0x55555555 commented 1 month ago

If your CPU is maxing out, its more likely you are bottlenecking on decompression of signal - which you could confirm using perf tools, but you're right, for a single SSD system, this might be the max you can achieve.

It looks like you are achieving a reasonable level of performance though - it'll depend on the downstream application of the signal if you need to optimise further.

Thanks,

hasindu2008 commented 1 month ago

I do not quite understand how the random access in pod5 works. Is there an index or something? If not, why not?