[Experience Sharing] Flume NG Source Code Analysis (6): The RpcClient Design for Applications

[复制链接]

尚未签到

发表于 2015-11-27 18:33:55 | 显示全部楼层 |阅读模式
  The previous post, Flume NG Source Code Analysis (5): Collecting Logs over RPC with ThriftSource, covered how ThriftSource uses the Thrift service ThriftSourceProtocol to collect logs. This post looks at the design and implementation of the RpcClient that flume-ng-sdk provides to applications, again using ThriftRpcClient as the example.
  


  First, look at the native client for ThriftSourceProtocol, generated by Thrift from the service definition in flume.thrift. This native Client covers the basics of an RPC client: network transport and protocol encoding/decoding. For background on Thrift clients, see Thrift Source Code Analysis (3) -- IDL and Generated Code Analysis.
  

public static class Client extends org.apache.thrift.TServiceClient implements Iface {
  public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
    public Factory() {}
    public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
      return new Client(prot);
    }
    public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
      return new Client(iprot, oprot);
    }
  }

  public Client(org.apache.thrift.protocol.TProtocol prot) {
    super(prot, prot);
  }

  public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
    super(iprot, oprot);
  }

  public Status append(ThriftFlumeEvent event) throws org.apache.thrift.TException {
    send_append(event);
    return recv_append();
  }

  public void send_append(ThriftFlumeEvent event) throws org.apache.thrift.TException {
    append_args args = new append_args();
    args.setEvent(event);
    sendBase("append", args);
  }

  public Status recv_append() throws org.apache.thrift.TException {
    append_result result = new append_result();
    receiveBase(result, "append");
    if (result.isSetSuccess()) {
      return result.success;
    }
    throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "append failed: unknown result");
  }

  public Status appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException {
    send_appendBatch(events);
    return recv_appendBatch();
  }

  public void send_appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException {
    appendBatch_args args = new appendBatch_args();
    args.setEvents(events);
    sendBase("appendBatch", args);
  }

  public Status recv_appendBatch() throws org.apache.thrift.TException {
    appendBatch_result result = new appendBatch_result();
    receiveBase(result, "appendBatch");
    if (result.isSetSuccess()) {
      return result.success;
    }
    throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "appendBatch failed: unknown result");
  }
}

Now look at how Flume NG wraps the Thrift client. Flume NG supports several RPC implementations, including Avro and Thrift; its RpcClient class hierarchy is shown below.
[Figure: RpcClient class hierarchy]


  


  The RpcClient interface defines the basic functionality an application needs from the RPC client:


  

public interface RpcClient {
  public int getBatchSize();
  public void append(Event event) throws EventDeliveryException;
  public void appendBatch(List<Event> events) throws EventDeliveryException;
  public boolean isActive();
  public void close() throws FlumeException;
}
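
To make the contract concrete, here is a minimal usage sketch, assuming a Flume agent with a ThriftSource listening on the given address; the host, port, and message body are placeholders, and RpcClientFactory.getThriftInstance is available in flume-ng-sdk 1.4 and later:

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class RpcClientExample {
  public static void main(String[] args) throws EventDeliveryException {
    // Connect to a ThriftSource; this returns a ThriftRpcClient under the hood.
    RpcClient client = RpcClientFactory.getThriftInstance("127.0.0.1", 4141);
    try {
      Event event = EventBuilder.withBody("hello flume", Charset.forName("UTF-8"));
      client.append(event); // one RPC round trip: Client.append on the wire
    } finally {
      client.close(); // releases the underlying connections
    }
  }
}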

The abstract class AbstractRpcClient implements the RpcClient interface, provides a default implementation of getBatchSize, and adds a configure method to support configuration:
  

public abstract class AbstractRpcClient implements RpcClient {
  protected int batchSize =
      RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
  protected long connectTimeout =
      RpcClientConfigurationConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
  protected long requestTimeout =
      RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;

  @Override
  public int getBatchSize() {
    return batchSize;
  }

  @Override
  public abstract void append(Event event) throws EventDeliveryException;

  @Override
  public abstract void appendBatch(List<Event> events)
      throws EventDeliveryException;

  @Override
  public abstract boolean isActive();

  @Override
  public abstract void close() throws FlumeException;

  protected abstract void configure(Properties properties)
      throws FlumeException;
}

A well-designed service-framework client has a few essential capabilities:
  1. Service addressing
  2. Connection pool management
  3. Client-side load balancing of RPC calls
  4. Caching
  5. Fault tolerance and failover


  


  Let's see how Flume NG designs its service client. The basic components are shown below:
[Figure: basic components of the Flume NG RPC client]


  
  Service addressing


  Flume NG's RPC client keeps service addressing simple: the Thrift server's IP and port are read from the Properties configuration, and those values are used to create the TSocket. This is a natural extension point for making addressing more capable and flexible.
  

HostInfo host = HostInfo.getHostInfoList(properties).get(0);
hostname = host.getHostName();
port = host.getPortNumber();

// ClientWrapper
public ClientWrapper() throws Exception {
  // build the TSocket from hostname and port
  transport = new TFastFramedTransport(new TSocket(hostname, port));
  transport.open();
  client = new ThriftSourceProtocol.Client(new TCompactProtocol(transport));
  // Not a great hash code, but since this class is immutable and there
  // is at most one instance of the components of this class,
  // this works fine [If the objects are equal, hash code is the same]
  hashCode = random.nextInt();
}
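
For reference, a sketch of how an application would supply that address through Properties; the alias h1 and the address are placeholders, and the keys follow RpcClientConfigurationConstants:

Properties props = new Properties();
props.setProperty("client.type", "thrift");     // select the ThriftRpcClient
props.setProperty("hosts", "h1");               // space-separated host aliases
props.setProperty("hosts.h1", "10.0.0.1:4141"); // alias -> ip:port
RpcClient client = RpcClientFactory.getInstance(props);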



  
  
  Connection pool management



First, the ClientWrapper class wraps the native Client generated by Thrift. The values for the client come from the Properties configuration, which determines the socket transport and the protocol codec:

private class ClientWrapper {
  public final ThriftSourceProtocol.Client client;
  public final TFastFramedTransport transport;
  private final int hashCode;

  public ClientWrapper() throws Exception {
    transport = new TFastFramedTransport(new TSocket(hostname, port));
    transport.open();
    client = new ThriftSourceProtocol.Client(new TCompactProtocol(transport));
    // Not a great hash code, but since this class is immutable and there
    // is at most one instance of the components of this class,
    // this works fine [If the objects are equal, hash code is the same]
    hashCode = random.nextInt();
  }
}


ConnectionPoolManager implements a simple connection pool, with checkOut and checkIn methods to borrow and return ClientWrapper connection objects. It uses a ReentrantLock and its Condition queue as a monitor to manage its own synchronization: when availableClients is empty and the pool has reached its maximum size, checkOut blocks; when checkIn returns a connection object, it wakes a thread blocked in checkOut.
  

private class ConnectionPoolManager {
  private final Queue<ClientWrapper> availableClients;
  private final Set<ClientWrapper> checkedOutClients;
  private final int maxPoolSize;
  private int currentPoolSize;
  private final Lock poolLock;
  private final Condition availableClientsCondition;

  public ConnectionPoolManager(int poolSize) {
    this.maxPoolSize = poolSize;
    availableClients = new LinkedList<ClientWrapper>();
    checkedOutClients = new HashSet<ClientWrapper>();
    poolLock = new ReentrantLock();
    availableClientsCondition = poolLock.newCondition();
    currentPoolSize = 0;
  }

  public ClientWrapper checkout() throws Exception {
    ClientWrapper ret = null;
    poolLock.lock();
    try {
      if (availableClients.isEmpty() && currentPoolSize < maxPoolSize) {
        ret = new ClientWrapper();
        currentPoolSize++;
        checkedOutClients.add(ret);
        return ret;
      }
      while (availableClients.isEmpty()) {
        availableClientsCondition.await();
      }
      ret = availableClients.poll();
      checkedOutClients.add(ret);
    } finally {
      poolLock.unlock();
    }
    return ret;
  }

  public void checkIn(ClientWrapper client) {
    poolLock.lock();
    try {
      availableClients.add(client);
      checkedOutClients.remove(client);
      availableClientsCondition.signal();
    } finally {
      poolLock.unlock();
    }
  }

  public void destroy(ClientWrapper client) {
    poolLock.lock();
    try {
      checkedOutClients.remove(client);
      currentPoolSize--;
    } finally {
      poolLock.unlock();
    }
    client.transport.close();
  }

  public void closeAll() {
    poolLock.lock();
    try {
      for (ClientWrapper c : availableClients) {
        c.transport.close();
        currentPoolSize--;
      }
      /*
       * Be cruel and close even the checked out clients. The threads writing
       * using these will now get an exception.
       */
      for (ClientWrapper c : checkedOutClients) {
        c.transport.close();
        currentPoolSize--;
      }
    } finally {
      poolLock.unlock();
    }
  }
}
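
To see how the pool is meant to be used, here is a simplified sketch of the borrow/return pattern around a single RPC call, condensed from ThriftRpcClient.append; the real method additionally runs the call on an executor with a request timeout, and connectionManager/thriftEvent refer to the surrounding ThriftRpcClient state:

ClientWrapper client = null;
boolean destroyed = false;
try {
  client = connectionManager.checkout();             // may block when the pool is exhausted
  Status status = client.client.append(thriftEvent); // raw Thrift call
  if (status != Status.OK) {
    throw new EventDeliveryException("append returned " + status);
  }
} catch (Throwable t) {
  if (client != null) {
    connectionManager.destroy(client);               // broken connection: drop it, don't reuse
    destroyed = true;
  }
  throw new EventDeliveryException("Failed to send event", t);
} finally {
  if (client != null && !destroyed) {
    connectionManager.checkIn(client);               // healthy client goes back to the pool
  }
}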

Client-side load balancing

  LoadBalancingRpcClient extends AbstractRpcClient and adds load balancing on top of the RPC client; it is an instance of the decorator pattern.
  The HostSelector interface defines the load-balancing contract: balancing is done over HostInfo entries, and each HostInfo then resolves to its corresponding RpcClient object.

public interface HostSelector {
  void setHosts(List<HostInfo> hosts);
  Iterator<HostInfo> createHostIterator();
  void informFailure(HostInfo failedHost);
}
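
How the selector is consumed is easiest to see in the append path. The following is a condensed sketch of LoadBalancingRpcClient.append (logging trimmed): each call walks the host iterator, and any host that throws is reported back via informFailure so the backoff bookkeeping can exclude it for a while.

public void append(Event event) throws EventDeliveryException {
  throwIfClosed();
  boolean eventSent = false;
  Iterator<HostInfo> it = selector.createHostIterator();
  while (it.hasNext()) {
    HostInfo host = it.next();
    try {
      RpcClient client = getClient(host); // cached per-host client, see the caching section below
      client.append(event);
      eventSent = true;
      break;
    } catch (Exception ex) {
      selector.informFailure(host);       // feed the backoff bookkeeping
    }
  }
  if (!eventSent) {
    throw new EventDeliveryException("Unable to send event to any host");
  }
}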

HostSelector has two default implementations:
  RoundRobinHostSelector balances in round-robin order.
  RandomOrderHostSelector balances in random order.


[Figure: HostSelector implementations]


  Look at the RoundRobinHostSelector implementation; its logic mostly lives in the OrderSelector class:
  

private static class RoundRobinHostSelector implements HostSelector {
  private OrderSelector<HostInfo> selector;

  RoundRobinHostSelector(boolean backoff, long maxBackoff) {
    selector = new RoundRobinOrderSelector<HostInfo>(backoff);
    if (maxBackoff != 0) {
      selector.setMaxTimeOut(maxBackoff);
    }
  }

  @Override
  public synchronized Iterator<HostInfo> createHostIterator() {
    return selector.createIterator();
  }

  @Override
  public synchronized void setHosts(List<HostInfo> hosts) {
    selector.setObjects(hosts);
  }

  public synchronized void informFailure(HostInfo failedHost) {
    selector.informFailure(failedHost);
  }
}

OrderSelector is an ordered-selection container that supports a backoff algorithm. Its class hierarchy is shown below:
  
[Figure: OrderSelector class hierarchy]


  The parent class OrderSelector is abstract and defines the backoff algorithm; the subclasses RoundRobinOrderSelector and RandomOrderSelector implement the iterator-creation logic.
  The code of RoundRobinOrderSelector is shown below:
  1. getIndexList() returns the list of indices of the objects that are currently healthy.
  2. The nextHead index marks the current position and serves as the starting point of the round-robin pass.


  

public class RoundRobinOrderSelector<T> extends OrderSelector<T> {
  private int nextHead = 0;

  public RoundRobinOrderSelector(boolean shouldBackOff) {
    super(shouldBackOff);
  }

  @Override
  public Iterator<T> createIterator() {
    List<Integer> activeIndices = getIndexList();
    int size = activeIndices.size();
    // possible that the size has shrunk so gotta adjust nextHead for that
    if (nextHead >= size) {
      nextHead = 0;
    }
    int begin = nextHead++;
    if (nextHead == activeIndices.size()) {
      nextHead = 0;
    }
    int[] indexOrder = new int[size];
    for (int i = 0; i < size; i++) {
      indexOrder[i] = activeIndices.get((begin + i) % size);
    }
    return new SpecificOrderIterator<T>(indexOrder, getObjects());
  }
}
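
A minimal standalone sketch of the rotation, assuming direct use of the selector (inside Flume it is always driven through RoundRobinHostSelector): with three healthy objects, each createIterator() call starts one position after the previous one.

import java.util.Arrays;
import java.util.Iterator;

OrderSelector<String> selector = new RoundRobinOrderSelector<String>(false);
selector.setObjects(Arrays.asList("A", "B", "C")); // all three healthy
// 1st createIterator(): A B C   (begin = 0)
// 2nd createIterator(): B C A   (begin = 1)
// 3rd createIterator(): C A B   (begin = 2)
for (int round = 0; round < 3; round++) {
  StringBuilder order = new StringBuilder();
  for (Iterator<String> it = selector.createIterator(); it.hasNext(); ) {
    order.append(it.next()).append(' ');
  }
  System.out.println(order.toString().trim());
}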
  
  For LoadBalancingRpcClient, the configuration must list at least two servers for the same RPC service; otherwise load balancing cannot be used. The configuration also carries the settings for the backoff and load-balancing algorithms:
  

protected void configure(Properties properties) throws FlumeException {
  clientMap = new HashMap<String, RpcClient>();
  configurationProperties = new Properties();
  configurationProperties.putAll(properties);
  hosts = HostInfo.getHostInfoList(properties);
  if (hosts.size() < 2) {
    throw new FlumeException("At least two hosts are required to use the "
        + "load balancing RPC client.");
  }
  String lbTypeName = properties.getProperty(
      RpcClientConfigurationConstants.CONFIG_HOST_SELECTOR,
      RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN);
  boolean backoff = Boolean.valueOf(properties.getProperty(
      RpcClientConfigurationConstants.CONFIG_BACKOFF,
      String.valueOf(false)));
  String maxBackoffStr = properties.getProperty(
      RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF);
  long maxBackoff = 0;
  if (maxBackoffStr != null) {
    maxBackoff = Long.parseLong(maxBackoffStr);
  }
  if (lbTypeName.equalsIgnoreCase(
      RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN)) {
    selector = new RoundRobinHostSelector(backoff, maxBackoff);
  } else if (lbTypeName.equalsIgnoreCase(
      RpcClientConfigurationConstants.HOST_SELECTOR_RANDOM)) {
    selector = new RandomOrderHostSelector(backoff, maxBackoff);
  } else {
    try {
      @SuppressWarnings("unchecked")
      Class<? extends HostSelector> klass = (Class<? extends HostSelector>)
          Class.forName(lbTypeName);
      selector = klass.newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to instantiate host selector: "
          + lbTypeName, ex);
    }
  }
  selector.setHosts(hosts);
  isOpen = true;
}
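
Put together, a sketch of a load-balancing client configuration; the keys follow RpcClientConfigurationConstants and the Flume developer guide, and the aliases and addresses are placeholders:

Properties props = new Properties();
props.setProperty("client.type", "default_loadbalance");
props.setProperty("hosts", "h1 h2 h3");
props.setProperty("hosts.h1", "10.0.0.1:4141");
props.setProperty("hosts.h2", "10.0.0.2:4141");
props.setProperty("hosts.h3", "10.0.0.3:4141");
props.setProperty("host-selector", "round_robin"); // or "random", or a custom HostSelector class name
props.setProperty("backoff", "true");              // temporarily exclude failed hosts
props.setProperty("maxBackoff", "10000");          // cap the backoff at 10 seconds
RpcClient client = RpcClientFactory.getInstance(props);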



  
  The main components of client-side load balancing are shown below:
[Figure: main components of client-side load balancing]


  


  Client-side caching

  Client-side caching is straightforward: a Map keeps each HostInfo's corresponding RpcClient so that these objects can be reused. An RpcClient is a heavyweight object, since it holds a connection pool instance.


  

clientMap = new HashMap<String, RpcClient>();

private synchronized RpcClient getClient(HostInfo info)
    throws FlumeException, EventDeliveryException {
  throwIfClosed();
  String name = info.getReferenceName();
  RpcClient client = clientMap.get(name);
  if (client == null) {
    client = createClient(name);
    clientMap.put(name, client);
  } else if (!client.isActive()) {
    try {
      client.close();
    } catch (Exception ex) {
      LOGGER.warn("Failed to close client for " + info, ex);
    }
    client = createClient(name);
    clientMap.put(name, client);
  }
  return client;
}

  
  Client-side fault tolerance

The FailoverRpcClient class implements client-side fault tolerance. It too is a decorator: it extends AbstractRpcClient and implements the RpcClient interface. FailoverRpcClient provides failover through a retry mechanism: when one RpcClient fails, it retries the RPC request with the next one until a request succeeds or all hosts have failed. It maintains a list of HostInfo entries, from which the corresponding RpcClient objects are found, along with a maximum retry count, maxTries:
  

private synchronized void configureHosts(Properties properties)
    throws FlumeException {
  if (isActive) {
    logger.error("This client was already configured, " +
        "cannot reconfigure.");
    throw new FlumeException("This client was already configured, " +
        "cannot reconfigure.");
  }
  hosts = HostInfo.getHostInfoList(properties);
  String tries = properties.getProperty(
      RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS);
  if (tries == null || tries.isEmpty()) {
    maxTries = hosts.size();
  } else {
    try {
      maxTries = Integer.parseInt(tries);
    } catch (NumberFormatException e) {
      maxTries = hosts.size();
    }
  }
  ......
}
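
A sketch of the corresponding failover configuration (keys per the Flume developer guide; aliases and addresses are placeholders). When max-attempts is missing or unparseable, maxTries falls back to the number of hosts, as configureHosts above shows:

Properties props = new Properties();
props.setProperty("client.type", "default_failover");
props.setProperty("hosts", "h1 h2 h3");
props.setProperty("hosts.h1", "10.0.0.1:4141");
props.setProperty("hosts.h2", "10.0.0.2:4141");
props.setProperty("hosts.h3", "10.0.0.3:4141");
props.setProperty("max-attempts", "3"); // RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS
RpcClient client = RpcClientFactory.getInstance(props);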


Its append method implements the retry mechanism that performs the failover:
  

public void append(Event event) throws EventDeliveryException {
  // Why a local variable rather than just calling getClient()?
  // If we get an EventDeliveryException, we need to call close on
  // that specific client, getClient in this case, will get us
  // the next client - leaving a resource leak.
  RpcClient localClient = null;
  synchronized (this) {
    if (!isActive) {
      logger.error("Attempting to append to an already closed client.");
      throw new EventDeliveryException(
          "Attempting to append to an already closed client.");
    }
  }
  // Sit in an infinite loop and try to append!
  int tries = 0;
  while (tries < maxTries) {
    try {
      tries++;
      localClient = getClient();
      localClient.append(event);
      return;
    } catch (EventDeliveryException e) {
      // Could not send event through this client, try to pick another client.
      logger.warn("Client failed. Exception follows: ", e);
      localClient.close();
      localClient = null;
    } catch (Exception e2) {
      logger.error("Failed to send event: ", e2);
      throw new EventDeliveryException(
          "Failed to send event. Exception follows: ", e2);
    }
  }
  logger.error("Tried many times, could not send event.");
  throw new EventDeliveryException("Failed to send the event!");
}


  
