设为首页 收藏本站
查看: 1194|回复: 0

[经验分享] CDH5.1从采集(Flume)到索引(Solr)

[复制链接]

尚未签到

发表于 2015-11-27 18:03:40 | 显示全部楼层 |阅读模式
  众所周知,CDH是Cloudera公司推出的基于稳定版的ApachHadoop环境。并且比官方的Hadoop更新更快。坚持季度update,年度release。其中集成的Hadoop生态系统中的所有组件互相兼容,并坚持更新其中的bug和feature,在学习测试、生产环境中都有很好的应用。
  如果在生产环境中需要用到Hadoop,自行部署Hadoop,在其上再部署类似HBase、Flume、Impala、Hive等等,不但部署周期长,遇到问题在国内也很少能找到相应的解决办法。因为其相互的版本兼容就存在很大问题。所以笔者强烈推荐使用CDH的Hadoop。
  CDH可以通过图形界面轻松部署Hadoop生态系统中的其他组件,对于初学者部署时间不超过两天。
  本文旨在介绍在CDH5.1版本之上,使用Flume1.5采集文本数据,通过Morphline(ETL工具)进行分词,并存入Solr中创建索引。只通过配置即可,无需进行二次开发。
  一、           环境搭建
  1、 Linux:CentOS release6.5 X64(Final)
  2、 CDH:CDH5,需要Cloudera Manager和CDH两个包,下载地址:http://archive-primary.cloudera.com/cm5/cm/5/
  3、  JDK:1.7以上
  二、           模拟应用场景
  1、 从Flume的日志文件中摘取出部分内容形成一个原始的被采集文件。
  2、 Flume采集源文件。
  3、 通过编写正则表达式通过Flume自带的Morphline插件,将日志的每一行进行拆分。
  4、 拆分成timestamp/classname/msg/message
  5、 并且将timestamp再次拆分成
  date_year/date_month/date_day/date_hour/date_minute/date_second
  6、 将拆分结果存入Solr建立索引。
  原始日志文件每行内容类似如下:
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
  org.kitesdk.morphline.stdlib.LogInfoBuilder$LogInfo:found no grok match: [{message=[         atcom.typesafe.config.impl.Parseable.parse(Parseable.java:204)]}]
  </pre>
  三、           搭建采集环境(Flume)
  下图是Flume的工作原理结构图,简单了解一下。Flume-NG由三部分组成:Source、Channel、Sink。
  
  Source负责采集源数据,将源数据放入Channel。
  Channel负责暂存数据,供Sink使用。
  Sink负责从Channel读取数据并根据需要发送出去。
  以上三个部分Flume官方都提供多种类型可供使用,开发者可以根据不同需要选择不同类型的组件。并且都支持Plugin形式的二次开发,可以自定义Source、Channel和Sink。具体详细参见官方网站:http://flume.apache.org/FlumeUserGuide.html。
  进入Flumeà选择一个Slaveà配置
  代理名称填写a5
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
  a5.sources = r5//定义source
  a5.sinks = k5//定义sink
  a5.channels = c5//定义channel
  
  a5.sources.r5.type = exec//定义source类型,为执行命令。
  a5.sources.r5.channels = c5//指定source采集到的数据放入的channel
  a5.sources.r5.command = tail -F/var/log/flume-ng/input_file_test.log//source采集的文件
  
  a5.sources.r5.interceptors = i5_2 i5_3 i5_1//定义flume的拦截器
  a5.sources.r5.interceptors.i5_3.type = static//拦截器类型为static(静态)
  a5.sources.r5.interceptors.i5_3.key = BodyType//定义拦截器的key
  a5.sources.r5.interceptors.i5_3.value = line//定义拦截器的value
  a5.sources.r5.interceptors.i5_2.type = host//flume自带的拦截器(指定的被采集对象的ip地址)
  a5.sources.r5.interceptors.i5_1.type = static
  a5.sources.r5.interceptors.i5_1.key = sourceType
  a5.sources.r5.interceptors.i5_1.value = fj
  
  a5.sinks.k5.channel = c5//指定本sink从哪个channel中读取数据
  a5.sinks.k5.type= org.apache.flume.sink.solr.morphline.MorphlineSolrSink//指定sink类型,MorphlineSolrSink
  a5.sinks. k5.morphlineFile = morphlines.conf//指定morphline的配置文件路径,如果不加前面的路径,则默认读取该flume.conf文件所在同级路径的morphline文件。
  a5.sinks. k5.morphlineId = morphline1
  a5.sinks. k5.batchSize = 100//当channel中存在100条数据开始处理
  a5.sinks. k5.batchDurationMillis = 1000//当channel中数据存在超过1000ms时开始处理。batchSize与batchDurationMillis采取优先原则,哪个参数&#20540;先到,哪个先处理。
  
  a5.channels.c5.type = memory//指定channel类型,本例为内存
  a5.channels.c5.capacity = 1000000//指定channel存储数据的能力,本例为1000000条。
  a5.channels.c5.transactionCapacity = 100000//每个事务发给sink的最大event数。
  a5.channels.c5.keep-alive = 30//指定channel存活时间30s,超过设定&#20540;将日出event。
  </pre>
  四、           Morphline分词处理
  笔者理解Morphline是一个ETL工具,在本例中的作用如下:
  E(Extract):用readLine命令读取flume采集回的日志文件。
  T(Transform):利用正则表达式对每行日志文件进行拆分。
  L(Load):将拆分后的结果加载到Solr中创建索引。
  
  Morphline配置文件说明详见代码中的注释。
  
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
  
  SOLR_LOCATOR: {
  # Name of solr collection
  # 指定将索引创建到Solr中的哪个collection中。
  collection : collection1
  
  # ZooKeeper ensemble
  # 配置zookeeper地址
  zkHost :&quot;slave2.CDH:2181,slave3.CDH:2181,slave4.CDH:2181/solr&quot;
  }
  
  morphlines:
  [
  {
  #morphlineId,与Flume采集配置中的a5.sinks.k5.morphlineId对应
  id : morphline1
  # 导入commands的所有包,Morphline将所有的命令放在了org.kitesdk.包下
  # 需要使用以下命令时必须导入这些命令所在的jar包。
  # 与Solr命令相关的命令包在org.apache.solr下。
  importCommands :[&quot;org.kitesdk.**&quot;, &quot;org.apache.solr.**&quot;]
  
  commands : [
  {
  # Parse inputattachment and emit a record for each input line
  # 读一行数据,并设置成UTF-8
  readLine {
  charset :UTF-8
  }
  }
  # 打印测试数据
  { logInfo { format :&quot;output record: {}&quot;, args : [&quot;@{}&quot;] } }
  {
  # 分词命令
  grok {
  # 导入字典数据,配置路径,可配置多个并用“,”分隔。
  # grok-dictionary.conf中存储的是正则表达式采用K-V方式存储,
  # 文件中定义原子正则表达式,新定义的正则可以引用其他的原子正则表达式。使用%{}方式引用。
  # 读者可以详细参见该文件。
  dictionaryFiles :[/home/flume-ng/conf/grok-dictionary.conf]
  expressions : {
  # 拆分原始数据,以下为固定&#26684;式。flume会将读到的每行数据默认存入一个message的变量中。
  # 大括号中冒号前是grok-dictionary.conf文件中的正则标签,冒号后是solr中的field。
  message :&quot;&quot;&quot;%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:loglevel}%{DATA:classname}: %{GREEDYDATA:msg}&quot;&quot;&quot;
  }
  }
  
  }
  {
  # tryRules是异常处理功能,因为本例中要将时间戳做进一步拆分,拆分成年、月、日、时、分、秒等。
  # 不同数据的时间戳&#26684;式又不禁相同,所以可以对时间戳&#26684;式进行穷举。不同&#26684;式的时间戳就可以拆分了。
  tryRules {
  throwExceptionIfAllRulesFailed: true
  rules : [
  {
  commands : [
  {
  grok {
  dictionaryFiles: [/home/flume-ng/conf/grok-dictionary.conf]
  expressions :{
  timestamp:&quot;%{YEAR:date_year}-%{MONTHNUM:date_month}-%{MONTHDAY:date_day}[T]%{HOUR:date_hour}:?%{MINUTE:date_minute}(?::?%{SECOND:date_second})&quot;
  }
  }
  }
  ]
  }
  {
  commands : [
  {
  grok {
  dictionaryFiles: [/home/flume-ng/conf/grok-dictionary.conf]
  expressions :{
  timestamp : &quot;%{YEAR:date_year}-%{MONTHNUM:date_month}-%{MONTHDAY:date_day}[T]%{HOUR:date_hour}:?%{MINUTE:date_minute}(?::?%{SECOND:date_second})&quot;
  }
  }
  }
  ]
  }
  {
  commands : [
  {
  grok {
  dictionaryFiles: [/home/flume-ng/conf/grok-dictionary.conf]
  expressions :{
  timestamp:&quot;%{YEAR:date_year}/%{MONTHNUM:date_month}/%{MONTHDAY:date_day}[T]%{HOUR:date_hour}:?%{MINUTE:date_minute}(?::?%{SECOND:date_second})&quot;
  }
  }
  }
  ]
  }
  ]
  }
  
  }
  
  {
  # 转换时间&#26684;式,将输入时间转换成Solr识别的&#26684;式&quot;yyyy-MM-dd'T'HH:mm:ss.SSS'Z'&quot;
  convertTimestamp {
  # Solr中的时间戳field:timestamp
  field : timestamp
  inputFormats :[&quot;yyyy-MM-dd'T'HH:mm:ss.SSS'Z'&quot;, &quot;yyyy-MM-ddHH:mm:ss&quot;,&quot;yyyy/MM/dd HH:mm:ss&quot;]
  outputFormat :&quot;yyyy-MM-dd'T'HH:mm:ss.SSS'Z'&quot;
  }
  }
  {
  # Solr中的索引主键field:id
  generateUUID {
  field : id
  }
  }
  
  # schema.xml.
  {
  sanitizeUnknownSolrFields {
  # Location from which to fetchSolr schema
  solrLocator : ${SOLR_LOCATOR}
  
  renameToPrefix :&quot;ignored_&quot;
  }
  }
  
  # load the record into a SolrServer orMapReduce SolrOutputFormat.
  {
  loadSolr {
  solrLocator :${SOLR_LOCATOR}
  }
  }
  ]
  }
  ]
  </pre>
  五、           编辑Solr的schema.xml文件
  Solr的schema.xml文件一般存放在/etc/solr/instance.conf/conf目录。
  编辑schema.xml文件
  将必须的field进行更新。
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
  <field name=&quot;timestamp&quot; type=&quot;date&quot;indexed=&quot;true&quot; stored=&quot;true&quot;omitNorms=&quot;true&quot;/>
  <field name=&quot;date_year&quot; type=&quot;int&quot;indexed=&quot;true&quot; stored=&quot;true&quot;omitNorms=&quot;true&quot;/>
  <field name=&quot;date_month&quot; type=&quot;int&quot;indexed=&quot;true&quot; stored=&quot;true&quot;omitNorms=&quot;true&quot;/>
  <field name=&quot;date_day&quot; type=&quot;int&quot;indexed=&quot;true&quot; stored=&quot;true&quot;omitNorms=&quot;true&quot;/>
  <field name=&quot;date_hour&quot; type=&quot;int&quot;indexed=&quot;true&quot; stored=&quot;true&quot;omitNorms=&quot;true&quot;/>
  <field name=&quot;date_minute&quot; type=&quot;int&quot;indexed=&quot;true&quot; stored=&quot;true&quot;omitNorms=&quot;true&quot;/>
  <field name=&quot;date_second&quot; type=&quot;string&quot;indexed=&quot;true&quot; stored=&quot;true&quot;omitNorms=&quot;true&quot;/>
  <!—flume配置文件中官方提供的host拦截器-->
  <field name=&quot;host&quot; type=&quot;string&quot;indexed=&quot;true&quot; stored=&quot;true&quot;omitNorms=&quot;true&quot;/>
  <!—flume配置文件中的一个静态拦截器-->
  <field name=&quot;sourceType&quot;type=&quot;string&quot; indexed=&quot;true&quot; stored=&quot;true&quot; omitNorms=&quot;true&quot;/>
  <!—Morphline文件中的message-->
  <field name=&quot;message&quot; type=&quot;string&quot;indexed=&quot;false&quot; stored=&quot;true&quot;omitNorms=&quot;true&quot;/>
  </pre>
  以上文件编辑完成并保存后需要上传并重新加载一下这个collection。
  使用如下命令:
  solrctl instancedir --update myconfig/etc/solr/instance.conf
  solrctl collection --reload collection1
  
  以上即为在CDH环境中部署Flume采集数据,利用Morphline进行分词,并加载到Solr中创建索引的过程。
  文中如有错误或更好的方案欢迎交流。
  QQ:58431505  //认证请注明:Solr交流

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144301-1-1.html 上篇帖子: Flume-ng-1.3.0 spooling source的方式增加了对目录的递归检测的支持 下篇帖子: flume指南
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表