设为首页 收藏本站
查看: 1412|回复: 0

[经验分享] 纪录:Solr6.4.2+Flume1.7.0 +morphline+kafka集成

[复制链接]

尚未签到

发表于 2017-12-20 07:37:04 | 显示全部楼层 |阅读模式
  当前大多数企业版hadoop的solr版本都还停留在solr4.x,由于这个版本的solr本身的bug较多,使用起来会出很多奇怪的问题。如部分更新日期字段失败的问题。
  最新的solr版本不仅修复了以前的一些常见bug,还提供了更简便易用的功能,如ManagedSchema替代schema.xml来管理索引的schema。
  由于solr自带的接口和入库工具需要一些定制开发,所以通常用flume来作为数据采集的工具。数据流图如下:
DSC0000.png

  具体见前文:《json数据处理实战:Kafka+Flume+Morphline+Solr+Hue数据组合索引》
  在Cloudera等企业版hadoop中,Solr和Flume已经集成,并能互通。如果你目前的情况是使用Cloudera企业版,请看上面这篇文章。
  然而由于集成的版本跟不上开源社区最新版本,还是很嫌弃的。于是就有了下面的配置最新版本的Solr和Flume互通:
  1.Solr最新版服务部署及入门:
  见solr官网quickstart。
  http://lucene.apache.org/solr/quickstart.html
  说明:创建Solr集合的部分,不是本章重点,所以这里没有介绍。
  另,本例中和前文不同,使用的不是SolrCloud模式,而是单机的Solr。
  2.Flume最新版部署及入门
  下载地址:http://flume.apache.org/download.html
  入门介绍:https://cwiki.apache.org//confluence/display/FLUME/Getting+Started
  详细配置介绍:http://flume.apache.org/FlumeUserGuide.html
  详细配置介绍中,需要关注的是KafkaSource和MorphlineSolrSink。
  最终的flume.conf配置为:
  

kafka2solr.sources = source_from_kafka  
kafka2solr.channels
= customer_doc_channel  
kafka2solr.sinks
= solr_sink1  

  
# For each one of the sources, the type is defined  
  
kafka2solr.sources.source_from_kafka.type
= org.apache.flume.source.kafka.KafkaSource  
kafka2solr.sources.source_from_kafka.channels
= customer_doc_channel  
kafka2solr.sources.source_from_kafka.batchSize
= 100  
kafka2solr.sources.source_from_kafka.useFlumeEventFormat
=false  
kafka2solr.sources.source_from_kafka.kafka.bootstrap.servers
= kafkanode0:9092,kafkanode1:9092,kafkanode2:9092  
kafka2solr.sources.source_from_kafka.kafka.topics
= tablecardLogin  
kafka2solr.sources.source_from_kafka.kafka.consumer.group.
id = catering_customer_core_070327  
kafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset
=earliest  

  
# Other config values specific to each type of channel(sink or source)  
  
# can be defined as well  
  
kafka2solr.channels.customer_doc_channel.type
= file  
kafka2solr.channels.customer_doc_channel.capacity
=10000000  
kafka2solr.channels.customer_doc_channel.checkpointDir
= /home/arli/data/flume-ng/customer_doc/checkpoint  
kafka2solr.channels.customer_doc_channel.dataDirs
= /home/arli/data/flume-ng/customer_doc/data  

  
kafka2solr.sinks.solr_sink1.type
= org.apache.flume.sink.solr.morphline.MorphlineSolrSink  
kafka2solr.sinks.solr_sink1.channel
= customer_doc_channel  
kafka2solr.sinks.solr_sink1.batchSize
= 5000  
kafka2solr.sinks.solr_sink1.batchDurationMillis
= 2000  
kafka2solr.sinks.solr_sink1.morphlineFile
= /home/arli/flume-config/morphlines.conf  
kafka2solr.sinks.solr_sink1.morphlineId
=morphline1  
kafka2solr.sinks.solr_sink1.isIgnoringRecoverableExceptions
=true  
#kafka2solr.sinks.solr_sink1.isProductionMode
=true  

  3.新建一个Flume配置目录,下面四个文件是比较重要的。
DSC0001.png

  flume.conf 来自上一节的配置。
  flume-env.sh 来自安装目录conf下的flume-env.sh.template。需要改动。
  log4j.properties 在调试过程中可以开启更低级别的日志打印。
  morphline.conf 参考Morphline的文档:http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html
  4.接下来详细介绍上面的后三个配置文件。
  1)flume-env.sh
DSC0002.png

  需要改动的地方如上:
  

#默认的内存是不够的。需要扩大内存。  
export JAVA_OPTS
="-Xms100m -Xmx500m -Dcom.sun.management.jmxremote"  

  
#Flume官方下载的包少了一些Solr相关的包,需要把solr的lib目录加到flume的classpath下。
  
FLUME_CLASSPATH
="/xxx/solr-6.4.2/contrib/morphlines-core/lib/*:/xxxi/solr-6.4.2/dist/*:/xxx/solr-6.4.2/dist/solrj-lib/*:/xxx/solr-6.4.2/server/solr-webapp/webapp/WEB-INF/lib/*"  

  2)log4j.properties
  100MB改成10MB,以防打日志太多日志文件过大。
