Open paxbit opened 3 years ago
Hello, @paxbit.
Yes, you might be right about this behaviour. Kopf has some protection measures against quickly arriving events — the "batch window" (`settings.batching.batch_window`, defaults to 0.1 seconds). All events that arrive within this time window are reduced to only the latest event, which means the latest object state. All intermediate events/states are ignored.
One sure problem that I see in your operator is using an `async def` handler for startup with a synchronous `time.sleep()` call inside. This blocks the whole asyncio event loop for 2 seconds — not only this handler, and not only the operator. This goes against the rules of async programming and is documented in https://kopf.readthedocs.io/en/stable/async/.

Such a blocking freeze of the whole process is functionally equivalent to the operator process/pod being terminated/suspended for 2 seconds (or, say, 2 minutes) and then restarted/resumed — only to find out that the pod has changed.

A better and proper way would be to either do `await asyncio.sleep(2)` in the `async def` handler, or to define the startup handler as `def` (not async).
```python
# EITHER:
@kopf.on.startup()
async def create_pod(**_):    # <<< async!
    await asyncio.sleep(2)    # <<< async!

# OR:
@kopf.on.startup()
def create_pod(**_):          # <<< not async!
    time.sleep(2)             # <<< not async!
```
This will sufficiently unblock the event loop both to react to the pod's creation fast enough and to put the last-handled configuration in place fast enough. As a result, the update handler will be called properly in the majority of cases.

Since pykube itself is synchronous, I recommend the sync way: `def` & `time.sleep()`.
However, there will be a short period when this issue can appear — maybe, as long as a fraction of a second on slow connections (operator<->cluster), and near-instant (but non-zero) on fast connections.
Specifically, if a pod is changed by external actors between its actual creation and within the above-mentioned "batch window". In that case, an already changed pod will arrive to Kopf, and will be seen as a creation with an already modified state.
Almost nothing can be done in that case. Kopf implements the state-driven approach of Kubernetes, so the first time it "sees" the object's state is when it is already quick-modified. The initial non-modified state does not exist for Kopf, so there is nothing to compare against.
You can try setting `settings.batching.batch_window` to `0` to disable batching. But I cannot state that it is a good idea or which side effects it will produce. Perhaps some significant load on the operator side (because pods' statuses change a lot and often).
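For reference, such a setting would be applied in a startup handler — a minimal sketch only, with the side effects untested (as said above):

```python
import kopf

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
    # Collapse the batch window: every watch-event then forms its own "batch",
    # at the cost of extra processing load for chatty resources such as pods.
    settings.batching.batch_window = 0
```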
Blindly assuming the goals of your operator, I would rather recommend one of these 3 approaches instead:
Approach 1: React to all kinds of resource changes for that field, including its creation; or, better said, its first appearance.
```python
@kopf.on.create('pods', field='status')
@kopf.on.update('pods', field='status')
# @kopf.on.resume('pods', field='status')  # might be a good idea
async def status_updated(name, status, **_):
    print(f"{name}'s status was updated: {status.get('phase')}")
```
Approach 2: React to the low-level events only. It is still the latest state within the batch window, but you can reduce the codebase for that. A downside: such handlers are fire-and-forget; they are not retried in case of errors.
```python
@kopf.on.event('pods', field='status')
async def status_updated(name, status, **_):
    print(f"{name}'s status was updated: {status.get('phase')}")
```
Approach 3: Use the on-field handlers, which is the equivalent of on-creation + on-update as above, with some subtle nuances.
```python
@kopf.on.field('pods', field='status')
async def status_updated(name, status, old, new, **_):
    print(f"{name}'s status was updated: {status.get('phase')}")
```
In all approaches, on the first appearance of the object, it will be a change of `.status` from `None` to `{...whatever-is-there..., "phase": ...}`. On all further changes, it will be a change from `{...}` to `{...}` (from one dict to another dict).
Hello, @nolar,
> One sure problem that I see in your operator is using an `async def` handler for startup and a synchronous `time.sleep()` call inside. This would block the whole asyncio event loop for 2 seconds, not only this handler and not only the operator. This goes against the rules of async programming and is documented in https://kopf.readthedocs.io/en/stable/async/. Such a blocking freeze of the whole process is functionally equivalent to the operator process/pod being terminated/suspended for 2 seconds (or, say, 2 minutes), and then restarted/resumed — to find out that the pod has changed.
>
> A better and proper way would be to either do `await asyncio.sleep(2)` in the `async def` handler, or define the startup handler as `def` (not async).
Errm, yes, I'm quite aware of that, and I actually thought about mentioning this but thought it'd be too obvious ;) But I guess I understand your maintainer perspective and why pointing out loop blocking comes to mind first. Maybe I could have been clearer about that. The code is not in my operator but in this example, for reproduction purposes, to simulate "a delay". That's why I wrote about the actual situation we observe in the second paragraph of the description.

This was only to make the point that, in the presence of async handler tasks running longer than an HTTP round-trip to the cluster API, Kopf does not guarantee the correctness of its own assumptions about state storage. Those two (the async handler and any mutation through the API) are always racing against each other in the current implementation. "Usually" Kopf wins. But when it does not, it fails in a very unpleasant way: there is no error, the handler is simply ignored.

The situations where this can happen are entirely non-esoteric and do happen in the real world "frequently". Imagine a load-balanced Kube API behind a VRRP domain. If one API endpoint behind the VIP(s) in the group fails, then within the fail-over window, Kopf's storage patch might be delayed at best or lost at worst. Simple temporary network congestion, data-center fail-overs or partitions, and many other network-related things might do the same. Or some third-party software might simply be quicker in triggering a MODIFIED event — maybe because the node where a Kopf-based operator is currently scheduled is CPU-stalled and swapping like crazy for some misconfiguration reason.
> However, there will be a short period when this issue can appear — maybe, as long as a fraction of a second on slow connections (operator<->cluster), and near-instant (but non-zero) on fast connections.
Yes, this is what I see.
> Specifically, if a pod is changed by external actors between its actual creation and within the above-mentioned "batch window". In that case, an already changed pod will arrive to Kopf, and will be seen as a creation with an already modified state.
That's what I meant.
> Almost nothing can be done in that case. Kopf implements the state-driven approach of Kubernetes, so the first time it "sees" the object's state is when it is already quick-modified. The initial non-modified state does not exist for Kopf, so there is nothing to compare against.
But it's still the case that Kopf sees the CREATE event before the MODIFIED event. It's just that all of Kopf's iterations necessary to patch a storage annotation into the resource have not happened yet. This can be detected, I think. The way I read Kopf's code, a MODIFIED event without a storage annotation is undefined in terms of triggering any discrete handling of that fact. Couldn't the batching (debounce) implementation maybe track event sequences until the pending storage patching is done? I mean, as soon as Kopf sees a resource event it could, while debouncing, decide whether it is managed by it. After doing that, it could remember the pending storage patch in-memory and implicitly apply it to every consecutive event for the resource until it sees the patch in the data of an incoming event. I think that would go a long way towards putting the two racers on separate tracks.
> You can try setting `settings.batching.batch_window` to `0` to disable batching. But I cannot state it is a good idea and which side effects it will produce. Perhaps, some significant load on the operator side (because pods' statuses change a lot and often).

So, then, I'd rather not ;)
About the 3 approaches you suggested: `on.field(...)`? Due to the nature of this bug, I cannot rely on `old is None` to detect creation.

I would say we have not yet seen or imagined all possible side effects of this race. For instance, I think it should be possible not just to lose handler invocations but to trigger more handler invocations than desired. If an in-flight handler progress patch is preceded by one or more MODIFIED events in quick succession, Kopf might not see that e.g. the handler was already started, or had already terminated, or had already exhausted all configured retries. Does that sound reasonable, or would that somehow be prevented?
Hi @nolar,
I had a look at the batching and storage implementations. I did not implement anything yet but what do you think about the following ideas?
Make the debouncing optional by allowing `settings.batching.batch_window = None`, and then guard the batching with `if settings.batching.batch_window is not None:` before the `try` here, skipping it entirely otherwise:
https://github.com/nolar/kopf/blob/9fa2d687a0541db93e8edc8d677fc509549f925f/kopf/reactor/queueing.py#L294-L303
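To illustrate the intended semantics of an optional window, here is a self-contained toy sketch (not Kopf's actual `queueing.py` code) of a debounce step where `None` disables batching:

```python
import asyncio

async def drain(queue: asyncio.Queue, window):
    """Take one event; while a window is set, keep replacing it with newer
    events until the stream pauses for `window` seconds.
    window=None disables debouncing entirely."""
    event = await queue.get()
    if window is None:
        return event                       # no debouncing: deliver every event
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=window)
        except asyncio.TimeoutError:
            return event                   # burst settled: only the latest survives

async def demo():
    queue = asyncio.Queue()
    for state in ("Pending", "Running", "Succeeded"):
        queue.put_nowait(state)
    batched = await drain(queue, window=0.05)    # burst collapses to the last state

    queue = asyncio.Queue()
    queue.put_nowait("Pending")
    unbatched = await drain(queue, window=None)  # events delivered one at a time
    return batched, unbatched

batched, unbatched = asyncio.run(demo())
```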
For each of the `ProgressStorage` write-method implementations, implement something like `memory.put_storage_for(body, patch, some_version_counter)`, and for the `fetch` method do sth. like:

```
resource_storage = storage.get_from_body(body)
in_memory_storage = memory.for_resource(body)

if not in_memory_storage and not resource_storage:
    return empty
if in_memory_storage and not resource_storage:
    return in_memory_storage
if not in_memory_storage and resource_storage:
    return resource_storage
if in_memory_storage is_older_or_equals resource_storage:
    purge in_memory_storage
    return resource_storage
if in_memory_storage is_newer_than resource_storage:
    return in_memory_storage
```
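A hypothetical, concrete rendering of that precedence chain in plain Python — the `version` counter field is my own invention for illustration, not Kopf's actual storage schema:

```python
def effective_storage(in_memory, on_resource):
    """Pick which progress record to trust, following the precedence above.
    Records are dicts with an operator-side 'version' counter.
    Returns (record, keep_cache): keep_cache says whether the in-memory
    copy is still needed because its patch has not round-tripped yet."""
    if in_memory is None and on_resource is None:
        return None, False                  # nothing known anywhere
    if on_resource is None:
        return in_memory, True              # patch not round-tripped yet: trust the cache
    if in_memory is None:
        return on_resource, False           # nothing in flight: trust the resource
    if in_memory["version"] <= on_resource["version"]:
        return on_resource, False           # our patch arrived: purge the cache
    return in_memory, True                  # resource data is stale: keep the cache
```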
What do you think?
> Errm, yes I'm quite aware of that and I actually thought about mentioning this but thought it'd be too obvious ;)
Sorry, I didn't have any intention to question your skills. But this was an obvious problem, so indeed, I just reacted to that.
> kopf might not see that e.g. the handler was already started
>
> Does that sound reasonable or would that somehow be prevented?
On the one hand, while individual objects are parallelised (via the multiplexing logic), each individual object is always handled sequentially. It means the "worker" for that individual object is not doing anything because its task is somewhere in the handler currently, so it will not start a new handler until the previous one is finished.
The events are accumulated in the backlog queue for that worker. Once the handler is finished, the call stack for that individual resource goes back to the top, i.e. to the worker, where it depletes the whole backlog queue to get the latest seen state to be processed.
The "batch window" was added to ensure that even changes introduced by that handler are taken into account.
On the other hand, you are right. If the operator<->cluster latency is high and exceeds the batching window, it might be so that we have a sequence of states A B pause C D; the handlers for the state B are triggered after the first pause; while they work, states C D arrive into the queue. If the handlers do patch the resource, they create a new state E. In normal cases, it should end up with the sequence "A B pause C D E pause", and the new E state will be processed as desired.
However, if the latency is high, it might be so that the sequence looks like "A B pause C D latency E pause". As a result, the handlers will be invoked for state D when the resource is already in state E, and Kopf just does not know about that. And so, the double-execution is possible.
As a possible fix, patching the resource by Kopf should remember its resource version (r.v.) and return it back to the worker. The worker must deplete the backlog "at least" till that reported r.v. is seen — before switching to the batching mode. In other words, if it sees the backlog as depleted, but the r.v. is not yet seen, it should wait an extra time.
Since resource versions are documented as "opaque", we cannot even compare them with `<` or `>`. It should really be a binary seen/not-seen flag.
However, as a safety measure from skipping the states (I don't trust the K8s watch-stream either), a secondary time-window should be added: "deplete the backlog until a specific r.v. is seen, but at most for 5.0 seconds, before considering the batch as over; then, switch to regular batching with the 0.1s window". Something like this.
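A rough, self-contained sketch of that "deplete until the r.v. is seen, but at most N seconds" rule — outside of Kopf's actual worker code, with all names hypothetical:

```python
import time
from collections import deque

def deplete_backlog(backlog, awaited_rv, batch_window=0.05, max_wait=0.5):
    """Drain the per-object backlog of watch-events. Declare the batch over
    only once the event carrying our own patch's resourceVersion was seen
    (a binary seen/not-seen check -- the values are opaque), or once the
    max_wait safety deadline has passed."""
    deadline = time.monotonic() + max_wait
    latest, seen = None, False
    while True:
        while backlog:
            latest = backlog.popleft()
            seen = seen or latest["metadata"]["resourceVersion"] == awaited_rv
        if seen or time.monotonic() >= deadline:
            return latest, seen
        time.sleep(batch_window)  # give the straggler event a bit more time
```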
This issue seems complicated and deep, but very interesting 😅 I will try to reproduce the latency issues locally (I remember there were some tools to simulate slow TCP) — and to see how these cases can be optimized.
Sadly, I have to do my regular work in the daytime. So I can get to this problem only on the coming weekend at the earliest, maybe the next one. Luckily, there are no big major roadmap features to add now, so I can focus on stability & optimizations.
Yes, `batch_window=None` seems like a good option for those who want it. The docstring must explain the consequences briefly.
I didn't fully get what is happening in this example, but I prefer not to touch `memory` at all. First of all, I am not happy that I had to introduce per-resource memories in the first place, as it explodes the RAM usage for large clusters — but that was needed for some non-serialisable mini-objects. Putting the whole body or even its state there is too much (UPD: but it does so for daemons/timers — again, there was no other choice).

However, a new class `InMemoryProgressStorage(kopf.ProgressStorage): ...` can be easily implemented and combined with `kopf.AnnotationsProgressStorage` via a `kopf.MultiProgressStorage` (there is an example for "Smart/Transitional" storage or something, which is now used by default). As I understand the intention, that might help, and it does not require changing the framework — it can be extended in the operator initially.
Hi @nolar,
Thanks for the answer and also for taking the time to look into it!
In my opinion, it is an error to generalize event debouncing for all operators using Kopf. So I think the docstring should explain both situations, and I think the default should be to not debounce at all. It should also explain the current situation and state the trade-off being made with the `batch_window`: namely, the debouncing introduces a lossiness into the event stream of a resource which might break some use-cases. It trades verity for performance and implementation simplicity. I think it is wrong to assume everybody is OK, in every situation, with only receiving the last event of a burst.
I make the case that there should be a facility in Kopf allowing discrete decisions about debouncing, maybe per resource type. So one can say things like: I'm OK with only ever seeing the latest version of my ConfigMaps, but I definitely want each and every event of my Pods.
Two years ago, for another client, I wrote an operator from scratch using RxJava. This made me familiar with situations like this and with the option to fully debounce in some situations while requiring FIFO buffering in others. That operator had, among other things, to observe ConfigMaps and Pods. The ConfigMaps were managed by CD pipelines. As finalizers, those pipelines ran a barrage of patch and apply calls for labels, annotations, and data against the ConfigMaps. In this situation it did not hamper correctness to debounce those ConfigMaps in the operator for 10s and only ever use the "latest" one.
For the Pods, however, it was not OK to ignore any event. They needed to be monitored for Ready → NotReady → Ready transitions. A crashing and quickly restarting container in a pod might cause this. When using Kopf, you might never see those transitions.
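A tiny illustration of that lossiness (plain Python, not Kopf code): reducing a burst to its last state erases exactly the Ready → NotReady → Ready flip one wants to monitor.

```python
def transitions(states):
    """Return every state flip in an observed sequence of pod conditions."""
    return [(a, b) for a, b in zip(states, states[1:]) if a != b]

# A container crashes and restarts within one batch window:
burst = ["Ready", "NotReady", "Ready"]

full_view = ["Ready"] + burst           # FIFO: every event is delivered
debounced_view = ["Ready", burst[-1]]   # batching: only the last state survives
```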
It was just some pseudo-code trying to explain that every writing `ProgressStorage` method unconditionally and always stores the `patch` it just updated also in-memory. Then, in the `fetch` method, it waits for that in-memory representation to appear on the resource for which `fetch` is called. The `if`-statement chain was not supposed to be real code, but to illustrate and discuss the order of precedence and the invalidation logic when dealing with the in-memory representation of a patch in conjunction with its representation in the actual resource manifest data.
So if you don't want to use `memory`, OK. This was never meant to suggest using precisely that class. Also, just for this functionality, I don't think the memory footprint would be too large. The patch would only be kept in-memory for the time the actual patch is "underway" to the API and back to Kopf via the new watch event the patch triggered. If only in-flight patches are stored, the memory footprint should be in the negligible-megabytes region.
Agreed. It would be easy to implement an `InMemoryProgressStorage`. However, I think the current design, with all of the storage implementations, should be hardened against the race condition this bug is about — so that it not only works properly in most cases but by definition.
As a side-note: the latency can be added by using Toxiproxy by Shopify:

```shell
toxiproxy-server
toxiproxy-cli create kopf_k3s -l localhost:11223 -u localhost:50016
toxiproxy-cli toxic add kopf_k3s -t latency -a latency=3000
```

where port `50016` comes from `kubectl config view` with the currently used K3d/K3s, and port `11223` is our random choice.
```
$ toxiproxy-cli inspect kopf_k3s
Name: kopf_k3s  Listen: 127.0.0.1:11223  Upstream: localhost:50016
======================================================================
Upstream toxics:
Proxy has no Upstream toxics enabled.

Downstream toxics:
latency_downstream: type=latency stream=downstream toxicity=1.00 attributes=[ jitter=0 latency=3000 ]
```
When injected into kubeconfig by:

```shell
kubectl config set clusters.k3d-k3s-default.server https://0.0.0.0:11223
```

then the communication with `kubectl` is slowed down:
```
$ time kubectl get pod
………
real    0m6.125s
user    0m0.111s
sys     0m0.051s
```
(At least, 2 requests are made: one for API resources available; one for the actual pod list; total 2*3s=6s; sometimes 18s for reasons unknown.)
Only the "downstream" (client->server) is slowed down, not the "upstream" (server->client).
To do this to Kopf or the operator only, without affecting kubectl:
```python
import logging
import kopf
import dataclasses

@kopf.on.login()
def delayed_k3s(**_):
    conn = kopf.login_via_pykube(logger=logging.getLogger('xxx'))
    if conn:
        return dataclasses.replace(conn, server=conn.server.rsplit(':', 1)[0] + ':11223')

# from examples/01-minimal:
@kopf.on.create('kopfexamples')
def create_fn(spec, **kwargs):
    print(f"And here we are! Creating: {spec}")
    return {'message': 'hello world'}  # will be the new status
```
Then we get a 3-second delay on every "client->server" call (~0s on "server->client" streaming!):
```
[2021-04-10 11:26:58,853] kopf.reactor.activit [INFO    ] Initial authentication has been initiated.
[2021-04-10 11:26:58,854] kopf.activities.auth [DEBUG   ] Activity 'delayed_k3s' is invoked.
[2021-04-10 11:26:58,882] xxx                  [DEBUG   ] Pykube is configured via kubeconfig file.
[2021-04-10 11:26:58,886] kopf.activities.auth [INFO    ] Activity 'delayed_k3s' succeeded.
[2021-04-10 11:26:58,886] kopf.reactor.activit [INFO    ] Initial authentication has finished.
# here, scanning the API groups/versions/resources available (in parallel, but 2-3 levels of API hierarchy).
[2021-04-10 11:27:11,129] kopf.clients.watchin [DEBUG   ] Starting the watch-stream for customresourcedefinitions.v1.apiextensions.k8s.io cluster-wide.
[2021-04-10 11:27:11,131] kopf.clients.watchin [DEBUG   ] Starting the watch-stream for kopfexamples.v1.kopf.dev cluster-wide.
# here, starting the watch-streams for all involved resources/namespaces.
[2021-04-10 11:27:14,253] kopf.objects [DEBUG   ] [kube-system/kopf-example-1] Creation is in progress: {'apiVersion': 'kopf.dev/v1', 'kind': 'KopfExample', ………
[2021-04-10 11:27:14,254] kopf.objects [DEBUG   ] [kube-system/kopf-example-1] Handler 'create_fn' is invoked.
[2021-04-10 11:27:14,256] kopf.objects [INFO    ] [kube-system/kopf-example-1] Handler 'create_fn' succeeded.
[2021-04-10 11:27:14,258] kopf.objects [INFO    ] [kube-system/kopf-example-1] Creation is processed: 1 succeeded; 0 failed.
[2021-04-10 11:27:14,259] kopf.objects [DEBUG   ] [kube-system/kopf-example-1] Patching with: {'status': {'create_fn': {'message': 'hello world'}}, 'metadata': {'annotations': {'kopf.zalando.org/last-handled-configuration': '{"spec":{"duration":"1m","field":"value","items":["item1","item2"]},"metadata":{"labels":{"somelabel":"somevalue"},"annotations":{"someannotation":"somevalue"}}}\n'}}}
And here we are! Creating: {'duration': '1m', 'field': 'value', 'items': ['item1', 'item2']}
# here, patching: the downstream call takes 3s; the upstream watch-events take ~0s to arrive back.
[2021-04-10 11:27:17,389] kopf.objects [DEBUG   ] [kube-system/kopf-example-1] Something has changed, but we are not interested (the essence is the same).
[2021-04-10 11:27:17,389] kopf.objects [DEBUG   ] [kube-system/kopf-example-1] Handling cycle is finished, waiting for new changes since now.
```
In response to the comments:
To no.1.
I agree in principle. Though, I have some doubts about the implementation. Per-resource configuration looks questionable to me. The batching window is a low-level hack mostly for latency issues, not for operating logic.
Where I see the dividing line is event-watching vs. change-detecting handlers (let's keep daemons & timers & indices aside for now).
The change-detecting handlers (on-create/update/delete/resume) are state-driven — the same as Kubernetes is often described to be. If you have a state change of Ready→NotReady→Ready, and the handler fails on Ready→NotReady, it will be retried, but the state will be (2nd) Ready again. More than that, Kopf would have to remember the whole chain of changes and process/retry the old changes while the resource can be in a much newer state for a long time. So, by design, these handlers should not see all events, only the differences between the last-processed & current states. Worth mentioning that change-detecting handlers are not a Kubernetes feature; they are Kopf's own add-on, so there is no baseline to align to.
However, the event-watching handlers (on-event) give no promises on which events will be seen or not seen. In fact, there might be an implicit promise and an expectation that ALL events are seen as sent by Kubernetes. And that implicit promise is obviously broken due to batching. That hasn't caused any problems so far, but I would not claim that this is how it should be forever. It could be changed. For those rare cases when every interim event counts, the operator developers can implement their own queues, put events into those queues in the on-event handlers, and process them elsewhere (in daemons?) — with the guarantee that the events are delivered precisely as K8s sends them (except for downtimes).
A possible solution might be a redesign of the `worker()` -> `processor()` interaction so that each and every watch-event is processed via on-event handlers (no batching!), while only the latest state of a batch is processed via on-create/update/delete/resume. This is still backwards-compatible with the current promises & documentation about all of the mentioned handler types.
Would such a logic solve the limitations of Kopf for the use-cases you mentioned?
To no.2:
A similar approach was once offered for some other unrelated problem. There are reasons why patches should not be handled any other way than by applying them to the K8s API.
Reason 1: No matter how much Kopf adheres to RFCs, there might be differences in how this patching is implemented in Kopf and how it is implemented in Kubernetes. I would not trust myself here in doing things correctly. Instead, I prefer to send it to the server-side always and get back the patched body of a resource.
Reason 2: Even if we were able to implement proper patching in-memory the same way as Kubernetes does, or even a comparison of multiple/combined patches to the resulting body, there are server-side changes outside of Kopf's control: e.g., mutating admission webhooks. Even in one single `PATCH` operation, the resulting resource can be different from what was in the payload of the same request — because some external admission webhooks decided so.
The latter is basically a showstopper: patches cannot be interpreted client-side in any way. The whole patching logic must be done server-side.
And so, to track the resource changes, we can only go for the "opaque" `.metadata.resourceVersion` — as I suggested above.
Notes for myself:
It seems that ToxiProxy cannot help here. It slows down all requests, including patching. If patching is slowed down, the operator blocks for the whole duration of the request being sent & the response received. As a result, all changes arriving in Kubernetes during the patch end up in a single batch on the operator side:
Timeline with upstream latency 0s, downstream latency 3s:

```
/-- kubectl create, object is created (a=s0)
| ... sleep 1s
| /-- kubectl patch, object is patched (b=s0+p1)
| | /-- c=s0+p1+p2
↓ ↓ |
----+-//-aaaaabbbbbbbccccccccccccccccccc-> which state is stored in kubernetes
↓ ↓ ↑↓
| | |\----3s----\
| | | |
| \----3s+---\ |
| | | |
\----3s----\| \------\|
↓↑⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶↓ (blocked by patching, ends with batching)
----+-//------------aaaaabbbbbbbbcccccc-> which state does the operator see
↓ ↓↑ X ↓
| || X |
| || X \-- operator gets a watch-event (batch: patched1+patched2)
| || X-- operator WOULD get a watch-event (patched1), but it does not
| |\-- operator reacts, starts patching (p2)
| \-- operator gets a watch-event (state a)
\-- watching started
```
It is impossible to introduce latency only for the watching streams via ToxiProxy.
However, it is possible with strategic placement of sleeps:
```python
# kopf.clients.watching::watch_objs()
...
async with response:
    async for line in _iter_jsonlines(response.content):
        await asyncio.sleep(3.0)  # <<<<<<<<<<<<<<<<<<<<<<<<<<<
        raw_input = cast(bodies.RawInput, json.loads(line.decode("utf-8")))
        yield raw_input
...
```
With that single-line hack, the double-execution is reproducible with a regular operator (examples/01-minimal) with no authentication handlers, and the following CLI script:

```shell
kubectl apply -f examples/obj.yaml && sleep 1 && kubectl patch -f examples/obj.yaml --type merge -p '{"spec": {"field": 2}}'
```
This happens 100% of the time, without freezing the event loop artificially.
The timeline looks like this (with patch-blocking removed):
Timeline with only the watch-stream latency 3s:

```
/-- kubectl create, object is created (a=s0)
| ... sleep 1s
| /-- kubectl patch, object is patched (b=s0+p1)
| | /-- c=s0+p1+p2
↓ ↓ |
----+-//-aaaaabbbbbbbccccccccccccccccccc-> which state is stored in kubernetes
↓ ↓ ↑↓
| | |\----3s----\
| | | |
| \----3s+---\ |
| | | |
\----3s----\| | |
↓↑ ↓ ↓
----+-//------------aaaaabbbbbbbbcccccc-> which state is seen by the operator
↓ ↓↑ ↓ ↓
| || | |
| || | \-- operator gets a watch-event (patched2)
| || |
| || \-- operator gets a watch-event of state "b" (patched1)
| || !BUG!: "c" (p2) is not seen yet, though implied; executes AGAIN!
| ||
| |\-- handlers execute, insta-patches as done (p2), creates state "c"
| \-- operator gets a watch-event (state a)
\-- watching started
```
> A possible solution might be a redesign of the `worker()` -> `processor()` interaction so that each and every watch-event is processed via on-event handlers (no batching!), while only the latest state of a batch is processed via on-create/update/delete/resume. This is still backwards compatible with the current promises & documentation about all of the mentioned handler types.
>
> Would such a logic solve the limitations of Kopf for the use-cases you mentioned?
Yes, I think it would. This is how I was thinking about the topic too: leave the change detection via the old/new diff in place while modifying the stream multiplexing so that it feeds every resource event to the change detection, so that it can make correct decisions about actual changes. Or, put differently, I think it is OK, actually desirable, to drop events in well-known circumstances — detecting that nothing of the essence changed would be one. This of course presumes good knowledge about what is of the essence, and that all assumptions Kopf makes about this ideally overlap 100% with the assumptions of an operator implementor.
The latter being a moving target makes me think of this as related to #715, b/c that talks about what the diff change detection deems "essential". Do you think it would make sense to change the default, at least for pods, to always include `status.*` when detecting changes?
> To no.2: ... Reason 1: No matter how much Kopf adheres to RFCs, ...
>
> Reason 2: Even if we would be able to implement proper patching in-memory the same way as Kubernetes does, ...
>
> The latter is basically a showstopper: patches cannot be interpreted client-side in any way. The whole patching logic must be done server-side.
I completely agree in principle. However, I think this small change to my proposal should make it work: simply never patch in-memory when a resource is still missing a storage structure, but use the in-memory representation in change detection and handler progress calculation. At the end of the day, it cannot matter, by definition, whether Kopf got its progress/diffbase storage from the actual resource data or from a cached in-memory representation. Kopf today already expects the storage it patches onto the resource to come back verbatim. I think of it as a cache. The idea to patch the resource in-memory came from my desire to fully encapsulate the proposal in the `ProgressStorage`/`DiffBaseStorage` classes and make the change fully transparent to all existing parts of Kopf dealing with the resource — basically, make it so that the resource looks like it always had the expected storage structure on it. But maybe that was never necessary. I did not actually check whether access to the storage structures on the resource leaks outside of the `ProgressStorage`/`DiffBaseStorage` classes. If it does not, and the only accessors are those two classes and all their implementations, then everything should be fine as-is, without patching in-memory.
About the resource-version thing: I never thought about using Kube's `resourceVersion` for this. I'm aware it cannot be used for anything else but `!=`. What I meant was "some_version_counter", controlled by Kopf. I think it should work to define an incrementing counter field, controlled by Kopf, which becomes part of the progress storage stanza. This way, Kopf could decide whether the progress storage it sees on the actual resource is older than or equal to the in-memory representation it has for that resource.
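To make the counter idea concrete, a hedged sketch — the class and the `version` field are invented for illustration and are not Kopf's API:

```python
class ProgressCache:
    """Sketch of the proposed operator-side counter: every stored progress
    stanza gets a monotonically increasing version, so a later fetch can tell
    whether the copy on the resource is stale or already up to date."""
    def __init__(self):
        self._counter = 0
        self._pending = {}   # uid -> versioned stanza still in flight

    def store(self, uid, stanza):
        self._counter += 1
        versioned = dict(stanza, version=self._counter)
        self._pending[uid] = versioned
        return versioned     # this is what would be patched onto the resource

    def fetch(self, uid, on_resource):
        cached = self._pending.get(uid)
        if cached is None:
            return on_resource
        if on_resource and on_resource.get("version", 0) >= cached["version"]:
            del self._pending[uid]   # the patch has round-tripped; drop the cache
            return on_resource
        return cached                # resource still shows the stale/absent stanza
```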
What do you think? Does that fix the idea?
FYI, I just noticed a mistake I made. Of course it is not enough to just deal with `ProgressStorage`; `DiffBaseStorage` would need to be cached as well. The mistake was in my thinking about "the patch" as the entirety of all Kopf-managed resource stanzas, and my false memory of `ProgressStorage` actually covering all those stanzas — which it does not.
Sorry if that seemed confusing! I updated my previous post.
Long story short

MODIFIED events on just-CREATED resources might arrive before last-handled-configuration was written. This leads to the MODIFIED event being treated as `Reason.CREATE` b/c its `old` version is still empty.

- Loading the (empty) `old` manifest is tried here: https://github.com/nolar/kopf/blob/1d657e2b1e1669466d68e783cc6d23819cd130ed/kopf/reactor/processing.py#L153
- Falsely setting the cause reason to `CREATE` as a result of the empty `old` manifest is done here: https://github.com/nolar/kopf/blob/d29ac2bcb8481efd400910f36510f43dc1255851/kopf/reactor/causation.py#L197-L200
- The handler is not being called b/c its cause does not match the resource-changing cause: https://github.com/nolar/kopf/blob/8fb507cc77ec55b179b7b115e40cef983f20b67b/kopf/reactor/registries.py#L201
Description
If the handler creating a resource via 3rd-party means like pykube still spends a small amount of time after creating the resource before returning, a quick update-after-create to the resource will queue up MODIFIED events before kopf has had a chance to write its last-handled-config.

The following code snippet reproduces this. We had a situation where a 2-container pod had one container crashing immediately after creation. When this happened quickly enough after the pod was created, the handler designated to deal with crashing containers was never called. Since I'm working from home via a DSL link to the data center where the cluster lives, the varying connection latency over the day through the VPN gateway is sometimes enough to trigger this. But only after today's lucky setting of a breakpoint (introducing a sufficient handler delay) right after the pod creation was I able to reliably reproduce it and find the root cause.
All the handler does, btw, after creating the pod is creating an event about the fact as well as setting kopf's `patch` dict.

I believe this one to be broken at the queuing design level, and I have no good idea how to fix it. After looking at this, I'm not sure the current implementation can be fixed for correctness without substantial rewrites (`memories`, maybe?). The assumptions currently made around last-handled-configuration can never be fully upheld as long as third parties other than kopf (a.k.a. pykube, kubernetes itself) modify resources too, which will of course always be true. However, I'd be very happily proven wrong. Maybe the already-queued MODIFIED events sans kopf storage annotations could be augmented in-memory with the missing data by remembering the CREATED event long enough. IDK.

The following script:

- `time.sleep`s for 2s after creating the pod.
- `on_update(...)` is never called, and "wonky's status was updated: ..." is missing from the output.

To make it work: comment out the `time.sleep(2)` after pod creation. The `on_update(...)` handler will be called.

Note: Running this script the first time might actually trigger `on_update`. This would be b/c the `alpine` image might need to be pulled. If this takes longer than the 2s sleep, there will be MODIFIED events after that, and kopf might have had enough time to write a last-handled-config. If the image is already there, it should fail the first time, except maybe when run on very slow or loaded clusters where the container takes longer to crash. Simply increase the sleep to 3-4s then to still trigger it.

event_race_bug.py
```python
import time

import kopf
import pykube

podspec = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "wonky", "namespace": "default"},
    "spec": {
        "containers": [
            {
                "args": [
                    "-c",
                    "\"echo 'Hello, sleeping for 1s'; sleep 1; echo 'Falling over now...'\"",
                ],
                "command": ["/bin/sh"],
                "image": "alpine:latest",
                "imagePullPolicy": "IfNotPresent",
                "name": "broken",
            },
            {
                "args": [
                    "-c",
                    "\"echo 'Hello, I'll stay alive much longer'; sleep 3600; echo 'Falling over now...'\"",
                ],
                "image": "alpine:latest",
                "imagePullPolicy": "IfNotPresent",
                "name": "sane",
            },
        ],
        "dnsPolicy": "ClusterFirst",
        "restartPolicy": "Never",
        "terminationGracePeriodSeconds": 30,
    },
}

k_api: pykube.HTTPClient = pykube.HTTPClient(pykube.KubeConfig.from_env())


@kopf.on.startup()
async def create_pod(**_):
    pod = pykube.Pod(k_api, podspec)
    # uncomment this if you're running the script multiple times and
    # do not want to manually delete the pod each time
    # pod.delete()
    pod.create()
    # comment the following line to make the example work
    # and allow on_update being called
    time.sleep(2)


@kopf.on.update(
    "",
    "v1",
    "pods",
    field="status",
)
async def on_update(name, status, **_):
    print(f"{name}'s status was updated: {status.get('phase')}")
```

The exact command to reproduce the issue
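The role of the `time.sleep(2)` can be demonstrated without kopf at all: a blocking sleep inside an `async def` stalls the whole event loop, so nothing else is processed meanwhile. A self-contained sketch (the `watcher` coroutine merely stands in for kopf's watch-stream processing; none of this is kopf code):

```python
import asyncio
import time

async def blocking_handler():
    time.sleep(0.2)   # blocks the entire event loop, like the script above

async def nonblocking_handler():
    await asyncio.sleep(0.2)   # yields control back to the loop

async def watcher(timestamps: list):
    # Stand-in for kopf's watch-event processing loop.
    for _ in range(3):
        await asyncio.sleep(0.05)
        timestamps.append(time.monotonic())

async def run(handler) -> int:
    timestamps: list = []
    start = time.monotonic()
    await asyncio.gather(handler(), watcher(timestamps))
    # How many "watch events" were processed while the handler slept?
    return sum(1 for t in timestamps if t - start < 0.2)

during_blocking = asyncio.run(run(blocking_handler))        # 0: loop frozen
during_nonblocking = asyncio.run(run(nonblocking_handler))  # >0: loop running
```

Either `await asyncio.sleep(2)` in the async handler, or declaring the startup handler as plain `def` (so kopf runs it in a thread pool and `time.sleep()` blocks only that thread), keeps the loop responsive.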
```bash
kopf run event_race_bug.py
```

I hope somebody proves me wrong with my analysis, I really do, because if I'm correct, it means that by definition I'll never be able to implement a correctly behaving operator using kopf, as I would have to expect subtle errors like this one without any way to detect them through kopf's API.
Environment