实时事件统计项目:优化solr和morphline的时间字段
SOLR_LOCATOR : {# Name of solr collection
collection : event_count_records
# ZooKeeper ensemble
#CDH的专有写法,开源版本不支持。
zkHost : "$ZK_HOST"
}
morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
commands : [
{
#Flume传过来的kafka的json数据是用二进制流的形式,需要先读取json
readJson{}
}
{
#读出来的json字段必须转换成filed才能被solr索引到
extractJsonPaths {
flatten:true
paths:{
account:/account
accountName:/accountName
subaccount:/subaccount
subaccountName:/subaccountName
eventTime:/timestamp
eventType:/eventType
eventTags:"/eventTags[]/name"
#按UTC时间存timestamp
eventTimeInMinuteUTC_tdt:/timestamp
#按China时间存timestamp
eventTimeInMinuteChina_tdt:/timestamp
#按UTC时间存timestamp
eventTimeInHourUTC_tdt:/timestamp
#_tdt后缀会被动态识别为日期类型的索引字段
#按不同时间间隔存索引以增加查询性能
}
}
}
#转换long型时间为Date格式
{convertTimestamp {
field : eventTimeInMinuteChina_tdt
inputFormats : ["unixTimeInMillis"]
inputTimezone : UTC
outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/MINUTE'"
outputTimezone : Asia/Shanghai
}}
{convertTimestamp {
field : eventTimeInMinuteUTC_tdt
inputFormats : ["unixTimeInMillis"]
inputTimezone : UTC
outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/MINUTE'"
outputTimezone : UTC
}}
{convertTimestamp {
field : eventTimeInHourUTC_tdt
inputFormats : ["unixTimeInMillis"]
inputTimezone : UTC
outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/HOUR'"
outputTimezone : UTC
}}
#kafka中的json数据传到flume中时会被放入_attachment_body字段,readJson后会变成JsonNode对象,需要toString之后才能保存
{toString { field : _attachment_body }}
#为每一条记录生成一个UUID
{generateUUID {
field :>
}}
#对未定义的Solr字段加tws前缀,根据schema.xml中定义的tws_*为text_ws类型,会动态未未定义的字段建索引。
{
sanitizeUnknownSolrFields {
# Location from which to fetch Solr schema
solrLocator : ${SOLR_LOCATOR}
renameToPrefix:"tws_"
}
}
#将数据导入到solr中
{loadSolr {solrLocator : ${SOLR_LOCATOR}}}
]
}
]
页:
[1]