kaitai-io / kaitai_struct_java_runtime

Kaitai Struct: runtime for Java
MIT License
42 stars 18 forks source link

Usage of InputStream instead of file path #16

Open mykolapolonskyi opened 5 years ago

mykolapolonskyi commented 5 years ago

Maybe the generated code could be more flexible by accepting an InputStream instead of a file path.

e.g.

Stl.fromFile(ClassPathResource("stl/bottom_shell_ONLY.stl").file.absolutePath) 

As it stands, there is no way to process files that come from the network (e.g. Google Drive) without first saving them to disk.

GreyCat commented 5 years ago

Note that InputStream does not have a way to do seeks/positioning in a stream, and that's vital for many formats.

You're welcome to contribute an implementation of KaitaiStream that uses an InputStream (for example, you could throw RuntimeExceptions on attempts to seek, or try some magic with mark/reset). However, without seeks it will probably be useful for only a few simple formats, and with seek emulation it will be relatively slow and memory-hungry.

ratcashdev commented 2 months ago

Consider the code below:

import io.kaitai.struct.ByteBufferKaitaiStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.agrona.LangUtil;

/**
 * A {@link ByteBufferKaitaiStream} that parses data arriving from an {@link InputStream} without
 * first loading the whole input into memory.
 *
 * <p>Bytes are pulled from the underlying channel into a fixed-size direct buffer on demand. The
 * parent class reads from that very same buffer, so every read first makes sure the bytes it needs
 * are buffered. Bytes that have already been consumed are discarded whenever the buffer is
 * compacted, which means:
 *
 * <ul>
 *   <li>{@link #seek(long)} can move forward arbitrarily far (the skipped bytes are read and
 *       dropped), but can move backward only within the bytes still held in the buffer;
 *   <li>{@link #size()} is unknown and reported as {@link Integer#MAX_VALUE}.
 * </ul>
 *
 * <p>When the input ends in the middle of a fixed-size read, the exception comes from the parent
 * class reading a too-short buffer (presumably {@link java.nio.BufferUnderflowException} — confirm
 * against the runtime version in use).
 */
public class KaitaiInputStream extends ByteBufferKaitaiStream {
  /** Default capacity of the read-ahead buffer, in bytes. */
  private static final int BUFFER_SIZE = 32 * 1024;

  /** Largest number of bytes a single fixed-size primitive read (e.g. {@code readS8be}) needs. */
  private static final int MIN_BUFFER_SIZE = 8;

  private final ReadableByteChannel channel;

  /** Read-ahead window; the same instance is handed to, and read by, the parent class. */
  private final ByteBuffer buffer;

  /** Absolute stream offset that corresponds to index 0 of {@link #buffer}. */
  private long bufferStart;

  /**
   * Wraps {@code is} using the default 32 KiB read-ahead buffer.
   *
   * @param is source of the data to parse
   * @return a stream reading lazily from {@code is}
   */
  public static KaitaiInputStream fromStream(InputStream is) {
    return fromStream(is, BUFFER_SIZE);
  }

  /**
   * Wraps {@code is} using a read-ahead buffer of {@code bufferSize} bytes.
   *
   * @param is source of the data to parse
   * @param bufferSize capacity of the read-ahead buffer; also bounds how far {@link #seek(long)}
   *     can move backward
   * @return a stream reading lazily from {@code is}
   * @throws IllegalArgumentException if {@code bufferSize} is smaller than 8
   */
  public static KaitaiInputStream fromStream(InputStream is, int bufferSize) {
    if (bufferSize < MIN_BUFFER_SIZE) {
      throw new IllegalArgumentException(
          "bufferSize must be at least " + MIN_BUFFER_SIZE + ", got " + bufferSize);
    }
    final var channel = Channels.newChannel(is);
    final var buffer = ByteBuffer.allocateDirect(bufferSize);
    return new KaitaiInputStream(channel, buffer);
  }

  private KaitaiInputStream(ReadableByteChannel ch, ByteBuffer buffer) {
    super(buffer);
    this.channel = ch;
    this.buffer = buffer;
    // Start with an empty window (position == limit == 0) so the first read triggers a fill.
    this.buffer.limit(0);
  }

