def remot(self):
sql = "SELECT PORT FROM center.host_info WHERE flag='1' AND del_info!=0 AND ip='%s' AND host_name IN ('a','b','c')" % self.host
alldata = center(sql)
cent_port = []
if alldata != 0:
for i in alldata:
cent_port.append(str(i[0]))
return cent_port
else:
return cent_port
def local(self):#获取本地mysql有多少个实例运行
psinfo = os.popen("ps auxww|grep mysqld|grep -v root|grep -v grep").readlines()
local_port = []
if not psinfo:
return local_port
for i in psinfo:
for j in i.split("--"):
if j.find("port") != -1:
port = j.split("=")[1].strip()
local_port.append(port)
return local_port
def main(self):
local_port = self.local()
cent_port = self.remot()
cent_port.sort()
local_port.sort()
if local_port == cent_port and len(local_port) != 0 and len(cent_port) != 0:
print 0
else:
error = ""
diff_list = list(set(local_port) ^ set(cent_port))
for port in diff_list:
sql = "SELECT CONCAT(a.main_name,'_',b.sub_name,'_',c.app_name,'_',c.port) AS used FROM center_app.main_category a, center_app.sub_category b, center_app.app_info c WHERE a.id = b.main_id AND b.dist_id = c.dist_id AND b.main_id = c.main_id AND b.main_id='2' AND c.flag='1' AND c.del_info!=0 AND c.ip='%s' and c.port='%s'" % (self.host,port)
alldata = center(sql)
if error == "":
error = error + alldata[0][0]
else:
error = error + ";" + alldata[0][0]
print error
if __name__ == "__main__":
boss = check_port()
boss.main()
如果用第二种方法的话,很简单,用下面的函数可以实现这个端口测试:
ipmort socket
def test_port()
s = socket.socket()
address = '127.0.0.1'
port = 80
try:
s.connect((address,port))
s.close()
return True
except Exception,e:
return False