Python第七周 学习笔记(1)
import datetime
import re
import threading
import time
from pathlib import Path
from queue import Queue

from user_agents import parse
# Regular expression for a single access-log line (presumably the "combined"
# web-server log format -- confirm against the real log files).
# Named groups: ip, datetime, method, url, protocol, status, size, useragent.
# The quoted field between size and useragent is matched by a non-capturing
# group and discarded. Because size must be all digits, a line whose size
# field is anything else (for example "-") does not match.
PATTERN = r'''(?P<ip>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s"(?P<method>[^"\s]+)\s(?P<url>[^"\s]+)\s(?P<protocol>[^"\s]+)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"(?:.+)"\s"(?P<useragent>[^"]+)"'''
# Compiled once at import time; extract() reuses this object for every line.
pattern = re.compile(PATTERN)
def extract(text):
    """Parse one access-log line into a dict of typed fields.

    The line is matched against the module-level ``pattern``. Each named
    group is passed through a per-field converter: ``datetime`` becomes a
    timezone-aware ``datetime.datetime``, ``status`` and ``size`` become
    ``int``, and ``useragent`` is parsed with ``user_agents.parse``. Every
    other group is kept as the matched string.

    Args:
        text: one raw line from the log file.

    Returns:
        A dict keyed by group name, or ``None`` when the line does not match
        the pattern (so callers can skip malformed lines with a truthiness
        check instead of hitting an ``AttributeError``).
    """
    mat = pattern.match(text)
    if mat is None:
        return None

    ops = {
        'datetime': lambda x: datetime.datetime.strptime(x, '%d/%b/%Y:%H:%M:%S %z'),
        'status': int,
        'size': int,
        'useragent': parse,
    }
    fields = {}
    for name, raw in mat.groupdict().items():
        convert = ops.get(name)
        # Groups without a converter are stored unchanged.
        fields[name] = convert(raw) if convert is not None else raw
    return fields
def openfile(filename, delay=2):
    """Yield the parsed fields of every recognisable line in *filename*.

    Each line is handed to ``extract``; lines for which it returns a falsy
    value are skipped.

    Args:
        filename: path of the log file to read.
        delay: seconds to sleep after reading each line. The default of 2
            is the value that used to be hard-coded (presumably to simulate
            a live log stream -- confirm). Pass 0 to read at full speed.

    Yields:
        One dict of parsed fields per matching line, in file order.
    """
    with open(filename) as f:
        for text in f:
            fields = extract(text)
            # The pause applies to every line read, whether or not it parsed.
            if delay:
                time.sleep(delay)
            if fields:
                yield fields
# producer
def load(*pathnames):
    """Yield parsed log records from each given file or directory.

    A path that is a regular file is read directly. A path that is a
    directory contributes the regular files directly inside it (no
    recursion). Paths that do not exist are silently ignored.
    """
    for raw_path in pathnames:
        target = Path(raw_path)
        # is_file() and is_dir() are both False for a missing path, so such
        # paths fall through without yielding anything.
        if target.is_file():
            yield from openfile(target)
        elif target.is_dir():
            for entry in target.iterdir():
                if not entry.is_file():
                    continue
                yield from openfile(entry)
def sum_size_handler(iterable):
    """Return the sum of the ``size`` field over all records in *iterable*."""
    total = 0
    for record in iterable:
        total += record['size']
    return total
def status_handler(iterable):
    """Return the fraction of records seen for each ``status`` value.

    Args:
        iterable: records (dicts) that each carry a ``status`` key.

    Returns:
        A dict mapping each distinct status to ``count / total``. An empty
        input yields an empty dict.
    """
    counts = {}
    total = 0
    for record in iterable:
        key = record['status']
        # Store the tally back under the key; rebinding the dict itself to
        # the incremented number would destroy the mapping.
        counts[key] = counts.get(key, 0) + 1
        # Counting here instead of calling len() also supports generators.
        total += 1
    return {code: hits / total for code, hits in counts.items()}
# Running browser totals accumulated over every ua_handler call. It is
# updated in place (never rebound), so no ``global`` statement is needed.
d = {}