  /**
   * Tries to make at least {@code required} unread bytes available in {@link #buffer}.
   *
   * <p>On return the buffer is in "read mode": unread data spans {@code [position, limit)}.
   *
   * @param required number of bytes wanted; must not exceed {@code buffer.capacity()}
   * @return {@code true} if at least {@code required} bytes are buffered, {@code false} if the
   *     input ended (or the channel is closed) before that many bytes could be buffered
   * @throws UncheckedIOException if reading from the channel fails
   */
  private boolean fill(int required) {
    if (buffer.remaining() >= required) {
      return true;
    }
    if (!channel.isOpen()) {
      return false;
    }
    if (buffer.capacity() - buffer.position() < required) {
      // Not enough room behind the read position: discard consumed bytes by moving the unread
      // remainder to index 0, and advance the absolute offset of index 0 to match.
      bufferStart += buffer.position();
      buffer.compact();
      buffer.flip();
    }
    final int readPos = buffer.position();
    // Switch to "write mode" over the free tail [limit, capacity).
    buffer.position(buffer.limit());
    buffer.limit(buffer.capacity());
    try {
      // The hasRemaining() guard prevents spinning on a full buffer, where read() returns 0.
      while (buffer.position() - readPos < required && buffer.hasRemaining()) {
        if (channel.read(buffer) < 0) {
          break; // end of input
        }
      }
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    } finally {
      // Back to "read mode": valid unread data is [readPos, position-after-reading).
      buffer.limit(buffer.position());
      buffer.position(readPos);
    }
    return buffer.remaining() >= required;
  }

  /**
   * Buffers {@code byteCount} bytes if the input still has them. A shortfall is not reported
   * here; the parent's subsequent read on the short buffer fails instead.
   */
  private void ensureBytes(int byteCount) {
    fill(byteCount);
  }

  /** Closes the underlying channel. Bytes already buffered remain readable. */
  @Override
  public void close() throws IOException {
    channel.close();
  }

  /**
   * Reports whether no more bytes can be read. May read from the underlying channel to find out.
   */
  @Override
  public boolean isEof() {
    return !fill(1);
  }

  /** Moves to absolute position {@code newPos}; see {@link #seek(long)}. */
  @Override
  public void seek(int newPos) {
    seek((long) newPos);
  }

  /**
   * Moves to absolute stream position {@code newPos}.
   *
   * @throws IllegalArgumentException if {@code newPos} is negative, or lies past the end of input
   *     (in which case the stream is left at end of input)
   * @throws UnsupportedOperationException if {@code newPos} refers to bytes that were already
   *     discarded from the read-ahead buffer
   */
  @Override
  public void seek(long newPos) {
    if (newPos < 0) {
      throw new IllegalArgumentException("Negative seek position: " + newPos);
    }
    if (newPos < bufferStart) {
      throw new UnsupportedOperationException(
          "Cannot seek back to " + newPos + ": bytes before offset " + bufferStart
              + " were already discarded");
    }
    final long offsetInBuffer = newPos - bufferStart;
    if (offsetInBuffer <= buffer.limit()) {
      buffer.position((int) offsetInBuffer);
      return;
    }
    // Target lies beyond the buffered data: consume and drop everything in between.
    long toSkip = newPos - (bufferStart + buffer.limit());
    buffer.position(buffer.limit());
    while (toSkip > 0) {
      fill((int) Math.min(toSkip, buffer.capacity()));
      if (!buffer.hasRemaining()) {
        throw new IllegalArgumentException(
            "Cannot seek to " + newPos + ": end of stream reached");
      }
      final int step = (int) Math.min(toSkip, buffer.remaining());
      buffer.position(buffer.position() + step);
      toSkip -= step;
    }
  }

  /**
   * Returns the absolute number of bytes consumed so far.
   *
   * @throws ArithmeticException if the position no longer fits in an {@code int}
   */
  @Override
  public int pos() {
    return Math.toIntExact(bufferStart + buffer.position());
  }

  /** The total size of a stream is unknown up front; {@link Integer#MAX_VALUE} is reported. */
  @Override
  public long size() {
    return Integer.MAX_VALUE;
  }

  // Fixed-size reads: buffer the bytes needed, then let the parent decode them from `buffer`.

  @Override
  public byte readS1() {
    ensureBytes(1);
    return super.readS1();
  }

  @Override
  public short readS2be() {
    ensureBytes(2);
    return super.readS2be();
  }

  @Override
  public int readS4be() {
    ensureBytes(4);
    return super.readS4be();
  }

  @Override
  public long readS8be() {
    ensureBytes(8);
    return super.readS8be();
  }

  @Override
  public short readS2le() {
    ensureBytes(2);
    return super.readS2le();
  }

  @Override
  public int readS4le() {
    ensureBytes(4);
    return super.readS4le();
  }

  @Override
  public long readS8le() {
    ensureBytes(8);
    return super.readS8le();
  }

  @Override
  public int readU1() {
    ensureBytes(1);
    return super.readU1();
  }

  @Override
  public int readU2be() {
    ensureBytes(2);
    return super.readU2be();
  }

  @Override
  public long readU4be() {
    ensureBytes(4);
    return super.readU4be();
  }

  @Override
  public int readU2le() {
    ensureBytes(2);
    return super.readU2le();
  }

  @Override
  public long readU4le() {
    ensureBytes(4);
    return super.readU4le();
  }

  @Override
  public float readF4be() {
    ensureBytes(4);
    return super.readF4be();
  }

  @Override
  public double readF8be() {
    ensureBytes(8);
    return super.readF8be();
  }

