kipoi / kipoi-veff

Variant effect prediction plugin for Kipoi
https://kipoi.org/veff-docs
MIT License
6 stars 5 forks source link

Speedup writing to file #27

Open Avsecz opened 5 years ago

Avsecz commented 5 years ago

- [x] buffer writes - https://github.com/kipoi/kipoi-veff/pull/21 (e.g. don't write predictions to disk on every batch but only every now and then)

- [ ] use asynchronous writes

Here is the main loop performing:

  1. data-loading
  2. data preparation for the model
  3. model prediction
  4. Prediction writing to file

https://github.com/kipoi/kipoi-veff/blob/master/kipoi_veff/snv_predict.py#L620-L658

    for i, batch in enumerate(tqdm(it)):
        ...
        # Step 1. load the data
        eval_kwargs = _generate_seq_sets(dataloader.output_schema, batch, vcf_fh, vcf_id_generator_fn,
                                         seq_to_mut=seq_to_mut, seq_to_meta=seq_to_meta,
                                         sample_counter=sample_counter, vcf_search_regions=vcf_search_regions,
                                         generate_rc=model_info_extractor.use_seq_only_rc,
                                         bed_id_conv_fh=bed_id_conv_fh)

        # Step 2.  data preparation for the model
        if generated_seq_writer is not None:
            for writer in generated_seq_writer:
                writer(eval_kwargs)
            # Assume that we don't actually want the predictions to be calculated...
            continue

        if evaluation_function_kwargs is not None:
            assert isinstance(evaluation_function_kwargs, dict)
            for k in evaluation_function_kwargs:
                eval_kwargs[k] = evaluation_function_kwargs[k]

        eval_kwargs["out_annotation_all_outputs"] = model_out_annotation

        # Step 3. Make model prediction
        res_here = evaluation_function(model, output_reshaper=out_reshaper, **eval_kwargs)

        ....

        # Step 4. write the predictions
        if sync_pred_writer is not None:
            for writer in sync_pred_writer:
                writer(res_here, eval_kwargs["vcf_records"], eval_kwargs["line_id"])

- [ ] setup some standardized benchmarks to test the overhead

Tasks

Follow the following notebook: https://github.com/kipoi/kipoi-veff/blob/write_buffer/notebooks/code-profiling.ipynb

Finish the code on the write buffer PR by speeding up the writing to take minimal amount of time.

Avsecz commented 5 years ago

@shabnamsadegh I have another idea how to implement this that could also benefit the core batch writers:

We could implement an AsyncBatchWriter class in kipoi.writers that takes another batch writer and makes it asynchronous. The process loop should just run the while loop where it immediately runs batch_writer.batch_write().

class AsyncBatchWriter(BatchWriter):
    def __init__(self, batch_writer, max_queue_size=100):
        """
        Args:
          max_queue_size: maximal queue size. If it gets larger then batch_write needs to wait
             till it can write to the queue again. 
        """
        self.batch_writer = batch_writer

        # start the process and instantiate the queue
        self.queue = ...
        self.process = ...

    @abstractmethod
    def batch_write(self, batch):
        """Write a single batch of data
        Args:
          batch is one batch of data (nested numpy arrays with the same axis 0 shape)
        """
        if self.queue.size() > self.max_queue_size:
             # display warning. Wait till the queue is not small enough
        self.queue.put(batch)

    @abstractmethod
    def close(self):
        """Close the file
        """
        # stop the process, 
        # make sure the queue is empty
        # close the file
        self.batch_writer.close()

With this approach we would just need to add that class to kipoi.writers and then change this line of code to:

extra_writers = [SyncBatchWriter(AsyncBatchWriter(writer))]

Note that SyncBatchWriter is very confusing as it's actually converts the variant scores to the input usable by BatchWriters