chapmanb / bcbb

Incubator for useful bioinformatics code, primarily in Python and R
http://bcbio.wordpress.com

num_cores: messages; socket.timeout: timed out #50

Closed vals closed 12 years ago

vals commented 12 years ago

Hey Brad, we are trying to use the distributed version of the pipeline.

We have a couple of test sets that we use to quickly check whether the pipeline is working. One takes the normal pipeline about 3 hours to finish, and another, much smaller one takes about 7 minutes (this is with 8 cores).

When running the small test set on the messaging variant, all files get generated as they should, and the program exits properly. Note that this small set consists of fastq files that are only 12 lines each, and I'm guessing much of the analysis gets skipped due to the lack of data.

When we run the messaging version of the pipeline for the larger set, the programs work for a while (the time varies, say between 45 minutes and 1 hour 30 minutes), but then one of the jobs crashes with a socket.timeout error (this specific job, I believe, is a master that coordinates what the other jobs should be doing).

I'll include the output of that job here:

[2012-02-25 02:55:26,856] Found YAML samplesheet, using /proj/a2010002/nobackup/illumina/pipeline_test/archive/000101_SN001_001_AABCD99XX/run_info.yaml instead of Galaxy API
Traceback (most recent call last):
  File "/bubo/home/h10/vale/.virtualenvs/devel/bin/automated_initial_analysis.py", line 7, in <module>
    execfile(__file__)
  File "/bubo/home/h10/vale/bcbb/nextgen/scripts/automated_initial_analysis.py", line 117, in <module>
    main(*args, **kwargs)
  File "/bubo/home/h10/vale/bcbb/nextgen/scripts/automated_initial_analysis.py", line 48, in main
    run_main(config, config_file, fc_dir, work_dir, run_info_yaml)
  File "/bubo/home/h10/vale/bcbb/nextgen/scripts/automated_initial_analysis.py", line 65, in run_main
    lane_items = run_parallel("process_lane", lanes)
  File "/bubo/home/h10/vale/bcbb/nextgen/bcbio/distributed/messaging.py", line 28, in run_parallel
    return runner_fn(fn_name, items)
  File "/bubo/home/h10/vale/bcbb/nextgen/bcbio/distributed/messaging.py", line 67, in _run
    while not result.ready():
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/result.py", line 306, in ready
    return all(result.ready() for result in self.results)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/result.py", line 306, in <genexpr>
    return all(result.ready() for result in self.results)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/result.py", line 108, in ready
    return self.status in self.backend.READY_STATES
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/result.py", line 196, in status
    return self.state
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/result.py", line 191, in state
    return self.backend.get_status(self.task_id)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/backends/base.py", line 237, in get_status
    return self.get_task_meta(task_id)["status"]
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/backends/amqp.py", line 128, in get_task_meta
    return self.poll(task_id)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/backends/amqp.py", line 153, in poll
    with self.app.pool.acquire_channel(block=True) as (_, channel):
  File "/sw/comp/python/2.7.1_kalkyl/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/connection.py", line 789, in acquire_channel
    yield connection, connection.default_channel
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/connection.py", line 593, in default_channel
    self.connection
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/connection.py", line 586, in connection
    self._connection = self._establish_connection()
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/connection.py", line 546, in _establish_connection
    conn = self.transport.establish_connection()
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 252, in establish_connection
    connect_timeout=conninfo.connect_timeout)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 62, in __init__
    super(Connection, self).__init__(*args, **kwargs)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/amqplib/client_0_8/connection.py", line 129, in __init__
    self.transport = create_transport(host, connect_timeout, ssl)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 281, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 85, in __init__
    raise socket.error, msg
socket.timeout: timed out
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Have you encountered any issues with socket.timeout? Any ideas what we might be doing wrong?

chapmanb commented 12 years ago

Valentine; The error message looks like you are running into intermittent connection issues with the AMQP server. I see problems like this occasionally if the AMQP server goes down, and the best advice is to restart the processing. Since everything is idempotent, it will pick up where it left off without any issues.

If you are seeing it persistently there are Celery settings you can tweak in the configuration file, which is generated in the code here:

https://github.com/chapmanb/bcbb/blob/master/nextgen/bcbio/distributed/messaging.py#L124

Specifically, you could increase 'BROKER_CONNECTION_MAX_RETRIES', which is set to 100 by default:

http://celery.readthedocs.org/en/latest/configuration.html#broker-settings
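
For example, you could add something along these lines to the generated Celery configuration (the values here are illustrative, not the template defaults):

# Illustrative values only, not the defaults written out by messaging.py.
BROKER_CONNECTION_RETRY = True         # keep retrying dropped broker connections
BROKER_CONNECTION_MAX_RETRIES = 500    # default is 100; None means retry forever
BROKER_CONNECTION_TIMEOUT = 60         # seconds to wait when opening a connection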

If you find settings that work better for your environment please do send a pull request and we can incorporate them. Hope this works for you.

vals commented 12 years ago

Hey again. I increased the number of retries but am still getting the error. Now I'm running with unlimited retries, just to see if it manages to start before the reserved time runs out; the next step would be to count the number of retries actually needed.

However, there is another thing I thought I would mention. Yesterday we ran into a silly situation where start_analysis_manager sat in the queue for quite a long time before its job started. That job then submitted two workers to the job manager (via start_workers), which stayed in the queue for several hours. This caused the allotted time for the analysis manager to run out, so it was force-quit due to the time limit before the workers started. When the workers finally started, they had no manager to communicate with.

We were thinking about adding an option to run the analysis manager as a subprocess of distributed_nextgen_pipeline.py, without giving it its own node; since it is simply a manager, it shouldn't need much computational power. Also, jobs submitted at the same time almost always start at the same time.
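
Something like this rough sketch is what we had in mind (the command line is a simplified placeholder, not the actual invocation):

# Rough sketch of the suggestion only: launch the analysis manager as a local
# subprocess of distributed_nextgen_pipeline.py instead of giving it its own node.
# The command line below is a simplified placeholder.
import subprocess

def run_manager_locally(config_file, fc_dir, run_info_yaml):
    cmd = ["automated_initial_analysis.py", config_file, fc_dir, run_info_yaml]
    return subprocess.Popen(cmd)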

chapmanb commented 12 years ago

Valentine; Are you able to connect to the AMQP server from the cluster nodes? I wouldn't expect a persistent problem from Celery unless there was some underlying issue with the connection to the server. How far are you getting in the process? Do things start and then fail halfway through, or do they not get running at all?

The analysis manager does do processing in addition to sending out jobs, so you don't want to run it on your head node. It handles anything that can't be run in parallel, like merging input files.

Your queuing problems are a bit tricky. Right now the logic is in:

https://github.com/chapmanb/bcbb/blob/master/nextgen/bcbio/distributed/manage.py

We could remove the wait for the manager node to be queued and start everything up together. Would that work better for your environment?

vals commented 12 years ago

Brad, I am fairly sure that the nodes can connect to the AMQP server. I just enqueued a new run; I'll paste all three .out files (two workers, one manager) when they are done. But some work does get performed before the timeout occurs. As far as I remember, a worker is usually at a SAM alignment step when it gets canceled.

I think enqueueing the manager and the workers at the same time makes sense, as they are supposed to run at the same time, though there is of course never a guarantee that they will all actually start together. And a worker starting before the manager can't be good...

Optimally one would enqueue a single job that requests the needed number of nodes as an argument, but I don't really know how to distribute the jobs and other work once those nodes have been granted.

chapmanb commented 12 years ago

Valentine; That is strange. The error message reads as if the manager can no longer connect to the AMQP server and so shuts everything down. If there is bad intermittent connectivity, then maybe unlimited retries is the way to go. I worry a bit that it would get stuck in real problem cases where the AMQP server dies.

If the manager is added first it should get started first, but I think things should work smoothly even if workers get started before the manager. They just sit around and wait for instructions. Here is the small change that queues everything together:

https://github.com/chapmanb/bcbb/commit/465d0fdc482bcb5ac0f354f9b825537531cf5789
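
Conceptually it just submits the manager and worker jobs back to back instead of waiting for the manager to clear the queue, roughly like this sketch (not the actual code in manage.py; the script names and SLURM-style sbatch submission are placeholders for illustration):

# Conceptual sketch only, not the code in manage.py: queue the manager and
# worker jobs together so the scheduler can start them close in time.
import subprocess

def submit_all_together(manager_script, worker_script, num_workers):
    job_ids = [subprocess.check_output(["sbatch", manager_script]).strip()]
    for _ in range(num_workers):
        job_ids.append(subprocess.check_output(["sbatch", worker_script]).strip())
    return job_ids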

Let me know if that improves task scheduling for you.

vals commented 12 years ago

Hey Brad; some updates:

1. The patch helps increase the chances of the jobs starting together, and works nicely as such. But there is still a long queue on our resource, and the jobs manage to start at widely different times. However! If one submits single-core jobs, these get higher priority and start quickly. So by setting cores_per_host to 1, we have managed to start testing things out without everything taking forever just to get started.

We were wondering: is there any point in the pipeline where the multicore nature of the nodes is used? The canonical example we discussed was that bwa can be given a number of cores and will align in parallel. Or is all multiprocessing functionality tied to how the pipeline splits up tasks?

2. We are still experiencing timeout issues. Here is a full output for the job that experiences the timeout:

$ cat slurm-1842146.out
[2012-03-08 11:15:36,634] Found YAML samplesheet, using /proj/a2010002/nobackup/illumina/pipeline_test/archive/000101_SN001_001_AABCD99XX/run_info.yaml instead of Galaxy API
[2012-03-08 10:15] WARNING: nextgen_pipeline: Encountered exception when writing sequencing report to Google Docs: create_log_handler() takes exactly 1 argument (2 given)
Traceback (most recent call last):
  File "/bubo/home/h10/vale/.virtualenvs/devel/bin/automated_initial_analysis.py", line 7, in <module>
    execfile(__file__)
  File "/bubo/home/h10/vale/bcbb/nextgen/scripts/automated_initial_analysis.py", line 117, in <module>
    main(*args, **kwargs)
  File "/bubo/home/h10/vale/bcbb/nextgen/scripts/automated_initial_analysis.py", line 48, in main
    run_main(config, config_file, fc_dir, work_dir, run_info_yaml)
  File "/bubo/home/h10/vale/bcbb/nextgen/scripts/automated_initial_analysis.py", line 70, in run_main
    align_items = run_parallel("process_alignment", lane_items)
  File "/bubo/home/h10/vale/bcbb/nextgen/bcbio/distributed/messaging.py", line 28, in run_parallel
    return runner_fn(fn_name, items)
  File "/bubo/home/h10/vale/bcbb/nextgen/bcbio/distributed/messaging.py", line 69, in _run
    if result.failed():
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/result.py", line 288, in failed
    return any(result.failed() for result in self.results)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/result.py", line 288, in <genexpr>
    return any(result.failed() for result in self.results)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/result.py", line 116, in failed
    return self.status == states.FAILURE
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/result.py", line 196, in status
    return self.state
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/result.py", line 191, in state
    return self.backend.get_status(self.task_id)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/backends/base.py", line 237, in get_status
    return self.get_task_meta(task_id)["status"]
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/backends/amqp.py", line 128, in get_task_meta
    return self.poll(task_id)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/celery/backends/amqp.py", line 153, in poll
    with self.app.pool.acquire_channel(block=True) as (_, channel):
  File "/sw/comp/python/2.7.1_kalkyl/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/connection.py", line 789, in acquire_channel
    yield connection, connection.default_channel
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/connection.py", line 593, in default_channel
    self.connection
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/connection.py", line 586, in connection
    self._connection = self._establish_connection()
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/connection.py", line 546, in _establish_connection
    conn = self.transport.establish_connection()
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 252, in establish_connection
    connect_timeout=conninfo.connect_timeout)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 62, in __init__
    super(Connection, self).__init__(*args, **kwargs)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/amqplib/client_0_8/connection.py", line 129, in __init__
    self.transport = create_transport(host, connect_timeout, ssl)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 281, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/bubo/home/h10/vale/.virtualenvs/devel/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 85, in __init__
    raise socket.error, msg
socket.timeout: timed out
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

(The Google Docs warning shouldn't have anything to do with the problem; it's just a warning.)

chapmanb commented 12 years ago

Valentine; I also run with cores_per_host set to 1. Our situation is very similar: with a busy cluster it takes a long time to get the awesome multi-core nodes, so I settle for getting single cores wherever I can. You won't be able to multi-thread bwa, so there's a tradeoff; bwa is normally not the rate-limiting step for my analyses, so I prefer to get jobs started faster and lose a little time in processing. This can be configured however you think best for your system.
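
For reference, multi-threading bwa just means passing it the core count; a minimal sketch of that kind of call (file names and the core count are placeholders, not the pipeline's actual alignment code):

# Minimal sketch: 'bwa aln -t' runs the alignment with multiple threads on one
# node. File names and the core count are placeholders.
import subprocess

def bwa_aln_multicore(ref_index, fastq_file, out_sai, num_cores=8):
    with open(out_sai, "w") as out_handle:
        subprocess.check_call(["bwa", "aln", "-t", str(num_cores),
                               ref_index, fastq_file], stdout=out_handle)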

I'm not sure how to help with the RabbitMQ problems. All of the error messages read like network problems: the pipeline seems to be having trouble establishing connections to the RabbitMQ server. Short of debugging the network issue, the next best option is to try increasing some of the connection parameters, like BROKER_CONNECTION_TIMEOUT, to see if that helps:

http://docs.celeryproject.org/en/latest/configuration.html?highlight=timeout#broker-connection-timeout
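
In the generated Celery configuration that would be a single setting, for example (the value is illustrative):

# Illustrative value: wait up to 10 minutes when establishing a broker connection.
BROKER_CONNECTION_TIMEOUT = 600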

Sorry I don't have anything more specific to suggest. Hope this helps.

vals commented 12 years ago

Brad; We got it working now. What finally fixed it was upgrading Celery from 2.4 to 2.5 (we also increased BROKER_CONNECTION_TIMEOUT to 10 minutes, but we are fairly certain it was the Celery upgrade that mattered).

chapmanb commented 12 years ago

Valentine; Awesome -- glad that it is working now. Thanks for your patience digging into this.