閱讀目錄
1、摘要
2、實現(xiàn)方法
3、一對一消息傳遞
4、多對多消息傳遞
1、摘要
消息傳遞這一應(yīng)用廣泛存在于各個網(wǎng)站中,這個功能也是一個網(wǎng)站必不可少的。常見的消息傳遞應(yīng)用有,新浪微博中的@我呀、給你評論然后的提示呀、贊贊贊提示、私信呀、甚至是發(fā)微博分享的新鮮事;知乎中的私信呀、live發(fā)送過來的消息、知乎團(tuán)隊消息呀等等。
2、實現(xiàn)方法
消息傳遞即兩個或者多個客戶端在相互發(fā)送和接收消息。
通常有兩種方法實現(xiàn):
第一種為消息推送。redis內(nèi)置有這種機(jī)制,publish往頻道推送消息、subscribe訂閱頻道。這種方法有一個缺點就是必須保證接收者時刻在線(即是此時程序不能停下來,一直保持監(jiān)控狀態(tài),假若斷線后就會出現(xiàn)客戶端丟失信息)
第二種為消息拉取。所謂消息拉取,就是客戶端自主去獲取存儲在服務(wù)器中的數(shù)據(jù)。redis內(nèi)部沒有實現(xiàn)消息拉取這種機(jī)制。因此我們需要自己手動編寫代碼去實現(xiàn)這個功能。
在這里我們,我們進(jìn)一步將消息傳遞再細(xì)分為一對一的消息傳遞,多對多的消息傳遞(群組消息傳遞)。
【注:兩個類的代碼相對較多,因此將其折疊起來了】
3、一對一消息傳遞
例子1:一對一消息發(fā)送與獲取
模塊要求:
1、提示有多少個聯(lián)系人發(fā)來新消息
2、信息包含發(fā)送人、時間、信息內(nèi)容
3、能夠獲取之前的舊消息
4、并且消息能夠保持7天,過期將會被動觸發(fā)刪除
redis實現(xiàn)思路:
1、新消息與舊消息分別采用兩個鏈表來存儲
2、原始消息的結(jié)構(gòu)采用數(shù)組的形式存放,并且含有發(fā)送人、時間戳、信息內(nèi)容
3、在推入redis的鏈表前,需要將數(shù)據(jù)轉(zhuǎn)換為json類型然后再進(jìn)行存儲
4、在取出新信息時應(yīng)該使用rpoplpush來實現(xiàn),將已讀的新消息推入舊消息鏈表中
5、取出舊消息時,應(yīng)該用舊消息的時間與現(xiàn)在的時間進(jìn)行對比,若超時,則直接刪除后面的全部數(shù)據(jù)(因為數(shù)據(jù)是按時間一個一個壓進(jìn)鏈表中的,所以對于時間是有序排列的)
數(shù)據(jù)存儲結(jié)構(gòu)圖:
php的實現(xiàn)代碼:
#singlepullmessage.class.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
<?php #單接接收者接收消息 class singlepullmessage { private $redis = '' ; #存儲redis對象 /** * @desc 構(gòu)造函數(shù) * * @param $host string | redis主機(jī) * @param $port int | 端口 */ public function __construct( $host , $port =6379) { $this ->redis= new redis(); $this ->redis->connect( $host , $port ); } /** * @desc 發(fā)送消息(一個人) * * @param $touser string | 接收人 * @param $messagearr array | 發(fā)送的消息數(shù)組,包含sender、message、time * * @return bool */ public function sendsingle( $touser , $messagearr ) { $json_message =json_encode( $messagearr ); #編碼成json數(shù)據(jù) return $this ->redis->lpush( $touser , $json_message ); #將數(shù)據(jù)推入鏈表 } /** * @desc 用戶獲取新消息 * * @param $user string | 用戶名 * * @return array 返回數(shù)組,包含多少個用戶發(fā)來新消息,以及具體消息 */ public function getnewmessage( $user ) { #接收新信息數(shù)據(jù),并且將數(shù)據(jù)推入舊信息數(shù)據(jù)鏈表中,并且在原鏈表中刪除 $messagearr = array (); while ( $json_message = $this ->redis->rpoplpush( $user , 'premessage_' . $user )) { $temp =json_decode( $json_message ); #將json數(shù)據(jù)變成對象 $messagearr [ $temp ->sender][]= $temp ; #轉(zhuǎn)換成數(shù)組信息 } if ( $messagearr ) { $arr [ 'count' ]= count ( $messagearr ); #統(tǒng)計有多少個用戶發(fā)來信息 $arr [ 'messagearr' ]= $messagearr ; return $arr ; } return false; } public function getpremessage( $user ) { ##取出舊消息 $messagearr = array (); $json_pre = $this ->redis->lrange( 'premessage_' . $user , 0, -1); #一次性將全部舊消息取出來 foreach ( $json_pre as $k => $v ) { $temp =json_decode( $v ); #json反編碼 $timeout = $temp ->time+60*60*24*7; #數(shù)據(jù)過期時間 七天過期 if ( $timeout <time()) #判斷數(shù)據(jù)是否過期 { if ( $k ==0) #若是最遲插入的數(shù)據(jù)都過期了,則將所有數(shù)據(jù)刪除 { $this ->redis->del( 'premessage_' . $user ); break ; } $this ->redis->ltrim( 'premessage_' . $user , 0, $k ); #若檢測出有過期的,則將比它之前插入的所有數(shù)據(jù)刪除 break ; } $messagearr [ $temp ->sender][]= $temp ; } return $messagearr ; } /** * @desc 消息處理,沒什么特別的作用。在這里這是用來處理數(shù)組信息,然后將其輸出。 * * @param $arr array | 需要處理的信息數(shù)組 * * @return 返回打印輸出 */ public function dealarr( $arr ) { foreach ( $arr as $k => $v ) { foreach ( $v as $k1 => $v2 ) { echo '發(fā)送人:' . $v2 ->sender. ' 發(fā)送時間:' . date ( 'y-m-d h:i:s' , $v2 ->time). '<br/>' ; echo '消息內(nèi)容:' . $v2 ->message. '<br/>' ; } echo "<hr/>" ; } } } |
測試:
1、發(fā)送消息
#建立test1.php
1
2
3
4
5
6
7
8
9
|
include './singlepullmessage.class.php' ; $object = new singlepullmessage( '192.168.95.11' ); #發(fā)送消息 $sender = 'boss' ; #發(fā)送者 $to = 'jane' ; #接收者 $message = 'how are you' ; #信息 $time =time(); $arr = array ( 'sender' => $sender , 'message' => $message , 'time' => $time ); echo $object ->sendsingle( $to , $arr ); |
2、獲取新消息
#建立test2.php
1
2
3
4
5
6
7
8
9
10
11
|
include './singlepullmessage.class.php' ; $object = new singlepullmessage( '192.168.95.11' ); #獲取新消息 $arr = $object ->getnewmessage( 'jane' ); if ( $arr ) { echo $arr [ 'count' ]. "個聯(lián)系人發(fā)來新消息<br/><hr/>" ; $object ->dealarr( $arr [ 'messagearr' ]); } else echo "無新消息" ; |
訪問結(jié)果:
3、獲取舊消息
#建立test3.php
1
2
3
4
5
6
7
8
9
10
|
include './singlepullmessage.class.php' ; $object = new singlepullmessage( '192.168.95.11' ); #獲取舊消息 $arr = $object ->getpremessage( 'jane' ); if ( $arr ) { $object ->dealarr( $arr ); } else echo "無舊數(shù)據(jù)" ; |
4、多對多消息傳遞
例子2:多對多消息發(fā)送與獲取(即是群組)
模塊要求:
1、用戶能夠自行創(chuàng)建群組,并成為群主
2、群主可以拉人進(jìn)來作為群組成員、并且可以踢人
3、用戶可以直接退出群組
4、可以發(fā)送消息,每一位成員都可以拉取消息
5、群組的消息最大容納量為5000條
6、成員可以拉取新消息,并提示有多少新消息
7、成員可以分頁獲取之前已讀的舊消息
。。。。。功能就寫這幾個吧,有需要或者想練習(xí)的同學(xué)們可以增加其他功能,例如禁言、匿名消息發(fā)送、文件發(fā)送等等。
redis實現(xiàn)思路:
1、群組的消息以及群組的成員組成采用有序集合進(jìn)行存儲。群組消息有序集合的member存儲用戶發(fā)送的json數(shù)據(jù)消息,score存儲唯一值,將采用原子操作incr獲取string中的自增長值進(jìn)行存儲;群組成員有序集合的member存儲user,score存儲非零數(shù)字(在這里這個score意義不大,我的例子代碼中使用數(shù)字1為群主的score,其他的存儲為2。當(dāng)然這使用這個數(shù)據(jù)還可以擴(kuò)展別的功能,例如群組中成員等級)可參考下面數(shù)據(jù)存儲結(jié)構(gòu)簡圖。
2、用戶所加入的群組也是采用有序集合進(jìn)行存儲。其中,member存儲群組id,score存儲用戶已經(jīng)獲取該群組的最大消息分值(對應(yīng)群組消息的score值)
3、用戶創(chuàng)建群組的時候,通過原子操作incr從而獲取一個唯一id
4、用戶在群中發(fā)送消息時,也是通過原子操作incr獲取一個唯一自增長有序id
5、在執(zhí)行incr時,為防止并發(fā)導(dǎo)致競爭關(guān)系,因此需要進(jìn)行加鎖操作【redis詳細(xì)鎖的講解可以參考:redis構(gòu)建分布式鎖】
6、創(chuàng)建群組方法簡要思路,任何一個用戶都可以創(chuàng)建群組聊天,在創(chuàng)建的同時,可以選擇時是否添加群組成員(參數(shù)通過數(shù)組的形式)。創(chuàng)建過程將會為這個群組建立一個群組成員有序集合(群組信息有序集合暫時不創(chuàng)建),接著將群主添加進(jìn)去,再將群id添加用戶所參加的群組有序集合中。
數(shù)據(jù)存儲結(jié)構(gòu)圖:
php的代碼實現(xiàn):
#manypullmessage.class.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
|
<?php class manypullmessage { private $redis = '' ; #存儲redis對象 /** * @desc 構(gòu)造函數(shù) * * @param $host string | redis主機(jī) * @param $port int | 端口 */ public function __construct( $host , $port =6379) { $this ->redis= new redis(); $this ->redis->connect( $host , $port ); } /** * @desc 用于創(chuàng)建群組的方法,在創(chuàng)建的同時還可以拉人進(jìn)群組 * * @param $user string | 用戶名,創(chuàng)建群組的主人 * @param $adduser array | 其他用戶構(gòu)成的數(shù)組 * * @param $lockname string | 鎖的名字,用于獲取群組id的時候用 * @return int 返回群組id */ public function creategroupchat( $user , $adduser = array (), $lockname = 'chatidlock' ) { $identifier = $this ->getlock( $lockname ); #獲取鎖 if ( $identifier ) { $id = $this ->redis->incr( 'groupchatid' ); #獲取群組id $this ->releaselock( $lockname , $identifier ); #釋放鎖 } else return false; $messagecount = $this ->redis->set( 'countmessage_' . $id , 0); #初始化這個群組消息計數(shù)器 #開啟非事務(wù)型流水線,一次性將所有redis命令傳給redis,減少與redis的連接 $pipe = $this ->redis->pipeline(); $this ->redis->zadd( 'groupchat_' . $id , 1, $user ); #創(chuàng)建群組成員有序集合,并添加群主 #將這個群組添加到user所參加的群組有序集合中 $this ->redis->zadd( 'hasgroupchat_' . $user , 0, $id ); foreach ( $adduser as $v ) #創(chuàng)建群組的同時需要添加的用戶成員 { $this ->redis->zadd( 'groupchat_' . $id , 2, $v ); $this ->redis->zadd( 'hasgroupchat_' . $v , 0, $id ); } $pipe -> exec (); return $id ; #返回群組id } /** * @desc 群主主動拉人進(jìn)群 * * @param $user string | 群主名 * @param $groupchatid int | 群組id * @param $addmembers array | 需要拉進(jìn)群的用戶 * * @return bool */ public function addmembers( $user , $groupchatid , $addmembers = array ()) { $groupmasterscore = $this ->redis->zscore( 'groupchat_' . $groupchatid , $user ); #將groupchatname的群主取出來 if ( $groupmasterscore ==1) #判斷user是否是群主 { $pipe = $this ->redis->pipeline(); #開啟非事務(wù)流水線 foreach ( $addmembers as $v ) { $this ->redis->zadd( 'groupchat_' . $groupchatid , 2, $v ); #添加進(jìn)群 $this ->redis->zadd( 'hasgroupchat_' . $v , 0, $groupchatid ); #添加群名到用戶的有序集合中 } $pipe -> exec (); return true; } return false; } /** * @desc 群主刪除成員 * * @param $user string | 群主名 * @param $groupchatid int | 群組id * @param $delmembers array | 需要刪除的成員名字 * * @return bool */ public function delmembers( $user , $groupchatid , $delmembers = array ()) { $groupmasterscore = $this ->redis->zscore( 'groupchat_' . $groupchatid , $user ); if ( $groupmasterscore ==1) #判斷user是否是群主 { $pipe = $this ->redis->pipeline(); #開啟非事務(wù)流水線 foreach ( $delmembers as $v ) { $this ->redis->zrem( 'groupchat_' . $groupchatid , $v ); $this ->redis->zrem( 'hasgroupchat_' . $v , $groupchatid ); } $pipe -> exec (); return true; } return false; } /** * @desc 退出群組 * * @param $user string | 用戶名 * @param $groupchatid int | 群組名 */ public function quitgroupchat( $user , $groupchatid ) { $this ->redis->zrem( 'groupchat_' . $groupchatid , $user ); $this ->redis->zrem( 'hasgroupchat_' . $user , $groupchatid ); return true; } /** * @desc 發(fā)送消息 * * @param $user string | 用戶名 * @param $groupchatid int | 群組id * @param $messagearr array | 包含發(fā)送消息的數(shù)組 * @param $prelockname string | 群消息鎖前綴,群消息鎖全名為countlock_群id * * @return bool */ public function sendmessage( $user , $groupchatid , $messagearr , $prelockname = 'countlock_' ) { $memberscore = $this ->redis->zscore( 'groupchat_' . $groupchatid , $user ); #成員score if ( $memberscore ) { $identifier = $this ->getlock( $prelockname . $groupchatid ); #獲取鎖 if ( $identifier ) #判斷獲取鎖是否成功 { $messagecount = $this ->redis->incr( 'countmessage_' . $groupchatid ); $this ->releaselock( $prelockname . $groupchatid , $identifier ); #釋放鎖 } else return false; $json_message =json_encode( $messagearr ); $this ->redis->zadd( 'groupchatmessage_' . $groupchatid , $messagecount , $json_message ); $count = $this ->redis->zcard( 'groupchatmessage_' . $groupchatid ); #查看信息量大小 if ( $count >5000) #判斷數(shù)據(jù)量有沒有達(dá)到5000條 { #數(shù)據(jù)量超5000,則需要清除舊數(shù)據(jù) $start =5000- $count ; $this ->redis->zremrangebyrank( 'groupchatmessage_' . $groupchatid , $start , $count ); } return true; } return false; } /** * @desc 獲取新信息 * * @param $user string | 用戶名 * * @return 成功則放回json數(shù)據(jù)數(shù)組,無新信息返回false */ public function getnewmessage( $user ) { $arrid = $this ->redis->zrange( 'hasgroupchat_' . $user , 0, -1, 'withscores' ); #獲取用戶擁有的群組id $json_message = array (); #初始化 foreach ( $arrid as $k => $v ) #遍歷循環(huán)所有群組,查看是否有新消息 { $messagecount = $this ->redis->get( 'countmessage_' . $k ); #群組最大信息分值數(shù) if ( $messagecount > $v ) #判斷用戶是否存在未讀新消息 { $json_message [ $k ][ 'message' ]= $this ->redis->zrangebyscore( 'groupchatmessage_' . $k , $v +1, $messagecount ); $json_message [ $k ][ 'count' ]= count ( $json_message [ $k ][ 'message' ]); #統(tǒng)計新消息數(shù)量 $this ->redis->zadd( 'hasgroupchat_' . $user , $messagecount , $k ); #更新已獲取消息 } } if ( $json_message ) return $json_message ; return false; } /** * @desc 分頁獲取群組信息 * * @param $user string | 用戶名 * @param $groupchatid int | 群組id * @param $page int | 第幾頁 * @param $size int | 每頁多少條數(shù)據(jù) * * @return 成功返回json數(shù)據(jù),失敗返回false */ public function getpartmessage( $user , $groupchatid , $page =1, $size =10) { $start = $page * $size - $size ; #開始截取數(shù)據(jù)位置 $stop = $page * $size -1; #結(jié)束截取數(shù)據(jù)位置 $json_message = $this ->redis->zrevrange( 'groupchatmessage_' . $groupchatid , $start , $stop ); if ( $json_message ) return $json_message ; return false; } /** * @desc 加鎖方法 * * @param $lockname string | 鎖的名字 * @param $timeout int | 鎖的過期時間 * * @return 成功返回identifier/失敗返回false */ public function getlock( $lockname , $timeout =2) { $identifier =uniqid(); #獲取唯一標(biāo)識符 $timeout = ceil ( $timeout ); #確保是整數(shù) $end =time()+ $timeout ; while (time()< $end ) #循環(huán)獲取鎖 { /* #這里的set操作可以等同于下面那個if操作,并且可以減少一次與redis通訊 if($this->redis->set($lockname, $identifier array('nx', 'ex'=>$timeout))) return $identifier; */ if ( $this ->redis->setnx( $lockname , $identifier )) #查看 $lockname 是否被上鎖 { $this ->redis->expire( $lockname , $timeout ); #為 $lockname 設(shè)置過期時間 return $identifier ; #返回一維標(biāo)識符 } elseif ( $this ->redis->ttl( $lockname )===-1) { $this ->redis->expire( $lockname , $timeout ); #檢測是否有設(shè)置過期時間,沒有則加上 } usleep(0.001); #停止0.001ms } return false; } /** * @desc 釋放鎖 * * @param $lockname string | 鎖名 * @param $identifier string | 鎖的唯一值 * * @param bool */ public function releaselock( $lockname , $identifier ) { if ( $this ->redis->get( $lockname )== $identifier ) #判斷是鎖有沒有被其他客戶端修改 { $this ->redis->multi(); $this ->redis->del( $lockname ); #釋放鎖 $this ->redis-> exec (); return true; } else { return false; #其他客戶端修改了鎖,不能刪除別人的鎖 } } } ?> |
測試:
1、建立creategroupchat.php(測試創(chuàng)建群組功能)
執(zhí)行代碼并創(chuàng)建568、569群組(群主為jack)
1
2
3
4
5
6
7
8
9
|
include './manypullmessage.class.php' ; $object = new manypullmessage( '192.168.95.11' ); #創(chuàng)建群組 $user = 'jack' ; $arr = array ( 'jane1' , 'jane2' ); $a = $object ->creategroupchat( $user , $arr ); echo "<pre>" ; print_r( $a ); echo "</pre>" ; die ; |
2、建立addmembers.php(測試添加成員功能)
執(zhí)行代碼并添加新成員
1
2
3
4
5
6
|
include './manypullmessage.class.php' ; $object = new manypullmessage( '192.168.95.11' ); $b = $object ->addmembers( 'jack' , '568' , array ( 'jane1' , 'jane2' , 'jane3' , 'jane4' )); echo "<pre>" ; print_r( $b ); echo "</pre>" ; die ; |
3、建立delete.php(測試群主刪除成員功能)
1
2
3
4
5
6
7
|
include './manypullmessage.class.php' ; $object = new manypullmessage( '192.168.95.11' ); #群主刪除成員 $c = $object ->delmembers( 'jack' , '568' , array ( 'jane1' , 'jane4' )); echo "<pre>" ; print_r( $c ); echo "</pre>" ; die ; |
4、建立sendmessage.php(測試發(fā)送消息功能)
多執(zhí)行幾遍,568、569都發(fā)幾條
1
2
3
4
5
6
7
8
9
10
11
|
include './manypullmessage.class.php' ; $object = new manypullmessage( '192.168.95.11' ); #發(fā)送消息 $user = 'jane2' ; $message = 'go go go' ; $groupchatid =568; $arr = array ( 'sender' => $user , 'message' => $message , 'time' =>time()); $d = $object ->sendmessage( $user , $groupchatid , $arr ); echo "<pre>" ; print_r( $d ); echo "</pre>" ; die ; |
5、建立getnewmessage.php(測試用戶獲取新消息功能)
1
2
3
4
5
6
7
|
include './manypullmessage.class.php' ; $object = new manypullmessage( '192.168.95.11' ); #用戶獲取新消息 $e = $object ->getnewmessage( 'jane2' ); echo "<pre>" ; print_r( $e ); echo "</pre>" ; die ; |
6、建立getpartmessage.php(測試用戶獲取某個群組部分消息)
(多發(fā)送幾條消息,用于測試。568中共18條數(shù)據(jù))
1
2
3
4
5
6
7
|
include './manypullmessage.class.php' ; $object = new manypullmessage( '192.168.95.11' ); #用戶獲取某個群組部分消息 $f = $object ->getpartmessage( 'jane2' , 568, 1, 10); echo "<pre>" ; print_r( $f ); echo "</pre>" ; die ; |
page=1,size=10
page=2,size=10
測試完畢,還需要別的功能可以自己進(jìn)行修改添加測試。
這次整理這篇文章相對比較趕,心里已經(jīng)想著快點整理完趕緊學(xué)習(xí)其他的技術(shù)啦,哈哈。各位大神請留步,懇請各位給點學(xué)習(xí)redis的指導(dǎo)意見,本人職業(yè)方向是php
以上就是本文的全部內(nèi)容,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作能帶來一定的幫助,同時也希望多多支持服務(wù)器之家!
原文鏈接:http://www.cnblogs.com/phpstudy2015-6/p/6629000.html