Open amielc1 opened 1 month ago
using Confluent.Kafka; using Confluent.Kafka.Admin; using System; using System.Collections.Generic; using System.Threading.Tasks;
/// <summary>
/// Console sample that creates a Kafka topic with a per-topic retention policy
/// through the Confluent.Kafka admin client.
/// </summary>
class Program
{
    /// <summary>
    /// Connects to the broker at <c>localhost:9092</c> and requests creation of
    /// <c>my-topic</c> (3 partitions, replication factor 1) with time- and
    /// size-based retention settings.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    /// <returns>A task that completes when the create request has finished.</returns>
    public static async Task Main(string[] args)
    {
        var config = new AdminClientConfig { BootstrapServers = "localhost:9092" };

        using (var adminClient = new AdminClientBuilder(config).Build())
        {
            try
            {
                var topicSpecifications = new List<TopicSpecification>
                {
                    new TopicSpecification
                    {
                        Name = "my-topic",
                        NumPartitions = 3,
                        ReplicationFactor = 1,
                        Configs = new Dictionary<string, string>
                        {
                            { "retention.ms", "604800000" },    // one week, in milliseconds
                            { "retention.bytes", "1073741824" } // 1 GiB
                        }
                    }
                };

                await adminClient.CreateTopicsAsync(topicSpecifications);
                Console.WriteLine("Topic created successfully with retention policy");
            }
            catch (CreateTopicsException e)
            {
                // The exception carries one report per requested topic. Report every
                // failed topic rather than only the first entry, so the output stays
                // correct when more specifications are added to the list above.
                foreach (var result in e.Results)
                {
                    if (result.Error.IsError)
                    {
                        Console.WriteLine($"An error occurred creating topic {result.Topic}: {result.Error.Reason}");
                    }
                }
            }
        }
    }
}
retention policy - per topic
After a consumer connection failure, from which offset will we resume consuming?