apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0

[pinot] Support for pausing the realtime consumption without disabling the table. #6302

Closed Aka-shi closed 1 year ago

Aka-shi commented 3 years ago

As of now, to pause the realtime consumption from Kafka, we have to disable the table. This also makes the table unavailable for querying.

It would be helpful if there were support for stopping only the realtime consumption while keeping the table available for querying.

mcvsubbu commented 3 years ago

A few questions, @Aka-shi

  1. After a pause, if the server is restarted, how do you want the server to come back up? Should it consume up to the exact same paused place again? Should it simply not consume past the last completed/committed point? Should it forget that it was paused?
  2. If there are multiple replicas, the above question becomes even harder, since the behavior has to be coordinated across all replicas. I suppose it is not OK for one replica to forget the pause while the other replica is still paused.
  3. Further, if there are multiple replicas, do you want them all to pause at the same place after a pause command?
mcvsubbu commented 3 years ago

@snleee , @sajjad-moradi if you have other questions/thoughts please chime in

mcvsubbu commented 3 years ago

Other questions:

  1. Would you want all partitions of the stream to pause, or just some specific partitions?
  2. Would you want all partitions/tables in a given server to pause?
Aka-shi commented 3 years ago

@mcvsubbu

Would you want all partitions of the stream to pause, or just some specific partitions?

For all partitions. The consumption for the table itself should be stopped. I was thinking of something like this:

  1. User pauses the stream -> the Pinot server commits the current consuming segments (for all partitions) along with their offsets.
  2. User resumes the stream -> Pinot resumes consumption for all partitions from the previously committed offsets.

Would you want all partitions/tables in a given server to pause?

I was expecting the pause option to be at the table level, because of #6555. If the pause and reset APIs are available at the table level, then the user can pause the current stream (which would commit the current segments and pause consumption), reset the offsets, and resume consumption from the earliest/latest offsets as per config.

After a pause, if the server is restarted, how do you want the server to come back up? Should it consume up to the exact same paused place again? Should it simply not consume past the last completed/committed point? Should it forget that it was paused?

If a table is paused and a server restarts afterwards, then, since the consuming segments were already committed when the table was paused, I would expect the server not to consume after the restart either. My understanding is that pausing the stream changes the state of the table, and it should not start consuming again until the user explicitly resumes the stream.

Further, if there are multiple replicas, do you want them all to pause at the same place after a pause command?

Yes. That's what I would expect when I pause the stream. I feel that no replica of any partition consumer should be active.

PS: Just putting it out there: when we pause the table, it should still be available for querying the already consumed data. If not, this would be more or less the same as the enable/disable API.
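To make the intended semantics concrete, here is a small, self-contained sketch (toy Python, not Pinot code) of what "pause commits what has been consumed, resume continues from the committed offset" would look like per partition:

```python
# Toy sketch (not Pinot code): "pause" commits whatever has been consumed so far
# and records the offset; "resume" continues from that committed offset, so nothing
# is re-consumed or lost across the pause, even if a server restarts in between.
from dataclasses import dataclass, field

@dataclass
class PartitionConsumer:
    partition: int
    committed_offset: int = 0      # last offset made durable (segment committed)
    current_offset: int = 0        # offset reached by the in-memory consuming segment
    paused: bool = False
    rows: list = field(default_factory=list)

    def consume(self, new_rows, upto_offset):
        if self.paused:
            return                 # paused: ignore the stream entirely
        self.rows.extend(new_rows)
        self.current_offset = upto_offset

    def pause(self):
        # Commit the consuming segment for this partition, then stop consuming.
        self.committed_offset = self.current_offset
        self.rows.clear()          # rows now live in a committed segment
        self.paused = True

    def resume(self):
        # Restart from the committed offset; everything before it is already durable.
        self.current_offset = self.committed_offset
        self.paused = False

# Pausing the table means pausing every partition, across all replicas.
consumers = [PartitionConsumer(p) for p in range(3)]
for c in consumers:
    c.consume(["row-a", "row-b"], upto_offset=2)
    c.pause()
    c.consume(["row-c"], upto_offset=3)   # ignored while paused
    c.resume()                            # continues from offset 2
```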

mcvsubbu commented 3 years ago

Thanks for clarifying. So, you are not really looking for a "pause" (which I assumed to mean pause without committing segments). You are looking for a "commit now and do not consume until further instructions".

Last question (should have been the first). What is the use case for this?

Question for the community: Are there others who need this feature?

Eywek commented 3 years ago

Question for the community: Are there others who need this feature?

👋 Yes. We're planning to use Pinot to store some data from our customers. We currently pull data from some APIs for our customers, transform it, and store it. As this data can be quite large, we need to stream it, and we need to be able to push this transformed data as a stream into Pinot. We plan to use a REALTIME table because we don't want to use batch ingestion: that would mean relying on an S3 bucket, and we wouldn't be able to know when the data is available for queries. But once we've finished pulling data from those APIs, we won't update the table anymore, so being able to stop the ingestion while keeping the table queryable would be really useful to avoid putting unnecessary pressure on our Apache Pulsar cluster (we use KoP to be able to use the Kafka ingestion plugin).

mcvsubbu commented 3 years ago

@Eywek this seems to be a one-time operation as you describe it? Am I right? Once the data is loaded into pinot, you intend to shut off consumption and just query the data -- i.e. you do not expect more data to arrive in the realtime pipeline. Is that right?

Eywek commented 3 years ago

yep exactly

Eywek commented 2 years ago

Hi 👋

Do you have any news on this kind of feature?

kishoreg commented 2 years ago

@mcvsubbu @npawar we should definitely consider implementing this. I remember discussing a simple approach for this, but I don't seem to recall the solution.

mcvsubbu commented 2 years ago

@Eywek just clarifying if your request to "pause" is the same as that of @Aka-shi -- "Commit all data you have and do not consume any more until operator resumes". Also, if you don't expect to touch the table or send any more data ever again, then why bother pausing? Queries will keep working, the data in the consuming segment will be committed eventually, and everything should be good.

@Aka-shi please state your use case as well (i.e. why do you want to "pause" the stream? How long do you expect the pause to last? Will you also need a resume, or just pause and finish the table?)

Eywek commented 2 years ago

@Eywek just clarifying if your request to "pause" is the same as that of @Aka-shi -- "Commit all data you have and do not consume any more until operator resumes"

In my use case, I don't need to be able to resume, only to pause and query the data.

why bother pausing?

Because we don't want to put unnecessary pressure on our Apache Pulsar cluster (e.g. creating consumers, reading data, ...), and we won't be able to delete the topic if we still have consumers attached to it. I also think it's preferable to commit the segment when we pause, so Pinot doesn't have to re-compute the segment if a server is restarted, and the data won't be lost.

mcvsubbu commented 2 years ago

Because we don't want to put unnecessary pressure on our Apache Pulsar cluster [...] I also think it's preferable to commit the segment when we pause, so Pinot doesn't have to re-compute the segment if a server is restarted, and the data won't be lost.

Good point.

Will it help if the realtime segments were moved to an offline table, and you just drop the realtime table when the segments are moved completely? We do have that feature now.

Eywek commented 2 years ago

Will it help if the realtime segments were moved to an offline table, and you just drop the realtime table when the segments are moved completely? We do have that feature now.

Good idea. But can we trigger a segment move (from REALTIME to OFFLINE) via the API? And know when it is done? And can we drop the REALTIME table (of a hybrid table) without breaking anything?

mcvsubbu commented 2 years ago

But can we trigger a segment move (from REALTIME to OFFLINE) via the API? And know when it is done?

No, it is done automatically in a periodic manner. Also, you will need to deploy minions for it.

Of course, you can always do this manually by downloading the segments and uploading them to the offline table; APIs are available for it, and a small script can change the segment name. This is a hack (as of now, since segment name change is not a supported feature), but if, for example, we had an API to add a segment to a table under a different name (not a bad ask, IMO), your problem could be completely solved with APIs.

Again, this is based on the assumption that yours is a one-time operation, and therefore having some manual intervention is not an issue.
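For reference, a rough sketch of this manual workaround driven through the controller REST API (the endpoint paths, parameter names, and the segment-rename step below are assumptions for illustration; check the controller Swagger UI before relying on them):

```python
# Rough sketch of the manual "download, rename, re-upload" workaround for a single
# segment. The endpoint paths and parameter names are assumptions; verify them in
# the controller Swagger UI. The rename is the hacky part: the segment name is also
# embedded in the segment metadata inside the tarball, so a plain file rename is not
# enough on its own (represented by the hypothetical rewrite step below).
import requests

CONTROLLER = "http://localhost:9000"        # assumption: controller address
REALTIME_TABLE = "myTable_REALTIME"         # example table name
SEGMENT = "myTable__0__5__20210803T0000Z"   # example segment name

# 1. Download the completed realtime segment from the controller.
resp = requests.get(f"{CONTROLLER}/segments/{REALTIME_TABLE}/{SEGMENT}")
resp.raise_for_status()
local_path = f"/tmp/{SEGMENT}.tar.gz"
with open(local_path, "wb") as f:
    f.write(resp.content)

# 2. (Hypothetical) rewrite the segment name in the tarball's metadata so it can be
#    added to the OFFLINE table under a different name, as discussed above.
# rewrite_segment_name(local_path, new_name="myTable_imported_0")

# 3. Upload the segment to the controller for the OFFLINE table.
with open(local_path, "rb") as f:
    resp = requests.post(f"{CONTROLLER}/v2/segments",
                         params={"tableName": "myTable"},
                         files={"segmentFile": f})
resp.raise_for_status()
```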

Eywek commented 2 years ago

I would rather use the API than minions, since I would trigger it as soon as my job finishes processing data; I want to end this job only when the data is ready (which, in this solution, means when the segment is in the OFFLINE table).

Not an issue if the API supports it since we can automate it in our job.

This solution seems to work for our use case. But downloading and uploading segments could be a long task, couldn't it? Is there no other way?

npawar commented 2 years ago

There is an API (check the Task section on Swagger) to trigger the movement from realtime to offline instead of waiting for the periodic task (you still need minions). However, the realtimeToOffline task is not designed to achieve exactly what you want: it will not pick up segments for movement unless they are completed segments. If it sees consuming segments, it will wait for them to complete. https://youtu.be/V_KNUUrS6DA (minute 33 onwards) explains that behavior. We might need to change the job's behavior to support this.
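For illustration, a minimal sketch of triggering that task on demand through the controller Task API (the endpoint path and parameter names are assumptions; verify them in the Task section of the Swagger UI):

```python
# Minimal sketch of scheduling the realtime-to-offline minion task on demand via the
# controller Task API (endpoint path and parameter names are assumptions; check the
# Task section of the controller Swagger UI).
import requests

CONTROLLER = "http://localhost:9000"   # assumption: controller address

resp = requests.post(
    f"{CONTROLLER}/tasks/schedule",
    params={
        "taskType": "RealtimeToOfflineSegmentsTask",  # the minion task discussed here
        "tableName": "myTable_REALTIME",              # example table name
    },
)
resp.raise_for_status()
print(resp.json())   # names of the scheduled task(s), if any were generated
```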


mcvsubbu commented 2 years ago

The realtimeToOffline task is not designed to achieve exactly what you want. It will not pick up the segments for movement unless they're completed segments. If it sees consuming segments, it will wait for them to complete.

The segment will complete when the time constraint is reached. Since there is no more new data, the last completed segment will be moved, and the next consuming segment will be empty. Am I right?

mcvsubbu commented 2 years ago

@Aka-shi please describe your use case for pause. Thanks

npawar commented 2 years ago

The segment will complete when the time constraint is reached. Since there is no more new data, the last completed segment will be moved, and the next consuming segment will be empty. Am I right?

Not quite. The movement job waits for the complete window. So if you've set the window as 1h, it will wait until the completed segments have covered the entire hour. If there's a consuming segment immediately after, it will wait; there's no way to know whether the next consuming segment will get events for the incomplete window or not. This was designed with the assumption that in production you'll have a continuous stream of events and none of these manual edge cases.

My preference would be to design a pause/resume, instead of relying on the realtimeToOffline job for this. Otherwise, we'll be adding too many if-else branches to handle all these edge cases in the minion job. Plus, users would have to wait for the time threshold, or adjust it manually, just to make this work.

mcvsubbu commented 2 years ago

I agree that realtime-to-offline does not fit the bill here. I think the use case described by @Eywek is unique, so I wanted to avoid building a feature (which may not be simple) for a one-time operation in one installation. Also, I wanted to check whether there are existing mechanisms that @Eywek could use one time to satisfy their needs. The segment name change seems perfectly fine here.

@Eywek Do you expect to get into this situation often (i.e. other tables in the pipeline for such a pause feature)?

npawar commented 2 years ago

Just putting it out there: one option is to wait for all data to make its way into completed segments, and then delete the CONSUMING segments. If this is a one-time thing anyway and re-consumption is not required, this will work. In any case, hoping that we get a concrete use-case description, so we can build this out properly.

Eywek commented 2 years ago

Do you expect to get into this situation often (i.e. other tables in the pipeline for such a pause feature)?

Yes, I'll have many tables in this situation: each time one of our customers tries to import a new "datasource" (which happens maybe 10 times a day), we will create a new table and end up in this situation.

one option is to wait for all data to make its way into completed segments

That doesn't meet my use case. I want to be able to process and save data as fast as possible; waiting for Pinot to complete a segment after some amount of time increases my processing time, which isn't desirable.

yupeng9 commented 2 years ago

+1 to the feature. We have a similar request but for a different reason.

When we bootstrap an upsert table using the real-time segment push, we need to pause the real-time ingestion. Otherwise the upsert metadata will hit a race condition. Though it's possible to implement this pause internally, making it explicit will simplify the implementation and make it safer for operations.

mcvsubbu commented 2 years ago

...pause the real-time ingestion. Otherwise the upsert metadata will hit a race condition. Though it's possible to implement this pause internally...

Thanks, yupeng. I need to understand this better. Is this a race condition in Pinot? If so, we should fix it regardless?

Let us discuss this a little bit. I will reach out, and then we can summarize our discussion here.

mcvsubbu commented 2 years ago

Do you expect to get into this situation often (i.e. other tables in the pipeline for such a pause feature)?

Yes, I'll have many tables in this situation: each time one of our customers tries to import a new "datasource" (which happens maybe 10 times a day), we will create a new table and end up in this situation.

thanks, this is good to know.

yupeng9 commented 2 years ago

Is this a race condition in Pinot? If so, we should fix it regardless?

You can think of it this way: when a segment in the history is modified, we need to scan the history to derive the latest upsert state per primary key, especially for partial upsert. This scan cannot proceed while real-time consumption is on, as we don't have the latest metadata info to handle the real-time updates. The way to solve it is to take a lock and block the real-time ingestion until the load/scan is completed.

mcvsubbu commented 2 years ago

@yupeng9 and I discussed offline, and here is the summary for Upsert support:

For upsert tables, we need an occasional refill of older data. During a refill, the segments' data changes, and if ingestion is also going on, it messes up the indexing, since other (uploaded realtime) segments are being modified. A few minutes of pause where queries are still being served (but potentially with stale data) is acceptable. The requirement is to pause all partitions, not one at a time.

Having things pause while retaining a CONSUMING segment with data in it is hard. Pausing is easy, but if a server gets restarted, it is hard to remember the exact offset at which things were paused and to consume back up to it.

Instead, I suggest we support two controller API primitives

Things to think about and handle correctly (just listing a few)

I am sure people can add more.

yupeng9 commented 2 years ago

Thanks for writing down the summary!

A question on whether we need to complete the consuming segment upon pause: is it possible to simply pause the consumption? If the server restarts, it will lose the offset info and restart from the previous checkpoint, which is the same as the current consuming-segment behavior. This could avoid sealing the segment (and creating a small segment file) if the pause is not too long.

mcvsubbu commented 2 years ago

I suggest providing an option of either discarding all the consuming segments or completing all of them. I feel that handling server restarts when we have consumed halfway and then paused will be hard.

Take the case where a server starts to consume a segment at offset 100 and there are two replicas, A and B. After some time, a "pause" command is entered. Replica A is at 150 and replica B is at 160, and they stop consuming, with the rows still in memory.

Now, A gets restarted. Ingestion has continued, so the current offset available in stream is 200.

Should A just serve data up to 100 (i.e. data that has been committed) whereas B serves data up to 160? Should A consume up to 150 and stop? Should A consume up to 160 and stop? What if there are three replicas?

It gets harder to maintain state.

We can provide the operator an option to either complete the current consuming segment or discard it. Either way, we "pause" at a point where everything is committed.

yupeng9 commented 2 years ago

Perhaps I didn't fully see the complexity. I feel that when consumption is paused, it's paused regardless of the offset. If a server restarts, the offset will rewind, but it will not consume.

I think the same issue applies to normal consumption: you still have this offset mismatch across servers when a server restarts?

Or, in other words, I hope the pause can be a simple instruction to the consumers, not related to the state transition.

Aka-shi commented 2 years ago

@mcvsubbu Sorry for the delayed response. The initial use case for which I raised this issue: we had frequent downtimes on Kafka at that time, and during every downtime the servers failed to ingest and we had to restart them to recover. Even though we were informed of the downtime, the only other option to avoid server restarts was to disable the table. But disabling the table also means querying is disabled for that table, which is not intended. So we wanted functionality where, if we are aware of or anticipate ingestion failures, we can just pause the ingestion and resume it later.

Recently, we have also been using the upsert feature in Pinot, and we want to manually push segments to the upsert table too. So @yupeng9's use case is valid for us too.

mcvsubbu commented 2 years ago

@Aka-shi Pinot automatically recovers from Kafka failures. How much time did you allow for recovery? There is a periodic task (you can adjust the frequency to run every X minutes) that recreates failed segments and restarts consumption. We use that in production, and things just auto-recover.

See https://docs.pinot.apache.org/configuration-reference/controller#periodic-tasks-configuration. The one you want is controller.realtime.segment.validation.frequencyPeriod.

If that does not work right, then we have a bug that needs to be fixed nevertheless.

mcvsubbu commented 2 years ago

@Aka-shi did the periodic task help you recover from kafka downtimes automatically?

mbecker commented 2 years ago

Hi,

maybe adding another question to this issue: how do I restart the ingestion process (with new configuration)?

Let's assume the following two use cases:

How could I restart the streaming ingestion process (maybe with new configs such that the "old" offsets don't match)?

I've tried disabling and then enabling the table; I've also tried to "overwrite" the table (by just adding it again with new streamConfigs); neither worked.

Thanks!

mcvsubbu commented 2 years ago

@mbecker I think the scenario you state is closer to issue #6555 and/or issue #7280 but I agree that these are closely related.

Aka-shi commented 2 years ago

did the periodic task help you recover from kafka downtimes automatically?

@mcvsubbu Yes, that helped us recover. Apologies for the delayed response.

sajjad-moradi commented 1 year ago

Pause/resume feature #8986 is now merged into master.
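For anyone landing here later, a brief usage sketch of the merged feature, assuming the controller endpoints introduced by #8986 (pauseConsumption, resumeConsumption, pauseStatus); confirm the exact paths and parameters against your Pinot version's API docs:

```python
# Brief usage sketch of the pause/resume feature, assuming the controller endpoints
# introduced by #8986; confirm exact paths and parameters for your Pinot version.
import requests

CONTROLLER = "http://localhost:9000"   # assumption: controller address
TABLE = "myTable_REALTIME"             # example table name

# Pause: consuming segments are committed and consumption stops; queries keep working.
requests.post(f"{CONTROLLER}/tables/{TABLE}/pauseConsumption").raise_for_status()

# Check whether all consuming segments have been committed yet.
print(requests.get(f"{CONTROLLER}/tables/{TABLE}/pauseStatus").json())

# Resume: consumption continues from the committed offsets (a consumeFrom parameter
# can be used to restart from the smallest/largest offset instead).
requests.post(f"{CONTROLLER}/tables/{TABLE}/resumeConsumption").raise_for_status()
```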