dominion-dev / dominion-ecs-java

Insanely fast ECS (Entity Component System) for Java
https://dominion-dev.github.io
MIT License
288 stars 17 forks source link

Thread-safe for IntStack #192

Closed endison1986 closed 3 months ago

endison1986 commented 4 months ago

Is IntStack designed to be thread-safe? @enricostara

endison1986 commented 4 months ago

I modified the unit test IntStackTest.concurrentPop() as follows:

/**
 * Pushes {@code capacity} values from a 4-thread pool while every tenth iteration also
 * schedules a pop that spins until it obtains a value, then checks that roughly 90% of
 * the pushed values remain and that only the pushed value {@code 1} was ever popped.
 */
@Test
void concurrentPop() throws InterruptedException {
    final int capacity = 1 << 22;
    // The pop tasks run on several pool threads and all add to this set, so it must be
    // thread-safe: a plain HashSet can be corrupted by concurrent add() calls.
    final var set = java.util.Collections.synchronizedSet(new HashSet<Integer>());
    try (IntStack stack = new IntStack(Integer.MIN_VALUE, capacity >>> 1)) {
        final ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < capacity; i++) {
                if (i % 10 == 0) {
                    pool.execute(() -> {
                        int value;
                        //noinspection StatementWithEmptyBody
                        while ((value = stack.pop()) == Integer.MIN_VALUE) ;
                        set.add(value);
                    });
                }
                pool.execute(() -> stack.push(1));
            }
            pool.shutdown();
            Assertions.assertTrue(pool.awaitTermination(5, TimeUnit.SECONDS));
        } finally {
            // On a failure or timeout, drop the queued tasks so they do not keep using the
            // stack after try-with-resources has released its memory. No-op after a clean
            // termination.
            pool.shutdownNow();
        }
        Assertions.assertEquals((int) (capacity * .9), stack.size());
        Assertions.assertEquals(1, set.size());
        Assertions.assertTrue(set.contains(1));
    }
}
endison1986 commented 4 months ago

This is the current IntStack code (my inline comments mark the suspected problems):

/**
 * Growable stack of {@code int} values stored in a memory block obtained from
 * {@code unsafe.allocateMemory} and addressed by byte offset.
 *
 * <p>{@code index} holds the byte offset of the top element and starts at
 * {@code -INT_BYTES}, which represents the empty stack. {@code capacity} is a size in
 * bytes: it is passed directly to {@code allocateMemory} and compared against byte offsets.
 *
 * <p>NOTE(review): the inline comments in {@link #pop()} and {@link #push(int)} describe
 * interleavings under which this implementation is not safe for concurrent use.
 */
public final class IntStack implements AutoCloseable {
    private static final int INT_BYTES = 4;
    private static final Unsafe unsafe = UnsafeFactory.INSTANCE;
    // Byte offset of the current top element; negative means the stack is empty.
    private final AtomicInteger index = new AtomicInteger(-INT_BYTES);
    // Taken as a write lock only while push() replaces the memory block.
    private final StampedLock lock = new StampedLock();
    // Value returned by pop() when the stack is empty.
    private final int nullInt;
    // Base address of the current memory block.
    private long address;
    // Size of the current memory block, in bytes.
    private int capacity;

    /**
     * Creates an empty stack.
     *
     * @param nullInt         value {@link #pop()} returns when the stack is empty
     * @param initialCapacity size in bytes of the initial memory block
     */
    public IntStack(int nullInt, int initialCapacity) {
        this.nullInt = nullInt;
        this.capacity = initialCapacity;
        address = unsafe.allocateMemory(initialCapacity);
    }

    /**
     * Removes and returns the top value.
     *
     * @return the popped value, {@code nullInt} when the stack is empty, or
     *         {@code Integer.MIN_VALUE} when the compare-and-set on {@code index} fails
     */
    public int pop() {
        int i = index.get();
        if (i < 0) {
            return nullInt;
        }
        // The following two lines are problematic:
        // 1. returnValue may be an arbitrary value: push() may already have executed
        //    `index.addAndGet` for this slot without having executed
        //    `unsafe.putInt(address + offset, id)` yet.
        // 2. `Integer.MIN_VALUE` looks like a coding error; `nullInt` should probably be
        //    returned instead.
        int returnValue = unsafe.getInt(address + i);
        returnValue = index.compareAndSet(i, i - INT_BYTES) ? returnValue : Integer.MIN_VALUE;
        return returnValue;
    }
    // This method has a potential problem when push() is called from several threads:
    // `unsafe.putInt(address + offset, id)` runs outside the lock, so it may execute after
    // another thread's `copyMemory` but before that thread's `address = newAddress`.
    // Example interleaving:
    // t1: long offset = index.addAndGet(INT_BYTES); // offset = 12, capacity = 16
    // t2: long offset = index.addAndGet(INT_BYTES); // offset = 16, capacity = 16
    // t2: unsafe.copyMemory(address, newAddress, currentCapacity);
    // t1: unsafe.putInt(address + offset, id); // written to the old block: the value may be lost
    // t2: address = newAddress;
    /**
     * Pushes {@code id} on top of the stack, replacing the memory block with a larger one
     * when the claimed offset does not fit.
     *
     * @param id value to push
     * @return always {@code true}
     */
    public boolean push(int id) {
        // Claim the next slot first; the value is written at the end of the method.
        long offset = index.addAndGet(INT_BYTES);
        if (offset >= capacity) {
            long l = lock.writeLock();
            try {
                int currentCapacity;
                // Re-check under the lock: another thread may already have grown the block.
                if (offset >= (currentCapacity = capacity)) {
                    // Grow by half of the current capacity.
                    int newCapacity = currentCapacity + (currentCapacity >>> 1);
                    long newAddress = unsafe.allocateMemory(newCapacity);
                    unsafe.copyMemory(address, newAddress, currentCapacity);
                    capacity = newCapacity;
                    // NOTE(review): the previous block is not freed anywhere in this method.
                    address = newAddress;
                }
            } finally {
                lock.unlock(l);
            }
        }
        unsafe.putInt(address + offset, id);
        return true;
    }

