PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

When a task is mapped over a list of iterables the task hierarchy is not correctly displayed. #6686

Closed robalar closed 2 years ago

robalar commented 2 years ago


Bug summary

When a task is mapped over a list of iterables the task hierarchy is not correctly displayed. The mapped tasks are shown as connected to the flow root, not the task their data came from (see notes).

Reproduction

1. Run the following flow code:

    from prefect import task, flow, get_run_logger

    @task
    def sum_task(chunk: list[int]) -> int:
        logger = get_run_logger()
        logger.info(chunk)
        return sum(chunk)

    @task
    def numbers() -> list[tuple[int, int]]:
        return [(0, 1), (2, 3), (3, 4)]

    @flow
    def test_flow():
        logger = get_run_logger()

        n = numbers()
        sums = sum_task.map(n)

        logger.info("Sums: %s", [x.result() for x in sums])

    if __name__ == "__main__":
        test_flow()

2. Open Orion dashboard
3. Radar visualisation displays incorrect hierarchy

### Error

![Screenshot 2022-09-02 at 1 56 49 pm](https://user-images.githubusercontent.com/67542061/188169125-55f4f98d-f05c-469d-baf5-5600be0956cb.png)

### Browsers

- [X] Chrome
- [X] Firefox
- [ ] Safari
- [ ] Edge

### Prefect version

Version:             2.2.0
API version:         0.8.0
Python version:      3.9.13
Git commit:          e3651362
Built:               Tue, Aug 23, 2022 2:18 PM
OS/Arch:             darwin/x86_64
Profile:             default
Server type:         hosted



### Additional context

_No response_

bunchesofdonald commented 2 years ago

Thanks @robalar, this does appear to be broken, even when using .submit for the task run. We'll look into it.

zanieb commented 2 years ago

Is the data correct in the API and just displayed incorrectly? (I presume this is the case as we check relationship tracking in unit tests)

cc @zhen0 if this is broken for all task runs we may want to bump this to priority:high.

robalar commented 2 years ago

This is the API response from the graph endpoint (from a modified version of the provided flow code using submit):

[
  {
    "id": "c83a2a71-fdf9-44f8-8734-5bac0136e929",
    "upstream_dependencies": [],
    "state": {
      "id": "cd1b85e6-ce3c-4736-9f5e-c00f0a8438e8",
      "type": "COMPLETED",
      "name": "Completed",
      "timestamp": "2022-09-03T10:40:50.671043+00:00",
      "message": null,
      "data": {
        "encoding": "result",
        "blob": "{\"key\": \"92d19ac8aefb470b933479da233011bb\", \"filesystem_document_id\": \"98576fee-8ba9-4856-a029-21178719ed5a\"}"
      },
      "state_details": {
        "flow_run_id": "cf64bfb5-dfc8-4c57-a274-2a9e5d511967",
        "task_run_id": "c83a2a71-fdf9-44f8-8734-5bac0136e929",
        "child_flow_run_id": null,
        "scheduled_time": null,
        "cache_key": null,
        "cache_expiration": null
      }
    },
    "expected_start_time": "2022-09-03T10:40:50.551211+00:00",
    "start_time": "2022-09-03T10:40:50.614813+00:00",
    "end_time": "2022-09-03T10:40:50.671043+00:00",
    "total_run_time": 0.05623,
    "estimated_run_time": 0.05623
  },
  {
    "id": "ba73fe95-edd5-4a62-9a6c-16a5e209ed93",
    "upstream_dependencies": [],
    "state": {
      "id": "46da1b37-1e62-4db3-aa8a-ff2293d95ed0",
      "type": "COMPLETED",
      "name": "Completed",
      "timestamp": "2022-09-03T10:40:50.675025+00:00",
      "message": null,
      "data": {
        "encoding": "result",
        "blob": "{\"key\": \"4b4ea0de042148f286265c706cf87cbd\", \"filesystem_document_id\": \"98576fee-8ba9-4856-a029-21178719ed5a\"}"
      },
      "state_details": {
        "flow_run_id": "cf64bfb5-dfc8-4c57-a274-2a9e5d511967",
        "task_run_id": "ba73fe95-edd5-4a62-9a6c-16a5e209ed93",
        "child_flow_run_id": null,
        "scheduled_time": null,
        "cache_key": null,
        "cache_expiration": null
      }
    },
    "expected_start_time": "2022-09-03T10:40:50.548927+00:00",
    "start_time": "2022-09-03T10:40:50.633185+00:00",
    "end_time": "2022-09-03T10:40:50.675025+00:00",
    "total_run_time": 0.04184,
    "estimated_run_time": 0.04184
  },
  {
    "id": "66de94e9-a656-4b55-9133-ad866a7c5f47",
    "upstream_dependencies": [],
    "state": {
      "id": "c065715a-5f9a-417f-831d-e29aa1aea7eb",
      "type": "COMPLETED",
      "name": "Completed",
      "timestamp": "2022-09-03T10:40:50.651660+00:00",
      "message": null,
      "data": {
        "encoding": "result",
        "blob": "{\"key\": \"3a5fd2eaf7c14de39060d290e3b7025e\", \"filesystem_document_id\": \"98576fee-8ba9-4856-a029-21178719ed5a\"}"
      },
      "state_details": {
        "flow_run_id": "cf64bfb5-dfc8-4c57-a274-2a9e5d511967",
        "task_run_id": "66de94e9-a656-4b55-9133-ad866a7c5f47",
        "child_flow_run_id": null,
        "scheduled_time": null,
        "cache_key": null,
        "cache_expiration": null
      }
    },
    "expected_start_time": "2022-09-03T10:40:50.544852+00:00",
    "start_time": "2022-09-03T10:40:50.585907+00:00",
    "end_time": "2022-09-03T10:40:50.651660+00:00",
    "total_run_time": 0.065753,
    "estimated_run_time": 0.065753
  },
  {
    "id": "2438a2f5-5768-4061-bbc8-18d80121a859",
    "upstream_dependencies": [],
    "state": {
      "id": "01d38d64-39fb-453b-9d9d-18534a329eb3",
      "type": "COMPLETED",
      "name": "Completed",
      "timestamp": "2022-09-03T10:40:50.516115+00:00",
      "message": null,
      "data": {
        "encoding": "result",
        "blob": "{\"key\": \"012d6425417c4ac584cacfb7a701c29d\", \"filesystem_document_id\": \"98576fee-8ba9-4856-a029-21178719ed5a\"}"
      },
      "state_details": {
        "flow_run_id": "cf64bfb5-dfc8-4c57-a274-2a9e5d511967",
        "task_run_id": "2438a2f5-5768-4061-bbc8-18d80121a859",
        "child_flow_run_id": null,
        "scheduled_time": null,
        "cache_key": null,
        "cache_expiration": null
      }
    },
    "expected_start_time": "2022-09-03T10:40:50.400697+00:00",
    "start_time": "2022-09-03T10:40:50.477160+00:00",
    "end_time": "2022-09-03T10:40:50.516115+00:00",
    "total_run_time": 0.038955,
    "estimated_run_time": 0.038955
  }
]

@madkinsz I'm assuming that upstream_dependencies should be populated for the sum_task entries here, so this may not be a UI issue: the UI is displaying exactly what the API returns.
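
A quick way to check a saved response for this is to list the nodes that report no upstream dependencies. This is a minimal sketch that assumes only the response shape shown above (a JSON list of objects with `id` and `upstream_dependencies`). The helper name `nodes_without_upstream` and the abbreviated sample are illustrative and not part of Prefect.

```python
import json


def nodes_without_upstream(graph_json: str) -> list[str]:
    """Return the ids of graph nodes whose upstream_dependencies list is empty."""
    nodes = json.loads(graph_json)
    return [node["id"] for node in nodes if not node.get("upstream_dependencies")]


# Abbreviated sample: only the two fields the check needs, using ids from the response above.
sample = json.dumps(
    [
        {"id": "c83a2a71-fdf9-44f8-8734-5bac0136e929", "upstream_dependencies": []},
        {"id": "2438a2f5-5768-4061-bbc8-18d80121a859", "upstream_dependencies": []},
    ]
)

roots = nodes_without_upstream(sample)
print(f"{len(roots)} of {len(json.loads(sample))} nodes have no upstream dependencies")
```

If the relationship were tracked, I would expect only the task that produces the list to come back from this check. Here every node does.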

robalar commented 2 years ago

I kept digging and found that the bug is not limited to lists of iterables. It appears to affect any mapped list.

from prefect import task, flow, get_run_logger

@task
def log_task(x: int):
    logger = get_run_logger()
    logger.info(x)

@task
def flat_numbers() -> list[int]:
    return [0, 1, 2, 3, 4]

@flow
def test_flow():
    x = flat_numbers.submit()
    log_task.map(x)

if __name__ == "__main__":
    test_flow()

yields:

Screenshot 2022-09-03 at 1 51 21 pm

However, adding wait_for to the mapped task, like so:

from prefect import task, flow, get_run_logger

@task
def log_task(x: int):
    logger = get_run_logger()
    logger.info(x)

@task
def flat_numbers() -> list[int]:
    return [0, 1, 2, 3, 4]

@flow
def test_flow():
    x = flat_numbers.submit()
    log_task.map(x, wait_for=[x])

if __name__ == "__main__":
    test_flow()

yields:

Screenshot 2022-09-03 at 1 52 41 pm

This is weird because the execution seems to respect the relationship without the additional wait_for, but the graph doesn't.

I would love to create a patch for this but may be slightly out of my depth. I will keep digging and see.

robalar commented 2 years ago

The plot thickens...

I believe the issue stems from this line: https://github.com/PrefectHQ/prefect/blob/a48eb57ded87300a165090517a829afcf54e5c4b/src/prefect/engine.py#L755

That line resolves the PrefectFuture passed into .map so that the lengths can be checked, but in doing so it discards the metadata about where the data came from. The mapped task essentially treats it as a constant.

One way of solving this would be to merge the futures passed as args with the wait_for futures passed from .map and pass that onto the mapped task, essentially replicating what I did to get the correct behaviour above.
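
The merging idea can be sketched without touching Prefect itself. The snippet below is a toy model under stated assumptions: `StubFuture` is a hypothetical stand-in for `PrefectFuture`, and `merge_wait_for` and `resolve` are illustrative helpers, not functions in `prefect.engine`. It collects any futures passed as top-level arguments and adds them to the explicit wait_for list before the arguments are resolved to plain data, so the dependency is not lost.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class StubFuture:
    """Hypothetical stand-in for a PrefectFuture: a task run id plus its result."""

    task_run_id: str
    value: Any


def merge_wait_for(
    parameters: dict[str, Any],
    wait_for: Optional[list[StubFuture]] = None,
) -> list[StubFuture]:
    """Combine explicit wait_for futures with futures passed as top-level arguments."""
    merged = list(wait_for or [])
    for argument in parameters.values():
        # Only top-level futures are considered in this sketch.
        if isinstance(argument, StubFuture) and argument not in merged:
            merged.append(argument)
    return merged


def resolve(parameters: dict[str, Any]) -> dict[str, Any]:
    """Replace futures with their values; this is the step that loses provenance."""
    return {
        name: arg.value if isinstance(arg, StubFuture) else arg
        for name, arg in parameters.items()
    }


x = StubFuture(task_run_id="flat_numbers-0", value=[0, 1, 2, 3, 4])
params = {"x": x}

upstream = merge_wait_for(params)  # captured before resolution
resolved = resolve(params)         # plain data, no link back to flat_numbers

print([f.task_run_id for f in upstream])
print(resolved)
```

Passing `upstream` along to each mapped child would mirror the manual `wait_for=[x]` workaround. Whether the real engine should do this in `.map` or at the resolution step is a design question for the maintainers.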