设为首页 收藏本站
查看: 1394|回复: 0

[经验分享] Flume学习07 — FlumeRpcClientUtils工具类

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-11-27 17:23:56 | 显示全部楼层 |阅读模式
  FlumeRpcClientUtils提供通用的Event操作功能,通过配置文件可以在多个RpcClient之间进行切换。
  FlumeRpcClientUtils配置参数


# 可选值default、thrift、default_failover、default_loadbalance
# 其中default使用avro协议
# 如果使用default_failover和default_loadbalance,那么也只能用avro协议
client.type = default
# 如果client.type值为default或者thrift,那么hosts只有一台主机
# 如果client.type值为default_failover或者default_loadbalance,那么hosts至少要配置两台主机。
hosts = h1
# hosts = h1,h2,h3
# 主机配置
hosts.h1 = 127.0.0.1:41414
# hosts.h2 = host2.example.org:41414
# hosts.h3 = host3.example.org:41414
# 如果主机连接失败,是否把主机临时放入黑名单中,过一段时间再启用。
# 默认为false,不放入黑名单。
backoff = false
# 主机连接失败,下次连接该主机的最大时间间隔,单位毫秒。
# 默认值为0,相当于30000毫秒。
maxBackoff = 0
# 如果client.type使用default_loadbalance,该模式下主机的轮询策略。
# 可选策略有round_robin、random或者自定义策略(实现LoadBalancingRpcClient$HostSelector接口)。
# 默认是round_robin
host-selector = round_robin
# 批量发送的数量,该值必须大于0,默认是100。
batch-size = 500
# 连接超时,该值必须大于1000,默认是20000,单位毫秒。
connect-timeout = 20000
# 请求超时,该值必须大于1000,默认是20000,单位毫秒。
request-timeout = 20000
# 消息编码,默认是utf-8
charset = utf-8
# 如果消息发送失败,尝试发送的消息次数,默认为3次
attemptTimes = 3
  



