设为首页 收藏本站
查看: 575|回复: 0

[经验分享] Python多线程 VS JAVA多线程

[复制链接]

尚未签到

发表于 2017-5-3 09:59:47 | 显示全部楼层 |阅读模式
      最近在学习Python,在网上遇到一哥们问一个关于用Python多线程的问题。具体就是读取一个文件中的IP信息,然利用百度地图open api获取IP所在地的经纬度,最后再将取回的结果分类为“请求成功”和“请求失败”分别存入两个文件中。由于自己也在学,所以也就拿来做了做,并分别用Python3和Java实现。
    稍稍分析了一下需求能看的出来里面性能瓶颈有两个:
    1、一个是查询百度地图OPEN API;
    2、写磁盘文件;
    于是想到把这两部分分开处理,自然的想到了生产-消费者模式来实现。
    对于IP文件信息格式如下(我的测试文件有41255行数据):
   
  61.0.0.0__61.255.255.255__亚洲____
  61.157.0.0__61.157.255.255__四川____
  61.139.0.0__61.139.127.255__四川____
  61.130.93.0__61.130.254.255__浙江____
    下面是Python3实现代码:

#coding:utf-8
import urllib.request
import json
import threading
import queue
FILE_ENCODING = "utf-8"
successList = []
failList = []
successPath = r"success.txt"
failPath = r"failed.txt"
ipPath = "ip.txt"
queryCount = 5
threadCount = 20
lock = threading.RLock()
separator = "|"
saveCount = 100
successIndex = 0
failIndex = 0
theCount = 0

queryQueue = queue.Queue()
processedQueue = queue.Queue()
class IPStruct(object):
def __init__(self):
self.startIP = None
self.endIP = None
self.address = ""
self.position = ""
self.status = ""

