swimos / swim

Full stack application platform for building stateful microservices, streaming APIs, and real-time UIs
https://www.swimos.org
Apache License 2.0
488 stars 41 forks source link

Join Map Lane not responding after a high volume of usage #36

Open SirCipher opened 4 years ago

SirCipher commented 4 years ago

When using a JoinMapLane, if a large number of entries (>9000) are put into the map in one go, either the observers stop being called or the entries are not written. I have tried waiting ~30 seconds and then using the map again, but still nothing happens. When a delay of 10ms is added between the entries, the problem is not observed.

This is to be investigated and a reproducible example will be put here.

SirCipher commented 4 years ago

A minimal example of this issue:

// Copyright 2015-2019 SWIM.AI inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package swim.server;

import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import swim.actor.ActorSpaceDef;
import swim.api.SwimLane;
import swim.api.SwimRoute;
import swim.api.agent.AbstractAgent;
import swim.api.agent.AgentRoute;
import swim.api.downlink.MapDownlink;
import swim.api.lane.JoinMapLane;
import swim.api.lane.MapLane;
import swim.api.plane.AbstractPlane;
import swim.codec.Format;
import swim.kernel.Kernel;
import swim.observable.function.DidUpdateKey;
import swim.observable.function.WillUpdateKey;
import swim.service.web.WebServiceDef;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.fail;

/**
 * Reproduction spec for a join map lane that stops reporting updates after a
 * large burst of writes to one of the map lanes it links to.
 *
 * <p>The spec starts a plane with two kinds of agents: plain map-lane agents
 * routed at {@code /map/:name}, and a join-map-lane agent routed at
 * {@code /join/map/:name} that opens downlinks to {@code /map/xs} and
 * {@code /map/ys} when it starts. The test writes {@link #ENTRY_COUNT} entries
 * into {@code /map/xs} back to back and fails unless a downlink to the join
 * lane observes that many {@code willUpdate} callbacks.
 */
public class JoinMapLaneSpec {

  /** Port the test web service is opened on; every warp URI below targets it. */
  private static final int PORT = 53556;

  /** Host URI the join agent uses for its downlinks (no trailing slash). */
  private static final String AGENT_HOST_URI = "warp://localhost:" + PORT;

  /** Host URI the test's own plane-side downlinks use (with trailing slash). */
  private static final String CLIENT_HOST_URI = "warp://localhost:" + PORT + "/";

  /** Number of entries written in one burst; the issue reports failures above ~9000. */
  private static final int ENTRY_COUNT = 15000;

  /** Upper bound on how long a single downlink may take to report didSync. */
  private static final long SYNC_TIMEOUT_SECONDS = 10L;

  /** Upper bound on how long the burst may take to reach the join downlink. */
  private static final long PROPAGATION_TIMEOUT_SECONDS = 30L;

  /** Agent exposing a single {@code "map"} lane whose key updates are logged to stdout. */
  static class TestMapLaneAgent extends AbstractAgent {
    @SwimLane("map")
    MapLane<String, String> testMap = this.<String, String>mapLane()
        .observe(new TestMapLaneController());

    /** Logs every pending and completed key update together with this agent's node URI. */
    class TestMapLaneController implements WillUpdateKey<String, String>, DidUpdateKey<String, String> {
      @Override
      public String willUpdate(String key, String newValue) {
        System.out.println(nodeUri() + " willUpdate key: " + Format.debug(key) + "; newValue: " + Format.debug(newValue));
        // Pass the value through unchanged; this observer only logs.
        return newValue;
      }

      @Override
      public void didUpdate(String key, String newValue, String oldValue) {
        System.out.println(nodeUri() + " didUpdate key: " + Format.debug(key) + "; newValue: " + Format.debug(newValue) + "; oldValue: " + Format.debug(oldValue));
      }
    }
  }

  /**
   * Agent exposing a {@code "join"} lane that, on start, opens downlinks to the
   * {@code "map"} lanes of {@code /map/xs} and {@code /map/ys}.
   */
  static class TestJoinMapLaneAgent extends AbstractAgent {
    @SwimLane("join")
    JoinMapLane<String, String, String> testJoinMap = this.<String, String, String>joinMapLane()
        .observe(new TestJoinMapLaneController());

    /** Logs every pending and completed key update together with this agent's node URI. */
    class TestJoinMapLaneController implements WillUpdateKey<String, String>, DidUpdateKey<String, String> {
      @Override
      public String willUpdate(String key, String newValue) {
        System.out.println(nodeUri() + " willUpdate key: " + Format.debug(key) + "; newValue: " + Format.debug(newValue));
        // Pass the value through unchanged; this observer only logs.
        return newValue;
      }

