OpenHFT / Java-Thread-Affinity

Bind a java thread to a given core
http://chronicle.software/products/thread-affinity/
Apache License 2.0
1.78k stars 361 forks source link

Threads to isolated CPUs strategy #73

Closed thhart closed 3 years ago

thhart commented 3 years ago

I try to use the library for Netty as seen in following code. As I see from the log the library assign to isolated CPUs first but if there are more threads these are assigned to unisolated CPUs as well. How can I force the Thread factory to use only isolated CPUs for these threads?

final int workerThreads = 4;
ThreadFactory threadFactory = new AffinityThreadFactory("atf_wrk", AffinityStrategies.DIFFERENT_CORE);
EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreads, threadFactory);
thhart commented 3 years ago

I can see there is a section in the README: Question: How to lock a specific cpuId, however even if using the recommend method acquireLock(int) it is denied when there is already a thread registered.

thhart commented 3 years ago

What I understand from the library it is hard to achieve, however I have created a quick approach for a thread factory which is solving my needs:

import java.util.concurrent.*;
import net.openhft.affinity.*;
import org.apache.logging.log4j.*;
import org.jetbrains.annotations.*;

/**
 * This is a ThreadFactory which assigns all threads to the first isolated CPU found.
 * <p>
 *
 * @author thomas.hartwig
 */
public class AffinityThreadFactorySameCore implements ThreadFactory {
    private final String name;
    private final boolean daemon;
    @NotNull
    private final AffinityStrategy[] strategies;
    @Nullable
    private AffinityLock lastAffinityLock = null;
    private int id = 1;
    protected final static Logger logger = LogManager.getLogger(AffinityThreadFactorySameCore.class);

    public AffinityThreadFactorySameCore(String name, AffinityStrategy... strategies) {
        this(name, true, strategies);
    }

    public AffinityThreadFactorySameCore(String name, boolean daemon, @NotNull AffinityStrategy... strategies) {
        this.name = name;
        this.daemon = daemon;
        this.strategies = strategies.length == 0 ? new AffinityStrategy[]{AffinityStrategies.ANY} : strategies;
    }

    private volatile ThreadRunner threadRunner = null;

    @NotNull
    @Override
    public synchronized Thread newThread(@NotNull final Runnable r) {
        String name2 = (name + '-' + id);
        id++;
        Thread t = new Thread(() -> {
            try {
                synchronized (AffinityThreadFactorySameCore.this) {
                    if(threadRunner == null) {
                        threadRunner = new ThreadRunner();
                    }
                }
                threadRunner.initialized.await();
                final Future<?> future = threadRunner.submit(r);
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                logger.error(e, e);
            }
        }, name2);
        t.setDaemon(daemon);
        return t;
    }

    private synchronized AffinityLock acquireLockBasedOnLast() {
        AffinityLock al = lastAffinityLock == null ? AffinityLock.acquireLock() : lastAffinityLock.acquireLock(strategies);
        if (al.cpuId() >= 0)
            lastAffinityLock = al;
        return al;
    }

    private class ThreadRunner extends Thread {
        private ExecutorService service = null;
        private final CountDownLatch initialized = new CountDownLatch(1);
        int counter = 1;
        int cpu = -1;
        public ThreadRunner() {
            Thread t = new Thread(() -> {
                try (AffinityLock ignored = acquireLockBasedOnLast()) {
                    cpu = ignored.cpuId();
                    service = Executors.newCachedThreadPool();
                    initialized.countDown();
                    try {
                        //noinspection ResultOfMethodCallIgnored
                        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
                    } catch (InterruptedException e) {
                        logger.error(e, e);
                    }
                }
            });
            t.start();
        }

        public Future<?> submit(Runnable r) {
            try {
                return service.submit(r);
            } finally {
                logger.info("started threads: " + counter++ + ", on cpu: " + cpu);
            }
        }
    }
}
RobAustin commented 3 years ago

Thank you for your question, however we reserve git issues for just issues, please repost your question on stack overflow.

thhart commented 3 years ago

I can understand to be forced to StackOverflow in general.However this question is very code specific and addresses a lack either in functionality or documentation. Further I doubt this library is extremely special for the audience in StackOverflow.