import time
import random
import gearman
import traceback
import json
class RequestTimeout:
def __init__(self):
self.delayed = {}
self.disabled = 0
def disableMethod(self, method, timeout):
self.delayed[method] = time.time()+timeout
def disableAll(self, timeout):
self.disabled = time.time()+timeout
def isDisabled(self, method):
t = time.time()
if (method not in self.delayed):
self.delayed[method] = 0
if (self.disabled >= t) or (self.delayed[method] >= t):
return True
return False
def getTimeout(self, method):
if (method not in self.delayed):
self.delayed[method] = 0
if (self.disabled > self.delayed[method]):
return self.disabled
return self.delayed[method]
class Controler:
__keys = [('123-456-789', RequestTimeout()), ('987-654-321', RequestTimeout())]
@staticmethod
def getApiKey(gearman_worker, gearman_job):
jobData = json.loads(gearman_job.data)
method = jobData['method']
random.shuffle(Controler.__keys)
for (key, timeout) in Controler.__keys:
if (not timeout.isDisabled(method)):
return key
return -1
@staticmethod
def disableMethod(gearman_worker, gearman_job):
jobData = json.loads(gearman_job.data)
method = jobData['method']
key = jobData['key']
timeout = jobData['timeout']
try:
for (k, t) in Controler.__keys:
if (k == key):
t.disableMethod(method, timeout)
return 'T'
return 'F'
except:
traceback.print_exc()
return 'F'
@staticmethod
def disableKey(gearman_worker, gearman_job):
jobData = json.loads(gearman_job.data)
key = jobData['key']
timeout = jobData['timeout']
try:
for (k, t) in Controler.__keys:
if (k == key):
t.disableAll(timeout)
return 'T'
return 'F'
except:
traceback.print_exc()
return 'F'
@staticmethod
def getMethodTimeout(gearman_worker, gearman_job):
jobData = json.loads(gearman_job.data)
method = jobData['method']
t = time.time()
result = (sys.maxint, '')
for (key, timeout) in Controler.__keys:
t0 = min(result[0], timeout.getTimeout(method))
if (t0 != result[0]):
result = [t0, key]
return result
if __name__ == "__main__":
gm_worker = gearman.GearmanWorker(['10.132.157.195:4730'])
gm_worker.set_client_id('python-worker')
gm_worker.register_task('Manager-GetApiKey', Controler.getApiKey)
gm_worker.register_task('Manager-DisableMethod', Controler.disableMethod)
gm_worker.register_task('Manager-DisableKey', Controler.disableKey)
gm_worker.register_task('Manager-GetMethodTimeout', Controler.getMethodTimeout)
gm_worker.work()
provides the following crash:
root@Workers:~/worker# cat nohup.out
Traceback (most recent call last):
File "manager.py", line 102, in <module>
gm_worker.work()
File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 83, in work
continue_working = self.poll_connections_until_stopped(worker_connections, continue_while_connections_alive, timeout=poll_timeout)
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 194, in poll_connections_until_stopped
self.handle_connection_activity(read_connections, write_connections, dead_connections)
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 160, in handle_connection_activity
self.handle_read(current_connection)
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 218, in handle_read
current_handler.fetch_commands()
File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 39, in fetch_commands
continue_working = self.recv_command(cmd_type, **cmd_args)
File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 65, in recv_command
completed_work = cmd_callback(**cmd_args)
File "build/bdist.linux-x86_64/egg/gearman/worker_handler.py", line 137, in recv_job_assign_uniq
self.connection_manager.on_job_execute(gearman_job)
File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 198, in on_job_execute
return self.on_job_complete(current_job, job_result)
File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 205, in on_job_complete
self.send_job_complete(current_job, job_result)
File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 147, in send_job_complete
current_handler.send_job_complete(current_job, data=data)
File "build/bdist.linux-x86_64/egg/gearman/worker_handler.py", line 58, in send_job_complete
self.send_command(GEARMAN_COMMAND_WORK_COMPLETE, job_handle=current_job.handle, data=self.encode_data(data))
File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 28, in encode_data
return self.connection_manager.data_encoder.encode(data)
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 31, in encode
cls._enforce_byte_string(encodable_object)
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 27, in _enforce_byte_string
raise TypeError("Expecting byte string, got %r" % type(given_object))
TypeError: Expecting byte string, got <type 'int'>
The following code:
provides the following crash:
I'm using the gearman 2.0.2 (The one on github)
PS:: Is this still being developed?