|
kclient消息中间件
从使用角度上开始入手学习
kclient-processor
该项目使用springboot调用kclient库,程序目录如下:
domain
Cat : 定义了一个cat对象
Dog : 定义了一个Dog对象
handler : 消息处理器
AnimalsHandler : 定义了Cat和Dog的具体行为
KClientApplication.java : Spring boot的主函数——程序执行入口
KClientController.java : Controller 文件
top.ninwoo.kclient.app.KClientApplication
1.启动Spring Boot
ApplicationContext ctxBackend = SpringApplication.run(
KClientApplication.class, args);
2.启动程序后将自动加载KClientController(@RestController)
top.ninwoo.kclient.app.KClientController
1.通过@RestController,使@SpringBootApplication,可以自动加载该Controller
2.通过kafka-application.xml加载Beans
private ApplicationContext ctxKafkaProcessor =
new ClassPathXmlApplicationContext("kafka-application.xml");
kafka-application.xml声明了一个kclient bean,并设置其初始化执行init方法,具体实现见下章具体实现。
另外声明了一个扫描消息处理器的bean
具体内容在下一节介绍
使用@RequestMapping定义/,/status,/stop,/restart定义了不同的接口
这些接口实现比较简单,需要注意的是他们调用的getKClientBoot()函数。
上文,我们已经通过xml中,添加了两个Bean,调用Bean的具体实现方法如下:
private KClientBoot getKClientBoot() {
return (KClientBoot) ctxKafkaProcessor.getBean("kClientBoot");
}
通过Bean获取到KClient获取到了KClientBoot对象,便可以调用其具体方法。
top.ninwoo.kclient.app.handler.AnimalsHandler
消息处理函数
1.使用@KafkaHandlers进行声明bean,关于其具体实现及介绍在具体实现中进行介绍
2.定义了三个处理函数
dogHandler
catHandler
ioExceptionHandler
dogHandler
具体处理很简单,主要分析@InputConsumer和@Consumer的作用,具体实现将在后续进行介绍。
@InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1)
@OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1")
public Cat dogHandler(Dog dog) {
System.out.println("Annotated dogHandler handles: " + dog);
return new Cat(dog);}
@InputConsumer根据输入参数定义了一个Consumer,通过该Consumer传递具体值给dog,作为该处理函数的br/>@OutputProducer根据输入参数定义一个Producer,而该处理函数最后返回的Cat对象,将通过该Producer最终传递到Kafka中 |
|
|