设为首页 收藏本站
查看: 1355|回复: 0

[经验分享] Flume NG 学习笔记(九)Flune Client 开发

[复制链接]

尚未签到

发表于 2015-11-27 17:51:24 | 显示全部楼层 |阅读模式
  文章内容还是来自官网http://flume.apache.org/FlumeDeveloperGuide.html
  由于在实际工作中,数据的生产方式极具多样性,Flume 虽然包含了一些内置的机制来采集数据,但是更多的时候用户更希望能将应用程序和flume直接相通。所以这边运行用户开发应用程序,通过IPC或者RPC连接flume并往flume发送数据。

一、RPC client interface
  Flume的RpcClient实现了Flume的RPC机制。用户的应用程序可以很简单的调用Flume Client SDK的append(Event) 或者appendBatch(List<Event>) 方法发送数据,不用担心底层信息交换的细节。用户可以提供所需的event通过直接实现Event接口,例如可以使用简单的方便的实现SimpleEvent类或者使用EventBuilder的writeBody()静态辅助方法。
  自Flume 1.4.0起,Avro是默认的RPC协议。NettyAvroRpcClient和ThriftRpcClient实现了RpcClient接口。实现中我们需要知道我们将要连接的目标flume agent的host和port用于创建client实例,然后使用RpcClient发送数据到flume agent。
  官网给了一个Avro RPCclients的例子,这边直接拿来做实际测试例子。
  这里我们把client.init(&quot;host.example.org&quot;,41414);
  改成 client.init(&quot;192.168.233.128&quot;,50000);  与我们的主机对接



import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
public class MyApp {
public static voidmain(String[] args) {
MyRpcClientFacade client = new MyRpcClientFacade();
// Initializeclient with the remote Flume agent's host and port
//client.init(&quot;host.example.org&quot;,41414);
client.init(&quot;192.168.233.128&quot;,50000);
// Send 10events to the remote Flume agent. That agent should be
// configured tolisten with an AvroSource.
String sampleData = &quot;Hello Flume!&quot;;
for (int i =0; i < 10; i++) {
client.sendDataToFlume(sampleData);
}
client.cleanUp();
}
}
class MyRpcClientFacade {
private RpcClient client;
private String hostname;
private int port;
public void init(String hostname, int port) {
// Setup the RPCconnection
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use thefollowing method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
public void sendDataToFlume(String data) {
// Create aFlume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName(&quot;UTF-8&quot;));
// Send theevent
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up andrecreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use thefollowing method to create a thrift client (instead of the above line):
// this.client =RpcClientFactory.getThriftInstance(hostname, port);
}
}
public void cleanUp() {
// Close the RPCconnection
client.close();
}
}这边代码不解释了,主要是将HelloFlume 发送10遍给flume,同时记得将flume 安装主目录下的lib 文件都添加进项目,才能正常运行程序。  
  
  下面是代理配置:
  

#配置文件:avro_client_case20.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


  
  这里要注意下,之前说了,在接收端需要AvroSource或者Thrift Source来监听接口。所以配置代理的时候要把a1.sources.r1.type 写成avro或者thrift
  #敲命令
  flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console


  启动成功后
  
  在eclipse 里运行JAVA程序,当然也可以打包后在服务器上运行JAVA程序。
  
  #在启动源发送的代理终端查看console输出
   DSC0000.jpg


  
  可以看到10条数据正常发送。
  这里要说明下,开发代码中client.append(event)不仅仅可以发送一条数据,也可以发送一个List(string) 的数据信息,也就是批量发送。这边就不做演示了。

二、Failover Client
  这个类包封装了Avro RPCclient的类默认提供故障处理能力。hosts采用空&#26684;分开host:port所代表的flume agent,构成一个故障处理组。这Failover RPC Client目前不支持thrift。如果当前选择的host agent有问题,这个failover client会自动负载到组中下一个host中。
  下面是官网开发例子:

// Setup properties for the failover
Properties props = new Properties();
props.put(&quot;client.type&quot;, &quot;default_failover&quot;);
// List of hosts (space-separated list of user-chosen host aliases)
props.put(&quot;hosts&quot;, &quot;h1 h2 h3&quot;);
// host/port pair for each host alias
String host1 = &quot;host1.example.org:41414&quot;;
String host2 = &quot;host2.example.org:41414&quot;;
String host3 = &quot;host3.example.org:41414&quot;;
props.put(&quot;hosts.h1&quot;, host1);
props.put(&quot;hosts.h2&quot;, host2);
props.put(&quot;hosts.h3&quot;, host3);
// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);

  下面是测试的开发例子

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
import java.util.Properties;
public class Failover_Client {
public static void main(String[] args) {
MyRpcClientFacade2 client = new MyRpcClientFacade2();
// Initialize client with the remote Flume agent's host and port
client.init();
// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
String sampleData = &quot;Hello Flume!&quot;;
for (int i = 0; i < 10; i++) {
client.sendDataToFlume(sampleData);
}
client.cleanUp();
}
}
class MyRpcClientFacade2 {
private RpcClient client;
private String hostname;
private int port;
public void init() {
// Setup the RPC connection
// Use the following method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
// Setup properties for the failover
Properties props = new Properties();
props.put(&quot;client.type&quot;, &quot;default_failover&quot;);
// List of hosts (space-separated list of user-chosen host aliases)
props.put(&quot;hosts&quot;, &quot;h1 h2 h3&quot;);
// host/port pair for each host alias
String host1 = &quot;192.168.233.128:50000&quot;;
String host2 = &quot;192.168.233.128:50001&quot;;
String host3 = &quot;192.168.233.128:50002&quot;;
props.put(&quot;hosts.h1&quot;, host1);
props.put(&quot;hosts.h2&quot;, host2);
props.put(&quot;hosts.h3&quot;, host3);
// create the client with failover properties
client = RpcClientFactory.getInstance(props);
}
public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName(&quot;UTF-8&quot;));
// Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
}
public void cleanUp() {
// Close the RPC connection
client.close();
}
}

  这边代码设三个host用于故障转移,这里偷懒,用同一个主机的3个端口模拟。代码还是将Hello Flume 发送10遍给第一个flume代理,当第一个代理故障的时候,则发送给第二个代理,以顺序进行故障转移。
  下面是代理配置沿用之前的那个,并对配置文件进行拷贝,
  cp avro_client_case20.conf avro_client_case21.conf
  cp avro_client_case20.conf avro_client_case22.conf
  分别修改avro_client_case21.conf与avro_client_case22.conf中的
  a1.sources.r1.port= 50001 与a1.sources.r1.port = 50002
  #敲命令
  flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console
  flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console
  flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console
  
  启动成功后
  
  在eclipse 里运行JAVA程序Failover_Client.java,当然也可以打包后在服务器上运行JAVA程序。
  #在启动源发送的3个代理终端查看console输出
  我们可以看到第一个代理终端收到了,数据而其他2个终端没有数据。

DSC0001.jpg


  然后我们把第一个终端的进程关掉,再运行一遍client程序,然后会发现这个时候是发生到第二个终端中。当第二个终端也关闭的时候,再发送数据,则是发送到最后一个终端。这里我们可以看到,故障转移的代理主机转移是采用顺序序列的。
  

三、LoadBalancing RPC client
  Flume Client SDK也支持在多个host之间使用负载均衡的Rpc Client。这种类型的client带有一个通过空&#26684;分隔的host:port主机列表并构成了一个负载均衡组。这个client可以指定一个负载均衡的策略,既可以随机的选择一个配置的host,也可以循环选择一个host。当然你也可以自己编写一个类实现LoadBalancingRpcClient$HostSelector接口以至于用户可以使用自己编写的选择顺序。在这种情况下,用户自定义的类需要被指定为host-selector属性的&#20540;。LoadBalancing
RPC Client当前不支持thrift。
  如果开启了backoff,那么client失败将被放入黑名单中,只有过了被指定的超时之间之后这个被选择的失败的主机才会从黑名单中被排除。当超时到了,如果主机还是没有反应,那么这被认为是一个连续的失败并且超时时间会成倍的增长,以避免可能陷入对反应迟钝主机的长时间等待中。
  这backoff的最大超时时间可以通过maxBackoff属性来配置,单位是毫秒。在默认情况下maxBackoff的&#20540;是30秒(在orderSelector类里面指定)。
  下面是官网例子

