Closed: asrocha closed this issue 2 years ago
@asrocha
On which line are you getting this error: TypeError: 'ExternalTask' object is not iterable?
Hi @yogeshrnaik, it is running OK now. I am rewriting some things and I will post my code here. It is working fine.
worker --> celery --> callback. The only requirement is that the activity name in the BPMN diagram is the same as the name of the Celery task.
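For example (a hypothetical sketch; the task and module names are invented), a BPMN service task with activity id send_email is matched by a Celery task whose registered name contains send_email:

# tasks.py in YOUR_CELERY_PROJECT (hypothetical)
from YOUR_CELERY_PROJECT import app

@app.task
def send_email(data):
    # Registered as "<module>.send_email", which the worker below matches
    # against the BPMN activity id "send_email"
    ...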
Regards
Hi @asrocha, I am glad your issue is resolved. I am closing this issue for now. Please feel free to reopen it or create a new issue if you face any difficulties. Regards, Yogesh
@asrocha Can you please post your sample code that uses Celery to run the external task asynchronously? A few other folks are asking for such a mechanism; it will help them implement it using Celery. Thanks.
@yogeshrnaik Of course. Today my code is a little bit more complex; the code below includes everything I have added.
Regards
This is the worker:
import os
import sys
import json
import logging
import importlib
import uuid
import requests
from gettext import gettext as _
from inspect import signature
import pycamunda
import pycamunda.externaltask
import pycamunda.processinst
import pycamunda.variable
# Only needed if you are running inside a Django project
sys.path.insert(0, 'PATH/TO/YOUR/DJANGO/PROJECT')
import django
django.setup()
# It is very cool
from art import art, tprint, text2art, decor
from YOUR_CELERY_PROJECT import app
from YOUR_CELERY_PROJECT import CELERY_RESULT_BACKEND
from camunda.external_task.external_task import ExternalTask, TaskResult
from camunda.external_task.external_task_worker import ExternalTaskWorker
# Here is where you import your Celery tasks that complete or fail Camunda tasks
from YOUR_TASKS_PATH_MODULES import complete_task, fail_task
from concurrent.futures.thread import ThreadPoolExecutor
from YOUR_OWN_EXCEPTIONS import BusinessException
# Very important: track the STARTED state so you can monitor whether long tasks are still running
app.conf.task_track_started = True
logger = logging.getLogger(__name__)
#### Configuration
url = 'http://localhost:8080/engine-rest'
worker_id = 'BPCore'
# Note: if these env vars are set, os.getenv returns a string, not a list; split comma-separated values yourself
topics = os.getenv('TOPICS', ['network', 'BPCore', 'capabilities'])
long_polling_topics = os.getenv('LONG_POLLING_TOPICS', ['network'])
LOCK_DURATION = os.getenv('LOCK_DURATION', 60000)
EXTEND_LOCK_DURATION = os.getenv('EXTEND_LOCK_DURATION', 300000)
# Text decoration: very cool, and a tribute to Lemmy Kilmister
app.loader.import_default_modules()
tprint("M O T O R H E A D", font="rdn-x-large")
x = text2art(" We Are Motorhead \n \t and we Play Rock n Roll !!! ", font="small") + decor("barcode1", reverse=True)
print(x)
# Show the (Celery) tasks that we can run
for t in app.tasks:
    print(f"{worker_id}--> Waiting: {t}")
print("=======================================================================")
tprint("Ready to rock ......", font="rdn-small")
print("=======================================================================")
def add_celery_task_to_process(celery_task, task):
    """
    Add the Celery task id to the process variables.
    It is useful for checking whether the Celery task is running in the background.
    The variable name is <task id>_<activity id>_celery_task because you often
    need to run tasks with the same name in parallel.
    :param celery_task: the AsyncResult returned by apply_async
    :param task: the parsed external task (dict)
    :return:
    """
    try:
        var_name = f"{task['id_']}_{task['activity_id']}_celery_task"
        set_var = pycamunda.processinst.VariablesModify(url=url, process_instance_id=task['process_instance_id'])
        set_var.add_variable(name=var_name, value=celery_task.id)
        set_var()
    except Exception as e:
        logger.error(f"{_('ERROR_ON_SET_CELERY_ID')}: {e}")
        raise Exception(f"{_('ERROR_ON_SET_CELERY_ID')}: {e}")
def get_celery_task(task_id):
    """
    Get info about a Celery task. In this setup the task information is sent to
    OpenSearch (by the exporter below) because Flower is not persistent.
    If you are using Flower you need to rewrite this function.
    :param task_id:
    :return:
    """
    try:
        url = f"https://{YOUR_OPENSEARCH_HOST}:9200/celery_task/_doc/{task_id}"
        headers = {"Content-Type": "application/json"}
        res = requests.get(url=url, headers=headers, auth=('admin', 'admin'), verify=False)
        data = json.loads(res.text)
        return data['_source']
    except Exception as e:
        raise Exception(f"{_('ERROR_ON_GET_TASK_STATE')}: {e}")
def check_if_task_is_running(task):
    """
    Check if the task is already running on a Celery worker.
    :param task: parsed Camunda external task (dict)
    :return: bool
    """
    try:
        get_vars = pycamunda.variable.GetList(url=url, process_instance_id_in=task['process_instance_id'])
        vars_instances = get_vars()
        for var in vars_instances:
            if "_celery_task" in getattr(var, "name"):
                celery_task = getattr(var, "value")
                celery_task_data = get_celery_task(celery_task)
                if task['activity_id'] in celery_task_data['name']:  # or (task['activity_id'].split('capability_')[-1] in celery_task_data['args']['capability']):
                    if celery_task_data['state'] != "SUCCESS" and celery_task_data['state'] != "FAILURE":
                        return True
        return False
    except Exception as e:
        logger.error(f"{_('ERROR_VERIFY_TASK_IS_RUNNING_ON_CELERY')}: {e}")
        raise Exception(f"{_('ERROR_VERIFY_TASK_IS_RUNNING_ON_CELERY')}: {e}")
def extend_lock(task):
    """
    Extend the lock duration for a task that is still running on Celery.
    :param task:
    :return:
    """
    try:
        extend = pycamunda.externaltask.ExtendLock(url=url, id_=task['id_'], new_duration=EXTEND_LOCK_DURATION, worker_id=worker_id)
        extend()
    except Exception as e:
        logger.error(f"{_('ERROR_ON_EXTEND_LOCK_TIME')}: {e}")
        raise Exception(f"{_('ERROR_ON_EXTEND_LOCK_TIME')}: {e}")
def map_wftask_param_to_bp_param_task(signature, task):
    """
    Map the signature of a Celery task in your Celery project to the Camunda process variables.
    :param signature: the signature of the Celery task
    :param task: Camunda external task
    :return: dict of parameters for the call
    """
    get_vars = pycamunda.variable.GetList(url=url, process_instance_id_in=task.process_instance_id)
    vars_instances = get_vars()
    vars = {}
    for var in vars_instances:
        if getattr(var, 'type_') == "Object":
            # Serialized objects have to be fetched again with deserialize_value=True
            j = pycamunda.variable.Get(url=url, id_=var.id_, deserialize_value=True)
            vars.update({getattr(var, 'name'): j().value})
        else:
            vars.update({getattr(var, 'name'): var.value})
    return vars
def handle_task(task: ExternalTask):
    """
    This task handler is the hub that fetches a task from the Camunda BPM engine
    and dispatches it to the matching Celery task.
    """
    logger.info(f"starting task: {type(task)} \n{task.__dict__['activity_id']}")
    try:
        activityId = task.__dict__['activity_id']
        task_parsed = task.__dict__
        # Check whether this task is already running (this depends on check_if_task_is_running)
        is_running = check_if_task_is_running(task_parsed)
        if is_running:
            # The task is still running, so extend the lock and update the status
            extend_lock(task_parsed)
            logger.info(f"{_('EXTENDING_LOCK_DURATION')}: {LOCK_DURATION} : TASK: {activityId}")
            return
        # The next line is very important: it makes the worker re-read all variables
        # every time it checks the process instance, so any change to the process
        # variables is picked up.
        del task_parsed['variables']
        for t in app.tasks:
            # Loop over the Celery tasks to match the activity id in the BPMN model.
            # Note that the same activity can be played many times in the same model;
            # each repetition needs a number appended to its id, e.g. run_same_task
            # and then run_same_task1. We match the BPMN activity name against the
            # Celery task name, import the correct module, get the Celery task and
            # parse its parameters from the BPMN variables.
            if activityId in t or activityId.rstrip('0123456789') in t:
                if activityId.rstrip('0123456789') in t:
                    module = t.split('.')
                    module = ".".join(module[:-1]).rstrip('0123456789')
                    activityId = activityId.rstrip('0123456789')
                else:
                    module = t.split('.')
                    module = ".".join(module[:-1])
                module = importlib.import_module(module)
                working = getattr(module, activityId)
                parameters = signature(working)
                call = map_wftask_param_to_bp_param_task(parameters, task)
                # Now the Celery task is mapped to "working" and its args to "call".
                # Note that complete_task and fail_task are the Celery tasks called
                # back after the async task finishes.
                try:
                    if len(list(parameters.parameters)) == 1:
                        # The Celery task takes just one parameter
                        celery_task = working.apply_async(args=[call],
                                                          link=complete_task.s(task=task_parsed),
                                                          link_error=fail_task.s(task=task_parsed))
                    else:
                        # The Celery task takes several parameters
                        parameters = list(parameters.parameters)
                        true_call = {}
                        for x in parameters:
                            if x in call:
                                true_call.update({x: call[x]})
                        if "extra_data" in parameters:
                            true_call.update({'extra_data': call})
                        if "data" in parameters:
                            true_call.update({'data': call})
                        celery_task = working.s(**true_call).apply_async(link=complete_task.s(task=task_parsed),
                                                                         link_error=fail_task.s(task=task_parsed))
                    add_celery_task_to_process(celery_task, task_parsed)
                except TypeError as e:
                    logger.critical(f"{_('ERROR_ON_CALL_TASK')}: {type(e)} {e}", exc_info=True)
    except BusinessException as e:
        failure = pycamunda.externaltask.HandleBPMNError(url=url, id_=task_parsed['id_'],
                                                         worker_id=task_parsed['worker_id'],
                                                         error_message=f"{e.error_message}",
                                                         error_code=f"{e.error_code}")
        failure()
    except Exception as e:
        logger.error(f"ERROR_ON_EXECUTE_TASK: {e}", extra=task_parsed, exc_info=True)
fetch_and_lock = pycamunda.externaltask.FetchAndLock(url=url, worker_id=worker_id, max_tasks=5000)
print(f"Subscribing to topics: {topics} ....... {LOCK_DURATION}")
while True:
    try:
        for topic in topics:
            try:
                fetch_and_lock.add_topic(name=topic, lock_duration=LOCK_DURATION)
            except Exception as e:
                logger.error(f"{_('ERROR_ON_SUBSCRIBE_BPMN_ENGINE_TOPIC')}: {e}",
                             extra={'topic': topic, 'worker': worker_id})
        tasks = fetch_and_lock()
        for task in tasks:
            try:
                handle_task(task)
            except Exception as e:
                logger.critical(f"{_('ERROR_ON_DISPATCHER')}: {e}", exc_info=True)
    except Exception as e:
        logger.error(f"{_('ERROR_ON_RUN_TASK')}: {e}", extra={'worker': worker_id})
This is the Celery exporter that exports events to OpenSearch:
import os
import ast
import json
import logging
import pytz
import requests
from celery import Celery
from datetime import datetime
from gettext import gettext as _
from opensearchpy import OpenSearch

host = os.getenv('OPENSEARCH_HOST', 'localhost')
port = os.getenv('OPENSEARCH_PORT', '9200')
# CAUTION: it is better to use environment variables here too
auth = ('YOUR_USER', 'YOUR_PASS')
index_name = 'celery_task'
update_url = f"https://{host}:{port}/{index_name}/_update"
logger = logging.getLogger(__name__)

def send_activity(event):
    try:
        if event['state'] == 'RECEIVED':
            event.update({'created_at': datetime.now(pytz.utc).isoformat()})
            event['kwargs'] = ast.literal_eval(event['kwargs'])
            try:
                client = OpenSearch(hosts=[{'host': host, 'port': port}],
                                    http_compress=True,  # enables gzip compression for request bodies
                                    http_auth=auth,
                                    use_ssl=True,
                                    verify_certs=False,
                                    ssl_assert_hostname=False,
                                    ssl_show_warn=False,
                                    )
                event.update({'@timestamp': datetime.now(pytz.utc).isoformat()})
                response = client.index(index=index_name,
                                        body=event,
                                        op_type="index",
                                        id=event['uuid'])
            except Exception as e:
                raise Exception(f"{_('ERROR_ON_INDEX_TASK')}: {e}\n {event}")
        else:
            try:
                # args/kwargs only arrive with the received event, so drop them on updates
                if "args" in event:
                    del event['args']
                    del event['kwargs']
                doc = {'doc': event}
                res = requests.post(f"{update_url}/{event['uuid']}", data=json.dumps(doc), auth=auth,
                                    verify=False, headers={'Content-Type': 'application/json'})
            except Exception as e:
                logger.error(f"ERROR_ON_UPDATE_CELERY_TASK: {e}", exc_info=True)
    except Exception as e:
        logger.error(f"ERROR_ON_SEND_CELERY_TASK_TO_OPENSEARCH: {e}", exc_info=True)

def my_monitor(app):
    state = app.events.State()

    def process_event_tasks(event):
        state.event(event)
        # The task name is sent only with the -received event; state keeps track of it for us.
        if 'state' in event:
            send_activity(event)

    # Task events we are interested in; the receiver subscribes to all of them via '*'
    monitored_states = ['task-sent', 'task-received', 'task-started', 'task-succeeded',
                        'task-failed', 'task-rejected', 'task-revoked', 'task-retried']
    with app.connection() as connection:
        rcv = app.events.Receiver(connection, handlers={'*': process_event_tasks})
        rcv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    print("Celery exporter .... ")
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)
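Note that Celery only emits these task events when they are enabled, e.g. by starting the workers with the -E flag (celery -A YOUR_CELERY_PROJECT worker -E); without events the exporter receives nothing and check_if_task_is_running cannot see the task state in OpenSearch.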
Hi folks
I am using this library to develop a generic external worker. My idea is to run a worker that fetches the activity (task) and sends it to Celery to run asynchronously, freeing the worker thread. In my case I send it to Celery and it works very well, but I need a Celery callback to complete the task when it runs OK and an error callback to handle the failure case: worker --> celery --> callback.
Does anyone have an idea how to do this? My code:
my_task.apply_async(args=[call], link=complete_task.s(task=task), link_error=fail_task.s(task=task))
where my_task is my Celery task. It shows the error: TypeError: 'ExternalTask' object is not iterable
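The error likely comes from passing the ExternalTask object itself into the Celery signature: Celery has to serialize every argument, and ExternalTask is not a serializable type. The final worker above avoids this by passing the plain dict instead:

task_parsed = task.__dict__
my_task.apply_async(args=[call],
                    link=complete_task.s(task=task_parsed),
                    link_error=fail_task.s(task=task_parsed))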
This is my callback function: