mgjj 发表于 2018-11-17 12:48:55

Apache Ignite 改装(一) -- 服务异步化支持

本文假设读者了解Apache Ignite,阅读过ignite service grid的官方文档,或使用过ignite的service grid,本文同样假设读者了解 java的CompletionStage的相关用法。本文涉及的ignite版本为2.4.0。
  使用Apache Ignite的Service grid作为微服务开发框架, 通常是如下定义和实现Service的:

服务接口:
  

public interface MyService {  public String sayHello(String to);
  
}
  

  本文将实现如下样式的Service,使其异步化:

异步化的服务接口:
  

public interface MyServiceAsync {  public CompletionStage sayHello(String to);
  
}
  

  当前ignite对上边这样的异步的service方法并没有remote支持。当调用端与服务部署再同一节点时,ignite会发起一个本地方法调用,这样是没有问题的,但是当服务部署端与调用端在不同节点时,ignite通过发起一个distributed task,将调用通过消息方式发布到服务部署节点,由于服务实现是异步的,通常来说,会返回一个未完成状态的CompletionStage,后续当真正complete的时候,调用端的CompletionStage并不会被notify,即调用端永远无法得到真正的调用结果。
  为了能够支持CompletionStage的远程状态专递,我们需要对ignite进行如下改动:

org/apache/ignite/internal/processors/service/GridServiceProxy.java
  

...  
// line 192
  
if(CompletionStage.class.isAssignableFrom(mtd.getReturnType())) {
  //call async and notify completion stage
  final IgniteInternalFuture future = ctx.closure().callAsyncNoFailover(
  GridClosureCallMode.BROADCAST,
  new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args),
  Collections.singleton(node),
  false,
  waitTimeout,
  true);
  CompletableFuture cs = new CompletableFuture() {
  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
  try {
  future.cancel();
  } catch (Exception ignore) {}
  return super.cancel(mayInterruptIfRunning);
  }
  };
  future.listen(f -> {
  if(f.error() != null) {
  cs.completeExceptionally(f.error());
  }else if(f.isCancelled()) {
  cs.cancel(false);
  }
  if(f.isDone()) {
  try {
  Object result = f.get();
  if(result != null && IgniteException.class.isAssignableFrom(result.getClass())) {
  cs.completeExceptionally((IgniteException)result);
  }else {
  cs.complete(f.get());
  }
  } catch (IgniteCheckedException e) {
  cs.completeExceptionally(e);
  }
  }
  });
  return cs;
  
}
  
...
  

  这段代码做了如下的事情:检测当服务方法返回值是一个CompletionStage的时候,则创建一个CompletableFuture作为代理对象返回给调用端。随后监听服务的远程调用的结果,并且用这个结果来更新这个CompletableFuture。到这里,调用端的service proxy的改造就完成了。接下来,我们还需要改造服务节点这一端:

org/apache/ignite/internal/processors/job/GridJobWorker.java(line 618起的这个finally块),改造前:
  

finally {  // Finish here only if not held by this thread.
  if (!HOLD.get())
  finishJob(res, ex, sndRes);
  else
  // Make sure flag is not set for current thread.
  // This may happen in case of nested internal task call with continuation.
  HOLD.set(false);
  ctx.job().currentTaskSession(null);
  if (reqTopVer != null)
  GridQueryProcessor.setRequestAffinityTopologyVersion(null);
  
}
  

改造后:
  

finally {  if(res != null && CompletionStage.class.isAssignableFrom(res.getClass())) {
  final boolean sendResult = sndRes;
  final IgniteException igException = ex;
  @SuppressWarnings("unchecked")
  CompletionStage cs = (CompletionStage)res;
  cs.exceptionally(t->{
  return new IgniteException(t);
  }).thenAccept(r->{
  if (!HOLD.get()) {
  IgniteException e = igException;
  finishJob(r, e, sendResult);
  } else
  // Make sure flag is not set for current thread.
  // This may happen in case of nested internal task call with continuation.
  HOLD.set(false);
  ctx.job().currentTaskSession(null);
  if (reqTopVer != null)
  GridQueryProcessor.setRequestAffinityTopologyVersion(null);
  });
  } else {
  // Finish here only if not held by this thread.
  if (!HOLD.get())
  finishJob(res, ex, sndRes);
  else
  // Make sure flag is not set for current thread.
  // This may happen in case of nested internal task call with continuation.
  HOLD.set(false);
  ctx.job().currentTaskSession(null);
  if (reqTopVer != null)
  GridQueryProcessor.setRequestAffinityTopologyVersion(null);
  }
  
}
  

  这里做的事情是:当在服务部署节点上拿到执行结果的时候,如果发现服务返回结果是一个CompletionStage,那么处理这个CompletionStage的exceptionally和thenAccept, 把结果发送给remote的调用端。
  就这样,通过简单的改装,我们使ignite有了处理异步服务方法调用的能力。下边我们实现一个服务来看看改装结果:

服务定义与实现:
  

import java.util.concurrent.CompletionStage;  
import org.apache.ignite.services.Service;
  
public interface MyService extends Service {
  public CompletionStage sayHelloAsync(String to);
  public String sayHelloSync(String to);
  
}
  

import java.util.concurrent.CompletionStage;
  
public>  private ScheduledExecutorService es;
  @Override public void init(ServiceContext ctx) throws Exception {
  es = Executors.newSingleThreadScheduledExecutor();
  }
  @Override public CompletionStage sayHelloAsync(String to){
  CompletableFuture ret = new CompletableFuture();
  //return "async hello $to" after 3 secs
  es.schedule(()->ret.complete("async hello " + to), 3, TimeUnit.SECONDS);
  return ret;
  }
  @Override public String sayHelloSync(String to){
  return "sync hello " + to;
  }
  ...
  
}
  

  然后将服务部署在Service grid中:
  

...  
ServiceConfiguration sConf = new ServiceConfiguration();
  
sConf.setName("myservice.version.1");
  
sConf.setService(new MyServiceImpl());
  
sConf.setMaxPerNodeCount(2);
  
sConf.setTotalCount(4);
  
ignite.services().deploy(sConf);
  
...
  

  然后启动一个客户端节点进行服务调用:
  

MyService service = ignite.services().serviceProxy("myservice.version.1",MyService.class, false);  
//test async service
  
service.sayHelloAsync("nathan").thenAccept(r->{
  System.out.println(r);
  
});
  
//test sync service
  
System.out.println(service.sayHelloSync("nathan"));
  
...
  

  输出结果:
  

sync hello nathan  
async hello nathan
  

  可以看到先输出了sync的结果,大约3秒后输出async的结果。


页: [1]
查看完整版本: Apache Ignite 改装(一) -- 服务异步化支持