Norbert is a library which provides easy cluster management and cluster aware client/server networking APIs. Implemented in Scala, Norbert wraps ZooKeeper, Netty and uses Protocol Buffers for transport to make it easy to build a cluster aware application. A Java API is provided and pluggable load balancing strategies are supported with round robin and consistent hash strategies provided out of the box.
Norbert can be built using Maven.
Norbert provides a simple API to interact with a cluster and to receive notifications when the cluster topology changes.
In Norbert a cluster is a named set of Nodes.
A Node is Norbert's representation of a service which can handle requests. A Node contains:
If an application is designed around a partitioned data set or workload then each node can be assigned partition ids. These partition ids can be used by Norbert's networking layer's partitioned load balancer functionality.
The set of member Nodes in a given cluster is reliably stored in ZooKeeper. Additionally, a Node can advertise that it is available to process requests. In general, a Node can be in one of three states:
Number 1 is most commonly the case that an administrator has specified the node in the cluster metadata, but the node is currently offline. Number 2 is useful when the node is online, but for whatever reason, an administrator does not want it to receive traffic.
The easiest way to define a cluster is to use the NorbertClusterClientMain
command line program which can be found in the examples sub-directory. At the prompt you can type
Under the covers, the NorbertNetworkClientMain
command line program simply uses the addNode
and removeNode
methods on the Cluster
trait. These methods create ZNodes in ZooKeeper which store the Node's hostname/port and partition mapping metadata. Custom tools can be written using those methods in your own code.
Norbert provides two ways to interact with the cluster.
ClusterClient
trait provides methods for retrieving the current data about the cluster.ClusterListener
with the cluster. The cluster will then send notifications to your ClusterListener
s whenever the state of the cluster changes.object NorbertClient {
def main(args: Array[String]) {
val cc = ClusterClient("norbert", "localhost:2181", 30000) (1)
cc.awaitConnectionUninterruptibly (2)
cc.nodes (3)
cc.addListener(new MyClusterListener) (4)
cc.markNodeAvailable(1) (5)
cc.shutdown (6)
}
}
ClusterClient
companion object provides an easy way to instantiate and start a ClusterClient
instance.ClusterListener
s with the cluster and they will be sent notifications when the state of the cluster changes.shutdown
properly cleans up the resources Norbert uses and disconnects you from the cluster.public class NorbertClient {
public static void main(String[] args) {
ClusterClient cc = new ZooKeeperClusterClient("norbert", "localhost:2181", 30000); (1)
cc.awaiteConnectionUninterruptibly(); (2)
cc.getNodes(); (3)
cc.addListener(new MyClusterListener()); (4)
cc.markNodeAvailable(1); (5).
cluster.shutdown(6)
}
}
ClusterListener
s with the cluster and they will be sent notifications when the state of the cluster changes.shutdown
properly cleans up the resources Norbert uses and disconnects you from the cluster.Both the Scala and Java ClusterClient
s take three parameters:
In addition to the cluster management, Norbert provides an API for building cluster aware client/server applications.
Norbert's client/server library uses message passing semantics and, specifically, Protocol Buffers to encode those messages. To use Norbert's client/server library, you will need to define the Protocol Buffers you will use as requests, and the associated Protocol Buffers that will be received as responses to those requests.
Norbert uses a software load balancer mechanism to route a request from a client to a server. Both partitioned and unpartitioned clusters are supported.
If you are building a service which will use an unpartitioned cluster, you must provide your NetworkClient
instance with a LoadBalancerFactory
. The LoadBalancerFactory
is used to create LoadBalancer
instance that will be used to route requests. A round robin load balancer factory is provided.
If you are building a partitioned cluster then you will want to use the PartitionedNetworkClient
and a PartitionedLoadBalancerFactory
. These are generic classes that have a PartitionedId type parameter. PartitionedId is the type of the id that you use to partition your cluster (e.g. a member id). A consistent hash load balancer factory is provided.
object NorbertNetworkServer {
def main(args: Array[String]) {
val config = new NetworkServerConfig (1)
config.serviceName = "norbert"
config.zooKeeperConnectString = "localhost:2181"
config.zooKeeperSessionTimeoutMillis = 30000
config.requestThreadCorePoolSize = 5
config.requestThreadMaxPoolSize = 10
config.requestThreadKeepAliveTimeSecs = 300
val server = NetworkServer(config) (2)
server.registerHandler(MyRequestMessage.getDefaultInstance, MyResponseMessage.getDefaultInstance, messageHandler _) (3)
server.bind(nodeId) (4)
}
private def messageHandler(message: Message): Message = {
// application logic which returns a MyResponseMessage
}
}
NetworkServerConfig
contains the configuration data for a NetworkServer
.NetworkServer
companion object provides an easy to instantiate a new NetworkServer
instance.NetworkServer
. A single NetworkServer
instance can handle multiple request/response/handlers.NetworkServer
to the network by providing the id of the Node
this server handles requests for. Bind will create a socket, bind it to the port specified in the Node
's url and mark the Node
available in the cluster. After this call the NetworkServer
can begin to receive requests.public class NorbertNetworkServer {
public static void main(String[] args) {
NetworkServerConfig config = new NetworkServerConfig();
config.setServiceName("norbert");
config.setZooKeeperConnectString("localhost:2181");
config.setZooKeeperSessionTimeoutMillis(30000);
config.setRequestThreadCorePoolSize(5);
config.setRequestThreadMaxPoolSize(10);
config.setRequestThreadKeepAliveTimeSecs(300);
NetworkServer ns = new NettyNetworkServer(config);
ns.registerHandler(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance(), new MessageHandler());
ns.bind(nodeId);
}
}
NetworkServerConfig
contains the configuration data for a NetworkServer
.NettyNetworkServer
is currently the only implementation of NetworkServer
.NetworkServer
. A single NetworkServer
instance can handle multiple request/response/handlers.NetworkServer
to the network by providing the id of the Node
this server handles requests for. Bind will create a socket, bind it to the port specified in the Node
's url and mark the Node
available in the cluster. After this call the NetworkServer
can begin to receive requests.ClusterClient
instance yourself and have the NetworkServer
use that instance by setting this fieldobject NorbertNetworkClient {
def main(args: Array[String]) {
val config = new NetworkClientConfig (1)
config.serviceName = "norbert"
config.zooKeeperConnectString = "localhost:2181"
config.zooKeeperSessionTimeoutMillis = 30000
config.connectTimeoutMillis = 1000
config.writeTimeoutMillis = 150
config.maxConnectionsPerNode = 5
config.staleRequestTimeoutMins = 10
config.staleRequestCleanupFrequenceMins = 10
val nc = NetworkClient(config, new RoundRobinLoadBalancerFactory) (2)
OR
val nc = PartitionedNetworkClient(config, new IntegerConsistentHashPartitionedLoadBalancerFactory)
nc.registerRequest(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance()) (3)
val f = nc.sendMessage(myRequestMessageInstance) (4)
OR
val f = nc.sendMessage(1210, myRequestMessageInstance)
try {
val response = f.get(500, TimeUnit.MILLISECONDS).asInstanceOf[MyResponseMessage] (5)
// do something with the response
} catch {
case ex: TimeoutException => println("Timed out")
case ex: ExecutionException => println("Error: %s".format(ex.getCause))
}
}
}
NetworkClientConfig
contains the configuration data for a NetworkClient
.NetworkClient
companion object provides an easy to instantiate a new NetworkClient
instance. Alternatively the PartitionedNetworkClient
companion object provides the same functionality for PartitionedNetworkClient
s.NetworkClient
.NetworkClient
the configured load balancer will be used to send the provided message to an available Node
in the cluster. In the case of a PartitionedNetworkClient
the passed in id will be passed to the configured partitioned load balancer to calculate the correct node to send the message to.public class NorbertNetworkClient {
public static void main(String[] args) {
NetworkClientConfig config = new NetworkClientConfig(); (1)
config.setServiceName("norbert");
config.setZooKeeperConnectString("localhost:2181");
config.setZooKeeperSessionTimeoutMillis(30000);
config.setConnectTimeoutMillis(1000);
config.setWriteTimeoutMillis(150);
config.setConnectionsPerNode(5);
config.setStaleRequestTimeoutMins(10);
config.setStaleRequestCleanupFrequenceMins10);
NetworkClient nc = new NettyNetworkClient(config, new RoundRobinLoadBalancerFactory()); (2)
OR
PartitionedNetworkClient<Integer> nc = new NettyPartitionedNetworkClient<Integer>(config, new IntegerConsistentHashPartitionedLoadBalancerFactory());
nc.registerRequest(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance()); (3)
Future<Message> f = nc.sendMessage(myRequestMessageInstance); (4)
OR
Future<Message> f = nc.sendMessage(1210, myRequestMessageInstance);
try {
MyResponseMessage response = (MyResponseMessage) f.get(500, TimeUnit.MILLISECONDS); (5)
// do something with the response
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
NetworkClientConfig
contains the configuration data for a NetworkClient
.NettyNetworkClient
and NettyPartitionedNetworkClient
are currently the only implementations of NetworkClient
and PartitionedNetworkClient
respectively.NetworkClient
.NetworkClient
the configured load balancer will be used to send the provided message to an available Node
in the cluster. In the case of a PartitionedNetworkClient
the passed in id will be passed to the configured partitioned load balancer to calculate the correct node to send the message to.ClusterClient
instance yourself and have the NetworkServer
use that instance by setting this field