    /**
     * @return the number of elements, derived from the top byte offset ({@code index / 4 + 1})
     */
    public int size() {
        return (index.get() >> 2) + 1;
    }

    /** Frees the current memory block. */
    @Override
    public void close() {
        unsafe.freeMemory(address);
    }

    @Override
    public String toString() {
        return "IntStack={"
                + "capacity=" + capacity + "|off-heap"
                + '}';
    }
}
endison1986 commented 4 months ago

This is my new code; it appears to work well with the new unit test:

/**
 * Growable stack of {@code int} values stored in a memory block obtained from
 * {@code unsafe.allocateMemory}, written to be shared between threads.
 *
 * <p>Layout and protocol:
 * <ul>
 *   <li>{@code index} is the byte offset of the top slot; {@code -INT_BYTES} means empty.</li>
 *   <li>Every slot that holds no value contains {@link #NULL_VALUE}; fresh memory is filled
 *       with the byte {@code 0x80} so that each unwritten slot reads as that sentinel.</li>
 *   <li>{@code ctrl} is a small spin lock: {@code -1} while the memory block is being
 *       replaced (exclusive), {@code n > 0} while {@code n} threads are reading or writing
 *       slots (shared). Nobody touches {@code address} outside one of those two modes, so
 *       a block is never freed while another thread still uses it.</li>
 * </ul>
 *
 * <p>{@link #pop()} may return {@link #NULL_VALUE} although the stack is not empty when it
 * races with a concurrent {@code push} or {@code pop}; callers are expected to retry.
 *
 * <p>NOTE(review): the {@code nullInt} constructor argument is accepted but not used;
 * emptiness is reported with {@link #NULL_VALUE}. Confirm which sentinel callers expect.
 */
public final class IntStack implements AutoCloseable {
    /** Sentinel stored in empty slots and returned by {@link #pop()} when no value is available. */
    public static final int NULL_VALUE = 0b10000000100000001000000010000000;
    /** Fill byte for fresh memory; four of them form {@link #NULL_VALUE}. */
    private static final byte NULL_BYTE = (byte) (1 << 7);
    private static final int INT_BYTES = 4;
    private static final Unsafe unsafe = UnsafeFactory.INSTANCE;
    private static final AtomicIntegerFieldUpdater<IntStack> INDEX_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(IntStack.class, "index");
    private static final AtomicIntegerFieldUpdater<IntStack> CTRL =
            AtomicIntegerFieldUpdater.newUpdater(IntStack.class, "ctrl");
    /** Serializes the threads that want to replace the memory block. */
    private final StampedLock lock = new StampedLock();
    private volatile int index = -INT_BYTES;
    private volatile int ctrl;
    private volatile long address;
    /** Size of the current memory block, in bytes. */
    private volatile int capacity;

    /**
     * Creates an empty stack.
     *
     * @param nullInt         currently unused (see the class-level review note)
     * @param initialCapacity size in bytes of the initial memory block
     */
    public IntStack(int nullInt, int initialCapacity) {
        this.capacity = initialCapacity;
        address = unsafe.allocateMemory(initialCapacity);
        unsafe.setMemory(address, initialCapacity, NULL_BYTE);
    }

    /**
     * Tries to remove the top value.
     *
     * @return the popped value, or {@link #NULL_VALUE} when the stack is empty or the
     *         attempt lost a race with another thread
     */
    public int pop() {
        if (index < 0) {
            return NULL_VALUE;
        }
        // Hold the shared mode so the block cannot be replaced and freed underneath us.
        enterShared();
        try {
            final int offset = index;
            // A push may have claimed an offset that the block does not cover yet;
            // reading it would be out of bounds.
            if (offset < 0 || (long) offset + INT_BYTES > capacity) {
                return NULL_VALUE;
            }
            final long slot = address + offset;
            final int value = unsafe.getInt(slot);
            // NULL_VALUE here means the owning push has claimed the slot but not written it
            // yet. The CAS makes sure only one pop takes the value.
            if (value == NULL_VALUE || !unsafe.compareAndSwapInt(null, slot, value, NULL_VALUE)) {
                return NULL_VALUE;
            }
            // Move the top down only if it is still this slot. An unconditional decrement
            // would, after a concurrent push, leave that push's value above the top.
            if (INDEX_UPDATER.compareAndSet(this, offset, offset - INT_BYTES)) {
                return value;
            }
            // A push advanced the index in between: this slot is no longer the top, so the
            // value goes back. While the slot holds NULL_VALUE no other pop can move the
            // index below it, so no other thread writes this slot in the meantime.
            unsafe.putInt(slot, value);
            return NULL_VALUE;
        } finally {
            exitShared();
        }
    }

