Can you confirm which pyfaidx version you're using?
Hello, I'm experiencing a similar issue with pyfaidx 0.8.1.1. When running my code in parallel (using multiprocessing or threads), pyfaidx returns sequences that differ from the expected ones, and the lengths of the returned sequences are also incorrect.
@yang-dongxu could you provide some example code that I could use to reproduce the issue? If so I would be glad to take a look.
Hello, @mdshw5. Here is some example code. It sometimes returns sequences of length 1001. The problem doesn't occur on every run, just occasionally after a few executions.
```python
from pyfaidx import Fasta
from multiprocessing import Pool

def read_region(region):
    return fasta[region['chr']][region['start']:region['end']]

fasta = Fasta('ref_genome/hg38.fa')
region_list = [{'chr': 'chr19', 'start': 1000, 'end': 2000} for i in range(10000)]

with Pool(processes=16) as pool:
    results = pool.map(read_region, region_list)

for result in results:
    if len(result) != 1000:
        print(len(result))
```
@mdshw5 Sorry for the late response :) The code provided by @maplecai also reproduces my issue.
Sorry for forgetting to provide my environment details.
Thank you both for providing more details! I'll reproduce this and then figure out a solution.
I've just run your code and can confirm I can reproduce the behavior you're describing. I haven't had a chance to look further yet, but did reproduce using:
- Python 3.11.5
- Ubuntu 23.04
- pyfaidx 0.7.2.2
Based on this, I'd say the issue isn't limited to particular package versions.
Just an update: once I started looking into process safety vs thread safety in Python, I remembered that I had addressed this in a previous issue:
@Maarten-vd-Sande Looking more closely at the differences between synchronization in python's threading vs multiprocessing modules I'm remembering how much of a headache this actually is. I've used both modules for different tasks in the past, but for I/O-bound operations I've chosen threading due to this statement in the threading [documentation](https://docs.python.org/3.8/library/threading.html#lock-objects):

> **CPython implementation detail**: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

The `multiprocessing.Lock` and `threading.Lock` internals are completely different since they attempt to solve different issues. Mainly it seems that `multiprocessing.Lock` must be inherited by the child processes, which makes implementation in the `Fasta` object almost impossible, since the `Fasta` instance will be serialized by `pickle` before being copied to the children, rendering the lock useless (see the [multiprocessing documentation](https://docs.python.org/3.8/library/multiprocessing.html#synchronization-between-processes) for an example).

So, I think that since I've implemented the `threading.Lock` in `Faidx.fetch`, that means that `Fasta` is thread-safe but not process-safe. To make the object process-safe you should be able to do something like:

```python
import pyfaidx
from multiprocessing import Pool, Lock
import random

def get_random_sequence_sync(_):
    pos = random.randint(0, len(danRer11['chr9']) - 500)
    lock.acquire()
    seq = danRer11['chr9'][pos:pos+500]
    lock.release()
    assert len(seq) == 500

def init(l):
    global lock
    lock = l

danRer11 = pyfaidx.Fasta("/vol/genomes/danRer11/danRer11.fa")

print("try multi process")
p = Pool(32, initializer=init, initargs=(Lock(),))
p.map(get_random_sequence_sync, range(1000))
```

*Originally posted by @mdshw5 in https://github.com/mdshw5/pyfaidx/issues/92#issuecomment-578200960*

Unless either of you has a better idea about how to incorporate process synchronization into the `Fasta` class, I think you should be able to do something like:
```python
from pyfaidx import Fasta
from multiprocessing import Pool, Lock

def init(l):
    global lock
    lock = l

def read_region(region):
    lock.acquire()
    seq = fasta[region['chr']][region['start']:region['end']]
    lock.release()
    return seq

fasta = Fasta('ref_genome/hg38.fa')
region_list = [{'chr': 'chr19', 'start': 1000, 'end': 2000} for i in range(10000)]

with Pool(16, initializer=init, initargs=(Lock(),)) as pool:
    results = pool.map(read_region, region_list)

for result in results:
    if len(result) != 1000:
        print(len(result))
```
In my testing tonight I can confirm that this fixes the issue, and `all(len(result) == 1000 for result in results)` evaluates to `True`.
@mdshw5 Thank you for solving this problem. I have another question: in my tests, this process-safe multiprocessing code is slower than a single process. Is there any way to increase the speed while keeping it process-safe?
@maplecai I don't believe this process-safe method will ever be fast due to the shared lock: you'll always have some processes waiting their turn. There might be two alternatives to speed up access using concurrency. The first would be to instantiate and destroy the `Fasta` object in each new process; this should work fine since each instance will have a different file pointer. The second would be to use multithreading, which would allow the interpreter process some level of I/O concurrency. It all depends on which part of your code you need to speed up. I'd also suggest using a profiler first to see if this is even the slow part of your code.
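For illustration, here is a minimal sketch of the first alternative, assuming the same `ref_genome/hg38.fa` path and region list as above (the `init_worker` helper name is hypothetical): each worker process opens its own `Fasta` instance in a `Pool` initializer, so no shared lock is needed.

```python
from pyfaidx import Fasta
from multiprocessing import Pool

fasta = None  # set per worker in init_worker()

def init_worker():
    # Each worker opens its own Fasta, giving every process an
    # independent file handle, so no cross-process lock is needed.
    global fasta
    fasta = Fasta('ref_genome/hg38.fa')

def read_region(region):
    # Return a plain str so the result pickles cheaply back to the parent.
    return str(fasta[region['chr']][region['start']:region['end']])

region_list = [{'chr': 'chr19', 'start': 1000, 'end': 2000} for i in range(10000)]

with Pool(16, initializer=init_worker) as pool:
    results = pool.map(read_region, region_list)

assert all(len(result) == 1000 for result in results)
```

And a sketch of the second alternative using threads, which should be safe with a single shared instance given the `threading.Lock` in `Faidx.fetch`:

```python
from concurrent.futures import ThreadPoolExecutor
from pyfaidx import Fasta

fasta = Fasta('ref_genome/hg38.fa')

def read_region(region):
    # Fasta guards fetches with a threading.Lock, so threads can share one instance.
    return str(fasta[region['chr']][region['start']:region['end']])

region_list = [{'chr': 'chr19', 'start': 1000, 'end': 2000} for i in range(10000)]

with ThreadPoolExecutor(max_workers=16) as executor:
    results = list(executor.map(read_region, region_list))

assert all(len(result) == 1000 for result in results)
```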
@mdshw5 Thanks! I'll give it a try.
```python
from pyfaidx import Fasta

hg38 = Fasta('path_to_reference_genome')

def read_seq():
    seq = hg38.get_seq('chr1', 100000, 100005)
    return seq
```

If I use the above function in parallel, it returns different sequences each time. When not run in parallel, it works fine.