Spark 2.11: Two Kinds of Streaming Operations + Kafka
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    // "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  // OpContext.sc supplies an existing SparkContext; 2-second micro-batches
  val ssc = new StreamingContext(OpContext.sc, Seconds(2))
  // Start reading partition 0 of topic "test" from an explicit offset
  val fromOffsets = Map(new TopicPartition("test", 0) -> 1100449855L)
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
  )
  stream.foreachRDD { rdd =>
    // Offset range covered by this batch, one entry per partition
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (o <- offsetRanges) {
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
  }
  stream.map(record => (record.key, record.value)).print(1)
  ssc.start()
  ssc.awaitTermination()
}
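The Assign strategy above pins explicit starting offsets per partition. The other commonly used direct-stream strategy is Subscribe, which leaves partition assignment and starting positions to the Kafka consumer group. Below is a minimal sketch of that variant, reusing ssc and kafkaParams from the snippet above; the topic name and the manual commitAsync call (needed here because enable.auto.commit is false) are illustrative assumptions, not taken from the original code.

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}

// Subscribe: the group coordinator assigns partitions and resumes from the
// group's committed offsets instead of hard-coded ones.
val topics = List("test") // assumed topic name
val subscribedStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
subscribedStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  // enable.auto.commit is false, so commit this batch's offsets manually
  subscribedStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

In short, Assign gives exact replay control from a chosen offset, while Subscribe relies on the committed group offsets and adapts automatically when partitions are rebalanced.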