Elasticsearch 与 Kafka 整合剖析
public class EsProducer { private final static Logger LOG = LoggerFactory.getLogger(EsProducer.class); private final KafkaConsumer consumer; private ExecutorService executorService; private Configuration conf = null; private static int counter = 0; public EsProducer() {String root = System.getProperty("user.dir") + "/conf/";
String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path");
conf = Configuration.from(new File(root + path));
Properties props = new Properties();
props.put("bootstrap.servers", conf.getString("job.content.reader.parameter.bootstrapServers"));
props.put("group.id", conf.getString("job.content.reader.parameter.groupid"));
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(conf.getString("job.content.reader.parameter.topic")));
} public void execute() {
executorService = Executors.newFixedThreadPool(conf.getInt("job.content.reader.threads")); while (true) {
ConsumerRecords records = consumer.poll(100); if (null != records) {
executorService.submit(new KafkaConsumerThread(records, consumer));
}
}
} public void shutdown() { try { if (consumer != null) {
consumer.close();
} if (executorService != null) {
executorService.shutdown();
} if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.error("Shutdown kafka consumer thread timeout.");
}
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
} class KafkaConsumerThread implements Runnable { private ConsumerRecords records; public KafkaConsumerThread(ConsumerRecords records, KafkaConsumer consumer) { this.records = records;
}
@Override public void run() {
String index = conf.getString("job.content.writer.parameter.index");
String type = conf.getString("job.content.writer.parameter.type"); for (TopicPartition partition : records.partitions()) {
List partitionRecords = records.records(partition); for (ConsumerRecord record : partitionRecords) {
JSONObject json = JSON.parseObject(record.value());
List list = new ArrayList();
Map map = new HashMap();
index = String.format(index, CalendarUtils.timeSpan2EsDay(json.getLongValue("_tm") * 1000L));
if (counter < 10) {
LOG.info("Index : " + index);
counter++;
}
for (String key : json.keySet()) { if ("_uid".equals(key)) {
map.put("uid", json.get(key));
} else {
map.put(key, json.get(key));
}
list.add(map);
}
EsUtils.write2Es(index, type, list);
}
}
}
}
}
页:
[1]