SuperDuperDB / superduperdb

🔮 SuperDuperDB: Bring AI to your database! Build, deploy and manage any AI application directly with your existing data infrastructure, without moving your data, including streaming inference, scalable model training and vector search.
https://superduperdb.com
Apache License 2.0

[BUG]: The dependency information is lost when submitting a task in Ray. #2188

Closed by jieguangzhou 1 week ago

jieguangzhou commented 1 week ago

System Information

main

What happened?

Because the way jobs are submitted to Ray was changed, the future returned by the submission is no longer stored on the job. As a result, downstream jobs cannot obtain their dependency information.

The problem appears when listener_upstream and listener_downstream both listen to the same table and listener_downstream depends on the output of listener_upstream.
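The failure mode can be illustrated without Ray. The sketch below is a minimal, hypothetical reproduction: Job, submit, run_pair and keep_future are illustrative names, not SuperDuperDB's API, and concurrent.futures stands in for Ray's object references. When the upstream job does not keep the handle returned by its submission, the downstream job receives None instead of a future and runs without the upstream output.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List, Optional


class Job:
    """Hypothetical stand-in for a SuperDuperDB job (not the library's class)."""

    def __init__(self, name: str, fn: Callable[..., str], keep_future: bool = True):
        self.name = name
        self.fn = fn
        self.keep_future = keep_future  # False simulates the regression
        self.future: Optional[Future] = None

    def submit(self, pool: ThreadPoolExecutor,
               dependencies: List[Optional[Future]]) -> Future:
        def run() -> str:
            # Wait for every upstream result. In this sketch None entries are
            # skipped, so a lost dependency goes unnoticed.
            upstream = [d.result() for d in dependencies if d is not None]
            return self.fn(*upstream)

        fut = pool.submit(run)
        if self.keep_future:
            # Storing the handle is what lets downstream jobs depend on this one.
            self.future = fut
        return fut


def run_pair(keep_future: bool) -> str:
    upstream = Job("listener_upstream", lambda: "vectors", keep_future=keep_future)
    downstream = Job("listener_downstream", lambda *up: f"downstream saw {list(up)}")
    with ThreadPoolExecutor(max_workers=2) as pool:
        upstream.submit(pool, dependencies=[])
        # Mirrors run_jobs: dependencies are read from the upstream .future attribute.
        deps = [upstream.future]
        return downstream.submit(pool, dependencies=deps).result()


print(run_pair(keep_future=True))   # downstream saw ['vectors']
print(run_pair(keep_future=False))  # downstream saw []
```

With keep_future=False the downstream job still runs, but it never sees the upstream output, which matches the symptom described in this issue.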

As a result, self.G.nodes[a]['job'].future always returns None in run_jobs:

    def run_jobs(
        self,
    ):
        """Run all the jobs in this workflow."""
        pred = self.G.predecessors
        current_group = [n for n in self.G.nodes if not ancestors(self.G, n)]
        done = set()

        while current_group:
            for node in current_group:
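                job: Job = self.G.nodes[node]['job']
                # Collect the futures of the upstream jobs. After the Ray
                # submission change these are all None, so the dependency is lost.
                dependencies = [self.G.nodes[a]['job'].future for a in pred(node)]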
                job(
                    self.database,
                    dependencies=dependencies,
                )
                done.add(node)

            current_group = [
                n for n in self.G.nodes if set(pred(n)) <= done and n not in done
            ]
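The scheduling loop above can be re-created in plain Python to show where the missing future matters. This is a hypothetical sketch, not the library code: FakeJob, the preds adjacency dict, the "ref:..." strings standing in for Ray ObjectRefs, and the RuntimeError guard are all assumptions for illustration. In a DAG a node with no predecessors also has no ancestors, so the root selection matches the original's use of ancestors. The guard is one possible design choice: raising when an upstream future is None surfaces the bug immediately instead of letting listener_downstream run without its input.

```python
from typing import Dict, List, Optional, Set


class FakeJob:
    """Hypothetical job that records the dependencies it was given."""

    def __init__(self, name: str, lose_future: bool = False):
        self.name = name
        self.lose_future = lose_future
        self.future: Optional[str] = None
        self.received: List[Optional[str]] = []

    def __call__(self, dependencies: List[Optional[str]]) -> None:
        self.received = dependencies
        # Placeholder for a Ray ObjectRef; the bug corresponds to never setting it.
        if not self.lose_future:
            self.future = f"ref:{self.name}"


def run_jobs(jobs: Dict[str, FakeJob], preds: Dict[str, List[str]]) -> List[List[str]]:
    """Same grouping logic as run_jobs above, on a plain adjacency dict."""
    done: Set[str] = set()
    groups: List[List[str]] = []
    # Roots: nodes with no predecessors (equivalently, no ancestors in a DAG).
    current_group = [n for n in jobs if not preds.get(n)]
    while current_group:
        groups.append(current_group)
        for node in current_group:
            dependencies = [jobs[a].future for a in preds.get(node, [])]
            # Defensive check: fail loudly instead of passing None downstream.
            if any(d is None for d in dependencies):
                raise RuntimeError(f"{node}: upstream future missing")
            jobs[node](dependencies)
            done.add(node)
        # Next group: nodes whose predecessors have all been submitted.
        current_group = [
            n for n in jobs if set(preds.get(n, [])) <= done and n not in done
        ]
    return groups


jobs = {"listener_upstream": FakeJob("listener_upstream"),
        "listener_downstream": FakeJob("listener_downstream")}
preds = {"listener_downstream": ["listener_upstream"]}
print(run_jobs(jobs, preds))  # [['listener_upstream'], ['listener_downstream']]
print(jobs["listener_downstream"].received)  # ['ref:listener_upstream']
```

Constructing listener_upstream with lose_future=True reproduces the reported situation, and the guard then raises RuntimeError for listener_downstream.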

Steps to reproduce

https://github.com/SuperDuperDB/superduper-apps/tree/feat/new-version

BRANCH: new-version

    cd legal_tech_demo
    python -m superduperdb local-cluster up
    export SUPERDUPERDB_CONFIG=config.yaml
    python setup.py --action=build --reset

Relevant log output

No response