Closed weickdev closed 3 years ago
Looks like the transaction is still active after the delete, and stale data is causing the CME. Assigning to @tglman because of that.
@laa I hit this error again today. This time the database is a new one, with no delete operation before the edge inserts. For reference, I am using:
OrientGraphNoTx
graphNoTx.declareIntent(new OIntentMassiveInsert());
-DridBag.embeddedToSbtreeBonsaiThreshold=-1
I built the source code of the develop branch and got the same exception too. New database, new class. I tried to batch-insert data into an edge class several times, but the same exception occurred repeatedly, with no pattern to where or when it was thrown. When the exception occurs, the server hangs and stops responding, so I have to kill and restart it.
@blueAnger could you provide source code for us?
@laa Below is my code:
private void saveByBatchQueryWithoutCache(List<Map<String, Object>> list) {
List<Map<String, Object>> copyList = new ArrayList<>();
copyList.addAll(list);
try {
semaphore.acquire();
} catch (InterruptedException e) {
logger.error("main thread is interrupted: {}", e);
}
executorService.execute(() -> {
//step 1: get OrientGraphNoTx
StringBuffer batchEdgeInsert = new StringBuffer();
OrientGraphNoTx graphNoTx = orientdbUtil.getOrientGraphNoTx();
graphNoTx.declareIntent(new OIntentMassiveInsert());
//step 2: get all the vertex rid by the business key
//<class, keys>
Map<String, Set<String>> keysToSearchMap = new HashMap<>();
copyList.forEach(map -> {
String from = map.get("_from").toString();
String fromClass = from.substring(0, from.indexOf("/"));
String to = map.get("_to").toString();
String toClass = to.substring(0, to.indexOf("/"));
Set<String> fromKeys = keysToSearchMap.computeIfAbsent(fromClass, key -> new HashSet<>());
Set<String> toKeys = keysToSearchMap.computeIfAbsent(toClass, key -> new HashSet<>());
fromKeys.add(from.substring(from.indexOf("/") + 1));
toKeys.add(to.substring(to.indexOf("/") + 1));
});
//<key, rid>
Map<String, String> ridMap = new HashMap<>();
keysToSearchMap.forEach((cls, keys) -> {
OrientDynaElementIterable iterable = graphNoTx.command(new OCommandSQL(String.format("SELECT FROM %s WHERE _key in ?", cls))).execute(keys);
iterable.forEach(obj -> {
Vertex vertex = (Vertex) obj;
ridMap.put(cls + "/" + vertex.getProperty("_key"), vertex.getId().toString());
});
});
//step 3: insert the edges
copyList.forEach(item -> {
String fromId = ridMap.get(item.get("_from").toString());
String toId = ridMap.get(item.get("_to").toString());
batchEdgeInsert.setLength(0);
batchEdgeInsert.append("CREATE EDGE ").append(tableName)
.append(" FROM ").append(fromId)
.append(" TO ").append(toId)
.append(" CONTENT ").append(JSON.toJSONString(item)).append(";\n ");
try {
graphNoTx.command(new OCommandSQL(batchEdgeInsert.toString())).execute();
} catch (ORecordDuplicatedException e) {
logger.warn("duplicate exception");
} catch (Exception e) {
logger.error("insert error: {}", e);
}
});
graphNoTx.declareIntent(null);
graphNoTx.shutdown();
// release the semaphore only after the graph has been shut down
semaphore.release();
});
}
I get the graphNoTx like this:
OrientGraphFactory graphFactory = new OrientGraphFactory(dbUrl, username, password).setupPool(1, 40);
...
public OrientGraphNoTx getOrientGraphNoTx(){
return graphFactory.getNoTx();
}
Hi @weickdev,
Since OrientDB 3.0 the TinkerPop 2 API has been deprecated; we suggest using the multi-model API: http://orientdb.com/docs/3.0.x/java/Java-MultiModel-API.html or TinkerPop 3: http://orientdb.com/docs/3.0.x/tinkerpop3/OrientDB-TinkerPop3.html. Some of the problems you are experiencing may be due to the old API.
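For reference, a minimal sketch of what an edge insert looks like with the multi-model API (the URL, credentials, and class names here are placeholders, not from this issue):

```java
import com.orientechnologies.orient.core.db.ODatabasePool;
import com.orientechnologies.orient.core.db.ODatabaseSession;
import com.orientechnologies.orient.core.record.OEdge;
import com.orientechnologies.orient.core.record.OVertex;

public class MultiModelSketch {
    public static void main(String[] args) {
        // placeholder URL and credentials
        ODatabasePool pool = new ODatabasePool("remote:localhost/test", "admin", "admin");
        try (ODatabaseSession session = pool.acquire()) {
            session.begin();
            // create two vertices and connect them with an edge
            OVertex from = session.newVertex("Person");
            from.setProperty("_key", "a");
            from.save();
            OVertex to = session.newVertex("Person");
            to.setProperty("_key", "b");
            to.save();
            OEdge edge = session.newEdge(from, to, "Knows");
            edge.save();
            session.commit();
        }
        pool.close();
    }
}
```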
Regards
@tglman thanks for your reply, I will try the new API.
Hi @tglman , I am sorry, but the same exception occurs when concurrently adding edges. My code is as follows; you can compare it with what weickdev posted.
private void saveByBatchQueryWithoutCache(List<Map<String, Object>> list) {
List<Map<String, Object>> copyList = new ArrayList<>();
copyList.addAll(list);
try {
semaphore.acquire();
} catch (InterruptedException e) {
logger.error("main thread is interrupted: {}", e);
}
executorService.execute(() -> {
//<class, keys>
Map<String, Set<String>> keysToSearchMap = new HashMap<>();
copyList.forEach(map -> {
String from = map.get("_from").toString();
String fromClass = from.substring(0, from.indexOf("/"));
String to = map.get("_to").toString();
String toClass = to.substring(0, to.indexOf("/"));
Set<String> fromKeys = keysToSearchMap.computeIfAbsent(fromClass, key -> new HashSet<>());
Set<String> toKeys = keysToSearchMap.computeIfAbsent(toClass, key -> new HashSet<>());
fromKeys.add(from.substring(from.indexOf("/") + 1));
toKeys.add(to.substring(to.indexOf("/") + 1));
});
ODatabaseSession session = orientdbUtil.getGraphSession();
session.begin();
// session.declareIntent(new OIntentMassiveInsert());
long start = System.currentTimeMillis();
//<key, rid>
Map<String, OVertex> ridMap = new HashMap<>();
keysToSearchMap.forEach((cls, keys) -> {
// note: a class name cannot be bound as a positional parameter, so it is formatted into the statement
OResultSet resultSet = session.query(String.format("SELECT FROM %s WHERE _key in ?", cls), keys);
resultSet.vertexStream().forEach(vertex -> ridMap.put(cls + "/" + vertex.getProperty("_key"), vertex));
resultSet.close();
});
copyList.forEach(item -> {
OVertex fromVertex = ridMap.get(item.get("_from").toString());
OVertex toVertex = ridMap.get(item.get("_to").toString());
long insertStart = System.currentTimeMillis();
try {
OEdge edge = session.newEdge(fromVertex, toVertex, tableName);
item.forEach(edge::setProperty);
edge.save();
} catch (Exception e) {
logger.error("insert error: {}", e);
}
logger.debug("create current edge costs {} ms", System.currentTimeMillis() - insertStart);
});
logger.info("create {} item cause {} ms", copyList.size(), System.currentTimeMillis() - start);
session.commit();
session.close();
// release the semaphore only after the transaction has been committed and the session closed
semaphore.release();
});
}
This is how I get the ODatabaseSession:
private ODatabasePool pool;
@PostConstruct
private void init()
{
pool = new ODatabasePool(dbUrl, username, password);
}
public ODatabaseSession getGraphSession() {
return pool.acquire();
}
Also, I found this documentation and have already set the option. The JAVA_OPTS in bin/server.sh is as follows.
if [ -z "$JAVA_OPTS_SCRIPT" ] ; then
JAVA_OPTS_SCRIPT="-Djna.nosys=true -XX:+HeapDumpOnOutOfMemoryError -Djava.awt.headless=true -Dfile.encoding=UTF8 -Drhino.opt.level=9 -DridBag.embeddedToSbtreeBonsaiThreshold=-1 -Dstorage.useWAL=false -Dstorage.diskCache.bufferSize=102400 -XX:+PrintGCDetails"
Also, as you can see, I use a session transaction in my code so that the read and the write together form one atomic operation. I don't know what else to do. Hope this information helps.
Thanks.
Hi @blueAnger,
We will check this and let you know.
Regards
hi @blueAnger, @weickdev,
I wrote a simple test case similar to yours, but had no luck reproducing the problem. Here is my test case:
public class TestInsert {
private ODatabasePool pool;
@Before
public void before() {
pool = new ODatabasePool("remote:localhost/test", "admin", "admin");
}
@After
public void after() {
pool.close();
}
@Test
public void saveByBatchQueryWithoutCache() {
String tableName = "E";
ODatabaseSession insertSession = pool.acquire();
insertSession.begin();
Set<String> keyToSearch = new HashSet<>();
for (int i = 0; i < 20; i++) {
OVertex v = insertSession.newVertex("one");
String key = "a" + i;
keyToSearch.add(key);
v.setProperty("_key", key);
insertSession.save(v);
}
insertSession.commit();
insertSession.close();
for (int z = 0; z < 5; z++) {
ODatabaseSession session = pool.acquire();
long start = System.currentTimeMillis();
//<key, rid>
Map<String, OVertex> ridMap = new HashMap<>();
keyToSearch.forEach((key) -> {
OResultSet resultSet = session.query("SELECT FROM one WHERE _key = ?", key);
resultSet.vertexStream().forEach(vertex -> ridMap.put(vertex.getProperty("_key"), vertex));
resultSet.close();
});
for (int i = 0; i < 19; i++) {
OVertex fromVertex = ridMap.get("a" + i);
OVertex toVertex = ridMap.get("a" + (i + 1));
try {
OEdge edge = session.newEdge(fromVertex, toVertex, tableName);
edge.setProperty("some", "value");
edge.save();
} catch (Exception e) {
e.printStackTrace();
}
}
session.commit();
session.close();
}
}
}
Anyway, in your previous message you said that the server got stuck; if you reproduce that, could you take a heap dump and send it to us?
Regards
Hi @tglman I saw your test code; it looks like you are just adding edges sequentially in a single thread. I think I made it quite clear that the exception occurs when concurrently adding edges. Therefore, I changed your test code so maybe you can reproduce the exception.
public class TestInsert
{
private ODatabasePool pool;
private final String TABLE_V1 = "table1";
private final String TABLE_V2 = "table2";
private final String TABLE_E1 = "edge1";
private final int VERTEX_NUM = 50;
private final int EDGE_NUM = 1000;
private final int THREAD_NUM = 10;
@Before
public void before() {
pool = new ODatabasePool("remote:localhost/test", "admin", "admin");
ODatabaseSession session = pool.acquire();
Arrays.asList(new ORawPair<>(TABLE_V1, "V"), new ORawPair<>(TABLE_V2, "V"), new ORawPair<>(TABLE_E1, "E")).forEach(pair -> {
try {
String table = pair.getFirst();
String superClass = pair.getSecond();
OClass oClass = session.getClass(table);
if (oClass != null)
{
session.command(String.format("TRUNCATE CLASS %s UNSAFE", table)).close();
session.command(String.format("DROP CLASS %s IF EXISTS UNSAFE", table)).close();
}
session.command(String.format("CREATE CLASS %s EXTENDS %s", table, superClass)).close();
session.command(String.format("CREATE PROPERTY %s.name STRING ", table)).close();
session.command(String.format("CREATE PROPERTY %s._key STRING ", table)).close();
session.command(String.format("CREATE INDEX `%s._key` ON `%s` (_key) UNIQUE_HASH_INDEX", table, table)).close();
} catch (Exception e) {
e.printStackTrace();
}
});
session.close();
}
@Test
public void addEdgesConcurrently() {
ODatabaseSession insertSession = pool.acquire();
insertSession.begin();
//<cls, keys>
Map<String, Set<String>> keysToSearchMap = new HashMap<>();
Arrays.asList(TABLE_V1, TABLE_V2).forEach(table -> {
for (int i = 0; i < VERTEX_NUM; ++i) {
OVertex v = insertSession.newVertex(table);
String key = String.format("%s_key_%d", table, i);
Set<String> keys = keysToSearchMap.computeIfAbsent(table, cls -> new HashSet<>());
keys.add(key);
v.setProperty("_key", key);
v.setProperty("name", String.format("%s_name_%d", table, i));
v.save();
}
});
insertSession.commit();
insertSession.close();
System.out.println("======inserting vertices finished=======");
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUM);
final Random random = new Random(System.currentTimeMillis());
for(int t = 0; t < THREAD_NUM; ++t)
{
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " start");
ODatabaseSession session = pool.acquire();
session.begin();
//<key, vertex>
Map<String, OVertex> vertexMap = new HashMap<>();
keysToSearchMap.forEach((cls, keys) -> {
// note: a class name cannot be bound as a positional parameter, so it is formatted into the statement
OResultSet resultSet = session.query(String.format("SELECT FROM %s WHERE _key in ?", cls), keys);
resultSet.vertexStream().forEach(vertex -> vertexMap.put(vertex.getProperty("_key"), vertex));
resultSet.close();
});
for (int i = 0; i < EDGE_NUM; ++i)
{
int i1 = random.nextInt(VERTEX_NUM);
int i2 = random.nextInt(VERTEX_NUM);
OVertex fromV1 = vertexMap.get(String.format("%s_key_%d", TABLE_V1, i1));
OVertex toV2 = vertexMap.get(String.format("%s_key_%d", TABLE_V2, i2));
OEdge edge = session.newEdge(fromV1, toV2, TABLE_E1);
edge.setProperty("_key", String.format("%s_key_%d", TABLE_E1, i));
edge.setProperty("name", String.format("%s_name_%d", TABLE_E1, i));
edge.save();
}
session.commit();
session.close();
System.out.println(Thread.currentThread().getName() + " finished");
});
}
executorService.shutdown();
while (!executorService.isTerminated())
{
try {
executorService.awaitTermination(3, TimeUnit.MINUTES);
} catch (InterruptedException e) {}//ignore
}
pool.close();
}
}
Thanks.
hi @blueAnger,
I found the reason for this problem: closing the pool was not rolling back any pending transaction. This left the local cache active, so queries returned old records, and using an old record leads to a concurrent modification exception. I fixed this in the last commit; can you double-check?
I would also suggest checking that all the methods commit correctly.
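To make "commit correctly" concrete, here is a generic sketch of the pattern (the `Tx` interface below is a hypothetical stand-in for a transactional session such as ODatabaseSession, not an OrientDB type): commit on success, roll back on failure, and always close the session.

```java
// Hypothetical stand-in for a transactional session such as ODatabaseSession.
interface Tx {
    void begin();
    void commit();
    void rollback();
    void close();
}

class TxTemplate {
    // Runs the work inside a transaction: commit on success,
    // roll back on failure, and close the session in every case.
    static void inTransaction(Tx tx, Runnable work) {
        tx.begin();
        boolean committed = false;
        try {
            work.run();
            tx.commit();
            committed = true;
        } finally {
            if (!committed) tx.rollback();
            tx.close();
        }
    }
}
```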
Regards
Hi @tglman The problem remains. I took a look at your commit. You added rollback(true) in several pool-like classes' close() method. IMHO, that doesn't resolve the concurrent modification problem, since the exception is thrown before invoking pool.close().
Let me re-state the problem here:
I realize my previous test code had some problems, so maybe you can try the following test code instead. Hope it helps.
import com.orientechnologies.common.util.ORawPair;
import com.orientechnologies.orient.core.db.ODatabasePool;
import com.orientechnologies.orient.core.db.ODatabaseSession;
import com.orientechnologies.orient.core.exception.OConcurrentModificationException;
import com.orientechnologies.orient.core.metadata.schema.OClass;
import com.orientechnologies.orient.core.record.OEdge;
import com.orientechnologies.orient.core.record.OVertex;
import com.orientechnologies.orient.core.sql.executor.OResultSet;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by thomas on 18/1/5.
*/
public class TestInsert
{
private ODatabasePool pool;
private final String TABLE_V1 = "table1";
private final String TABLE_V2 = "table2";
private final String TABLE_E1 = "edge1";
private final int VERTEX_NUM = 50;
private final int EDGE_NUM = 1000;
private final int THREAD_NUM = 10;
/**
* min retry wait time(mills) when concurrent modification occurs
*/
private final long MIN_WAIT_TIME = 1000;
/**
* max retry wait time(mills) when concurrent modification occurs
*/
private final long MAX_WAIT_TIME = 3000;
private final int MAX_RETRY = 3;
private final Logger LOGGER = LoggerFactory.getLogger(TestInsert.class);
@Before
public void before() {
pool = new ODatabasePool("remote:localhost/test", "root", "admin");
ODatabaseSession session = pool.acquire();
Arrays.asList(new ORawPair<>(TABLE_V1, "V"), new ORawPair<>(TABLE_V2, "V"), new ORawPair<>(TABLE_E1, "E")).forEach(pair -> {
try {
String table = pair.getFirst();
String superClass = pair.getSecond();
OClass oClass = session.getClass(table);
if (oClass != null)
{
session.command(String.format("TRUNCATE CLASS %s UNSAFE", table)).close();
session.command(String.format("DROP CLASS %s IF EXISTS UNSAFE", table)).close();
}
session.command(String.format("CREATE CLASS %s EXTENDS %s", table, superClass)).close();
session.command(String.format("CREATE PROPERTY %s.name STRING ", table)).close();
session.command(String.format("CREATE PROPERTY %s._key STRING ", table)).close();
session.command(String.format("CREATE INDEX `%s._key` ON `%s` (_key) UNIQUE_HASH_INDEX", table, table)).close();
} catch (Exception e) {
e.printStackTrace();
}
});
session.close();
}
@Test
public void addEdgesConcurrently() {
ODatabaseSession insertSession = pool.acquire();
insertSession.begin();
//<cls, keys>
Map<String, Set<String>> keysToSearchMap = new HashMap<>();
Arrays.asList(TABLE_V1, TABLE_V2).forEach(table -> {
for (int i = 0; i < VERTEX_NUM; ++i) {
OVertex v = insertSession.newVertex(table);
String key = String.format("%s_key_%d", table, i);
Set<String> keys = keysToSearchMap.computeIfAbsent(table, cls -> new HashSet<>());
keys.add(key);
v.setProperty("_key", key);
v.setProperty("name", String.format("%s_name_%d", table, i));
v.save();
}
});
insertSession.commit();
insertSession.close();
LOGGER.info("======inserting vertices finished=======");
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUM);
final Random random = new Random(System.currentTimeMillis());
for(int t = 0; t < THREAD_NUM; ++t)
{
final int _t = t;
executorService.execute(() -> {
LOGGER.info("=======start==========");
boolean finished = false;
int time = 0;
while (!finished && (++time) <= MAX_RETRY)
{
ODatabaseSession session = null;
try {
session = pool.acquire();
session.begin();
//<key, vertex>
Map<String, OVertex> vertexMap = new HashMap<>();
for (Map.Entry<String, Set<String>> entry : keysToSearchMap.entrySet())
{
// note: a class name cannot be bound as a positional parameter, so it is formatted into the statement
OResultSet resultSet = session.query(String.format("SELECT FROM %s WHERE _key in ?", entry.getKey()), entry.getValue());
resultSet.vertexStream().forEach(vertex -> vertexMap.put(vertex.getProperty("_key"), vertex));
resultSet.close();
}
for (int i = 0; i < EDGE_NUM; ++i)
{
int i1 = random.nextInt(VERTEX_NUM);
int i2 = random.nextInt(VERTEX_NUM);
OVertex fromV1 = vertexMap.get(String.format("%s_key_%d", TABLE_V1, i1));
OVertex toV2 = vertexMap.get(String.format("%s_key_%d", TABLE_V2, i2));
OEdge edge = session.newEdge(fromV1, toV2, TABLE_E1);
edge.setProperty("_key", String.format("%s_key_%d", TABLE_E1, _t * EDGE_NUM + i));
edge.setProperty("name", String.format("%s_name_%d", TABLE_E1, _t * EDGE_NUM + i));
edge.save();
}
session.commit();
finished = true;
} catch (OConcurrentModificationException e) {
long waitTime = MIN_WAIT_TIME + random.nextInt((int) (MAX_WAIT_TIME - MIN_WAIT_TIME)); // uniform wait in [MIN_WAIT_TIME, MAX_WAIT_TIME)
LOGGER.info("concurrent modification. retry after {} ms", waitTime);
finished = false;
try {
Thread.sleep(waitTime);
} catch (InterruptedException ignore) {}
} catch (Exception e) {
LOGGER.error("", e);
} finally {
if (session != null) session.close();
}
}
if(finished) LOGGER.info("======finish=======");
else LOGGER.info("========stop after {} retries========", MAX_RETRY);
});
}
executorService.shutdown();
boolean terminated = true;
try {
terminated = executorService.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {}//ignore
if(!terminated)
executorService.shutdownNow();
pool.close();
}
}
Hi @blueAnger,
I took your test and ran it a few times; I couldn't reproduce the problem, so it is quite likely fixed now.
OConcurrentModificationException can still happen if you are doing operations concurrently on the same record, for example adding 2 edges to the same record from two different threads.
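When such collisions are expected, a common mitigation is to retry the whole transaction with a randomized backoff, much like the retry loop in the test posted above. A minimal stdlib-only helper (a generic sketch, not an OrientDB API; in real use `retryOn` would be OConcurrentModificationException and the task would re-read the vertices and re-run the insert):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

public class RetryUtil {
    // Retries the task while it throws the given exception type, sleeping a
    // random backoff in [minWaitMs, maxWaitMs) between attempts. Any other
    // exception type is rethrown immediately.
    public static <T> T withRetry(Callable<T> task, Class<? extends Exception> retryOn,
                                  int maxRetries, long minWaitMs, long maxWaitMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                if (!retryOn.isInstance(e)) throw e;
                last = e;
                Thread.sleep(ThreadLocalRandom.current().nextLong(minWaitMs, maxWaitMs));
            }
        }
        throw last;
    }
}
```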
Not being able to connect to a database any more after an OConcurrentModificationException is a problem; if this happens to you again, it would be great if you could send us a heap dump of the server.
Regards
@tglman I compiled the new develop code, and the server no longer hangs. As you said, OConcurrentModificationException can still happen. But I found that the documentation says:
Consider the case where multiple clients attempt to add edges on the same vertex. OrientDB could throw the OConcurrentModificationException exception. This occurs because collections of edges are kept on vertices, meaning that, every time OrientDB adds or removes an edge, both vertices update and their versions increment. You can avoid this issue by using RIDBAG Bonsai structure, which are never embedded, so the edge never updates the vertices.
To use this configuration at run-time, before launching OrientDB, use this code:
java -DridBag.embeddedToSbtreeBonsaiThreshold=-1
So I set this parameter in server.sh, but the OConcurrentModificationException is still there. Doesn't this parameter work?
Hi @weickdev,
This is true only when running standalone or embedded; the "RIDBAG Bonsai" mentioned in the documentation is not supported in distributed mode.
Regards
@tglman But I tested it in standalone mode.
Hi @tglman , I know why your commits didn't work on my server. It turns out that if I set -Dstorage.useWAL=false in my bin/server.sh, then when an OConcurrentModificationException occurs, the server gets stuck and the same database is no longer connectable. For what it's worth, I posted the JAVA_OPTS from my bin/server.sh early on, but it seems no one paid much attention.
Still, according to what weickdev said, -DridBag.embeddedToSbtreeBonsaiThreshold=-1 didn't help either, because the OConcurrentModificationException still occurs in standalone mode.
Thanks.
Any updates on this?
With the fix for OrientJS queries getting queued up (https://github.com/orientechnologies/orientjs/commit/5acca8b53b7fdd426c0b2ad203ede0d90caf0007), I am also facing the same OConcurrentModificationException issue.
Any workaround for this ?
With the Python Goblin ORM, I am stuck on this issue too.
Each request is handled by a different OrientGraph instance inside OrientStandardGraph. Each has a local cache, and I can find the same vertex with different versions inside these caches.
After adding an edge, the vertex version changes; however, the other caches retain the old version.
OrientDB Version: 3.0.0.SNAPSHOT
Java Version: 1.8
OS: CentOS
When I insert edges in bulk, I occasionally get the exception below.
In this error log, #156:26159 is an edge. Before this bulk edge insert, I had run a delete command to remove all data in this edge class, but I can't be sure whether the problem was caused by the delete operation. If I create a new edge class and re-insert all the data (without running the delete command), it works well.