国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

腳本之家,腳本語(yǔ)言編程技術(shù)及教程分享平臺(tái)!
分類導(dǎo)航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務(wù)器之家 - 腳本之家 - Python - 利用Python學(xué)習(xí)RabbitMQ消息隊(duì)列

利用Python學(xué)習(xí)RabbitMQ消息隊(duì)列

2020-08-04 10:52蔚藍(lán)行 Python

RabbitMQ和郵局的主要區(qū)別就是RabbitMQ接收、存儲(chǔ)和發(fā)送的是二進(jìn)制數(shù)據(jù)----消息,本篇文章給大家介紹利用Python學(xué)習(xí)RabbitMQ消息隊(duì)列,對(duì)python消息隊(duì)列相關(guān)知識(shí)感興趣的朋友參考下

RabbitMQ可以當(dāng)做一個(gè)消息代理,它的核心原理非常簡(jiǎn)單:即接收和發(fā)送消息,可以把它想象成一個(gè)郵局:我們把信件放入郵箱,郵遞員就會(huì)把信件投遞到你的收件人處,RabbitMQ就是一個(gè)郵箱、郵局、投遞員功能綜合體,整個(gè)過(guò)程就是:郵箱接收信件,郵局轉(zhuǎn)發(fā)信件,投遞員投遞信件到達(dá)收件人處。

RabbitMQ和郵局的主要區(qū)別就是RabbitMQ接收、存儲(chǔ)和發(fā)送的是二進(jìn)制數(shù)據(jù)----消息。

rabbitmq基本管理命令:

一步啟動(dòng)Erlang node和Rabbit應(yīng)用:sudo rabbitmq-server

在后臺(tái)啟動(dòng)Rabbit node:sudo rabbitmq-server -detached

關(guān)閉整個(gè)節(jié)點(diǎn)(包括應(yīng)用):sudo rabbitmqctl stop

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
add_user <UserName> <Password>
delete_user <UserName>
change_password <UserName> <NewPassword>
list_users
add_vhost <VHostPath>
delete_vhost <VHostPath>
list_vhosts
set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
clear_permissions [-p <VHostPath>] <UserName>
list_permissions [-p <VHostPath>]
list_user_permissions <UserName>
list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]

Demo:

producer.py

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/usr/bin/env python
# -*- coding: utf_ -*-
# Date: 年月日
# Author:蔚藍(lán)行
# 博客 http://www.cnblogs.com/duanv/
import pika
import sys
#創(chuàng)建連接connection到localhost
con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#創(chuàng)建虛擬連接channel
cha = con.channel()
#創(chuàng)建隊(duì)列anheng,durable參數(shù)為真時(shí),隊(duì)列將持久化;exclusive為真時(shí),建立臨時(shí)隊(duì)列
result=cha.queue_declare(queue='anheng',durable=True,exclusive=False)
#創(chuàng)建名為yanfa,類型為fanout的exchange,其他類型還有direct和topic,如果指定durable為真,exchange將持久化
cha.exchange_declare(durable=False,
          exchange='yanfa',
          type='direct',)
#綁定exchange和queue,result.method.queue獲取的是隊(duì)列名稱
cha.queue_bind(exchange='yanfa',
       queue=result.method.queue,
       routing_key='',)
#公平分發(fā),使每個(gè)consumer在同一時(shí)間最多處理一個(gè)message,收到ack前,不會(huì)分配新的message
cha.basic_qos(prefetch_count=)
#發(fā)送信息到隊(duì)列‘anheng'
message = ' '.join(sys.argv[:])
#消息持久化指定delivery_mode=;
cha.basic_publish(exchange='',
         routing_key='anheng',
         body=message,
         properties=pika.BasicProperties(
          delivery_mode = ,
        ))
print '[x] Sent %r' % (message,)
#關(guān)閉連接
con.close()

