coderplay / dragon

Real-time computing system based on hadoop yarn
12 stars 5 forks source link

Serializable DAG utilities is needed #1

Closed coderplay closed 12 years ago

coderplay commented 12 years ago

Every stream computing workflow can be represented by a DAG. These kind of DAG jobs submitted by Hadoop dragon users should be serialized into a file on ApplicationMaster's local filesystem or a distributed filesystem, thus NodeManager can download it and launch worker's containers according to the serialized DAG. Each worker with the same kind acts as a vertex of the DAG. Its input stream comes from another kind of workers(vertices) and its output is transferred into vertices downstream. The stream between two vertexs is represented by edge.

reference:

http://algs4.cs.princeton.edu/42directed/ http://www.jgrapht.org/

coderplay commented 12 years ago

jgrapht can't detect cycle correctly..

  public void testCycleDetection() {
    // vertex type: String, edge type: String
    String vertex1 = "new vertex1";
    String vertex2 = "new vertex2";
    String vertex3 = "new vertex3";
    DirectedAcyclicGraph<String, DefaultEdge> dag =
        new DirectedAcyclicGraph<String, DefaultEdge>(DefaultEdge.class);
    dag.addVertex(vertex1);
    dag.addVertex(vertex2);
    dag.addVertex(vertex3);
    boolean dagRejectedEdge = false;
    try {
      dag.addDagEdge(vertex1, vertex2);
      dag.addDagEdge(vertex2, vertex3);
      dag.addDagEdge(vertex3, vertex1);
    } catch (CycleFoundException e) {
      dagRejectedEdge = true;
    }
    assertTrue(dagRejectedEdge); // failed

    CycleDetector<String, DefaultEdge> detector =
        new CycleDetector<String, DefaultEdge>(dag);
     assertTrue( detector.detectCycles()); // failed
  }

We need to fix this bug.

coderplay commented 12 years ago

Each edge created by the EdgeFactory would have the same hashcode, that leaded to the failure.

  protected E addEdgeInternal(V fromVertex, V toVertex) {
    E e = edgeFactory.createEdge(fromVertex, toVertex); // <-- this line will create a empty string
    if (containsEdge(e)) { // this restriction should stay!
      return null;  // <-- next call of  addEdge()  should reach here  
    } else {
      InternalEdge<V> internalEdge = new InternalEdge<V>(fromVertex, toVertex);
      edgeMap.put(e, internalEdge);
      vertexMap.get(fromVertex).addOutgoingEdge(e);
      vertexMap.get(toVertex).addIncomingEdge(e);
      return e;
    }
  }

If we change the test code into some Class with unique hashcode, the test will succeed.

  public void testCycleDetection() {
    // vertex type: String, edge type: String
    String vertex1 = "new vertex1";
    String vertex2 = "new vertex2";
    String vertex3 = "new vertex3";
    DirectedAcyclicGraph<String, Object> dag =
        new DirectedAcyclicGraph<String, Object>(Object.class);
    dag.addVertex(vertex1);
    dag.addVertex(vertex2);
    dag.addVertex(vertex3);
    boolean dagRejectedEdge = false;
    try {
      dag.addEdge(vertex1, vertex2);
      dag.addEdge(vertex2, vertex3);
      dag.addEdge(vertex3, vertex1);
    } catch (CycleFoundException e) {
      dagRejectedEdge = true;
    }

    assertTrue(dagRejectedEdge);
  }
coderplay commented 12 years ago

I decided to let it be, the default edge class DragonEdge that overrides the hashCode() method will make this bug invisible to users.