DSC0003.png

  在调试阶段,加上如下两行会省心很多,调试完再去掉。
  

log4j.logger.org.apache.flume.sink.solr=DEBUG  
log4j.logger.org.kitesdk.morphline
=TRACE  

  3)morphline.conf
  大部分和前文:《json数据处理实战:Kafka+Flume+Morphline+Solr+Hue数据组合索引》雷同。由于我使用的是单机版本的Solr,所以在配置时如下。
  注意solrUrl和solrHomeDir的配置,在官网中没有介绍(因为morphline是cloudera开发并开源的,cloudera的solr默认是solrCloud),但是在源码阅读时可以看到这两个单机solr配置参数。
  

SOLR_LOCATOR : {  
solrUrl
: "http:\/\/localhost:8983\/solr\/catering_customer_core1"  
solrHomeDir
: "/xxx/server/solr/catering_customer_core1/conf"  
}
  

  
morphlines
: [  
{
  

#customer morphline  
id : morphline1
  

  
# Import all morphline commands in these java packages and their subpackages.

  
# Other commands that may be present on the>  
importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
  

  
commands : [                 
  
{
  
readJson {}
  
}
  

  
{
  
tryRules
  
{
  
catchExceptions : false
  
throwExceptionIfAllRulesFailed : true
  
rules : [
  
{
  
commands : [
  
{
  
contains {topic : [tablecardLogin] }
  
}
  

  
#field need to be indexed from json.
  
                {
  
extractJsonPaths {
  
flatten : false
  
paths : {
  
account:/account         
  
customer_id:/customerId
  
history_signin_dates:/opt_time
  
history_signin_timestamps:/opt_time
  
name:/name
  
sex:/sex
  
}
  
}
  
}
  
]
  

  
}
  

  

  
# if desired, the last rule can serve as a fallback mechanism
  
# for records that don't match any rule:
  
            {
  
commands : [
  
{ logWarn { format : "Ignoring record with unsupported input format: {}", args : ["@{}"] } }
  
{ dropRecord {} }   
  
]
  
}
  
]
  
}
  
}
  

  
{
  
convertTimestamp {
  
field : history_signin_dates
  
inputFormats : ["yyyy-MM-dd HH:mm:ss"]
  
inputTimezone : Asia/Shanghai
  
outputFormat : "yyyy-MM-dd'T'HH:mm:ss'Z/SECOND'"
  
outputTimezone : Asia/Shanghai
  
}
  
}
  

  
{
  
convertTimestamp {
  
field : history_signin_timestamps
  
inputFormats : ["yyyy-MM-dd HH:mm:ss"]
  
inputTimezone : Asia/Shanghai
  
outputFormat : "unixTimeInMillis"
  
outputTimezone : UTC
  
}
  
}
  

  
{
  
java {
  
imports : "import java.util.*;import org.kitesdk.morphline.api.Command;import org.kitesdk.morphline.api.Record;"
  
code:     """
  
Object customerId = record.getFirstValue("customer_id");
  
Object account = record.getFirstValue("account");
  
record.put("id", account + "@" + customerId);
  
return child.process(record);
  
"""
  
}
  
}
  

  
{sanitizeUnknownSolrFields {solrLocator : ${SOLR_LOCATOR}}}
  

  
#将数据导入到solr中
  
{loadSolr {solrLocator : ${SOLR_LOCATOR}}}
  

  
]
  
}
  
]
  

  4.Morphline中的sanitizeUnknownSolrFields命令需要有schema.xml才能使用。
  Solr6.4.2的schema默认是用managed-schema文件管理的。如果上面配置中的solrHomeDir目录下没有shema.xml文件,则会报错。
  好在managed-schema和之前schema.xml文件内容几乎一致。执行如下命令即可。
  

cp managed-schema schema.xml  

  5.解决Flume1.7.0和solr6.4.2的jar包冲突问题。
  Flume1.7在编译时使用的是Solr4.10.1的包,而其中lib目录下,Solrj依赖的httpcore-4.1.3包已与最新的Solrj不兼容,因此在solr目录dist/solrj-lib下找到对应的包然后替换。
DSC0004.png

  另外还需要清理的两种包:1.Flume的lib目录老的solr版本相关的包,2.若存在kite-morphline-solr-core(因为solr自己发布的版本已经包含了等价的solr-morphline-core包)则需要清理。(由于本文在写作时相应的包都已经清理了,所以记录的不够细节,望见谅。)
  6.启动flume。调试时可以先在控制台启动,去掉最后的&。
  

bin/flume-ng agent --conf ~/flume-config/ -f ~/flume-config/flume.conf  -n kafka2solr &  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425932-1-1.html 上篇帖子: json数据处理实战:Kafka+Flume+Morphline+Solr+Hue数据组合索引 下篇帖子: rabbitMQ、activeMQ、zeroMQ、Kafka、Redis 比较
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表