consumer.py

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# -*- coding: utf_ -*-
# Date: 年月日
# Author:蔚藍(lán)行
# 博客 http://www.cnblogs.com/duanv/
import pika
#建立連接connection到localhost
con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#創(chuàng)建虛擬連接channel
cha = con.channel()
#創(chuàng)建隊(duì)列anheng
result=cha.queue_declare(queue='anheng',durable=True)
#創(chuàng)建名為yanfa,類型為fanout的交換機(jī),其他類型還有direct和topic
cha.exchange_declare(durable=False,
          exchange='yanfa',
          type='direct',)
#綁定exchange和queue,result.method.queue獲取的是隊(duì)列名稱
cha.queue_bind(exchange='yanfa',
       queue=result.method.queue,
       routing_key='',)
#公平分發(fā),使每個(gè)consumer在同一時(shí)間最多處理一個(gè)message,收到ack前,不會(huì)分配新的message
cha.basic_qos(prefetch_count=)
print ' [*] Waiting for messages. To exit press CTRL+C'
#定義回調(diào)函數(shù)
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  ch.basic_ack(delivery_tag = method.delivery_tag)
cha.basic_consume(callback,
         queue='anheng',
         no_ack=False,)
cha.start_consuming()

一、概念:

Connection: 一個(gè)TCP的連接。Producer和Consumer都是通過(guò)TCP連接到RabbitMQ Server的。程序的起始處就是建立這個(gè)TCP連接。

Channels: 虛擬連接。建立在上述的TCP連接中。數(shù)據(jù)流動(dòng)都是在Channel中進(jìn)行的。一般情況是程序起始建立TCP連接,第二步就是建立這個(gè)Channel。

二、隊(duì)列:

首先建立一個(gè)Connection,然后建立Channels,在channel上建立隊(duì)列

建立時(shí)指定durable參數(shù)為真,隊(duì)列將持久化;指定exclusive為真,隊(duì)列為臨時(shí)隊(duì)列,關(guān)閉consumer后該隊(duì)列將不再存在,一般情況下建立臨時(shí)隊(duì)列并不指定隊(duì)列名稱,rabbitmq將隨機(jī)起名,通過(guò)result.method.queue來(lái)獲取隊(duì)列名:

result = channel.queue_declare(exclusive=True)

result.method.queue

區(qū)別:durable是隊(duì)列持久化與否,如果為真,隊(duì)列將在rabbitmq服務(wù)重啟后仍存在,如果為假,rabbitmq服務(wù)重啟前不會(huì)消失,與consumer關(guān)閉與否無(wú)關(guān);

而exclusive是建立臨時(shí)隊(duì)列,當(dāng)consumer關(guān)閉后,該隊(duì)列就會(huì)被刪除

三、exchange和bind

Exchange中durable參數(shù)指定exchange是否持久化,exchange參數(shù)指定exchange名稱,type指定exchange類型。Exchange類型有direct,fanout和topic。

Bind是將exchange與queue進(jìn)行關(guān)聯(lián),exchange參數(shù)和queue參數(shù)分別指定要進(jìn)行bind的exchange和queue,routing_key為可選參數(shù)。

Exchange的三種模式:

Direct:

任何發(fā)送到Direct Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到routing_key中指定的Queue

1.一般情況可以使用rabbitMQ自帶的Exchange:””(該Exchange的名字為空字符串);

2.這種模式下不需要將Exchange進(jìn)行任何綁定(bind)操作;

3.消息傳遞時(shí)需要一個(gè)“routing_key”,可以簡(jiǎn)單的理解為要發(fā)送到的隊(duì)列名字;

4.如果vhost中不存在routing_key中指定的隊(duì)列名,則該消息會(huì)被拋棄。

Demo中雖然聲明了一個(gè)exchange='yanfa'和queue='anheng'的bind,但是在后面發(fā)送消息時(shí)并沒(méi)有使用該exchange和bind,而是采用了direct的模式,沒(méi)有指定exchange,而是指定了routing_key的名稱為隊(duì)列名,消息將發(fā)送到指定隊(duì)列。

如果一個(gè)exchange 聲明為direct,并且bind中指定了routing_key,那么發(fā)送消息時(shí)需要同時(shí)指明該exchange和routing_key.

Fanout:

任何發(fā)送到Fanout Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到與該Exchange綁定(Binding)的所有Queue上

1.可以理解為路由表的模式

