Open: Tom-Newton opened this issue 12 months ago
Thank you for opening your first issue here!
cc @pingsutw when we added system tags did we add a query method for this - in list executions?
Thanks for the replies. I gave the tags filtering a try but I don't really understand how the tags can help. I want to see the status of all the nodes in a particular workflow quickly, but I can't find any API that can list all the nodes in a workflow. All the APIs seem to list only immediate child nodes, rather than the whole graph.
I don't really think filtering is the problem, all the APIs I've tested already have too much filtering.
I would still be keen to know if there is a better way, but I ended up creating a version of remote.sync_execution(sync_nodes=True) that makes gRPC requests in many parallel threads by wrapping iterate_node_executions and running it with asyncio. The 12 minutes is now more like 10 seconds.
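The speedup described above can be sketched generically: fan the blocking list calls out over a thread pool and await them together from asyncio. This is a minimal sketch, not Tom-Newton's actual code; fetch_child_nodes is a hypothetical stand-in for a real blocking gRPC call such as flytekit's iterate_node_executions for one parent node.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stub standing in for a blocking gRPC call (e.g. listing the
# node executions under one parent node). Kept trivial so the sketch is
# self-contained and runnable.
def fetch_child_nodes(parent_id: str) -> list:
    return [f"{parent_id}/n{i}" for i in range(3)]

async def fetch_all(parent_ids: list) -> list:
    """Run the blocking fetches concurrently on a thread pool and gather them."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=32) as pool:
        tasks = [loop.run_in_executor(pool, fetch_child_nodes, p) for p in parent_ids]
        results = await asyncio.gather(*tasks)  # preserves input order
    return [node for sub in results for node in sub]

nodes = asyncio.run(fetch_all([f"p{i}" for i in range(100)]))
print(len(nodes))  # prints 300
```

With real gRPC calls the wall-clock time becomes roughly the slowest single request rather than the sum of all of them, which is consistent with the 12-minutes-to-10-seconds improvement reported above.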
This is awesome, @Tom-Newton... flytekit (particularly remote) needs asyncio everywhere... Would you be open to redoing your change within the flytekit codebase to support asyncio? Even if we do it one API at a time (or create a separate asyncRemote type with a smaller interface initially, until we migrate all APIs).
What I did was a fairly naive approach that could definitely be optimised. The best way, though, would be to add native support for asyncio in flytekit, as you say. Anything that uses gRPC could probably benefit. I'm a total noob when it comes to asyncio and to gRPC, but it looks like we could use https://grpc.github.io/grpc/python/grpc_asyncio.html. Potentially I could contribute to this, but probably not in the near term.
If anyone is interested in the code I'm using personally: flyte_remote.py.txt (it's got a .txt extension because GitHub doesn't allow uploading .py files).
I just saw https://github.com/flyteorg/flyte/issues/2193, which seems to be about an async interface to flyteadmin and is one possible solution to this.
Hey @Tom-Newton! Great to see you diving into the depths of Flyte workflows again. Looks like we're on another interesting adventure.
Based on your requirements, I would recommend using the ListNodeExecutions function in the NodeExecutionManager class. This function lists all node executions for a given workflow execution and can be used to efficiently query the status of large workflows in Flyte.

Here is a sample usage of the ListNodeExecutions function:
nodeExecutionListRequest := admin.NodeExecutionListRequest{
    WorkflowExecutionId: &core.WorkflowExecutionIdentifier{
        Project: "my_project",
        Domain:  "my_domain",
        Name:    "my_workflow_execution_name",
    },
    Filters: "eq(phase, SUCCEEDED)",
    Limit:   100,
    Token:   "",
    SortBy:  &admin.Sort{Key: "created_at", Direction: admin.Sort_DESCENDING},
}

nodeExecutionList, err := nodeExecutionManager.ListNodeExecutions(context.Background(), nodeExecutionListRequest)
if err != nil {
    log.Fatalf("Failed to list node executions: %v", err)
}
for _, nodeExecution := range nodeExecutionList.NodeExecutions {
    fmt.Printf("Node execution: %v\n", nodeExecution)
}
In this example, the Filters field filters node executions by their phase, the Limit field caps the number of node executions returned in a single request, the Token field is used for pagination, and the SortBy field sorts the node executions by creation time in descending order.
Please note that you need to replace "my_project", "my_domain", and "my_workflow_execution_name" with your actual project, domain, and workflow execution name. Also, replace "eq(phase, SUCCEEDED)" with your actual filter condition.
The ListNodeExecutions function handles large workflows with thousands of nodes by implementing pagination: a limit and an offset are used to retrieve a subset of node executions at a time. If the number of retrieved node executions equals the limit, a new token (the sum of the offset and the number of retrieved node executions) is issued for the next page. This way, the function can handle large workflows by retrieving and processing a manageable number of nodes at a time.
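The token scheme described above can be sketched from the client side: keep requesting pages and feeding the returned token back in until no token is issued. This is a minimal self-contained sketch, not the real FlyteAdmin client; list_node_executions here is a hypothetical in-memory stub mirroring the offset-based token behaviour.

```python
# Hypothetical stub mirroring the ListNodeExecutions pagination contract:
# returns (page, next_token), where next_token is "" once everything is fetched.
DATA = list(range(250))  # stand-in for 250 node executions

def list_node_executions(limit, token):
    offset = int(token) if token else 0
    page = DATA[offset:offset + limit]
    # A continuation token (offset + count) is issued only when the page was
    # full, i.e. more results may remain.
    next_token = str(offset + len(page)) if len(page) == limit else ""
    return page, next_token

def fetch_all(limit=100):
    """Walk every page until the server stops returning a continuation token."""
    results, token = [], ""
    while True:
        page, token = list_node_executions(limit, token)
        results.extend(page)
        if not token:
            break
    return results

print(len(fetch_all()))  # prints 250
```

Against a real deployment the same loop shape applies, with the request/response objects of the admin API in place of the stub; note that each page is a sequential round trip, which is exactly why the parallel/asyncio approach discussed earlier in the thread is so much faster for thousands of nodes.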
You can find more details about the ListNodeExecutions function in the flyteadmin/pkg/manager/impl/node_execution_manager.go file.
Regarding the async interface to Flyte admin discussed in issue #2193: it could potentially improve the performance of querying the status of large workflows, but it would require significant changes to Flyte admin and might not be available in the near future. For now, I would recommend using the ListNodeExecutions function, as it is designed to handle large workflows efficiently.
I hope this helps! If you have any further questions, feel free to ask.
Motivation: Why do you think this is important?
For large workflows with ~1000s of nodes it's difficult to monitor the overall progress. At this scale we need to be able to get things like counts of how many nodes or sub-workflows are in each status, e.g. 900 succeeded, 200 unknown, 2 failed, 98 in progress.
I think supporting large workflows like this would be a valuable feature, and it's critical to what I want to do with Flyte.
Goal: What should the final outcome look like, ideally?
I think the ideal would be an interface similar to remote.sync_execution(sync_nodes=True) but fast for large workflows. This would be very flexible. Other interfaces would also be fine; I'm mostly just interested in it being fast.
Describe alternatives you've considered
The Flyte UI: it displays lists and graphs, but at the scale of 1000s of nodes these are impossible to parse by eye. Additionally, it tends to crash my browser.
flytectl get execution: can get information about nodes when using --details, but it seems to be incomplete. Writing the output to a .yaml file and searching, I find quite a lot of nodes are missing.
flytekit remote.sync_execution(sync_nodes=True): this does fetch all the important information and could certainly be parsed by some Python code to extract whatever metrics are needed. The problem is that it takes about 12 minutes to run on a workflow with 3000 nodes. EDIT: it actually doesn't fetch information about nodes that haven't started processing yet, so nodes that would show with unknown status on the UI are missed.
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
Have you read the Code of Conduct?