使用python asyncio實現了一個異步代理池,根據規則爬取代理網站上的免費代理,在驗證其有效后存入redis中,定期擴展代理的數量并檢驗池中代理的有效性,移除失效的代理。同時用aiohttp實現了一個server,其他的程序可以通過訪問相應的url來從代理池中獲取代理。
源碼
環境
- Python 3.5+
- Redis
- PhantomJS(可選)
- Supervisord(可選)
因為代碼中大量使用了asyncio的async和await語法,它們是在Python3.5中才提供的,所以最好使用Python3.5及以上的版本,我使用的是Python3.6。
依賴
- redis
- aiohttp
- bs4
- lxml
- requests
- selenium
selenium包主要是用來操作PhantomJS的。
下面來對代碼進行說明。
1. 爬蟲部分
核心代碼
1
2
3
4
5
6
7
8
9
10
11
12
|
async def start( self ): for rule in self ._rules: parser = asyncio.ensure_future( self ._parse_page(rule)) # 根據規則解析頁面來獲取代理 logger.debug( '{0} crawler started' . format (rule.__rule_name__)) if not rule.use_phantomjs: await page_download(ProxyCrawler._url_generator(rule), self ._pages, self ._stop_flag) # 爬取代理網站的頁面 else : await page_download_phantomjs(ProxyCrawler._url_generator(rule), self ._pages, rule.phantomjs_load_flag, self ._stop_flag) # 使用PhantomJS爬取 await self ._pages.join() parser.cancel() logger.debug( '{0} crawler finished' . format (rule.__rule_name__)) |
上面的核心代碼實際上是一個用asyncio.Queue實現的生產-消費者模型,下面是該模型的一個簡單實現:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import asyncio from random import random async def produce(queue, n): for x in range ( 1 , n + 1 ): print ( 'produce ' , x) await asyncio.sleep(random()) await queue.put(x) # 向queue中放入item async def consume(queue): while 1 : item = await queue.get() # 等待從queue中獲取item print ( 'consume ' , item) await asyncio.sleep(random()) queue.task_done() # 通知queue當前item處理完畢 async def run(n): queue = asyncio.Queue() consumer = asyncio.ensure_future(consume(queue)) await produce(queue, n) # 等待生產者結束 await queue.join() # 阻塞直到queue不為空 consumer.cancel() # 取消消費者任務,否則它會一直阻塞在get方法處 def aio_queue_run(n): loop = asyncio.get_event_loop() try : loop.run_until_complete(run(n)) # 持續運行event loop直到任務run(n)結束 finally : loop.close() if __name__ = = '__main__' : aio_queue_run( 5 ) |
運行上面的代碼,一種可能的輸出如下:
1
2
3
4
5
6
7
8
9
10
|
produce 1 produce 2 consume 1 produce 3 produce 4 consume 2 produce 5 consume 3 consume 4 consume 5 |
爬取頁面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
async def page_download(urls, pages, flag): url_generator = urls async with aiohttp.ClientSession() as session: for url in url_generator: if flag.is_set(): break await asyncio.sleep(uniform(delay - 0.5 , delay + 1 )) logger.debug( 'crawling proxy web page {0}' . format (url)) try : async with session.get(url, headers = headers, timeout = 10 ) as response: page = await response.text() parsed = html.fromstring(decode_html(page)) # 使用bs4來輔助lxml解碼網頁:http://lxml.de/elementsoup.html#Using only the encoding detection await pages.put(parsed) url_generator.send(parsed) # 根據當前頁面來獲取下一頁的地址 except StopIteration: break except asyncio.TimeoutError: logger.error( 'crawling {0} timeout' . format (url)) continue # TODO: use a proxy except Exception as e: logger.error(e) |
使用aiohttp實現的網頁爬取函數,大部分代理網站都可以使用上面的方法來爬取,對于使用js動態生成頁面的網站可以使用selenium控制PhantomJS來爬取——本項目對爬蟲的效率要求不高,代理網站的更新頻率是有限的,不需要頻繁的爬取,完全可以使用PhantomJS。
解析代理
最簡單的莫過于用xpath來解析代理了,使用Chrome瀏覽器的話,直接通過右鍵就能獲得選中的頁面元素的xpath:
安裝Chrome的擴展“XPath Helper”就可以直接在頁面上運行和調試xpath,十分方便:
BeautifulSoup不支持xpath,使用lxml來解析頁面,代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
async def _parse_proxy( self , rule, page): ips = page.xpath(rule.ip_xpath) # 根據xpath解析得到list類型的ip地址集合 ports = page.xpath(rule.port_xpath) # 根據xpath解析得到list類型的ip地址集合 if not ips or not ports: logger.warning( '{2} crawler could not get ip(len={0}) or port(len={1}), please check the xpaths or network' . format ( len (ips), len (ports), rule.__rule_name__)) return proxies = map ( lambda x, y: '{0}:{1}' . format (x.text.strip(), y.text.strip()), ips, ports) if rule.filters: # 根據過濾字段來過濾代理,如“高匿”、“透明”等 filters = [] for i, ft in enumerate (rule.filters_xpath): field = page.xpath(ft) if not field: logger.warning( '{1} crawler could not get {0} field, please check the filter xpath' . format (rule.filters[i], rule.__rule_name__)) continue filters.append( map ( lambda x: x.text.strip(), field)) filters = zip ( * filters) selector = map ( lambda x: x = = rule.filters, filters) proxies = compress(proxies, selector) for proxy in proxies: await self ._proxies.put(proxy) # 解析后的代理放入asyncio.Queue中 |
爬蟲規則
網站爬取、代理解析、濾等等操作的規則都是由各個代理網站的規則類定義的,使用元類和基類來管理規則類。基類定義如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
class CrawlerRuleBase( object , metaclass = CrawlerRuleMeta): start_url = None page_count = 0 urls_format = None next_page_xpath = None next_page_host = '' use_phantomjs = False phantomjs_load_flag = None filters = () ip_xpath = None port_xpath = None filters_xpath = () |
各個參數的含義如下:
start_url
(必需)
爬蟲的起始頁面。
ip_xpath
(必需)
爬取IP的xpath規則。
port_xpath
(必需)
爬取端口號的xpath規則。
page_count
爬取的頁面數量。
urls_format
頁面地址的格式字符串,通過urls_format.format(start_url, n)來生成第n頁的地址,這是比較常見的頁面地址格式。
next_page_xpath
,next_page_host
由xpath規則來獲取下一頁的url(常見的是相對路徑),結合host得到下一頁的地址:next_page_host + url。
use_phantomjs
, phantomjs_load_flag
use_phantomjs用于標識爬取該網站是否需要使用PhantomJS,若使用,需定義phantomjs_load_flag(網頁上的某個元素,str類型)作為PhantomJS頁面加載完畢的標志。
filters
過濾字段集合,可迭代類型。用于過濾代理。
爬取各個過濾字段的xpath規則,與過濾字段按順序一一對應。
元類CrawlerRuleMeta用于管理規則類的定義,如:如果定義use_phantomjs=True,則必須定義phantomjs_load_flag,否則會拋出異常,不在此贅述。
目前已經實現的規則有西刺代理、快代理、360代理、66代理和 秘密代理。新增規則類也很簡單,通過繼承CrawlerRuleBase來定義新的規則類YourRuleClass,放在proxypool/rules目錄下,并在該目錄下的__init__.py中添加from . import YourRuleClass(這樣通過CrawlerRuleBase.__subclasses__()就可以獲取全部的規則類了),重啟正在運行的proxy pool即可應用新的規則。
2. 檢驗部分
免費的代理雖然多,但是可用的卻不多,所以爬取到代理后需要對其進行檢驗,有效的代理才能放入代理池中,而代理也是有時效性的,還要定期對池中的代理進行檢驗,及時移除失效的代理。
這部分就很簡單了,使用aiohttp通過代理來訪問某個網站,若超時,則說明代理無效。
1
2
3
4
5
6
7
8
9
10
11
12
|
async def validate( self , proxies): logger.debug( 'validator started' ) while 1 : proxy = await proxies.get() async with aiohttp.ClientSession() as session: try : real_proxy = 'http://' + proxy async with session.get( self .validate_url, proxy = real_proxy, timeout = validate_timeout) as resp: self ._conn.put(proxy) except Exception as e: logger.error(e) proxies.task_done() |
3. server部分
使用aiohttp實現了一個web server,啟動后,訪問http://host:port即可顯示主頁:
- 訪問http://host:port/get來從代理池獲取1個代理,如:'127.0.0.1:1080';
- 訪問http://host:port/get/n來從代理池獲取n個代理,如:"['127.0.0.1:1080', '127.0.0.1:443', '127.0.0.1:80']";
- 訪問http://host:port/count來獲取代理池的容量,如:'42'。
因為主頁是一個靜態的html頁面,為避免每來一個訪問主頁的請求都要打開、讀取以及關閉該html文件的開銷,將其緩存到了redis中,通過html文件的修改時間來判斷其是否被修改過,如果修改時間與redis緩存的修改時間不同,則認為html文件被修改了,則重新讀取文件,并更新緩存,否則從redis中獲取主頁的內容。
返回代理是通過aiohttp.web.Response(text=ip.decode('utf-8'))
實現的,text要求str類型,而從redis中獲取到的是bytes類型,需要進行轉換。返回的多個代理,使用eval即可轉換為list類型。
返回主頁則不同,是通過aiohttp.web.Response(body=main_page_cache, content_type='text/html')
,這里body要求的是bytes類型,直接將從redis獲取的緩存返回即可,conten_type='text/html'
必不可少,否則無法通過瀏覽器加載主頁,而是會將主頁下載下來——在運行官方文檔中的示例代碼的時候也要注意這點,那些示例代碼基本上都沒有設置content_type。
這部分不復雜,注意上面提到的幾點,而關于主頁使用的靜態資源文件的路徑,可以參考之前的博客《aiohttp之添加靜態資源路徑》。
4. 運行
將整個代理池的功能分成了3個獨立的部分:
proxypool
定期檢查代理池容量,若低于下限則啟動代理爬蟲并對代理檢驗,通過檢驗的爬蟲放入代理池,達到規定的數量則停止爬蟲。
proxyvalidator
用于定期檢驗代理池中的代理,移除失效代理。
proxyserver
啟動server。
這3個獨立的任務通過3個進程來運行,在Linux下可以使用supervisod來=管理這些進程,下面是supervisord的配置文件示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
; supervisord.conf [unix_http_server] file = / tmp / supervisor.sock [inet_http_server] port = 127.0 . 0.1 : 9001 [supervisord] logfile = / tmp / supervisord.log logfile_maxbytes = 5MB logfile_backups = 10 loglevel = debug pidfile = / tmp / supervisord.pid nodaemon = false minfds = 1024 minprocs = 200 [rpcinterface:supervisor] supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface [supervisorctl] serverurl = unix: / / / tmp / supervisor.sock [program:proxyPool] command = python / path / to / ProxyPool / run_proxypool.py redirect_stderr = true stdout_logfile = NONE [program:proxyValidator] command = python / path / to / ProxyPool / run_proxyvalidator.py redirect_stderr = true stdout_logfile = NONE [program:proxyServer] command = python / path / to / ProxyPool / run_proxyserver.py autostart = false redirect_stderr = true stdout_logfile = NONE |
因為項目自身已經配置了日志,所以這里就不需要再用supervisord捕獲stdout和stderr了。通過supervisord -c supervisord.conf啟動supervisord,proxyPool和proxyServer則會隨之自動啟動,proxyServer需要手動啟動,訪問http://127.0.0.1:9001即可通過網頁來管理這3個進程了:
supervisod的官方文檔說目前(版本3.3.1)不支持python3,但是我在使用過程中沒有發現什么問題,可能也是由于我并沒有使用supervisord的復雜功能,只是把它當作了一個簡單的進程狀態監控和啟停工具了。
以上就是本文的全部內容,希望本文的內容對大家的學習或者工作能帶來一定的幫助,同時也希望多多支持服務器之家!
原文鏈接:http://www.cnblogs.com/xmwd/p/python_asyncio_proxy_crawler_and_peoxy_pool.html