  @Override
  public float readF4le() {
    ensureBytes(4);
    return super.readF4le();
  }

  @Override
  public double readF8le() {
    ensureBytes(8);
    return super.readF8le();
  }

  /**
   * Reads exactly {@code n} bytes. Requests larger than the read-ahead buffer are assembled from
   * buffer-sized pieces instead of overflowing the buffer.
   */
  @Override
  public byte[] readBytes(long n) {
    final int length = toByteArrayLength(n);
    final int capacity = buffer.capacity();
    if (length <= capacity) {
      ensureBytes(length);
      return super.readBytes(length);
    }
    final byte[] result = new byte[length];
    int offset = 0;
    while (offset < length) {
      final int chunk = Math.min(length - offset, capacity);
      ensureBytes(chunk);
      System.arraycopy(super.readBytes(chunk), 0, result, offset, chunk);
      offset += chunk;
    }
    return result;
  }

  /** Reads every remaining byte until the end of input. */
  @Override
  public byte[] readBytesFull() {
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    while (fill(1)) {
      final byte[] part = super.readBytes(buffer.remaining());
      out.write(part, 0, part.length);
    }
    return out.toByteArray();
  }

  /**
   * Reads bytes up to the terminator {@code term}.
   *
   * @param term terminator byte
   * @param includeTerm whether the terminator is included in the result
   * @param consumeTerm whether the terminator is consumed (otherwise the position is left on it)
   * @param eosError whether reaching end of input without a terminator is an error; if not, the
   *     bytes read so far are returned
   * @throws RuntimeException if {@code eosError} is set and no terminator is found
   */
  @Override
  public byte[] readBytesTerm(
      byte term, boolean includeTerm, boolean consumeTerm, boolean eosError) {
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    while (true) {
      if (!fill(1)) {
        if (eosError) {
          throw new RuntimeException(
              "End of stream reached, but no terminator " + term + " found");
        }
        return out.toByteArray();
      }
      final byte b = buffer.get();
      if (b == term) {
        if (includeTerm) {
          out.write(b);
        }
        if (!consumeTerm) {
          // Safe: the terminator was just read from the buffer, with no compaction in between.
          buffer.position(buffer.position() - 1);
        }
        return out.toByteArray();
      }
      out.write(b);
    }
  }
}

With the above, we can build a streaming PCAP parser, as follows:

/**
 * A {@link Pcap} whose packets are parsed lazily, one at a time, instead of all up front.
 *
 * <p>The constructor parses only the global header. {@link #_read()} is overridden as a no-op so
 * that the parent constructor (which presumably invokes {@code _read()} — confirm against the
 * generated {@code Pcap}) does not eagerly read every packet. Packets are then parsed on demand
 * through {@link #iterator()} or {@link #packetStream()}.
 *
 * <p>All iterators and streams share the same underlying {@link KaitaiStream}. Each packet is
 * consumed from it exactly once, so use only one iterator/stream per instance.
 */
public class StreamingPCap extends Pcap implements Iterable<Packet> {
  private final Pcap.Header hdr;

  /**
   * Parses the global PCAP header from {@code io}; the packets that follow are left unread.
   *
   * @param io stream positioned at the start of the PCAP data
   */
  public StreamingPCap(KaitaiStream io) {
    super(io);
    hdr = new Pcap.Header(this._io, this, this);
  }

  /**
   * Returns the header parsed by this class's constructor. Overridden because, with the no-op
   * {@link #_read()}, the parent presumably never reads its own header; packets given this
   * instance as root can reach the header through here.
   */
  @Override
  public Header hdr() {
    return hdr;
  }

  /** Intentionally empty: packets are read lazily by the iterator instead. */
  @Override
  protected void _read() {
    // do nothing
  }

  /** Returns a sequential stream of the remaining packets, parsed on demand. */
  public Stream<Packet> packetStream() {
    return StreamSupport.stream(this.spliterator(), false);
  }

  /** Returns an iterator that parses one packet per {@code next()} call from the shared stream. */
  @Override
  public Iterator<Packet> iterator() {
    return new PacketIterator(_io, this);
  }

  /** Parses packets from {@code io} until it reports end of input. */
  private static final class PacketIterator implements Iterator<Packet> {
    private final KaitaiStream io;
    private final Pcap parent;

    private PacketIterator(KaitaiStream io, Pcap parent) {
      this.io = io;
      this.parent = parent;
    }

    @Override
    public boolean hasNext() {
      return !io.isEof();
    }

    /**
     * Parses the next packet.
     *
     * @throws java.util.NoSuchElementException if the stream has no more data, as the
     *     {@link Iterator} contract requires
     */
    @Override
    public Packet next() {
      if (!hasNext()) {
        throw new java.util.NoSuchElementException("No more packets: end of stream reached");
      }
      return new Packet(this.io, parent, parent);
    }
  }
}