本文内容:

python当中的异步并发编程以及编程实例

python相关模块

相关模块以及调用方法

Python 中 concurrent.futures 模块使用说明

gevent

python之gevent(1)

实例

这里我会根据实际的一个高并发爬虫来演示如何利用多进程、多线程以及协程进行组合从而实现一个并发数可控的高并发爬虫

我们这里目标选择https://www.52bqg.com/book_203/

即我以前很喜欢看的校花的贴身高手,目前差不多8300章,很适合测试爬虫效率

链接爬取

对于连接的爬取,这里采用一开始就先在主线程进行调用,将所有的章节的href存到一个列表之后进行下一步操作

这里也可以设计成一个单独的进程处理连接爬取任务,主要的额外操作在于需要设置Lock或者Semaphore来进行同步,让子进程处理完连接爬取任务后从而释放锁,进行下一步的并发爬虫操作,这里我们先简单使用列表进行连接存储,以后会进行改进使用队列

def getIndexUrl(indexUrl):
allUrls = []
try:
res = session.get(url=indexUrl)
bs = BeautifulSoup(res.content.decode("GBK"), "lxml")
lists = bs.find("div", id="list")
dd = lists.find_all("dd")
for eachdd in dd:
try:
a = eachdd.find("a")
href = a['href']
allUrls.append(indexUrl + href)
except:
pass
return allUrls
except:
pass

这里需要注意的是网页编码

链接分组

由于采用多进程、多线程和协程组合,所以需要对每一个协程分配任务,这里就需要对链接进行分组,要满足每一个协程都能分配到合适的任务,由于多进程和多线程都需要对链接列表进行分组切割,这里我写了一个splitUrl类用来处理连接分组任务

class splitUrl:
def __init__(self, splitnum, urls):
self.splitnum = splitnum
self.urls = urls

def run(self):
num_urls = len(self.urls)
if num_urls < self.splitnum:
return [self.urls]
perUrlNum = int(num_urls / self.splitnum)
splitted_urls = []
for i in range(self.splitnum):
if i == 0:
splitted_urls.append(self.urls[: (i + 1) * perUrlNum])
elif i == self.splitnum - 1:
splitted_urls.append(self.urls[i * perUrlNum:])
else:
splitted_urls.append(self.urls[i * perUrlNum: (i + 1) * perUrlNum])
return splitted_urls

逻辑很简单,根据设置的分组个数以及总的url列表进行分组,最终效果如下(这里显示为8进程分组)

image-20200414123646833

多进程设计

多进程我们这里选择采用concurrent.futures作为异步模块,调用其中的ProcessPoolExecutor作为进程池,方便操作系统对进程进行调度

from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED
def multiProcessScheduler(splitted_urls):
global CPU_NUM
try:
executor = ProcessPoolExecutor(max_workers=CPU_NUM)
all_tasks = executor.map(threadScheduler, splitted_urls)
wait(all_tasks, return_when=ALL_COMPLETED)
except:
pass

这里我们提前在主线程讲所有链接分好组,然后直接在multiProcessScheduler进行调用

首先我们需要获得当前的CPU核数,超过CPU核数的多进程数是没有意义的,反而影响操作系统的进程间调度

from multiprocessing import cpu_count
CPU_NUM = cpu_count()

然后实例化ProcessPoolExecutor类,其中每一个进程要执行的是我们下一步要调用的threadScheduler函数,它的参数是我们已经初步分好组的url

采用map函数来进行批量设置(此map函数非python标准库的map函数)

对于单独要设置的任务要采用submit进行手工提交

submit(fn, *args, **kwargs)

安排可调用对象 fn 以 fn(*args, **kwargs) 的形式执行,并返回 Future 对象来表示它的执行。

with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())

这里要注意,必须设置wait来对主线程进行阻塞,退出条件是所有进程中的任务全部完毕时候才会释放对主进程的阻塞,否则主进程提前结束会导致所有的并发失败

多线程设计

同理,我们需要对多进程传进来的分组url列表进行二次分组,我这里偷懒写入了线程当中,如果想提高速度的话,建议在主线程一开始就分好线程的组

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
def threadScheduler(urlss):
global THREAD_NUM
try:
splitted_urls = splitUrl(THREAD_NUM, urlss).run()
executor = ThreadPoolExecutor(max_workers=THREAD_NUM)
all_tasks = executor.map(greenletScheduler, splitted_urls)
wait(all_tasks, return_when=ALL_COMPLETED)
except:
pass

这里选择我们开辟的线程池大小,对于每一个线程都调用我们的greenletScheduler函数,即我们的协程

同理需要设置wait需要阻塞线程防止提前结束

协程设计

from gevent import monkey

monkey.patch_all()

...
...

from gevent.pool import Pool as ge_pool
def greenletScheduler(urls):
global GEVENT_NUM
greenlet_pool = ge_pool(GEVENT_NUM)
for url in urls:
greenlet_pool.apply_async(getContent, (url,))
greenlet_pool.join()