    /**
     * Pushes {@code id} on top of the stack, growing the memory block when needed.
     *
     * @param id value to push; must differ from {@link #NULL_VALUE}
     * @return always {@code true}
     * @throws IllegalArgumentException if {@code id} equals {@link #NULL_VALUE}, which could
     *                                  not be told apart from an empty slot
     * @throws IllegalStateException    if the block would have to grow beyond
     *                                  {@code Integer.MAX_VALUE} bytes
     */
    public boolean push(int id) {
        if (id == NULL_VALUE) {
            throw new IllegalArgumentException("NULL_VALUE (" + NULL_VALUE + ") cannot be pushed");
        }
        final int offset = INDEX_UPDATER.addAndGet(this, INT_BYTES);
        // The slot occupies bytes [offset, offset + INT_BYTES): all of them must fit.
        // Comparing only the offset is wrong once the capacity is not a multiple of 4.
        final long required = (long) offset + INT_BYTES;
        if (required > capacity) {
            grow(required);
        }
        enterShared();
        try {
            unsafe.putInt(address + offset, id);
        } finally {
            exitShared();
        }
        return true;
    }

    /** Replaces the memory block with one of at least {@code required} bytes, keeping its content. */
    private void grow(long required) {
        final long stamp = lock.writeLock();
        try {
            final int currentCapacity = capacity;
            if (required <= currentCapacity) {
                return; // another thread already grew the block far enough
            }
            // Grow by half per step, by at least one slot so tiny capacities make progress,
            // and repeat until the claimed slot fits.
            long grown = currentCapacity;
            do {
                grown += Math.max(grown >>> 1, INT_BYTES);
            } while (grown < required);
            if (grown > Integer.MAX_VALUE) {
                throw new IllegalStateException("IntStack cannot grow to " + grown + " bytes");
            }
            final int newCapacity = (int) grown;
            // Allocate before going exclusive: a failing allocation must not leave ctrl
            // stuck at -1, and the exclusive window stays short.
            final long newAddress = unsafe.allocateMemory(newCapacity);
            unsafe.setMemory(newAddress, newCapacity, NULL_BYTE);
            enterExclusive();
            try {
                final long oldAddress = address;
                unsafe.copyMemory(oldAddress, newAddress, currentCapacity);
                address = newAddress;
                capacity = newCapacity;
                unsafe.freeMemory(oldAddress);
            } finally {
                exitExclusive();
            }
        } finally {
            lock.unlock(stamp);
        }
    }

    /** Waits until no block replacement is running, then registers this thread as a slot user. */
    private void enterShared() {
        for (; ; ) {
            final int c = ctrl;
            if (c < 0) {
                Thread.yield();
            } else if (CTRL.compareAndSet(this, c, c + 1)) {
                return;
            }
        }
    }

    /** Unregisters this thread as a slot user. */
    private void exitShared() {
        CTRL.decrementAndGet(this);
    }

    /** Waits until no thread is using a slot, then blocks new slot users. */
    private void enterExclusive() {
        while (!CTRL.compareAndSet(this, 0, -1)) {
            Thread.yield();
        }
    }

    /** Lets slot users in again. */
    private void exitExclusive() {
        ctrl = 0;
    }

    /**
     * @return the number of elements, derived from the top byte offset ({@code index / 4 + 1})
     */
    public int size() {
        return (index >> 2) + 1;
    }

    /**
     * Frees the memory block and leaves the stack empty with zero capacity. Calling it
     * again is harmless: the address is cleared, so the block is never freed twice.
     */
    @Override
    public void close() {
        final long stamp = lock.writeLock();
        try {
            enterExclusive();
            try {
                final long current = address;
                address = 0L;
                capacity = 0;
                index = -INT_BYTES;
                unsafe.freeMemory(current);
            } finally {
                exitExclusive();
            }
        } finally {
            lock.unlock(stamp);
        }
    }

    @Override
    public String toString() {
        return "IntStack={"
                + "capacity=" + capacity + "|off-heap"
                + '}';
    }
}
enricostara commented 4 months ago

Hi @endison1986, thanks so much for your findings, analysis, and fixes!

If it passes the concurrent tests you put in place, please proceed with the usual PR in the 'develop' branch. Then, it might even be the moment to promote a new release version to collect all the latest bug fixes.