sailist closed this issue 9 months ago
Will take a look at this issue today.
Do you mean you cannot create the workflow via the Python SDK, or that you cannot run it?
I am able to create workflows normally, but according to my analysis, creating a workflow should also automatically create or update the tasks in that workflow.
Currently, however, the Python SDK creates the workflow but does not correctly create its tasks (at least they do not show in the UI, and the HTTP API cannot retrieve tasks created this way, either).
As a result, workflows whose single tasks have no dependencies run normally, but workflows with task dependencies cannot be created properly. The database cannot read the relationships between these tasks (I guess), leading to a restart loop.
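To illustrate the symptom, this is roughly how I checked through the HTTP API that the tasks were never created (a sketch; the host/port are from my deployment, AUTH_TOKEN and project_code are placeholders, and it uses the same task-definition endpoint as the workaround below):

import requests

AUTH_TOKEN = "<your-api-token>"  # placeholder
project_code = 123456            # placeholder: your project code

# List the task definitions the server actually knows about; tasks
# "created" through the Py4J gateway never showed up in this list.
resp = requests.get(
    f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}"
    "/task-definition?pageNo=1&pageSize=30",
    headers={"token": AUTH_TOKEN, "Accept": "application/json"},
)
print(resp.json()["data"]["totalList"])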
Last night, I attempted to replace the workflow.submit function with my custom submit() function that uses the HTTP API. However, the API listed in swagger-ui/index.html?urls.primaryName=v1 is still not functioning properly. Ultimately, I reverse-engineered the API through the UI and successfully worked around the issue. Currently my program runs smoothly.
The replaced submit function is posted below, in the hope that it is helpful.
import json
from datetime import datetime
from typing import List

import requests

# AUTH_TOKEN holds my API token; auth_request(method, url, **kwargs) is a
# small helper (defined elsewhere in my project) that calls `method` with
# the {"token": AUTH_TOKEN, "Accept": "application/json"} headers attached.
# Workflow comes from pydolphinscheduler.


def get_task_code(project_code, num=1) -> List[int]:
    # Ask the server to allocate `num` fresh task codes.
    ret = auth_request(
        requests.get,
        f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/task-definition/gen-task-codes?genNum={num}",
    )
    return json.loads(ret.content)["data"]


def get_project(name: str):
    # Look up a project by name; returns the project dict or None.
    ret = requests.get(
        "http://172.32.2.55:12345/dolphinscheduler/projects/list",
        headers={
            "token": AUTH_TOKEN,
            "Accept": "application/json",
        },
    )
    for project in json.loads(ret.content)["data"]:
        if name == project["name"]:
            return project
    return None


def get_workflow(project_code: int, name: str):
    # Look up a process definition (workflow) by name within a project.
    ret = requests.get(
        f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/list",
        headers={
            "token": AUTH_TOKEN,
            "Accept": "application/json",
        },
    )
    for workflow in json.loads(ret.content)["data"]:
        if workflow["processDefinition"]["name"] == name:
            return workflow["processDefinition"]
    return None


def get_tasks(project_code: int):
    # First page (up to 30) of the project's task definitions.
    ret = requests.get(
        f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/task-definition?pageNo=1&pageSize=30",
        headers={
            "token": AUTH_TOKEN,
            "Accept": "application/json",
        },
    )
    return json.loads(ret.content)["data"]["totalList"]


def get_resource_id(name: str):
    # Resolve an SSH datasource name to its id.
    ret = auth_request(
        requests.get,
        "http://172.32.2.55:12345/dolphinscheduler/datasources/list?type=SSH&testFlag=0",
    )
    for resource in json.loads(ret.content)["data"]:
        if resource["name"] == name:
            return resource["id"]
    raise KeyError(name)


def release(project_code, workflow_code):
    # Toggle the release state of a workflow.
    auth_request(
        requests.post,
        f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/{workflow_code}/release",
    )


def submit(workflow: Workflow, online=True):
    workflow_def = workflow.get_define()
    project = get_project(workflow_def["project"])
    assert project is not None, workflow_def["project"]
    project_code = project["code"]

    # Backfill the fields the UI sends but the SDK omits, so each task
    # definition matches the schema the HTTP API expects.
    for task in workflow_def["taskDefinitionJson"]:  # type: dict
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        task.setdefault("createTime", now)
        task.setdefault("updateTime", now)
        task.setdefault("projectName", project["name"])
        task.setdefault("projectCode", project["code"])
        task.setdefault("operator", 1)
        task.setdefault("modifyBy", None)
        # The API expects these fields as strings.
        task["delayTime"] = str(task["delayTime"])
        task["description"] = str(task["description"])
        task["failRetryTimes"] = str(task["failRetryTimes"])
        task["failRetryInterval"] = str(task["failRetryInterval"])
        if task.get("environmentCode") is None:
            task["environmentCode"] = -1
        if task.get("timeoutNotifyStrategy") is None:
            task["timeoutNotifyStrategy"] = ""
        task.setdefault("userName", None)
        task.setdefault("userId", 1)
        task.setdefault("taskParamList", [])
        task.setdefault("taskParamMap", {})
        task.setdefault("taskExecuteType", "BATCH")

    data = {
        "taskDefinitionJson": json.dumps(workflow_def["taskDefinitionJson"]),
        "taskRelationJson": json.dumps(workflow_def["taskRelationJson"]),
        "locations": "",
        "name": workflow_def["name"],
        "executionType": workflow_def["executionType"],
        "description": workflow_def["description"],
        "globalParams": json.dumps(workflow.param_json),
        "timeout": workflow_def["timeout"],
        "releaseState": "ONLINE" if online else "OFFLINE",
    }

    existing = get_workflow(project_code, workflow_def["name"])
    if existing is None:
        # Create a new process definition.
        ret = auth_request(
            requests.post,
            f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition",
            data=data,
        )
    else:
        if workflow_def["releaseState"] == 1:
            # Take the workflow offline before updating it.
            release(project_code, existing["code"])
        # Update the existing process definition in place.
        ret = auth_request(
            requests.put,
            f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/{existing['code']}",
            data=data,
        )
    print(json.loads(ret.content))
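For reference, a minimal usage sketch. The import paths and the Shell task follow the pydolphinscheduler version I run and may differ on yours; the workflow, project, and command names are placeholders. The only change from normal SDK usage is calling this submit() instead of workflow.submit():

from pydolphinscheduler.core.workflow import Workflow  # adjust per SDK version
from pydolphinscheduler.tasks.shell import Shell

with Workflow(name="nightly-build", project="my-project") as wf:
    build = Shell(name="build", command="make build")
    test = Shell(name="test", command="make test")
    build >> test  # the dependency that broke on the Py4J submit path

submit(wf, online=True)  # instead of wf.submit()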
By the way, sub-workflows also do not work properly. I worked around that by replacing the definition of process_definition_code:
class CustomSubWorkflow(Task):
    """Task SubWorkflow object, declare behavior for SubWorkflow task to dolphinscheduler."""

    # Task and TaskType come from pydolphinscheduler; get_project and
    # get_workflow are the HTTP helpers defined above.
    _task_custom_attr = {"process_definition_code"}

    def __init__(self, name: str, workflow_name: str, *args, **kwargs):
        super().__init__(name, TaskType.SUB_WORKFLOW, *args, **kwargs)
        self.workflow_name = workflow_name

    @property
    def process_definition_code(self) -> str:
        """Get workflow code, a wrapper for :func:`get_workflow_info`.

        We can not change this function name to workflow_code, because it is
        a keyword used in dolphinscheduler itself.
        """
        project_code = get_project(self.workflow.project.name)["code"]
        return get_workflow(project_code, self.workflow_name)["code"]
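Usage is the same as any other task; a sketch ("child-workflow" is a placeholder and must already exist on the server, since process_definition_code is resolved over HTTP when the parent is defined):

with Workflow(name="parent-workflow", project="my-project") as wf:
    CustomSubWorkflow(name="run-child", workflow_name="child-workflow")

submit(wf)  # submit the parent with the HTTP-API submit() above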
In my experience, a Python SDK that purely uses the HTTP API might be better.
I think the problem here is that, when submitting with the Python SDK, the task definition doesn't follow the expected task schema.
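For illustration, the fields below are the ones the workaround above backfills before POSTing, i.e. roughly the shape a task definition apparently needs for the HTTP API to accept it (all values hypothetical):

task_definition = {
    "code": 12534793670368,      # task code allocated by the server
    "name": "build",
    "taskType": "SHELL",
    "taskParams": {"rawScript": "make build", "localParams": [], "resourceList": []},
    "taskExecuteType": "BATCH",  # absent from the SDK-generated JSON
    "taskParamList": [],
    "taskParamMap": {},
    "environmentCode": -1,       # -1 when no environment is bound
    "failRetryTimes": "0",       # the API expects these as strings
    "failRetryInterval": "1",
    "timeoutNotifyStrategy": "",
}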
Could you share what kind of tasks you use in pydolphinscheduler?
Hi @sailist, this issue is fixed in #140.
I find DolphinScheduler to be an incredibly useful and powerful tool for enhancing efficiency. Currently I use it to build my repo daily and run regression tests, which may not be worth sharing. I am still exploring scenarios that are suitable for delegating tasks to DolphinScheduler. Perhaps a little later, when the project becomes more complex, I will be willing to share.
As I create more workflows and aim for easier management of data sources and environments, I may consider redeveloping a Python SDK, transitioning the backend from Py4J to an HTTP API to incorporate functionality that Py4J cannot provide.
I hope to get your opinion on this. Do you think resource management and the other functions the HTTP API offers can be achieved through the Py4J backend? Do you think this is a good approach?
> I find DolphinScheduler to be an incredibly useful and powerful tool for enhancing efficiency. Currently I use it to build my repo daily and run regression tests, which may not be worth sharing. I am still exploring scenarios that are suitable for delegating tasks to DolphinScheduler. Perhaps a little later, when the project becomes more complex, I will be willing to share.
Sounds cool. Currently most of our users use DolphinScheduler to schedule data task workflows, like SQL, Spark, or Hive jobs, and you are putting it to work in a new field. May I ask whether you did it via the UI or the Python SDK?
Furthermore, could you share some samples of what you run in production?
> As I create more workflows and aim for easier management of data sources and environments, I may consider redeveloping a Python SDK, transitioning the backend from Py4J to an HTTP API to incorporate functionality that Py4J cannot provide.
The initial reasons we used Py4J instead of HTTP are:
> I hope to get your opinion on this. Do you think resource management and the other functions the HTTP API offers can be achieved through the Py4J backend? Do you think this is a good approach?
Of course we can add more entry points to the Python gateway and support all the resources, or we can reuse the existing HTTP protocol.
I guess this is what causes the problem when running tasks with dependencies: the second task restarts again and again.
Environment:
Python SDK: installed from source
DolphinScheduler: 3.2.0, started from Docker