PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

CLI registration inconsistently bumps flow version even when metadata hasn't changed #4618

Closed: marvin-robot closed this issue 3 years ago

marvin-robot commented 3 years ago

Opened from the Prefect Public Slack Community

dkmarsh: Hi Everyone, I'm having an issue with registering flows. Whenever I register the same flow, it bumps the version, even if the metadata is unchanged. I've tried using the CLI as well as flow.register(). I've also tried using flow.register('project_1', idempotency_key=flow.serialized_hash()) and it still bumps the version. I am using a local agent, local storage and the cloud backend. Is there another configuration that needs to be set so this doesn't happen? Or should this all be done in docker?

kevin701: Hey <@U022J2DB0UR>, I don’t think moving to Docker will fix this. I'll ask the team to be sure, but what is your use case for needing the version pinned? We might be able to solve this some other way.

ranugoldan: I once tried the CLI command prefect register flow and it didn't bump the version, but with prefect register it does.

kevin701: Actually, that’s right <@ULCV623AT>! I know some people on the other side have been asking why their flows are not bumping versions :sweat_smile:

amanda.wee: <@U01QEJ9PP53> when should the flow version be bumped? In my ECS-based setup with a bunch of flows packaged together with a local agent, I build the Docker image each time we make a change, even if it is only to tweak a single task. When the Docker container starts up, it runs a shell script that runs the Python scripts that define and register the flows (i.e., not using the CLI). Like what <@ULCV623AT> observed, with serialized_hash() the flow versions kept increasing even if it was just because the ECS task was restarted, thereby starting up the Docker container again.

My suspicion was that some Docker container metadata was changing each time, e.g., something host-related, so the serialized hash computation changed. My solution was to write my own hash function that took into account the dependencies (including Prefect version) and the code content, but I wonder if that is overkill.

kevin701: Hey <@U01DJDK6AQ7>, correct me if I’m wrong, but I think what you’re referring to is DockerStorage not respecting the cache and rebuilding everything. Have you seen this recent PR that fixed it? https://github.com/PrefectHQ/prefect/pull/4584

amanda.wee: No, I'm using S3 storage. The Docker image is for ECS, not for Prefect.

kevin701: Oh I see what you mean. Ok I’ll ask the team and get back to you

dkmarsh: <@U01QEJ9PP53> It was my understanding that the version should only be bumped when the metadata changes. My use case is to automate the registering of flows once they are added or changed by a developer. I wrote a function that looks in a directory and collects all flows and registers them. I would like to not have the versions bumped on flows that have already been registered and have had no changes.

Interestingly enough, I tried testing again this morning and discovered that when using the CLI or the Python API to register, the version gets bumped every other time. For example, if I run prefect register --project Project1 -p src/register.py -n Flow1, it registers the flow; if I run the same command a few seconds later, it skips with the message "Skipped (metadata unchanged)". However, running it a third time registers the flow as version 2. So it seems to skip the registration as desired only every other time.

znicholasbrown: Hi <@U022J2DB0UR> - this sounds like an issue with CLI registration; I'm going to open an issue from this thread for the Core team to look into.

znicholasbrown: <@ULVA73B9P> open "CLI registration inconsistently bumps flow version even when metadata hasn't changed"

Original thread can be found here.

iRod3s commented 3 years ago

I am experiencing the exact same problem.

zanieb commented 3 years ago

Hi! We're going to need a minimal reproducible example here. A very simple registration pattern does not encounter this behavior:

~  prefect-dev-38
❯ prefect register -m prefect.hello_world --project example
Collecting flows...
Processing 'prefect.hello_world':
  Building `Module` storage...
  Registering 'hello-world'... Done
  └── ID: c81cf033-4458-4aac-a4ee-f975041da199
  └── Version: 11
======================== 1 registered ========================

~  prefect-dev-38
❯ prefect register -m prefect.hello_world --project example
Collecting flows...
Processing 'prefect.hello_world':
  Building `Module` storage...
  Registering 'hello-world'... Skipped (metadata unchanged)
================== 0 registered, 1 skipped ==================

~  prefect-dev-38
❯ prefect register -m prefect.hello_world --project example
Collecting flows...
Processing 'prefect.hello_world':
  Building `Module` storage...
  Registering 'hello-world'... Skipped (metadata unchanged)
================== 0 registered, 1 skipped ==================

~  prefect-dev-38
❯ prefect register -m prefect.hello_world --project example
Collecting flows...
Processing 'prefect.hello_world':
  Building `Module` storage...
  Registering 'hello-world'... Skipped (metadata unchanged)
================== 0 registered, 1 skipped ==================

~  prefect-dev-38
❯

lbhtran commented 3 years ago

Hi, I'm having this exact problem too. I'm using prefect register -m prefect.hello_world --project example and it always bumps the version of the flow.

zanieb commented 3 years ago

Hi @lbhtran -- can you share the output? Running the examples I provided before still results in skipping for me.

lbhtran commented 3 years ago

This is what I'm running:

root@344629af97b2:/allica_prefect# prefect register --module flows.demo --project allica_data
Collecting flows...
Processing 'flows.demo':
  Building `Azure` storage...
[2021-11-01 19:35:13+0000] INFO - prefect.Azure | Uploading demo-flow-as-module/2021-11-01t19-35-13-279866-00-00 to prefect
[2021-11-01 19:35:13+0000] INFO - prefect.Azure | Uploading demo-flow-as-module2/2021-11-01t19-35-13-279970-00-00 to prefect
  Registering 'demo-flow-as-module'... Done
  └── ID: 481e4f82-e805-408e-a2c0-76c6432ece30
  └── Version: 6
  Registering 'demo-flow-as-module2'... Done
  └── ID: c301fc75-b9ea-49ad-8ea8-d4ee16ca91a4
  └── Version: 4
======================== 2 registered ========================
root@344629af97b2:/allica_prefect# prefect register --module flows.demo --project allica_data
Collecting flows...
Processing 'flows.demo':
  Building `Azure` storage...
[2021-11-01 19:35:17+0000] INFO - prefect.Azure | Uploading demo-flow-as-module/2021-11-01t19-35-17-497573-00-00 to prefect
[2021-11-01 19:35:17+0000] INFO - prefect.Azure | Uploading demo-flow-as-module2/2021-11-01t19-35-17-497672-00-00 to prefect
  Registering 'demo-flow-as-module'... Done
  └── ID: f369d018-2782-4743-ab07-9bf0473ff35d
  └── Version: 7
  Registering 'demo-flow-as-module2'... Done
  └── ID: 9cd6510f-6402-4e70-a091-bf80ff2aa189
  └── Version: 5
======================== 2 registered ========================
root@344629af97b2:/allica_prefect# prefect register --module flows.demo --project allica_data
Collecting flows...
Processing 'flows.demo':
  Building `Azure` storage...
[2021-11-01 19:35:21+0000] INFO - prefect.Azure | Uploading demo-flow-as-module/2021-11-01t19-35-21-328814-00-00 to prefect
[2021-11-01 19:35:21+0000] INFO - prefect.Azure | Uploading demo-flow-as-module2/2021-11-01t19-35-21-328960-00-00 to prefect
  Registering 'demo-flow-as-module'... Done
  └── ID: 4a7c2f22-3b06-4f87-b5bd-db4b6e72cfb6
  └── Version: 8
  Registering 'demo-flow-as-module2'... Done
  └── ID: 9cb1d2ce-f0dc-47c9-927a-68c0d592583f
  └── Version: 6
======================== 2 registered ========================

There are no changes in either flow, but their versions keep getting bumped.

iRod3s commented 3 years ago

Hello,

In case it would be useful, I was using Azure Storage as well.

I'm sorry for not responding, but for some reason I wasn't getting the notifications.

zanieb commented 3 years ago

Hi

In both of these cases, your flow's storage is being built with the current timestamp as the default file name, so the location of the flow changes on every run. Registration is a two-step process: first the flow's storage is built, then the flow's metadata is registered.

Building storage always happens when you register the flow. If the storage location does not change, the flow will not be re-registered. Here, the storage location changes, so the flow metadata changes, and a new version is registered.
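A minimal, stdlib-only sketch of that mechanism follows. It assumes, purely for illustration, that the metadata hash covers a serialized dict that includes the blob name; serialize_flow and metadata_hash are hypothetical helpers rather than Prefect's actual serializer, and timestamped_blob_name only mimics the names visible in the upload logs above.

```python
import hashlib
import json
from datetime import datetime, timezone


def timestamped_blob_name(flow_name: str, now: datetime) -> str:
    # Mimics names like "demo-flow-as-module/2021-11-01t19-35-13-279866-00-00"
    stamp = now.isoformat().lower()
    for ch in ":.+":
        stamp = stamp.replace(ch, "-")
    return f"{flow_name}/{stamp}"


def serialize_flow(flow_name: str, blob_name: str) -> dict:
    # Hypothetical stand-in for the serialized flow: the storage location is part of it
    return {
        "name": flow_name,
        "storage": {"type": "Azure", "container": "prefect", "blob_name": blob_name},
    }


def metadata_hash(serialized: dict) -> str:
    # sort_keys makes the hash independent of dict insertion order
    payload = json.dumps(serialized, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


name = "demo-flow-as-module"
run_1 = datetime(2021, 11, 1, 19, 35, 13, 279866, tzinfo=timezone.utc)
run_2 = datetime(2021, 11, 1, 19, 35, 17, 497573, tzinfo=timezone.utc)

# Default-style names differ per run, so the hash differs and a new version is registered
h1 = metadata_hash(serialize_flow(name, timestamped_blob_name(name, run_1)))
h2 = metadata_hash(serialize_flow(name, timestamped_blob_name(name, run_2)))

# A fixed blob name keeps the serialized metadata, and therefore the hash, stable
f1 = metadata_hash(serialize_flow(name, name))
f2 = metadata_hash(serialize_flow(name, name))

print(h1 == h2, f1 == f2)  # False True
```

The same comparison is why a fixed blob name can make repeated registrations idempotent, provided nothing else in the metadata changes.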

iRod3s commented 3 years ago

Thank you very much for the clarification.

Is there any way to prevent this from happening? That is, can I keep using Azure storage without the version counter going up because of the default name?

iRod3s commented 3 years ago

Checking the docs (https://docs.prefect.io/api/latest/storage.html#azure) again, I saw the 'blob_name' parameter. Would setting this parameter to, for example, the flow name fix the issue?

(I can't check this right now; I'll try it in 2 days when I get back home and, if it hasn't been answered by then, I'll post an update.)

zanieb commented 3 years ago

Setting the name of the blob should indeed solve this. You can see where the dynamic default is generated at https://github.com/PrefectHQ/prefect/blob/master/src/prefect/storage/azure.py#L113-L116

lbhtran commented 3 years ago

Hi, I'm trying this for the 2nd flow:

with Flow('demo-flow-as-module2', run_config=kubernetes_run, storage=Azure(container="prefect", blob_name='demo-flow-as-module-2')) as flow:

This is what I'm getting:

root@60e7060bfadb:/allica_prefect# prefect register --project allica_data --module flows.demo
Collecting flows...
Processing 'flows.demo':
  Building `Azure` storage...
[2021-11-02 09:43:42+0000] INFO - prefect.Azure | Uploading demo-flow-as-module/2021-11-02t09-43-42-605783-00-00 to prefect
  Registering 'demo-flow-as-module'... Done
  └── ID: adf8f9d8-4f0d-4fe0-9dad-dbf3806712b7
  └── Version: 9
  Building `Azure` storage...
[2021-11-02 09:43:43+0000] INFO - prefect.Azure | Uploading demo-flow-as-module-2 to prefect
  Registering 'demo-flow-as-module2'... Done
  └── ID: 14291fcb-05e2-4ce0-a810-e0902a8cf3de
  └── Version: 7
======================== 2 registered ========================
root@60e7060bfadb:/allica_prefect# prefect register --project allica_data --module flows.demo
Collecting flows...
Processing 'flows.demo':
  Building `Azure` storage...
[2021-11-02 09:43:47+0000] INFO - prefect.Azure | Uploading demo-flow-as-module/2021-11-02t09-43-46-863810-00-00 to prefect
  Registering 'demo-flow-as-module'... Done
  └── ID: c62be062-6be5-4e36-b789-4e5215584b98
  └── Version: 10
  Building `Azure` storage...
[2021-11-02 09:43:47+0000] INFO - prefect.Azure | Uploading demo-flow-as-module-2 to prefect
    Error building storage:
      Traceback (most recent call last):
        File "/usr/local/lib/python3.8/site-packages/prefect/cli/build_register.py", line 463, in build_and_register
    storage.build()
        File "/usr/local/lib/python3.8/site-packages/prefect/storage/azure.py", line 151, in build
    client.upload_blob(data)
        File "/usr/local/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 83, in wrapper_use_tracer
    return func(*args, **kwargs)
        File "/usr/local/lib/python3.8/site-packages/azure/storage/blob/_blob_client.py", line 713, in upload_blob
    return upload_block_blob(**options)
        File "/usr/local/lib/python3.8/site-packages/azure/storage/blob/_upload_helpers.py", line 168, in upload_block_blob
    process_storage_error(error)
        File "/usr/local/lib/python3.8/site-packages/azure/storage/blob/_shared/response_handlers.py", line 177, in process_storage_error
    exec("raise error from None")   # pylint: disable=exec-used # nosec
        File "<string>", line 1, in <module>
        File "/usr/local/lib/python3.8/site-packages/azure/storage/blob/_upload_helpers.py", line 99, in upload_block_blob
    return client.upload(
        File "/usr/local/lib/python3.8/site-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 245, in upload
    map_error(status_code=response.status_code, response=response, error_map=error_map)
        File "/usr/local/lib/python3.8/site-packages/azure/core/exceptions.py", line 105, in map_error
    raise error
      azure.core.exceptions.ResourceExistsError: The specified blob already exists.
RequestId:d13daa43-501e-0060-03ce-cfcc0c000000
Time:2021-11-02T09:43:47.3159055Z
ErrorCode:BlobAlreadyExists

  Registering 'demo-flow-as-module2'... Error
================== 1 registered, 1 errored ==================
root@60e7060bfadb:/allica_prefect# 
lbhtran commented 3 years ago

Also, is there a way to set the blob name dynamically? I have a utility function where I set azure_store = Azure(container="prefect") and reuse it across my project. It's not convenient to set the blob name for each flow.

zanieb commented 3 years ago

Here's a PR that enables overwriting existing blobs, so you can use a consistent name: https://github.com/PrefectHQ/prefect/pull/5103
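The BlobAlreadyExists error in the second registration above comes from Azure refusing to replace an existing blob, which is the gap this PR addresses. Below is a toy, in-memory model of that rule; FakeContainer and BlobAlreadyExists are hypothetical stand-ins, not the Azure SDK. For reference, in the real azure-storage-blob SDK, BlobClient.upload_blob accepts an overwrite keyword that defaults to False and raises ResourceExistsError when the blob already exists.

```python
from typing import Dict


class BlobAlreadyExists(Exception):
    """Hypothetical stand-in for azure.core.exceptions.ResourceExistsError."""


class FakeContainer:
    # Hypothetical in-memory container that mimics the overwrite rule
    def __init__(self) -> None:
        self.blobs: Dict[str, bytes] = {}

    def upload_blob(self, name: str, data: bytes, overwrite: bool = False) -> None:
        if name in self.blobs and not overwrite:
            raise BlobAlreadyExists(f"The specified blob already exists: {name}")
        self.blobs[name] = data


container = FakeContainer()
container.upload_blob("demo-flow-as-module-2", b"v1")  # first registration succeeds

try:
    # Second run with the same fixed name and no overwrite: fails like the log above
    container.upload_blob("demo-flow-as-module-2", b"v1")
except BlobAlreadyExists as exc:
    print("error:", exc)

# With overwrite requested, the same name is simply replaced
container.upload_blob("demo-flow-as-module-2", b"v1", overwrite=True)
print(len(container.blobs))  # 1
```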

also, is there a way to set the blob name dynamically? I'm having a utility function where I set azure_store = Azure(container="prefect") and reuse across my project. It's not convenient to set blob name for each flow

You can import the flows and set the storage dynamically, e.g.:

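from prefect.storage import Azure
from flows.demo import my_flow, my_other_flow

# Give each flow its own Azure storage with a stable blob name derived from the flow name
for flow in [my_flow, my_other_flow]:
    flow.storage = Azure(container="prefect", blob_name=flow.name)
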
iRod3s commented 3 years ago

@madkinsz I have been following your fix and I want to thank you for your work. 😊

zanieb commented 3 years ago

@iRod3s thank you & you're welcome :)

lbhtran commented 2 years ago

Hi, thank you for the fix. I've just done some tests on this, and I'm afraid there are a few inconsistencies.

  1. It works when I try this:

    with Flow(FLOW_NAME,
          storage=Azure(blob_name=FLOW_NAME, container='prefect', overwrite=True),
          run_config=kubernetes_run) as flow:

    But it only works the first time; if I rerun the registration command, it bumps the version up again.

    (venv) alex-MacBook-Pro Prefect % docker run --rm -it --entrypoint /entrypoint-dev.sh --env-file .env localhost:5000/test:app              
    Collecting flows...
    Processing 'projects.demo.flows':
    Building `Azure` storage...
    [2021-11-22 13:16:59+0000] INFO - prefect.Azure | Uploading demo-etl-actions-flow-as-module to prefect
    Registering 'demo-etl-actions-flow-as-module'... Skipped (metadata unchanged)
    ================== 0 registered, 1 skipped ==================
    (venv) alex-MacBook-Pro Prefect % docker run --rm -it --entrypoint /entrypoint-dev.sh --env-file .env localhost:5000/test:app                                      
    Collecting flows...
    Processing 'projects.demo.flows':
    Building `Azure` storage...
    [2021-11-22 13:17:14+0000] INFO - prefect.Azure | Uploading demo-etl-actions-flow-as-module to prefect
    Registering 'demo-etl-actions-flow-as-module'... Done
    └── ID: b29c4615-3be4-4f8f-ba1b-b1754a681e14
    └── Version: 10
    ======================== 1 registered ========================
  2. If I use what you suggested above to set the blob name dynamically, like this:

    from flows.demo import my_flow, my_other_flow

    for flow in [my_flow, my_other_flow]:
        flow.storage = Azure(container="prefect", blob_name=flow.name)

    I get the error:

    Processing 'projects.demo.flows':
    Traceback (most recent call last):
      File "/usr/local/bin/prefect", line 8, in <module>
        sys.exit(cli())
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1053, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1659, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1637, in invoke
        super().invoke(ctx)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 754, in invoke
        return __callback(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/click/decorators.py", line 26, in new_func
        return f(get_current_context(), *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/build_register.py", line 65, in inner
        return func(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/build_register.py", line 827, in register
        register_internal(
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/build_register.py", line 551, in register_internal
        stats += build_and_register(
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/build_register.py", line 447, in build_and_register
        prepare_flows(flows, labels)
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/build_register.py", line 320, in prepare_flows
        flow.storage.add_flow(flow)
      File "/usr/local/lib/python3.8/site-packages/prefect/storage/azure.py", line 111, in add_flow
        raise ValueError(
    ValueError: Name conflict: Flow with the name "demo-etl-actions-flow-as-module" is already present in this storage.


So far, the only way I can get this to work is to set the blob name when creating the flow context, as in (1). However, with that method the version is also bumped inconsistently. I checked the blob storage: there is now only one blob for this flow, instead of a new one being saved every time the flow is registered, as happened previously.
zanieb commented 2 years ago

@lbhtran it looks like you've left overwrite=True out of the dynamic example in (2).

I'm not sure why the version would change in (1), if you can compare the serialized flows then we can determine what keys are changing resulting in a different hash. Also note that the hash of the serialized flow is not meant to be a perfect versioning system. If versioning is important to you, I'd strongly recommend using flow.register() and setting the idempotency_key to a string value that actually represents your flow versioning.

lbhtran commented 2 years ago

Hi, thank you for the reply. For (2), I just copied the code from the comment above. With that method, I also get random version bumps when no changes were made to the flow. The thing is, we have a CI/CD pipeline that deploys the flows automatically, and I don’t want the flow’s version to get bumped randomly every time it runs. There used to be a flag in the old Prefect CLI that handled this very well. I'll check the serialized flows again tomorrow.

zanieb commented 2 years ago

The old CLI has --skip-if-flow-metadata-unchanged, which is simply the default behavior in the new CLI. The difference is that the old CLI calculates the idempotency key before building storage, while the new CLI calculates it after. The old behavior is actually wrong, because your flow could be pushed to a new location without being re-registered to point to it.

You can use this branch to display the serialized flow being used to calculate the idempotency key and note the difference when it does not skip as intended https://github.com/prefectHQ/prefect/compare/debug-flow-json

lbhtran commented 2 years ago

Hey, I'm not entirely sure how to use the idempotency_key. If you can give me a bit more direction, it would be great.

I've also noticed that even though the flow version was increased, no new flow was saved to the Azure blob storage. The overwrite option seems to work as intended; it's just that the version number still gets increased.

zanieb commented 2 years ago

The idempotency key is what determines whether registering a new version is skipped. If the key matches between repeated registration calls, a new version is not registered. You could set this key to a hash of the file containing your flow, for example, so that a new version is registered only when the contents of the file change. You could set it to a constant string that you bump manually, e.g. FLOW_VERSION = "1" followed by flow.register(idempotency_key=FLOW_VERSION). You could set it to an environment variable, etc.
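Following the first suggestion above, here is a minimal sketch that derives the key from the flow file's contents. file_idempotency_key is a hypothetical helper, and the optional extra strings (for example a pinned Prefect version, as amanda.wee described in the Slack thread) are an assumption you can drop; the "prefect==0.15.9" pin below is made up for the demo.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Iterable


def file_idempotency_key(path: str, extra: Iterable[str] = ()) -> str:
    """Hash a flow file's bytes, plus optional extra strings, into a stable key."""
    digest = hashlib.sha256(Path(path).read_bytes())
    for item in extra:
        # A NUL separator keeps ("ab", "c") and ("a", "bc") from colliding
        digest.update(b"\0" + item.encode("utf-8"))
    return digest.hexdigest()


# Demo on a throwaway file standing in for a flow module
with tempfile.TemporaryDirectory() as tmp:
    flow_file = Path(tmp) / "demo_flow.py"
    flow_file.write_text("# flow definition\n")
    key_1 = file_idempotency_key(str(flow_file))
    key_2 = file_idempotency_key(str(flow_file))  # same bytes -> same key
    # Hypothetical dependency pin folded into the key
    key_3 = file_idempotency_key(str(flow_file), ["prefect==0.15.9"])
    flow_file.write_text("# flow definition, edited\n")
    key_4 = file_idempotency_key(str(flow_file))  # edited file -> new key

print(key_1 == key_2, key_1 != key_3, key_1 != key_4)  # True True True
```

In a flow module this might be wired up as flow.register(project_name="allica_data", idempotency_key=file_idempotency_key(__file__)). One caveat: hashing a single file will not notice changes in modules it imports, so those would need to be folded in through extra or a wider hash.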

I've also noticed that even though the flow version was increased, no new flow was saved to the Azure blob storage. The overwrite option seems to work as intended; it's just that the version number still gets increased.

Some other metadata must be changing. You can use the branch I've provided to determine what.
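Once you have the serialized flow JSON from two registrations (for example, dumped via the debug branch above and loaded with json.load), a small recursive diff shows which keys differ. This is a generic stdlib sketch: diff_paths is hypothetical, and the sample dicts are made up for illustration rather than real Prefect output.

```python
from typing import Any, List


def diff_paths(a: Any, b: Any, path: str = "") -> List[str]:
    """Return dotted paths where two JSON-like values differ."""
    if isinstance(a, dict) and isinstance(b, dict):
        changed: List[str] = []
        for key in sorted(set(a) | set(b)):
            sub = f"{path}.{key}" if path else str(key)
            if key not in a or key not in b:
                changed.append(sub)  # key added or removed
            else:
                changed.extend(diff_paths(a[key], b[key], sub))
        return changed
    if isinstance(a, list) and isinstance(b, list) and len(a) == len(b):
        changed = []
        for i, (x, y) in enumerate(zip(a, b)):
            changed.extend(diff_paths(x, y, f"{path}[{i}]"))
        return changed
    # Scalars, type mismatches, or lists of different lengths
    return [] if a == b else [path or "<root>"]


first = {"name": "demo", "storage": {"blob_name": "demo", "secrets": []}, "tasks": [{"slug": "t1"}]}
second = {"name": "demo", "storage": {"blob_name": "demo", "secrets": []}, "tasks": [{"slug": "t1-copy"}]}

print(diff_paths(first, second))  # ['tasks[0].slug']
```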