def ua_handler(iterable):
    """Count browser (family, version) pairs in *iterable*.

    Args:
        iterable: records whose ``useragent`` value exposes
            ``browser.family`` and ``browser.version_string``.

    Returns:
        A tuple ``(ua_family, d)``: the counts for this call only, and the
        module-level running totals ``d``. Every call adds all of its
        records to ``d``, so a record passed in twice is counted twice.
    """
    ua_family = {}
    for item in iterable:
        val = item['useragent']
        key = (val.browser.family, val.browser.version_string)
        # Assign into the dicts by key; rebinding the names to the new
        # count would replace the mappings with plain integers.
        ua_family[key] = ua_family.get(key, 0) + 1
        d[key] = d.get(key, 0) + 1
    return ua_family, d
# consumer
def window(q: Queue, handler, width: int, interval: int):
    """Consume records from *q* and run *handler* over a sliding time window.

    Every record taken from the queue is printed and appended to a buffer.
    Whenever the record's ``datetime`` is more than *interval* seconds past
    the timestamp of the previous handler run, ``handler(buffer)`` is called
    and its result printed. The buffer is then trimmed to the records newer
    than ``current - (width - interval)`` seconds, so consecutive windows
    overlap.

    Args:
        q: queue delivering parsed records (dicts with a timezone-aware
            ``datetime`` value).
        handler: callable that receives the list of buffered records.
        width: window length in seconds.
        interval: minimum gap in seconds, measured on record timestamps,
            between two handler runs.

    This function never returns: it loops forever and blocks on ``q.get()``.
    """
    # Far-in-the-past start time, so the first record with any realistic
    # timestamp triggers the handler immediately.
    st_time = datetime.datetime.strptime('19700101 000000 +0800', '%Y%m%d %H%M%S %z')
    # Loop-invariant: how far back from the current record the buffer is kept.
    overlap = datetime.timedelta(seconds=width - interval)
    buffer = []
    while True:
        src = q.get()
        print(src)
        buffer.append(src)
        cur_time = src['datetime']
        if (cur_time - st_time).total_seconds() > interval:
            val = handler(buffer)
            st_time = cur_time
            print(val)
            # Handlers such as ua_handler return (window_counts, totals);
            # only then is there a totals dict to list. Other result shapes
            # are just printed above instead of failing on tuple unpacking.
            if isinstance(val, tuple) and len(val) == 2 and isinstance(val[1], dict):
                # NOTE(review): this orders by the (key, count) pairs, i.e.
                # effectively by key, descending -- confirm whether ordering
                # by count was intended.
                print(sorted(val[1].items(), reverse=True))
            buffer = [x for x in buffer if x['datetime'] > cur_time - overlap]
def dispatcher(src):
    """Build a fan-out dispatcher over the record source *src*.

    Args:
        src: iterable of parsed records (for example the generator returned
            by ``load``).

    Returns:
        A pair ``(reg, run)``. ``reg(handler, width, interval)`` registers
        one consumer: it creates a dedicated queue and a thread that will
        execute ``window`` on it. ``run()`` starts all registered threads
        and then copies every record from *src* into every queue.
    """
    reg_handler = []
    queues = []

    def reg(handler, width: int, interval: int):
        """Register *handler* with its own queue and (unstarted) thread."""
        q = Queue()
        queues.append(q)
        thrd = threading.Thread(target=window, args=(q, handler, width, interval))
        reg_handler.append(thrd)

    def run():
        """Start every consumer thread, then feed all records to all queues."""
        for thrd in reg_handler:
            thrd.start()
        for item in src:
            for q in queues:
                q.put(item)
        # NOTE(review): the consumer threads are not daemon threads and are
        # never told to stop, so they appear to stay blocked on their queues
        # after src is exhausted -- confirm this is acceptable.

    return reg, run
if __name__ == '__main__':
    import sys

    # Log files or directories come from the command line; with no
    # arguments the sample log 'test.log' is used.
    paths = sys.argv[1:] or ['test.log']
    reg, run = dispatcher(load(*paths))
    # Alternative handlers (disabled):
    # reg(sum_size_handler, 20, 5)
    # reg(status_handler, 20, 5)
    # Browser statistics over a 20-second window, evaluated every 5 seconds.
    reg(ua_handler, 20, 5)
    run()
页:
[1]