問題背景
在使用celery執行我們的異步任務時,為了提高效率,celery可以開啟多個進程來啟動對應的worker。
但是會出現這么一種情況:在獲取到數據源之后要對數據庫進行掃描,根據UUID來斷定是插入還是更新,兩個worker 同時 (相差0.001S)拿到了UUID但是在其中一個沒插入時,另一個也掃描完了數據庫,這時這兩個worker都會認為自己拿到的UUID是在數據庫中沒有存在過的,所以都會調用INSERT方法來進行插入操作。
幾種解決方案
為了解決這個問題,一般有如下解決方案.
分布式鎖家族:
數據庫:
- 排它鎖(悲觀鎖)
- 樂觀鎖
Redis
- 自己實現Redis SET SETNX 操作,結合Lua腳本確保原子操作
- RedLock Redis里分布式鎖實現的算法,爭議比較大,謹慎使用
- python-redis-lock 本文將要介紹的技術。這個庫提供的分布式鎖很靈活,是否需要超時?是否需要自動刷新?是否要阻塞?都是可選的。沒有最好的算法,只有最合適的算法,開發人員應該根據實際需求場景謹慎選擇具體用哪一種技術去實現。
設計思路:
Zookeeper
這個應該是功能最強大的,比較專業,穩定性好。我還沒使用過,日后玩明白了再寫篇文章總結一下。
擴展思路
在celery的場景下也可以使用celery_once進行任務去重操作, celery_once底層也是使用redis進行實現的。
可以參考這篇
Talk is cheap, show me your code!
一個簡單的demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
import random import time import threading import redis_lock import redis HOST = 'YOUR IP LOCATE' PORT = '6379' PASSWORD = 'password' def get_redis(): pool = redis.ConnectionPool(host = HOST, port = PORT, password = PASSWORD, decode_responses = True , db = 2 ) r = redis.Redis(connection_pool = pool) return r def ask_lock(uuid): lock = redis_lock.Lock(get_redis(), uuid) if lock.acquire(blocking = False ): print ( " %s Got the lock." % uuid) time.sleep( 5 ) lock.release() print ( " %s Release the lock." % uuid) else : print ( " %s Someone else has the lock." % uuid) def simulate(): for i in range ( 10 ): id = random.randint( 0 , 5 ) t = threading.Thread(target = ask_lock, args = ( str ( id ))) t.start() simulate() |
Output:
4 Got the lock.
5 Got the lock.
3 Got the lock.
5 Someone else has the lock.
5 Someone else has the lock.
2 Got the lock.
5 Someone else has the lock.
4 Someone else has the lock.
3 Someone else has the lock.
3 Someone else has the lock.
2 Release the lock.
5 Release the lock.
4 Release the lock.
3 Release the lock.
到此這篇關于python-redis-lock分布式鎖的文章就介紹到這了,更多相關python分布式鎖內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://blog.csdn.net/wyh1618/article/details/120973410