self = <tests.test_queue.TestQueue object at 0xffff9b951e80>
def test_queue_balance(self):
# We only test SimpleQueuePush because SimpleQueuePull doesn't guarantee
# balance.
inqueue = fiber.queues.SimpleQueuePush()
outqueue = fiber.queues.SimpleQueuePush()
num_workers = 4
multiplier = 600
workers = []
results = []
for i in range(num_workers):
print("create worker", i)
p = fiber.Process(target=worker, args=(inqueue, outqueue, i), daemon=True)
workers.append(p)
for i in range(num_workers):
workers[i].start()
# wait for all workers to connect
time.sleep(1)
for i in range(num_workers * multiplier):
inqueue.put("work")
for i in range(num_workers * multiplier):
results.append(outqueue.get())
stats = collections.Counter(results)
total = num_workers * multiplier
# send quit signals to all workers
for i in range(num_workers * multiplier):
inqueue.put("quit")
for i in range(num_workers):
workers[i].join()
for i in range(num_workers):
#print("{}: {} {:.2f}".format(i, stats[i], stats[i] / float(total)))
# data should be fairly queued
Everything appears to work, except for a single test failure:
[jw@cn06 fiber]$ ./test_local.sh ... tests/test_queue.py::TestQueue::test_queue_balance FAILED [100%]
======================================================= FAILURES =======================================================
_____________________________________________ TestQueue.test_queue_balance _____________________________________________
self = <tests.test_queue.TestQueue object at 0xffff9b951e80>
tests/test_queue.py:250: AssertionError
------------------------------------------------- Captured stdout call -------------------------------------------------
create worker 0
create worker 1
create worker 2
create worker 3
=============================================== short test summary info ================================================
FAILED tests/test_queue.py::TestQueue::test_queue_balance - assert 988 == 600
================================= 1 failed, 61 passed, 10 skipped in 242.24s (0:04:02) =================================
[jw@cn06 fiber]$