simerplaha / SwayDB

Persistent and in-memory key-value storage engine for JVM that scales on a single machine.
https://swaydb.simer.au
Apache License 2.0

High rate of ClosedChannelException during compaction #358

Open hicolour opened 2 years ago

hicolour commented 2 years ago

I'm getting a high rate of ClosedChannelException with the persistent map configuration described below.

java.nio.channels.ClosedChannelException: null
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
    at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:790)
    at swaydb.core.io.file.ChannelFile.read(ChannelFile.scala:94)
    at swaydb.core.io.file.DBFile.read(DBFile.scala:412)
    at swaydb.core.io.reader.FileReader.read(FileReader.scala:73)
    at swaydb.core.segment.format.a.block.reader.BlockReader$.readFullBlock(BlockReader.scala:97)
    at swaydb.core.segment.format.a.block.reader.BlockReaderBase.readFullBlock(BlockReaderBase.scala:65)
    at swaydb.core.segment.format.a.block.reader.BlockReaderBase.readFullBlock$(BlockReaderBase.scala:64)
    at swaydb.core.segment.format.a.block.reader.UnblockedReader.readFullBlock(UnblockedReader.scala:81)
    at swaydb.core.segment.format.a.block.reader.UnblockedReader.readAllAndGetReader(UnblockedReader.scala:123)
    at swaydb.core.segment.format.a.block.Block$.unblock(Block.scala:299)
    at swaydb.core.segment.format.a.block.reader.UnblockedReader$.apply(UnblockedReader.scala:69)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.$anonfun$buildBlockReaderCache$6(SegmentBlockCache.scala:202)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.$anonfun$buildBlockReaderCache$5(SegmentBlockCache.scala:197)
    at swaydb.data.cache.SynchronisedIO.$anonfun$value$7(Cache.scala:323)
    at swaydb.data.cache.LazyIO.$anonfun$getOrSet$3(Lazy.scala:165)
    at swaydb.data.cache.LazyValue.$anonfun$getOrSet$2(Lazy.scala:96)
    at scala.Option.getOrElse(Option.scala:189)
    at swaydb.data.cache.LazyValue.$anonfun$getOrSet$1(Lazy.scala:95)
    at scala.Option.getOrElse(Option.scala:189)
    at swaydb.data.cache.LazyValue.getOrSet(Lazy.scala:93)
    at swaydb.data.cache.LazyIO.getOrSet(Lazy.scala:165)
    at swaydb.data.cache.SynchronisedIO.value(Cache.scala:323)
    at swaydb.data.cache.DeferredIO.value(Cache.scala:295)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.$anonfun$createReader$1(SegmentBlockCache.scala:330)
    at scala.Option.getOrElse(Option.scala:189)
    at swaydb.data.cache.DeferredIO.getOrElse(Cache.scala:302)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.createReader(SegmentBlockCache.scala:328)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.createSortedIndexReader(SegmentBlockCache.scala:470)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.iterator(SegmentBlockCache.scala:531)
    at swaydb.core.segment.SegmentRef.iterator(SegmentRef.scala:889)
    at swaydb.core.segment.SegmentRef$.put(SegmentRef.scala:635)
    at swaydb.core.segment.PersistentSegmentOne.put(PersistentSegmentOne.scala:288)
    at swaydb.core.level.Level.$anonfun$putAssignedKeyValues$2(Level.scala:1160)
    at swaydb.core.level.Level.$anonfun$putAssignedKeyValues$1(Level.scala:1158)
    at swaydb.IO$IterableIOImplicit.mapRecoverIO(IO.scala:227)
    at swaydb.core.level.Level.putAssignedKeyValues(Level.scala:1155)
    at swaydb.core.level.Level.$anonfun$putKeyValues$2(Level.scala:1106)
    at swaydb.IO$Right.$anonfun$flatMap$1(IO.scala:570)
    at swaydb.IO$Right.flatMap(IO.scala:404)
    at swaydb.core.level.Level.putKeyValues(Level.scala:1098)
    at swaydb.core.level.Level.$anonfun$put$11(Level.scala:704)
    at swaydb.core.level.Level.$anonfun$reserveAndRelease$2(Level.scala:578)
    at swaydb.IO$Right.$anonfun$map$1(IO.scala:560)
    at swaydb.IO$Right.map(IO.scala:396)
    at swaydb.core.level.Level.$anonfun$reserveAndRelease$1(Level.scala:576)
    at swaydb.IO$Right.$anonfun$map$1(IO.scala:560)
    at swaydb.IO$Right.map(IO.scala:396)
    at swaydb.core.level.Level.reserveAndRelease(Level.scala:574)
    at swaydb.core.level.Level.put(Level.scala:694)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.pushForward(ThrottleCompaction.scala:191)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.pushForward(ThrottleCompaction.scala:176)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.pushForward(ThrottleCompaction.scala:155)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.runJob(ThrottleCompaction.scala:131)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.runJobs(ThrottleCompaction.scala:107)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.runNow(ThrottleCompaction.scala:73)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.run(ThrottleCompaction.scala:59)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.run(ThrottleCompaction.scala:48)
    at swaydb.core.level.compaction.throttle.ThrottleCompactor$.doWakeUp(ThrottleCompactor.scala:287)
    at swaydb.core.level.compaction.throttle.ThrottleCompactor$.wakeUp(ThrottleCompactor.scala:309)
    at swaydb.core.level.compaction.throttle.ThrottleCompactor$.wakeUp(ThrottleCompactor.scala:51)
    at swaydb.core.level.compaction.throttle.ThrottleCompactor$.$anonfun$sendWakeUp$1(ThrottleCompactor.scala:232)
    at swaydb.core.level.compaction.throttle.ThrottleCompactor$.$anonfun$sendWakeUp$1$adapted(ThrottleCompactor.scala:228)
    at swaydb.ActorWire.$anonfun$send$2$adapted(ActorWire.scala:148)
    at swaydb.ActorWire.$anonfun$actor$2$adapted(ActorWire.scala:61)
    at swaydb.Actor.swaydb$Actor$$receive(Actor.scala:773)
    at swaydb.Actor.$anonfun$swaydb$Actor$$basicWakeUp$1(Actor.scala:569)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:834)

  /*
    Specifies whether key-value IDs should be cached. There are over 1300 key-value IDs; if caching is disabled, an on-disk binary search is performed to find each ID.
    https://swaydb.io/configuration/cacheKeyValueIds/?language=scala
   */
  val cacheKeyValueIdsOverride = false

  /*
    MemoryCache.off disables in-memory caching of both the raw bytes stored in persistent Segments and the parsed key-values.
    https://swaydb.io/configuration/memoryCache/?language=scala
   */
  val memoryCacheOverride = MemoryCache.off

  /*
    Enabling mmap in LevelZero writes key-values to memory-mapped write-ahead log files.
    When disabled, as here, java.nio.FileChannel is used instead.
    https://swaydb.io/configuration/mmap/map/?language=scala
   */
  val mmapDisabled = MMAP.Off(
    ForceSave.Off
  )

  /*
    Limits the cache of per-thread read state (ThreadStateCache.off, commented
    out below, disables it entirely).
   */
  val threadStateCacheOverride = ThreadStateCache.Limit(hashMapMaxSize = 10, maxProbe = 1)
  //ThreadStateCache.off

  /*
    Configures all persistent Segment files for the Level.
    https://swaydb.io/configuration/segmentConfig/?language=scala
   */
  val segmentConfigOverride = DefaultConfigs
    .segmentConfig()
    .copyWithMmap(
      mmapDisabled
    )
    .copyWithCacheSegmentBlocksOnCreate(false)
    .copyWithFileOpenIOStrategy(IOStrategy.SynchronisedIO(cacheOnAccess = true))
    .copyWithBlockIOStrategy(
      blockIOStrategy = (_) => IOStrategy.SynchronisedIO(cacheOnAccess = false)
    )

  // The following configuration disables caching on the file read/write layer
  val fileCacheOverride = FileCache.On(
    0,
    ActorConfig.TimeLoop(
      name = s"${this.getClass.getName} - FileCache TimeLoop Actor",
      delay = 1.seconds,
      ec = DefaultExecutionContext.sweeperEC
    )
  )

  /*
    The sorted-key index stores all keys in sorted order.
    https://swaydb.io/configuration/sortedKeyIndex/?language=scala&q=sortedKeyIndex
   */
  val sortedKeyIndexOverride = DefaultConfigs.sortedKeyIndex(false)
  /*
    Hash indexes can double random-read performance and reduce IOPS, which increases overall DB performance.
    https://swaydb.io/configuration/randomKeyIndex/?language=scala
   */
  val randomSearchIndexOverride = DefaultConfigs.randomSearchIndex(false)
  /*
    The binary-search index gives faster random reads than a linear search over the sortedKeyIndex and is essential for fast forward & reverse iterations.
    https://swaydb.io/configuration/binarySearchIndex/?language=scala
   */
  val binarySearchIndexOverride = DefaultConfigs.binarySearchIndex(false)
  /*
    A BloomFilter is a small byte array, created per persistent Segment, used to check whether a key might exist in the Segment without actually searching it.
    https://swaydb.io/configuration/mightContainKeyIndex/?language=scala
   */
  val mightContainIndexOverride = DefaultConfigs.mightContainIndex(false)

  /*
    Configures storage for values within a persistent segment.
    https://swaydb.io/configuration/valuesConfig/?language=scala&q=mightContainIndex
   */
  val valuesConfigOverride = DefaultConfigs.valuesConfig(false)

  def mapCreate(name: String) =
    persistent.Map[String, Int, Nothing, Glass](
      dir = File.newTemporaryDirectory(name).deleteOnExit().path,
      mmapMaps = mmapDisabled,
      cacheKeyValueIds = cacheKeyValueIdsOverride,
      segmentConfig = segmentConfigOverride,
      sortedKeyIndex = sortedKeyIndexOverride,
      randomSearchIndex = randomSearchIndexOverride,
      binarySearchIndex = binarySearchIndexOverride,
      mightContainIndex = mightContainIndexOverride,
      valuesConfig = valuesConfigOverride,
      fileCache = fileCacheOverride,
      memoryCache = memoryCacheOverride,
    )
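
For completeness, here is a minimal sketch of how the map is exercised (the key format, value range, and write volume are illustrative, not my exact workload):

  // Illustrative driver only: a sustained write load like this is enough
  // to trigger background compaction against the configuration above.
  def reproduce(): Unit = {
    val map = mapCreate("closed-channel-repro")

    // Write enough key-values to create Segments for compaction to merge.
    (1 to 1000000).foreach { i =>
      map.put(s"key-$i", i)
    }

    map.close()
  }
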
simerplaha commented 2 years ago

This will not cause any data-consistency issues.

The problem here is a known caching issue: compaction is trying to merge a Segment file through a FileChannel that has already been closed. The read should attempt to reopen the file. If it keeps failing, compaction could stop making progress.
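
Roughly, the read path should catch the closed channel and reopen the file before retrying, along these lines (an illustrative sketch, not the actual ChannelFile/DBFile code):

  import java.nio.ByteBuffer
  import java.nio.channels.{ClosedChannelException, FileChannel}
  import java.nio.file.{Path, StandardOpenOption}

  // Sketch of the intended behaviour: if the file sweeper has closed the
  // cached channel, reopen it once and retry the read instead of
  // propagating ClosedChannelException up through compaction.
  final class ReopeningFileReader(path: Path) {

    @volatile private var channel: FileChannel =
      FileChannel.open(path, StandardOpenOption.READ)

    def read(buffer: ByteBuffer, position: Long): Int =
      try channel.read(buffer, position)
      catch {
        case _: ClosedChannelException =>
          synchronized {
            if (!channel.isOpen)
              channel = FileChannel.open(path, StandardOpenOption.READ)
          }
          // Retry once with the reopened channel.
          channel.read(buffer, position)
      }
  }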

I've been wanting to fix this but haven't had the time.

Thank you for reporting these issues.