samtools / htslib

C library for high-throughput sequencing data formats
Other
784 stars 447 forks source link

Using POSIX and htslib to create x compressed vcf files using x threads #1739

Closed Npaffen closed 3 months ago

Npaffen commented 4 months ago

I want to adjust the genotype of some large sample size vcf file by using global ancestry results. I have ancestry results for x origins so I want to recreate my vcf file with adjusted genotypes for those x origins. Since the filepointer in htslib are not threadsafe I used the POSIX library to create threads containing individual filepointer for each thread and then read the vcf, adjust the genotypes and write the vcf back to disk. Right now it looks like this:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include </usr/include/htslib/hts.h>
#include </usr/include/htslib/vcf.h>

typedef struct {
  char *name;
  char code;
} Origin;

typedef struct {
  int originIndex;
  char* vcfFilename;
  htsFile *fp;
  htsFile *fp2;
  bcf_hdr_t *hdr;
  // Add other necessary fields
} ThreadData;

// Declare global variables
Origin *origins;
char ***originLines;
char **tsvLines;
int originCount, tsvLineCount;
char *outputFilename = NULL;

bcf_hdr_t *hdr2 = NULL;
int allele1;
int allele2;

char **readTsvFile(FILE *file, int *lineCount);
int readOrigins(FILE *file, Origin **origins);
void* processOrigin(void* arg);

