jankotek / mapdb

MapDB provides concurrent Maps, Sets and Queues backed by disk storage or off-heap-memory. It is a fast and easy to use embedded Java database engine.
https://mapdb.org
Apache License 2.0

DataInput2.DataInputToStream is broken #971

Open tribbloid opened 4 years ago

tribbloid commented 4 years ago

Written in Scala, but the same calls should behave identically from Kotlin:

      import org.apache.commons.io.IOUtils
      import org.mapdb.DataInput2
      val input = new DataInput2.ByteArray(Array[Byte](2, 2))
      val iStream = new DataInput2.DataInputToStream(input)
      val bytes = IOUtils.toByteArray(iStream)

Looks simple, right? Yet it throws:

java.lang.ArrayIndexOutOfBoundsException
    at java.lang.System.arraycopy(Native Method)
    at org.mapdb.DataInput2$ByteArray.readFully(DataInput2.java:28)
    at org.mapdb.DataInput2$DataInputToStream.read(DataInput2.java:242)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2314)
    at org.apache.commons.io.IOUtils.copy(IOUtils.java:2270)
    at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2291)
    at org.apache.commons.io.IOUtils.copy(IOUtils.java:2246)
    at org.apache.commons.io.IOUtils.toByteArray(IOUtils.java:765)
    at org.apache.spark.sql.spookystuf.ExternalAppendOnlyArray$SerDe$.<init>(ExternalAppendOnlyArray.scala:93)

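This looks like a low-level mistake: judging by the trace, DataInputToStream.read passes the caller's full buffer length to readFully even though only two bytes are available.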
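
For comparison, the java.io.InputStream contract lets read return fewer bytes than requested and signals end of input with -1 instead of throwing. Below is a minimal Java sketch of that expected behaviour, using only java.io. ByteArrayInputStream stands in for a correctly behaving adapter, and the class name, helper, and 4096-byte buffer are illustrative assumptions rather than MapDB or Commons IO details.

```java
import java.io.ByteArrayInputStream;

// Illustrative class, not part of MapDB or Commons IO.
class ReadContractDemo {

    // Performs two reads into a buffer much larger than the input
    // and returns both reported byte counts.
    static int[] readTwice(byte[] data, int bufferSize) {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        byte[] buf = new byte[bufferSize];
        int first = in.read(buf, 0, buf.length);  // short read: only the available bytes
        int second = in.read(buf, 0, buf.length); // exhausted input: -1, no exception
        return new int[] {first, second};
    }

    public static void main(String[] args) {
        int[] r = readTwice(new byte[] {2, 2}, 4096);
        System.out.println(r[0] + " " + r[1]); // prints "2 -1"
    }
}
```

A stream adapter that behaves this way lets copy loops such as the one in the trace terminate normally once the two bytes have been delivered.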

tribbloid commented 4 years ago

BTW, this is my replacement implementation of read:

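    override def read(b: Array[Byte], off: Int, len: Int): Int = {
      if (len == 0) return 0

      val srcArray = in.internalByteArray()
      val srcBuffer = in.internalByteBuffer()
      val pos = in.getPos

      // Clamp the request to the bytes left after the current position when the backing array is known
      val _len =
        if (srcArray != null) Math.min(srcArray.length - pos, len)
        else if (srcBuffer != null) Math.min(srcBuffer.remaining(), len)
        else len

      // InputStream contract: -1 at end of input, never 0 for a non-empty request
      if (_len <= 0) return -1

      try {
        in.readFully(b, off, _len)
        _len
      } catch {
        // Inefficient fallback: rewind and copy byte by byte until the input runs out
        case _: RuntimeException =>
          in.setPos(pos)
          var n = 0
          try {
            while (n < len) {
              b(off + n) = in.readByte()
              n += 1
            }
          } catch {
            case _: EOFException | _: RuntimeException => // end of input
          }
          // Report the number of bytes copied, not the last index reached
          if (n == 0) -1 else n
      }
    }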

Not sure whether a shorter version is possible.
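
On the question of a shorter version, one possible sketch is to override only the single-byte read and let the default InputStream.read(byte[], int, int) build bulk reads on top of it. The Java sketch below uses only java.io. The class DataInputAsStream and its countBytes helper are hypothetical, and a DataInputStream over a byte array stands in for the MapDB input. Catching IndexOutOfBoundsException is an assumption based on the trace above, where the array-backed input fails that way instead of throwing EOFException.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical adapter, not part of MapDB: exposes any DataInput as an InputStream.
class DataInputAsStream extends InputStream {
    private final DataInput in;

    DataInputAsStream(DataInput in) {
        this.in = in;
    }

    @Override
    public int read() throws IOException {
        try {
            return in.readUnsignedByte(); // 0..255, as InputStream.read() requires
        } catch (EOFException | IndexOutOfBoundsException e) {
            return -1; // treat either signal as end of input (assumption, see lead-in)
        }
    }

    // Demo helper: drains the adapter through a large buffer and counts delivered bytes.
    static int countBytes(byte[] data) {
        InputStream s = new DataInputAsStream(
                new DataInputStream(new ByteArrayInputStream(data)));
        byte[] buf = new byte[4096];
        int total = 0;
        try {
            for (int n = s.read(buf); n != -1; n = s.read(buf)) {
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(countBytes(new byte[] {2, 2})); // prints "2"
    }
}
```

The trade-off is speed: every byte goes through a separate call, so this is only the shortest correct form, not the fastest. A bulk path like the one in the implementation above is still worthwhile when the remaining length is known.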