NATS是一个开源的、轻量级的、高性能的分布式消息通信系统,使用的公司有百度、西门子、VMware、HTC和爱立信。NATS Streaming是以NATS为动力的数据流系统,是用go语言写的,NATS Streaming server可执行文件的名称是nats-streaming-server,NATS Streaming嵌入,扩展,并且与核心的NATS平台无缝的对接,NATS Streaming server是根据麻省理工学院许可提供的开源软件,Apcera积极维护和支持NATS Streaming server。
此图来自官网,其他特性也可在官网查看。
测试环境
os:CentOS>
Install the NATS Streaming server
我这里使用go环境安装,所以先部署go环境。
Install Go
wget https://storage.googleapis.com/golang/go1.8.3.linux-amd64.tar.gz
tar -C /usr/local -xzvf go1.8.3.linux-amd64.tar.gz
上面将go安装到/usr/local下。
配置环境变量
将下面这句添加到/etc/profile作为全局变量,或者$HOME/bash_profile作为此用户的变量。然后source此文件让环境变量生效。
export PATH=$PATH:/usr/local/go/bin
设置你的GOPATH
你的go工作目录(GOPATH)存储你的go代码。它可以是除了go安装目录(GOROOT)外的任何路径。
export GOPATH=$HOME/go
还设置GOPATH/bin变量,用于运行编译go程序。
export PATH=$PATH:$GOPATH/bin
然后使用go get来下载源代码并进行编译安装:
go get github.com/nats-io/nats-streaming-server
上面编译安装完成后,会在GOPATH/bin目录下生成一个nats-streaming-server的二进制文件。
Start the NATS Streaming server
可以直接运行上面生成的nats-streaming-server二进制文件,启动一个独立的server。
nats-streaming-server
当启动成功时,你可以看到nats-streaming-server在tcp端口4222上监听客户端连接:
[3132] 2017/07/30 20:10:33.943821 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.6.0
[3132] 2017/07/30 20:10:33.944007 [INF] STREAM: ServerID: TN2MjoOfZd83WZLJIGwtYh
[3132] 2017/07/30 20:10:33.944016 [INF] STREAM: Go version: go1.8.3
[3132] 2017/07/30 20:10:33.944226 [INF] Starting nats-server version 0.9.6
[3132] 2017/07/30 20:10:33.944424 [INF] Listening for client connections on 0.0.0.0:4222
[3132] 2017/07/30 20:10:33.944436 [INF] Server is ready
[3132] 2017/07/30 20:10:34.225391 [INF] STREAM: Message store is MEMORY
[3132] 2017/07/30 20:10:34.225513 [INF] STREAM: ---------- Store Limits ----------
[3132] 2017/07/30 20:10:34.225531 [INF] STREAM: Channels: 100 *
[3132] 2017/07/30 20:10:34.225543 [INF] STREAM: --------- Channels Limits --------
[3132] 2017/07/30 20:10:34.225555 [INF] STREAM: Subscriptions: 1000 *
[3132] 2017/07/30 20:10:34.225566 [INF] STREAM: Messages : 1000000 *
[3132] 2017/07/30 20:10:34.225577 [INF] STREAM: Bytes : 976.56 MB *
[3132] 2017/07/30 20:10:34.225588 [INF] STREAM: Age : unlimited *
[3132] 2017/07/30 20:10:34.225599 [INF] STREAM: ----------------------------------
启动带有NATS监控的NATS Streaming Server
NATS Streaming Server暴露监控界面在嵌入NATS Server(gnatsd)在端口8222。
nats-streaming-server -m 8222
运行可以看到如下信息:
[3143] 2017/07/30 20:20:06.961108 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.6.0
[3143] 2017/07/30 20:20:06.961250 [INF] STREAM: ServerID: EYubV70M1FW4xetDCi8cXJ
[3143] 2017/07/30 20:20:06.961268 [INF] STREAM: Go version: go1.8.3
[3143] 2017/07/30 20:20:06.963097 [INF] Starting nats-server version 0.9.6
[3143] 2017/07/30 20:20:06.963988 [INF] Starting http monitor on 0.0.0.0:8222
[3143] 2017/07/30 20:20:06.964162 [INF] Listening for client connections on 0.0.0.0:4222
[3143] 2017/07/30 20:20:06.964183 [INF] Server is ready
[3143] 2017/07/30 20:20:07.247235 [INF] STREAM: Message store is MEMORY
[3143] 2017/07/30 20:20:07.247366 [INF] STREAM: ---------- Store Limits ----------
[3143] 2017/07/30 20:20:07.247385 [INF] STREAM: Channels: 100 *
[3143] 2017/07/30 20:20:07.247397 [INF] STREAM: --------- Channels Limits --------
[3143] 2017/07/30 20:20:07.247408 [INF] STREAM: Subscriptions: 1000 *
[3143] 2017/07/30 20:20:07.247420 [INF] STREAM: Messages : 1000000 *
[3143] 2017/07/30 20:20:07.247430 [INF] STREAM: Bytes : 976.56 MB *
[3143] 2017/07/30 20:20:07.247457 [INF] STREAM: Age : unlimited *
[3143] 2017/07/30 20:20:07.247471 [INF] STREAM: ----------------------------------
到谷歌浏览器访问192.168.0.31:8222可看到如下监控界面:
使用Go NATS Streaming clients的例子
发布者
stan-pub.go
package main
import (
"github.com/nats-io/go-nats-streaming"
"runtime"
"fmt"
"time"
)
func main() {
//stan.Connect(clusterID, clientID, ops ...Option)
//默认clusterID为test-cluster
ns, err := stan.Connect("test-cluster", "myid", stan.NatsURL("nats://192.168.0.31:4222"))
if err != nil{
panic(err)
}
// Simple Synchronous Publisher
// does not return until an ack has been received from NATS Streaming
t1 :=time.Now().Format("2006-01-02 15:04:05")
//发布50000条消息
for i:=0;i<50000;i++ {
ns.Publish("logp", []byte("hello go"))
}
t2 :=time.Now().Format("2006-01-02 15:04:05")
// 打印发布开始时间和结束时间,可看出发布50000条消息耗时
fmt.Println("开始时间:"+t1, "结束时间"+t2)
ns.Close()
runtime.Goexit()
}
订阅者
stan-sub.go
package main
import (
"github.com/nats-io/go-nats-streaming"
"fmt"
"runtime"
"log"
)
func main() {
//stan.Connect(clusterID, clientID, ops ...Option)
ns, err := stan.Connect("test-cluster", "myid1", stan.NatsURL("nats://192.168.0.31:4222"))
if err != nil{
panic(err)
}
// Simple Synchronous Publisher
// does not return until an ack has been received from NATS Streaming
i:=0
_, err1 := ns.Subscribe("logp",func(msg *stan.Msg){
i++
fmt.Printf("Received a message: %s+%d\n", string(msg.Data),i)
},stan.DurableName("cdn1"))
if err1 != nil{
panic(err1)
}
log.Printf("Listening on [%s]\n", "logp")
runtime.Goexit()
}
我是在IDEA中同时的运行这两个程序,则会看到发布出去的消息被及时的订阅了。
参考网址:
官网
go客户端
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com