设为首页 收藏本站
查看: 2041|回复: 0

[经验分享] Docker部属Nsq集群

[复制链接]
累计签到:2 天
连续签到:1 天
发表于 2017-12-5 17:37:43 | 显示全部楼层 |阅读模式
  用一了段时间NSQ还是很稳定的。除了稳定,还有一个特别值的说的就是部署非常简单。总想写点什么推荐给大家使用nsq来做一些东西。但是就是因为他太简单易用,文档也比较简单易懂。一直不知道要写啥!!!!!
  nsq官网: http://nsq.io/
DSC0000.png

  为了容灾需要对nsqd多机器部属,有了Docker后,快速扩还是很方便的。
  部署完后我会用go和c#写一些代码方便大家学习。
  准备工作:
  》两台服务器:192.168.0.49; 192.168.0.105.
  》需要在两台机器上安装好Docker
  》两台机器上镜像的拉取



docker pull nsqio/nsq
  我们在105上启动lookup, nsqd和客户端都需要连接这个lookup。  



docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
DSC0001.png

  在105和49上启动nsqd, lookup的地址要写105



docker run --name nsqd -p 4150:4150 -p 4151:4151     nsqio/nsq /nsqd     --broadcast-address=192.168.0.105     --lookupd-tcp-address=192.168.0.105:4160


docker run --name nsqd -p 4150:4150 -p 4151:4151     nsqio/nsq /nsqd     --broadcast-address=192.168.0.49     --lookupd-tcp-address=192.168.0.105:4160
DSC0002.png

  到了这一步就可以写代码发送和接收信息了。但是还有一个管理系统需要启动一下。nsqadmin



docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=192.168.0.105:4161
DSC0003.png

  用浏览器看一下管理端:http://192.168.0.105:4171/nodes。找开Nodes标签里面有两个节点。192.168.0.105  和 192.168.0.49。其他的你可以点开看看。
DSC0004.png

  我用go语言 简单写一个发送信息的例子:
  go使用的库是 go-nsq 地址  : github.com/nsqio/go-nsq



func main() {
config := nsq.NewConfig()
// 随便给哪个ip发都可以
//w1, _ := nsq.NewProducer("192.168.0.105:4150", config)
w1, _ := nsq.NewProducer("192.168.0.49:4150", config)
err1 := w1.Ping()
if err1 != nil {
log.Fatal("should not be able to ping after Stop()")
return
}
defer w1.Stop()
topicName := "publishtest"
msgCount := 2
for i := 1; i < msgCount; i++ {
err1 := w1.Publish(topicName, []byte("测试测试publis test case"))
if err1 != nil {
log.Fatal("error")
}
}
}
  可以尝试给49和105都发送一次试试。再看一下我们的管理页面:
  publishtest被ip105和49都发送过。但是还没有channel:
DSC0005.png

  客户端golang代码



package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"os"
"os/signal"
"strconv"
"time"
"sync"
)
func main() {
topicName := "publishtest"
msgCount := 2
for i := 0; i < msgCount; i++ {
//time.Sleep(time.Millisecond * 20)
        go readMessage(topicName, i)
}
//cleanup := make(chan os.Signal, 1)
cleanup := make(chan os.Signal)
signal.Notify(cleanup, os.Interrupt)
fmt.Println("server is running....")
quit := make(chan bool)
go func() {
select {
case <- cleanup:
fmt.Println("Received an interrupt , stoping service ...")
for _, ele := range consumers {
ele.StopChan <- 1
ele.Stop()
}
quit <- true
}
}()
<-quit
fmt.Println("Shutdown server....")
}
type ConsumerHandle struct {
q       *nsq.Consumer
msgGood int
}
var consumers []*nsq.Consumer = make([]*nsq.Consumer, 0)
var mux *sync.Mutex = &sync.Mutex{}
func (h *ConsumerHandle) HandleMessage(message *nsq.Message) error {
msg := string(message.Body) + "  " + strconv.Itoa(h.msgGood)
fmt.Println(msg)
return nil
}
func readMessage(topicName string, msgCount int) {
defer func() {
if err := recover(); err != nil {
fmt.Println("error: ", err)
}
}()
config := nsq.NewConfig()
config.MaxInFlight = 1000
config.MaxBackoffDuration = 500 * time.Second
//q, _ := nsq.NewConsumer(topicName, "ch" + strconv.Itoa(msgCount), config)
//q, _ := nsq.NewConsumer(topicName, "ch" + strconv.Itoa(msgCount) + "#ephemeral", config)
q, _ := nsq.NewConsumer(topicName, "ch"+strconv.Itoa(msgCount), config)
h := &ConsumerHandle{q: q, msgGood: msgCount}
q.AddHandler(h)
err := q.ConnectToNSQLookupd("192.168.0.105:4161")
//err := q.ConnectToNSQDs([]string{"192.168.0.105:4161"})
//err := q.ConnectToNSQD("192.168.0.49:4150")
//err := q.ConnectToNSQD("192.168.0.105:4415")
if err != nil {
fmt.Println("conect nsqd error")
log.Println(err)
}
mux.Lock()
consumers = append(consumers, q)
mux.Unlock()
<-q.StopChan
fmt.Println("end....")
}
  运行一下,会启动两个终端:
  用我们的发送代码发送信息,再看我们的客户端
DSC0006.png

  c# 使用的库为NsqSharp.Core 地址为:
  https://github.com/tonyredondo/NsqSharp
DSC0007.png

  简单客户端代码为:



class Program
{
static void Main()
{
// Create a new Consumer for each topic/channel
var consumerCount = 2;
var listC = new  List<Consumer>();
for (var i = 0; i < consumerCount; i++)
{
var consumer = new Consumer("publishtest", $"channel{i}" );
consumer.ChangeMaxInFlight(2500);
consumer.AddHandler(new MessageHandler());
consumer.ConnectToNsqLookupd("192.168.0.105:4161");
listC.Add(consumer);
}

var exitEvent = new ManualResetEvent(false);
Console.CancelKeyPress += (sender, eventArgs) => {
eventArgs.Cancel = true;
listC.ForEach(x => x.Stop());
exitEvent.Set();
};
exitEvent.WaitOne();

}
}
public class MessageHandler : IHandler
{
/// <summary>Handles a message.</summary>
public void HandleMessage(IMessage message)
{
string msg = Encoding.UTF8.GetString(message.Body);
Console.WriteLine(msg);
}
/// <summary>
/// Called when a message has exceeded the specified <see cref="Config.MaxAttempts"/>.
/// </summary>
/// <param name="message">The failed message.</param>
public void LogFailedMessage(IMessage message)
{
// Log failed messages
        }
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-420904-1-1.html 上篇帖子: Mac OS X 上的安装nsq并使用 下篇帖子: Hive:org.apache.hadoop.hdfs.protocol.NSQuotaExceededException: The NameSpace quo
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表