这个是专门为了改造所有的模块使之能够变为异步操作,这个需要放到代码最开始的地方从而实现把socket、ssl、threading和 select等模块变为协程,这一过程需要在启动时通过monkey patch完成。

然后创建的我们的协程池,这里将url加入到我们的协程池当中,注意这里采用了apply_async,即异步非阻塞

apply是阻塞式的。

首先主进程开始运行,碰到子进程,操作系统切换到子进程,等待子进程运行结束后,在切换到另外一个子进程,直到所有子进程运行完毕。然后在切换到主进程,运行剩余的部分。

apply_async是异步非阻塞式的。

首先主进程开始运行,碰到子进程后,主进程说:让我先运行个够,等到操作系统进行进程切换的时候,在交给子进程运行。以为我们的程序太短,然而还没等到操作系统进行进程切换,主进程就运行完毕了。
想要子进程执行,就告诉主进程:你等着所有子进程执行完毕后,在运行剩余部分。

反正知道阻塞的会特别慢,直接调用apply_async即可

将所有的任务加到协程池后就join阻塞当前线程直到所有的协程任务完成

全部代码

from gevent import monkey

monkey.patch_all()

from multiprocessing import cpu_count
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED
import os
import requests
import timeit
from bs4 import BeautifulSoup
from gevent.pool import Pool as ge_pool

session = requests.session()

indexUrl = "https://www.52bqg.com/book_203/"

CPU_NUM = cpu_count()
THREAD_NUM = 4
GEVENT_NUM = 5


class splitUrl:
def __init__(self, splitnum, urls):
self.splitnum = splitnum
self.urls = urls

def run(self):
num_urls = len(self.urls)
if num_urls < self.splitnum:
return [self.urls]
perUrlNum = int(num_urls / self.splitnum)
splitted_urls = []
for i in range(self.splitnum):
if i == 0:
splitted_urls.append(self.urls[: (i + 1) * perUrlNum])
elif i == self.splitnum - 1:
splitted_urls.append(self.urls[i * perUrlNum:])
else:
splitted_urls.append(self.urls[i * perUrlNum: (i + 1) * perUrlNum])
return splitted_urls


def getIndexUrl(indexUrl):
allUrls = []
try:
res = session.get(url=indexUrl)
bs = BeautifulSoup(res.content.decode("GBK"), "lxml")
lists = bs.find("div", id="list")
dd = lists.find_all("dd")
for eachdd in dd:
try:
a = eachdd.find("a")
href = a['href']
allUrls.append(indexUrl + href)
except:
pass
return allUrls
except:
pass


def getContent(url):
try:
res = session.get(url)
bs = BeautifulSoup(res.content.decode("GBK"), "lxml")
content = bs.find("div", id="content").get_text()
title = bs.find("div", class_="bookname").find("h1").get_text()
writeContent(title, content)
except:
pass


def writeContent(title, content):
with open("content/{}.txt".format(title), "w+") as f:
f.write(content)
f.close()


def multiProcessScheduler(splitted_urls):
global CPU_NUM
try:
executor = ProcessPoolExecutor(max_workers=CPU_NUM)
all_tasks = executor.map(threadScheduler, splitted_urls)
wait(all_tasks, return_when=ALL_COMPLETED)
except:
pass


def threadScheduler(urlss):
global THREAD_NUM
try:
splitted_urls = splitUrl(THREAD_NUM, urlss).run()
executor = ThreadPoolExecutor(max_workers=THREAD_NUM)
all_tasks = executor.map(greenletScheduler, splitted_urls)
wait(all_tasks, return_when=ALL_COMPLETED)
except:
pass

def greenletScheduler(urls):
global GEVENT_NUM
greenlet_pool = ge_pool(GEVENT_NUM)
for url in urls:
greenlet_pool.apply_async(getContent, (url,))
greenlet_pool.join()


if __name__ == '__main__':
if not os.path.exists("content"):
os.mkdir("content")
else:
pass
start = timeit.default_timer()
allUrls = getIndexUrl(indexUrl)
splitted_urls = splitUrl(CPU_NUM, allUrls).run()
for each in splitted_urls:
print(each)
multiProcessScheduler(splitted_urls)
end = timeit.default_timer()
print(str(end - start))

效果

image-20200414133610745

image-20200414132119788

image-20200414134148216

可以看到一共27秒即可爬完所有的8300章的小说,比同步一章一章爬取快的不知道哪去了

所以说对于爬虫而言,最好采用异步来加快速度,

总结

  • 异步编程是一门巨坑,如果调度不当的话,很可能陷入巨坑无法自拔,而且多进程这里debug不了,我看网上建议是通过阻塞子进程或者print调试法-_-
  • 现在python3.6出现了asyncio、aiohttp两个异步IO模块,打算下一期使用这两个模块进行改造
  • 异步编程尤其要注意,不要随意增加print等IO操作,否则操作系统还得回来调度IO,增加开销时间
  • 感觉还是很初级,下一次试试采用队列等进行有序归并,顺便看看map_reduce?

最重要的是,多进程编程千万不要写递归!!!

否则就会这样↓↓↓↓↓↓↓↓

BAFB3972931871C4D9A49477A2326D7F