Users may wish to shard their periodic jobs into multiple Pods. For example every day at 12am, we will need to process a whole bunch of work, and this work to be done may significantly increase in volume over time, and the work cannot be done before 12am (i.e. on the previous day). At where we stand right now, the only option is to support vertical scaling of a Pod, which is obviously impractical beyond a certain point. As such, we want to evaluate how to support horizontal scaling of Job pods (i.e. task-level parallelism).
The Job object in K8s currently supports parallel execution: https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs. However, it is my personal opinion that the K8s JobController API for controlling parallelism of multiple Pods is not very clear and well-designed. We will outline the use cases, and attempt to propose an API design that would support and potentially improve the existing one in K8s.
It is also important to avoid over-designing this feature. A good principle to keep in mind is that Furiko's main focus on automatic timed/periodic tasks, not so much user-invoked or complex task workflows, and we are better off utilizing more complex workflow engines (i.e. Argo Workflows) to support them.
Use Cases
We will outline some use cases (including/highlighting those we have gotten internally):
Support running a fixed number of Pods per Job at the same time: This would basically be equivalent to "scaling up" a Job to more replicas, but each Pod has to be assigned work independently of other Pods. This usually involves an external message queue.
The equivalent use case in K8s is "Parallel Jobs with a work queue".
The advantage is that there is a well-defined upper bound in the amount of resources required to run the Job, but the disadvantage is that any unexpected increase in work to be done could result in longer processing times.
Support running a variable number of Pods per Job at the same time: This is an extension of (1), except that the parallelism factor is variable.
One idea could be to use (1) but allow changing the parallelism at both adhoc invocation time (prior to creation), and during runtime (after it is started) which could be controlled by an autoscaler of sorts. See (3) for more details on the latter.
If we allow the parallelism factor to depend on an external data source (e.g. Kafka topic lag), then it becomes dangerously close to a MapReduce design. I think it may be better to require the parallelism factor to be defined in the workload spec itself.
Horizontal scaling of Pods while a Job is running: While a Job is running, we may want to update the number of Pod replicas if we realize that it may not complete in time without stopping its progress. (just an idea, no real use case for now)
This can be utilized by jobs which read from a queue with a central coordinator, but not so much when the number of shards is fixed. One notable exception is Kafka, where consumers can rebalance when new consumers are added to the consumer group, and scale up to a maximum of the number of partitions.
Implementing this should be straightforward, but we have to be careful about the scale-down case, since it may conflict with completion policies (see below). A simple way to get around this is to prevent scale-down, and only allow scale-up.
Stateless/Stateful parallel worker patterns: When a Job has multiple parallel Pods, it could be possible that some Pods can pick up work from a queue such that other Pods don't need to do so, so it would be sufficient to terminate once any Pod is finished. On the other hand, if every Pod works on its subset or work and nothing else (e.g. using consistent hashing), then we need to wait for ALL Pods to finish before terminating. As such, we need to support both use cases.
I personally don't think there is a need for "fixed completion count" jobs like in K8s, at least I have never encountered a use case which depends on this. Perhaps the close equivalent of "fixed completion count" is to start N Jobs at the same time with a fixed concurrency factor, which is slightly different from the topic we are currently discussing.
Requirements
The parallelism feature must not conflict with the retries feature of Jobs. In other words, the distinction between retries and parallel tasks should be clear. In the batch/v1 JobController, it depends on a Pod's restartPolicy to retry containers, but we explicitly support pod-level retries.
Control the completion and success policy: The user may want to be explicit about what and when constitutes a successful or a completed Job. In the case of a single Pod, using exit code 0 (i.e. the Pod's phase should be Success) is sufficient to indicate a successful Job, but it becomes less clear once we have parallel Pods.
Control early termination policies: When a single Pod fails, it could be possible that we want to immediately terminate early from the Job in order to avoid unnecessary work, or to wait for all Pods to gracefully finish their existing work.
Proposed Implementation 1
We are not going forward with this proposal.
Show old proposal...
We will implement a new CRD `ShardedSet` (**NOTE**: the name is currently TBC). This CRD is most similar to a ReplicaSet, except that it controls running to completion (which ironically actually makes it similar to `batch/v1` Job itself).
The implementation of a ShardedSet will follow closely to the Indexed Job pattern in `batch/v1` (https://kubernetes.io/docs/tasks/job/indexed-parallel-processing-static/), but defines completion policy in a much more explicit manner than is currently supported by the `batch/v1` Job API, and avoids the confusion with having to define `completions`. See https://github.com/kubernetes/kubernetes/issues/28486 for some related discussion about how completions are handled.
### CRD Design
Example of a proposed `ShardedSet` custom resource object, with all possible API fields (including future expansion):
```yaml
apiVersion: execution.furiko.io/v1alpha1
kind: ShardedSet
spec:
# Defines that exactly 5 tasks are run in parallel.
# Each task receives a separate task index, and it is guaranteed that
# no two concurrently running tasks will receive the same task index.
parallelism: 5
# Defines retry policy.
retries:
# Maximum attempts per shard, beyond which we stop creating new tasks for the shard. Defaults to 1.
maxAttemptsPerShard: 3
# Cannot exceed maxAttemptsPerShard * parallelism (also the default).
# If a shard fails but this is exceeded, then it is considered a shard failure.
maxAttemptsTotal: 15
# Defines completion policy.
completion:
# Defines when a ShardedSet is completed. Options:
# - OnAllShardsSuccess (default): Stop once all shards are successful.
# - OnAnyShardSuccess: Stop once any shard is successful.
# - OnAnyShardFailure: Stop once any shard is failed.
condition: OnAllShardsSuccess
# Defines what to do on completion, depending on whether the ShardedSet is successful or failed, the defaults are shown below.
# Note that this has no effect for OnAllShardsSuccess, since by definition all shards would have completed prior to taking this action.
onSuccess: WaitForRemaining
onFailure: TerminateRemaining
# The TaskTemplateSpec itself, we could further compose other task executors too!
template:
pod:
spec:
containers:
- name: container
image: alpine
args: ["echo", "Hello world"]
env:
# The task can determine its shard index using this env var.
- name: SHARD_INDEX
value: "${parallel.shard_index}"
```
Complete breakdown for `completion.condition` success cases:
- OnAllShardsSuccess
- When some shard succeeds, do nothing.
- When all shards succeed, succeed the ShardedSet.
- OnAnyShardSuccess
- When some shard succeeds, immediately succeed the ShardedSet.
- OnAnyShardFailure
- When some shard succeeds, do nothing.
- When all shards succeed, succeed the ShardedSet.
Complete breakdown for `completion.condition` failure cases:
- OnAllShardsSuccess
- If any shard cannot retry further (exceed maxAttempts), immediately fail the ShardedSet.
- OnAnyShardSuccess
- If any shard cannot retry further (exceed maxAttempts), do nothing.
- If all shards failed and cannot retry further (exceed maxAttempts), fail the ShardedSet.
- OnAnyShardFailure
- If any shard cannot retry further (exceed maxAttempts), immediately fail the ShardedSet.
Note that:
- In the success case, `OnAllShardsSuccess` == `OnAnyShardFailure`
- In the failure case: `OnAllShardsSuccess` == `OnAnyShardFailure`
Therefore, we can simplify it to just `AllShardsSucceeded` and `AnyShardSucceeded`. (Help me verify this claim??)
Inside a JobConfig, the user will define it like this:
```yaml
apiVersion: execution.furiko.io/v1alpha1
kind: JobConfig
spec:
template:
spec:
# Retry the ShardedSet up to 3 times
maxAttempts: 3
task:
template:
# This is the start of the ShardedSetTemplateSpec
parallel:
metadata:
labels: ...
spec:
parallelism: 5
template:
pod:
spec:
containers: [ ... ]
```
### Pros and Cons
* Pros:
* Very easy to reason about. The composition of two separate APIs is clear from a developer and a user perspective, and future extensions to the ShardedSet controller avoids conflicting with the core JobController.
* Most users will not need to think about the additional API fields that are introduced for parallelism if they don't need it. In my opinion, this is the biggest issue with the `batch/v1` Job.
* Cons:
* By composing a secondary task executor to achieve task-level parallelism, we may be prematurely confining the design to only support a subset of use cases. For example, by separating the retry and parallel sets into distinct layers we may constrain the possible expansion options in the future.
* Additional implementation cost, but it is saved by reducing the work on ensuring that the existing JobController behavior is not broken if we do Option 2.
* Potentially duplicate logic in both ShardedSet and Job controllers (e.g. task adoption, task executor).
Proposed Implementation 2
Another way is to avoid the use of a CRD, but implement it directly in the JobController and update the Job API.
withCount: Specify absolute number, the index number will be generated from 0 - N-1 in ${task.index_num}
withKeys: Specify a list of string keys, it will be made available in ${task.index_key}
withMatrix: Specify a map of keys to a list of string values, each key will be available in ${task.index_matrix.<matrix_key>}. This is to support common parallel patterns (e.g. CI on multiple platform and version combinations)
Some considerations:
Retries will take place on a parallel index-level. This means that using withCount of 3, each index (0, 1, 2) has a maximum of 3 retries each.
The completionStrategy is similar to Proposal (1).
The main reason we are not using Proposal (1) is because of the complexity introduced when having nested retries, and it is actually clearer to inline the implementation into the same CRD/controller.
Alternatives
There are some alternatives to the above design to achieve the same requirements.
Creating one JobConfig for each desired shard. The obvious downside is that you have duplicate objects and higher cost of maintenance, configuration drift, etc.
Support starting multiple Jobs at each schedule. This is a very simple solution, but there are some drawbacks:
Each Job started at the same time are basically independent of each other, and we cannot determine the status or control the workload as a single atomic unit.
Multiple Jobs started concurrently that spill over their normal run duration may eat into the maxConcurrency of the JobConfig (see #16), resulting in lesser total Jobs being run than expected.
We will be adding this to the Alpha Release milestone, and this will be second task executor to be added to Furiko. I think that we can solidify the task executor interface once this is complete.
Motivation
Users may wish to shard their periodic jobs into multiple Pods. For example every day at 12am, we will need to process a whole bunch of work, and this work to be done may significantly increase in volume over time, and the work cannot be done before 12am (i.e. on the previous day). At where we stand right now, the only option is to support vertical scaling of a Pod, which is obviously impractical beyond a certain point. As such, we want to evaluate how to support horizontal scaling of Job pods (i.e. task-level parallelism).
The Job object in K8s currently supports parallel execution: https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs. However, it is my personal opinion that the K8s JobController API for controlling parallelism of multiple Pods is not very clear and well-designed. We will outline the use cases, and attempt to propose an API design that would support and potentially improve the existing one in K8s.
It is also important to avoid over-designing this feature. A good principle to keep in mind is that Furiko's main focus on automatic timed/periodic tasks, not so much user-invoked or complex task workflows, and we are better off utilizing more complex workflow engines (i.e. Argo Workflows) to support them.
Use Cases
We will outline some use cases (including/highlighting those we have gotten internally):
I personally don't think there is a need for "fixed completion count" jobs like in K8s, at least I have never encountered a use case which depends on this. Perhaps the close equivalent of "fixed completion count" is to start N Jobs at the same time with a fixed concurrency factor, which is slightly different from the topic we are currently discussing.
Requirements
batch/v1
JobController, it depends on a Pod's restartPolicy to retry containers, but we explicitly support pod-level retries.Success
) is sufficient to indicate a successful Job, but it becomes less clear once we have parallel Pods.Proposed Implementation 1
We are not going forward with this proposal.
Show old proposal...
We will implement a new CRD `ShardedSet` (**NOTE**: the name is currently TBC). This CRD is most similar to a ReplicaSet, except that it controls running to completion (which ironically actually makes it similar to `batch/v1` Job itself). The implementation of a ShardedSet will follow closely to the Indexed Job pattern in `batch/v1` (https://kubernetes.io/docs/tasks/job/indexed-parallel-processing-static/), but defines completion policy in a much more explicit manner than is currently supported by the `batch/v1` Job API, and avoids the confusion with having to define `completions`. See https://github.com/kubernetes/kubernetes/issues/28486 for some related discussion about how completions are handled. ### CRD Design Example of a proposed `ShardedSet` custom resource object, with all possible API fields (including future expansion): ```yaml apiVersion: execution.furiko.io/v1alpha1 kind: ShardedSet spec: # Defines that exactly 5 tasks are run in parallel. # Each task receives a separate task index, and it is guaranteed that # no two concurrently running tasks will receive the same task index. parallelism: 5 # Defines retry policy. retries: # Maximum attempts per shard, beyond which we stop creating new tasks for the shard. Defaults to 1. maxAttemptsPerShard: 3 # Cannot exceed maxAttemptsPerShard * parallelism (also the default). # If a shard fails but this is exceeded, then it is considered a shard failure. maxAttemptsTotal: 15 # Defines completion policy. completion: # Defines when a ShardedSet is completed. Options: # - OnAllShardsSuccess (default): Stop once all shards are successful. # - OnAnyShardSuccess: Stop once any shard is successful. # - OnAnyShardFailure: Stop once any shard is failed. condition: OnAllShardsSuccess # Defines what to do on completion, depending on whether the ShardedSet is successful or failed, the defaults are shown below. # Note that this has no effect for OnAllShardsSuccess, since by definition all shards would have completed prior to taking this action. onSuccess: WaitForRemaining onFailure: TerminateRemaining # The TaskTemplateSpec itself, we could further compose other task executors too! template: pod: spec: containers: - name: container image: alpine args: ["echo", "Hello world"] env: # The task can determine its shard index using this env var. - name: SHARD_INDEX value: "${parallel.shard_index}" ``` Complete breakdown for `completion.condition` success cases: - OnAllShardsSuccess - When some shard succeeds, do nothing. - When all shards succeed, succeed the ShardedSet. - OnAnyShardSuccess - When some shard succeeds, immediately succeed the ShardedSet. - OnAnyShardFailure - When some shard succeeds, do nothing. - When all shards succeed, succeed the ShardedSet. Complete breakdown for `completion.condition` failure cases: - OnAllShardsSuccess - If any shard cannot retry further (exceed maxAttempts), immediately fail the ShardedSet. - OnAnyShardSuccess - If any shard cannot retry further (exceed maxAttempts), do nothing. - If all shards failed and cannot retry further (exceed maxAttempts), fail the ShardedSet. - OnAnyShardFailure - If any shard cannot retry further (exceed maxAttempts), immediately fail the ShardedSet. Note that: - In the success case, `OnAllShardsSuccess` == `OnAnyShardFailure` - In the failure case: `OnAllShardsSuccess` == `OnAnyShardFailure` Therefore, we can simplify it to just `AllShardsSucceeded` and `AnyShardSucceeded`. (Help me verify this claim??) Inside a JobConfig, the user will define it like this: ```yaml apiVersion: execution.furiko.io/v1alpha1 kind: JobConfig spec: template: spec: # Retry the ShardedSet up to 3 times maxAttempts: 3 task: template: # This is the start of the ShardedSetTemplateSpec parallel: metadata: labels: ... spec: parallelism: 5 template: pod: spec: containers: [ ... ] ``` ### Pros and Cons * Pros: * Very easy to reason about. The composition of two separate APIs is clear from a developer and a user perspective, and future extensions to the ShardedSet controller avoids conflicting with the core JobController. * Most users will not need to think about the additional API fields that are introduced for parallelism if they don't need it. In my opinion, this is the biggest issue with the `batch/v1` Job. * Cons: * By composing a secondary task executor to achieve task-level parallelism, we may be prematurely confining the design to only support a subset of use cases. For example, by separating the retry and parallel sets into distinct layers we may constrain the possible expansion options in the future. * Additional implementation cost, but it is saved by reducing the work on ensuring that the existing JobController behavior is not broken if we do Option 2. * Potentially duplicate logic in both ShardedSet and Job controllers (e.g. task adoption, task executor).Proposed Implementation 2
Another way is to avoid the use of a CRD, but implement it directly in the JobController and update the Job API.
We will add the following fields to the spec:
Some possible parallelism types:
withCount
: Specify absolute number, the index number will be generated from0 - N-1
in${task.index_num}
withKeys
: Specify a list of string keys, it will be made available in${task.index_key}
withMatrix
: Specify a map of keys to a list of string values, each key will be available in${task.index_matrix.<matrix_key>}
. This is to support common parallel patterns (e.g. CI on multiple platform and version combinations)Some considerations:
withCount
of3
, each index (0
,1
,2
) has a maximum of 3 retries each.completionStrategy
is similar to Proposal (1).The main reason we are not using Proposal (1) is because of the complexity introduced when having nested retries, and it is actually clearer to inline the implementation into the same CRD/controller.
Alternatives
There are some alternatives to the above design to achieve the same requirements.
TODO List
81
83