wolong 发表于 2019-1-30 10:07:20

Flume NG 学习笔记(七)Sink Processors(故障转移与负载均衡)测试

版权声明:本文为博主原创文章,未经博主允许不得转载。
目录(?)[+]
  Sink groups允许组织多个sink到一个实体上。 Sink processors能够提供在组内所有Sink之间实现负载均衡的能力,而且在失败的情况下能够进行故障转移从一个Sink到另一个Sink。
  简单的说就是一个source 对应一个Sinkgroups,即多个sink,这里实际上与第六节的复用/复制情况差不多,只是这里考虑的是可靠性与性能,即故障转移与负载均衡的设置。
  下面是官方配置:
Property Name
Default
Description
sinks

Space-separated list of sinks that are participating in the group
processor.type
default
The component type name, needs to be default, failover or load_balance
  从参数类型上可以看出有3种Processors类型:default, failover(故障转移)和 load_balance(负载均衡),当然,官网上说目前自定义processors 还不支持。
  下面是官网例子
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
一、Default Sink Processor
  DefaultSink Processor 接收单一的Sink,不强制用户为Sink创建Processor,前面举了很多例子。所以这个就不多说了。
二、Failover Sink Processor(故障转移)
  FailoverSink Processor会通过配置维护了一个优先级列表。保证每一个有效的事件都会被处理。
  故障转移的工作原理是将连续失败sink分配到一个池中,在那里被分配一个冷冻期,在这个冷冻期里,这个sink不会做任何事。一旦sink成功发送一个event,sink将被还原到live 池中。
  在这配置中,要设置sinkgroups processor为failover,需要为所有的sink分配优先级,所有的优先级数字必须是唯一的,这个得格外注意。此外,failover time的上限可以通过maxpenalty 属性来进行设置。
  下面是官网配置:
Property Name
Default
Description
sinks

Space-separated list of sinks that are participating in the group
processor.type
default
The component type name, needs to be failover
processor.priority.

must be one of the sink instances associated with the current sink group
processor.maxpenalty
30000
(in millis)
  下面是官网例子
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.g1.processor.priority.k2=10
a1.sinkgroups.g1.processor.maxpenalty=10000
  这里首先要申明一个sinkgroups,然后再设置2个sink ,k1与k2,其中2个优先级是5和10,而processor的maxpenalty被设置为10秒,默认是30秒。‘
  下面是测试例子
  

   view plain copy

[*]  #配置文件:failover_sink_case13.conf
[*]  #Name the components on this agent
[*]  a1.sources= r1
[*]  a1.sinks= k1 k2
[*]  a1.channels= c1 c2
[*]     
[*]  a1.sinkgroups= g1
[*]  a1.sinkgroups.g1.sinks= k1 k2
[*]  a1.sinkgroups.g1.processor.type= failover
[*]  a1.sinkgroups.g1.processor.priority.k1= 5
[*]  a1.sinkgroups.g1.processor.priority.k2= 10
[*]  a1.sinkgroups.g1.processor.maxpenalty= 10000
[*]     
[*]  #Describe/configure the source
[*]  a1.sources.r1.type= syslogtcp
[*]  a1.sources.r1.port= 50000
[*]  a1.sources.r1.host= 192.168.233.128
[*]  a1.sources.r1.channels= c1 c2
[*]     
[*]  #Describe the sink
[*]  a1.sinks.k1.type= avro
[*]  a1.sinks.k1.channel= c1
[*]  a1.sinks.k1.hostname= 192.168.233.129
[*]  a1.sinks.k1.port= 50000
[*]     
[*]  a1.sinks.k2.type= avro
[*]  a1.sinks.k2.channel= c2
[*]  a1.sinks.k2.hostname= 192.168.233.130
[*]  a1.sinks.k2.port= 50000
[*]  # Usea channel which buffers events in memory
[*]  a1.channels.c1.type= memory
[*]  a1.channels.c1.capacity= 1000
[*]  a1.channels.c1.transactionCapacity= 100
  

  

  这里设置了2个channels与2个sinks ,关于故障转移的设置直接复制官网的例子。我们还要配置2个sinks对于的代理。这里的2个接受代理我们沿用之前第六章复制的2个sink代理配置。
  下面是第一个接受复制事件代理配置
  

   view plain copy

[*]  #配置文件:replicate_sink1_case11.conf
[*]  # Name the components on this agent
[*]  a2.sources = r1
[*]  a2.sinks = k1
[*]  a2.channels = c1
[*]     
[*]  # Describe/configure the source
[*]  a2.sources.r1.type = avro
[*]  a2.sources.r1.channels = c1
[*]  a2.sources.r1.bind = 192.168.233.129
[*]  a2.sources.r1.port = 50000
[*]     
[*]  # Describe the sink
[*]  a2.sinks.k1.type = logger
[*]  a2.sinks.k1.channel = c1
[*]     
[*]  # Use a channel which buffers events inmemory
[*]  a2.channels.c1.type = memory
[*]  a2.channels.c1.capacity = 1000
[*]  a2.channels.c1.transactionCapacity = 100
  

  

  下面是第二个接受复制事件代理配置:
  

   view plain copy

