由于項目組現在用的rpc是基于google protobuf rpc協議實現的,所以花了點時間了解下protobuf rpc。rpc對于做分布式系統的人來說肯定不陌生,對于rpc不了解的童鞋可以自行google,這里只是做個簡單的介紹。rpc的主要功能是讓分布式系統的實現更為簡單,為提供強大的遠程調用而不損失本地調用語義的簡潔性。為了實現這個目標,rpc框架需要提供一種透明調用機制讓使用者不必顯示區分本地調用還是遠程調用。rpc架構涉及的組件如下:
客戶方像調用本地方法一樣去調用遠程接口方法,RPC 框架提供接口的代理實現,實際的調用將委托給代理RpcProxy 。代理封裝調用信息并將調用轉交給RpcInvoker 去實際執行。在客戶端的RpcInvoker 通過連接器RpcConnector 去維持與服務端的通道RpcChannel,并使用RpcProtocol 執行協議編碼(encode)并將編碼后的請求消息通過通道發送給服務方。RPC 服務端接收器 RpcAcceptor 接收客戶端的調用請求,同樣使用RpcProtocol 執行協議解碼(decode)。解碼后的調用信息傳遞給RpcProcessor 去控制處理調用過程,最后再委托調用給RpcInvoker 去實際執行并返回調用結果。
protobuf rpc在上面組件中主要扮演RpcProtocol的角色,使得我們省去了協議的設計,并且protobuf協議在編碼和空間效率都是上非常高效的,這也是很多公司采用protobuf作為數據序列化和通信協議的原因。同時protobuf rpc定義了一個抽象的rpc框架,如下圖所示:
RpcServiceStub和RpcService類是protobuf編譯器根據proto定義生成的類,RpcService定義了服務端暴露給客戶端的函數接口,具體實現需要用戶自己繼承這個類來實現。RpcServiceStub定義了服務端暴露函數的描述,并將客戶端對RpcServiceStub中函數的調用統一轉換到調用RpcChannel中的CallMethod方法,CallMethod通過RpcServiceStub傳過來的函數描述符和函數參數對該次rpc調用進行encode,最終通過RpcConnecor發送給服務方。對方以客戶端相反的過程最終調用RpcSerivice中定義的函數。事實上,protobuf rpc的框架只是RpcChannel中定義了空的CallMethod,所以具體怎樣進行encode和調用RpcConnector都要自己實現。RpcConnector在protobuf中沒有定義,所以這個完成由用戶自己實現,它的作用就是收發rpc消息包。在服務端,RpcChannel通過調用RpcService中的CallMethod來具體調用RpcService中暴露給客戶端的函數。
介紹了這么多,對于怎么樣用protobuf rpc來實現一個rpc肯定還是一頭霧水吧,下面就用protobuf rpc來實現一個簡單的python版rpc demo吧。
下面直接給出demo描述PRC的proto文件,至于proto文件的編寫規則可以參考protobuf官網。
common.proto文件:
1
2
3
4
5
6
7
8
9
10
11
|
package game; message RequestMessage { required string message = 1 ; } message ResponseMessage { required string message = 1 ; } |
game_service.proto文件:
1
2
3
4
5
6
7
8
9
|
package game; import "common.proto" ; option py_generic_services = true; service GameService { rpc connect_server(RequestMessage) returns(RequestMessage); } |
common.proto文件描述了RPC中收發的消息;game_service.proto描述了服務器導出的connect_server函數,該函數接受RequestMessage對象作為參數,并返回RequestMessage對象。在使用PRC協議時,必須加上option py_generic_services = true;可選項,要不然編譯器不會生成包含connect_server函數的GameService描述。
使用編譯器protoc編譯proto文件,具體命令為:
protoc.exe --python_out=. game_service.proto
編譯后生成的文件為game_service_pb2.py,該文件主要是實現了GameService和GameService_Stub類。GameService_Stub類用于客戶端調用者來調用GameService的服務。
前面已經說了,在客戶端,RpcChannel只實現了一個空的CallMethod,所以需要繼承RpcChannel重新這個函數來encode消息和發送消息。在服務端RpcChannel需要調用CallMethod來調用Service中的函數。具體實現如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
class MyRpcChannel(service.RpcChannel): def __init__( self , rpc_service, conn): super (MyRpcChannel, self ).__init__() self .logger = LogManager.get_logger( "MyRpcChannel" ) def CallMethod( self , method_descriptor, rpc_controller, request, response_class, done): """"protol buffer rpc 需要的函數,用來發送rpc調用""" self .logger.info( 'CallMethod' ) cmd_index = method_descriptor.index assert (cmd_index < 65535 ) data = request.SerializeToString() total_len = len (data) + 2 self .conn.send_data(' '.join([pack(' <I ', total_len), pack(' <H', cmd_index), data])) def from_request( self ): """"從網絡解析出一個完整的請求之后調的函數""" index_data = self .rpc_request.data[ 0 : 2 ] cmd_index = unpack( '<H' , index_data)[ 0 ] rpc_service = self .rpc_service s_descriptor = rpc_service.GetDescriptor() method = s_descriptor.methods[cmd_index] try : request = rpc_service.GetRequestClass(method)() serialized = self .rpc_request.data[ 2 :] request.ParseFromString(serialized) rpc_service.CallMethod(method, self .controller, request, None ) except : self .logger.error( "Call rpc method failed!" ) self .logger.log_last_except() return True |
最后就是繼承GameService,并實現connect_server函數了。
1
2
3
4
5
6
|
class GameService(game_service_pb2.GameService): def __init__( self ): self .logger = LogManager.get_logger( "GameService" ) def connect_server( self , rpc_controller, request, callback): self .logger.info( '%s' , request.message) |
至于用于網絡收發消息的RpcConnector,可以使用python的asyncore庫實現,具體實現在這就不討論了。
從上面的實現來看,protobuf rpc的實現主要包括編寫proto文件并編譯生成對應的service_pb2文件,繼承RpcChannel并實現CallMethod和調用Service的CallMethod,繼承Service來實現暴露給客戶端的函數。
以上就是本文的全部內容,希望對大家的學習有所幫助。