设为首页 收藏本站
查看: 583|回复: 0

[经验分享] (12)自定义数据流(实战Docker事件推送的REST API)——响应式Spring的道法术器

[复制链接]

尚未签到

发表于 2018-5-26 07:44:55 | 显示全部楼层 |阅读模式
  本系列文章索引《响应式Spring的道法术器》
前情提要 Reactor 3快速上手 | Spring WebFlux快速上手 | 响应式流规范
本文 测试源码 | 实战源码


2.2 自定义数据流
  这一小节介绍如何通过定义相应的事件(onNext、onError和onComplete) 创建一个 Flux 或 Mono。Reactor提供了generate、create、push和handle等方法,所有这些方法都使用 sink(池)来生成数据流。
  sink,顾名思义,就是池子,可以想象一下厨房水池的样子。如下图所示:
DSC0000.jpg

  下面介绍到的方法都有一个sink提供给方法使用者,通常至少会暴露三个方法给我们,next、error和complete。next和error相当于两个下水口,我们不断将自定义的数据放到next口,Reactor就会帮我们串成一个Publisher数据流,直到有一个错误数据放到error口,或按了一下complete按钮,数据流就会终止了。

2.2.1 generate
  generate是一种同步地,逐个地发出数据的方法。因为它提供的sink是一个SynchronousSink, 而且其next()方法在每次回调的时候最多只能被调用一次。
  generate方法有三种签名:

    public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
  1)使用SynchronousSink生成数据流

    @Test
public void testGenerate1() {
final AtomicInteger count = new AtomicInteger(1);   // 1
Flux.generate(sink -> {
sink.next(count.get() + " : " + new Date());   // 2
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (count.getAndIncrement() >= 5) {
sink.complete();     // 3
}
}).subscribe(System.out::println);  // 4
}

  • 用于计数;
  • 向“池子”放自定义的数据;
  • 告诉generate方法,自定义数据已发完;
  • 触发数据流。
  输出结果为每1秒钟打印一下时间,共打印5次。
  2)增加一个伴随状态
  对于上边的例子来说,count用于记录状态,当值达到5之后就停止计数。由于在lambda内部使用,因此必须是final类型的,且不能是原生类型(如int)或不可变类型(如Integer)。
  如果使用第二个方法签名,上边的例子可以这样改:

    @Test
public void testGenerate2() {
Flux.generate(
() -> 1,    // 1
(count, sink) -> {      // 2
sink.next(count + " : " + new Date());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (count >= 5) {
sink.complete();
}
return count + 1;   // 3
}).subscribe(System.out::println);
}

  • 初始化状态值;
  • 第二个参数是BiFunction,输入为状态和sink;
  • 每次循环都要返回新的状态值给下次使用。
  3)完成后处理
  第三个方法签名除了状态、sink外,还有一个Consumer,这个Consumer在数据流发完后执行。

        Flux.generate(
() -> 1,
(count, sink) -> {
sink.next(count + " : " + new Date());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (count >= 5) {
sink.complete();
}
return count + 1;
}, System.out::println)     // 1
.subscribe(System.out::println);
}

  • 最后将count值打印出来。
  如果 state 使用了数据库连接或者其他需要进行清理的资源,这个 Consumer lambda 可以用来在最后完成资源清理任务。

