设为首页 收藏本站
查看: 461|回复: 0

[经验分享] python实现: protobuf解释器

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-11-29 08:42:05 | 显示全部楼层 |阅读模式
  之前在项目中为了实现自动化,我写了一个protobuf的解释器,用来生成项目所需的格式。
  当然现在通过以下链接的指导,跳过手工分析,直接生成代码了。
  https://developers.google.com/protocol-buffers/docs/reference/cpp-generated
  
  这次文档主要是描述如何分析protobuf格式,以及如何收集需要的符号。
  使用python 2.7脚本进行文本的处理。
  程序分成4个模块:

  expression: 格式的解析


  symbol:在protobuf中定义的message等对象以及它们的层次结构,在这里已经看不见protobuf的样子了。


  typecollection:基础类型定义和收集message等对象。


  builder:遍历symbol,根据需要创建适合的输出文件,其中typecollection起到索引的作用。builder模块这次就不演示了。

  
  1 测试用protobuf文件。(来源于google示例)



// Address-book sample schema used as test input for the parser below.
package tutorial;
// A single contact.
message Person {
    required string name = 1;
    required int32 id = 2 ;
    optional string email = 3;
    // Kind of phone line a PhoneNumber refers to.
    enum PhoneType {
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
    }

    // One phone entry; `type` defaults to HOME when it is not set.
    message PhoneNumber {
        required string number = 1;
        optional PhoneType type = 2 [default = HOME];
    }
    repeated PhoneNumber phone = 4;
}
// Top-level container holding any number of Person records.
message AddressBook {
    repeated Person person = 1;
}
  2 expression实现---最简单的扫描方法,分析每一个word。



# -*- coding: UTF-8 -*-
# pb_expression.py
import sys
import os
import string
import shutil
import io
import pb_symbol
class StringBuffer(object):
    """Holds the full text of one source file for the parser.

    Attributes:
        src: path of the file to load.
        Data: file contents as a string; ``None`` until ``OpenFile`` is called.
    """

    def __init__(self, src):
        self.src = src
        # Defined up front so the attribute always exists; OpenFile fills it.
        self.Data = None

    def OpenFile(self):
        """Read the whole file named by ``self.src`` into ``self.Data``.

        Raises:
            IOError/OSError: if the file cannot be opened or read.
        """
        # The context manager closes the handle even if read() fails.
        with open(self.src) as handle:
            self.Data = handle.read()