// Setup properties for the load balancing
Properties props = new Properties();
props.put(&quot;client.type&quot;, &quot;default_loadbalance&quot;);
// List of hosts (space-separated list of user-chosen host aliases)
props.put(&quot;hosts&quot;, &quot;h1 h2 h3&quot;);
// host/port pair for each host alias
String host1 = &quot;host1.example.org:41414&quot;;
String host2 = &quot;host2.example.org:41414&quot;;
String host3 = &quot;host3.example.org:41414&quot;;
props.put(&quot;hosts.h1&quot;, host1);
props.put(&quot;hosts.h2&quot;, host2);
props.put(&quot;hosts.h3&quot;, host3);
props.put(&quot;host-selector&quot;, &quot;random&quot;); // For random host selection
// props.put(&quot;host-selector&quot;, &quot;round_robin&quot;); // For round-robin host
//                                            // selection
props.put(&quot;backoff&quot;, &quot;true&quot;); // Disabled by default.
props.put(&quot;maxBackoff&quot;, &quot;10000&quot;); // Defaults 0, which effectively
// becomes 30000 ms
// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);

  下面是测试的开发例子

import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.util.Properties;
public class Load_Client {
public static void main(String[] args) {
MyRpcClientFacade3 client = new MyRpcClientFacade3();
// Initialize client with the remote Flume agent's host and port
client.init();
// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
String sampleData = &quot;Flume Load_Client&quot;;
for (int i = 0; i < 10; i++) {
client.sendDataToFlume(sampleData);
}
client.cleanUp();
}
}
class MyRpcClientFacade3{
private RpcClient client;
private String hostname;
private int port;
public void init() {
Properties props = new Properties();
props.put(&quot;client.type&quot;, &quot;default_loadbalance&quot;);
// List of hosts (space-separated list of user-chosen host aliases)
props.put(&quot;hosts&quot;, &quot;h1 h2 h3&quot;);
// host/port pair for each host alias
String host1 = &quot;192.168.233.128:50000&quot;;
String host2 = &quot;192.168.233.128:50001&quot;;
String host3 = &quot;192.168.233.128:50002&quot;;
props.put(&quot;hosts.h1&quot;, host1);
props.put(&quot;hosts.h2&quot;, host2);
props.put(&quot;hosts.h3&quot;, host3);
props.put(&quot;host-selector&quot;, &quot;random&quot;); // For random host selection
// props.put(&quot;host-selector&quot;, &quot;round_robin&quot;); // For round-robin host
//                                              // selection
props.put(&quot;backoff&quot;, &quot;true&quot;); // Disabled by default.
props.put(&quot;maxBackoff&quot;, &quot;10000&quot;); // Defaults 0, which effectively
// becomes 30000 ms
// Create the client with load balancing properties
client = RpcClientFactory.getInstance(props);
}
public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName(&quot;UTF-8&quot;));
// Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
}
public void cleanUp() {
// Close the RPC connection
client.close();
}
}

  这里采用随机的负载均衡props.put(&quot;host-selector&quot;,&quot;random&quot;) 。测试的时候沿用之前的3个接受代理配置avro_client_case20.conf、avro_client_case21.conf和avro_client_case22.conf,并将他们起起来。
  #敲命令
  flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console
  flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console
  flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console
  
  启动成功后
  
  在eclipse 里运行JAVA程序Failover_Client.java,当然也可以打包后在服务器上运行JAVA程序。
  #在启动源发送的3个代理终端查看console输出
  下面是Host1,收到了2条数据

DSC0002.jpg
  下面是Host2,收到了2条数据

DSC0003.jpg
  下面是Host3,收到了6条数据。

DSC0004.jpg
  
  可以看到我们开发例子中,host-selector选择的是随机,因此程序也是随机发送数据。下面我们测试轮询round_robin选项。
  程序里我们修改这句
//props.put(&quot;host-selector&quot;,&quot;random&quot;); // For random host selection

  props.put(&quot;host-selector&quot;, &quot;round_robin&quot;);//
Forround-robin host

  再运行Java 程序
  下面是Host1,收到了4条数据

DSC0005.jpg
  
  下面是Host2,收到了3条数据

DSC0006.jpg
  
  同样Host3,收到了3条数据,这边就不放图了。轮询就是按照顺序放图。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144295-1-1.html 上篇帖子: Flume NG源码分析(一)基于静态properties文件的配置模块 下篇帖子: 修改Flume Log4j Appender
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表