atomashpolskiy / bt

BitTorrent library and client with DHT, magnet links, encryption and more
https://atomashpolskiy.github.io/bt/
Apache License 2.0

Accessing the underlying StorageUnit for a Storage to get the downloaded bytes. #116

Closed devsh0 closed 5 years ago

devsh0 commented 5 years ago

Initially my question was how to get an instance of the FileSystemStorageUnit class, but I just noticed that it is package-private to bt.data.file. Anyway, FileSystemStorageUnit is a StorageUnit, and that interface is available to use anywhere.

I want to access the underlying StorageUnit of a Storage. The FileSystemStorage#getUnit method requires two params, a Torrent and a TorrentFile. As I'm only starting out with this library (which I'm absolutely loving, btw), I have no idea where to get (or create) the params required for getUnit.

Also, I should mention why I'm trying to do this. The goal is to stream media files while the download is still in progress. I'm assuming that I can get the downloaded bytes from StorageUnit::readBlock.

atomashpolskiy commented 5 years ago

Take a look at the following interfaces:

runtime.getEventSource().onTorrentStarted(event -> {
    TorrentId id = event.getTorrentId(); // the listener receives an event, not the id itself
    TorrentRegistry registry = runtime.service(TorrentRegistry.class);

    Torrent torrent = registry.getTorrent(id).get();
    TorrentDescriptor descriptor = registry.getDescriptor(id).get();
    DataDescriptor dataDescriptor = descriptor.getDataDescriptor();
    Bitfield bitfield = dataDescriptor.getBitfield();
});

You may monitor Bitfield.getBitmask() for changes. When a new piece is downloaded and verified, the corresponding bit in the bitmask is set to 1. For streaming purposes you'll want to process pieces sequentially, so monitoring should begin with the bit at position 0 and advance one position at a time. If the bit at the current position is 0, the reader should block; otherwise, it may read data from the descriptor:

ChunkDescriptor chunkDescriptor = dataDescriptor.getChunkDescriptors().get(pieceIndex);
DataRange data = chunkDescriptor.getData();
data.visitUnits((unit, offset, limit) -> {
    byte[] bytes = unit.readBlock(offset, (int)(limit - offset));
    // process bytes..
    return true; // visit next storage unit, if the piece is on the boundary of two files
});

Don't forget to enable the sequential selector when building the client. It does not guarantee that the pieces will be downloaded strictly sequentially, but it will make its best effort to do so.

atomashpolskiy commented 5 years ago

Oops, sorry. Bitfield.getBitmask() returns a copy of the bitfield, so the changes will not be reflected in it. You need to use Bitfield.isVerified(int pieceIndex) instead.
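A minimal sketch of the sequential wait described above, kept independent of bt so it can run on its own. SequentialPieceCursor and drain are hypothetical names, not part of the library. The predicate is assumed to be backed by Bitfield.isVerified(int), and the piece count is assumed to come from something like dataDescriptor.getChunkDescriptors().size(); here a plain boolean array stands in for the bitfield.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;

/** Hypothetical helper (not part of bt): hands out piece indices strictly in order. */
public class SequentialPieceCursor {
    private final int pieceCount;
    private final IntPredicate isVerified; // assumption: e.g. bitfield::isVerified
    private int cursor = 0;

    public SequentialPieceCursor(int pieceCount, IntPredicate isVerified) {
        this.pieceCount = pieceCount;
        this.isVerified = isVerified;
    }

    /**
     * Passes every verified piece at the cursor to the consumer, in order,
     * and stops at the first piece that is not verified yet.
     * Returns true once all pieces have been handed out.
     */
    public synchronized boolean drain(IntConsumer onPiece) {
        while (cursor < pieceCount && isVerified.test(cursor)) {
            onPiece.accept(cursor);
            cursor++; // advance once per piece
        }
        return cursor == pieceCount;
    }

    public static void main(String[] args) {
        boolean[] verified = new boolean[3]; // stand-in for the real bitfield
        List<Integer> delivered = new ArrayList<>();
        SequentialPieceCursor cursor =
                new SequentialPieceCursor(verified.length, i -> verified[i]);

        verified[1] = true;                     // piece 1 arrives out of order
        cursor.drain(i -> delivered.add(i));    // nothing delivered, piece 0 is missing
        verified[0] = true;
        cursor.drain(i -> delivered.add(i));    // delivers 0, then 1
        System.out.println(delivered);          // prints [0, 1]
    }
}
```

Calling drain from a periodic task or from an event listener gives the same ordering; the consumer is where the piece data would be read and forwarded.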

devsh0 commented 5 years ago

Thanks, I'll give it a try. Closing this issue for now, will re-open if something goes wrong.

atomashpolskiy commented 5 years ago

I thought that this might actually be a nice feature to have in the core, so I implemented it in 49d91bba. Looking forward to your feedback!

devsh0 commented 5 years ago

Sorry for the late response.

Based on your advice to observe the bitmask and write chunks of bytes once the pieces are verified, here is what I've tried.

I imported NanoHTTPD and started a Server at http://localhost/ listening on port 80. Immediately after that, I started a torrent task that's wrapped in a TorrentHandler. The TorrentHandler has a ByteArrayOutputStream whose bytes are pumped to the ByteArrayInputStream of the Server. Before sending the first chunked response, the Server waits until the first piece is verified. I'm observing the Bitfield with a ScheduledExecutorService that runs at a fixed rate of once per second. The overall solution is not that good, I'd say (it might have some concurrency-related issues), but it did the trick. Here is the complete code:

TorrentHandler.java

public class TorrentHandler {
    private int bitfieldCursor;
    private ScheduledExecutorService service;
    private ByteArrayOutputStream out;

    CompletableFuture <Void> download (String magnet) {
        Config config = new Config() {
            @Override
            public int getNumOfHashingThreads() {
                return Runtime.getRuntime().availableProcessors() * 2;
            }
        };

        Module dhtModule = new DHTModule(new DHTConfig() {
            @Override
            public boolean shouldUseRouterBootstrap() {
                return true;
            }
        });

        BtRuntime sharedRuntime = BtRuntime.builder()
                .config(config)
                .module(dhtModule)
                .autoLoadModules()
                .build();

        Storage storage = new FileSystemStorage(Paths.get("D://blah"));

        BtClient client = Bt.client(sharedRuntime)
                .sequentialSelector()
                .storage(storage)
                .magnet(magnet)
                .stopWhenDownloaded()
                .build();

        sharedRuntime.getEventSource().onTorrentStarted(evt -> {
            System.out.println("Torrent processing has started...");
            final TorrentId id = evt.getTorrentId();
            final TorrentRegistry registry = sharedRuntime.service(TorrentRegistry.class);
            final TorrentDescriptor torrentDescriptor = registry.getDescriptor(id).get();
            final DataDescriptor dataDescriptor = torrentDescriptor.getDataDescriptor();
            final Bitfield bitfield = dataDescriptor.getBitfield();

            synchronized (this) {
                bitfieldCursor = 0;
                out = new ByteArrayOutputStream();
                service = Executors.newScheduledThreadPool(1);

                service.scheduleAtFixedRate(() -> {
                    System.out.println("In here...");

                    if (bitfield.isVerified(bitfieldCursor)) {
                        System.out.println("Verified...");
                        ChunkDescriptor chunkDescriptor = dataDescriptor.getChunkDescriptors().get(bitfieldCursor);
                        DataRange data = chunkDescriptor.getData();
                        data.visitUnits((unit, offset, limit) -> {
                            byte[] bytes = unit.readBlock(offset, (int) (limit - offset));

                            try { out.write(bytes); } catch (IOException ioe) {
                                ioe.printStackTrace();
                            }

                            return true; // visit the next unit if the piece spans two files
                        });

                        // advance once per piece, not once per visited storage unit
                        bitfieldCursor++;
                        Application.latch.countDown();
                    }

                }, 0, 1, TimeUnit.SECONDS);
            }

        });

        sharedRuntime.getEventSource().onTorrentStopped(evt -> {
            System.out.println("Stopped...!");
            service.shutdown();
            try { out.close(); } catch (IOException ioe) {
                ioe.printStackTrace();
            }
        });

        return CompletableFuture.allOf(client.startAsync(state -> System.out.println("Done: " + state.getDownloaded()), 1000));
    }

    byte[] getBytes() {
        return out.toByteArray();
    }
}

Server.java

class Server extends NanoHTTPD {
    private final TorrentHandler handler;
    private InputStream is;

    Server(int port) throws IOException {
        super(port);
        start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
        System.out.println("Server started...");
        handler = new TorrentHandler();
    }

    public void initDownload() {
        String magnet = "magnet:?xt=urn:btih:a086ce4afabbd8ab949662e30134a49e22d1f0c7";
        CompletableFuture<Void> cpFuture = handler.download(magnet);
        cpFuture.join();
        cleanup();
    }

    @Override
    public Response serve(IHTTPSession session) {
        Map<String, String> headers = session.getHeaders();
        for (String header : headers.keySet()) {
            System.out.println(header + ": " + headers.get(header));
        }

        try {
            Application.latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        synchronized (handler) {
            is = new ByteArrayInputStream(handler.getBytes());
            return newChunkedResponse(Response.Status.OK, "application/octet-stream", is);
        }
    }

    private void cleanup() {
        try {
            is.close();
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }
}

After starting the app, I started vlc with vlc http://localhost:80/.

This solution has issues, but I wanted to test the streaming as soon as possible to find out whether it's possible at all. I will try to improve this as much as I can (I've never worked directly at a level this low in Java, honestly). Your advice would be highly appreciated.

atomashpolskiy commented 5 years ago

Yes, this is exactly what I was thinking about. Good job!

You may also try the DataDescriptor.getReader().createChannel() method, which I added yesterday. The nice thing about it is that it does not spawn a separate thread to monitor the bitfield, but instead makes use of the event system (which also delivers verified pieces ASAP, without any delay). For the user it looks just like a ReadableByteChannel; see the javadoc here: https://github.com/atomashpolskiy/bt/blob/master/bt-core/src/main/java/bt/data/DataReader.java
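Since the result is described as a plain ReadableByteChannel, it can probably be consumed with standard java.nio utilities. The sketch below is only an illustration under that assumption: ChannelStreaming and its two methods are hypothetical names, and the channel is simulated with an in-memory byte array rather than obtained from bt. With the real library, the channel would come from the createChannel() call mentioned above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of bt): consumes any ReadableByteChannel as a stream. */
public class ChannelStreaming {

    /** Wraps the channel in an InputStream, e.g. to hand it to an HTTP response. */
    public static InputStream asInputStream(ReadableByteChannel channel) {
        return Channels.newInputStream(channel);
    }

    /** Copies everything from in to out and returns the number of bytes copied. */
    public static long copy(InputStream in, OutputStream out) {
        byte[] buffer = new byte[8192];
        long total = 0;
        try {
            int n;
            while ((n = in.read(buffer)) != -1) { // -1 signals end of data
                out.write(buffer, 0, n);
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        // Stand-in for the channel that the torrent data reader would provide.
        byte[] payload = "verified piece data".getBytes(StandardCharsets.UTF_8);
        ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(payload));

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copy(asInputStream(channel), sink);
        System.out.println(copied + " bytes copied");
    }
}
```

In the NanoHTTPD setup from the previous comment, the stream returned by asInputStream could be passed to newChunkedResponse in place of the ByteArrayInputStream snapshot, which would avoid copying the whole buffer on every request.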

atomashpolskiy commented 5 years ago

One more thing: if the files are of reasonable size, you may keep them in memory by using JimFS to create an in-memory storage. I use it to speed up integration tests.

devsh0 commented 5 years ago

A question: is it possible to stream the torrent in the context of a standalone bt client? Or, in general, is it possible to attach event listeners to clients that are not explicitly attached to a runtime?

atomashpolskiy commented 5 years ago

Judging by the API, no, it's not possible. Though, there's no difference between a "standalone" client and a runtime with a single client, it's just an API shortcut.

devsh0 commented 5 years ago

Thanks! Closing the thread.