import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Flume客户端工具类
* 工具类初始化默认读取配置文件flume-client.properties
* 配置文件放在classpath下
*
* @author accountwcx@qq.com
*
*/
public class FlumeRpcClientUtils {
private static final Logger logger = LoggerFactory.getLogger(FlumeRpcClientUtils.class);
private static Properties props = new Properties();
// 是否已初始化
private static volatile boolean isInit = false;
// 发送消息的默认编码,如果没有设置编码,则使用该默认编码
private static final String defaultCharsetName = "utf-8";
// 发送消息的编码
private static Charset charset;
// 如果消息发送失败,尝试发送的消息次数,默认为3次
private static int attemptTimes = 3;
private static RpcClient client;
/**
* 初始化客户端配置 客户端配置必须放在classpath根目录下的flume-client.properties文件中
*/
public synchronized static void init() {
if(isInit){
return;
}
logger.info("初始化配置flume-client.properties");
// 读取配置文件
InputStream is = FlumeRpcClientUtils.class.getClassLoader().getResourceAsStream("flume-client.properties");
if (is == null) {
logger.error("找不到配置文件flume-client.properties");
return;
}
try {
props.load(is);
// 从配置文件中读取消息编码
String charsetName = props.getProperty("charset");
if (charsetName != null && !charsetName.equals("")) {
try {
charset = Charset.forName(charsetName);
} catch (Exception e) {
logger.error("编码charset=" + charsetName + "初始化失败,使用默认编码charset=" + defaultCharsetName, e);
}
}
props.remove("charset");
// 如果编码为空,则使用默认编码utf-8
if (charset == null) {
try {
charset = Charset.forName(defaultCharsetName);
} catch (Exception e) {
logger.error("默认编码charset=" + defaultCharsetName + "初始化失败", e);
}
}
// 读取消息发送次数配置
String strAttemptTimes = props.getProperty("attemptTimes");
if (strAttemptTimes != null && !strAttemptTimes.equals("")) {
int tmpAttemptTimes = 0;
try {
tmpAttemptTimes = Integer.parseInt(strAttemptTimes);
} catch (NumberFormatException e) {
logger.error("消息发送次数attemptTimes=" + strAttemptTimes + "初始化失败,使用默认发送次数attemptTimes=" + attemptTimes, e);
}
if (tmpAttemptTimes > 0) {
attemptTimes = tmpAttemptTimes;
}
}
props.remove("attemptTimes");
// 初始化Flume Client
// 根据不同的client.type,实例也不一样
client = RpcClientFactory.getInstance(props);
isInit = true;
} catch (IOException e) {
logger.error("配置文件flume-client.properties读取失败", e);
} finally {
if (is != null) {
try {
is.close();
} catch (IOException e) {
}
}
}
}
/**
* 发送一条记录,如果发送失败,该方法会尝试多次发送,尝试次数在attemptTimes中设置,默认3次。
* 建议使用appendBatch以获得更好的性能。
*
* @param data 发送内容
* @return 发送成功返回true,失败返回false
*/
public static boolean append(String data) {
boolean flag = false;
Event event = EventBuilder.withBody(data, charset);
int current = 0;
while (!flag && current < attemptTimes) {
current++;
try {
client.append(event);
flag = true;
} catch (EventDeliveryException e) {
logger.error("发送失败,当前已尝试" + current + "次", e);
logger.error("失败消息" + data);
}
}
return flag;
}
/**
* 发送一条记录,如果发送失败,该方法会尝试多次发送,尝试次数在attemptTimes中设置,默认3次。
* 建议使用appendBatch以获得更好的性能。
*
* @param event 发送内容
* @return 发送成功返回true,失败返回false
*/
public static boolean append(Event event){
boolean flag = false;
int current = 0;
while (!flag && current < attemptTimes) {
current++;
try {
client.append(event);
flag = true;
} catch (EventDeliveryException e) {
logger.error("发送失败,当前已尝试" + current + "次", e);
logger.error("失败消息" + new String(event.getBody(), charset));
}
}
return flag;
}
/**
* 发送一条记录,如果发送失败,该方法会尝试多次发送,尝试次数在attemptTimes中设置,默认3次。
* 建议使用appendBatch以获得更好的性能。
*
* @param data 发送内容
* @param headers 发送的头文件
* @return 发送成功返回true,失败返回false
*/
public static boolean append(String data, Map<String, String> headers){
Event event = EventBuilder.withBody(data, charset);
if(headers != null){
event.setHeaders(headers);
}
return append(event);
}
/**
* 以批量的方式发送一条记录,该记录不会立即发送,而是会放到内存队列中,直到队列中的记录数达到batchSize才会发送。
* 如果发送失败,会尝试多次发送,尝试次数在attemptTimes中设置,默认3次。
*
* appendBatch性能远高于append,建议使用。
*
* @param data 单个记录
* @return 发送成功返回true,失败返回false
*/
public static boolean appendBatch(String data){
List<String> items = new ArrayList<String>();
items.add(data);
return appendBatch(items);
}
/**
* 以批量的方式发送一条记录,该记录不会立即发送,而是会放到内存队列中,直到队列中的记录数达到batchSize才会发送。
* 如果发送失败,会尝试多次发送,尝试次数在attemptTimes中设置,默认3次。
*
* appendBatch性能远高于append,建议使用。
*
* @param data 单个记录
* @return 发送成功返回true,失败返回false
*/
public static boolean appendBatch(String data, Map<String, String> headers){
List<Event> events = new ArrayList<Event>();
Event event = EventBuilder.withBody(data, charset);
event.setHeaders(headers);
events.add(event);
return appendBatchEvent(events);
}
/**
* 以批量的方式发送一条记录,该记录不会立即发送,而是会放到内存队列中,直到队列中的记录数达到batchSize才会发送。
* 如果发送失败,会尝试多次发送,尝试次数在attemptTimes中设置,默认3次。
*
* appendBatch性能远高于append,建议使用。
*
* @param data 单个记录
* @return 发送成功返回true,失败返回false
*/
public static boolean appendBatch(Event event){
List<Event> events = new ArrayList<Event>();
events.add(event);
return appendBatchEvent(events);
}
/**
* 批量发送多条记录,如果发送失败,会尝试多次发送,尝试次数在attemptTimes中设置,默认3次。
* appendBatch性能远高于append,建议使用。
*
* @param items 内容列表
* @return 发送成功返回true,失败返回false
*/
public static boolean appendBatch(List<String> items){
return appendBatch(items, null);
}
/**
* 批量发送多条记录,如果发送失败,会尝试多次发送,尝试次数在attemptTimes中设置,默认3次。
* appendBatch性能远高于append,建议使用。
*
* @param items 内容列表
* @param headers 头部内容
* @return 发送成功返回true,失败返回false
*/
public static boolean appendBatch(List<String> items, Map<String, String> headers){
boolean flag = false;
// 如果参数不符合要求,则退出
if(items == null || items.size() < 1){
return flag;
}
List<Event> events = new LinkedList<Event>();
if(headers != null){
for(String item : items){
Event event = EventBuilder.withBody(item, charset);
event.setHeaders(headers);
events.add(event);
}
}else{
for(String item : items){
events.add(EventBuilder.withBody(item, charset));
}
}
// 当前尝试发送的次数
int current = 0;
while (!flag && current < attemptTimes) {
current++;
try {
client.appendBatch(events);
flag = true;
} catch (EventDeliveryException e) {
logger.error("批量发送失败,当前已尝试" + current + "次", e);
}
}
return flag;
}
/**
* 批量发送多条记录,如果发送失败,会尝试多次发送,尝试次数在attemptTimes中设置,默认3次。
* appendBatch性能远高于append,建议使用。
*
* @param events 内容列表
* @return 发送成功返回true,失败返回false
*/
public static boolean appendBatchEvent(List<Event> events){
boolean flag = false;
// 如果参数不符合要求,则退出
if(events == null || events.size() < 1){
return flag;
}
// 当前尝试发送的次数
int current = 0;
while (!flag && current < attemptTimes) {
current++;
try {
client.appendBatch(events);
flag = true;
} catch (EventDeliveryException e) {
logger.error("批量发送失败,当前已尝试" + current + "次", e);
}
}
return flag;
}
public static int getBatchSize(){
return client.getBatchSize();
}
}

  

  示例代码

// 初始化
FlumeRpcClientUtils.init();
for(int i = 0; i < 10; i++){
String data = "发送第" + i + "条数据";
FlumeRpcClientUtils.append(data);
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144276-1-1.html 上篇帖子: Flume NG configuration sample 下篇帖子: Flume 1.4.0 NG 分布式集群搭建
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表