      @Override
      public void didUpdate(String key, String newValue, String oldValue) {
        System.out.println(nodeUri() + " didUpdate key: " + Format.debug(key) + "; newValue: " + Format.debug(newValue) + "; oldValue: " + Format.debug(oldValue));
      }
    }

    @Override
    public void didStart() {
      testJoinMap.downlink("xs").hostUri(AGENT_HOST_URI).nodeUri("/map/xs").laneUri("map").open();
      testJoinMap.downlink("ys").hostUri(AGENT_HOST_URI).nodeUri("/map/ys").laneUri("map").open();
    }
  }

  /** Plane routing {@code /map/:name} to map agents and {@code /join/map/:name} to join agents. */
  static class TestJoinMapPlane extends AbstractPlane {
    @SwimRoute("/map/:name")
    AgentRoute<TestMapLaneAgent> mapRoute;

    @SwimRoute("/join/map/:name")
    AgentRoute<TestJoinMapLaneAgent> joinMapRoute;
  }

  private TestJoinMapPlane plane;
  private Kernel kernel;

  /** Loads the server stack, opens the test plane and web service on {@link #PORT}, and starts the kernel. */
  @BeforeMethod
  public void init() {
    kernel = ServerLoader.loadServerStack();
    plane = kernel.openSpace(ActorSpaceDef.fromName("test"))
        .openPlane("test", TestJoinMapPlane.class);

    kernel.openService(WebServiceDef.standard().port(PORT).spaceName("test"));
    kernel.start();
  }

  /** Stops the kernel started by {@link #init()}. */
  @AfterMethod
  public void close() {
    kernel.stop();
  }

  /**
   * Writes {@link #ENTRY_COUNT} entries into {@code /map/xs} without pausing and
   * verifies that a downlink to the join lane observes that many
   * {@code willUpdate} callbacks.
   *
   * <p>The test fails with the observed count if the callbacks do not all
   * arrive within {@link #PROPAGATION_TIMEOUT_SECONDS} seconds, instead of
   * sleeping for a fixed period and passing unconditionally.
   *
   * @throws InterruptedException if the test thread is interrupted while
   *     waiting for the updates to arrive
   */
  @Test
  public void testLinkToJoinMapLane() throws InterruptedException {
    final CountDownLatch joinUpdates = new CountDownLatch(ENTRY_COUNT);

    final MapDownlink<String, String> xs = getDownlink("/map/xs", "map", null);
    // The ys and join links are opened for their side effects only; nothing is
    // written through them, so the returned handles are not kept.
    getDownlink("/map/ys", "map", null);
    getDownlink("/join/map/all", "join", (WillUpdateKey<String, String>) (key, newValue) -> {
      System.out.println("join link willUpdate key: " + Format.debug(key) + "; newValue: " + Format.debug(newValue));
      joinUpdates.countDown();
      return newValue;
    });

    for (int i = 0; i < ENTRY_COUNT; i++) {
      final String entry = Integer.toString(i);
      xs.put(entry, entry);
    }

    if (!joinUpdates.await(PROPAGATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
      final long observed = ENTRY_COUNT - joinUpdates.getCount();
      fail("join downlink observed " + observed + " of " + ENTRY_COUNT
          + " updates within " + PROPAGATION_TIMEOUT_SECONDS + " seconds");
    }
  }

  /**
   * Opens a String-to-String map downlink from the test plane and blocks until
   * it reports {@code didSync}.
   *
   * @param nodeUri node URI of the agent to link to
   * @param laneUri lane URI on that agent
   * @param observer optional observer registered on the downlink before it is
   *     opened; {@code null} to register none
   * @return the opened, synced downlink
   * @throws AssertionError (via {@code fail}) if the downlink does not sync
   *     within {@link #SYNC_TIMEOUT_SECONDS} seconds or the wait is interrupted
   */
  private MapDownlink<String, String> getDownlink(String nodeUri, String laneUri, Object observer) {
    final CountDownLatch didSyncLatch = new CountDownLatch(1);
    final MapDownlink<String, String> downlink = plane.downlinkMap()
        .keyClass(String.class)
        .valueClass(String.class)
        .hostUri(CLIENT_HOST_URI)
        .nodeUri(nodeUri)
        .laneUri(laneUri)
        .didSync(didSyncLatch::countDown);

    if (observer != null) {
      downlink.observe(observer);
    }

    downlink.open();

    try {
      // Bounded wait: a link that never syncs fails the test instead of hanging the suite.
      if (!didSyncLatch.await(SYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
        fail("downlink to node " + nodeUri + ", lane " + laneUri
            + " did not sync within " + SYNC_TIMEOUT_SECONDS + " seconds");
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the test runner can still see the interruption.
      Thread.currentThread().interrupt();
      fail("interrupted while waiting for downlink to node " + nodeUri
          + ", lane " + laneUri + " to sync", e);
    }

    return downlink;
  }

}