如何用python写个串口通信的程序?
http://www.douban.com/group/topic/7400483/<script></script>
如何用python写个串口通信的程序?
http://t.douban.com/icon/u1466658-2.jpg
2009-07-26 16:08:18 来自: 如影随形(但行好事,莫问前程)
用tKinter做界面
[*]http://t.douban.com/icon/u1580710-2.jpg
2009-07-26 16:11:05 代码人 (敏而好学,杰而不傲)
http://pyserial.sourceforge.net/
使用pySerial模块
> 删除
[*]http://t.douban.com/icon/u1466658-2.jpg
2009-07-26 16:57:40 如影随形 (但行好事,莫问前程)
没有代码啊,我找到过一个http://currentlife.blog.sohu.com/53741351.html,他里面上来就调用 serial,我在我的IDLE中 import serial print serial.__doc__,显示没有这个模块。
> 删除
[*]http://t.douban.com/icon/u1466658-2.jpg
2009-07-26 17:18:53 如影随形 (但行好事,莫问前程)
搜狐博客 > 一二三事 > 日志 2007-07-05 | python 串口通信 标签: python 串口 Jeffu:
我用的是“线程轮寻”方式。
就是打开串口后,启动一个线程来监听串口数据的进入,有数据时,就做数据的处理(也可以发送一个事件,并携带接收到的数据)。
我没有用到串口处理太深的东西。
客户的原程序不能给你,不过我给你改一下吧。
里面的一些东西,已经经过了处理,要运行,可能你要自己改一下,把没有用的东西去掉。
我这里已经没有串口设备了,不能调了,你自己处理一下吧,不过基本的东西已经有了。
=================================================================
#coding=gb18030
import sys,threading,time;
import serial;
import binascii,encodings;
import re;
import socket;
class ReadThread:
def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):
self.l_serial = None;
self.alive = False;
self.waitEnd = None;
self.bFirstMethod = i_FirstMethod;
self.sendport = '';
self.log = Log;
self.output = Output;
self.port = Port;
self.re_num = None;
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait();
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set();
self.alive = False;
self.stop();
def start(self):
self.l_serial = serial.Serial();
self.l_serial.port = self.port;
self.l_serial.baudrate = 9600;
self.l_serial.timeout = 2;
self.re_num = re.compile('\d');
try:
if not self.output is None:
self.output.WriteText(u'打开通讯端口\r\n');
if not self.log is None:
self.log.info(u'打开通讯端口');
self.l_serial.open();
except Exception, ex:
if self.l_serial.isOpen():
self.l_serial.close();
self.l_serial = None;
if not self.output is None:
self.output.WriteText(u'出错:\r\n %s\r\n' % ex);
if not self.log is None:
self.log.error(u'%s' % ex);
return False;
if self.l_serial.isOpen():
if not self.output is None:
self.output.WriteText(u'创建接收任务\r\n');
if not self.log is None:
self.log.info(u'创建接收任务');
self.waitEnd = threading.Event();
self.alive = True;
self.thread_read = None;
self.thread_read = threading.Thread(target=self.FirstReader);
self.thread_read.setDaemon(1);
self.thread_read.start();
return True;
else:
if not self.output is None:
self.output.WriteText(u'通讯端口未打开\r\n');
if not self.log is None:
self.log.info(u'通讯端口未打开');
return False;
def InitHead(self):
#串口的其它的一些处理
try:
time.sleep(3);
if not self.output is None:
self.output.WriteText(u'数据接收任务开始连接网络\r\n');
if not self.log is None:
self.log.info(u'数据接收任务开始连接网络');
self.l_serial.flushInput();
self.l_serial.write('\x11');
data1 = self.l_serial.read(1024);
except ValueError,ex:
if not self.output is None:
self.output.WriteText(u'出错:\r\n %s\r\n' % ex);
if not self.log is None:
self.log.error(u'%s' % ex);
self.SetStopEvent();
return;
if not self.output is None:
self.output.WriteText(u'开始接收数据\r\n');
if not self.log is None:
self.log.info(u'开始接收数据');
self.output.WriteText(u'===================================\r\n');
def SendData(self, i_msg):
lmsg = '';
isOK = False;
if isinstance(i_msg, unicode):
lmsg = i_msg.encode('gb18030');
else:
lmsg = i_msg;
try:
#发送数据到相应的处理组件
pass
except Exception, ex:
pass;
return isOK;
def FirstReader(self):
data1 = '';
isQuanJiao = True;
isFirstMethod = True;
isEnd = True;
readCount = 0;
saveCount = 0;
RepPos = 0;
#read Head Infor content
self.InitHead();
while self.alive:
try:
data = '';
n = self.l_serial.inWaiting();
if n:
data = data + self.l_serial.read(n);
#print binascii.b2a_hex(data),
for l in xrange(len(data)):
if ord(data)==0x8E:
isQuanJiao = True;
continue;
if ord(data)==0x8F:
isQuanJiao = False;
continue;
if ord(data) == 0x80 or ord(data) == 0x00:
if len(data1)>10:
if not self.re_num.search(data1,1) is None:
saveCount = saveCount + 1;
if RepPos==0:
RepPos = self.output.GetInsertionPoint();
self.output.Remove(RepPos,self.output.GetLastPosition());
self.SendData(data1);
data1 = '';
continue;
except Exception, ex:
if not self.log is None:
self.log.error(u'%s' % ex);
self.waitEnd.set();
self.alive = False;
def stop(self):
self.alive = False;
self.thread_read.join();
if self.l_serial.isOpen():
self.l_serial.close();
if not self.output is None:
self.output.WriteText(u'关闭通迅端口:[%d] \r\n' % self.port);
if not self.log is None:
self.log.info(u'关闭通迅端口:[%d]' % self.port);
def printHex(self, s):
s1 = binascii.b2a_hex(s);
print s1;
#测试用部分
if __name__ == '__main__':
rt = ReadThread();
f = open("sendport.cfg", "r")
rt.sendport = f.read()
f.close()
try:
if rt.start():
rt.waiting();
rt.stop();
else:
pass;
except Exception,se:
print str(se);
if rt.alive:
rt.stop();
print 'End OK .';
del rt;
> 删除
[*]http://t.douban.com/icon/u1466658-2.jpg
2009-07-26 17:20:45 如影随形 (但行好事,莫问前程)
这是个Tkinter界面,能不能把他们两个做个拼盘,
# -*- coding: utf-8 -*-
from Tkinter import *
import Tkinter
from tkMessageBox import *
class EntryDemo(Frame):
def __init__(self,parent):
Frame.__init__(self)
self.pack(expand=YES,fill=BOTH)
self.master.title("串口通信")
self.frame1=Frame(self)
self.frame1.pack(pady=5)
self.text1=Entry(self.frame1,name="显示")
self.text1.bind("<Return>",self.showContents)
self.text1.pack(side=LEFT,padx=5)
self.frame2=Frame(self)
self.frame2.pack(pady=5)
self.text2=Entry(self.frame2,name="显示")
self.text2.bind("<Return>",self.showContents)
self.text2.pack(side=LEFT,padx=5)
self.sendButton = Button(self.frame1, text="发送", command = self.pressedPlain)
self.sendButton.pack(side=LEFT, padx =5)
def showContents(self, event):
theName=event.widget.winfo_name()
theContents=event.widget.get()
showinfo("Message", theName +":"+ theContents)
def pressedPlain(self):
showinfo("Message","数据")
def main():
root = Tkinter.Tk()
widget = EntryDemo(root)
root.mainloop()
if __name__=="__main__":
main()
> 删除
[*]http://t.douban.com/icon/u1466658-2.jpg
2009-07-26 19:40:14 如影随形 (但行好事,莫问前程)
我下载了pyserial,解压开有example, serials,把serial文件夹里的.py文件拷贝到python安装目录下的lib文件家里面吗?
页:
[1]