class Expression(object):
    """Character-level scanner/parser for a small subset of .proto syntax.

    The parser walks ``sBuf.Data`` one character at a time, tracks its
    position in ``self.index`` and reports what it recognizes to
    ``self.symbol`` through ``update_namespace``, ``append_message``,
    ``append_enum`` and ``append_symbol``.

    Limitations: only ``//`` line comments are understood (a ``//`` inside a
    string literal is treated as a comment too); inside a message only
    nested definitions and fields labelled required/optional/repeated are
    interpreted, any other word is skipped.
    """

    desc_set = set(['required', 'optional', 'repeated'])
    b_char_set = set(string.ascii_uppercase)
    l_char_set = set(string.ascii_lowercase)
    # Digits are stored as characters so they can match scanned input.
    digit_set = set(string.digits)
    equals_char = '='
    space_char = ' '
    openbrace_char = '{'
    closebrace_char = '}'
    semicolon_char = ';'
    tab_char = chr(9)
    newline_char = chr(10)
    return_char = chr(13)
    slash_char = chr(47)
    # Characters that terminate a word.
    ctl_char_set = set([openbrace_char, closebrace_char, semicolon_char,
                        equals_char, newline_char, return_char, tab_char,
                        space_char])
    empty_char_set = set([space_char, tab_char, newline_char, return_char])
    symbol_char_set = b_char_set | l_char_set | digit_set
    all_char_set = symbol_char_set | ctl_char_set

    def __init__(self, sBuf, symbol):
        """Bind the parser to a loaded buffer and a symbol collector.

        Args:
            sBuf: object whose ``Data`` attribute holds the text to parse.
            symbol: receiver of the parsed entities.
        """
        self.Buf = sBuf
        self.index = 0
        self.count = len(self.Buf.Data)
        self.symbol = symbol

    def backup(self):
        """Return the current position so it can be restored later."""
        return self.index

    def restore(self, prevIndex):
        """Rewind (or jump) to a position obtained from ``backup``."""
        self.index = prevIndex

    def forwardChar(self):
        """Advance one character unless already at the end of the data."""
        if self.index < self.count:
            self.index = self.index + 1

    def backChar(self):
        """Step back one character unless already at the start."""
        if self.index > 0:
            self.index = self.index - 1

    def getchar(self):
        """Return the current character and advance, or None at the end."""
        if self.index < self.count:
            char = self.Buf.Data[self.index]
            self.forwardChar()
            return char
        return None

    def skipComment(self):
        """Skip a ``//`` comment starting exactly at the current position.

        If no comment starts here the position is left untouched.  The
        terminating newline is NOT consumed: it still has to act as a word
        delimiter for the caller, otherwise ``foo// c`` followed by ``bar``
        on the next line would be read as the single word ``foobar``.  A
        comment that runs to the end of the data is consumed completely.
        """
        start = self.backup()
        if (self.getchar() != self.slash_char
                or self.getchar() != self.slash_char):
            self.restore(start)
            return
        while True:
            char = self.getchar()
            if char is None:
                return
            if char == self.newline_char:
                self.backChar()
                return

    def getSpecialChar(self, currentchar):
        """Consume input up to and including the next ``currentchar``.

        Returns:
            ``currentchar`` when found, None if the data ended first.
        """
        while True:
            self.skipComment()
            char = self.getchar()
            if char is None or char == currentchar:
                return char

    def getVisibleChar(self):
        """Consume and return the next non-whitespace character.

        Returns:
            The character, or None if only whitespace/comments remain.
        """
        while True:
            self.skipComment()
            char = self.getchar()
            if char is None or char not in self.empty_char_set:
                return char

    def getNextword(self):
        """Return the next run of non-delimiter characters.

        Leading delimiters (``ctl_char_set``) and comments are skipped.  The
        delimiter that ends the word is pushed back so the caller can still
        inspect it.

        Returns:
            The word, or None when the data ends before any word starts.
        """
        chars = []
        while True:
            self.skipComment()
            char = self.getchar()
            if char is None:
                break
            if char in self.ctl_char_set:
                if chars:
                    self.backChar()
                    break
                continue
            chars.append(char)
        if not chars:
            return None
        return ''.join(chars)

    def do_enum_item(self, pbEnum):
        """Parse one ``NAME = VALUE;`` entry and add it to ``pbEnum``."""
        memText = self.getNextword()
        self.getSpecialChar(self.equals_char)
        memValue = self.getNextword()
        self.getSpecialChar(self.semicolon_char)
        pbEnum.append_Member(memText, memValue)

    def do_enum_proc(self):
        """Parse ``NAME { ... }`` following the ``enum`` keyword."""
        symbol = self.getNextword()
        pbEnum = pb_symbol.PBEnum(symbol)
        # Consume the opening brace explicitly so an empty body ``{}`` is
        # recognized instead of being skipped as word delimiters.
        self.getSpecialChar(self.openbrace_char)
        while True:
            item_index = self.backup()
            char = self.getVisibleChar()
            if char is None or char == self.closebrace_char:
                break
            if char in self.ctl_char_set:
                # Stray delimiter (e.g. a doubled ';'): already consumed.
                continue
            self.restore(item_index)
            self.do_enum_item(pbEnum)
        self.symbol.append_enum(pbEnum)

    def do_message_proc(self):
        """Parse ``NAME { ... }`` following the ``message`` keyword.

        Labelled fields become members of the message.  A nested definition
        keyword starts a sub-parser that collects into a child Symbol whose
        namespace is this message's name.
        """
        symbol = self.getNextword()
        pbMsg = pb_symbol.PBMessage(symbol)
        # See do_enum_proc: needed for empty message bodies.
        self.getSpecialChar(self.openbrace_char)
        while True:
            body_index = self.backup()
            char = self.getVisibleChar()
            if char is None or char == self.closebrace_char:
                break
            if char in self.ctl_char_set:
                continue
            self.restore(body_index)
            word = self.getNextword()
            if word is None:
                break
            if word in self.token_set:
                subSymbol = pb_symbol.Symbol(self.symbol.tpDict,
                                             self.symbol.entity_full_path,
                                             False)
                subSymbol.update_namespace(symbol)
                # Rewind so the sub-parser sees the keyword itself.
                self.restore(body_index)
                subExp = Expression(self.Buf, subSymbol)
                subExp.index = self.index
                subExp.do_expression()
                self.index = subExp.index
                self.symbol.append_symbol(subSymbol)
                pbMsg.enableSymbol = 1
            elif word in self.desc_set:
                memType = self.getNextword()
                memText = self.getNextword()
                pbMsg.append_Member(word, memType, memText)
                # Skips the tag number and any [options] up to the ';'.
                self.getSpecialChar(self.semicolon_char)
            # Any other word is not interpreted and is simply skipped.
        self.symbol.append_message(pbMsg)

    def do_import_proc(self):
        """Ignore an ``import`` statement by skipping to its ';'."""
        self.getSpecialChar(self.semicolon_char)

    def do_package_proc(self):
        """Record the package name as the namespace of ``self.symbol``."""
        word = self.getNextword()
        self.symbol.update_namespace(word)
        self.getSpecialChar(self.semicolon_char)

    # Keyword -> handler.  The values are plain functions taken from the
    # class body, so they are invoked as ``proc(self)``.
    token_set = {
        'message': do_message_proc,
        'enum': do_enum_proc,
        'import': do_import_proc,
        'package': do_package_proc,
    }

    def do_expression(self):
        """Parse consecutive top-level statements starting at ``self.index``.

        Stops, with the position rewound to just before the offending input,
        at the first word that is not a key of ``token_set`` or at a closing
        brace (which belongs to the enclosing message when this parser runs
        for nested definitions).  Also stops at the end of the data.
        """
        while True:
            current_index = self.backup()
            next_char = self.getVisibleChar()
            self.restore(current_index)
            if next_char == self.closebrace_char:
                break
            token = self.getNextword()
            if token is None:
                break
            proc = self.token_set.get(token)
            if proc is None:
                self.restore(current_index)
                break
            proc(self)
  
  3 symbol--定义对象类型以及层次