2.2.2 create
  create是一个更高级的创建Flux的方法,其生成数据流的方式既可以是同步的,也可以是异步的,并且还可以每次发出多个元素。
  create用到了FluxSink,后者同样提供 next,error 和 complete 等方法。 与generate不同的是,create不需要状态值,另一方面,它可以在回调中触发多个事件(即使事件是发生在未来的某个时间)。

  create 常用的场景就是将现有的 API 转为响应式,比如监听器的异步方法。

  先编写一个事件源:

    public class MyEventSource {
private List<MyEventListener> listeners;
public MyEventSource() {
this.listeners = new ArrayList<>();
}
public void register(MyEventListener listener) {    // 1
listeners.add(listener);
}
public void newEvent(MyEvent event) {
for (MyEventListener listener :
listeners) {
listener.onNewEvent(event);     // 2
}
}
public void eventStopped() {
for (MyEventListener listener :
listeners) {
listener.onEventStopped();      // 3
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class MyEvent {   // 4
private Date timeStemp;
private String message;
}
}

  • 注册监听器;
  • 向监听器发出新事件;
  • 告诉监听器事件源已停止;
  • 事件类,使用了lombok注解。
  准备一个监听器接口,它可以监听上边第2和3的两种事件:(1)新的MyEvent到来;(2)事件源停止。如下:

    public interface MyEventListener {
void onNewEvent(MyEventSource.MyEvent event);
void onEventStopped();
}
  下面的测试方法逻辑是:创建一个监听器注册到事件源,这个监听器再收到事件回调的时候通过Flux.create的sink将一系列事件转换成异步的事件流:

    @Test
public void testCreate() throws InterruptedException {
MyEventSource eventSource = new MyEventSource();    // 1
Flux.create(sink -> {
eventSource.register(new MyEventListener() {    // 2
@Override
public void onNewEvent(MyEventSource.MyEvent event) {
sink.next(event);       // 3
}
@Override
public void onEventStopped() {
sink.complete();        // 4
}
});
}
).subscribe(System.out::println);       // 5
for (int i = 0; i < 20; i++) {  // 6
Random random = new Random();
TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i));  
}
eventSource.eventStopped(); // 7
}

  • 事件源;
  • 向事件源注册用匿名内部类创建的监听器;
  • 监听器在收到事件回调的时候通过sink将事件再发出;
  • 监听器再收到事件源停止的回调的时候通过sink发出完成信号;
  • 触发订阅(这时候还没有任何事件产生);
  • 循环产生20个事件,每个间隔不超过1秒的随机时间;
  • 最后停止事件源。
  运行一下这个测试方法,20个MyEvent陆续打印出来。
  如果将上边的create方法换成generate方法,则会报出异常:

java.lang.IllegalStateException: The generator didn't call any of the SynchronousSink method
  证明generate并不支持异步的方式。
  create方法还有一个变体方法push,适合生成事件流。与 create类似,push 也可以是异步地, 并且能够使用以上各种回压策略。所以上边的例子可以替换为push方法。区别在于,push方法中,调用next、complete或error的必须是同一个线程。
  除了next、complete或error方法外,FluxSink还有onRequest方法,这个方法可以用来响应下游订阅者的请求事件。从而不仅可以像上一个例子那样,上游在数据就绪的时候将其推送到下游,同时下游也可以从上游拉取已经就绪的数据。这是一种推送/拉取混合的模式。比如:

    Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
new MyMessageListener<String>() {
public void onMessage(List<String> messages) {
for(String s : messages) {
sink.next(s);   // 1
}
}
});
sink.onRequest(n -> {   // 2
List<String> messages = myMessageProcessor.request(n);  // 3
for(String s : message) {
sink.next(s);
}
});
...
}

  • push方式,主动向下游发出数据;
  • 在下游发出请求时被调用;
  • 响应下游的请求,查询是否有可用的message。

2.2.3 实战Docker事件推送API
  Docker提供了一个用来监听事件的命令:docker events,运行这个命令后,会监听docker daemon的事件并打印出来,执行是持续进行的,就像top或前边介绍的mongostat命令一样。Docker的java开发包的DockerClient也提供了相应的API,这个API是基于回调的,因此我们就可以使用Reactor的create方法,将这个基于回调的API转换为响应式流,流中的数据就是一个一个的docker事件。如下图所示:
DSC0001.jpg

  1)测试DockerClient
  首先,我们先启动docker。
  然后,我们继续用第一章的webflux-demomaven项目模块,在pom.xml中添加Docker开发相关的依赖:

        <!--docker client begin-->
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
<version>3.0.14</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>2.26</version>
</dependency>
<!--docker client end-->
  最后编写测试方法:

public class DockerEventTest {
@Test
public void dockerEventToFlux() throws InterruptedException {
collectDockerEvents().subscribe(System.out::println);   // 5
TimeUnit.MINUTES.sleep(1);  // 6
}
private Flux<Event> collectDockerEvents() {
DockerClient docker = DockerClientBuilder.getInstance().build();    // 1
return Flux.create((FluxSink<Event> sink) -> {
EventsResultCallback callback = new EventsResultCallback() {    // 2
@Override
public void onNext(Event event) {   // 3
sink.next(event);
}
};
docker.eventsCmd().exec(callback);  // 4
});
}
}

  • 创建DockerClient,默认会连接tcp://localhost:2375,2375是docker默认的端口号,可以通过指定的IP和端口连接docker daemon:DockerClientBuilder.getInstance("tcp://192.168.0.123:2375").build(),不过要注意docker daemon监听接口和防火墙的配置。
  • 自定义回调类。
  • 当有docker事件产生时,会回调onNext,这时候通过FluxSink的next方法将Event对象发出。
  • 开始对docker事件进行监听。
  • 通过订阅的方式打印出来。
  • 主线程会立刻返回,因此等待1分钟。
  OK,看一下效果。
  为了方便对比,我们首先在终端运行docker events命令,然后在另一个终端进行docker操作,比如本例:

docker run -it -m 200M --memort-swap=200M progrium/stress --vm 1 --vm-bytes 300M
  progrium/stress是一个用于压力测试的容器,通过-m 200M指定为该容器的运行最多分配200M内存,然后在压力测试的时候,通过--vm-bytes 300M使其运行时尝试分配300M的内存,此时会出现内存不足(OOM)的错误并导致容器被杀死(single 9)。
DSC0002.jpg

  如图所示,上方是分别运行两个命令的终端窗口,可以看到docker events命令打印出了一系列事件,如果是第一个运行progrium/stress应该回先有一个pull镜像的事件。下方是我们的测试代码的输出,除了一些日志之外,可以看到这些事件也被输出了。
  2)REST API推送到前端
  下面,我们更进一步将Event事件通过REST API推送到浏览器端,看过第1.3.3节的话,对这一块儿应该是轻车熟路了。
  (一)首先定义一个我们自己的DockerEvent,这一步不是必须的哈,不过DockerClient返回的Event本身字段比较多,通常前端展示的话会转换为dvo,“戏要做足”嘛,哈哈。
  DockerEvent.java

@Data
@Document(collection = "docker-event")
public class DockerEvent {
@Indexed
private String status;
@Id
private String id;
private String from;
private Node node;
private EventType type;
private String action;
private String actorId;
private Long time;
private Long timeNano;
}
  (二)然后就是DAO层了,创建一个DockerEventMongoRepository,增加三个@Tailable的查询方法,分别用于查询全部、按照状态查询和按类型+名称查询(比如查询某某容器的事件):
  DockerEventMongoRepository.java

public interface DockerEventMongoRepository extends ReactiveMongoRepository<DockerEvent, String> {
@Tailable
Flux<DockerEvent> findBy();
@Tailable
Flux<DockerEvent> findByStatus(String status);
@Tailable
Flux<DockerEvent> findByTypeAndFrom(String type, String from);
}
  (三)定义一个CommandLineRunner,用于在应用启动后即开始监听docker事件:
  DockerEventsCollector.java

