111 发表于 2019-1-29 08:04:00

elasticsearch源码分析---TransportClient

  在es中存在两种形式的客户端,NodeClient和TransportClient。两者的主要区别在于是否加入集群。TransportClient是不会加入集群(作为集群的一个节点)。因此TransportClient跟NodeClient相比算是轻量级的。因为不加入集群,因此访问集群需要指定一个或者多个地址。客户端,顾名思义,就是作为调用的发起方,来向服务器请求数据。因此,我们执行的诸如索引、查询等一系列操作在TransportClient中都提供了对应的接口,这些接口无需过多介绍。
  在TransportClient中,最重要的莫过于InternalTransportClient实例和NodeService服务。NodeService负责客户端与服务器的连接。
  

nodesService = injector.getInstance(TransportClientNodesService.class);  
internalClient = injector.getInstance(TransportProxyClient.class);
  

  TransportProxyClient 用来缓存nodesService 的action信息,action信息包括查询node状态,node健康情况等,方便nodesService快速执行
  我们重点关注下NodeService。客户端是如何维护与服务器的连接呢?
  在NodeService中有维护者三类节点:listedNode(调用addTransportAddress加入的初始连接节点),nodes(真正于服务器建立了连接的节点),filteredNode(没有连接上的节点,比如由于clusterName不一致)。
  当我们调用addTransportAddress加入一个或者地址时,会发生什么呢?关注TransportNodeService的如下方法:
  

public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {  synchronized (mutex) {
  if (closed) {
  throw new ElasticsearchIllegalStateException("transport client is closed, can't add an address");
  }
  List filtered = Lists.newArrayListWithExpectedSize(transportAddresses.length);
  for (TransportAddress transportAddress : transportAddresses) {
  boolean found = false;
  for (DiscoveryNode otherNode : listedNodes) {
  if (otherNode.address().equals(transportAddress)) {
  found = true;
  logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
  break;
  }
  }
  if (!found) {
  filtered.add(transportAddress);
  }
  }
  if (filtered.isEmpty()) {
  return this;
  }
  ImmutableList.Builder builder = ImmutableList.builder();
  builder.addAll(listedNodes());
  for (TransportAddress transportAddress : filtered) {
  DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, minCompatibilityVersion);
  logger.debug("adding address [{}]", node);
  builder.add(node);
  }
  listedNodes = builder.build();
  nodesSampler.sample();
  }
  return this;
  }
  

  初始状态下listedNodes是没有赋值的,因此所有address都进入了filtered的变量中,之后再调用addTransportAddress的时候listedNode就祈祷过滤作用了,防止重复添加。
  加入filtered中的address均是没有潜在的未建立连接的节点(也是我们想要建立连接的节点),用这些节点第一次对listedNodes进行了赋值。调用到了关键的nodeSampler的sample方法。这里插播一下nodesSampler实例。在构造函数中:
  

if (componentSettings.getAsBoolean("sniff", false)) {  this.nodesSampler = new SniffNodesSampler();
  } else {
  this.nodesSampler = new SimpleNodeSampler();
  }
  

  nodeSampler是根据client.transport,sniff属性的设置来生成具体的策略,默认false。设置为true意味着处理要连接的节点(listedNode)之外,还会主动去发现集群中的其他存活的节点,也就是SniffNodesSampler的doSample方法。这个sniff操作的频率通过nodes_sampler_interval这个选项来控制,默认5s。
  主要对两类节点进行ping操作,一类是listedNode,一类是上一次ping之前确认已经建立连接关系的nodes。
  

  
if (!transportService.nodeConnected(listedNode)) {
  try {
  

  // if its one of the actual nodes we will talk to, not to listed nodes, fully connect
  if (nodes.contains(listedNode)) {
  logger.trace("connecting to cluster node [{}]", listedNode);
  transportService.connectToNode(listedNode);
  } else {
  // its a listed node, light connect to it...
  logger.trace("connecting to listed node (light) [{}]", listedNode);
  transportService.connectToNodeLight(listedNode);
  }
  } catch (Exception e) {
  logger.debug("failed to connect to node [{}], ignoring...", e, listedNode);
  latch.countDown();
  return;
  }
  }
  

  通过对当前listednode的判断,如果没有建立过连接,就进入了以上的逻辑,进行fully connect还是light connect呢?判断标准在于这个listednode是否是我们要将要实际通讯的node,如果是,就进行fully connect,否则是light connect。light connect只是为了ping。fully connect包括
  recovery:做数据恢复recovery,默认个数2个;
  bulk:用于bulk请求,默认个数3个;
  med/reg:典型的搜索和单doc索引,默认个数6个;
  high:如集群state的发送等,默认个数1个;
  ping:就是node之间的ping咯。默认个数1个;
  

      this.connectionsPerNodeRecovery = this.settings.getAsInt("transport.netty.connections_per_node.recovery", settings.getAsInt(CONNECTIONS_PER_NODE_RECOVERY, 2));  this.connectionsPerNodeBulk = this.settings.getAsInt("transport.netty.connections_per_node.bulk", settings.getAsInt(CONNECTIONS_PER_NODE_BULK, 3));
  this.connectionsPerNodeReg = this.settings.getAsInt("transport.netty.connections_per_node.reg", settings.getAsInt(CONNECTIONS_PER_NODE_REG, 6));
  this.connectionsPerNodeState = this.settings.getAsInt("transport.netty.connections_per_node.high", settings.getAsInt(CONNECTIONS_PER_NODE_STATE, 1));
  this.connectionsPerNodePing = this.settings.getAsInt("transport.netty.connections_per_node.ping", settings.getAsInt(CONNECTIONS_PER_NODE_PING, 1));
  

  每个节点默认都会创建13个到其他节点的连接,并且节点之间是互相连接的,每增加一个节点,该节点会到每个节点创建13个连接,而其他每个节点也会创建13个连回来的连接。
http://i2.运维网.com/images/blog/201806/05/b212aff6dcc34480bc9492d3ae0fa1c6.png?x-oss-process=image/watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=
  对应的代码为:
  之后会通过transport模块发送向listednode发送一个request请求,来获取节点状态。根据返回的节点状态,clusterName不匹配的节点放入filteredNode,而其余节点在通过validateNewNodes来验证,最终赋值给nodes。
  

protected ImmutableList validateNewNodes(Set nodes) {  for (Iterator it = nodes.iterator(); it.hasNext(); ) {
  DiscoveryNode node = it.next();
  if (!transportService.nodeConnected(node)) {
  try {
  logger.trace("connecting to node [{}]", node);
  transportService.connectToNode(node);
  } catch (Throwable e) {
  it.remove();
  logger.debug("failed to connect to discovered node [" + node + "]", e);
  }
  }
  }
  

  return new ImmutableList.Builder().addAll(nodes).build();
  }
  

  在validataNewNode中建立了full connect,证明这是我们将要进行实际通信的节点。
  出处:
  https://blog.csdn.net/jingkyks/article/details/45032895


页: [1]
查看完整版本: elasticsearch源码分析---TransportClient