设为首页 收藏本站
查看: 837|回复: 0

[经验分享] Apache Ignite 改装(一) -- 服务异步化支持

[复制链接]

尚未签到

发表于 2018-11-17 12:48:55 | 显示全部楼层 |阅读模式
本文假设读者了解Apache Ignite,阅读过ignite service grid的官方文档,或使用过ignite的service grid,本文同样假设读者了解 java的CompletionStage的相关用法。本文涉及的ignite版本为2.4.0。
  使用Apache Ignite的Service grid作为微服务开发框架, 通常是如下定义和实现Service的:

服务接口:
  

public interface MyService {  public String sayHello(String to);
  
}
  

  本文将实现如下样式的Service,使其异步化:

异步化的服务接口:
  

public interface MyServiceAsync {  public CompletionStage sayHello(String to);
  
}
  

  当前ignite对上边这样的异步的service方法并没有remote支持。当调用端与服务部署再同一节点时,ignite会发起一个本地方法调用,这样是没有问题的,但是当服务部署端与调用端在不同节点时,ignite通过发起一个distributed task,将调用通过消息方式发布到服务部署节点,由于服务实现是异步的,通常来说,会返回一个未完成状态的CompletionStage,后续当真正complete的时候,调用端的CompletionStage并不会被notify,即调用端永远无法得到真正的调用结果。
  为了能够支持CompletionStage的远程状态专递,我们需要对ignite进行如下改动:

org/apache/ignite/internal/processors/service/GridServiceProxy.java
  

...  
// line 192
  
if(CompletionStage.class.isAssignableFrom(mtd.getReturnType())) {
  //call async and notify completion stage
  final IgniteInternalFuture future = ctx.closure().callAsyncNoFailover(
  GridClosureCallMode.BROADCAST,
  new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args),
  Collections.singleton(node),
  false,
  waitTimeout,
  true);
  CompletableFuture cs = new CompletableFuture() {
  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
  try {
  future.cancel();
  } catch (Exception ignore) {}
  return super.cancel(mayInterruptIfRunning);
  }
  };
  future.listen(f -> {
  if(f.error() != null) {
  cs.completeExceptionally(f.error());
  }else if(f.isCancelled()) {
  cs.cancel(false);
  }
  if(f.isDone()) {
  try {
  Object result = f.get();
  if(result != null && IgniteException.class.isAssignableFrom(result.getClass())) {
  cs.completeExceptionally((IgniteException)result);
  }else {
  cs.complete(f.get());
  }
  } catch (IgniteCheckedException e) {
  cs.completeExceptionally(e);
  }
  }
  });
  return cs;
  
}
  
...
  

  这段代码做了如下的事情:检测当服务方法返回值是一个CompletionStage的时候,则创建一个CompletableFuture作为代理对象返回给调用端。随后监听服务的远程调用的结果,并且用这个结果来更新这个CompletableFuture。到这里,调用端的service proxy的改造就完成了。接下来,我们还需要改造服务节点这一端:

org/apache/ignite/internal/processors/job/GridJobWorker.java(line 618起的这个finally块),改造前:
  

finally {  // Finish here only if not held by this thread.
  if (!HOLD.get())
  finishJob(res, ex, sndRes);
  else
  // Make sure flag is not set for current thread.
  // This may happen in case of nested internal task call with continuation.
  HOLD.set(false);
  ctx.job().currentTaskSession(null);
  if (reqTopVer != null)
  GridQueryProcessor.setRequestAffinityTopologyVersion(null);
  
}
  

改造后:
  

finally {  if(res != null && CompletionStage.class.isAssignableFrom(res.getClass())) {
  final boolean sendResult = sndRes;
  final IgniteException igException = ex;
  @SuppressWarnings("unchecked")
  CompletionStage cs = (CompletionStage)res;
  cs.exceptionally(t->{
  return new IgniteException(t);
  }).thenAccept(r->{
  if (!HOLD.get()) {
  IgniteException e = igException;
  finishJob(r, e, sendResult);
  } else
  // Make sure flag is not set for current thread.
  // This may happen in case of nested internal task call with continuation.
  HOLD.set(false);
  ctx.job().currentTaskSession(null);
  if (reqTopVer != null)
  GridQueryProcessor.setRequestAffinityTopologyVersion(null);
  });
  } else {
  // Finish here only if not held by this thread.
  if (!HOLD.get())
  finishJob(res, ex, sndRes);
  else
  // Make sure flag is not set for current thread.
  // This may happen in case of nested internal task call with continuation.
  HOLD.set(false);
  ctx.job().currentTaskSession(null);
  if (reqTopVer != null)
  GridQueryProcessor.setRequestAffinityTopologyVersion(null);
  }
  
}
  

  这里做的事情是:当在服务部署节点上拿到执行结果的时候,如果发现服务返回结果是一个CompletionStage,那么处理这个CompletionStage的exceptionally和thenAccept, 把结果发送给remote的调用端。
  就这样,通过简单的改装,我们使ignite有了处理异步服务方法调用的能力。下边我们实现一个服务来看看改装结果:

服务定义与实现:
  

import java.util.concurrent.CompletionStage;  
import org.apache.ignite.services.Service;
  
public interface MyService extends Service {
  public CompletionStage sayHelloAsync(String to);
  public String sayHelloSync(String to);
  
}
  

import java.util.concurrent.CompletionStage;
  
public>  private ScheduledExecutorService es;
  @Override public void init(ServiceContext ctx) throws Exception {
  es = Executors.newSingleThreadScheduledExecutor();
  }
  @Override public CompletionStage sayHelloAsync(String to){
  CompletableFuture ret = new CompletableFuture();
  //return "async hello $to" after 3 secs
  es.schedule(()->ret.complete("async hello " + to), 3, TimeUnit.SECONDS);
  return ret;
  }
  @Override public String sayHelloSync(String to){
  return "sync hello " + to;
  }
  ...
  
}
  

  然后将服务部署在Service grid中:
  

...  
ServiceConfiguration sConf = new ServiceConfiguration();
  
sConf.setName("myservice.version.1");
  
sConf.setService(new MyServiceImpl());
  
sConf.setMaxPerNodeCount(2);
  
sConf.setTotalCount(4);
  
ignite.services().deploy(sConf);
  
...
  

  然后启动一个客户端节点进行服务调用:
  

MyService service = ignite.services().serviceProxy("myservice.version.1",  MyService.class, false);  
//test async service
  
service.sayHelloAsync("nathan").thenAccept(r->{
  System.out.println(r);
  
});
  
//test sync service
  
System.out.println(service.sayHelloSync("nathan"));
  
...
  

  输出结果:
  

sync hello nathan  
async hello nathan
  

  可以看到先输出了sync的结果,大约3秒后输出async的结果。



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-636239-1-1.html 上篇帖子: CentOS7 Apache调优之隐藏版本号 下篇帖子: Linux学习总结(三十一)lamp之mariadb apache 安装
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表