# -*- coding: UTF-8 -*-
# pb_symbol.py

import os
import string
import pb_typecollection
class PBEntity(object):
    """Base class for a named entity (built-in type, message or enum).

    Attributes:
        entName: name given by the caller.
        orgName: second copy of that name, set once at construction.
        rtname: value passed as ``rtname`` by the caller.
    """

    def __init__(self, entName, rtname):
        self.entName = entName
        self.orgName = entName
        self.rtname = rtname

    def outputDebug(self, ns=''):
        """Print nothing; subclasses override this to dump themselves.

        ``ns`` is accepted (and ignored) because Symbol.outputDebug passes a
        namespace string to every entity it holds.
        """
        pass

    def create_impl(self, entity_indent, top_ns):
        """Return a new empty list; both arguments are unused here."""
        return []

    def mem_include(self, entName):
        """Return False: a plain entity has no members to search."""
        return False
class PBMessageMember(object):
    """One field of a message: its label, type name and field name."""

    def __init__(self, option, memType, memText):
        self.option = option
        self.memType = memType
        self.memText = memText

    def outputDebug(self):
        """Print the label, type and name of this field."""
        print(self.option, self.memType, self.memText)

    @property
    def mem_option(self):
        """The ``option`` value given at construction."""
        return self.option

    @property
    def mem_type(self):
        """The ``memType`` value given at construction."""
        return self.memType

    @property
    def mem_text(self):
        """The ``memText`` value given at construction."""
        return self.memText
