上節(jié)回顧
主要講了協(xié)程、進(jìn)程、異步IO多路復(fù)用。
協(xié)程和IO多路復(fù)用都是單線程的。
epoll 在linux下通過這個模塊libevent.so實(shí)現(xiàn)
gevent 在底層也是用了libevent.so
gevent可以理解為一個更上層的封裝。
使用select或者selectors,每接收或發(fā)送數(shù)據(jù)一次都要select一次
twisted異步網(wǎng)絡(luò)框架,強(qiáng)大又龐大,不支持python3 (代碼量python中排top3)。幾乎把所有的網(wǎng)絡(luò)服務(wù)都重寫了一遍。
一、RabbitMQ 消息隊(duì)列介紹
RabbitMQ也是消息隊(duì)列,那RabbitMQ和之前python的Queue有什么區(qū)別么?
py 消息隊(duì)列:
線程 queue(同一進(jìn)程下線程之間進(jìn)行交互)
進(jìn)程 Queue(父子進(jìn)程進(jìn)行交互 或者 同屬于同一進(jìn)程下的多個子進(jìn)程進(jìn)行交互)
如果是兩個完全獨(dú)立的python程序,也是不能用上面兩個queue進(jìn)行交互的,或者和其他語言交互有哪些實(shí)現(xiàn)方式呢。
【Disk、Socket、其他中間件】這里中間件不僅可以支持兩個程序之間交互,可以支持多個程序,可以維護(hù)好多個程序的隊(duì)列。
像這種公共的中間件有好多成熟的產(chǎn)品:
RabbitMQ
ZeroMQ
ActiveMQ
……
RabbitMQ:erlang語言 開發(fā)的。
Python中連接RabbitMQ的模塊:pika 、Celery(分布式任務(wù)隊(duì)列) 、haigha
可以維護(hù)很多的隊(duì)列
RabbitMQ 教程官網(wǎng):http://www.rabbitmq.com/getstarted.html
幾個概念說明:
Broker:簡單來說就是消息隊(duì)列服務(wù)器實(shí)體。
Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個隊(duì)列。
Queue:消息隊(duì)列載體,每個消息都會被投入到一個或多個隊(duì)列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個關(guān)鍵字進(jìn)行消息投遞。
vhost:虛擬主機(jī),一個broker里可以開設(shè)多個vhost,用作不同用戶的權(quán)限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費(fèi)者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務(wù)
二、RabbitMQ基本示例.
1、Rabbitmq 安裝
ubuntu系統(tǒng)
1
|
install rabbitmq-server # 直接搞定 |
以下centos系統(tǒng)
1)Install Erlang
1
2
3
4
5
6
7
8
|
# For EL5: rpm -Uvh http: //download .fedoraproject.org /pub/epel/5/i386/epel-release-5-4 .noarch.rpm # For EL6: rpm -Uvh http: //download .fedoraproject.org /pub/epel/6/i386/epel-release-6-8 .noarch.rpm # For EL7: rpm -Uvh http: //download .fedoraproject.org /pub/epel/7/x86_64/e/epel-release-7-8 .noarch.rpm yum install erlang |
2)Install RabbitMQ Server
1
2
|
rpm -- import https: //www .rabbitmq.com /rabbitmq-release-signing-key .asc yum install rabbitmq-server-3.6.5-1.noarch.rpm |
3)use RabbitMQ Server
1
2
|
chkconfig rabbitmq-server on service rabbitmq-server stop/start |
2、基本示例
發(fā)送端 producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import pika # 建立一個實(shí)例 connection = pika.BlockingConnection( pika.ConnectionParameters( 'localhost' ,5672) # 默認(rèn)端口5672,可不寫 ) # 聲明一個管道,在管道里發(fā)消息 channel = connection.channel() # 在管道里聲明queue channel.queue_declare(queue= 'hello' ) # RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange= '' , routing_key= 'hello' , # queue名字 body= 'Hello World!' ) # 消息內(nèi)容 print( " [x] Sent 'Hello World!'" ) connection.close() # 隊(duì)列關(guān)閉 |
接收端 consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
import pika import time # 建立實(shí)例 connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) # 聲明管道 channel = connection.channel() # 為什么又聲明了一個‘hello'隊(duì)列? # 如果確定已經(jīng)聲明了,可以不聲明。但是你不知道那個機(jī)器先運(yùn)行,所以要聲明兩次。 channel.queue_declare(queue= 'hello' ) def callback(ch, method, properties, body): # 四個參數(shù)為標(biāo)準(zhǔn)格式 print(ch, method, properties) # 打印看一下是什么 # 管道內(nèi)存對象 內(nèi)容相關(guān)信息 后面講 print( " [x] Received %r" % body) time . sleep (15) ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生成者,消息處理完成 channel.basic_consume( # 消費(fèi)消息 callback, # 如果收到消息,就調(diào)用callback函數(shù)來處理消息 queue= 'hello' , # 你要從那個隊(duì)列里收消息 # no_ack=True # 寫的話,如果接收消息,機(jī)器宕機(jī)消息就丟了 # 一般不寫。宕機(jī)則生產(chǎn)者檢測到發(fā)給其他消費(fèi)者 ) print( ' [*] Waiting for messages. To exit press CTRL+C' ) channel.start_consuming() # 開始消費(fèi)消息 |
3、RabbitMQ 消息分發(fā)輪詢
上面的只是一個生產(chǎn)者、一個消費(fèi)者,能不能一個生產(chǎn)者多個消費(fèi)者呢?
可以上面的例子,多啟動幾個消費(fèi)者consumer,看一下消息的接收情況。
采用輪詢機(jī)制;把消息依次分發(fā)
假如消費(fèi)者處理消息需要15秒,如果當(dāng)機(jī)了,那這個消息處理明顯還沒處理完,怎么處理?
(可以模擬消費(fèi)端斷了,分別注釋和不注釋 no_ack=True 看一下)
你沒給我回復(fù)確認(rèn),就代表消息沒處理完。
上面的效果消費(fèi)端斷了就轉(zhuǎn)到另外一個消費(fèi)端去了,但是生產(chǎn)者怎么知道消費(fèi)端斷了呢?
因?yàn)樯a(chǎn)者和消費(fèi)者是通過socket連接的,socket斷了,就說明消費(fèi)端斷開了。
上面的模式只是依次分發(fā),實(shí)際情況是機(jī)器配置不一樣。怎么設(shè)置類似權(quán)重的操作?
RabbitMQ怎么辦呢,RabbitMQ做了簡單的處理就能實(shí)現(xiàn)公平的分發(fā)。
就是RabbitMQ給消費(fèi)者發(fā)消息的時候檢測下消費(fèi)者里的消息數(shù)量,如果超過指定值(比如1條),就不給你發(fā)了。
只需要在消費(fèi)者端,channel.basic_consume前加上就可以了。
1
2
|
channel.basic_qos(prefetch_count=1) # 類似權(quán)重,按能力分發(fā),如果有一個消息,就不在給你發(fā) channel.basic_consume( # 消費(fèi)消息 |
三、RabbitMQ 消息持久化(durable、properties)
1、RabbitMQ 相關(guān)命令
1
|
rabbitmqctl list_queues # 查看當(dāng)前queue數(shù)量及queue里消息數(shù)量 |
2、消息持久化
如果隊(duì)列里還有消息,RabbitMQ 服務(wù)端宕機(jī)了呢?消息還在不在?
把RabbitMQ服務(wù)重啟,看一下消息在不在。
上面的情況下,宕機(jī)了,消息就久了,下面看看如何把消息持久化。
每次聲明隊(duì)列的時候,都加上durable,注意每個隊(duì)列都得寫,客戶端、服務(wù)端聲明的時候都得寫。
1
2
|
# 在管道里聲明queue channel.queue_declare(queue= 'hello2' , durable=True) |
測試結(jié)果發(fā)現(xiàn),只是把隊(duì)列持久化了,但是隊(duì)列里的消息沒了。
durable的作用只是把隊(duì)列持久化。離消息持久話還差一步:
發(fā)送端發(fā)送消息時,加上properties
1
2
3
|
properties=pika.BasicProperties( delivery_mode=2, # 消息持久化 ) |
發(fā)送端 producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' , 5672 )) # 默認(rèn)端口5672,可不寫 channel = connection.channel() #聲明queue channel.queue_declare(queue = 'hello2' , durable = True ) # 若聲明過,則換一個名字 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange = '', routing_key = 'hello2' , body = 'Hello World!' , properties = pika.BasicProperties( delivery_mode = 2 , # make message persistent ) ) print ( " [x] Sent 'Hello World!'" ) connection.close() |
接收端 consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = connection.channel() channel.queue_declare(queue = 'hello2' , durable = True ) def callback(ch, method, properties, body): print ( " [x] Received %r" % body) time.sleep( 10 ) ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生產(chǎn)者,消息處理完成 channel.basic_qos(prefetch_count = 1 ) # 類似權(quán)重,按能力分發(fā),如果有一個消息,就不在給你發(fā) channel.basic_consume( # 消費(fèi)消息 callback, # 如果收到消息,就調(diào)用callback queue = 'hello2' , # no_ack=True # 一般不寫,處理完接收處理結(jié)果。宕機(jī)則發(fā)給其他消費(fèi)者 ) print ( ' [*] Waiting for messages. To exit press CTRL+C' ) channel.start_consuming() |
四、RabbitMQ 廣播模式(exchange)
前面的效果都是一對一發(fā),如果做一個廣播效果可不可以,這時候就要用到exchange了
exchange必須精確的知道收到的消息要發(fā)給誰。exchange的類型決定了怎么處理,
類型有以下幾種:
- fanout: 所有綁定到此exchange的queue都可以接收消息
- direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
- topic: 所有符合routingKey(此時可以是一個表達(dá)式)的routingKey所bind的queue可以接收消息
1、fanout 純廣播、all
需要queue和exchange綁定,因?yàn)橄M(fèi)者不是和exchange直連的,消費(fèi)者是連在queue上,queue綁定在exchange上,消費(fèi)者只會在queu里度消息
1
2
3
4
5
6
|
|------------------------| | /—— queue <—|—> consumer1 producer —|—exchange1 <bind | | —— queue <—|—> consumer2 -|-exchange2 …… | |------------------------| |
發(fā)送端 publisher 發(fā)布、廣播
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() # 注意:這里是廣播,不需要聲明queue channel.exchange_declare(exchange = 'logs' , # 聲明廣播管道 type = 'fanout' ) # message = ' '.join(sys.argv[1:]) or "info: Hello World!" message = "info: Hello World!" channel.basic_publish(exchange = 'logs' , routing_key = '', # 注意此處空,必須有 body = message) print ( " [x] Sent %r" % message) connection.close() |
接收端 subscriber 訂閱
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange = 'logs' , type = 'fanout' ) # 不指定queue名字,rabbit會隨機(jī)分配一個名字,exclusive=True會在使用此queue的消費(fèi)者斷開后,自動將queue刪除 result = channel.queue_declare(exclusive = True ) # 獲取隨機(jī)的queue名字 queue_name = result.method.queue print ( "random queuename:" , queue_name) channel.queue_bind(exchange = 'logs' , # queue綁定到轉(zhuǎn)發(fā)器上 queue = queue_name) print ( ' [*] Waiting for logs. To exit press CTRL+C' ) def callback(ch, method, properties, body): print ( " [x] %r" % body) channel.basic_consume(callback, queue = queue_name, no_ack = True ) channel.start_consuming() |
注意:廣播,是實(shí)時的,收不到就沒了,消息不會存下來,類似收音機(jī)。
2、direct 有選擇的接收消息
接收者可以過濾消息,只收我想要的消息
發(fā)送端publisher
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange = 'direct_logs' , type = 'direct' ) # 重要程度級別,這里默認(rèn)定義為 info severity = sys.argv[ 1 ] if len (sys.argv) > 1 else 'info' message = ' ' .join(sys.argv[ 2 :]) or 'Hello World!' channel.basic_publish(exchange = 'direct_logs' , routing_key = severity, body = message) print ( " [x] Sent %r:%r" % (severity, message)) connection.close() |
接收端subscriber
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange = 'direct_logs' , type = 'direct' ) result = channel.queue_declare(exclusive = True ) queue_name = result.method.queue # 獲取運(yùn)行腳本所有的參數(shù) severities = sys.argv[ 1 :] if not severities: sys.stderr.write( "Usage: %s [info] [warning] [error] " % sys.argv[ 0 ]) sys.exit( 1 ) # 循環(huán)列表去綁定 for severity in severities: channel.queue_bind(exchange = 'direct_logs' , queue = queue_name, routing_key = severity) print ( ' [*] Waiting for logs. To exit press CTRL+C' ) def callback(ch, method, properties, body): print ( " [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue = queue_name, no_ack = True ) channel.start_consuming() |
運(yùn)行接收端,指定接收級別的參數(shù),例:
python direct_sonsumer.py info warning
python direct_sonsumer.py warning error
3、topic 更細(xì)致的過濾
比如把error中,apache和mysql的分別或取出來
發(fā)送端publisher
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange = 'topic_logs' , type = 'topic' ) routing_key = sys.argv[ 1 ] if len (sys.argv) > 1 else 'anonymous.info' message = ' ' .join(sys.argv[ 2 :]) or 'Hello World!' channel.basic_publish(exchange = 'topic_logs' , routing_key = routing_key, body = message) print ( " [x] Sent %r:%r" % (routing_key, message)) connection.close() |
接收端 subscriber
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange = 'topic_logs' , type = 'topic' ) result = channel.queue_declare(exclusive = True ) queue_name = result.method.queue binding_keys = sys.argv[ 1 :] if not binding_keys: sys.stderr.write( "Usage: %s [binding_key]... " % sys.argv[ 0 ]) sys.exit( 1 ) for binding_key in binding_keys: channel.queue_bind(exchange = 'topic_logs' , queue = queue_name, routing_key = binding_key) print ( ' [*] Waiting for logs. To exit press CTRL+C' ) def callback(ch, method, properties, body): print ( " [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue = queue_name, no_ack = True ) channel.start_consuming() |
運(yùn)行接收端,指定接收哪些消息,例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
python topic_sonsumer.py * .info python topic_sonsumer.py * .error mysql. * python topic_sonsumer.py '#' # 接收所有消息 # 接收所有的 logs run: # python receive_logs_topic.py "#" # To receive all logs from the facility "kern": # python receive_logs_topic.py "kern.*" # Or if you want to hear only about "critical" logs: # python receive_logs_topic.py "*.critical" # You can create multiple bindings: # python receive_logs_topic.py "kern.*" "*.critical" # And to emit a log with a routing key "kern.critical" type: # python emit_log_topic.py "kern.critical" "A critical kernel error" |
4、RabbitMQ RPC 實(shí)現(xiàn)(Remote procedure call)
不知道你有沒有發(fā)現(xiàn),上面的流都是單向的,如果遠(yuǎn)程的機(jī)器執(zhí)行完返回結(jié)果,就實(shí)現(xiàn)不了了。
如果返回,這種模式叫什么呢,RPC(遠(yuǎn)程過程調(diào)用),snmp就是典型的RPC
RabbitMQ能不能返回呢,怎么返回呢?既是發(fā)送端又是接收端。
但是接收端返回消息怎么返回?可以發(fā)送到發(fā)過來的queue里么?不可以。
返回時,再建立一個queue,把結(jié)果發(fā)送新的queue里
為了服務(wù)端返回的queue不寫死,在客戶端給服務(wù)端發(fā)指令的的時候,同時帶一條消息說,你結(jié)果返回給哪個queue
RPC client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
import pika import uuid import time class FibonacciRpcClient( object ): def __init__( self ): self .connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) self .channel = self .connection.channel() result = self .channel.queue_declare(exclusive = True ) self .callback_queue = result.method.queue self .channel.basic_consume( self .on_response, # 只要一收到消息就調(diào)用on_response no_ack = True , queue = self .callback_queue) # 收這個queue的消息 def on_response( self , ch, method, props, body): # 必須四個參數(shù) # 如果收到的ID和本機(jī)生成的相同,則返回的結(jié)果就是我想要的指令返回的結(jié)果 if self .corr_id = = props.correlation_id: self .response = body def call( self , n): self .response = None # 初始self.response為None self .corr_id = str (uuid.uuid4()) # 隨機(jī)唯一字符串 self .channel.basic_publish( exchange = '', routing_key = 'rpc_queue' , # 發(fā)消息到rpc_queue properties = pika.BasicProperties( # 消息持久化 reply_to = self .callback_queue, # 讓服務(wù)端命令結(jié)果返回到callback_queue correlation_id = self .corr_id, # 把隨機(jī)uuid同時發(fā)給服務(wù)器 ), body = str (n) ) while self .response is None : # 當(dāng)沒有數(shù)據(jù),就一直循環(huán) # 啟動后,on_response函數(shù)接到消息,self.response 值就不為空了 self .connection.process_data_events() # 非阻塞版的start_consuming() # print("no msg……") # time.sleep(0.5) # 收到消息就調(diào)用on_response return int ( self .response) if __name__ = = '__main__' : fibonacci_rpc = FibonacciRpcClient() print ( " [x] Requesting fib(7)" ) response = fibonacci_rpc.call( 7 ) print ( " [.] Got %r" % response) |
RPC server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
import pika import time def fib(n): if n = = 0 : return 0 elif n = = 1 : return 1 else : return fib(n - 1 ) + fib(n - 2 ) def on_request(ch, method, props, body): n = int (body) print ( " [.] fib(%s)" % n) response = fib(n) ch.basic_publish( exchange = '', # 把執(zhí)行結(jié)果發(fā)回給客戶端 routing_key = props.reply_to, # 客戶端要求返回想用的queue # 返回客戶端發(fā)過來的correction_id 為了讓客戶端驗(yàn)證消息一致性 properties = pika.BasicProperties(correlation_id = props.correlation_id), body = str (response) ) ch.basic_ack(delivery_tag = method.delivery_tag) # 任務(wù)完成,告訴客戶端 if __name__ = = '__main__' : connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.queue_declare(queue = 'rpc_queue' ) # 聲明一個rpc_queue , channel.basic_qos(prefetch_count = 1 ) # 在rpc_queue里收消息,收到消息就調(diào)用on_request channel.basic_consume(on_request, queue = 'rpc_queue' ) print ( " [x] Awaiting RPC requests" ) channel.start_consuming() |
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。
原文鏈接:https://blog.csdn.net/fgf00/article/details/52872730