设为首页 收藏本站
查看: 3944|回复: 0

[经验分享] spark-streaming连接flume时报错org.jboss.netty.channel.ChannelException: Failed to bin

[复制链接]

尚未签到

发表于 2015-11-28 15:55:59 | 显示全部楼层 |阅读模式
  http://bbs.csdn.net/topics/390971594?page=1#post-398808154
  上面是我当时提问用的,折磨了我好几天,后来发现问题了,分析如下:
  连接flume是通过
  JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(jssc, args[0], Integer.parseInt(args[1]));
  

FlumeUtils {
/**
* Create a input stream from a Flume source.
* @param ssc      StreamingContext object
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port     Port of the slave machine to which the flume data will be sent
* @param storageLevel  Storage level to use for storing the received objects
*/
def createStream (
ssc: StreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
inputStream
}
  
  在跟进FlumeInputDStream内部:
  

class FlumeInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}


在跟进FlumeReceiver  
  

class FlumeReceiver(
host: String,
port: Int,
storageLevel: StorageLevel
) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
lazy val responder = new SpecificResponder(
classOf[AvroSourceProtocol], new FlumeEventServer(this))
lazy val server = new NettyServer(responder, new <span style=&quot;color:#ff0000;&quot;>InetSocketAddress</span>(host, port))
def onStart() {
server.start()
logInfo(&quot;Flume receiver started&quot;)
}
def onStop() {
server.close()
logInfo(&quot;Flume receiver stopped&quot;)
}
override def preferredLocation = Some(host)
}


这就是建立端口连接的地方了,我们会发现InetSocketAddress这个类,他属于Jdk的jar包rt.jar  
  现在把这个jar包也放入调用,命令里:
  spark-submit --class com.kingsoft.spark.SparkFlumeTest --master yarn-cluster --executor-memory 10G --num-executors 50 --jars /home/hadoop/spark-streaming-flume_2.10-1.1.0.jar,/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/avro/avro-ipc-1.7.5-cdh5.1.0.jar,/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/flume-ng/lib/flume-ng-sdk-1.5.0-cdh5.1.0.jar,/home/hadoop/fastjson-1.1.41.jar,/home/hadoop/rt.jar/home/hadoop/SparkStreaming-0.0.1-SNAPSHOT.jar
10.4.22.16 58006


  发现可以了,究其原因:主要是我们集群的JAVA_HOME没有设置的缘故。
  PS:养成看源码的习惯,还是不错的。。
  如果上面的解释有问题,还请路过的大神指点,谢谢
  ----------------------------------------------------------------------------------------------------
  不是上面的问题,问题还在解决中~~~
  ----------------------------------------------------------------------------------------------------
  折磨了我好几天的事情终于解决了,看看自己之前的排错信息,很是可笑啊,这件事还是自己对yarn和spark不了解所致:
  详情可以看这篇文章:
  http://m.blog.csdn.net/blog/gengqi88/39089349


  

从日志看是端口没有启动。查看下yarn container 上启动的worker,同样报没有绑定端口的异常。




      查看 yarn 上job的启动的节点,在配置接收flume 数据的节点上,并没有worker的启动。问题正好出现这这里,由于yarn 上发布container 是有RM 根据集群的资源使用情况进行分配的,事先并不&#20540;得哪个节点上有启动到container,也就无法&#20540;得那个节点上有spark的worker了。更谈不上事先设置在启动spark stream 程序中的host 是否能真正有worker再运行了。这个或许是spark在yarn模式下的一个BUG
吧。


       解决方法是,将host 修改为0.0.0.0 进行运行。等spark 程序启动后,查看在哪个节点上启动了接收flume流数据的端口,在将该主机和端口配置到flume的配置文件中,启动flume,就可以实现数据得传输了。


意思就是把程序提交到yarn集群后,RM会根据资源情况分配哪些container来执行这个程序,比如一个集群有1——10台node,在机器1上提交application到yarn,yarn的RM分配机器2和机器3上的container来执行该application,那么命令如下:  spark-submit --class com.kingsoft.spark.SparkFlumeTest --master yarn --deploy-mode cluster --jars /home/hadoop/spark-streaming-flume_2.10-1.0.1.jar,/home/hadoop/avro-ipc-1.7.5-cdh5.1.0.jar,/home/hadoop/flume-ng-sdk-1.5.0.1.jar,/home/hadoop/fastjson-1.1.41.jar
/home/hadoop/SparkStreaming-0.0.1-SNAPSHOT.jar 10.4.22.16 58006    (10.4.22.16是机器1的IP)


  但是在机器1上并没有运行application的程序,所以无法打开58006端口,这个是spark的BUG。解决办法是调用如下命令:
  spark-submit --class com.kingsoft.spark.SparkFlumeTest --master yarn --deploy-mode cluster --jars /home/hadoop/spark-streaming-flume_2.10-1.0.1.jar,/home/hadoop/avro-ipc-1.7.5-cdh5.1.0.jar,/home/hadoop/flume-ng-sdk-1.5.0.1.jar,/home/hadoop/fastjson-1.1.41.jar
/home/hadoop/SparkStreaming-0.0.1-SNAPSHOT.jar 0.0.0.0 58006


  用0.0.0.0代替,不能使用127.0.0.1,否则flume连不上对应机器。(我也不知道为什么连不上)当application运行时,在到各个机器上去查看哪台机器的58006端口在listen。netstat -anp | grep 58006
  这时把flume的配置文件改成该地址,重启就OK了。
  啊,多么痛的领悟啊。
  ---------------------------------------------------------------------------------
  启动命令中可以设置--num-executors 10,意思就是启动10个executors,在输出的日志中也会看到如下信息:
  

15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app002041.hz01.ksyun.com:8041
15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022054.hz01.ksyun.com:8041
15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022045.hz01.ksyun.com:8041
15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022018.ksc.com:8041
15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app002023.hz01.ksyun.com:8041
15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022016.ksc.com:8041
15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022055.hz01.ksyun.com:8041
15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app002024.hz01.ksyun.com:8041
15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app002026.hz01.ksyun.com:8041
15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022042.hz01.ksyun.com:8041

  
  就是在上面10个节点上会有一个节点启动flume监听端口,那么我们就需要挨个到对应的机器上去检查哪个被启动了。使用以下脚本,在调用时传入机器列表,就可以快速定位:
  for arg in $* ; do
re=`ssh -o strictHostKeyChecking=no $arg -t &quot;sudo netstat -anp  | grep 58006 | grep LISTEN |wc -l&quot;`
if [[ $re > &quot;1&quot; ]];then
echo $arg
fi
done

其中的58006是你对应启动的那个端口。前提是这个脚本所在的机器能够ssh到对应的机器列表中
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144406-1-1.html 上篇帖子: (3)flume 单节点写入HDFS练习 以及 自定义拦截器 进行formatLog 下篇帖子: flume学习(七)、(八):如何使用event header中的key值以及自定义source
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表