# Each server in the group gets a unique id: an integer in the range 1-255
server.id=1
# ZooKeeper server configuration file - path relative to this bootstrap file
zookeeperConfigFile=server1.cfg
# directory polled for input events to process - common to all servers
inputEventsDir=C:/test/in_events
# directory where output / processed events are written - may or may not be
# shared by all servers
outputEventsDir=C:/test/out_events
# listener port for the Netty server (used for intra-server message exchange)
messageServerPort=2195
Note that in this minimal server cluster, each server needs a unique server id (an integer value).

The input events directory is shared by all servers. The output events directory simulates a partner application and may or may not be shared by all servers. The ZooKeeper distribution provides a class that parses the configuration of each member server, or "quorum peer," of the cluster. Because our application reuses that class, it requires a ZooKeeper configuration in the same format.
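For reference, a quorum-peer configuration file in the format the ZooKeeper distribution expects might look like the sketch below; the specific values and paths are illustrative assumptions, not taken from the article.

```properties
# server1.cfg - illustrative ZooKeeper quorum configuration (values are assumptions)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=C:/test/zk1/data
clientPort=2181
# one line per quorum member: server.<id>=<host>:<peer-port>:<election-port>
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
```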
Also note that messageServerPort is the listener port for the Netty server (started and managed by the Norbert library).
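The bootstrap file above is an ordinary Java properties file, so it can be read with java.util.Properties before handing the referenced ZooKeeper config to the quorum-peer parser. A minimal sketch (the class and method names here are illustrative, not from the article):

```java
import java.io.StringReader;
import java.util.Properties;

// Illustrative sketch: parsing the bootstrap properties shown above.
public class BootstrapConfig {
    // Parses bootstrap properties from text; in the real application this
    // would load from the bootstrap file on disk instead.
    public static Properties parse(String text) throws Exception {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws Exception {
        String text = "server.id=1\n"
                + "zookeeperConfigFile=server1.cfg\n"
                + "messageServerPort=2195\n";
        Properties props = parse(text);
        int serverId = Integer.parseInt(props.getProperty("server.id"));
        int port = Integer.parseInt(props.getProperty("messageServerPort"));
        System.out.println("server.id=" + serverId + ", messageServerPort=" + port);
    }
}
```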
package ibm.developerworks.article;
…
public class ZooKeeperServer implements Runnable
{
public ZooKeeperServer(File configFile) throws ConfigException, IOException
{
serverConfig = new QuorumPeerConfig();
…
serverConfig.parse(configFile.getCanonicalPath());
}
public void run()
{
NIOServerCnxn.Factory cnxnFactory;
try
{
// supports client connections
cnxnFactory = new NIOServerCnxn.Factory(serverConfig.getClientPortAddress(),
serverConfig.getMaxClientCnxns());
server = new QuorumPeer();
// most properties defaulted from QuorumPeerConfig; can be overridden
// by specifying in the zookeeper config file
server.setClientPortAddress(serverConfig.getClientPortAddress());
…
server.start(); //start this cluster member
// wait for server thread to die
server.join();
}
…
}
…
public boolean isLeader()
{
//used to control file poller. Only the leader process does task
// distribution
if (server != null)
{
return (server.leader != null);
}
return false;
}
Step 4: Initialize the Norbert-based messaging server
public static void init(QuorumPeerConfig config) throws UnknownHostException
{
…
// [a] client (wrapper) for zookeeper server - points to local / in process
// zookeeper server
String host = "localhost" + ":" + config.getClientPortAddress().getPort();
//[a1] the zookeeper session timeout (5000 ms) affects how fast cluster topology
// changes are communicated back to the cluster state listener class
zkClusterClient = new ZooKeeperClusterClient("eai_sample_service", host, 5000);
zkClusterClient.awaitConnectionUninterruptibly();
…
// nettyServerURL is the URL of the local Netty server
nettyServerURL = String.format("%s:%d", InetAddress.getLocalHost().getHostName(),
Server.getNettyServerPort());
…
// [c]
…
zkClusterClient.addNode(nodeId, nettyServerURL);
// [d] add cluster listener to monitor state
zkClusterClient.addListener(new ClusterStateListener());
// Norbert - Netty server config
NetworkServerConfig norbertServerConfig = new NetworkServerConfig();
// [e] group coordination via zookeeper cluster client
norbertServerConfig.setClusterClient(zkClusterClient);
// [f] threads required for processing requests
norbertServerConfig.setRequestThreadMaxPoolSize(20);
networkServer = new NettyNetworkServer(norbertServerConfig);
// [g] register message handler (identifies request and response types) and the
// corresponding object serializer for the request and response
networkServer.registerHandler(new AppMessageHandler(), new CommonSerializer());
// bind the server to the unique server id
// one to one association between zookeeper server and Norbert server
networkServer.bind(Server.getServerId());
}
Please note that the Norbert-based messaging server includes a client that connects to the ZooKeeper cluster. This server is configured to connect to the local (in-process) ZooKeeper server, for which a client is then created. The session timeout affects how quickly cluster topology changes are propagated back to the application. It effectively creates a small window of time during which the recorded state of the cluster topology is out of sync with its actual state, because a new server has started or an existing server has crashed. The application needs to buffer messages during this window, or implement retry logic for failed sends.
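The retry logic mentioned above can be sketched with the standard library alone; this is a generic pattern under assumed names (RetrySender, withRetry), not an API from Norbert or the article:

```java
import java.util.concurrent.Callable;

// Sketch of retry logic for riding out the window in which the recorded
// cluster topology is stale: retry the send a few times with a fixed backoff.
public class RetrySender {
    public static <T> T withRetry(Callable<T> send, int attempts, long backoffMs)
            throws Exception {
        Exception last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return send.call(); // success: return the acknowledgement
            } catch (Exception e) {
                last = e;           // failure: remember and back off
                Thread.sleep(backoffMs);
            }
        }
        throw last; // all attempts failed
    }
}
```

A real implementation might instead buffer failed messages on a queue and replay them when the cluster listener reports a stable topology.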
MessagingServer.java (Listing 4) performs the following tasks:
public class MessagingClient
{
…
public static void init()
{
…
NetworkClientConfig config = new NetworkClientConfig();
// [a] need instance of local norbert based zookeeper cluster client
config.setClusterClient(MessagingServer.getZooKeeperClusterClient());
// cluster client with round robin selection of message target
nettyClient = new NettyNetworkClient(config,
new RoundRobinLoadBalancerFactory());
…
}
...
…
// [c] if serverId <= 0, use the round-robin strategy to choose the target;
// else send to the identified target server
public static Future<String> sendMessage(AppRequestMsg messg, int serverId)
throws Exception
{
…
// [d] load balance message to cluster- strategy registered with client
if (serverId <= 0)
{
…
return nettyClient.sendRequest(messg, serializer);
}
else
{
// [e] message to be sent to particular server node
…
if (destNode != null)
{
…
return nettyClient.sendRequestToNode(messg, destNode, serializer);
}
…
}
}
…
}
Note that in Listing 5, if no target server is identified by a positive serverId value, the message is sent to a server chosen from the active group according to the configured load-balancing strategy. Applications could configure and implement message-routing strategies of their own, perhaps based on other server attributes. (Consider a multi-tenant application in which requests could be forwarded to an identified sub-cluster of servers, one per tenant; see Resources for further discussion.)
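The round-robin selection that the configured load-balancer factory performs can be illustrated with a small stdlib-only sketch; the class name and shape here are assumptions, not Norbert's actual implementation:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative round-robin selection over the currently available nodes,
// in the spirit of the RoundRobinLoadBalancerFactory used above.
public class RoundRobinBalancer {
    private final AtomicInteger next = new AtomicInteger();

    // Returns the next node in rotation; thread-safe via AtomicInteger.
    public <T> T choose(List<T> nodes) {
        int idx = Math.floorMod(next.getAndIncrement(), nodes.size());
        return nodes.get(idx);
    }
}
```

A tenant-aware strategy of the kind mentioned above would filter the node list to the tenant's sub-cluster before applying the same rotation.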
State monitoring and task distribution
The simulated application has three more components, which are covered in the sections below:

A component that monitors the state (server membership) of the cluster.

A Spring Integration flow definition file. The process definition file defines an EIP-based message flow, from the simulated application's task pool to the central task distributor. The task distributor ultimately routes each task to one of the available cluster members for processing.
public class ClusterStateListener implements ClusterListener
{
…
public void handleClusterNodesChanged(Set<Node> currNodeSet)
{
…
// [a] start event adapter if this server is leader
if (Server.isLeader() && !Server.isFilePollerRunning())
{
Server.startFilePoller();
}
}
…
}
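The gating logic inside handleClusterNodesChanged() — start the file poller at most once, and only on the leader — can be reduced to a small stdlib sketch; PollerGate and its method are stand-ins for the Server calls in the listing:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the leader-only poller gate used by the cluster listener:
// the poller is started exactly once, and only while this member is leader.
public class PollerGate {
    private final AtomicBoolean pollerRunning = new AtomicBoolean(false);

    // isLeader stands in for Server.isLeader(); returns true when the caller
    // should start the file poller now.
    public boolean onClusterChange(boolean isLeader) {
        // compareAndSet guarantees a single start even with concurrent callbacks
        return isLeader && pollerRunning.compareAndSet(false, true);
    }
}
```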
The Spring Integration-based file poller
The Spring Integration flow performs the following tasks (as shown in Listing 7):
{
…
// [a] invoked by spring integration context
public void processMessage(Message<File> polledMessg)
{
File polledFile = polledMessg.getPayload();
…
try
{
logr.info("Received file as input:" + polledFile.getCanonicalPath());
// prefix file name and a delimiter for the rest of the payload
payload = polledFile.getName() + "|~" + Files.toString(polledFile, charset);
…
// create new message
AppRequestMsg newMessg = new AppRequestMsg(payload);
// load balance the request to operating servers without
// targeting any one in particular
Future<String> retAck = MessagingClient.sendMessage(newMessg, -1);
// block for acknowledgement - could have processed the acknowledgement
// asynchronously by depositing it on a separate queue
String ack = retAck.get();
…
logr.info("sent message and received acknowledgement:" + ack);
…
}
}
Note that the service activator method is invoked by the controlling Spring Integration context after the file poller finds a file to process. Also note that the file's contents are serialized and form the payload of a new request object. The messaging client's sendMessage() method is invoked without targeting a particular server; the Norbert client module then load-balances the resulting message to one of the cluster's running servers.
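The "file name, delimiter, file contents" payload convention from Listing 7 can be captured in a small helper; the class and method names are illustrative, only the "|~" delimiter comes from the listing:

```java
// Sketch of the payload convention used by the file poller:
// <fileName>|~<fileContents>. Assumes file names never contain "|~".
public class PayloadCodec {
    static final String DELIM = "|~";

    public static String encode(String fileName, String contents) {
        return fileName + DELIM + contents;
    }

    // Splits on the first delimiter so the contents may themselves contain "|".
    public static String[] decode(String payload) {
        int i = payload.indexOf(DELIM);
        return new String[] {
            payload.substring(0, i),
            payload.substring(i + DELIM.length())
        };
    }
}
```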