Closed ArvinDevel closed 7 years ago
Below is my unit test case, which often fails. `package org.apache.distributedlog;
import static org.junit.Assert.assertEquals;
import java.net.URI; import java.util.Optional; import java.util.concurrent.CountDownLatch; import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.common.concurrent.FutureEventListener; import org.apache.distributedlog.exceptions.LogEmptyException; import org.junit.Test;
/**
 * Reproducer test: opens a log, writes a single record, closes the log manager,
 * reopens the same log and checks the DLSN returned by getLastDLSN().
 * Each asynchronous step is sequenced with its own CountDownLatch.
 * 2017/10/6. */ public class TestReOpen extends TestDistributedLogBase{ DistributedLogManager dlm = null; AsyncLogWriter logWriter = null; volatile DLSN last = null; Namespace dlNamespace = null; @Test public void testReadWritewithReOpen() throws Exception { String name = "testReadWritewithReOpen"; DistributedLogConfiguration dlconfig = new DistributedLogConfiguration(); DistributedLogConfiguration streamConfig = new DistributedLogConfiguration(); URI namespaceUri = null; CountDownLatch openLatch = new CountDownLatch(1); CountDownLatch writeLatch = new CountDownLatch(1); CountDownLatch closeLatch = new CountDownLatch(1); CountDownLatch doneLatch = new CountDownLatch(1);
// Per-stream settings; streamConfig is passed to every openLog call below.
streamConfig.setLogSegmentRollingIntervalMinutes(10);
streamConfig.setMaxLogSegmentBytes(2 * 1024 * 1024);
streamConfig.setRetentionPeriodHours(1);
// Create the namespace URI. A failure is only logged, which leaves namespaceUri null.
try {
namespaceUri = createDLMURI("/default_namespace");
ensureURICreated(namespaceUri);
LOG.info("created DLM URI {} succeed ", namespaceUri.toString());
} catch (Exception ioe){
LOG.info("create DLM URI error {}", ioe.toString());
}
//initialize dl namespace
//set dlog transmit outputBuffer size to 0, entry will have only one record.
dlconfig.setOutputBufferSize(0);
// A build failure is only logged, so dlNamespace can still be null when logExists is called below.
try {
dlNamespace = NamespaceBuilder.newBuilder()
.conf(dlconfig)
.uri(namespaceUri)
.build();
} catch (Exception e){
LOG.error("[{}] Got exception while trying to initialize dlog namespace, uri is {}", namespaceUri, e);
}
// Create the log if it does not exist yet; either way open it with streamConfig.
if (dlNamespace.logExists(name)) {
dlm = dlNamespace.openLog(name, Optional.of(streamConfig), Optional.empty(), Optional.empty());
} else {
dlNamespace.createLog(name);
dlm = dlNamespace.openLog(name, Optional.of(streamConfig), Optional.empty(), Optional.empty());
}
// First open: the callback stores the writer, queries the last DLSN and releases openLatch.
dlm.openAsyncLogWriter().whenComplete(new FutureEventListener<AsyncLogWriter>() {
@Override
public void onSuccess(AsyncLogWriter asyncLogWriter) {
LOG.info("[{}] Created log writer {}", name, asyncLogWriter.toString());
logWriter = asyncLogWriter;
try {
LOG.info("before getLastDLSN");
// NOTE(review): synchronous getLastDLSN() is invoked from inside the asynchronous
// completion callback; this is the call the report suspects of hanging — confirm
// against the library's callback threading model.
last = dlm.getLastDLSN();
LOG.info("after getLastDLSN");
} catch (LogEmptyException lee){
LOG.info("the log stream is empty ");
} catch (Exception e){
LOG.error("Faced Exception in getLastDLSN", e);
}
LOG.info("getLastDLSN return {}", last);
openLatch.countDown();
}
@Override
public void onFailure(Throwable throwable) {
LOG.error("Failed open AsyncLogWriter for {}", name, throwable);
openLatch.countDown();
}
});
// Wait until the open callback has run (success or failure).
openLatch.await();
// logWriter is only assigned in onSuccess above; it is still null here if the open failed.
logWriter.write(new LogRecord(System.currentTimeMillis(),
"thisIsTheRecord".getBytes())).whenComplete(new FutureEventListener<DLSN>(){
@Override
public void onSuccess(DLSN dlsn) {
LOG.info("[{}] write-complete: dlsn={}", this, dlsn);
writeLatch.countDown();
}
@Override
public void onFailure(Throwable throwable) {
// writeLatch is not counted down on failure, so the untimed await below never returns in that case.
LOG.info("[{}] write-fail: throwable={}", this, throwable);
}
});
writeLatch.await();
// Close the log manager and wait for the close to complete before reopening.
dlm.asyncClose().whenComplete(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
closeLatch.countDown();
}
@Override
public void onFailure(Throwable cause) {
// Failure is ignored and closeLatch is not counted down, so the untimed await below never returns in that case.
}
});
closeLatch.await();
// ReOpen the DLM and get lastDLSN
if (dlNamespace.logExists(name)) {
dlm = dlNamespace.openLog(name, Optional.of(streamConfig), Optional.empty(), Optional.empty());
} else {
dlNamespace.createLog(name);
dlm = dlNamespace.openLog(name, Optional.of(streamConfig), Optional.empty(), Optional.empty());
}
// Second open: same pattern as the first, except the writer is not stored and doneLatch is released.
dlm.openAsyncLogWriter().whenComplete(new FutureEventListener<AsyncLogWriter>() {
@Override
public void onSuccess(AsyncLogWriter asyncLogWriter) {
LOG.info("[{}] Created log writer again {}", name, asyncLogWriter.toString());
try {
LOG.info("before getLastDLSN again");
// NOTE(review): same synchronous call inside an asynchronous callback as in the first open.
last = dlm.getLastDLSN();
LOG.info("after getLastDLSN again");
} catch (LogEmptyException lee){
LOG.info("the log stream is empty ");
} catch (Exception e){
LOG.error("Faced Exception in getLastDLSN", e);
}
LOG.info("getLastDLSN return {}", last);
doneLatch.countDown();
}
@Override
public void onFailure(Throwable throwable) {
LOG.error("Failed open AsyncLogWriter for {}", name, throwable);
doneLatch.countDown();
}
});
doneLatch.await();
// The reopened log is expected to report DLSN (1, 0, 0) as its last DLSN.
// NOTE(review): JUnit's assertEquals takes (expected, actual); the arguments appear swapped here.
assertEquals(last, new DLSN(1, 0, 0));
dlm.close();
dlNamespace.close();
}
} `
After I moved the getLastDLSN() call out of the log writer callback so that it runs last, the test passed. There is likely a deadlock when getLastDLSN() is executed inside the openAsyncLogWriter callback.
@ArvinDevel
// Quoted fragment: opens the async log writer and, in the completion callback,
// stores the writer, queries the last DLSN and releases openLatch.
dlm.openAsyncLogWriter().whenComplete(new FutureEventListener<AsyncLogWriter>() {
@Override
public void onSuccess(AsyncLogWriter asyncLogWriter) {
LOG.info("[{}] Created log writer {}", name, asyncLogWriter.toString());
logWriter = asyncLogWriter;
try {
LOG.info("before getLastDLSN");
// NOTE(review): synchronous getLastDLSN() is called from inside the asynchronous
// completion callback; this is the call the discussion identifies as never completing.
last = dlm.getLastDLSN();
LOG.info("after getLastDLSN");
} catch (LogEmptyException lee){
LOG.info("the log stream is empty ");
} catch (Exception e){
LOG.error("Faced Exception in getLastDLSN", e);
}
LOG.info("getLastDLSN return {}", last);
// Only reached once getLastDLSN() has returned or thrown.
openLatch.countDown();
}
@Override
public void onFailure(Throwable throwable) {
LOG.error("Failed open AsyncLogWriter for {}", name, throwable);
openLatch.countDown();
}
});
You cannot call a synchronous (blocking) function inside an asynchronous callback: the synchronous function blocks the callback thread, so getLastDLSN will never complete.
BUG REPORT