class PBMessage(PBEntity):
    """A parsed ``message`` definition and its list of fields.

    Attributes:
        members: list of PBMessageMember, in declaration order.
        enableSymbol: 0 initially; the parser sets it to 1 when the message
            contains nested definitions.
        rt_ns: prefix string stored by ``set_rt_ns``.
        tpDict: type collection attached by ``attach_tp_dict``.
    """

    def __init__(self, entName):
        PBEntity.__init__(self, entName, entName)
        self.members = []
        self.enableSymbol = 0
        self.rt_ns = ''
        self.tpDict = None

    @property
    def Members(self):
        """The list of fields (same object as ``members``)."""
        return self.members

    def attach_tp_dict(self, tpDict):
        """Remember the type collection this message is registered in."""
        self.tpDict = tpDict

    def append_Member(self, option, memType, memText):
        """Add a field described by its label, type name and field name."""
        self.members.append(PBMessageMember(option, memType, memText))

    def enable_Symbol(self, enable):
        """Set the ``enableSymbol`` flag."""
        self.enableSymbol = enable

    def outputDebug(self, ns):
        """Print the message header, every field, then an empty line."""
        print(ns, 'message', self.entName)
        for member in self.members:
            member.outputDebug()
        print('')

    def set_rt_ns(self, rt_entity_full_path):
        """Store the prefix string in ``rt_ns``."""
        self.rt_ns = rt_entity_full_path

    def mem_include(self, entName):
        """Return True if any field has ``entName`` as its type name."""
        return any(entName == member.memType for member in self.members)

    def detect_request(self):
        """Return True if the message has at least one field."""
        # The original compared the bound method ``members.count`` with 0,
        # which never inspected the actual number of fields.
        return len(self.members) > 0
class PBEnumMember(object):
    """One ``NAME = VALUE`` entry of an enum.

    ``memValue`` is stored exactly as passed in; the parser in
    pb_expression hands over the value as the word it scanned.
    """

    def __init__(self, memText, memValue):
        self.memText = memText
        self.memValue = memValue

    def outputDebug(self):
        """Print the entry's name and value."""
        print(self.memText, self.memValue)
class PBEnum(PBEntity):
    """A parsed ``enum`` definition and its entries.

    Attributes:
        members: list of PBEnumMember, in declaration order.
    """

    def __init__(self, entName):
        PBEntity.__init__(self, entName, entName)
        self.members = []

    def append_Member(self, memText, memValue):
        """Add an entry given its name and value."""
        self.members.append(PBEnumMember(memText, memValue))

    def outputDebug(self, ns):
        """Print the enum header, every entry, then an empty line."""
        print(ns, 'enum', self.entName)
        for member in self.members:
            member.outputDebug()
        print('')