@Slf4j
@Component
public class DockerEventsCollector implements CommandLineRunner {
private DockerEventMongoRepository dockerEventMongoRepository;
private MongoTemplate mongo;    // 1
public DockerEventsCollector(DockerEventMongoRepository dockerEventMongoRepository, MongoTemplate mongo) {  // 1
this.dockerEventMongoRepository = dockerEventMongoRepository;
this.mongo= mongo;
}
@Override
public void run(String... args) {
mongo.dropCollection(DockerEvent.class);    // 2
mongo.createCollection(DockerEvent.class, CollectionOptions.empty().maxDocuments(200).size(100000).capped()); // 2
dockerEventMongoRepository.saveAll(collect()).subscribe();  // 6
}
private Flux<DockerEvent> collect() {   // 3
DockerClient docker = DockerClientBuilder.getInstance().build();
return Flux.create((FluxSink<Event> sink) -> {
EventsResultCallback callback = new EventsResultCallback() {
@Override
public void onNext(Event event) {
sink.next(event);
}
};
docker.eventsCmd().exec(callback);
})
.map(this::trans)   // 4
.doOnNext(e -> log.info(e.toString())); // 5
}
private DockerEvent trans(Event event) {    // 4
DockerEvent dockerEvent = new DockerEvent();
dockerEvent.setAction(event.getAction());
dockerEvent.setActorId(Objects.requireNonNull(event.getActor()).getId());
dockerEvent.setFrom(event.getFrom() == null ? null : event.getFrom().replace("//", "_"));
dockerEvent.setId(UUID.randomUUID().toString());
dockerEvent.setNode(event.getNode());
dockerEvent.setStatus(event.getStatus());
dockerEvent.setTime(event.getTime());
dockerEvent.setTimeNano(event.getTimeNano());
dockerEvent.setType(event.getType());
return dockerEvent;
}
}

  • 这里使用的是MongoTemplate,Spring 4.3 之后,如果有构造方法,Spring会自动注入,不需要@Autowired注解了。
  • 每次启动应用针对DockerEvent创建“capped”的collection,方便测试,如果提前手动创建好的话可以不加这两句。如果在//1处使用的是响应式的ReactiveMongoTemplate,因为是异步的,所以要用then()或thenMany()将后续的所有操作连接起来,如mongo.dropCollection(...).then(mongo.createCollection(...)).thenMany(dockerEventMongoRepository.saveAll(collect())),保证能先后依次执行。
  • 监听docker事件的方法。
  • 将返回的Event转换为我们定义的DockerEvent,其中DockerEvent.from字段是事件主体名称,比如容器名,可能有/,因此进行一个字符替换,否则在URL中会有问题。
  • 打印个日志(可选)。
  • 将收集的DockerEvent保存到MongoDB,用subscribe()触发执行。
  (四)Service层没有啥逻辑,我们直接写Controller:
  DockerEventController.java

@Slf4j
@RestController
@RequestMapping(value = "/docker/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)    // 1
public class DockerEventController {
private DockerEventMongoRepository dockerEventMongoRepository;
public DockerEventController(DockerEventMongoRepository dockerEventMongoRepository) {
this.dockerEventMongoRepository = dockerEventMongoRepository;
}
@GetMapping
public Flux<DockerEvent> dockerEventStream() {  // 2
return dockerEventMongoRepository.findBy();
}
@GetMapping("/{type}/{from}")
public Flux<DockerEvent> dockerEventStream(@PathVariable("type") String type, @PathVariable("from") String from) {    // 3
return dockerEventMongoRepository.findByTypeAndFrom(type, from);
}
@GetMapping("/{status}")
public Flux<DockerEvent> dockerEventStream(@PathVariable String status) {   // 4
return dockerEventMongoRepository.findByStatus(status);
}
}
  OK了,启动试一下:
DSC0003.jpg

  可以看到,右侧的浏览器的小图标一直在旋转,表示持续接收推送中,当在终端中进行docker操作的时候,所产生的事件就立刻出现在浏览器中了。如果请求/docker/events/oom将只推送OOM事件,如果请求/docker/events/container/progrium_stress将只推送来自容器progrium/stress的事件。

  再次提醒,当capped 的 Collection中一条数据都没有的时候,@Tailable的API也会立刻返回,所以需要等到数据库中有至少一条数据之后(比如先执行以下pull),再在浏览器中请求docker/eventsAPI。


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-481140-1-1.html 上篇帖子: Docker(五):Docker 三剑客之 Docker Machine 下篇帖子: docker_基础_1
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表