masa-finance / masa-oracle

Masa Oracle: Decentralized Data Protocol 🌐
https://developers.masa.ai/docs/masa-protocol/welcome
MIT License

feat(workers): implement adaptive worker selection for improved task distribution #589

Closed: teslashibe closed this 1 month ago

teslashibe commented 1 month ago

Priority-based Worker Selection for Twitter Tasks with Improved Availability Tracking

Problem Statement

Our current worker selection mechanism for Twitter tasks doesn't account for worker reliability and availability. This leads to inefficient task distribution, where unreliable or frequently unavailable workers may be selected, causing delays and failures in task execution. We need a more intelligent selection process that prioritizes workers based on their past performance and availability.

Solution Overview

We're implementing a priority-based worker selection mechanism for Twitter tasks. This new system will track worker performance metrics and availability, using this data to prioritize more reliable and available workers. For other task types, we'll maintain the existing round-robin selection to avoid potential regression.
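The split between Twitter tasks and everything else can be pictured with a small sketch. It assumes a hypothetical `WorkType` label, a trimmed-down hypothetical `Worker` struct standing in for NodeData, and a simple `Selector` with a round-robin cursor. None of these names come from the Masa codebase. Twitter work is ordered by reliability. Other work types rotate through the list as a plain round-robin.

```go
package main

import (
	"fmt"
	"sort"
)

// WorkType is a hypothetical label for the kind of task being dispatched.
type WorkType string

const (
	TwitterWork WorkType = "twitter" // hypothetical value
	WebWork     WorkType = "web"     // hypothetical value
)

// Worker is a trimmed-down, hypothetical stand-in for NodeData.
type Worker struct {
	PeerID        string
	NotFoundCount int
	TweetTimeouts int
}

// Selector keeps the round-robin cursor used for non-Twitter work.
type Selector struct {
	next int
}

// Order returns a copy of workers in the order they should be tried for wt.
func (s *Selector) Order(wt WorkType, workers []Worker) []Worker {
	out := append([]Worker(nil), workers...)
	if len(out) == 0 {
		return out
	}
	if wt == TwitterWork {
		// Priority-based: fewer DHT misses, then fewer timeouts, then PeerID as a tie-breaker.
		sort.Slice(out, func(i, j int) bool {
			a, b := out[i], out[j]
			if a.NotFoundCount != b.NotFoundCount {
				return a.NotFoundCount < b.NotFoundCount
			}
			if a.TweetTimeouts != b.TweetTimeouts {
				return a.TweetTimeouts < b.TweetTimeouts
			}
			return a.PeerID < b.PeerID
		})
		return out
	}
	// Other work types: plain round-robin, rotating the starting worker on each call.
	start := s.next % len(out)
	s.next++
	rotated := append([]Worker(nil), out[start:]...)
	return append(rotated, out[:start]...)
}

// ids extracts peer IDs for printing.
func ids(ws []Worker) []string {
	out := make([]string, len(ws))
	for i, w := range ws {
		out[i] = w.PeerID
	}
	return out
}

func main() {
	workers := []Worker{
		{PeerID: "peerA", NotFoundCount: 2},
		{PeerID: "peerB", TweetTimeouts: 1},
		{PeerID: "peerC"},
	}
	var s Selector
	fmt.Println(ids(s.Order(TwitterWork, workers))) // [peerC peerB peerA]
	fmt.Println(ids(s.Order(WebWork, workers)))     // [peerA peerB peerC]
	fmt.Println(ids(s.Order(WebWork, workers)))     // [peerB peerC peerA]
}
```

Keeping the Twitter path separate means the round-robin cursor only advances for other work types. Their behavior stays exactly as before, which matches the goal of avoiding regressions.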

Changes

Description

This PR introduces a priority-based worker selection mechanism for Twitter-related tasks in the Masa Oracle network. It leverages newly added performance metrics to choose the most reliable and available workers for Twitter tasks, while maintaining the existing round-robin behavior for other work types.

The changes improve reliability, efficiency, and predictability in task allocation across the network, particularly for Twitter-related work. By tracking both successful task completions and failures (including timeouts and DHT lookup failures), we can make more informed decisions about worker selection.

Key improvements:

  1. Prioritizes workers that have returned more tweets.
  2. Considers the recency of successful task completions.
  3. Penalizes workers with frequent timeouts or DHT lookup failures.
  4. Maintains a stable sorting order when performance data is limited or equal.
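Point 4 matters because `sort.Sort` and `sort.Slice` are not guaranteed to be stable. Without a final tie-breaker, nodes with equal metrics could come out in different orders depending on the order they were received. The sketch below uses a hypothetical trimmed-down `node` type with a single metric. It shows that falling back to the peer ID gives the same order for two differently ordered inputs.

```go
package main

import (
	"fmt"
	"sort"
)

// node is a hypothetical stand-in for NodeData with a single performance metric.
type node struct {
	PeerID         string
	ReturnedTweets int
}

// order sorts by ReturnedTweets (descending) and falls back to PeerID, so nodes
// with equal metrics always end up in the same relative order.
func order(nodes []node) []string {
	sorted := append([]node(nil), nodes...)
	sort.Slice(sorted, func(i, j int) bool {
		if sorted[i].ReturnedTweets != sorted[j].ReturnedTweets {
			return sorted[i].ReturnedTweets > sorted[j].ReturnedTweets
		}
		return sorted[i].PeerID < sorted[j].PeerID
	})
	ids := make([]string, len(sorted))
	for i, n := range sorted {
		ids[i] = n.PeerID
	}
	return ids
}

func main() {
	// Hypothetical scenario: two views of the same peers, received in different orders.
	viewA := []node{{PeerID: "peerC"}, {PeerID: "peerA"}, {PeerID: "peerB", ReturnedTweets: 5}}
	viewB := []node{{PeerID: "peerA"}, {PeerID: "peerB", ReturnedTweets: 5}, {PeerID: "peerC"}}
	fmt.Println(order(viewA)) // [peerB peerA peerC]
	fmt.Println(order(viewB)) // [peerB peerA peerC]
}
```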

This approach is expected to reduce task failures and wasted dispatch attempts for Twitter-related tasks, because workers that are unreachable or slow are tried last.

TODO: Update NodeData Performance Metrics

To fully implement the performance-based worker selection:

  1. Update node_event_tracker.go to track and update the new metrics in NodeData.

  2. Implement a method in node_event_tracker.go to update these metrics:

    func (net *NodeEventTracker) UpdateWorkerPerformance(peerId string, success bool, tweetCount int, notFound bool) {
       // TODO: Implement this method to update NodeData performance metrics
    }
  3. Ensure updated NodeData is properly propagated to other nodes via the existing gossip mechanism.

  4. Add unit tests for the new sorting and metric update functionalities.

  5. Update documentation to reflect the new worker selection process.

  6. Implement a mechanism to periodically reset or decay the NotFoundCount and LastNotFoundTime.

  7. Consider implementing a threshold for NotFoundCount to temporarily exclude consistently unavailable nodes.
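The `UpdateWorkerPerformance` stub from step 2 could be filled in roughly as shown below. This is a minimal sketch, not the actual tracker. It assumes `NodeEventTracker` holds a mutex-guarded map of `*NodeData` keyed by peer ID, and that `NodeData` carries the six fields used by `SortNodesByTwitterReliability`. It also assumes that a failed attempt which is not a DHT miss counts as a timeout. The `Snapshot` helper is hypothetical, and gossip propagation (step 3) is left out.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// NodeData holds only the performance fields referenced in this PR.
type NodeData struct {
	PeerId            string
	ReturnedTweets    int
	LastReturnedTweet time.Time
	TweetTimeouts     int
	LastTweetTimeout  time.Time
	NotFoundCount     int
	LastNotFoundTime  time.Time
}

// NodeEventTracker is a minimal stand-in: a mutex-guarded map keyed by peer ID (assumption).
type NodeEventTracker struct {
	mu    sync.Mutex
	nodes map[string]*NodeData
}

func NewNodeEventTracker() *NodeEventTracker {
	return &NodeEventTracker{nodes: make(map[string]*NodeData)}
}

// UpdateWorkerPerformance records the outcome of one Twitter work attempt.
// Assumed semantics: notFound marks a DHT lookup failure; otherwise success adds
// tweetCount returned tweets, and a non-success is counted as a timeout.
func (net *NodeEventTracker) UpdateWorkerPerformance(peerId string, success bool, tweetCount int, notFound bool) {
	net.mu.Lock()
	defer net.mu.Unlock()

	nd, ok := net.nodes[peerId]
	if !ok {
		nd = &NodeData{PeerId: peerId}
		net.nodes[peerId] = nd
	}
	now := time.Now()
	switch {
	case notFound:
		nd.NotFoundCount++
		nd.LastNotFoundTime = now
	case success:
		nd.ReturnedTweets += tweetCount
		nd.LastReturnedTweet = now
	default:
		nd.TweetTimeouts++
		nd.LastTweetTimeout = now
	}
	// The real tracker would publish the updated NodeData via gossip here (omitted).
}

// Snapshot returns a copy of a node's data so callers do not share the tracker's pointer.
func (net *NodeEventTracker) Snapshot(peerId string) (NodeData, bool) {
	net.mu.Lock()
	defer net.mu.Unlock()
	nd, ok := net.nodes[peerId]
	if !ok {
		return NodeData{}, false
	}
	return *nd, true
}

func main() {
	t := NewNodeEventTracker()
	t.UpdateWorkerPerformance("peerA", true, 20, false) // success with 20 tweets
	t.UpdateWorkerPerformance("peerA", false, 0, false) // timeout
	t.UpdateWorkerPerformance("peerA", false, 0, true)  // DHT lookup failure
	nd, _ := t.Snapshot("peerA")
	fmt.Println(nd.ReturnedTweets, nd.TweetTimeouts, nd.NotFoundCount) // 20 1 1
}
```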

These updates will ensure that the priority-based selection for Twitter tasks is based on up-to-date performance data and node availability, improving the overall efficiency and reliability of the network.

teslashibe commented 1 month ago

@restevens402, can you work on the TODO list and push those commits? We can then deploy to the bootnode, test on a new protocol version tonight or tomorrow, fix any bugs, and then launch the testnet :)

teslashibe commented 1 month ago

@restevens402, there are currently two outcomes that count as a failed attempt to use a remote worker:

  1. The worker is unavailable. This is not a timeout per se, but the worker cannot do the work nonetheless, so it should not be prioritized next time. The node is 'active' in nodeData, but when libp2p tries to connect, the peer is not found: WARN[0018] Failed to find peer 16Uiu2HAmFoUdEbgTzCNad2gEnakbxUKFDie8hfwwcZJyoghtmdVY in DHT: routing: not found

How do you think we should handle this case? I just added two new fields to nodeData (NotFoundCount and LastNotFoundTime) to track it and used them in the following function:

// SortNodesByTwitterReliability sorts the given nodes based on their Twitter reliability.
// It uses multiple criteria to determine the reliability and performance of nodes:
//  1. Prioritizes nodes that have been found more often (lower NotFoundCount)
//  2. Considers the last time a node was not found (earlier LastNotFoundTime is better)
//  3. Sorts by higher number of returned tweets
//  4. Then by more recent last returned tweet
//  5. Then by lower number of timeouts
//  6. Then by less recent last timeout
//  7. Finally, sorts by PeerId for stability when no performance data is available
//
// The function modifies the input slice in-place, sorting the nodes from most to least reliable.
func SortNodesByTwitterReliability(nodes []NodeData) {
    sorter := NodeSorter{
        nodes: nodes,
        less: func(i, j NodeData) bool {
            // 1. Fewer DHT lookup failures (lower NotFoundCount)
            if i.NotFoundCount != j.NotFoundCount {
                return i.NotFoundCount < j.NotFoundCount
            }
            // 2. Earlier LastNotFoundTime (the zero time, i.e. never not found, sorts first)
            if !i.LastNotFoundTime.Equal(j.LastNotFoundTime) {
                return i.LastNotFoundTime.Before(j.LastNotFoundTime)
            }
            // 3. Higher number of returned tweets
            if i.ReturnedTweets != j.ReturnedTweets {
                return i.ReturnedTweets > j.ReturnedTweets
            }
            // 4. More recent last returned tweet
            if !i.LastReturnedTweet.Equal(j.LastReturnedTweet) {
                return i.LastReturnedTweet.After(j.LastReturnedTweet)
            }
            // 5. Lower number of timeouts
            if i.TweetTimeouts != j.TweetTimeouts {
                return i.TweetTimeouts < j.TweetTimeouts
            }
            // 6. Less recent last timeout
            if !i.LastTweetTimeout.Equal(j.LastTweetTimeout) {
                return i.LastTweetTimeout.Before(j.LastTweetTimeout)
            }
            // 7. PeerId as a deterministic tie-breaker (sort.Sort is not stable)
            return i.PeerId.String() < j.PeerId.String()
        },
    }
    sort.Sort(sorter)
}
  2. The worker timed out: it was available in the DHT but did not process the work within the required time: ERRO[0172] error sending work to worker: 16Uiu2HAkwYxBtExeUERbisW8q3sHt8E2dhHDUqCXzSyfFg7Q2A8v: work execution timed out

This case is already handled by the implementation above through TweetTimeouts and LastTweetTimeout.

Therefore, can you focus on updating nodeData so that it contains the right information?

Later on, we should think about migrating nodeData to the blockchain with @mudler, but this is not a priority right now.