212121 发表于 2016-11-7 08:47:31

kafka high-level consumer 多线程访问异常

在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载 ,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。



   def hasNext(): Boolean = {

      if(state == FAILED)         //处于FAILED状态时,另外线程访问会直接异常

          throw new IllegalStateException("Iterator is in failed state")

      state match {

          case DONE => false

          case READY => true

          case _ => maybeComputeNext()

      }

      }

      

      

      def maybeComputeNext(): Boolean = {

      state = FAILED            //重置了状态

      nextItem = Some(makeNext())         

      if(state == DONE) {

          false

      } else {

          state = READY

          true

      }

      }

      下载

      

    protected def makeNext(): MessageAndMetadata = {

      var currentDataChunk: FetchedDataChunk = null

      // if we don't have an iterator, get one

      var localCurrent = current.get()

      if(localCurrent == null || !localCurrent.hasNext) {

          if (consumerTimeoutMs < 0)

            currentDataChunk = channel.take             //channel是BlockingQueue这里会阻塞

      

          else {

            currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)

            if (currentDataChunk == null) {

            // reset state to make the iterator re-iterable

            resetState()

            throw new ConsumerTimeoutException

            }

          }

    //省略部分代码

    }

页: [1]
查看完整版本: kafka high-level consumer 多线程访问异常