2.這種模式不需要routing_key

3.這種模式需要提前將Exchange與Queue進(jìn)行綁定,一個(gè)Exchange可以綁定多個(gè)Queue,一個(gè)Queue可以同多個(gè)Exchange進(jìn)行綁定。

4.如果接受到消息的Exchange沒(méi)有與任何Queue綁定,則消息會(huì)被拋棄。

Demo中創(chuàng)建了一個(gè)將一個(gè)exchange和一個(gè)queue進(jìn)行fanout類型的bind.但是發(fā)送信息時(shí)沒(méi)有用到它,如果要用到它,只要在發(fā)送消息時(shí)指定該exchange的名稱即可,該exchange就會(huì)將消息發(fā)送到所有和它bind的隊(duì)列中。在fanout模式下,指定的routing_key是無(wú)效的 。

Topic:

任何發(fā)送到Topic Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到所有關(guān)心routing_key中指定話題的Queue上

1.這種模式較為復(fù)雜,簡(jiǎn)單來(lái)說(shuō),就是每個(gè)隊(duì)列都有其關(guān)心的主題,所有的消息都帶有一個(gè)“標(biāo)題”(routing_key),Exchange會(huì)將消息轉(zhuǎn)發(fā)到所有關(guān)注主題能與routing_key模糊匹配的隊(duì)列。

2.這種模式需要routing_key,也許要提前綁定Exchange與Queue。

3.在進(jìn)行綁定時(shí),要提供一個(gè)該隊(duì)列關(guān)心的主題,如“#.log.#”表示該隊(duì)列關(guān)心所有涉及l(fā)og的消息(一個(gè)routing_key為”MQ.log.error”的消息會(huì)被轉(zhuǎn)發(fā)到該隊(duì)列)。

4.“#”表示0個(gè)或若干個(gè)關(guān)鍵字,“*”表示一個(gè)關(guān)鍵字。如“log.*”能與“log.warn”匹配,無(wú)法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。

5.同樣,如果Exchange沒(méi)有發(fā)現(xiàn)能夠與routing_key匹配的Queue,則會(huì)拋棄此消息。

四、任務(wù)分發(fā)

1.Rabbitmq的任務(wù)是循環(huán)分發(fā)的,如果開(kāi)啟兩個(gè)consumer,producer發(fā)送的信息是輪流發(fā)送到兩個(gè)consume的。

2.在producer端使用cha.basic_publish()來(lái)發(fā)送消息,其中body參數(shù)就是要發(fā)送的消息,properties=pika.BasicProperties(delivery_mode = 2,)啟用消息持久化,可以防止RabbitMQ Server 重啟或者crash引起的數(shù)據(jù)丟失。

3.在接收端使用cha.basic_consume()無(wú)限循環(huán)監(jiān)聽(tīng),如果設(shè)置no-ack參數(shù)為真,每次Consumer接到數(shù)據(jù)后,而不管是否處理完成,RabbitMQ Server會(huì)立即把這個(gè)Message標(biāo)記為完成,然后從queue中刪除了。為了保證數(shù)據(jù)不被丟失,RabbitMQ支持消息確認(rèn)機(jī)制,即acknowledgments。為了保證數(shù)據(jù)能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack。而應(yīng)該是在處理完數(shù)據(jù)后發(fā)送ack。

在處理數(shù)據(jù)后發(fā)送的ack,就是告訴RabbitMQ數(shù)據(jù)已經(jīng)被接收,處理完成,RabbitMQ可以去安全的刪除它了。如果Consumer退出了但是沒(méi)有發(fā)送ack,那么RabbitMQ就會(huì)把這個(gè)Message發(fā)送到下一個(gè)Consumer。這樣就保證了在Consumer異常退出的情況下數(shù)據(jù)也不會(huì)丟失。

這里并沒(méi)有用到超時(shí)機(jī)制。RabbitMQ僅僅通過(guò)Consumer的連接中斷來(lái)確認(rèn)該Message并沒(méi)有被正確處理。也就是說(shuō),RabbitMQ給了Consumer足夠長(zhǎng)的時(shí)間來(lái)做數(shù)據(jù)處理。

