设为首页 收藏本站
查看: 2034|回复: 0

[经验分享] flume源码分析-SinkProcessor

[复制链接]

尚未签到

发表于 2017-5-21 14:53:12 | 显示全部楼层 |阅读模式
  flume包括三种sink processor:DefaultSinkProcessor、FailoverSinkProcessor和LoadBalancingSinkProcessor。
  Default sink processor that only accepts a single sink, passing on process results without any additional handling. Suitable for all sinks that aren't assigned to a group.

/**
 * Sink processor that wraps exactly one sink and forwards every call to it.
 *
 * <p>{@link #process()} returns whatever the wrapped sink returns, with no
 * extra handling. {@link #setSinks(List)} must be given a list holding a
 * single sink before {@link #start()} or {@link #stop()} is called.
 */
public class DefaultSinkProcessor implements SinkProcessor,
    ConfigurableComponent {

  /** The single sink every call is delegated to; assigned by setSinks. */
  private Sink sink;

  /** Last lifecycle transition recorded by start() / stop(). */
  private LifecycleState lifecycleState;

  /**
   * Registers the one sink this processor drives.
   *
   * @param sinks non-null list containing exactly one sink
   */
  @Override
  public void setSinks(List<Sink> sinks) {
    Preconditions.checkNotNull(sinks);
    boolean exactlyOne = sinks.size() == 1;
    Preconditions.checkArgument(exactlyOne,
        "DefaultSinkPolicy can only handle one sink, try using a policy that supports multiple sinks");
    this.sink = sinks.get(0);
  }

  /** This processor reads nothing from its configuration. */
  @Override
  public void configure(ComponentConfiguration conf) {
  }

  /** Starts the wrapped sink, then records the START state. */
  @Override
  public void start() {
    requireSink().start();
    this.lifecycleState = LifecycleState.START;
  }

  /** Stops the wrapped sink, then records the STOP state. */
  @Override
  public void stop() {
    requireSink().stop();
    this.lifecycleState = LifecycleState.STOP;
  }

  /**
   * Delegates straight to the wrapped sink.
   *
   * @return the status reported by the sink
   * @throws EventDeliveryException if the sink throws it
   */
  @Override
  public Status process() throws EventDeliveryException {
    return this.sink.process();
  }

  /** Returns the configured sink, failing if setSinks has not supplied one. */
  private Sink requireSink() {
    return Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
  }
}

  FailoverSinkProcessor用来处理一个sink group(sink组):当高优先级的sink处理失败后,FailoverSinkProcessor会选择另一个sink来处理。

