atomashpolskiy / bt

BitTorrent library and client with DHT, magnet links, encryption and more
https://atomashpolskiy.github.io/bt/
Apache License 2.0

[BUG] The BtRuntime.shutdown() does not kill all the threads created by the ExecutorService. #249

Open visharm4 opened 1 month ago

visharm4 commented 1 month ago

**Describe the bug**

I am trying to implement a peer discovery application. My goal is to pass a batch of, say, 1000 torrents to the system and:

  1. Create a `BtRuntime`.
  2. Create 1000 clients, one per torrent, and attach them to the runtime.
  3. After a fixed interval (say, 10 minutes), call `client.stop()` on all 1000 clients.

**Expected behaviour**

After all clients are stopped (the runtime's `knownClients` count drops to 0), the automatic shutdown configured in `BtRuntime` should call `shutdown()`. I assume this stops all activity, such as HTTP tracker announces and the DHT server, and frees all resources, including the threads created by the `ExecutorService` (provided through the `ExecutorServiceProvider` class).

If I then create a new `BtRuntime` and pass another 1000 torrents to it, it should behave exactly as it did for the first batch.
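For comparison, with a plain `java.util.concurrent` pool (not bt's internal executors) the expectation above does hold, as long as every pool created for a batch is shut down and awaited before the next batch starts. A minimal sketch, where `BatchLifecycle` and `runBatch` are hypothetical stand-ins for one runtime's worth of work:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BatchLifecycle {

    // Hypothetical stand-in for one batch: create a pool, run work on it, then release its threads.
    // Returns true if the pool fully terminated before the method returned.
    static boolean runBatch(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> { /* simulated work */ });
            }
        } finally {
            pool.shutdown(); // stop accepting tasks; already queued tasks still run
            try {
                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                    pool.shutdownNow(); // interrupt tasks that are still running
                    pool.awaitTermination(10, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
        }
        return pool.isTerminated();
    }

    public static void main(String[] args) {
        // Each batch gets a fresh pool, and each pool is terminated before the next one is created.
        for (int batch = 1; batch <= 5; batch++) {
            System.out.println("batch " + batch + " terminated: " + runBatch(100));
        }
    }
}
```

The same create, use, `shutdown()`, `awaitTermination()` sequence is what one would expect each per-batch runtime to go through for every pool it owns.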

What I observe instead is that the total number of LIVE threads increases with every batch, until the server eventually runs out of memory (`OutOfMemoryError`).

I would like to understand what is causing this.

[Screenshot: Screen Shot 2024-10-07 at 4.40.37 PM]
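To narrow down which threads are piling up, one option is to snapshot all live threads after each batch and group them by name prefix; pools created with a naming `ThreadFactory`, such as the one in the modified `ExecutorServiceProvider`, then show up as separate groups. A minimal JDK-only sketch; `ThreadCensus` is a hypothetical name, and the prefix rule (strip a trailing `-<digits>`) is an assumption about how the threads are named:

```java
import java.util.Map;
import java.util.TreeMap;

public class ThreadCensus {

    // Count live threads grouped by name with any trailing "-<digits>" removed,
    // e.g. "worker-1" and "worker-2" both count as "worker", while JDK default
    // names like "pool-3-thread-7" become "pool-3-thread" (one group per pool).
    static Map<String, Integer> liveThreadsByPrefix() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (!t.isAlive()) {
                continue;
            }
            String prefix = t.getName().replaceAll("-\\d+$", "");
            counts.merge(prefix, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Call this after each batch and compare: groups that only ever grow are the leak.
        liveThreadsByPrefix().forEach((prefix, n) -> System.out.println(prefix + " = " + n));
    }
}
```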

**To Reproduce**

Here is my code:

```java
private void startTrackingByTorrentFiles(String torrentsDirectoryPath) {
    log.info("Starting Tracker App with Configurations: {}", appPropertiesConfig.getTrackerConfig());

    while (true) {  // Infinite loop to repeat the pipeline
        try {

            // Fetch the next set of torrents to process
            List<String> torrents = torrentAllocationRepository.lockAndFetchTorrentsJob(
                        Arrays.asList("tv","movie"),
                        appPropertiesConfig.getTrackerConfig().getTorrentsJobSize()
            );
            log.info("Fetched {} torrents from the Torrents Allocation Table", torrents.size());

            int torrentsBatchSize = appPropertiesConfig.getTrackerConfig().getTorrentsBatchSize();
            List<List<String>> partitions = partitionList(torrents, torrentsBatchSize);
            log.info("Created {} partitions of {} torrents each.", partitions.size(), torrentsBatchSize);

            // Process each partition
            AtomicInteger partitionNumber = new AtomicInteger();
            for (List<String> torrentsInPartition : partitions) {

                partitionNumber.incrementAndGet();
                log.info("Processing partition no. {} with {} torrents", partitionNumber.get(), torrentsInPartition.size());

                // Reinitialize BtRuntime at the start of each batch
                BtRuntime runtime = createRuntime(appPropertiesConfig.getBtRuntimeConfig());

                List<Path> torrentFiles = getTorrentFiles(torrentsDirectoryPath, torrentsInPartition);

                int torrentClientsShutdownTimeout = appPropertiesConfig.getTrackerConfig().getTorrentClientShutdownTimeout(); // Stop Torrent Client after configured minutes
                CountDownLatch latch = new CountDownLatch(torrentFiles.size());

                // RX pipeline for torrent processing
                Flowable.fromIterable(torrentFiles)
                        .buffer(torrentsBatchSize)  // Process in batches of specified size
                        .flatMap(batch -> {
                            log.info("Processing sub-batch of {} torrents, in partition : {}", batch.size(), partitionNumber.get());
                            return processTorrentBatch(batch, runtime, torrentClientsShutdownTimeout, latch)
                                    .doOnNext(clients -> log.info("Created {} clients for sub-batch, in partition : {}", clients.size(), partitionNumber.get()))
                                    .doOnNext(clients -> log.info("Clients remaining to stop : {}, in partition : {}", latch.getCount(), partitionNumber.get()))
                                    .doOnError(error -> log.error("Error occurred while creating clients for batch, in partition : {}", partitionNumber.get(), error));

                        })
                        .subscribe(clients -> {

                            log.info("Starting clients : {} for batch, in partition: {}", clients.size(), partitionNumber.get());

                            // Start all clients asynchronously in the current sub-batch
                            CompletableFuture.allOf(clients.stream()
                                    .map(BtClient::startAsync).toArray(CompletableFuture[]::new)).join();
                        }, error -> {
                            log.error("Failed to start peers discovery, Cause: ", error);
                        });

                // After all sub-batches are processed, wait for the latch to count down to zero
                log.info("Waiting for all clients to stop, in partition : {}", partitionNumber.get());
                latch.await();  // This will block until all clients in this full batch have stopped
                log.info("All clients have stopped, in partition : {}", partitionNumber.get());

                log.info("Unlocking {} torrents after processing, in partition : {}", torrentsInPartition.size(), partitionNumber.get());
                torrentAllocationRepository.unlockTorrentsJob(torrentsInPartition); // Unlock the torrents after processing

            } // End of partitions loop

            int waitTimeBetween = 5000;
            log.info("All {} torrents in current iteration finished. Waiting for {} {} before the next iteration...", torrents.size(), waitTimeBetween, TimeUnit.MILLISECONDS);
            Thread.sleep(waitTimeBetween);  // Wait 5 seconds before the next loop iteration

        } catch (Exception e) {
            log.error("Error processing torrent files", e);
        }
    }
}
```
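Two details in the loop above are worth noting: `latch.await()` has no upper bound, so a single missing `countDown()` stalls the whole pipeline, and `catch (Exception e)` also swallows the `InterruptedException` that `latch.await()` and `Thread.sleep()` can throw, without restoring the interrupt flag. A minimal sketch of a bounded wait that preserves the flag; `BatchWait`, `awaitBatch` and the timeout values are hypothetical:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BatchWait {

    // Wait at most `timeout` for the batch latch; returns true only if it reached zero in time.
    static boolean awaitBatch(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown();
        // One count is still outstanding, so this times out.
        System.out.println(awaitBatch(latch, 100, TimeUnit.MILLISECONDS)); // prints false
        latch.countDown();
        System.out.println(awaitBatch(latch, 100, TimeUnit.MILLISECONDS)); // prints true
    }
}
```

On a `false` result the caller can log the partition as incomplete instead of blocking forever.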

```java
private Flowable<List<BtClient>> processTorrentBatch(List<Path> batch, BtRuntime runtime, int torrentClientsShutdownTimeout, CountDownLatch latch) {
    return Flowable.fromIterable(batch)
            .flatMap(torrentFilePath -> {

                // Extract TorrentId from the file name
                TorrentId torrentId = extractTorrentIdFromFileName(torrentFilePath);

                // Attach Peer Listener for the torrent
                attachPeerListener(runtime, torrentId);

                // Create the client asynchronously
                log.info("Creating client for torrent: {}", torrentId);
                CompletableFuture<BtClient> asyncClient = createClientFromTorrentFileAsync(runtime, torrentFilePath);

                // Once client is created, schedule stop task and store the future in the map
                asyncClient.thenAccept(client -> {
                    log.info("Scheduling client to stop after {} {} for torrent: {}", torrentClientsShutdownTimeout, TimeUnit.MINUTES, torrentId);
                    scheduleClientStopTask(torrentId, client, torrentClientsShutdownTimeout, latch);
                });

                return Flowable.fromFuture(asyncClient);  // Creates BT Client for each Torrent File asynchronously
            })
            .toList()
            .toFlowable();
}
```
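In `processTorrentBatch`, the stop task (and therefore `latch.countDown()`) is only scheduled from `thenAccept`, which never runs if `createClientFromTorrentFileAsync` completes exceptionally; in that case the latch cannot reach zero. A minimal JDK-only sketch of the same pattern using `whenComplete`, where `String` stands in for `BtClient` and `StopScheduling`, `scheduleStop` and `onClientCreated` are hypothetical stand-ins, not bt's API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StopScheduling {

    // Hypothetical stand-in for scheduleClientStopTask: "stops" the client by counting down after the delay.
    static void scheduleStop(ScheduledExecutorService scheduler, String client,
                             long delayMillis, CountDownLatch latch) {
        scheduler.schedule(latch::countDown, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Decrement the latch once either way: via the scheduled stop if the client was created,
    // or immediately if creation failed (there is no client to stop).
    static void onClientCreated(CompletableFuture<String> asyncClient, ScheduledExecutorService scheduler,
                                long delayMillis, CountDownLatch latch) {
        asyncClient.whenComplete((client, error) -> {
            if (error != null) {
                latch.countDown();
            } else {
                scheduleStop(scheduler, client, delayMillis, latch);
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(2);
        onClientCreated(CompletableFuture.completedFuture("client-1"), scheduler, 50, latch);
        onClientCreated(CompletableFuture.failedFuture(new RuntimeException("bad torrent")), scheduler, 50, latch);
        System.out.println("all done: " + latch.await(5, TimeUnit.SECONDS));
        scheduler.shutdown();
    }
}
```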

```java
private void scheduleClientStopTask(TorrentId torrentId, BtClient client, long delay, CountDownLatch latch) {

    clientLifeCycleScheduler.schedule(() -> {
        try {

            if (client.isStarted()) {
                log.info("Stopping client after {} {} for torrent : {} , client : {}, current latch : {}",
                        delay, TimeUnit.MINUTES, torrentId.toString(), client, latch.getCount());
                client.stop();
            } else {
                log.info("Client already stopped for torrent: {} , client: {}, current latch : {}",
                        torrentId.toString(), client, latch.getCount());
            }

        } catch (Exception e) {
            log.error("Error stopping client for torrent: {} , client: {}, current latch : {}", torrentId.toString(), client, latch.getCount(), e);
            log.warn("Performing latch countdown for torrent: {} , client: {}", torrentId.toString(), client);
        } finally {
            // Always count down the latch in the finally block to ensure it is decremented no matter what
            latch.countDown();
        }

        log.info("latch after client stop : {} ", latch.getCount());

    }, delay, TimeUnit.MINUTES);
}
```

**Additional context**

I modified the `get()` method in `ExecutorServiceProvider` to use a fixed-size `ThreadPoolExecutor` (core and maximum pool size both set to the configured limit), hoping it would reuse threads, but some other threads still keep piling up.

```java
@Override
public ExecutorService get() {

    if (executorService == null) {
        synchronized (lock) {
            if (executorService == null) {

                BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
                // Use a fixed thread pool with a maximum number of threads
                executorService = new ThreadPoolExecutor(
                        config.getServiceExecutorMaxThread(),
                        config.getServiceExecutorMaxThread(),
                        60L,
                        TimeUnit.SECONDS,
                        workQueue,
                        new ThreadFactory() {
                            private AtomicInteger threadId = new AtomicInteger(1);

                            @Override
                            public Thread newThread(Runnable r) {
                                return new Thread(r, Objects.requireNonNull(getNamePrefix()) + "-" + threadId.getAndIncrement());
                            }
                        }
                );

                // Optional: Allow core threads to time out if idle
                ((ThreadPoolExecutor) executorService).allowCoreThreadTimeOut(true);
            }
        }
    }
    return executorService;
}
```
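Two things may be worth checking in the modified provider. First, double-checked locking is only safe if the `executorService` field is declared `volatile` (its declaration is not shown above). Second, a fixed-size pool bounds threads only within that one pool: if each new `BtRuntime` ends up with a fresh provider or pool and nothing shuts it down, the per-batch pools accumulate. A minimal JDK-only sketch of a lazily created pool that can also be released; `LazyPoolProvider`, `poolSize` and `namePrefix` are hypothetical names, not bt's API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyPoolProvider implements Supplier<ExecutorService> {

    private final int poolSize;
    private final String namePrefix;
    private final Object lock = new Object();
    // volatile is required for double-checked locking to publish the pool safely
    private volatile ExecutorService executorService;

    public LazyPoolProvider(int poolSize, String namePrefix) {
        this.poolSize = poolSize;
        this.namePrefix = namePrefix;
    }

    @Override
    public ExecutorService get() {
        ExecutorService local = executorService;
        if (local == null) {
            synchronized (lock) {
                local = executorService;
                if (local == null) {
                    AtomicInteger threadId = new AtomicInteger(1);
                    ThreadFactory factory = r -> new Thread(r, namePrefix + "-" + threadId.getAndIncrement());
                    ThreadPoolExecutor pool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS,
                            new LinkedBlockingQueue<>(), factory);
                    pool.allowCoreThreadTimeOut(true); // idle threads exit after 60 s
                    executorService = local = pool;
                }
            }
        }
        return local;
    }

    // Hypothetical hook: call it wherever the owning runtime is shut down.
    // Returns true if the pool terminated; a later get() would create a new pool.
    public boolean shutdown() {
        ExecutorService local;
        synchronized (lock) {
            local = executorService;
            executorService = null;
        }
        if (local == null) {
            return true;
        }
        local.shutdown();
        try {
            if (!local.awaitTermination(10, TimeUnit.SECONDS)) {
                local.shutdownNow();
            }
        } catch (InterruptedException e) {
            local.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return local.isTerminated();
    }
}
```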