opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

Explore use of LogByteSizeMergePolicy for time series data use cases #9241

Status: Open. rishabhmaurya opened 11 months ago

rishabhmaurya commented 11 months ago

Is your feature request related to a problem? Please describe. LogByteSizeMergePolicy always merges adjacent segments together, which can be helpful for time series data where documents are sorted by timestamp and segments usually don't overlap much on timestamps. At query time, it's better if the time range can be contained in a smaller number of segments, so that the remaining segments can be skipped by checking the min/max value of the timestamp field. When adjacent segments are merged, the likelihood of this increases significantly. TieredMergePolicy, the successor of LogByteSizeMergePolicy and the current default, merges segments more smartly and can merge non-adjacent segments too, which can be inefficient for time series data.
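
To make the skipping argument concrete, here is a minimal sketch in plain Lucene (not OpenSearch's actual query path; `countSkippableSegments` and the direct `@timestamp` point access are illustrative assumptions) of how a segment whose timestamp min/max lies entirely outside the query range can be skipped. Merging adjacent segments keeps each segment's `[segMin, segMax]` interval narrow, so this check succeeds more often:

```java
import java.io.IOException;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;

class SegmentSkipSketch {
    // Counts segments whose @timestamp range does not intersect [from, to];
    // such segments can be skipped entirely for a time-range query.
    static int countSkippableSegments(DirectoryReader reader, long from, long to) throws IOException {
        int skippable = 0;
        for (LeafReaderContext leaf : reader.leaves()) {
            PointValues points = leaf.reader().getPointValues("@timestamp");
            if (points == null) continue; // segment has no @timestamp points
            long segMin = LongPoint.decodeDimension(points.getMinPackedValue(), 0);
            long segMax = LongPoint.decodeDimension(points.getMaxPackedValue(), 0);
            if (segMax < from || segMin > to) skippable++; // disjoint from the query range
        }
        return skippable;
    }
}
```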

Describe the solution you'd like Explore usage of LogByteSizeMergePolicy for data stream use cases where @timestamp is a mandatory field.


gashutos commented 11 months ago

One more merge policy was introduced recently: the interleaving ShuffledMergePolicy. It was introduced for exactly the opposite reason of what you are trying to achieve here: it shuffles time-series documents for better desc-order sort performance. Desc sort order is a common scenario on time series and is computation-heavy compared to range queries on time-series workloads. Range queries are very fast anyway due to BKDs, so I would be very curious to see how much improvement we can get with LogByteSizeMergePolicy.

I'll be curious to see the numbers here, but definitely consider comparing sort performance along with range queries.

rishabhmaurya commented 11 months ago

Thanks @gashutos for taking a look. I'm currently working on getting numbers out for comparison. The shuffle merge policy is only applicable on force merges, which in most cases is a final merge operation before the index goes into a read-only state. And there, since the segments are fewer in number and bigger in size, it makes total sense to interleave read operations over old and new segments.

> range queries anyway are very fast due to BKDs and I would be very curious to see how much improvement we can get with LogByteSizeMergePolicy merge policy.

This optimization wouldn't change anything within a BKD tree itself; rather, the min/max timestamp range of each segment's BKD will be significantly narrower after this change and points will be more densely packed. Also, searching across segments would be more ordered.

rishabhmaurya commented 11 months ago

I'm not seeing any improvement using LogByteSizeMergePolicy against the http_logs workload.

Concern 1:

http_logs doesn't generate a workload that matches real-world data stream use cases, where multiple clients ingest in time order. The way the current logic works, the whole corpus is split into chunks and each indexing client is assigned an offset from which it starts ingesting concurrently. This randomizes the timestamp order, so LogByteSizeMergePolicy is not expected to perform any better in such scenarios and would result in higher write and read amplification without any gain in read latencies.
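
A toy illustration of this concern (this is not opensearch-benchmark code; all numbers are made up): with the corpus split into per-client chunks, the documents in flight at any instant come from widely separated time regions, so a single refresh flushes a segment spanning almost the whole corpus time range.

```java
public class OffsetDemo {
    public static void main(String[] args) {
        int clients = 8;                // concurrent indexing clients
        int corpusSize = 1_000_000;     // docs, sorted by timestamp in the source file
        int chunk = corpusSize / clients;
        int progress = 1_234;           // docs each client has indexed so far
        for (int c = 0; c < clients; c++) {
            // client c started at offset c * chunk into the time-ordered corpus
            System.out.printf("client %d is currently indexing doc #%d%n", c, c * chunk + progress);
        }
        // A refresh at this instant flushes a segment whose @timestamp min/max
        // spans nearly the entire corpus, defeating adjacent-segment merging.
    }
}
```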

Concern 2:

OpenSearch Benchmark currently completes ingestion and then runs queries for the http_logs workload, per the test procedure defined here. This amounts to running queries only on read-only indices, which is usually not the case in production environments for time-based data like logs. The LogByteSize merge policy would be more effective on indices with active writes.

I'm currently working on setting up such a data stream locally to evaluate the improvements, if any. Meanwhile, I'm also working with opensearch-benchmark to add such a capability - https://github.com/opensearch-project/opensearch-benchmark/issues/365

I may sound like I'm tailoring the use cases to align with the strengths of the LogByteSize merge policy, but these seem like valid use cases that should be incorporated into opensearch-benchmark.

tagging @nknize

Posting numbers with the current logic. Machine: r5.2xlarge, Heap: 32 GB

## LogByteSizeMergePolicy

Using the following defaults - https://github.com/rishabhmaurya/OpenSearch/blob/a0ccb11de2770f9fde7b49508410def89d02934a/server/src/main/java/org/opensearch/index/MergePolicyConfig.java#L231-L236

mergeFactor = 10
minMergeSize = 2MB
maxMergeSizeMB = 5GB
maxMergeMBForForcedMerge = -1 (or inf)
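
As a side note, here is a minimal sketch of how these defaults map onto Lucene's LogByteSizeMergePolicy setters (the exact wiring in the linked MergePolicyConfig may differ; this is an illustration, not a copy of that code):

```java
import org.apache.lucene.index.LogByteSizeMergePolicy;

class LogByteSizeDefaults {
    static LogByteSizeMergePolicy build() {
        LogByteSizeMergePolicy policy = new LogByteSizeMergePolicy();
        policy.setMergeFactor(10);          // merge up to 10 same-level adjacent segments at a time
        policy.setMinMergeMB(2.0);          // segments under 2 MB all count as the lowest level
        policy.setMaxMergeMB(5 * 1024.0);   // skip segments above 5 GB for background merges
        policy.setMaxMergeMBForForcedMerge(Double.POSITIVE_INFINITY); // -1 / inf: no cap on forced merges
        return policy;
    }
}
```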

Latency (ms)

| Operation Type | P50 | P90 | P99 | P100 |
| -- | -- | -- | -- | -- |
| hourly_agg | 3,092.1 | 3,163.6 | 3,215.5 | 3,286.4 |
| asc_sort_timestamp | 5.657 | 6.398 | 7.855 | 8.041 |
| index-stats-wait-until-merges-finish | 22,793.3 | 26,391.2 | 26,391.2 | 26,391.2 |
| asc_sort_with_after_timestamp | 16.047 | 22.208 | 23.94 | 25.825 |
| index-stats-wait-until-merges-1-seg-finish | 40,995.5 | 72,900.9 | 72,900.9 | 72,900.9 |
| desc-sort-with-after-timestamp-after-force-merge-1-seg | 217.5 | 283.5 | 312.1 | 325.6 |
| force-merge-1-seg | 240,011.3 | 240,013 | 240,013 | 240,013 |
| scroll | 587 | 601.9 | 659.7 | 720.4 |
| range | 7.373 | 7.917 | 10.654 | 54.852 |
| index-append | 175.4 | 267.6 | 476.2 | 10,609.1 |
| desc_sort_with_after_timestamp | 129.8 | 143.5 | 155.4 | 180.7 |
| default | 4.349 | 4.554 | 4.845 | 6.617 |
| asc-sort-timestamp-after-force-merge-1-seg | 5.784 | 7.524 | 8.455 | 9.358 |
| 200s-in-range | 5.98 | 6.326 | 6.745 | 10.67 |
| 400s-in-range | 3.976 | 4.26 | 4.548 | 6.429 |
| term | 4.542 | 4.794 | 5.236 | 6.076 |
| desc_sort_timestamp | 48.449 | 70.893 | 76.449 | 78.903 |
| asc-sort-with-after-timestamp-after-force-merge-1-seg | 37.074 | 43.786 | 47.948 | 60.585 |
| desc-sort-timestamp-after-force-merge-1-seg | 181.8 | 228.5 | 262.5 | 270.1 |

## TieredMergePolicy

Latency (ms)

| Operation Type | P50 | P90 | P99 | P100 |
| -- | -- | -- | -- | -- |
| hourly_agg | 3,003.4 | 3,079.4 | 3,132.8 | 3,187.5 |
| asc_sort_timestamp | 7.053 | 8.149 | 9.872 | 10.126 |
| index-stats-wait-until-merges-finish | 5,586.6 | 58,237.1 | 58,237.1 | 58,237.1 |
| asc_sort_with_after_timestamp | 18.278 | 21.437 | 24.302 | 48.292 |
| index-stats-wait-until-merges-1-seg-finish | 19,230.3 | 103,239 | 103,239 | 103,239 |
| desc-sort-with-after-timestamp-after-force-merge-1-seg | 95.924 | 164.9 | 190.2 | 211 |
| force-merge-1-seg | 240,010.7 | 240,013 | 240,013 | 240,013 |
| scroll | 598.8 | 612.9 | 671.5 | 682.2 |
| range | 7.636 | 8.312 | 10.476 | 47.232 |
| index-append | 163.2 | 259.5 | 509.5 | 11,028.8 |
| desc_sort_with_after_timestamp | 41.707 | 55.354 | 68.997 | 99.183 |
| default | 4.556 | 4.824 | 5.579 | 6.17 |
| asc-sort-timestamp-after-force-merge-1-seg | 6.822 | 7.802 | 9.374 | 9.95 |
| 200s-in-range | 6.326 | 6.925 | 10.44 | 41.752 |
| 400s-in-range | 4.201 | 4.522 | 5.238 | 5.66 |
| term | 5.157 | 5.52 | 6.151 | 7.539 |
| desc_sort_timestamp | 18.51 | 24.365 | 49.348 | 56.725 |
| asc-sort-with-after-timestamp-after-force-merge-1-seg | 31.552 | 38.305 | 47.129 | 79.136 |
| desc-sort-timestamp-after-force-merge-1-seg | 113.8 | 169 | 181.1 | 184 |

gashutos commented 11 months ago

Thank you for the benchmark!! There is some regression as well.

> http_logs doesn't generate a workload that matches real-world data stream use cases, where multiple clients ingest in time order. The way the current logic works, the whole corpus is split into chunks and each client is assigned an offset from which it starts ingesting. This randomizes the timestamp order, so LogByteSizeMergePolicy is not expected to perform any better in such scenarios and would result in higher write and read amplification without any gain in read latencies.

If you take a segment and look at the values, they are very nearly sorted by timestamp. Just do a desc-order sort by timestamp on any segment and you will notice results coming from the last chunk of docIds in the segment. Wouldn't this be a production-level scenario, where multiple services push logs, and they push in chunks only? So it might skew the sort order a little.

For testing, you could probably try --workload-params="bulk_indexing_clients:1"?

rishabhmaurya commented 11 months ago

I do see some performance gain when running locally on a system with the following configuration -

I had to customize the http_logs workload: I'm using just one log file, an index with one shard, and just a few operations - index-append, asc_sort_timestamp, desc_sort_timestamp. I had to explicitly set the refresh interval to 1s so that the translog doesn't grow huge and new segments are created frequently, letting the merge policy kick in sooner. Without an explicit refresh interval - since all searches happen only after indexing and the force merge are complete - the translog keeps filling, segments are created much later (when the explicit force merge is triggered), and the merge policy thus comes into play much later.

workload: customized http_logs 
index: logs-181998; shard count: 1; doc count: 2708746;
indexing client count: 1
benchmark machine: c4.2xlarge
data node count: 1; r5.2xlarge; heap: 32GB
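
For illustration, a sketch of the index-level settings in play, expressed with OpenSearch's Settings builder (the benchmark actually sets these through the workload's index template, so treat this as an assumption about shape, not the benchmark code itself):

```java
import org.opensearch.common.settings.Settings;

class BenchmarkIndexSettings {
    static Settings build() {
        return Settings.builder()
            .put("index.number_of_shards", 1)     // single shard, as in the run above
            .put("index.refresh_interval", "1s")  // frequent refreshes produce small segments early,
                                                  // so the merge policy is exercised during ingestion
            .build();
    }
}
```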

With TieredMergePolicy

|                                                 Min Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                                Mean Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                              Median Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                                 Max Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                        50th percentile latency |      desc_sort_timestamp |     10.3268 |     ms |
|                                        90th percentile latency |      desc_sort_timestamp |     11.4483 |     ms |
|                                       100th percentile latency |      desc_sort_timestamp |     12.5894 |     ms |
|                                   50th percentile service time |      desc_sort_timestamp |     7.01704 |     ms |
|                                   90th percentile service time |      desc_sort_timestamp |     8.43393 |     ms |
|                                  100th percentile service time |      desc_sort_timestamp |     10.1131 |     ms |
|                                                     error rate |      desc_sort_timestamp |           0 |      % |
|                                                 Min Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                                Mean Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                              Median Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                                 Max Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                        50th percentile latency |       asc_sort_timestamp |     9.57576 |     ms |
|                                        90th percentile latency |       asc_sort_timestamp |     10.3733 |     ms |
|                                       100th percentile latency |       asc_sort_timestamp |     11.4519 |     ms |
|                                   50th percentile service time |       asc_sort_timestamp |      6.4471 |     ms |
|                                   90th percentile service time |       asc_sort_timestamp |     7.24644 |     ms |
|                                  100th percentile service time |       asc_sort_timestamp |      8.9884 |     ms |
|                                                     error rate |       asc_sort_timestamp |           0 |      % |

With LogByteSizeMergePolicy

mergeFactor = 10
minMergeSize = 2MB
maxMergeSizeMB = 5GB
maxMergeMBForForcedMerge = -1 (or inf)
|                                                 Min Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                                Mean Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                              Median Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                                 Max Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                        50th percentile latency |      desc_sort_timestamp |     9.71618 |     ms |
|                                        90th percentile latency |      desc_sort_timestamp |     10.8187 |     ms |
|                                       100th percentile latency |      desc_sort_timestamp |     12.6003 |     ms |
|                                   50th percentile service time |      desc_sort_timestamp |      6.6176 |     ms |
|                                   90th percentile service time |      desc_sort_timestamp |     7.80692 |     ms |
|                                  100th percentile service time |      desc_sort_timestamp |     9.29486 |     ms |
|                                                     error rate |      desc_sort_timestamp |           0 |      % |
|                                                 Min Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                                Mean Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                              Median Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                                 Max Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                        50th percentile latency |       asc_sort_timestamp |     8.70018 |     ms |
|                                        90th percentile latency |       asc_sort_timestamp |     9.96291 |     ms |
|                                       100th percentile latency |       asc_sort_timestamp |     10.8828 |     ms |
|                                   50th percentile service time |       asc_sort_timestamp |     5.62269 |     ms |
|                                   90th percentile service time |       asc_sort_timestamp |     6.75591 |     ms |
|                                  100th percentile service time |       asc_sort_timestamp |      8.3948 |     ms |
|                                                     error rate |       asc_sort_timestamp |           0 |      % |

> If you take a segment and look at the values, they are very nearly sorted by timestamp. Just do a desc-order sort by timestamp on any segment and you will notice results coming from the last chunk of docIds in the segment. Wouldn't this be a production-level scenario, where multiple services push logs, and they push in chunks only? So it might skew the sort order a little.

If you look at the timestamp difference between the 1st and the 8th client at a given time, it's quite significant - https://github.com/opensearch-project/opensearch-benchmark/issues/365#issuecomment-1687282593

rishabhmaurya commented 11 months ago

Below is a snapshot of the segments created by both merge policies. The X axis denotes the segments, sorted by document count, and the Y axis represents the min/max values of the @timestamp field stored in the BKD. We can see that the delta between min and max grows much larger with the tiered MP. Given that the corpus is small, we didn't see much performance improvement. Next, I'm working on ingesting more documents while taking care of the following -

  1. All indexing clients should process timestamps as close together as possible.
  2. Set explicit refresh intervals of 5s, 10s, and 30s and compare the performance.
  3. Measure the write amplification from merges and compare both policies.
  4. Compare the performance of asc_sort_with_after_timestamp and desc_sort_with_after_timestamp with different, more relevant search_after values, along with asc_sort_timestamp and desc_sort_timestamp.

Log Byte Size (segment min/max plot: LogMerge_2)

Tiered (segment min/max plot: Tiered_2)

itiyama commented 11 months ago

@rishabhmaurya If time is really a concern, why not make the merge policy time-aware?

rishabhmaurya commented 11 months ago

@itiyama I'm open to such experiments; however, I think the gain may not be significant for the time series use case. Assuming the agents sending data from different machines don't have a significant time lag, merging adjacent segments ensures that the overlap of timestamps across segments isn't large, whereas the tiered merge policy can worsen the overlap. A timestamp-based approach might be more helpful when there is a significant lag between ingesting clients, but that's usually not the case, and we shouldn't optimize for such anomalies. Do you have ideas on how you would make use of timestamps?
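
If someone did want to experiment with a time-aware policy, one hypothetical building block is an overlap metric over per-segment timestamp ranges; `overlapFraction` below is purely illustrative, not an existing OpenSearch or Lucene API. A time-aware policy could, for instance, prefer merge candidates that minimize the resulting range growth.

```java
class TimestampOverlap {
    // Fraction of segment A's time span that intersects segment B's span:
    // 0.0 means the ranges are disjoint, 1.0 means A lies entirely inside B.
    static double overlapFraction(long minA, long maxA, long minB, long maxB) {
        long intersection = Math.min(maxA, maxB) - Math.max(minA, minB);
        long spanA = Math.max(1L, maxA - minA); // guard against zero-width spans
        return Math.max(0L, intersection) / (double) spanA;
    }
}
```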

rishabhmaurya commented 10 months ago

Some outstanding items before making LogByteSizeMergePolicy the default for timestamp-based indices -

  1. With the testing done so far, the tail segments created by LogByteSizeMergePolicy are significantly bigger than those from TieredMergePolicy, keeping the merge factor and min segment size the same. This causes increased latency for desc sort queries compared to TieredMergePolicy in the http_logs workload. Action item: fix https://github.com/opensearch-project/opensearch-benchmark/issues/365 and experiment with LogByteSizeMergePolicy settings, optimizing for the http_logs dataset.
  2. Check whether we can improve the behavior of LogByteSizeMergePolicy to avoid very large segments in the case of findForcedDeletesMerges, as discussed here: https://github.com/opensearch-project/OpenSearch/pull/9992#discussion_r1334903911