[*]  #配置文件:replicate_sink2_case11.conf
[*]  # Name the components on this agent
[*]  a3.sources = r1
[*]  a3.sinks = k1
[*]  a3.channels = c1
[*]     
[*]  # Describe/configure the source
[*]  a3.sources.r1.type = avro
[*]  a3.sources.r1.channels = c1
[*]  a3.sources.r1.bind = 192.168.233.130
[*]  a3.sources.r1.port = 50000
[*]     
[*]  # Describe the sink
[*]  a3.sinks.k1.type = logger
[*]  a3.sinks.k1.channel = c1
[*]     
[*]  # Use a channel which buffers events inmemory
[*]  a3.channels.c1.type = memory
[*]  a3.channels.c1.capacity = 1000
[*]  a3.channels.c1.transactionCapacity = 100
  

  

  #敲命令
  首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。
  flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1 -Dflume.root.logger=INFO,console
  flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1 -Dflume.root.logger=INFO,console
  在启动源发送的代理
  flume-ng agent -cconf -f conf/failover_sink_case13.conf -n a1 -Dflume.root.logger=INFO,console
  启动成功后
  打开另一个终端输入,往侦听端口送数据
  echo "hello failoversink" | nc 192.168.233.128 50000
  #在启动源发送的代理终端查看console输出
  因为k1的优先级是5,K2是10因此当K2正常运行的时候,是发送到K2的。下面数据正常输出。
http://img.blog.csdn.net/20141029113049250?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center

  

  然后我们中断K2的代理进程。
http://img.blog.csdn.net/20141029113121527?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center

  

  再尝试往侦听端口送数据
  echo "hello close k2"| nc 192.168.233.128 50000
  我们发现源代理发生事件到K2失败,然后他将K2放入到failover list(故障列表)
http://img.blog.csdn.net/20141029113231875?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center

  

  因为K1还是正常运行的,因此这个时候他会接收到数据。
http://img.blog.csdn.net/20141029113212414?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center

  

  然后我们再打开K2的大理进程,我们继续往侦听端口送数据
  echo " hello open k2 again" | nc192.168.233.128 50000
http://img.blog.csdn.net/20141029113244222?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center

  

  数据正常发生,Failover SinkProcessor测试完毕。
三、Load balancing SinkProcessor
  负载均衡片处理器提供在多个Sink之间负载平衡的能力。实现支持通过round_robin(轮询)或者random(随机)参数来实现负载分发,默认情况下使用round_robin,但可以通过配置覆盖这个默认值。还可以通过集成AbstractSinkSelector类来实现用户自己的选择机制。
  当被调用的时候,这选择器通过配置的选择规则选择下一个sink来调用。
  下面是官网配置
Property Name
Default
Description
processor.sinks

Space-separated list of sinks that are participating in the group
processor.type
default
The component type name, needs to be load_balance
processor.backoff
false
Should failed sinks be backed off exponentially.
processor.selector
round_robin
Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector
processor.selector.maxTimeOut
30000
Used by backoff selectors to limit exponential backoff (in milliseconds)
  下面是官网的例子
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
a1.sinkgroups.g1.processor.backoff=true
a1.sinkgroups.g1.processor.selector=random
  这个与故障转移的设置差不多。
  下面是测试例子
  

   view plain copy

[*]  #配置文件:load_sink_case14.conf
[*]  # Name the components on this agent
[*]  a1.sources = r1
[*]  a1.sinks = k1 k2
[*]  a1.channels = c1
[*]     
[*]  a1.sinkgroups = g1
[*]  a1.sinkgroups.g1.sinks = k1 k2
[*]  a1.sinkgroups.g1.processor.type =load_balance
[*]  a1.sinkgroups.g1.processor.backoff = true
[*]  a1.sinkgroups.g1.processor.selector =round_robin
[*]     
[*]  # Describe/configure the source
[*]  a1.sources.r1.type = syslogtcp
[*]  a1.sources.r1.port = 50000
[*]  a1.sources.r1.host = 192.168.233.128
[*]  a1.sources.r1.channels = c1
[*]     
[*]  # Describe the sink
[*]  a1.sinks.k1.type = avro
[*]  a1.sinks.k1.channel = c1
[*]  a1.sinks.k1.hostname = 192.168.233.129
[*]  a1.sinks.k1.port = 50000
[*]     
[*]  a1.sinks.k2.type = avro
[*]  a1.sinks.k2.channel = c1
[*]  a1.sinks.k2.hostname = 192.168.233.130
[*]  a1.sinks.k2.port = 50000
[*]  # Use a channel which buffers events inmemory
[*]  a1.channels.c1.type = memory
[*]  a1.channels.c1.capacity = 1000
[*]  a1.channels.c1.transactionCapacity = 100
  

  这里要说明的是,因此测试的是负载均衡的例子,因此这边使用一个channel来作为数据传输通道。这里sinks的对应的接收数据的代理配置,我们沿用故障转移的接收代理配置。
  #敲命令
  首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。
  flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1
  -Dflume.root.logger=INFO,console
  flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1
  -Dflume.root.logger=INFO,console
  在启动源发送的代理
  flume-ng agent -cconf -f conf/load_sink_case14.conf -n a1
  -Dflume.root.logger=INFO,console
  
  启动成功后
  打开另一个终端输入,往侦听端口送数据
  echo "loadbanlancetest1" | nc 192.168.233.128 50000
  echo "loadbantest2" | nc 192.168.233.128 50000
  echo "loadban test3"| nc 192.168.233.128 50000
  echo "loadbantest4" | nc 192.168.233.128 50000
  echo "loadbantest5" | nc 192.168.233.128 50000
  #在启动源发送的代理终端查看console输出
  其中K1收到3条数据
http://img.blog.csdn.net/20141029113316062?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center

  

  其中K1收到2条数据
http://img.blog.csdn.net/20141029113346077?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center

  

因为我们负载均衡选择的类型是轮询,因此可以看出flume 让代理每次向一个sink发送2次事件数据后就换另一个sinks 发送。
Sink Processors测试完毕
  




roxettewu 发表于 2019-2-28 20:49:37

感谢楼主的分享!
页: [1]
查看完整版本: Flume NG 学习笔记(七)Sink Processors(故障转移与负载均衡)测试