/**
* FailoverSinkProcessor maintains a prioritized list of sinks,
* guarranteeing that so long as one is available events will be processed.
*
* The failover mechanism works by relegating failed sinks to a pool
* where they are assigned a cooldown period, increasing with sequential
* failures before they are retried. Once a sink succesfully sends an
* event it is restored to the live pool.
*
* FailoverSinkProcessor is in no way thread safe and expects to be run via
* SinkRunner Additionally, setSinks must be called before configure, and
* additional sinks cannot be added while running
*
* To configure, set a sink groups processor to "failover" and set priorities
* for individual sinks, all priorities must be unique. Furthermore, an
* upper limit to failover time can be set(in miliseconds) using maxpenalty
*
* Ex)
*
* host1.sinkgroups = group1
*
* host1.sinkgroups.group1.sinks = sink1 sink2
* host1.sinkgroups.group1.processor.type = failover
* host1.sinkgroups.group1.processor.priority.sink1 = 5
* host1.sinkgroups.group1.processor.priority.sink2 = 10
* host1.sinkgroups.group1.processor.maxpenalty = 10000
*
*/
public class FailoverSinkProcessor extends AbstractSinkProcessor {
private static final int FAILURE_PENALTY = 1000;
private static final int DEFAULT_MAX_PENALTY = 30000;
private class FailedSink implements Comparable<FailedSink> {
private Long refresh;
private Integer priority;
private Sink sink;
private Integer sequentialFailures;
public FailedSink(Integer priority, Sink sink, int seqFailures) {
this.sink = sink;
this.priority = priority;
this.sequentialFailures = seqFailures;
adjustRefresh();
}
@Override
public int compareTo(FailedSink arg0) {
return refresh.compareTo(arg0.refresh);
}
public Long getRefresh() {
return refresh;
}
public Sink getSink() {
return sink;
}
public Integer getPriority() {
return priority;
}
public void incFails() {
sequentialFailures++;
adjustRefresh();
logger.debug("Sink {} failed again, new refresh is at {}, " +
"current time {}", new Object[] {
sink.getName(), refresh, System.currentTimeMillis()});
}
private void adjustRefresh() {
refresh = System.currentTimeMillis()
+ Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
}
}
private static final Logger logger = LoggerFactory
.getLogger(FailoverSinkProcessor.class);
private static final String PRIORITY_PREFIX = "priority.";
private static final String MAX_PENALTY_PREFIX = "maxpenalty";
private Map<String, Sink> sinks;
private Sink activeSink;
private SortedMap<Integer, Sink> liveSinks;
private Queue<FailedSink> failedSinks;
private int maxPenalty;
@Override
public void configure(Context context) {
liveSinks = new TreeMap<Integer, Sink>();
failedSinks = new PriorityQueue<FailedSink>();
Integer nextPrio = 0;
String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX);
if(maxPenaltyStr == null) {
maxPenalty = DEFAULT_MAX_PENALTY;
} else {
try {
maxPenalty = Integer.parseInt(maxPenaltyStr);
} catch (NumberFormatException e) {
logger.warn("{} is not a valid value for {}",
new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
maxPenalty  = DEFAULT_MAX_PENALTY;
}
}
for (Entry<String, Sink> entry : sinks.entrySet()) {
String priStr = PRIORITY_PREFIX + entry.getKey();
Integer priority;
try {
priority =  Integer.parseInt(context.getString(priStr));
} catch (Exception e) {
priority = --nextPrio;
}
if(!liveSinks.containsKey(priority)) {
liveSinks.put(priority, sinks.get(entry.getKey()));
} else {
logger.warn("Sink {} not added to FailverSinkProcessor as priority" +
"duplicates that of sink {}", entry.getKey(),
liveSinks.get(priority));
}
}
activeSink = liveSinks.get(liveSinks.lastKey());
}
@Override
public Status process() throws EventDeliveryException {
// Retry any failed sinks that have gone through their "cooldown" period
Long now = System.currentTimeMillis();
while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
FailedSink cur = failedSinks.poll();
Status s;
try {
s = cur.getSink().process();
if (s  == Status.READY) {
liveSinks.put(cur.getPriority(), cur.getSink());
activeSink = liveSinks.get(liveSinks.lastKey());
logger.debug("Sink {} was recovered from the fail list",
cur.getSink().getName());
} else {
// if it's a backoff it needn't be penalized.
failedSinks.add(cur);
}
return s;
} catch (Exception e) {
cur.incFails();
failedSinks.add(cur);
}
}
Status ret = null;
while(activeSink != null) {
try {
ret = activeSink.process();
return ret;
} catch (Exception e) {
logger.warn("Sink {} failed and has been sent to failover list",
activeSink.getName(), e);
activeSink = moveActiveToDeadAndGetNext();
}
}
throw new EventDeliveryException("All sinks failed to process, " +
"nothing left to failover to");
}
}


/**
 * Sink processor that spreads work over several sinks. Every call to
 * {@link #process()} asks the configured {@link SinkSelector} for an
 * iteration order and tries the sinks in that order until one of them
 * completes without throwing.
 */
public class LoadBalancingSinkProcessor extends AbstractSinkProcessor {
  public static final String CONFIG_SELECTOR = "selector";
  public static final String CONFIG_SELECTOR_PREFIX = CONFIG_SELECTOR + ".";
  public static final String SELECTOR_NAME_ROUND_ROBIN = "ROUND_ROBIN";
  public static final String SELECTOR_NAME_RANDOM = "RANDOM";