Demo的callback方法中ch.basic_ack(delivery_tag = method.delivery_tag)告訴rabbitmq消息已經(jīng)正確處理。如果沒(méi)有這條代碼,Consumer退出時(shí),Message會(huì)重新分發(fā)。然后RabbitMQ會(huì)占用越來(lái)越多的內(nèi)存,由于RabbitMQ會(huì)長(zhǎng)時(shí)間運(yùn)行,因此這個(gè)“內(nèi)存泄漏”是致命的。去調(diào)試這種錯(cuò)誤,可以通過(guò)一下命令打印un-acked Messages:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

4.公平分發(fā):設(shè)置cha.basic_qos(prefetch_count=1),這樣RabbitMQ就會(huì)使得每個(gè)Consumer在同一個(gè)時(shí)間點(diǎn)最多處理一個(gè)Message。換句話說(shuō),在接收到該Consumer的ack前,他它不會(huì)將新的Message分發(fā)給它。

五、注意:

生產(chǎn)者和消費(fèi)者都應(yīng)該聲明建立隊(duì)列,網(wǎng)上教程上說(shuō)第二次創(chuàng)建如果參數(shù)和第一次不一樣,那么該操作雖然成功,但是queue的屬性并不會(huì)被修改。

可能因?yàn)榘姹締?wèn)題,在我的測(cè)試中如果第二次聲明建立的隊(duì)列屬性和第一次不完全相同,將報(bào)類似這種錯(cuò)406, "PRECONDITION_FAILED - parameters for queue 'anheng' in vhost '/' not equivalent"

如果是exchange第二次創(chuàng)建屬性不同,將報(bào)這種錯(cuò)406, "PRECONDITION_FAILED - cannot redeclare exchange 'yanfa' in vhost '/' with different type, durable, internal or autodelete value"

如果第一次聲明建立隊(duì)列也出現(xiàn)這個(gè)錯(cuò)誤,說(shuō)明之前存在名字相同的隊(duì)列且本次聲明的某些屬性和之前聲明不同,可通過(guò)命令sudo rabbitmqctl list_queues查看當(dāng)前有哪些隊(duì)列。解決方法是聲明建立另一名稱的隊(duì)列或刪除原有隊(duì)列,如果原有隊(duì)列是非持久化的,可通過(guò)重啟rabbitmq服務(wù)刪除原有隊(duì)列,如果原有隊(duì)列是持久化的,只能刪除它所在的vhost,然后再重建vhost,再設(shè)置vhost的權(quán)限(先確認(rèn)該vhost中沒(méi)有其他有用隊(duì)列)。

?
1
2
3
sudo rabbitmqctl delete_vhost /
sudo rabbitmqctl add_vhost /
sudo rabbitmqctl set_permissions -p / username '.*' '.*' '.*'

以上內(nèi)容是小編給大家介紹的利用Python學(xué)習(xí)RabbitMQ消息隊(duì)列,希望大家喜歡。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 国产成人高清精品免费5388 | 国产综合久久 | 国产成人精品在线 | 日韩中文一区二区三区 | av在线资源网 | 日韩影院一区 | 色播开心网 | 国产免费自拍av | 午夜视频在线观看一区二区三区 | 日韩欧美国产一区二区三区 | 日韩操操| 天堂资源最新在线 | 精品久久99| 中文字幕视频一区 | 亚洲电影在线播放 | 99热69| 亚洲国产91 | 亚洲一区二区三区免费观看 | 国产在线a | 96久久久| 亚洲一级淫片 | 羞羞免费视频网站 | 国产欧美精品一区二区三区 | 香蕉大人久久国产成人av | 日本一区二区三区视频免费看 | 亚洲最新无码中文字幕久久 | 久草天堂 | 亚洲毛片在线观看 | 色爱区成人综合网 | 性爽视频| 亚洲成人av | 国产精品久久国产精品 | 精品动漫一区 | 91久久| 亚洲 成人 av | 午夜在线电影 | 中文字幕天堂在线 | 亚洲精品成人 | 国产成人精品一区二区三区四区 | 色婷婷影院 | 国产精品视频播放 |