.Net环境下操作IBM WebShpere MQ
大约在两年前项目使用了IBM MQ,本人积累了部分在.Net环境下操作IBM MQ的经验。现将经验与大家分享。
IBM WebShpere MQ 简单介绍:
具体的详细信息大家可以在IBM的网站和网上有许多的学习资料。
IBM MQ是实现了消息队列的一个中间件,它可以运行在现在所有流行的平台上。在我们的项目中主要使用的是AS400和UNIX平台上。
在IBM MQ中主要分为以下几个对象:
Queue Manager
| 队列管理器
| 主要负责管理队列、通道等,类似与Oracle中的Oracle实例的概念,在一台服务器中可以定义多个Queue Manager。
| Queue
| 队列
| 是存放Message的对象,是我们主要操作的对象。类似与Oracle中的表的概念。
| Message
| 消息
| 实际放入Queue中的消息,该消息可以存储字符串、Stream等对象。
| Channel
| 通道
| 是我们的应用以及两个Queue Manager操作的链接对象
| Process
| 过程
|
|
Queue的分类:
Local Queue
| 本地队列
| 主要存储消息的对象
| Remote Queue
| 远程队列
| 和另外一个Queue Manager通讯的队列
| Mode Queue
| 模板队列
| 建立一个队列模板,通过这个模板可以建立与模板相同属性的动态队列。
| Dynamic Queue
| 动态对了(临时队列)
| 可以在程序中创建和删除的临时队列
|
Channel类型:
在我们介绍中只使用了类型为SVRCONN的通道。
.Net连接Queue Manager的关键要素:
.Net的程序需要以下几个关键要素才能连接的一个Queue Manager上。
1、 Address & Port
MQ的服务器地址以及要连接的Queue Manager的监听端口,默认端口1414。
2、 Queue Manager Name
要连接的Queue Manager的名字,区分大小写,默认全部大写。
3、 Channel Name
SVRCONN类型的Channel的名字,需要注意的是该Channel的属性MCAUSER必须设置为一个足够权限的系统用户,否则不能连接成功。
4、 CCSID
CCSID是Queue Manager的编码字符集,需要Queue Manager的CCSID与你的.Net操作系统的CCSID互相兼容或者一致才能成功连接。默认情况下UNIX平台的Queue Manager的CCSID为819,而简体中文版的Windows的CCSID为1381。这两个编码字符集是不能兼容的,所以需要调整。
调整有两种方式:
1) 调整Queue Manager的CCSID,调整成与1381兼容的即可,不一定需要调整成1381
2) 在不能调整Queue Manager时需要配置Windows的环境变量,在Windows的环境变量中增加MQCCSID的环境变量,值需要与Queue Manager的CCSID一致或兼容。
在我们介绍的例子中使用环境如下:
Server:Sun OS 5.9
Address
| 192.168.128.115
| Port
| 1414(默认端口)
| Queue Manager Name
| SGS.MGR
| Channel Name
| SGS.CHANNEL
|
Queue Manager的属性如下(注意红色部分):
Display Queue Manager details.
DESCR( ) DEADQ( )
DEFXMITQ( ) CHADEXIT( )
CLWLEXIT( ) CLWLDATA( )
REPOS( ) REPOSNL( )
COMMANDQ(SYSTEM.ADMIN.COMMAND.QUEUE) QMNAME(SGS.MGR)
CRDATE(2007-04-02) CRTIME(16.24.23)
ALTDATE(2007-04-02) ALTTIME(16.24.23)
QMID(SGS.MGR_2007-04-02_16.24.23) TRIGINT(999999999)
MAXHANDS(256) MAXUMSGS(10000)
AUTHOREV(DISABLED) INHIBTEV(DISABLED)
LOCALEV(DISABLED) REMOTEEV(DISABLED)
PERFMEV(DISABLED) STRSTPEV(ENABLED)
CHAD(DISABLED) CHADEV(DISABLED)
CLWLLEN(100) MAXMSGL(4194304)
CCSID(819) MAXPRTY(9)
CMDLEVEL(510) PLATFORM(UNIX)
SYNCPT DISTL(YES)
Channel的属性如下(注意红色部分):
Display Channel details.
CHANNEL(SGS.CHANNEL) CHLTYPE(SVRCONN)
TRPTYPE(TCP) DESCR( )
SCYEXIT( ) MAXMSGL(4194304)
SCYDATA( ) HBINT(300)
MCAUSER(mqm) ALTDATE(2007-04-02)
ALTTIME(16.30.23)
SENDEXIT( )
RCVEXIT( )
SENDDATA( )
RCVDATA( )
由于Queue Manager的CCSID使用819,故增加系统的环境变量:
环境准备:
1、 安装IBM WebSphere MQ Client for Windows。(MQ客户端)
2、 安装WebSphere MQ classes for Microsoft .NET.msi。(.Net插件)
3、 在你的.Net项目中引用 amqmdnet.dll,该dll在.Net插件安装目录中可以找到。
开始编写代码:
连接Queue Manager
//MQ管理实例
private MQQueueManager mqQMgr=null;// MQQueueManager instance
private void CreateMng()
{
if(this.mqQMgr==null)
{
try
{ this.log("Create Queue Manager");
//通过MQ管理名创建MQ管理实例
//判断是远程连接还是本地连接
if(!this.checkBox1.Checked) //本地连接 (服务器连接)
this.mqQMgr = new MQQueueManager(this.tbQMng.Text);
else //远程连接(客户端连接)
{
string hostname = this.tbConnection.Text; //Server地址
string channel = this.tbChannel.Text; //Channel 名
string qManager = this.tbQMng.Text; //Queue Manager名
//初始化环境变量
MQEnvironment.Hostname = hostname;
MQEnvironment.Channel = channel;
MQEnvironment.Port = Int32.Parse(this.textBox1.Text); //端口号
this.mqQMgr =new MQQueueManager(qManager);
}
this.log("Create Queue Manager Sucess");
}
catch(MQException ex)
{
this.mqQMgr=null;
this.log("Create Queue Manager Failed!"+ex.Message+"reason:" + Convert.ToString( ex.Reason,16));
}
}
}
向本地队列(SGS.IN.01)放入消息:
1private void button1_Click(object sender, System.EventArgs e)
2
3 {
4
5 //定义队列
6
7 MQQueue mqQueue=null;
8
9
10
11 this.CreateMng(); //连接Queue Manager
12
13
14
15 try
16
17 {
18
19 if(this.mqQMgr==null)
20
21 return;
22
23
24
25 this.log("Create MQQueue");
26
27
28
29 //通过MQ管理创建队列实例
30
31 mqQueue=this.mqQMgr.AccessQueue(this.tbQName.Text,MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_INQUIRE);
32
33//本例中this.tbQName.Text=”SGS.IN.01”
34
35
36
37
38
39 this.log("Create MQQueue Sucess");
40
41
42
43 }
44
45 catch(MQException ex)
46
47 {
48
49
50
51 this.log("Create MQQueue Failed!!"+ex.Message+"reason:" + Convert.ToString( ex.Reason,16));
52
53 return;
54
55 }
56
57
58
59 //要放入队列中的消息
60
61MQMessage mqMsg =null;
62
63 try
64
65 {
66
67 int count=Int32.Parse(this.tbCount.Text);
68
69
70
71 //创建消息操作实例
72
73 MQPutMessageOptions mqPutMsgOpts = new MQPutMessageOptions();
74
75
76
77
78
79 for(int i=0;i<count;i++)
80
81 {
82
83 this.log("Create Message");
84
85 //创建消息实例
86
87 mqMsg = new MQMessage();
88
89
90
91 System.Xml.XmlDocument doc = new System.Xml.XmlDocument();
92
93 doc.Load(this.tbMsg.Text);
94
95 string msg=doc.OuterXml; //以字符串的形式放入一个XML
96
97
98
99 mqMsg.WriteString(msg);
100
101 //制定消息格式为String
102
103 mqMsg.Format = MQC.MQFMT_STRING;
104
105
106
107 this.log("Create Message Sucess. Message:"+msg);
108
109
110
111
112
113 this.log("Begin Put Message");
114
115 //发送消息
116
117 mqQueue.Put(mqMsg,mqPutMsgOpts);
118
119 this.log("Put Message Sucess");
120
121
122
123 }
124
125 }
126
127 catch (Exception ex)
128
129 {
130
131 this.log(ex.Message);
132
133 }
134
135 }
136
从本地队列(SGS.IN.01)获取消息
1 private void button2_Click(object sender, System.EventArgs e)
2
3 {
4
5 //定义队列
6
7 MQQueue mqQueue=null;
8
9
10
11 this.CreateMng();
12
13
14
15 try
16
17 {
18
19 this.log("Create MQQueue");
20
21 //通过MQ管理实例创建队列实例,制定队列操作类型
22
23 mqQueue= this.mqQMgr.AccessQueue(this.tbQName.Text,MQC.MQOO_INPUT_AS_Q_DEF+MQC.MQOO_FAIL_IF_QUIESCING);
24
25 this.log("Create MQQueue Sucess");
26
27 }
28
29 catch (Exception ex)
30
31 {
32
33 this.log("Create MQQueue Failed!!"+ex.Message);
34
35 return;
36
37 }
38
39
40
41 //定义队列消息
42
43 MQMessage mqMsg =null;
44
45 try
46
47 {
48
49 int count=Int32.Parse(this.tbCount.Text);
50
51 //定义消息操作
52
53 MQGetMessageOptions mqGetMsgOpts=null;
54
55
56
57 //队列消息操作类型实例化
58
59 mqGetMsgOpts = new MQGetMessageOptions();
60
61 //设置消息操作超时
62
63 mqGetMsgOpts.WaitInterval=15000;
64
65
66
67 bool isContinue = true;
68
69 while(isContinue && count>0)
70
71 {
72
73 //实例化消息
74
75 mqMsg = new MQMessage();
76
77 try
78
79 {
80
81 this.log("Begin get message");
82
83 //从消息队列得到消息
84
85 mqQueue.Get(mqMsg,mqGetMsgOpts);
86
87 this.log("Begin get message Success");
88
89
90
91 //比较消息格式
92
93 if(mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
94
95 {
96
97 //MQC.MQFMT_RF_HEADER_2
98
99 this.log("Message:"+mqMsg.ReadString(mqMsg.MessageLength));
100
101 }
102
103 else //if (mqMsg.Format.CompareTo(MQC.MQFMT_RF_HEADER_2)==0)
104
105 {
106
107 this.log("MessageType:"+mqMsg.Format+"\r\n");
108
109 string msg=mqMsg.ReadString(mqMsg.MessageLength);
110
111
112
113
114
115 this.log(msg);
116
117 }
118
119// else
120
121// {
122
123// this.log("MessageType:"+mqMsg.Format+"\r\n");
124
125// byte[] buffer = new byte[mqMsg.MessageLength];
126
127// buffer=mqMsg.ReadBytes(mqMsg.MessageLength);
128
129//
130
131// this.log("byteMessage:"+System.Text.Encoding.UTF8.GetString(buffer));
132
133// }
134
135 }
136
137 catch (MQException mqe)
138
139 {
140
141 // 判断异常原因,队列中没有消息
142
143 if ( mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE )
144
145 {
146
147 this.log("Have no message "+mqe.Message);
148
149 isContinue=false;
150
151 }
152
153
154
155 // treat truncated message as a failure for this sample
156
157 if ( mqe.Reason == MQC.MQRC_TRUNCATED_MSG_FAILED )
158
159 {
160
161
162
163 isContinue=true;
164
165 }
166
167 }
168
169
170
171 count--;
172
173 }
174
175 }
176
177 catch (Exception ex)
178
179 {
180
181 this.log(ex.Message);
182
183 }
184
185 }
186
187
188
189关闭Queue Manager的连接:
190
191 private void button3_Click(object sender, System.EventArgs e)
192
193 {
194
195 if (this.mqQMgr!=null)
196
197 {
198
199 this.mqQMgr.Disconnect();
200
201 this.mqQMgr=null;
202
203 }
204
205 }
206
207
现介绍这些,以后我会对模板对了和动态队列做一些介绍,希望对大家有所帮助。 |