设为首页 收藏本站
查看: 1292|回复: 0

[经验分享] Flink kafka 定制技巧

[复制链接]

尚未签到

发表于 2019-1-31 09:18:56 | 显示全部楼层 |阅读模式
  动态路由:
方案1: 定制一个特殊的KafkaDynamicSink,内嵌多个原生的FlinkKafkaProducer,每个对应一个下游的KAFKA队列
在OPEN方法中读取所有KAFKA渠道配置并构建FlinkKafkaProducer并构建一个Map: kafka channelId -> FlinkKafkaProducer

重载INVOKE方法
根据路由规则找到当前流数据对应所有的ChannelId (允许多个),再从MAP中获取对 FlinkKafkaProducer 并调用其INVOKE方法

核心代码:
public class DynamicKafkaSink extends RichSinkFunction {
    @Override
    public void open(Configuration parameters) throws Exception {
        List allChannels = channelRepository.getAll();
        for(ChannelModel nextChannel: allChannels) {
            FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010)channelFactory.createChannelProducer(nextChannel,
            FlinkKafkaProducer010.class, Collections.emptyMap());
            nextProducer.setRuntimeContext(this.getRuntimeContext());
            nextProducer.open(parameters);
            producers.put(nextChannel.getChannelId(), nextProducer);
        }
    }
   
    @Override
    public void invoke(IN value) throws Exception {
        List channelIds = channelRouteStrategy.route(value);
        for (String nextChannelId: channelIds) {
            FlinkKafkaProducer010 nextProducer = producers.get(nextChannelId);
            nextProducer.invoke(converted);
        }
    }

}




注意:
Map不能在构造函数中初始化,而要在OPEN方法中初始化,FLINK分布式特性决定了构造函数和OPEN不在同一个JVM里执行
类级别的变量需要可序列化,否则需要声明为TRANSIENT

每个新构建的FlinkKafkaProducer需要先调用
setRuntimeContext(this.getRuntimeContext())
再调用open 方法才能被使用


优点:
可以路由到不同的BROKER上的TOPIC,在不同的BROKER上隔离性更好

缺陷:
所有的FlinkKafkaProducer只在OPEN的时候创建一次,后面如果添加了新的KAFKA队列无法被动态感知并路由
更改了FlinkKafkaProducer创建和初始化的过程,从MAIN函数中转到了KafkaDynamicSink的OPEN方法里,未经过全面测试,可能存在问题


方案2:方案1的升级版,利用FLINK SPLIT STREAM的特性,根据路由规则将原生数据流分成多个,每个子数据流对应一个下游KAFKA队列
在FLINK Main 函数中读取所有KAFKA渠道配置并构建FlinkKafkaProducer并构建一个Map: kafka channelId -> FlinkKafkaProducer
在输入流上构建一个SplitStream, OutputSelector 中根据路由逻辑返回一组ChannelId
遍历Map,对于Map中的每个Key (ChannelID) 调用 SplitStream 的 select方法获取对应的分支流数据,然后路由到对应的 FlinkKafkaProducer

核心代码:
public static void main(String[] args) {
    List allChannels = channelRepository.getAll();
    for(ChannelModel nextChannel: allChannels) {
        FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010)channelFactory.createChannelProducer(nextChannel,
        FlinkKafkaProducer010.class, Collections.emptyMap());
        nextProducer.setRuntimeContext(this.getRuntimeContext());
        nextProducer.open(parameters);
        producers.put(nextChannel.getChannelId(), nextProducer);
    }
   
    DataStreamSource source = ....
    SplitStream splitStream = source.split(new OutputSelector() {

        @Override
        public Iterable select(String value) {
            List channelIds = channelRouteStrategy.route(value);
            return channeIds;
        }
    });
   
    for(String nextChannel: producers.keySet()) {
        FlinkKafkaProducer010 target = producers.get(nextChannel);
        splitStream.select(nextChannel).addSink(target);
    }
}



优点:
可以路由到不同的BROKER上的TOPIC,在不同的BROKER上隔离性更好
完全利用FLINK原生的特性,更加简洁优雅,解决了方案1的第二点不足

缺陷:
所有的FlinkKafkaProducer只在MAIN函数中创建一次,后面如果添加了新的KAFKA队列无法被动态感知并路由


方案3: 利用FLINK的 KeyedSerializationSchema中的getTargetTopic函数,KeyedSerializationSchema 除了将对象转化Kafka ProducerRecord
的键值对之外还可以动态指定Topic
在FLINK Main 函数中将输入流通过flatMap 转化为 Tuple2, 其中key 是目标所属的Topic, value 是原生数据
实现一个KeyedSerializationSchema作为构造函数传给FlinkKafkaProducer,重载getTargetTopic方法: 返回 tuple2.f0

核心代码:
class DynaRouteSerializationSchema implements KeyedSerializationSchema {
   
    String getTargetTopic(T element) {
        Tuple2 tuple = (Tuple2)element;
        return tuple.f0;
    }
}

public static void main(String[] args) {
    DataStreamSource source = ....
    DataStream converted = source
    .flatMap(new RichFlatMapFunction() {
        @Override
        public void flatMap(T value, Collector out)
        throws Exception {
            List channelIds = channelRouteStrategy.route(value);
            for(String nextChannel: channelIds) {
                out.collect(Tuple2.valueOf(nextChannel, value));
            }
        }
    });
   
   

}


优点:
完全利用FLINK原生的特性,代码量非常少
新增加的TOPIC也可以被路由到,不需要启停流处理

缺陷:
无法像前两个方案实现Broker级别的路由,只能做到Topic级别的路由


断流功能:
  有时系统升级或者其他组件不可用,需要暂时停止KAFKA PRODUCER
FLINK 原生机制:
被动反压:
Kafka09Fetcher 包含了一根独立的 KafkaConsumerThread,从KAFKA中读取数据,再交给HANDOVER
HANDOVER可以理解为一个大小为1的队列, Kafka09Fetcher 再从队列中获取并处理数据,一旦当处理速度变慢,KafkaConsumerThread
无法将数据写入HANDOVER, 线程就会被阻塞

另外KeyedDeserializationSchema定义了一个isEndOfStream方法,如果返回true, Kafka09Fetcher就会停止循环并退出,导致整个流处理结束
  

  设计思路:

  SignalService:  注册SignalListener, 利用Curator TreeCache 监听一个Zookeeper 路径获取起动/停止流处理的信号量
  SignalListener: 接收ZOOKEEPER变更信息的回调接口
  PausableKafkaFetcher: 继承Flink原生的KafkaFetcher, 监听到信号变化阻塞ConsumerThread的处理

  PausableKafkaConsumer: 继承Flink原生的KafkaConsumer, 创建PausableKafkaFetcher

  

  核心代码:
  public class PausableKafkaFetcher extends Kafka010Fetcher implements SignalListener {
      private final ReentrantLock pauseLock = new ReentrantLock(true);
      private final Condition pauseCond = pauseLock.newCondition();
      private volatile boolean paused = false;
     

     public void onSignal(String path, String value) {
         try {
              pauseLock.lockInterruptibly();

         } catch(InterruptedException e) {
         }
         try {
             if (SIGNAL_PAUSE.equals(value)) {
                 paused = true;

             } else if (SIGNAL_START.equals(value)) {
                 paused = false;

             }
             pauseCond.signal();  

         }

         finally {
             pauseLock.unlock();

         }  

     }
  

     protected void emitRecord(T record, KafkaTopicPartitionState partition, long offset, ConsumerRecord consumerRecord) throws Exception {
        super.emitRecord(record, partition, offset, consumerRecord);
        pauseLock.lockInterruptibly();
        try {
           while (paused) {
              pauseCond.await();

           }

        } finally {
           pauseLock.unlock();

        }

    }

  }
  

  public class PausableKafkaConsumer extends FlinkKafkaConsumer010 {
       public void open(Configuration configuration) {
          signalService = ZKSignalService.getInstance();

          signalService.initialize(zkConfig);

       }
  

       public void cancel() {
           super.cancel();
           unregisterSignal();

       }   
  

       public void close() {
          super.close();
          unregisterSignal();

       }
  

       private void unregisterSignal() {
           if (signalService != null) {
              String fullPath = WATCH_PREFIX + "/" + watchPath;
              signalService.unregisterSignalListener(fullPath);

           }

       }   
  

       protected AbstractFetcher createFetcher(...) throws  Exception {
          PausableKafkaFetcher fetcher = new PausableKafkaFetcher (...);
          if (signalService != null) {
              String fullPath = WATCH_PREFIX + "/" + watchPath;
              signalService.registerSignalListener(fullPath, fetcher);

          }
          return fetcher

       }

  }





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669875-1-1.html 上篇帖子: kafka参数配置详解 下篇帖子: kafka producer实例及原理分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表