设为首页 收藏本站
查看: 1266|回复: 0

[经验分享] flume kafka-sink high cpu

[复制链接]

尚未签到

发表于 2015-11-27 20:53:51 | 显示全部楼层 |阅读模式
  flume sink到kfka时候,导致cpu过高,以下是分析原因:
  一、flume的kafka的sink cpu太高分析:
1、获取flume的进程id

[iyunv@datanode conf]$ top
top - 10:17:58 up 14 days, 18:09,  2 users,  load average: 1.37, 1.11, 0.65
Tasks: 436 total,   1 running, 435 sleeping,   0 stopped,   0 zombie
Cpu(s): 12.1%us,  2.6%sy,  0.0%ni, 84.1%id,  0.3%wa,  0.0%hi,  0.8%si,  0.0%st
Mem:  32751584k total, 32543520k used,   208064k free,    58760k buffers
Swap: 16383996k total,        0k used, 16383996k free, 29961680k cached
PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                                                                           
6330 kafka     20   0  566m 122m 2392 S 124.2  0.4   5481:12 kafka_logtailer                                                                                                                  
18588 root    20   0 7913m 713m  16m S 110.6  2.2  10:08.62 java            
拿到flume的进程18588

2、查看哪个线程导致cpu过高:
[iyunv@datanode conf]$ ps -mp 18588 -o THREAD,tid,time
USER     %CPU PRI SCNT WCHAN  USER SYSTEM   TID     TIME
root    110   -    - -         -      -     - 00:12:05
。。。。。。。。。。。。。。
root   92.0  19    - -         -      - 18658 00:10:04
root    0.0  19    - futex_    -      - 18659 00:00:00
root    0.0  19    - futex_    -      - 18662 00:00:00
root    0.0  19    - futex_    -      - 18664 00:00:00

发现线程id18658 这个导致cpu过高
3、利用jstack分析堆栈信息:
输出十六进制,并根据jstack命令输出堆栈
[iyunv@datanode conf]$ printf "%x\n" 18658
48e2

[iyunv@datanode conf]$ jstack 18588 |grep 48e2 -A 30
"SinkRunner-PollingRunner-DefaultSinkProcessor" #42 prio=5 os_prio=0 tid=0x00007fd1cc2f4000 nid=0x48e2 runnable [0x00007fd22c30f000]
java.lang.Thread.State: RUNNABLE
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:93)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
发现KafkaSink.process方法导致cpu过高
4、利用strace命令跟踪:
[iyunv@datanode conf]$ strace -o log.strace -Ttt -p 18658

打开log.strace有大量的读写操作文件内容如下:
10:25:53.594363 write(127, &quot;\177\0\0\0$\27\r\2\0\0\0\21E\241\30bO\1\0\0\31A\214}\366N\1\0\0\n\r\37&quot;..., 41) = 41 <0.000020>
10:25:53.594412 lseek(129, 199509196, SEEK_SET) = 199509196 <0.000009>
10:25:53.594441 read(129, &quot;\177&quot;, 1)    = 1 <0.000010>
10:25:53.594472 read(129, &quot;\0&quot;, 1)      = 1 <0.000009>
10:25:53.594502 read(129, &quot;\0&quot;, 1)      = 1 <0.000009>
10:25:53.594532 read(129, &quot;\4&quot;, 1)      = 1 <0.000009>
10:25:53.594561 read(129, &quot;\350&quot;, 1)    = 1 <0.000009>
10:25:53.594591 read(129, &quot;\27\r\1\0\0\0\21\332x\30bO\1\0\0\31,\214}\366N\1\0\0\315\t\n\301\t\22\276\t&quot;..., 1256) = 1256 <0.000012>
10:25:53.594645 lseek(127, 0, SEEK_CUR) = 199527334 <0.000009>
10:25:53.594674 lseek(127, 0, SEEK_CUR) = 199527334 <0.000009>
10:25:53.594703 fstat(127, {st_mode=S_IFREG|0664, st_size=200168807, ...}) = 0 <0.000008>
10:25:53.594744 write(127, &quot;\177\0\0\0$\27\r\2\0\0\0\21E\241\30bO\1\0\0\31B\214}\366N\1\0\0\n\r\37&quot;..., 41) = 41 <0.000014>
10:25:53.594785 lseek(129, 199510457, SEEK_SET) = 199510457 <0.000008>

5、修改kafka sink源码:

———---------------------------a、之前代码:---------------------------------------------
public Status processOld() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = null;
Event event = null;
String eventTopic = null;
String eventKey = null;
try {
long processedEvents = 0;
transaction = channel.getTransaction();
transaction.begin();
messageList.clear();
for (; processedEvents < batchSize; processedEvents += 1) {
event = channel.take();
if (event == null) {
// no events available in channel
break;
}
byte[] eventBody = event.getBody();
Map<String, String> headers = event.getHeaders();
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
eventTopic = topic;
}
eventKey = headers.get(KEY_HDR);
if (logger.isDebugEnabled()) {
logger.debug(&quot;{Event} &quot; + eventTopic + &quot; : &quot; + eventKey
+ &quot; : &quot; + new String(eventBody, &quot;UTF-8&quot;));
logger.debug(&quot;event #{}&quot;, processedEvents);
}
JsonObject info = new JsonObject();
JsonObject tags = new JsonObject();
Gson gson = new Gson();
JsonElement je = gson
.toJsonTree(new String(eventBody, &quot;UTF-8&quot;));
info.add(&quot;message&quot;, je);
String hostName = InetAddress.getLocalHost().getHostName();
je = gson.toJsonTree(hostName);
tags.add(&quot;hostname&quot;, je);
je = gson.toJsonTree(&quot;web-ubt&quot;);
tags.add(&quot;logname&quot;, je);
je = gson.toJsonTree(System.currentTimeMillis());
tags.add(&quot;event_time&quot;, je);
je = gson.toJsonTree(tags);
info.add(&quot;tags&quot;, je);
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(
eventTopic, eventKey, info.toString().getBytes());
messageList.add(data);
}
// publish batch and commit.
if (processedEvents > 0) {
long startTime = System.nanoTime();
producer.send(messageList);
long endTime = System.nanoTime();
counter.addToKafkaEventSendTimer((endTime - startTime)
/ (1000 * 1000));
counter.addToEventDrainSuccessCount(Long.valueOf(messageList
.size()));
}
transaction.commit();
} catch (Exception ex) {
String errorMsg = &quot;Failed to publish events&quot;;
logger.error(&quot;Failed to publish events&quot;, ex);
result = Status.BACKOFF;
if (transaction != null) {
try {
transaction.rollback();
counter.incrementRollbackCount();
} catch (Exception e) {
logger.error(&quot;Transaction rollback failed&quot;, e);
throw Throwables.propagate(e);
}
}
throw new EventDeliveryException(errorMsg, ex);
} finally {
if (transaction != null) {
transaction.close();
}
}
return result;
}

————————————b、修改后代码:—————————————————————————

@Override
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Transaction transaction = null;
try {
Channel channel = getChannel();
Event event = null;
transaction = channel.getTransaction();
transaction.begin();
event = channel.take();
if (event == null) {
// no events available in channel
transaction.commit();
return result;
}
messageList.add(putkeyMessage(event));
transaction.commit();
int size = messageList.size();
if (size == 0) {
result = Status.BACKOFF;
counter.incrementBatchUnderflowCount();
} else {
if (size < batchSize) {
counter.incrementBatchUnderflowCount();
} else {
counter.incrementBatchCompleteCount();
}
counter.addToEventDrainAttemptCount(size);
}
// publish batch and commit.
if (size == batchSize) {
producer.send(messageList);
messageList.clear();
}
counter.addToEventDrainSuccessCount(size);
} catch (Exception ex) {
String errorMsg = &quot;Failed to publish events&quot;;
logger.error(&quot;Failed to publish events&quot;, ex);
result = Status.BACKOFF;
if (transaction != null) {
try {
transaction.rollback();
counter.incrementRollbackCount();
} catch (Exception e) {
logger.error(&quot;Transaction rollback failed&quot;, e);
throw Throwables.propagate(e);
}
}
throw new EventDeliveryException(errorMsg, ex);
} finally {
if (transaction != null) {
transaction.close();
}
}
return result;
}
private KeyedMessage<String, byte[]> putkeyMessage(Event event) throws Exception {
String eventTopic = null;
String partitionKey = null;
KeyedMessage<String, byte[]> data = null;
Map<String, String> headers = event.getHeaders();
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
eventTopic = topic;
}
partitionKey = headers.get(KEY_HDR);
byte[] eventBody = event.getBody();
JsonObject info = new JsonObject();
JsonObject tags = new JsonObject();
Gson gson = new Gson();
JsonElement je = gson.toJsonTree(new String(eventBody, &quot;UTF-8&quot;));
info.add(&quot;message&quot;, je);
String hostName = InetAddress.getLocalHost().getHostName();
je = gson.toJsonTree(hostName);
tags.add(&quot;hostname&quot;, je);
je = gson.toJsonTree(&quot;web&quot;);
tags.add(&quot;logname&quot;, je);
je = gson.toJsonTree(System.currentTimeMillis());
tags.add(&quot;event_time&quot;, je);
je = gson.toJsonTree(tags);
info.add(&quot;tags&quot;, je);
if (StringUtils.isEmpty(partitionKey)) {
data = new KeyedMessage<String, byte[]>(eventTopic, info.toString().getBytes());
} else {
data = new KeyedMessage<String, byte[]>(eventTopic, partitionKey, info.toString().getBytes());
}
return data;
}


6、最后前后cpu对比:
a、修改后的:
[iyunv@xg-ubt-web-1 conf]$ ps -mp 27116 -o THREAD,tid,time
USER     %CPU PRI SCNT WCHAN  USER SYSTEM   TID     TIME
root   14.8   -    - -         -      -     - 00:03:29
…………………..
root    0.0  19    - futex_    -      - 27184 00:00:00
root    57.7  19    - sync_p    -      - 27185 00:02:07

b、修改前的:
[iyunv@datanode conf]$ ps -mp 18588 -o THREAD,tid,time
USER     %CPU PRI SCNT WCHAN  USER SYSTEM   TID     TIME
root    107   -    - -         -      -     - 00:26:54
…..
root    0.0  19    - futex_    -      - 18657 00:00:00
root   90.9  19    - sync_p    -      - 18658 00:22:42
root    0.0  19    - futex_    -      - 18659 00:00:00
root    0.0  19    - futex_    -      - 18662 00:00:00
root    0.0  19    - futex_    -      - 18664 00:00:00
root    0.0  19    - skb_re    -      - 23144 00:00:00

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144382-1-1.html 上篇帖子: kafka学习四: flume+kafka+storm 下篇帖子: 日志系统系列flume-ng+kafka+storm+HDFS搭建实验
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表