  private static final Logger LOGGER = LoggerFactory
      .getLogger(LoadBalancingSinkProcessor.class);

  private SinkSelector selector;

  /**
   * Picks and configures the sink selector.
   *
   * The "selector" property chooses the strategy: ROUND_ROBIN (the default)
   * or RANDOM, compared case-insensitively; any other value is treated as a
   * class name and instantiated reflectively. The selector is then given the
   * sinks and the "selector."-prefixed sub-properties.
   *
   * @param context processor configuration
   * @throws IllegalStateException (via Preconditions) if fewer than two
   *         sinks are configured
   * @throws FlumeException if a custom selector class cannot be instantiated
   */
  @Override
  public void configure(Context context) {
    Preconditions.checkState(getSinks().size() > 1,
        "The LoadBalancingSinkProcessor cannot be used for a single sink. "
        + "Please configure more than one sinks and try again.");

    String selectorTypeName = context.getString(CONFIG_SELECTOR,
        SELECTOR_NAME_ROUND_ROBIN);

    selector = null;
    if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
      selector = new RoundRobinSinkSelector();
    } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
      selector = new RandomOrderSinkSelector();
    } else {
      try {
        @SuppressWarnings("unchecked")
        Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
            Class.forName(selectorTypeName);
        selector = klass.newInstance();
      } catch (Exception ex) {
        throw new FlumeException("Unable to instantiate sink selector: "
            + selectorTypeName, ex);
      }
    }

    selector.setSinks(getSinks());
    selector.configure(
        new Context(context.getSubProperties(CONFIG_SELECTOR_PREFIX)));

    LOGGER.debug("Sink selector: " + selector + " initialized");
  }

  /**
   * Tries the sinks in the order given by the selector and returns the
   * status of the first one that does not throw.
   *
   * @return the status reported by the first sink that completed
   * @throws EventDeliveryException if every sink threw
   */
  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;
    Iterator<Sink> sinkIterator = selector.createSinkIterator();
    while (sinkIterator.hasNext()) {
      Sink sink = sinkIterator.next();
      try {
        status = sink.process();
        break;
      } catch (Exception ex) {
        LOGGER.warn("Sink failed to consume event. "
            + "Attempting next sink if available.", ex);
      }
    }
    if (status == null) {
      throw new EventDeliveryException("All configured sinks have failed");
    }
    return status;
  }

  /**
   * <p>
   * An interface that allows the LoadBalancingSinkProcessor to use
   * a load-balancing strategy such as round-robin, random distribution etc.
   * Implementations of this class can be plugged into the system via
   * processor configuration and are used to select a sink on every invocation.
   * </p>
   * <p>
   * An instance of the configured sink selector is created during the processor
   * configuration, its {@linkplain #setSinks(List)} method is invoked following
   * which it is configured via a subcontext. Once configured, the lifecycle of
   * this selector is tied to the lifecycle of the sink processor.
   * </p>
   * <p>
   * At runtime, the processor invokes the {@link #createSinkIterator()}
   * method for every <tt>process</tt> call to create an iteration order over
   * the available sinks. The processor then loops through this iteration order
   * until one of the sinks succeeds in processing the event. If the iterator
   * is exhausted and none of the sinks succeed, the processor will raise
   * an <tt>EventDeliveryException</tt>.
   * </p>
   */
  public interface SinkSelector extends Configurable, LifecycleAware {
    void setSinks(List<Sink> sinks);

    Iterator<Sink> createSinkIterator();
  }

  /**
   * A sink selector that implements the round-robin sink selection policy.
   * This implementation is not MT safe.
   */
  private static class RoundRobinSinkSelector extends AbstractSinkSelector {
    /** Index of the sink that leads the next iteration order. */
    private int nextHead = 0;

    /**
     * Builds the order begin, begin+1, ..., wrapping around the sink list,
     * and advances the head so the next call starts one sink later.
     */
    @Override
    public Iterator<Sink> createSinkIterator() {
      int size = getSinks().size();
      int[] indexOrder = new int[size];

      int begin = nextHead++;
      if (nextHead == size) {
        nextHead = 0;
      }

      for (int i = 0; i < size; i++) {
        // Each slot needs its own index; assigning to the array variable
        // itself (as the pasted text did) does not compile.
        indexOrder[i] = (begin + i) % size;
      }

      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
    }
  }

  /**
   * A sink selector that implements a random sink selection policy. This
   * implementation is not thread safe.
   */
  private static class RandomOrderSinkSelector extends AbstractSinkSelector {
    private Random random = new Random(System.currentTimeMillis());

    /**
     * Builds a random permutation of the sink indices by repeatedly drawing
     * one of the remaining indices and filling the order from the back.
     */
    @Override
    public Iterator<Sink> createSinkIterator() {
      int size = getSinks().size();
      int[] indexOrder = new int[size];

      List<Integer> indexList = new ArrayList<Integer>();
      for (int i = 0; i < size; i++) {
        indexList.add(i);
      }

      while (indexList.size() != 1) {
        int pick = random.nextInt(indexList.size());
        indexOrder[indexList.size() - 1] = indexList.remove(pick);
      }
      // The one index left over takes the first slot.
      indexOrder[0] = indexList.get(0);

      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
    }
  }
}

  LoadBalancingSinkProcessor用来做负载均衡(load balance),有两种selector:RandomOrderSinkSelector和RoundRobinSinkSelector。RoundRobinSinkSelector轮流选取一个sink作为最先处理的sink;RandomOrderSinkSelector随机选取一个sink作为最先处理的sink。