class QueryIPLocation(object):
_retry = 5
def query(self, objList):
for obj in objList:
url = self.get_url(obj)#.encode('utf-8')
#            print("url:%s" % url)
for j in range(0, self._retry + 1):
try:
result = urllib.request.urlopen(url,timeout=5)
except Exception:
if j == self._retry:
obj.status = "Failed"
break
continue
if result.getcode() != 200:
if j == self._retry:
obj.status = "Failed"
else:
jsonRet = json.loads(result.read().decode())
if jsonRet['status'] == 0:
obj.status = "SUCCESS"
self.proc_ok(jsonRet, obj)
break
elif jsonRet['status'] == 1:
obj.status = "SERVER_ERROR"
break
elif jsonRet['status'] == 2:
obj.status = "INVALID_PARAMETER"
break
elif jsonRet["status"] == 5:
obj.status = "INVALID_AK"
break
elif jsonRet["status"] == 101:
obj.status = "DISABLE_SERVICE"
break
elif jsonRet['status'] == 345:
if j == self._retry:
obj.status = "MINUTE_QUERY_LIMIT"
elif jsonRet["status"] == 355:
obj.status = "DAY_QUERY_LIMIT"
break
def get_url(self, obj):
base = "http://api.map.baidu.com/location/ip?ak=7E814968d7b3ee0440678cb17cb4aa29&coor=bd09ll"
return base + "&ip=" + str(obj.startIP)
def proc_ok(self, result, obj):
point = result["content"]["point"]
address = result["content"]["address"]
obj.address = address
obj.position = str(point['x']) + "," + str(point['y'])
class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def __init__(self,queryQueue,processedQueue):
threading.Thread.__init__(self)
self.queryQueue = queryQueue
self.processedQueue = processedQueue
def run(self):
while True:
queryList = self.queryQueue.get()
app = QueryIPLocation()
try:
app.query(queryList)
for item in queryList:
self.processedQueue.put(item)
except Exception:
print("there has error..........")
raise
finally:
self.queryQueue.task_done()
class ThreadFile(threading.Thread):
"""Threaded save File"""
def __init__(self,queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
global successList
global failList
global successIndex
global failIndex
global theCount
while True:
item = self.queue.get()
if item.status == "SUCCESS":
lock.acquire()
try:
theCount += 1
print("theCount:",theCount)
successList.append(item)
successIndex += 1
if successIndex == saveCount:
save_result(successPath,resultSequence=successList)
successIndex = 0
del successList[:]
except Exception:
raise
finally:
lock.release()
else:
lock.acquire()
try:
theCount += 1
print("theCount:",theCount)
failList.append(item)
failIndex += 1
if failIndex == saveCount:
save_result(failPath,resultSequence=failList)
failIndex = 0
del failList[:]
except Exception:
raise
finally:
lock.release()
self.queue.task_done()
def save_result(filePath, resultSequence=[]):
"""save the success result to successPath"""
line = ""
file = open(filePath, "a")
if not file:
print("error open file %s to write" % filePath)
for item in resultSequence:
if item.status == "SUCCESS":
line += item.startIP + separator + item.endIP + separator + item.address + separator + item.position + "\n"
else:
line += item.startIP + separator + item.endIP + separator + item.address + separator + item.status + "\n"
file.write(line)
file.close()
def get_ip_iter(file):
ipList = []
for line in file:
addr = line.split("__")
count = len(addr)
if count <= 2 or "." not in addr[0] or "." not in addr[1]:
continue
obj = IPStruct()
obj.startIP = addr[0]
obj.endIP = addr[1]
obj.address = addr[2]
ipList.append(obj)
if len(ipList) == queryCount:
yield ipList
del ipList[:]
else:
if len(ipList) > 0:
yield ipList
def main():
print("start read file")
ipFile = open(ipPath,"r",encoding="utf-8")
if not ipFile:
print("errror open file %s" % ipPath)
return
iterIpFile = get_ip_iter(ipFile)
for i in range(threadCount):
t = ThreadUrl(queryQueue,processedQueue)
t.setDaemon(True)
t.start()
for queryList in iterIpFile:
queryQueue.put(list(queryList))
for i in range(2):
dt = ThreadFile(processedQueue)
dt.setDaemon(True)
dt.start()
queryQueue.join()
processedQueue.join()
save_result(successPath,resultSequence=successList)
save_result(failPath,resultSequence=failList)
    使用 time python3 ***.py 执行:

real    15m46.962s
user    0m38.042s
sys     1m0.162s
    可以看到运行时间15分46秒
 
    由于自己以前一直学的是Java,随即就对用Python和用Java的多线程来实现谁的执行速度会更高一些产生一些性趣,索性就会就Java也实现了同样的需求,并使用了同样的线程数。
    以下是JAVA实现的代码:
    

package test;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import net.sf.json.JSONObject;
import util.HttpUtil;
import util.JSONUtil;
public class IPQueryTest {

public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
BlockingQueue<IPStruct> fileQueue = new LinkedBlockingQueue<IPStruct>();
BlockingQueue<IPStruct> ipQueue = new LinkedBlockingQueue<IPStruct>();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ThreadReadFile(fileQueue));
for(int i=0;i<20;i++){
exec.execute(new ThreadQueryIPLocation(fileQueue,ipQueue));
}
for(int i=0;i<2 ;i++){
exec.execute(new ThreadWriteFile(ipQueue));
}
while(true){
Thread.sleep(10);
if(fileQueue.size() == 0 && ipQueue.size() == 0){
ThreadWriteFile.saveResult(ThreadWriteFile.SUCC_PATH, ThreadWriteFile.succIPStructs);
ThreadWriteFile.saveResult(ThreadWriteFile.FAIL_PATH, ThreadWriteFile.failIPStructs);
long stend = System.currentTimeMillis();
System.out.println("total cost:" + (stend - start));
break;
}
/*else{
System.out.println("fileQueue size:" + fileQueue.size() + ",ipQueue size:" + ipQueue.size());
}*/
}
}
}
class IPStruct {
private String startIP;
private String endIP;
private String address;
private String position;
private String status;
public String getStartIP() {
return startIP;
}
public void setStartIP(String startIP) {
this.startIP = startIP;
}
public String getEndIP() {
return endIP;
}
public void setEndIP(String endIP) {
this.endIP = endIP;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getPosition() {
return position;
}
public void setPosition(String position) {
this.position = position;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
class ThreadReadFile implements Runnable{
private final BlockingQueue<IPStruct> queue;
private static final String FILE_PATH = "D:\\ip.txt";
ThreadReadFile(BlockingQueue<IPStruct> q){
this.queue = q;
}
@Override
public void run() {
try{
produceIPS(queue);
}catch(InterruptedException ex){
ex.printStackTrace();
}
}
public void produceIPS(BlockingQueue<IPStruct> queue)throws InterruptedException{
File file = null;
BufferedReader reader = null;
try{
file = new File(FILE_PATH);
reader = new BufferedReader(new FileReader(file));
String line = null;
IPStruct ipStruct = null;
while((line = reader.readLine()) != null){
ipStruct = formatIPStruct(line);
if(null != ipStruct){
queue.add(ipStruct);
}
}
}catch(IOException e){
e.printStackTrace();
}finally{
if(null != reader){
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public IPStruct formatIPStruct(String line){
String[] addr = line.split("__");
if(addr.length <= 2 || addr[0].indexOf(".")== -1 || addr[1].indexOf(".") == -1){
return null;
}
IPStruct ipStruct = new IPStruct();
ipStruct.setStartIP(addr[0]);
ipStruct.setEndIP(addr[1]);
ipStruct.setAddress(addr[2]);
return ipStruct;
}
}
class ThreadQueryIPLocation implements Runnable{
private final BlockingQueue<IPStruct> inputQueue;
private final BlockingQueue<IPStruct> outputQueue;
ThreadQueryIPLocation(BlockingQueue<IPStruct> inputQueue,BlockingQueue<IPStruct> outputQueue){
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
}
@Override
public void run() {
try{
while(true){
IPStruct ipStruct = inputQueue.take();
queryIPLocation(ipStruct);
outputQueue.add(ipStruct);
}
}
catch(InterruptedException ex){
ex.printStackTrace();
}
}
private static final int RETRY = 5;
private static final String BASE_URL = "http://api.map.baidu.com/location/ip?ak=7E814968d7b3ee0440678cb17cb4aa29&coor=bd09ll";
public void queryIPLocation(IPStruct obj) {
String url = null;
String result = null;
url = BASE_URL + "&ip=" + obj.getStartIP();
//System.out.println("url-->" + url);
for (int j = 0; j <= RETRY; j++) {
try {
result = HttpUtil.getJsonStringFromUrl(url);
} catch (Exception e) {
if (j == RETRY) {
obj.setStatus("Failed");
break;
}
continue;
}
if (null != result) {
JSONObject jsonObject = JSONUtil.toJSONObject(result);
Integer status = (Integer) jsonObject.get("status");
if (status == 0) {
obj.setStatus("SUCCESS");
procOK(jsonObject, obj);
break;
} else if (status == 1) {
obj.setStatus("SERVER_ERROR");
break;
} else if (status == 5) {
obj.setStatus("INVALID_AK");
break;
} else if (status == 101) {
obj.setStatus("DISABLE_SERVICE");
break;
} else if (status == 345) {
if (j == RETRY) {
obj.setStatus("MINUTE_QUERY_LIMIT");
}
} else if (status == 355) {
obj.setStatus("DAY_QUERY_LIMIT");
break;
}
}
}
}
public void procOK(JSONObject result, IPStruct obj) {
JSONObject contentJSON = JSONUtil.toJSONObject(result.get("content"));
JSONObject pointJSON = JSONUtil.toJSONObject(contentJSON.get("point").toString());
obj.setAddress((String) contentJSON.get("address"));
obj.setPosition(pointJSON.get("x") + "," + pointJSON.get("y"));
}
}
class ThreadWriteFile implements Runnable{
public static final List<IPStruct> succIPStructs = new ArrayList<IPStruct>();
public static final List<IPStruct> failIPStructs = new ArrayList<IPStruct>();
public static final String SEPARATOR = "|";
public static final int FILE_SAVE_SIZE = 100;
public static final String SUCC_PATH = "D:\\success.txt";
public static final String FAIL_PATH = "D:\\fail.txt";
public static AtomicInteger counter = new AtomicInteger(0);

private final BlockingQueue<IPStruct> queue;
ThreadWriteFile(BlockingQueue<IPStruct> queue){
this.queue = queue;
}
@Override
public void run() {
try{
while(true){
IPStruct ipStruct = queue.take();
count();
if(ipStruct.getStatus() == "SUCCESS"){
saveSuccIPStruct(ipStruct);
}
else{
saveFailIPStruct(ipStruct);
}
}
}
catch(InterruptedException ex){
ex.printStackTrace();
}
}
private synchronized void count(){
counter.addAndGet(1);
System.out.println("count:" + counter.get());
}
public static void writeResult(String lines,String path){
try {
if(null == lines || lines.trim().equals("")){
return;
}
FileWriter writer = new FileWriter(path,true);
writer.write(lines);
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public synchronized void saveSuccIPStruct(IPStruct ipStruct){
succIPStructs.add(ipStruct);
if(succIPStructs.size() >= FILE_SAVE_SIZE){
StringBuffer lines = new StringBuffer();
for(IPStruct succIPStruct : succIPStructs){
lines.append(succIPStruct.getStartIP()).append(SEPARATOR);
lines.append(succIPStruct.getEndIP()).append(SEPARATOR);
lines.append(succIPStruct.getAddress()).append(SEPARATOR);
lines.append(succIPStruct.getPosition()).append("\n");
}
writeResult(lines.toString(),SUCC_PATH);
succIPStructs.clear();
}
}
public synchronized void saveFailIPStruct(IPStruct ipStruct){
failIPStructs.add(ipStruct);
if(failIPStructs.size() >= FILE_SAVE_SIZE){
StringBuffer lines = new StringBuffer();
for(IPStruct failIPStruct : failIPStructs){
lines.append(failIPStruct.getStartIP()).append(SEPARATOR);
lines.append(failIPStruct.getEndIP()).append(SEPARATOR);
lines.append(failIPStruct.getAddress()).append(SEPARATOR);
lines.append(failIPStruct.getStatus()).append("\n");
}
writeResult(lines.toString(),FAIL_PATH);
failIPStructs.clear();
}
}
public static void saveResult(String path,List<IPStruct> ipStructs){
StringBuffer lines = new StringBuffer();
for(IPStruct ipStruct : ipStructs){
lines.append(ipStruct.getStartIP()).append(SEPARATOR);
lines.append(ipStruct.getEndIP()).append(SEPARATOR);
lines.append(ipStruct.getAddress()).append(SEPARATOR);
if(ipStruct.getStatus() == "SUCCESS"){
lines.append(ipStruct.getPosition()).append("\n");
}
else{
lines.append(ipStruct.getStatus()).append("\n");
}
}
writeResult(lines.toString(),path);
}
}

    JAVA代码是在windows下eclipse里执行:
    

total cost:1132948
    可以看出JAVA代码的运行时间大约在11分钟。单单从运行时间上看java的实现会快差不多4分钟,大师们也都说过Python很关注程序员的生产率而不是绝对的性能。当然很大的可能性是我代码本身问题,而造成这4分钟的时间差。
    写这些也并不想真的比较python和java那个好,只是想这样可以有对比的学习,做以记录。
    

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-372410-1-1.html 上篇帖子: 量化分析师的Python日记【第2天:再接着介绍一下Python呗】 下篇帖子: python unicode ascii编码在windows,*nix上的问题2010-01-05
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表