Python使用Pika庫(安裝:sudo pip install pika)可以操作RabbitMQ消息隊列服務器(安裝:sudo apt-get install rabbitmq-server),這里我們來看一下MQ相關的路由功能。
路由鍵的實現
比如有一個需要給所有接收端發送消息的場景,但是如果需要自由定制,有的消息發給其中一些接收端,有些消息發送給另外一些接收端,要怎么辦呢?這種情況下就要用到路由鍵了。
路由鍵的工作原理:每個接收端的消息隊列在綁定交換機的時候,可以設定相應的路由鍵。發送端通過交換機發送信息時,可以指明路由鍵 ,交換機會根據路由鍵把消息發送到相應的消息隊列,這樣接收端就能接收到消息了。
這邊繼上一篇,還是用send.py和receive.py來模擬實現路由鍵的功能。send.py表示發送端,receive.py表示接收端。實例的功能就是將info、warning、error三種級別的信息發送到不同的接收端。
send.py代碼分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = connection.channel() #定義交換機,設置類型為direct channel.exchange_declare(exchange = 'messages' , type = 'direct' ) #定義三個路由鍵 routings = [ 'info' , 'warning' , 'error' ] #將消息依次發送到交換機,并設置路由鍵 for routing in routings: message = '%s message.' % routing channel.basic_publish(exchange = 'messages' , routing_key = routing, body = message) print message connection.close() |
receive.py代碼分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
#!/usr/bin/env python #coding=utf8 import pika, sys connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = connection.channel() #定義交換機,設置類型為direct channel.exchange_declare(exchange = 'messages' , type = 'direct' ) #從命令行獲取路由鍵參數,如果沒有,則設置為info routings = sys.argv[ 1 :] if not routings: routings = [ 'info' ] #生成臨時隊列,并綁定到交換機上,設置路由鍵 result = channel.queue_declare(exclusive = True ) queue_name = result.method.queue for routing in routings: channel.queue_bind(exchange = 'messages' , queue = queue_name, routing_key = routing) def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue = queue_name, no_ack = True ) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming() |
打開兩個終端,一個運行代碼python receive.py info warning,表示只接收info和warning的消息。另外一個終端運行send.py,可以觀察到接收終端只接收到了info和warning的消息。如果打開多個終端運行receive.py,并傳入不同的路由鍵參數,可以看到更明顯的效果。
當接收端正在運行時,可以使用rabbitmqctl list_bindings來查看綁定情況。
路由鍵模糊匹配
路由鍵模糊匹配,就是可以使用正則表達式,和常用的正則表示式不同,這里的話“#”表示所有、全部的意思;“*”只匹配到一個詞。看完示例就能明白了。
這邊繼上面的例子,還是用send.py和receive.py來實現路由鍵模糊匹配的功能。send.py表示發送端,receive.py表示接收端。實例的功能大概是這樣:比如你有個知心好朋友,不管開心、傷心、工作上的還是生活上的事情都可以和她說;還有一些朋友可以分享開心的事情;還有一些朋友,你可以把不開心的事情和她說。
send.py代碼分析
因為要進行路由鍵模糊匹配,所以交換機的類型要設置為topic,設置為topic,就可以使用#,*的匹配符號了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = connection.channel() #定義交換機,設置類型為topic channel.exchange_declare(exchange = 'messages' , type = 'topic' ) #定義路由鍵 routings = [ 'happy.work' , 'happy.life' , 'sad.work' , 'sad.life' ] #將消息依次發送到交換機,并設定路由鍵 for routing in routings: message = '%s message.' % routing channel.basic_publish(exchange = 'messages' , routing_key = routing, body = message) print message connection.close() |
上例中定義了四種類型的消息,容易理解,就不解釋了,然后依次發送出去。
receive.py代碼分析
同樣,交換機的類型要設定為topic就可以了。從命令行接收參數的功能稍微調整了一下,就是沒有參數時報錯退出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
#!/usr/bin/env python #coding=utf8 import pika, sys connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = connection.channel() #定義交換機,設置類型為topic channel.exchange_declare(exchange = 'messages' , type = 'topic' ) #從命令行獲取路由參數,如果沒有,則報錯退出 routings = sys.argv[ 1 :] if not routings: print >> sys.stderr, "Usage: %s [routing_key]..." % (sys.argv[ 0 ],) exit() #生成臨時隊列,并綁定到交換機上,設置路由鍵 result = channel.queue_declare(exclusive = True ) queue_name = result.method.queue for routing in routings: channel.queue_bind(exchange = 'messages' , queue = queue_name, routing_key = routing) def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue = queue_name, no_ack = True ) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming() |
打開四個終端,一個運行如下,表示任何事情都可以和她說:
1
|
python receive.py "#" |
另外一個終端 運行如下,表示可以和她分享開心的事:
1
|
python receive.py "happy.*" |
第三個運行如下,表示工作上的事情可以和她分享:
1
|
python receive.py "*.work" |
最后一個運行python send.py。結果不難想象出來,就不貼出來了。