treasure-data / digdag

Workload Automation System
https://www.digdag.io/
Apache License 2.0

for_each parameterization capability is limited #25

Closed danielnorberg closed 7 years ago

danielnorberg commented 8 years ago

for_each> parameters can be parameterized like below by utilizing YAML anchors:

run: +parameterized_for_each

_export:
  foos: &FOOS
    - 1
    - 2

+parameterized_for_each:
  for_each>:
    foo: *FOOS
  _do:
    sh>: "echo hello ${foo}"

But it might be useful to allow parameterizing for_each> with actual digdag parameters, e.g.:

run: +parameterized_for_each

_export:
  foos:
    - 1
    - 2

+parameterized_for_each:
  for_each>:
    foo: @foos
  _do:
    sh>: "echo hello ${foo}"

And using parameters explicitly set by e.g. a py> task:

run: +main

+main:
  +export_foos:
    py>: tasks.export_foos
  +parameterized_for_each:
    for_each>:
      foo: ${foos}
    _do:
      sh>: "echo hello ${foo}"

import digdag

def export_foos():
    digdag.env.store({"foos": [1, 2]})

Attempting to parameterize for_each> like this currently fails with the below error:

2016-03-30 10:18:17 +0900: Digdag v0.4.2
2016-03-30 10:18:18 +0900 [WARN] (main): Reusing the last session time 2016-03-29T00:00:00+09:00.
2016-03-30 10:18:18 +0900 [INFO] (main): Using session digdag.status/20160329T000000+0900.
2016-03-30 10:18:18 +0900 [INFO] (main): Starting a new session repository id=1 workflow name=+main session_time=2016-03-29T00:00:00+09:00
2016-03-30 10:18:19 +0900 [INFO] (0020@+main+export_foos): py>: tasks.export_foos
2016-03-30 10:18:19 +0900 [INFO] (0020@+main+parameterized_for_each): for_each>: {foo=[1,2]}
2016-03-30 10:18:19 +0900 [ERROR] (0020@+main+parameterized_for_each): Task failed
io.digdag.client.config.ConfigException: Expected array type for key 'foo' but got "[1,2]" (string)
    at io.digdag.client.config.Config.propagateConvertException(Config.java:400)
    at io.digdag.client.config.Config.readObject(Config.java:391)
    at io.digdag.client.config.Config.get(Config.java:235)
    at io.digdag.client.config.Config.getList(Config.java:277)
    at io.digdag.standards.operator.ForEachOperatorFactory$ForEachOperator.runTask(ForEachOperatorFactory.java:67)
    at io.digdag.standards.operator.BaseOperator.run(BaseOperator.java:49)
    at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:238)
    at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:653)
    at io.digdag.core.agent.OperatorManager.runWithArchive(OperatorManager.java:193)
    at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$1(OperatorManager.java:130)
    at io.digdag.core.agent.CurrentDirectoryArchiveManager.withExtractedArchive(CurrentDirectoryArchiveManager.java:20)
    at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:129)
    at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:107)
    at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:635)
    at io.digdag.core.agent.LocalAgent.lambda$run$0(LocalAgent.java:61)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.util.ArrayList out of VALUE_STRING token
 at [Source: N/A; line: -1, column: -1]
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
    at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:854)
    at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:850)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.handleNonArray(CollectionDeserializer.java:292)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:227)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:217)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:25)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3703)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2072)
    at io.digdag.client.config.Config.readObject(Config.java:388)
    ... 18 common frames omitted
error:
  * +main+parameterized_for_each:
    Expected array type for key 'foo' but got "[1,2]" (string)

Task state is saved at digdag.status/20160329T000000+0900 directory.
Run command with --session '2016-03-29 00:00:00' argument to retry failed tasks.

I'm uncertain whether the best approach would be to make ${...} expansion smarter about parameter types, so that it does not coerce everything to a string, or whether it would make more sense to introduce another syntax.
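To make the failure concrete, here is a minimal Python sketch of string-only ${...} expansion. It is not digdag's actual template engine; the expand function and its regular expression are hypothetical stand-ins. The only behavior taken from the log above is that a list parameter ends up as the string "[1,2]".

```python
import json
import re


def expand(template, params):
    """Hypothetical ${name} expansion that always produces a string."""

    def substitute(match):
        value = params[match.group(1)]
        # Lists and maps are rendered as compact JSON, scalars via str().
        if isinstance(value, (list, dict)):
            return json.dumps(value, separators=(",", ":"))
        return str(value)

    return re.sub(r"\$\{(\w+)\}", substitute, template)


params = {"foos": [1, 2]}
expanded = expand("${foos}", params)
print(repr(expanded), type(expanded).__name__)
```

Because the result is a string rather than a list, an operator that asks for an array has nothing to iterate over, which matches the ConfigException in the log.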

danielnorberg commented 8 years ago

This was inspired by seeing the programmatic looping here in the digdag-analytics poc: https://github.com/treasure-data/digdag-analytics/blob/master/se_poc_emails/tasks/poc_workflow.rb#L15

It seems desirable to use digdag to perform these queries and leverage its parallelism, retry, and dependency-tracking facilities instead.

frsyuki commented 8 years ago

  1. Use YAML references (& and *). This works now, as you mentioned.
  2. Introduce a new syntax like key: @foo, key: @{foo}, key: @${foo}, key: $@foo, key=: foo, key: !param foo etc.
  3. Preserve the type of foo if the value is ${foo} with no characters around it. If foo is an integer, prefix_${foo} is a string and ${foo}_suffix is a string, but ${foo} is an integer. I think this is problematic when users want to convert an integer to a string. In YAML syntax you can write "${foo}", but there is no way to distinguish it from ${foo}. So users would need to write ${foo.toString()}, which I think is not intuitive.
  4. Parse the value as a JSON string if an operator reads it as an array or map type. An operator written in Java uses an API such as int foo = conf.get("foo", int.class) or List<String> foo = conf.getList("foo", String.class) to get a config value. These get and getList methods have a chance to convert the value type. Currently, conf.getList("foo", String.class) fails if the actual value of foo is not a list, but we can change the behavior so that it tries to parse the value as JSON if the actual value is a string. This works because ${foo} converts foo to a JSON string if foo is a list or map. But it doesn't work if the operator is written in Ruby or Python (dynamically typed): params.get("foo") is a string even if the operator expects a list.
  5. Similar to 3, but use a string by default and raw(...) to preserve the type. So ${foo} is a string, but ${raw(foo)} is a list. prefix_${raw(foo)} is still a string, though.
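
The difference between ideas 3 and 5 can be sketched in Python as follows. This is only an illustration under assumptions: the function names, the regular expressions, and the compact JSON rendering are hypothetical and do not reflect how digdag implements expansion.

```python
import json
import re

# Hypothetical placeholder grammar: ${name} or ${raw(name)}.
PLACEHOLDER = re.compile(r"\$\{(?:raw\((\w+)\)|(\w+))\}")


def to_text(value):
    """Render a parameter as text: JSON for lists and maps, str() otherwise."""
    if isinstance(value, (list, dict)):
        return json.dumps(value, separators=(",", ":"))
    return str(value)


def expand_as_string(template, params):
    """Substitute every placeholder and always return a string."""
    return PLACEHOLDER.sub(
        lambda m: to_text(params[m.group(1) or m.group(2)]), template
    )


def expand_idea3(template, params):
    """Idea 3: a value that is exactly ${foo} keeps the type of foo."""
    whole = re.fullmatch(r"\$\{(\w+)\}", template)
    if whole:
        return params[whole.group(1)]
    return expand_as_string(template, params)


def expand_idea5(template, params):
    """Idea 5: only a value that is exactly ${raw(foo)} keeps the type."""
    whole = re.fullmatch(r"\$\{raw\((\w+)\)\}", template)
    if whole:
        return params[whole.group(1)]
    return expand_as_string(template, params)


params = {"foo": [1, 2], "n": 7}
print(expand_idea3("${n}", params))          # integer under idea 3
print(expand_idea5("${foo}", params))        # string under idea 5
print(expand_idea5("${raw(foo)}", params))   # list under idea 5
```

Under idea 3 a user who wants the string form of an integer has to ask for it explicitly, while under idea 5 the string form stays the default and the typed form is opt-in.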

In a very old version, the syntax was key=: foo (so, idea 2).

danielnorberg commented 8 years ago

I like the key: @foos syntax, but @ is reserved and cannot be used to start a token in YAML =/

http://yaml.org/spec/current.html#c-directive

On the other hand, fewer ways to parameterize things means fewer surprises and mistakes for our users, so allowing key: ${foos} (idea 4) might be more desirable.
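
A rough Python sketch of idea 4, where the config accessor falls back to JSON parsing when it is asked for a list but finds a string. The get_list helper below is hypothetical and only mimics the role of the Java conf.getList call discussed above; it is not digdag's API.

```python
import json


def get_list(conf, key):
    """Return conf[key] as a list, parsing a JSON string if necessary."""
    value = conf[key]
    if isinstance(value, list):
        return value
    if isinstance(value, str):
        # ${foo} expansion is assumed to have produced JSON text such as "[1,2]".
        try:
            parsed = json.loads(value)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, list):
            return parsed
    raise TypeError(
        f"Expected array type for key {key!r} but got {value!r}"
    )


print(get_list({"foo": "[1,2]"}, "foo"))
print(get_list({"foo": [1, 2]}, "foo"))
```

The conversion happens only because the caller states the type it expects, which is why the same trick does not carry over to operators that read parameters without declaring a type.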

danielnorberg commented 8 years ago

It would also be useful to be able to do things like:

run: +main
+main:
  for_each>:
    foo:
      - bar: a
        baz: b
      - bar: c
        baz: d
  _do:
    sh>: "echo ${foo.bar}: ${foo.baz}"

Where the tasks executed for the above would be:

echo a: b
echo c: d

Currently the above produces:

error:
  * +main:
    io.digdag.core.repository.ModelValidationException: Validating workflow task failed
name can't contain   character "+for-foo={bar=a, baz=b}"
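
The error suggests that the generated subtask name embeds a text rendering of the whole map, including the space after the comma. The Python sketch below reproduces a name of that shape and contrasts it with an index-based name. Both naming schemes are assumptions made for illustration: the first is inferred only from the error message above, and the second is a hypothetical alternative, not something digdag is known to do.

```python
def java_style(value):
    """Render a value roughly like Java's Map.toString(), e.g. {bar=a, baz=b}."""
    if isinstance(value, dict):
        inner = ", ".join(f"{k}={java_style(v)}" for k, v in value.items())
        return "{" + inner + "}"
    return str(value)


def subtask_names(key, values):
    """Return (value-based names, index-based names) for a for_each key."""
    value_based = [f"+for-{key}={java_style(v)}" for v in values]
    # Hypothetical alternative: name subtasks by position instead of by value.
    index_based = [f"+for-{key}-{i}" for i in range(len(values))]
    return value_based, index_based


foos = [{"bar": "a", "baz": "b"}, {"bar": "c", "baz": "d"}]
value_based, index_based = subtask_names("foo", foos)
print(value_based)
print(index_based)
```

Names derived from the position stay free of spaces regardless of what the values contain, at the cost of being less descriptive in logs.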