Yelp / python-gearman

Gearman API - Client, worker, and admin client interfaces
http://github.com/Yelp/python-gearman/

Worker intermittently lost connection with some gearmand servers #47

Open justhalf opened 11 years ago

justhalf commented 11 years ago

I found that when workers connect to multiple gearmand servers, some of them intermittently end up accepting jobs from only one of the servers. They seem either to lose the connection to the other server or to block waiting for jobs on one server, and so they stop fetching jobs from the other. As a result, the jobs on that server are executed only by the workers that are still connected to it. I experienced this on a 16-core Ubuntu machine.

This is somewhat related to https://github.com/Yelp/python-gearman/issues/17, although a bit different.

Configuration:

  1. Start gearmand at 4730 and 4731
  2. Start workers B, C, D, and E, all connected to both 4730 and 4731
  3. Start client A, connected to both 4730 and 4731
  4. After many jobs have been sent, one of the workers will randomly lose its connection to one of the servers.

To test, try the minimal reproducing code below.

  1. Start gearmand at 4730 and 4731
  2. Start gearman_worker.py
  3. Start test.bash
  4. Check that for the last n jobs (as printed in the worker's console), only some of the workers are processing the requests (i.e., the others no longer fetch jobs from one of the servers). Note that this happens intermittently, so please rerun the worker and the bash script if the behaviour hasn't occurred yet.

test.bash

# test.bash
for i in {0..5}; do
    python gearman_client.py
    sleep 2
done

gearman_client.py

# gearman_client.py
import multiprocessing
import gearman
import traceback

def start_gearman_client(process_id):
    gm_client = gearman.GearmanClient(['127.0.0.1:4730','127.0.0.1:4731'])
    try:
        requests = []
        for gm_job_id in range(500):
            request = gm_client.submit_job(
                    task='do_task',
                    data='%d_%03d' % (process_id,gm_job_id),
                    unique='%d_%03d' % (process_id,gm_job_id),
                    background=False,wait_until_complete=False)
            requests.append(request)
        gm_client.wait_until_jobs_completed(requests)
    except:
        print traceback.format_exc()
    return 0

def main():
    child_processes = []
    for process_id in range(2):
        p = multiprocessing.Process(target=start_gearman_client, args=(process_id,))
        child_processes.append((process_id,p))
        p.start()

    for (pid,child) in child_processes:
        print 'Confirming that child number %d had died' % pid
        child.join()

if __name__ == '__main__':
    main()

gearman_worker.py

# gearman_worker.py
import gearman
import multiprocessing
import time
import traceback
from functools import partial

def do_work(gearman_worker,gearman_job,worker_id):
    try:
        print 'Worker %02d processing %s from port %d: %s' % (worker_id,gearman_job.data,gearman_job.connection.gearman_port,gearman_job.unique)
        time.sleep(0.001)
    except:
        print traceback.format_exc()
    return 'Done by worker %d through port %d' % (worker_id,gearman_job.connection.gearman_port)

def start_gearman_worker(worker_id):
    gm_worker = gearman.GearmanWorker(['127.0.0.1:4730','127.0.0.1:4731'])
    gm_worker.register_task('do_task', partial(do_work,worker_id=worker_id))
    print 'Worker %d start working' % worker_id
    gm_worker.work()

if __name__ == '__main__':
    workers = []
    for pid in range(8):
        worker = multiprocessing.Process(target=start_gearman_worker,args=(pid,))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()

The last lines of output in my worker's console:

Worker 02 processing 1_467 from port 4731: 1_467
Worker 04 processing 0_484 from port 4731: 0_484
Worker 05 processing 1_468 from port 4731: 1_468
Worker 02 processing 1_469 from port 4731: 1_469
Worker 04 processing 1_470 from port 4731: 1_470
Worker 05 processing 0_486 from port 4731: 0_486
Worker 02 processing 0_487 from port 4731: 0_487
Worker 04 processing 1_474 from port 4731: 1_474
Worker 05 processing 1_473 from port 4731: 1_473
Worker 02 processing 0_490 from port 4731: 0_490
Worker 05 processing 1_476 from port 4731: 1_476
Worker 04 processing 0_491 from port 4731: 0_491
Worker 02 processing 1_478 from port 4731: 1_478
Worker 05 processing 0_492 from port 4731: 0_492
Worker 04 processing 1_479 from port 4731: 1_479
Worker 02 processing 1_484 from port 4731: 1_484
Worker 05 processing 1_485 from port 4731: 1_485
Worker 04 processing 0_497 from port 4731: 0_497
Worker 02 processing 0_498 from port 4731: 0_498
Worker 05 processing 1_489 from port 4731: 1_489
Worker 02 processing 1_492 from port 4731: 1_492
Worker 04 processing 1_493 from port 4731: 1_493
Worker 05 processing 1_495 from port 4731: 1_495
Worker 02 processing 1_498 from port 4731: 1_498
Worker 04 processing 1_499 from port 4731: 1_499

As you can see, only workers 2, 4, and 5 are processing jobs from 4731; the others just don't fetch jobs from there anymore.
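To check this without reading the console by eye, the worker output can be tallied per port. This is a minimal sketch, assuming the log lines follow the format printed by do_work in gearman_worker.py; tally_workers_by_port is a hypothetical helper name, not part of python-gearman.

```python
import re
from collections import defaultdict

# Matches the line format printed by do_work() in gearman_worker.py.
LINE_RE = re.compile(r'Worker (\d+) processing \S+ from port (\d+):')

def tally_workers_by_port(lines):
    """Return {port: {worker_id: job_count}} for the given log lines."""
    counts = defaultdict(lambda: defaultdict(int))
    for line in lines:
        match = LINE_RE.search(line)
        if match is None:
            continue  # skip lines that are not job reports
        worker_id, port = int(match.group(1)), int(match.group(2))
        counts[port][worker_id] += 1
    return {port: dict(workers) for port, workers in counts.items()}

# A few lines copied from the output above.
sample = [
    'Worker 02 processing 1_467 from port 4731: 1_467',
    'Worker 04 processing 0_484 from port 4731: 0_484',
    'Worker 05 processing 1_468 from port 4731: 1_468',
    'Worker 02 processing 1_469 from port 4731: 1_469',
]
result = tally_workers_by_port(sample)
print('Workers seen on 4731: %s' % sorted(result[4731]))
```

Running it over the tail of a long log and comparing the worker IDs per port against the full set of started workers (0 to 7 here) shows which workers have stopped fetching from which server.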

justhalf commented 11 years ago

After further inspection by printing out the connection status in connection_manager.py, line 128, I found that the workers that seemed not to be working were actually waiting for activity on their connection to the server.
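The symptom can be illustrated with a generic sketch of a select-based poll. This is not the code in connection_manager.py; it assumes only that the worker waits for its connections to become readable, and it uses local socket pairs as stand-ins for the two gearmand connections. poll_once is a hypothetical helper name.

```python
import select
import socket

# Two local socket pairs stand in for the connections to 4730 and 4731.
server_a, worker_a = socket.socketpair()
server_b, worker_b = socket.socketpair()

def poll_once(connections, timeout):
    """Return the connections that become readable within `timeout` seconds."""
    readable, _, _ = select.select(connections, [], [], timeout)
    return readable

# Only "server A" sends anything; "server B" stays silent.
server_a.sendall(b'wake up')

ready = poll_once([worker_a, worker_b], timeout=0.1)
idle = [conn for conn in (worker_a, worker_b) if conn not in ready]

print('%d connection(s) ready, %d still waiting' % (len(ready), len(idle)))

for sock in (server_a, worker_a, server_b, worker_b):
    sock.close()
```

A connection that never receives a wake-up from its server simply never shows up as readable, so from the worker's side it looks healthy while no jobs are fetched from it.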

So this is probably a bug in gearmand. I've posted the issue there: https://bugs.launchpad.net/gearmand/+bug/1220168

orainxiong commented 10 years ago

I'd like to know whether this issue has been resolved. My environment is gearmand 1.1.12 with multiple job servers and a MySQL persistent queue. I encountered the same problem: after the program has run for a week, some of the jobs remain stuck in the MySQL queue. The strange thing is that other jobs are still processed fine.

yunjianfei commented 10 years ago

I encountered the same problem too: when using multiple gearmand servers and having the workers connect to both of them, some of the workers intermittently accept jobs from only one of the servers.

If the gearman client sends jobs to multiple gearmand servers, it sometimes sends more jobs to one of them. If workers accept jobs from only one of the servers and the jobs run for a long time, the result is that some workers can't get jobs while many jobs pile up in one gearmand server's queue.
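One way to spot such a pile-up is to compare the reply to gearmand's text "status" admin command across servers. The sketch below only parses replies that have already been fetched; the reply format (FUNCTION, TOTAL, RUNNING, AVAILABLE_WORKERS separated by tabs, terminated by a line containing ".") comes from the Gearman protocol, while the helper names and the sample replies are invented for illustration and are not captured output.

```python
def parse_status(text):
    """Parse a gearmand `status` reply into {function: {...counts...}}."""
    status = {}
    for line in text.splitlines():
        if line == '.':
            break  # a lone '.' terminates the reply
        if not line:
            continue
        name, total, running, workers = line.split('\t')
        status[name] = {
            'total': int(total),      # jobs in the queue, including running ones
            'running': int(running),  # jobs currently being executed
            'workers': int(workers),  # workers able to run this function
        }
    return status

def find_backlogged(replies_by_server, task):
    """Return [(server, waiting_jobs)] for servers where `task` jobs are waiting."""
    backlog = []
    for server, reply in sorted(replies_by_server.items()):
        entry = parse_status(reply).get(task)
        if entry is None:
            continue
        waiting = entry['total'] - entry['running']
        if waiting > 0:
            backlog.append((server, waiting))
    return backlog

# Hypothetical replies: 4730 is drained, 4731 has a queue building up.
replies = {
    '127.0.0.1:4730': 'do_task\t0\t0\t8\n.\n',
    '127.0.0.1:4731': 'do_task\t120\t3\t3\n.\n',
}
backlog = find_backlogged(replies, 'do_task')
print('Backlogged servers: %s' % backlog)
```

A server that reports fewer available workers than were started, together with a growing number of waiting jobs, matches the behaviour described above.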

I have resolved this problem by gathering some code from the branches and adding some code to grab jobs from the servers.

My fork is https://github.com/yunjianfei/python-gearman