flume kafka-sink high cpu
flume sink到kfka时候,导致cpu过高,以下是分析原因:一、flume的kafka的sink cpu太高分析:
1、获取flume的进程id
$ top
top - 10:17:58 up 14 days, 18:09,2 users,load average: 1.37, 1.11, 0.65
Tasks: 436 total, 1 running, 435 sleeping, 0 stopped, 0 zombie
Cpu(s): 12.1%us,2.6%sy,0.0%ni, 84.1%id,0.3%wa,0.0%hi,0.8%si,0.0%st
Mem:32751584k total, 32543520k used, 208064k free, 58760k buffers
Swap: 16383996k total, 0k used, 16383996k free, 29961680k cached
PID USER PRNIVIRTRESSHR S %CPU %MEM TIME+COMMAND
6330 kafka 20 0566m 122m 2392 S 124.20.4 5481:12 kafka_logtailer
18588 root 20 0 7913m 713m16m S 110.62.210:08.62 java
拿到flume的进程18588
2、查看哪个线程导致cpu过高:
$ ps -mp 18588 -o THREAD,tid,time
USER %CPU PRI SCNT WCHANUSER SYSTEM TID TIME
root 110 - - - - - - 00:12:05
。。。。。。。。。。。。。。
root 92.019 - - - - 18658 00:10:04
root 0.019 - futex_ - - 18659 00:00:00
root 0.019 - futex_ - - 18662 00:00:00
root 0.019 - futex_ - - 18664 00:00:00
发现线程id18658 这个导致cpu过高
3、利用jstack分析堆栈信息:
输出十六进制,并根据jstack命令输出堆栈
$ printf "%x\n" 18658
48e2
$ jstack 18588 |grep 48e2 -A 30
"SinkRunner-PollingRunner-DefaultSinkProcessor" #42 prio=5 os_prio=0 tid=0x00007fd1cc2f4000 nid=0x48e2 runnable
java.lang.Thread.State: RUNNABLE
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:93)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
发现KafkaSink.process方法导致cpu过高
4、利用strace命令跟踪:
$ strace -o log.strace -Ttt -p 18658
打开log.strace有大量的读写操作文件内容如下:
10:25:53.594363 write(127, "\177\0\0\0$\27\r\2\0\0\0\21E\241\30bO\1\0\0\31A\214}\366N\1\0\0\n\r\37"..., 41) = 41 <0.000020>
10:25:53.594412 lseek(129, 199509196, SEEK_SET) = 199509196 <0.000009>
10:25:53.594441 read(129, "\177", 1) = 1 <0.000010>
10:25:53.594472 read(129, "\0", 1) = 1 <0.000009>
10:25:53.594502 read(129, "\0", 1) = 1 <0.000009>
10:25:53.594532 read(129, "\4", 1) = 1 <0.000009>
10:25:53.594561 read(129, "\350", 1) = 1 <0.000009>
10:25:53.594591 read(129, "\27\r\1\0\0\0\21\332x\30bO\1\0\0\31,\214}\366N\1\0\0\315\t\n\301\t\22\276\t"..., 1256) = 1256 <0.000012>
10:25:53.594645 lseek(127, 0, SEEK_CUR) = 199527334 <0.000009>
10:25:53.594674 lseek(127, 0, SEEK_CUR) = 199527334 <0.000009>
10:25:53.594703 fstat(127, {st_mode=S_IFREG|0664, st_size=200168807, ...}) = 0 <0.000008>
10:25:53.594744 write(127, "\177\0\0\0$\27\r\2\0\0\0\21E\241\30bO\1\0\0\31B\214}\366N\1\0\0\n\r\37"..., 41) = 41 <0.000014>
10:25:53.594785 lseek(129, 199510457, SEEK_SET) = 199510457 <0.000008>
5、修改kafka sink源码:
———---------------------------a、之前代码:---------------------------------------------
public Status processOld() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = null;
Event event = null;
String eventTopic = null;
String eventKey = null;
try {
long processedEvents = 0;
transaction = channel.getTransaction();
transaction.begin();
messageList.clear();
for (; processedEvents < batchSize; processedEvents += 1) {
event = channel.take();
if (event == null) {
// no events available in channel
break;
}
byte[] eventBody = event.getBody();
Map<String, String> headers = event.getHeaders();
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
eventTopic = topic;
}
eventKey = headers.get(KEY_HDR);
if (logger.isDebugEnabled()) {
logger.debug("{Event} " + eventTopic + " : " + eventKey
+ " : " + new String(eventBody, "UTF-8"));
logger.debug("event #{}", processedEvents);
}
JsonObject info = new JsonObject();
JsonObject tags = new JsonObject();
Gson gson = new Gson();
JsonElement je = gson
.toJsonTree(new String(eventBody, "UTF-8"));
info.add("message", je);
String hostName = InetAddress.getLocalHost().getHostName();
je = gson.toJsonTree(hostName);
tags.add("hostname", je);
je = gson.toJsonTree("web-ubt");
tags.add("logname", je);
je = gson.toJsonTree(System.currentTimeMillis());
tags.add("event_time", je);
je = gson.toJsonTree(tags);
info.add("tags", je);
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(
eventTopic, eventKey, info.toString().getBytes());
messageList.add(data);
}
// publish batch and commit.
if (processedEvents > 0) {
long startTime = System.nanoTime();
producer.send(messageList);
long endTime = System.nanoTime();
counter.addToKafkaEventSendTimer((endTime - startTime)
/ (1000 * 1000));
counter.addToEventDrainSuccessCount(Long.valueOf(messageList
.size()));
}
transaction.commit();
} catch (Exception ex) {
String errorMsg = "Failed to publish events";
logger.error("Failed to publish events", ex);
result = Status.BACKOFF;
if (transaction != null) {
try {
transaction.rollback();
counter.incrementRollbackCount();
} catch (Exception e) {
logger.error("Transaction rollback failed", e);
throw Throwables.propagate(e);
}
}
throw new EventDeliveryException(errorMsg, ex);
} finally {
if (transaction != null) {
transaction.close();
}
}
return result;
}
————————————b、修改后代码:—————————————————————————
@Override
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Transaction transaction = null;
try {
Channel channel = getChannel();
Event event = null;
transaction = channel.getTransaction();
transaction.begin();
event = channel.take();
if (event == null) {
// no events available in channel
transaction.commit();
return result;
}
messageList.add(putkeyMessage(event));
transaction.commit();
int size = messageList.size();
if (size == 0) {
result = Status.BACKOFF;
counter.incrementBatchUnderflowCount();
} else {
if (size < batchSize) {
counter.incrementBatchUnderflowCount();
} else {
counter.incrementBatchCompleteCount();
}
counter.addToEventDrainAttemptCount(size);
}
// publish batch and commit.
if (size == batchSize) {
producer.send(messageList);
messageList.clear();
}
counter.addToEventDrainSuccessCount(size);
} catch (Exception ex) {
String errorMsg = "Failed to publish events";
logger.error("Failed to publish events", ex);
result = Status.BACKOFF;
if (transaction != null) {
try {
transaction.rollback();
counter.incrementRollbackCount();
} catch (Exception e) {
logger.error("Transaction rollback failed", e);
throw Throwables.propagate(e);
}
}
throw new EventDeliveryException(errorMsg, ex);
} finally {
if (transaction != null) {
transaction.close();
}
}
return result;
}
private KeyedMessage<String, byte[]> putkeyMessage(Event event) throws Exception {
String eventTopic = null;
String partitionKey = null;
KeyedMessage<String, byte[]> data = null;
Map<String, String> headers = event.getHeaders();
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
eventTopic = topic;
}
partitionKey = headers.get(KEY_HDR);
byte[] eventBody = event.getBody();
JsonObject info = new JsonObject();
JsonObject tags = new JsonObject();
Gson gson = new Gson();
JsonElement je = gson.toJsonTree(new String(eventBody, "UTF-8"));
info.add("message", je);
String hostName = InetAddress.getLocalHost().getHostName();
je = gson.toJsonTree(hostName);
tags.add("hostname", je);
je = gson.toJsonTree("web");
tags.add("logname", je);
je = gson.toJsonTree(System.currentTimeMillis());
tags.add("event_time", je);
je = gson.toJsonTree(tags);
info.add("tags", je);
if (StringUtils.isEmpty(partitionKey)) {
data = new KeyedMessage<String, byte[]>(eventTopic, info.toString().getBytes());
} else {
data = new KeyedMessage<String, byte[]>(eventTopic, partitionKey, info.toString().getBytes());
}
return data;
}
6、最后前后cpu对比:
a、修改后的:
$ ps -mp 27116 -o THREAD,tid,time
USER %CPU PRI SCNT WCHANUSER SYSTEM TID TIME
root 14.8 - - - - - - 00:03:29
…………………..
root 0.019 - futex_ - - 27184 00:00:00
root 57.719 - sync_p - - 27185 00:02:07
b、修改前的:
$ ps -mp 18588 -o THREAD,tid,time
USER %CPU PRI SCNT WCHANUSER SYSTEM TID TIME
root 107 - - - - - - 00:26:54
…..
root 0.019 - futex_ - - 18657 00:00:00
root 90.919 - sync_p - - 18658 00:22:42
root 0.019 - futex_ - - 18659 00:00:00
root 0.019 - futex_ - - 18662 00:00:00
root 0.019 - futex_ - - 18664 00:00:00
root 0.019 - skb_re - - 23144 00:00:00
页:
[1]