LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License
452 stars 73 forks source link

Offer manual control over Grace MS settings #281

Closed yasinparsi closed 9 months ago

yasinparsi commented 9 months ago

Description

As you can see, our pod pegasus-streams-6469cfbf4f-xqc2t in Kubernetes had 3 restarts:

pic1

pic3

And this picture shows a longer time range: pic2

We have encountered elevated memory usage and suspect that it is caused by the grace period (Grace MS) being statically set to one day. Consequently, we would like to be able to configure it manually instead of relying on the fixed value.

https://github.com/LGouellec/kafka-streams-dotnet/blob/aa833389b1b8b8afd6b68b4d8c724fdc6c924b18/core/Stream/JoinWindowOptions.cs#L63C9-L63C9

How to reproduce

       // Reproduction topology: the quantity and price streams are joined with
       // JoinWindowOptions.Of(10 minutes), the joined records are enriched from the
       // product global table, and the result is written to the "CatalogItem" topic.
       var builder = new StreamBuilder();

       // Source streams, both keyed by string.
       var sellableQuantityStream = builder.Stream<string, Quantity, StringSerDes, JsonSerDes<Quantity>>(
           TopicSpecifications.SellableQuantityItems.Name);
       var sellablePriceStream = builder.Stream<string, Pricing, StringSerDes, JsonSerDes<Pricing>>(
           TopicSpecifications.SellablePriceItems.Name);

       // Product lookup table, configured with InMemory.As(...) under the name "Product_Store".
       var productGlobalTable = builder.GlobalTable<string, Product, StringSerDes, JsonSerDes<Product>>(
           TopicSpecifications.Products.Name,
           InMemory.As<string, Product>("Product_Store"));

       // Value joiner for the stream-stream join: merges one quantity record with one pricing record.
       Func<Quantity, Pricing, EnrichPriceQuantity> mergeQuantityWithPrice = (quantity, pricing) =>
           new EnrichPriceQuantity
           {
               OkPrice = pricing.OkPrice,
               Quantity = quantity.SellableQuantity,
               DiscountPercent = (int)pricing.DiscountPrice,
               ProductId = pricing.ProductId,
               Price = pricing.Price,
               StoreId = pricing.StoreId,
               // Use the sellable quantity as the limit when pricing carries no positive limit.
               MaxLimitOrder = pricing.MaxLimitOrder > 0
                   ? pricing.MaxLimitOrder
                   : (int)quantity.SellableQuantity,
               StoreName = quantity.StoreName
           };

       // Key mapper for the global-table join: the stream key is ignored and the
       // lookup key is the product id rendered as a string.
       Func<string, EnrichPriceQuantity, string> selectProductKey =
           (streamKey, enriched) => enriched.ProductId.ToString();

       // Value joiner for the global-table join: combines the price/quantity data with the product record.
       Func<EnrichPriceQuantity, Product, InventoryItem> attachProductDetails = (enrichPriceQuantity, product) =>
           new InventoryItem
           {
               StoreId = enrichPriceQuantity.StoreId,
               Description = product.Description,
               OkPrice = enrichPriceQuantity.OkPrice,
               Quantity = (int)enrichPriceQuantity.Quantity,
               DiscountPercent = (int)enrichPriceQuantity.DiscountPercent,
               ProductId = product.ProductId,
               Price = enrichPriceQuantity.Price,
               Id = $"{enrichPriceQuantity.ProductId}^{enrichPriceQuantity.StoreId}",
               Perishable = product.Perishable,
               Priority = product.Priority,
               AveragesSales = product.AveragesSales,
               BrandId = product.BrandId,
               BrandName = product.BrandName,
               C1Id = product.C1Id,
               C1Name = product.C1Name,
               C2Id = product.C2Id,
               C2Name = product.C2Name,
               C3Id = product.C3Id,
               C3Name = product.C3Name,
               ComingSoon = product.ComingSoon,
               CreatedOn = product.CreatedOn,
               HasQuantity = enrichPriceQuantity.Quantity > 0,
               ImageId = product.ImageId,
               ImageUrl = product.ImageUrl,
               IsBundle = product.IsBundle,
               IsNew = product.IsNew,
               IsSupplier = product.IsSupplier,
               LowMargin = product.LowMargin,
               ProductBarcode = product.ProductBarcode,
               ProductName = product.ProductName,
               StatusCode = product.StatusCode,
               ReserveQuantity = (int)enrichPriceQuantity.Quantity,
               StateCode = product.StateCode,
               BrandLatinName = product.BrandLatinName,
               SearchKeywords = product.SearchKeywords,
               C2LatinName = product.C2LatinName,
               C3LatinName = product.C3LatinName,
               C1LatinName = product.C1LatinName,
               MaxOrderLimit = product.MaxOrderLimit,
               MaximumOrderWholesale = product.MaximumOrderWholesale,
               NoInPackage = product.NoInPackage,
               StoreName = enrichPriceQuantity.StoreName,
               MainQuantity = (int)enrichPriceQuantity.Quantity
           };

       // Stream-stream join using the 10-minute JoinWindowOptions from the report.
       var enrichedStream = sellableQuantityStream.Join(
           stream: sellablePriceStream,
           valueJoiner: mergeQuantityWithPrice,
           JoinWindowOptions.Of(TimeSpan.FromMinutes(10)));

       // Enrich from the product table and publish the resulting inventory items.
       enrichedStream
           .Join(productGlobalTable, selectProductKey, attachProductDetails)
           .To<StringSerDes, JsonSerDes<InventoryItem>>("CatalogItem");

       return builder.Build();

Checklist

Please provide the following information: