宇文氏 发表于 2019-1-30 13:55:31

Spark2.11 两种流操作 + Kafka-13054681

def main(args: Array[String]): Unit = {
  // Kafka consumer configuration. Offsets are assigned explicitly below
  // (Assign strategy), so auto-commit is disabled and "auto.offset.reset"
  // is unnecessary — hence it is left commented out.
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    //      "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // 2-second micro-batch interval on the shared SparkContext.
  val ssc = new StreamingContext(OpContext.sc, Seconds(2))

  // Start consuming partition 0 of topic "test" from a fixed offset.
  val fromOffsets = Map(new TopicPartition("test", 0) -> 1100449855L)

  // Direct (receiver-less) stream; PreferConsistent spreads partitions
  // evenly across available executors.
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
  )

  stream.foreachRDD { rdd =>
    // Every RDD produced by a direct stream carries its Kafka offset
    // ranges; log them so progress can be tracked / committed externally.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (o <- offsetRanges) {
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
  }

  // Print the first (key, value) record of each batch for inspection.
  stream.map(record => (record.key, record.value)).print(1)

  ssc.start()

  ssc.awaitTermination()
}


页: [1]
查看完整版本: Spark2.11 两种流操作 + Kafka-13054681