NullReferenceException when restoring topology with stream-stream join.
Code to reproduce the issue. Make sure at least one message from each stream has been consumed, then stop the test application and run it again. The exception is thrown on the second run, while the state is being restored.
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;
var config = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = "reproduce-nullreferenceexception",
    BootstrapServers = "127.0.0.1:9092",
    AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest,
    Logger = LoggerFactory.Create(builder =>
    {
        builder.SetMinimumLevel(LogLevel.Debug);
        builder.AddConsole();
    })
};

StreamBuilder builder = new StreamBuilder();
var stream = builder.Stream<string, string>("topic1").SelectKey((k, v) => "key");
builder
    .Stream<string, string>("topic2")
    .SelectKey((k, v) => "key")
    .Join(
        stream,
        (s, v) => $"{s}-{v}",
        JoinWindowOptions.Of(TimeSpan.FromSeconds(10)))
    .To("output-join");

Topology t = builder.Build();
var kafkaStream = new KafkaStream(t, config);
await kafkaStream.StartAsync();
Console.ReadLine();
info: Streamiz.Kafka.Net.KafkaStream[0]
stream-application[test-stream-stream-join] State transition from REBALANCING to RUNNING
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
stream-thread[test-stream-stream-join-stream-thread-0] State is RUNNING, initializing and restoring tasks if necessary
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Task 1-0 state transition from CREATED to RUNNING
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[0|0] Task 0-0 state transition from CREATED to RUNNING
dbug: Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager[0]
Loaded offsets from checkpoint manager:
info: Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager[0]
Initializing to the starting offset for changelog test-stream-stream-join-KSTREAM-JOINTHIS-0000000012-store-changelog [[0]] of in-memory state store KSTREAM-JOINTHIS-0000000012-store
info: Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager[0]
Initializing to the starting offset for changelog test-stream-stream-join-KSTREAM-JOINOTHER-0000000013-store-changelog [[0]] of in-memory state store KSTREAM-JOINOTHER-0000000013-store
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[2|0] Task 2-0 state transition from CREATED to RESTORING
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[2|0] Restoration will start soon.
dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
State store test-stream-stream-join-KSTREAM-JOINTHIS-0000000012-store-changelog [[0]] metadata found (begin offset: 0 / end offset : 4)
dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
State store test-stream-stream-join-KSTREAM-JOINOTHER-0000000013-store-changelog [[0]] metadata found (begin offset: 0 / end offset : 4)
dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
Added partitions with offsets test-stream-stream-join-KSTREAM-JOINTHIS-0000000012-store-changelog-[0]#Beginning [-2],test-stream-stream-join-KSTREAM-JOINOTHER-0000000013-store-changelog-[0]#Beginning [-2] to the restore consumer, current assignment is test-stream-stream-join-KSTREAM-JOINOTHER-0000000013-store-changelog-[0],test-stream-stream-join-KSTREAM-JOINTHIS-0000000012-store-changelog-[0]
fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
stream-thread[test-stream-stream-join-stream-thread-0] Encountered the following error during processing:
System.NullReferenceException: Object reference not set to an instance of an object.
at Streamiz.Kafka.Net.ProcessorContext.get_Timestamp()
at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.<>c__DisplayClass23_0.<Init>b__0(Bytes key, Byte[] value)
at Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager.Restore(StateStoreMetadata storeMetadata, IEnumerable`1 records)
at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.RestoreChangelog(ChangelogMetadata changelogMetadata)
at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.Restore()
at Streamiz.Kafka.Net.Processors.StreamThread.RestorePhase()
at Streamiz.Kafka.Net.Processors.StreamThread.Run()
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
stream-thread[test-stream-stream-join-stream-thread-0] Shutting down
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
stream-thread[test-stream-stream-join-stream-thread-0] State transition from RUNNING to PENDING_SHUTDOWN
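The stack trace shows ProcessorContext.get_Timestamp() throwing when it is called from the InMemoryWindowStore restore callback. One plausible reading, which is an assumption and has not been checked against the Streamiz source, is that the timestamp getter dereferences a per-record context that is only set while a record is being processed, and that nothing sets it during restoration. The sketch below models that failure mode with hypothetical stand-in types (FakeRecordContext, FakeProcessorContext, RestoreDemo); none of them are Streamiz.Kafka.Net types.

```csharp
using System;

// No record context has been set, as assumed to be the case during restore.
var context = new FakeProcessorContext();
Console.WriteLine(RestoreDemo.TimestampThrows(context)); // True

// Once a record context is set, the same getter works.
context.RecordContext = new FakeRecordContext { Timestamp = 42 };
Console.WriteLine(RestoreDemo.TimestampThrows(context)); // False

// Hypothetical stand-ins; these are NOT the Streamiz.Kafka.Net types.
class FakeRecordContext
{
    public long Timestamp { get; set; }
}

class FakeProcessorContext
{
    // Assumption: only populated while a record is being processed.
    public FakeRecordContext RecordContext { get; set; }

    // Dereferences RecordContext without a null check.
    public long Timestamp => RecordContext.Timestamp;
}

static class RestoreDemo
{
    // Returns true if reading the timestamp throws NullReferenceException.
    public static bool TimestampThrows(FakeProcessorContext context)
    {
        try
        {
            _ = context.Timestamp;
            return false;
        }
        catch (NullReferenceException)
        {
            return true;
        }
    }
}
```

If this reading is right, a restore callback would need to take the timestamp from the changelog record itself instead of from the processor context.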
How to reproduce
1. Make sure Kafka is running and that messages exist in topic1 and topic2.
2. Run the test application provided above and make sure at least one message from each stream is consumed.
3. Stop the test application and make sure the join state store changelog topics have been created in Kafka.
4. Re-run the test application. The exception occurs during the restore phase.
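For the first step, the input topics can be seeded with a small producer. This is a minimal sketch using the Confluent.Kafka producer client. It assumes the same local broker as the repro and that topic1 and topic2 already exist or that the broker auto-creates topics. The keys and values are arbitrary placeholders.

```csharp
using System;
using Confluent.Kafka;

// Assumption: same local broker as in the repro configuration.
var producerConfig = new ProducerConfig { BootstrapServers = "127.0.0.1:9092" };

using var producer = new ProducerBuilder<string, string>(producerConfig).Build();

// One record per input topic, sent close together so that their
// timestamps fall inside the 10-second join window of the topology.
await producer.ProduceAsync("topic1", new Message<string, string> { Key = "k1", Value = "left" });
await producer.ProduceAsync("topic2", new Message<string, string> { Key = "k2", Value = "right" });

// Wait for any outstanding deliveries before exiting.
producer.Flush(TimeSpan.FromSeconds(10));
```

The original keys do not matter here because the topology re-keys both streams to "key" with SelectKey.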
Checklist
Please provide the following information:
[x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
[x] A code snippet with your topology builder (ex: builder.Stream<string, string>("topic").to("an-another-topic");)
[x] Streamiz.Kafka.Net nuget version.
[x] Apache Kafka version.
[x] Client configuration.
[x] Operating system.
[x] Provide logs (in debug mode (log4net and StreamConfig.Debug) as necessary in configuration).
Description
NullReferenceException when restoring topology with stream-stream join.
Streamiz.Kafka.Net nuget version: 1.2.1
Apache Kafka version: any version
Client configuration: ApplicationId = "reproduce-nullreferenceexception", BootstrapServers = "127.0.0.1:9092", AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest
Operating system: Win10
Logs: see the debug output above.