class Symbol(object):
    """A scope (package or enclosing message) holding parsed entities.

    Attributes:
        namespace: name of this scope, set by ``update_namespace``.
        tpDict: type collection new entities are registered in.
        rooted: True for the top-level scope, whose full path never changes.
        entity_full_path / rt_entity_full_path: '_'-joined path of the
            enclosing scopes, used as a prefix when registering types.
        entitylist: every message, enum and nested Symbol, in parse order.
        containerlist: the messages and nested Symbols only.
    """

    def __init__(self, tpDict, fullpath, rooted):
        self.namespace = ''
        self.tpDict = tpDict
        self.rooted = rooted
        self.entity_full_path = fullpath
        self.rt_entity_full_path = fullpath
        self.entitylist = []
        self.containerlist = []

    def update_namespace(self, namespace):
        """Set the scope name and, for non-root scopes, extend both paths."""
        self.namespace = namespace
        if self.rooted:
            return
        if self.entity_full_path == '':
            self.entity_full_path = namespace
            self.rt_entity_full_path = namespace
        else:
            # Each path is extended from its OWN previous value.  The
            # original derived rt_entity_full_path from the already extended
            # entity_full_path, which appended the namespace twice.
            self.entity_full_path = '%s_%s' % (self.entity_full_path,
                                               namespace)
            self.rt_entity_full_path = '%s_%s' % (self.rt_entity_full_path,
                                                  namespace)

    def append_type_dict(self, entity, isMsg):
        """Register ``entity`` in the type collection.

        The registered type names are the bare ``rtname`` in a scope with an
        empty path, otherwise ``<path>::<rtname>``.  For messages the
        original-type slot is always the empty string.
        """
        if self.entity_full_path == '':
            rt_type = entity.rtname
            org_type = entity.rtname
        else:
            rt_type = '%s::%s' % (self.rt_entity_full_path, entity.rtname)
            org_type = '%s::%s' % (self.entity_full_path, entity.rtname)
        if isMsg:
            org_type = ''
        self.tpDict.insert_type(entity.entName, rt_type, entity, org_type)

    def append_message(self, msg):
        """Add a message to this scope and register it as a type."""
        self.entitylist.append(msg)
        self.containerlist.append(msg)
        msg.attach_tp_dict(self.tpDict)
        prefix = self.rt_entity_full_path
        if prefix != '':
            prefix = prefix + '_'
        msg.set_rt_ns(prefix)
        self.append_type_dict(msg, True)

    def append_enum(self, enum):
        """Add an enum to this scope and register it as a type."""
        self.entitylist.append(enum)
        self.append_type_dict(enum, False)

    def append_symbol(self, symbol):
        """Add a nested scope."""
        self.entitylist.append(symbol)
        self.containerlist.append(symbol)

    def outputDebug(self, ns):
        """Dump every entity, passing ``ns + '::' + namespace`` down."""
        for entity in self.entitylist:
            entity.outputDebug(ns + '::' + self.namespace)

    def query_entitylist(self):
        """Return the list of all entities of this scope."""
        return self.entitylist

    def query_containerlist(self):
        """Return the list of messages and nested scopes."""
        return self.containerlist

    def query_pb_ns(self):
        """Return the scope name."""
        return self.namespace

    def mem_include(self, entName):
        """Return True if any entity reports ``entName`` via mem_include."""
        return any(entity.mem_include(entName) for entity in self.entitylist)

class PBProxy(object):
    """Read-only wrapper that forwards attribute access to ``entity``.

    Each property simply returns the attribute of the same name from the
    wrapped object (``Members`` returns ``entity.members``), so a property
    is only usable when the wrapped object actually has that attribute.
    """

    def __init__(self, entity):
        self.entity = entity

    @property
    def enableSymbol(self):
        return self.entity.enableSymbol

    def mem_include(self, entName):
        """Forward to ``entity.mem_include``."""
        return self.entity.mem_include(entName)

    def create_impl(self, entity_indent, top_ns):
        """Forward to ``entity.create_impl``."""
        return self.entity.create_impl(entity_indent, top_ns)

    @property
    def entName(self):
        return self.entity.entName

    @property
    def rtname(self):
        return self.entity.rtname

    @property
    def orgName(self):
        return self.entity.orgName

    @property
    def members(self):
        return self.entity.members

    @property
    def rt_ns(self):
        return self.entity.rt_ns

    @property
    def namespace(self):
        return self.entity.namespace

    @property
    def rooted(self):
        return self.entity.rooted

    @property
    def entity_full_path(self):
        return self.entity.entity_full_path

    @property
    def rt_entity_full_path(self):
        return self.entity.rt_entity_full_path

    @property
    def entitylist(self):
        return self.entity.entitylist

    @property
    def containerlist(self):
        return self.entity.containerlist

    @property
    def tpDict(self):
        return self.entity.tpDict

    def detect_request(self):
        """Forward to ``entity.detect_request``."""
        return self.entity.detect_request()

    @property
    def Members(self):
        return self.entity.members

    @property
    def mem_option(self):
        return self.entity.mem_option

    @property
    def mem_type(self):
        return self.entity.mem_type

    @property
    def mem_text(self):
        return self.entity.mem_text
  4 typecollection



# -*- coding: UTF-8 -*-
# pb_typecollection.py
import os
import pb_symbol