int main(int argc, char *argv[]) {
...

 for (int i = 0; i < originCount; ++i) {
    // Allocate and prepare outputFilename
    outputFilename = malloc(strlen(origins[i].name) + strlen(argv[4]) + 10); // for "_", ".vcf.gz", and null terminator
    if (!outputFilename) {
      fprintf(stderr, "Failed to allocate memory for outputFilename\n");
      // Handle error appropriately, possibly with cleanup and exit
      exit(EXIT_FAILURE); // Consider more graceful error handling
    }
    sprintf(outputFilename, "%s_%s.vcf.gz", origins[i].name, argv[4]);
    threadData[i].originIndex = i;
    threadData[i].vcfFilename = argv[1];
    threadData[i].fp   = bcf_open(argv[1], "r");
    threadData[i].fp2  = hts_open(outputFilename, "wz");
    // Read the VCF header
    threadData[i].hdr  = bcf_hdr_read(threadData[i].fp);
    // Assuming VCF filename is first argument
    // Initialize other fields of threadData[i] as necessary

    if (pthread_create(&threads[i], NULL, processOrigin, &threadData[i])) {
      fprintf(stderr, "Error creating thread for origin %d\n", i);
      // Handle error
    }
  }

  // Wait for all threads to finish
  for (int i = 0; i < originCount; ++i) {
    pthread_join(threads[i], NULL);
  }
  ...

with processOrigin :

// process origins 
void* processOrigin(void* arg) {
  ThreadData* data = (ThreadData*)arg;
  int originIndex = data->originIndex;
  char* vcfFilename = data->vcfFilename;
  htsFile *fp = data->fp;
  htsFile *fp2 = data->fp2;
  bcf_hdr_t *hdr = data->hdr;
  if (fp == NULL) {
    fprintf(stderr, "Error opening file %s\n", vcfFilename);
    exit(EXIT_FAILURE); // Consider more graceful error handling
  }

  bcf1_t *rec = bcf_init();
  int line_count = 0;

  // Process the VCF records
  while (bcf_read(fp, hdr, rec) == 0) {
    if (rec->unpacked == 0) {
      bcf_unpack(rec, BCF_UN_ALL);
      int n_sample = bcf_hdr_nsamples(hdr);
      int32_t *gts = (int32_t *)malloc(n_sample * 2 * sizeof(int32_t));
      int32_t *gt_arr = NULL, ngt_arr = 0;
      int ngt = bcf_get_genotypes(hdr, rec, &gt_arr, &ngt_arr);
      const char *chrom = bcf_hdr_id2name(hdr, rec->rid);
      int pos = rec->pos + 1; // VCF positions are 1-based
      float qual = rec->qual;
      char **alleles = rec->d.allele;

      if(line_count == 0){

        if (!fp2) {
          fprintf(stderr, "Failed to open %s for writing.\n", outputFilename);
          // Handle error appropriately, including freeing outputFilename
          free(outputFilename);
          exit(EXIT_FAILURE); // Consider more graceful error handling
        }

        // Initialize the header for the output file
        hdr2 = bcf_hdr_dup(hdr); // Duplicate the header from the input file
        if (!hdr2) {
          fprintf(stderr, "Failed to duplicate the VCF header.\n");
          // Handle error appropriately
          hts_close(fp2);
          free(outputFilename);
          exit(EXIT_FAILURE); // Consider more graceful error handling
        }

        // Write the header to the output file
        if (bcf_hdr_write(fp2, hdr2) < 0) {
          fprintf(stderr, "Failed to write the header to %s.\n", outputFilename);
          // Handle error appropriately
          bcf_hdr_destroy(hdr2);
          hts_close(fp2);
          free(outputFilename);
          exit(EXIT_FAILURE); // Consider more graceful error handling
        }

      }

      int tsvcol = 0;
      for (int i = 0; i < n_sample; i++) {

        int32_t allele1_raw = gt_arr[i * 2];
        int32_t allele2_raw = gt_arr[i * 2 + 1];

        // Check if the alleles are missing directly
        bool is_allele1_missing = bcf_gt_is_missing(allele1_raw);
        bool is_allele2_missing = bcf_gt_is_missing(allele2_raw);

        if (tsvLines[line_count][tsvcol] - '0' == origins[originIndex].code && is_allele1_missing == 0) { // Adjust comparison to match types
          gts[i * 2] = bcf_gt_phased(bcf_gt_allele(allele1_raw));
        } else{
          gts[i * 2] =  bcf_gt_missing;
        }  
        if (tsvLines[line_count][tsvcol + 2] - '0' == origins[originIndex].code && is_allele2_missing == 0) {
          gts[i * 2 + 1] = bcf_gt_phased(bcf_gt_allele(allele2_raw));
        } else{
          gts[i * 2 + 1] =  bcf_gt_missing;
        }  

        tsvcol += 4;
      }

      bcf_update_genotypes(hdr, rec, gts, n_sample * 2); // Update the record with genotypes
      if (bcf_write(fp2, hdr2, rec) < 0) {
        fprintf(stderr, "Error writing record to VCF file.\n");
        // Handle the error, such as by cleaning up resources and exiting if necessary
      }    
      line_count++;
      if (gt_arr) free(gt_arr);
      free(gts);
    }else{
      // Clean up
      bcf_destroy(rec);
      bcf_hdr_destroy(hdr);
      if(hdr2)    bcf_hdr_destroy(hdr2);
      bcf_close(fp);
      if(fp2)    bcf_close(fp2);
    }
  }

}

The output however seems to be truncated see:

bcftools view East_Asian_chr22.vcf.gz | wc -l
[W::bcf_sr_add_reader] No BGZF EOF marker; file 'East_Asian_chr22.vcf.gz' may be truncated
[E::bgzf_read_block] Failed to read BGZF block data at offset 519085134 expected 1144 bytes; hread returned 339
[E::vcf_parse_format] Number of columns at chr22:50806710 does not match the number of samples (8011 vs 8908)
Error: VCF parse error
1498497

It looks like there is a problem with the bcf_write function or, more likely, I made a mistake in my multi-threading logic. My singel-threading code is working without problems but since the file of tsvLines is very large I would like to read the file only once then use it in each thread to save some time.

whitwham commented 4 months ago

Without more of your program I am not entirely sure if what you are trying to do will work.

Here is what I think is wrong with processOrigin

Put the file closing clean up code at the end of the function, outside of the while loop. Otherwise it may not be called and the filestream will not be flushed. I would also move the destroy functions till after the close ones. It may not make a difference but it is probably safer.

Move everything inside the if(line_count == 0) to before the while loop, you only need to call it once.

I don't know what if (rec->unpacked == 0) is meant to achieve.

There is probably other stuff but with the code fragments you have provided it is difficult to see how it fits together.

whitwham commented 3 months ago

No response, closing.