apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

ability to let user configure segment version in indexing task #8249

Closed himanshug closed 10 months ago

himanshug commented 4 years ago

Currently, various indexing tasks auto-generate the segment version that is used by the overshadowing logic. We have a use case (for the parallel index task and the local index task) where overshadowing should be based on when the data was generated by the ETL pipelines, not on when Druid indexing runs for it; the two can happen in different orders for many reasons, e.g. Druid tasks may fail and be resubmitted.

Can we add a feature where the user can explicitly send, in the task context, the segment version to be used for all generated segments? I don't think it would make sense in "append" mode, where we append data to existing segments.

himanshug commented 4 years ago

@jihoonson I see that there have been a lot of code changes in locking etc. and I haven't gone through them yet. Do you think this would be doable in a central place, or does it need to be handled per task?

jihoonson commented 4 years ago

I think it could depend on the lock granularity (segment lock vs time chunk lock) and the rollup mode (perfect rollup vs best-effort rollup).

I understand your use case could need this kind of feature. But before we talk about implementation details, I'm wondering whether this is really a good idea. Even though the Hadoop task already supports a custom segment version, it feels like a hacky way to bypass Druid's segment versioning system, which could be hard to use and even dangerous if something goes wrong (users might see stale data unexpectedly). Also, it seems very odd to me for indexing tasks to generate segments that are overshadowed by existing segments; that would just be a waste of time and resources, I guess.

We have a use case (for the parallel index task and the local index task) where overshadowing should be based on when the data was generated by the ETL pipelines, not on when Druid indexing runs for it; the two can happen in different orders for many reasons, e.g. Druid tasks may fail and be resubmitted.

I guess you're using a sort of workflow scheduler tool and, ideally, this issue should be addressed in the tool. Do we need this because it's too hard or complex to guarantee the proper job execution order in the tool?

himanshug commented 4 years ago

I guess you're using a sort of workflow scheduler tool and, ideally, this issue should be addressed in the tool. Do we need this because it's too hard or complex to guarantee the proper job execution order in the tool?

Let me elaborate on the scenario a bit more. There are many ETL jobs running (outside of my control) that can produce data for any arbitrary interval. If the data intervals of two different ETL jobs overlap, then, for the overlapped interval, the data produced by the later ETL job is "correct".

The "tool" could address this by being intelligent, i.e., never submit a Druid task for a dataset if a Druid task for another dataset with an overlapping interval is already running; wait for that to finish first, and if the task fails, retry it. (If a task keeps failing due to some unexpected data corruption, that would block the whole pipeline, and tasks also fail for unrelated reasons, e.g. occasionally Druid k8s pods get rescheduled.) Aside from needing more intelligence in the tool, this limits our ability to parallelize indexing of different datasets, since in most cases each dataset overlaps with the previous one. This is not a major concern as of now, but it would become a limitation as load increases. In this use case we never "append" and never "rollup", as the data is already grouped by the ETL jobs upfront.

OTOH, if I could let the segment version be a timestamp token coming from the ETL job, then the tool could run Druid tasks for any of the uploaded datasets in any order, in parallel or otherwise. That would make the tool's (and mine, as writer of that tool infra) life so much easier.
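To illustrate why this helps: here is a minimal, hypothetical model of time-chunk overshadowing, where within one time chunk the segment with the highest version wins. The `publish`/`visible` names and the interval/version values are illustrative, not Druid APIs. When the version is the ETL generation timestamp, the visible result is the same regardless of the order in which indexing tasks run:

```python
def publish(store, interval, version, data):
    """Record a segment for a time chunk."""
    store.setdefault(interval, []).append((version, data))

def visible(store):
    """Return what queries would see: the highest version per time chunk."""
    return {interval: max(segments)[1] for interval, segments in store.items()}

# Two ETL jobs produced data for the same hour; job B ran later, so its
# data is "correct". The versions are the ETL generation timestamps.
runs = [
    ("2019-08-05T10:00:00", "data-from-job-A"),
    ("2019-08-05T11:00:00", "data-from-job-B"),
]

# Ingest in ETL order, and in reversed order (simulating a failed and
# resubmitted task finishing late):
forward, backward = {}, {}
for version, data in runs:
    publish(forward, "hour1", version, data)
for version, data in reversed(runs):
    publish(backward, "hour1", version, data)

# Either way, queries see job B's data.
assert visible(forward) == visible(backward) == {"hour1": "data-from-job-B"}
```

With auto-generated versions, the `backward` case would instead surface job A's data, because the later-running task would get the higher version.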

it feels like a hacky way to bypass Druid's segment versioning system, which could be hard to use and even dangerous if something goes wrong (users might see stale data unexpectedly). Also, it seems very odd to me for indexing tasks to generate segments that are overshadowed by existing segments; that would just be a waste of time and resources, I guess.

All the catches you mentioned are acceptable. As a euphemism, I would call it a "power user" option instead of a "hack" :) , where the user understands the consequences. Unless there is a more ideal solution to the problem.

jihoonson commented 4 years ago

Hmm, I understand it could be useful for your use case if a custom segment version were allowed, but I still think it would be better to keep the functionality supported by Druid organized and safe.

I'm wondering if it's possible to achieve the same goal with a new supervisor (not a supervisor task). This supervisor would coordinate the completion order of incoming tasks. I guess the completion order might be defined in various ways in the future, but we can start with a simple one like FIFO. So, you would submit all indexing tasks to this supervisor instead of directly submitting them to the overlord. The supervisor guarantees that first-issued tasks finish first, which means the segments created by later tasks will have higher versions.

I think this could be implemented quite easily with a simple restriction, i.e., the intervals parameter must be specified in the granularitySpec. If the intervals of a new task overlap with those of any other tasks, the new task will wait for those tasks to finish first. If a task fails, the supervisor will retry it until it succeeds or the number of failures exceeds a configured limit. With this supervisor, your tasks can still run in parallel if their intervals don't overlap. Tasks with overlapping intervals can't run at the same time, but I think this would be the same in the custom segment version approach. What do you think?
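The scheduling rule above can be sketched as follows. This is a simplified model, not a proposed implementation: intervals are `(start, end)` hour pairs, and a task may start only after every earlier-submitted task with an overlapping interval has finished. The sketch groups FIFO-ordered tasks into "waves" that could run in parallel:

```python
def overlaps(a, b):
    """Half-open intervals (start, end) overlap iff each starts before the other ends."""
    return a[0] < b[1] and b[0] < a[1]

def schedule_waves(tasks):
    """Assign each FIFO-ordered task to the earliest wave that comes
    after the waves of all earlier overlapping tasks."""
    waves = []       # waves[i] = intervals that run together in wave i
    wave_of = {}     # task index -> assigned wave
    for i, interval in enumerate(tasks):
        earliest = 0
        for j in range(i):
            if overlaps(tasks[j], interval):
                earliest = max(earliest, wave_of[j] + 1)
        wave_of[i] = earliest
        while len(waves) <= earliest:
            waves.append([])
        waves[earliest].append(interval)
    return waves

# Disjoint tasks share a wave; an overlapping task waits for the next one.
print(schedule_waves([(1, 3), (4, 6), (2, 4)]))
# [[(1, 3), (4, 6)], [(2, 4)]]
```

Here `(1, 3)` and `(4, 6)` don't overlap and run together, while `(2, 4)` overlaps the first task and must wait for it.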

jihoonson commented 4 years ago

Hmm, I guess it might not work for you if the tasks are issued out of order..

himanshug commented 4 years ago

@jihoonson thanks... yeah, basically it is very much possible for the following pathological case to happen in our environment:

dataset#1 data interval: hour1 - hour3
dataset#2 data interval: hour2 - hour4
dataset#3 data interval: hour3 - hour5
....

which requires indexing of these datasets to be done sequentially, unless we can control the segment version irrespective of who submits the indexing tasks.
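A quick check of the chain above (modeling each `hourN - hourM` interval as a half-open `(N, M)` pair, purely for illustration) confirms that every consecutive pair of datasets overlaps, so any interval-based ordering scheme serializes the whole chain:

```python
def overlaps(a, b):
    """Half-open intervals (start, end) overlap iff each starts before the other ends."""
    return a[0] < b[1] and b[0] < a[1]

# hour1-hour3, hour2-hour4, hour3-hour5, hour4-hour6, ...
chain = [(1, 3), (2, 4), (3, 5), (4, 6)]

# Every dataset overlaps its neighbor, so no two consecutive indexing
# tasks could ever run concurrently under interval-based coordination.
assert all(overlaps(chain[i], chain[i + 1]) for i in range(len(chain) - 1))
```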

So, I am assuming you agree there is no ideal/simple alternative, and that we can support something in the task to let the user override the segment version. We can leave it as an undocumented option, or document it with warning phrases describing the pitfalls.

jihoonson commented 4 years ago

Yes, I don't see a simple and better alternative. For this kind of feature, I personally prefer documenting it with a scary warning and details.

@jihoonson I see that there have been a lot of code changes in locking etc. and I haven't gone through them yet. Do you think this would be doable in a central place, or does it need to be handled per task?

So, you don't have to roll up, which makes things a bit simpler. I think the part you may want to modify depends on which lock type you want to use.

If a task is using the time chunk lock, it can assign the segment version by itself (LocalSegmentAllocator for local indexing and ParallelSupervisorTask for parallel indexing).

If it's using the segment lock, the segment version is assigned by the overlord in a centralized manner. This is because the major version of the new segment is determined based on the existing segments.

With the segment lock, all your tasks can run at the same time unless they are overwriting existing segments; for the segment lock, you may want to set custom minor versions. With the time chunk lock, tasks can run at the same time only if their intervals don't overlap; for the time chunk lock, you may want to set custom major versions.
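A simplified, hypothetical model of the two version dimensions described above: the major version (a timestamp string, assigned under time chunk locking) dominates, and the minor version (an integer, incremented under segment locking) breaks ties within the same major version. The field names here are illustrative, not Druid's actual segment identifier format:

```python
def newest(segments):
    """Pick the segment that overshadows the others within one time chunk:
    highest major version wins; the minor version breaks ties."""
    return max(segments, key=lambda s: (s["major"], s["minor"]))

segments = [
    {"major": "2019-08-05T10:00:00", "minor": 0, "id": "original"},
    {"major": "2019-08-05T10:00:00", "minor": 1, "id": "segment-lock-overwrite"},
    {"major": "2019-08-05T12:00:00", "minor": 0, "id": "time-chunk-overwrite"},
]

# The time chunk overwrite has the highest major version, so it
# overshadows both segments with the earlier major version.
assert newest(segments)["id"] == "time-chunk-overwrite"
```

This is why the custom-version feature would touch different code paths per lock type: the major version is assignable inside the task under time chunk locking, but is derived from existing segments by the overlord under segment locking.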

himanshug commented 4 years ago

Going through https://github.com/apache/incubator-druid/issues/7491 — thanks for documenting that; it is helpful to have that kind of knowledge in the proposals.

stale[bot] commented 4 years ago

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

jihoonson commented 4 years ago

Still relevant.

stale[bot] commented 4 years ago

This issue is no longer marked as stale.

github-actions[bot] commented 11 months ago

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

github-actions[bot] commented 10 months ago

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.