class typeDict(object):
    """Index from a protobuf type name to ``(rtType, entity, orgType)``.

    The constructor pre-registers the scalar types listed below; messages
    and enums are added later through ``insert_type``.
    """

    op_req_desc = 'required'
    op_opt_desc = 'optional'
    op_rep_desc = 'repeated'

    def __init__(self):
        self.collection = dict()
        # (protobuf name, registered rtType) for the built-in scalars.
        # NOTE(review): the rtType spellings look like C/C++ type names -
        # confirm against the builder module, which is not shown here.
        builtin_types = (
            ('int32', '__int32'),
            ('int64', '__int64'),
            ('uint32', 'unsigned int'),
            ('bool', 'bool'),
            ('float', 'float'),
            ('double', 'double'),
            ('string', 'const char*'),
            ('bytes', 'const char*'),
        )
        for name, rt_type in builtin_types:
            self.insert_type(name, rt_type,
                             pb_symbol.PBEntity(name, name), '')

    def insert_type(self, entName, rtType, entity, orgType):
        """Register (or overwrite) the entry stored under ``entName``."""
        self.collection[entName] = (rtType, entity, orgType)

    def output_debug(self):
        """Print a header line followed by every (name, entry) pair."""
        print('type collection')
        for item in self.collection.items():
            print(item)

  5 测试脚本



# -*- coding: UTF-8 -*-
import pb_symbol
import pb_expression
import pb_typecollection
if __name__ == '__main__':
    # Parse the sample schema, then dump the symbol tree and the type index.
    pb_file = 'google_tutorial.proto'
    sBuf = pb_expression.StringBuffer(pb_file)
    tpDict = pb_typecollection.typeDict()
    symbol = pb_symbol.Symbol(tpDict, '', True)
    try:
        sBuf.OpenFile()
        exp = pb_expression.Expression(sBuf, symbol)
        exp.do_expression()
        symbol.outputDebug('')
        tpDict.output_debug()
    except Exception as exc:
        # Top-level boundary of a demo script: report the error and go on.
        # The original passed "%s" and exc as two separate print arguments,
        # so the format string was never applied.
        print("%s" % exc)
    print("done")
  
  6 输出
  命名空间:::tutorial::Person
  类型名称:PhoneType
  ('::tutorial::Person', 'enum', 'PhoneType')   
('MOBILE', '0')
('HOME', '1')
('WORK', '2')
  ('::tutorial::Person', 'message', 'PhoneNumber')
('required', 'string', 'number')
('optional', 'PhoneType', 'type')
  ('::tutorial', 'message', 'Person')
('required', 'string', 'name')
('required', 'int32', 'id')
('optional', 'string', 'email')
('repeated', 'PhoneNumber', 'phone')
  ('::tutorial', 'message', 'AddressBook')
('repeated', 'Person', 'person')
  
  type collection
('PhoneNumber', ('Person::PhoneNumber', <pb_symbol.PBMessage object at 0x02B9DED0>, ''))

('int32', ('__int32', <pb_symbol.PBEntity object at 0x02BE3F70>, ''))
('string', ('const char*', <pb_symbol.PBEntity object at 0x02BEE0F0>, ''))
('double', ('double', <pb_symbol.PBEntity object at 0x02BEE0B0>, ''))
('float', ('float', <pb_symbol.PBEntity object at 0x02BEE070>, ''))
('bytes', ('const char*', <pb_symbol.PBEntity object at 0x02BEE130>, ''))
('Person', ('Person', <pb_symbol.PBMessage object at 0x02BEE210>, ''))

('bool', ('bool', <pb_symbol.PBEntity object at 0x02BEE050>, ''))
('PhoneType', ('Person::PhoneType', <pb_symbol.PBEnum object at 0x02BEE450>, 'Person::PhoneType'))

('int64', ('__int64', <pb_symbol.PBEntity object at 0x02BE3FB0>, ''))
('uint32', ('unsigned int', <pb_symbol.PBEntity object at 0x02BE3FF0>, ''))
('AddressBook', ('AddressBook', <pb_symbol.PBMessage object at 0x02BEE7B0>, ''))
  
  参考
  protobuf的git地址:https://github.com/google/protobuf

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144773-1-1.html 上篇帖子: Python开发入门与实战15-IIS部署 下篇帖子: Python开发入门与实战20-微信开发配置
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表