Closed bajaj-varun closed 5 years ago
RocksDB sometimes has issues on Windows. Try running in a container or VM, or switch to in-memory stores. As an advanced alternative, you can compile RocksDB for your platform to get native binaries.
Hi @mjsax, thanks for your response. I can't use a VM or container, or compile RocksDB on my local machine, due to business constraints.
"maybe switch to in-memory stores. As an advanced alternative" - could you kindly point me to some links with more details? The in-memory stores available to us are H2 and SQLite, so is it possible to change the metastore to either one?
The simplest way is to use Kafka Streams' in-memory store, which you can configure on a per-operator basis by passing, for example, Materialized.as(Stores.inMemoryKeyValueStore("your-store-name")) as a parameter to the operator.
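For reference, here is a minimal sketch of what that could look like for a count operator. The topic name `page-views`, the store name `view-counts-store`, and the class name are made up for illustration, and it assumes Kafka Streams 2.1 or later on the classpath (for `Grouped`); adapt the serdes to your own key and value types.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class InMemoryCountExample {

    // Builds a topology whose count() is backed by an in-memory store
    // instead of the default RocksDB store.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        builder
            // "page-views" is a hypothetical input topic with String keys and values.
            .stream("page-views", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // Passing an in-memory store supplier means no RocksDB native
            // library is loaded for this operator's state.
            .count(Materialized.<String, Long>as(
                        Stores.inMemoryKeyValueStore("view-counts-store"))
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Printing the description shows which store each processor uses.
        System.out.println(buildTopology().describe());
    }
}
```

Note that this has to be repeated for every stateful operator (aggregations, joins, tables) in the topology, since any operator without an explicit store supplier falls back to RocksDB.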
If you want to use a customized in-memory store (e.g. H2 or SQLite), you would need to implement the corresponding store interface (i.e. KeyValueStore, WindowStore, or SessionStore) and a StoreSupplier, and pass your custom store via Materialized.as(new MyCustomStoreSupplier()).
Thanks for your much-needed help @mjsax 👍. I'm reading up on your suggestion and will update the code accordingly. Closing the ticket.
My broker and ZooKeeper services are running on Linux boxes, and my dev box runs Windows 10. Stateful stream operations are not working on my end due to the following error:
Exception in thread "pageview-region-example-client-StreamThread-1" java.lang.UnsatisfiedLinkError: org.rocksdb.Options.newOptions()J
at org.rocksdb.Options.newOptions(Native Method)
at org.rocksdb.Options.<init>(Options.java:35)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:114)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:171)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:64)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:116)
at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:234)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:253)
at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:75)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:318)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)