设为首页 收藏本站
查看: 1221|回复: 0

[经验分享] elasticsearch-river-kafka 插件的环境配置和使用

[复制链接]

尚未签到

发表于 2017-5-21 08:26:15 | 显示全部楼层 |阅读模式
1.elasticsearch-river-kafka 插件的安装
Github地址:https://github.com/endgameinc/elasticsearch-river-kafka
elasticsearch-river-kafka 插件的安装与其他插件一样
cd $ELASTICSEARCH_HOME
./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka



 
插件更新
cd $ELASTICSEARCH_HOME
./bin/plugin -remove elasticsearch-river-kafka
./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka



 
2.river节点的配置
配置river节点的时候,river节点和非river节点都要配置。
river节点:在es的配置文件中添加下面几行
#node.river: _none_    ##这一行要注释掉,表示为river节点
threadpool:
    bulk:
        type: fixed
        size: 60
        queue_size: 1000



 
非river节点:在es的配置文件中添加下面几行
node.river: _none_    ##这一行要解注,表示该节点不是river节点
threadpool:
    bulk:
        type: fixed
        size: 60
        queue_size: 1000



注意:一般,不会将数据落在river节点上(即node.data: false),但测试环境上就无所谓了,机器资源又紧张。
          节点配置完后,记得重启es,重启es的顺序:master节点→data节点→river节点
 
3.elasticsearch-river-kafka 插件的开发
社区中的elasticsearch-river-kafka 插件仅提供了对String和json数据的简单处理。在实际生产中,我们遇到的情况要复杂得多。
那么这个时候,我们就得自己去开发elasticsearch-river-kafka 插件实现一些附加功能。
下面就简单介绍一下开发elasticsearch-river-kafka 插件的步骤
1)KafkaRiverPlugin
该类需要继承KafkaRiverPlugin和实现AbstractPlugin,在该类中定义plugin的名称和描述
@Override
    public String name() {
        return "river-kafka";
    }
    @Override
    public String description() {
        return "River Kafka Plugin";
    }



 
2)es-plugin.properties配置文件
需要在es-plugin.properties中添加如下的定义,这样ES在启动的时候就能够通过org.elasticsearch.plugins.PluginManager
在当前的classpath中扫描到我们的plugin。
注意:定义中要写KafkaRiverPlugin类的全称,es-plugin.properties一般位于src/main/resources下
plugin=com.test.elasticsearch.plugin.river.kafka.KafkaRiverPlugin



 
3)KafkaRiverModule
KafkaRiverPlugin的onModule方法:在ES加载所有的插件时,会invoke一个onModule方法。KafkaRiverModule会作为参数传进来
public void onModule(RiversModule module) {
        module.registerRiver("kafka", KafkaRiverModule.class);
 }



KafkaRiverModule必须继承AbstractModule 。在KafkaRiverModule中会生成一个KafkaRiver。KafkaRiver是River接口的实现。
public class KafkaRiverModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(River.class).to(KafkaRiver.class).asEagerSingleton();
    }
}



 
4)KafkaRiver
    – KafkaRiver必须继承AbstractRiverComponent,并且实现River接口。
    – KafkaRiver只提供两个方法:start和close。
    – AbstractRiverComponent 用于initialize kafkariver的logger、river名、river的配置
    – 构造函数通过@Inject注入river所需要的一切东西:RiverName, RiverSettings、logger、自定义的配置信息
      (这里是BasicProperties,在BasicProperties中定义的配置参数可以在创建river的时候被指定,参见“4.kafka→river→es的数据存储”)
    – 在start方法中启动了kafkariver的线程。在这个线程中,将数据从kafka中读取数据,然后将这些数据写到es中。
    – kafkaConsumer用来定义从kafka中读取数据时的用户操作。
    – ElasticsearchProducer用来定义将数据写入ES时的用户操作。
public class KafkaRiver extends AbstractRiverComponent implements River {
    private BasicProperties properties;
    private KafkaConsumer kafkaConsumer;
    private ElasticsearchProducer elasticsearchProducer;
    private static  ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
    private Thread riverMonitorThread;
    private KafkaRiverSubMonitor kafkaRiverSubMonitor;
    private Thread thread;
    private ESLogger logger;
    @Inject
    protected KafkaRiver(RiverName riverName, RiverSettings settings, Client client) {
        super(riverName, settings);
        this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName);
        properties = new BasicProperties(settings);
        elasticsearchProducer = new ElasticsearchProducer(client, properties);
        kafkaConsumer = new KafkaConsumer(riverName, properties, elasticsearchProducer);
    }
    @Override
 
    public void start() {
     //启动KafkaRiver的线程
        try {
            logger.info("MHA: Starting Kafka Worker...");
            thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "kafka_river").newThread(kafkaConsumer);
            thread.start();
        } catch (Exception ex) {
            logger.error("Unexpected Error occurred", ex);
            throw new RuntimeException(ex);
        }
    }
     ......
}



 
4.kafka→river→es的数据存储
通过下面的指令,可以创建一条river,这样从kafka的baymaxtest的topic中的数据通过river就会落到es上。
注意:一个集群可以创建多个river,各river可以指定不同的topic、patition和序列化类
curl -XPUT 'http://localhost:9200/_river/baymaxriver1/_meta' -d '{
    "type": "kafka",
    "kafka": {
        "topic" : "test",
        "numOfConsumer" : "2",
        "zk.connect" : "10.10.10.10:2181",
        "zk.session.timeout.ms" : "50000",
        "zk.sync.time.ms" : "200",
        "zk.auto.commit.interval.ms" : "1000",
        "zk.auto.commit.enable" : "true",
        "zk.auto.offset.reset" : "smallest",
        "zk.fetch.message.max.bytes" : "5242880",
        "serializer" : "com.test.elasticsearch.river.kafka.serializer.AASerializer"
    },
     "elasticsearch" : {
        "indexName" : "stringfortest",
        "indexType" : "message1",
        "batch_size" : "500",
        "handling_batch_coresize" : "2",
        "handling_batch_maximumPoolSize" : "2",
        "handling_batch_keepAliveTime" : "600",
        "handling_batch_queueSize" : "10",
        "es_bulk_timeout" : "5"
    }
}'



上述指令中主要配置信息的说明:
kafka中  →
topic:kafka的topic名为test,
numOfConsumer:从kafka中读取数据的消费者个数
zk.connect:zookper的host名
serializer:对从kafka中来的数据的序列化类
elasticsearch中  →
indexName:在es中生成的index名,从该river中通过的数据会落到这个index中
indexType:index的type
es_bulk_timeout:es批量处理的timeout
上述指令会返回下面的结果
{"_index":"_river",
 "_type":"baymaxriver1",
 "_id":"_meta",
 "_version":1,
 "created":true
}



 
查看river的元数据:http://ip:9200/_river/rivername/_meta
 
删除一条river
curl -XDELETE 'http://localhost:9200/_river/rivername'


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379464-1-1.html 上篇帖子: zabbix 监控kafka方法 下篇帖子: flume与kafka集成
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表