/**
 * Load-balancing sink processor. A {@link SinkSelector} decides, for each
 * {@link #process()} call, the order in which the sinks are attempted; the
 * first sink that finishes without an exception determines the result.
 * The selector is started and stopped together with this processor.
 */
public class LoadBalancingSinkProcessor extends AbstractSinkProcessor {
  public static final String CONFIG_SELECTOR = "selector";
  public static final String CONFIG_SELECTOR_PREFIX = CONFIG_SELECTOR + ".";
  public static final String SELECTOR_NAME_ROUND_ROBIN = "ROUND_ROBIN";
  public static final String SELECTOR_NAME_RANDOM = "RANDOM";

  private static final Logger LOGGER = LoggerFactory
      .getLogger(LoadBalancingSinkProcessor.class);

  private SinkSelector selector;

  /**
   * Creates the selector named by the "selector" property and hands it the
   * sinks plus its own "selector."-prefixed sub-configuration.
   *
   * Recognised names are ROUND_ROBIN (used when the property is absent) and
   * RANDOM, matched ignoring case. Anything else is loaded as a class name.
   *
   * @param context processor configuration
   * @throws IllegalStateException (via Preconditions) when there are not at
   *         least two sinks
   * @throws FlumeException when the named selector class cannot be created
   */
  @Override
  public void configure(Context context) {
    Preconditions.checkState(getSinks().size() > 1,
        "The LoadBalancingSinkProcessor cannot be used for a single sink. "
        + "Please configure more than one sinks and try again.");

    String selectorTypeName = context.getString(CONFIG_SELECTOR,
        SELECTOR_NAME_ROUND_ROBIN);

    selector = null;
    if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
      selector = new RoundRobinSinkSelector();
    } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
      selector = new RandomOrderSinkSelector();
    } else {
      try {
        @SuppressWarnings("unchecked")
        Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
            Class.forName(selectorTypeName);
        selector = klass.newInstance();
      } catch (Exception ex) {
        throw new FlumeException("Unable to instantiate sink selector: "
            + selectorTypeName, ex);
      }
    }

    selector.setSinks(getSinks());
    selector.configure(
        new Context(context.getSubProperties(CONFIG_SELECTOR_PREFIX)));

    LOGGER.debug("Sink selector: " + selector + " initialized");
  }

  /** Starts the processor first, then the selector. */
  @Override
  public void start() {
    super.start();
    selector.start();
  }

  /** Stops the processor first, then the selector. */
  @Override
  public void stop() {
    super.stop();
    selector.stop();
  }

  /**
   * Walks the selector's iteration order, stopping at the first sink whose
   * process() call returns normally. Failures are logged and skipped.
   *
   * @return the status from the first sink that did not throw
   * @throws EventDeliveryException when all sinks threw
   */
  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;
    Iterator<Sink> sinkIterator = selector.createSinkIterator();
    while (sinkIterator.hasNext()) {
      Sink sink = sinkIterator.next();
      try {
        status = sink.process();
        break;
      } catch (Exception ex) {
        LOGGER.warn("Sink failed to consume event. "
            + "Attempting next sink if available.", ex);
      }
    }
    if (status == null) {
      throw new EventDeliveryException("All configured sinks have failed");
    }
    return status;
  }

  /**
   * <p>
   * An interface that allows the LoadBalancingSinkProcessor to use
   * a load-balancing strategy such as round-robin, random distribution etc.
   * Implementations of this class can be plugged into the system via
   * processor configuration and are used to select a sink on every invocation.
   * </p>
   * <p>
   * An instance of the configured sink selector is created during the processor
   * configuration, its {@linkplain #setSinks(List)} method is invoked following
   * which it is configured via a subcontext. Once configured, the lifecycle of
   * this selector is tied to the lifecycle of the sink processor.
   * </p>
   * <p>
   * At runtime, the processor invokes the {@link #createSinkIterator()}
   * method for every <tt>process</tt> call to create an iteration order over
   * the available sinks. The processor then loops through this iteration order
   * until one of the sinks succeeds in processing the event. If the iterator
   * is exhausted and none of the sinks succeed, the processor will raise
   * an <tt>EventDeliveryException</tt>.
   * </p>
   */
  public interface SinkSelector extends Configurable, LifecycleAware {
    void setSinks(List<Sink> sinks);

    Iterator<Sink> createSinkIterator();
  }

  /**
   * A sink selector that implements the round-robin sink selection policy.
   * This implementation is not MT safe.
   */
  private static class RoundRobinSinkSelector extends AbstractSinkSelector {
    /** Position in the sink list that the next iteration order starts from. */
    private int nextHead = 0;

    /**
     * Returns an iterator that starts at the current head and wraps around
     * the sink list; the head then moves forward by one for the next call.
     */
    @Override
    public Iterator<Sink> createSinkIterator() {
      int size = getSinks().size();
      int[] indexOrder = new int[size];

      int begin = nextHead++;
      if (nextHead == size) {
        nextHead = 0;
      }

      for (int i = 0; i < size; i++) {
        // Fill slot i; the pasted source had lost the "[i]" subscript and
        // tried to assign an int to the whole array.
        indexOrder[i] = (begin + i) % size;
      }

      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
    }
  }

  /**
   * A sink selector that implements a random sink selection policy. This
   * implementation is not thread safe.
   */
  private static class RandomOrderSinkSelector extends AbstractSinkSelector {
    private Random random = new Random(System.currentTimeMillis());

    /**
     * Returns an iterator over a random permutation of the sinks. Indices
     * are drawn one at a time from the remaining pool and written into the
     * order array from the last slot towards the first.
     */
    @Override
    public Iterator<Sink> createSinkIterator() {
      int size = getSinks().size();
      int[] indexOrder = new int[size];

      List<Integer> indexList = new ArrayList<Integer>();
      for (int i = 0; i < size; i++) {
        indexList.add(i);
      }

      while (indexList.size() != 1) {
        int pick = random.nextInt(indexList.size());
        indexOrder[indexList.size() - 1] = indexList.remove(pick);
      }
      // Only one index remains; it goes in front.
      indexOrder[0] = indexList.get(0);

      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
    }
  }
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379665-1-1.html 上篇帖子: 一个flume的问题,帮忙看一下 下篇帖子: Flume接收Log4j日志 发送到控制台
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表