目錄
- Actor
- Node
- ActorSystem
- ActorSystem初始化
- 創(chuàng)建Actor
- 發(fā)送消息
- 休眠Actor
- 定時(shí)器
- 小結(jié)
Actor模型是一種常見(jiàn)的并發(fā)模型,與最常見(jiàn)的并發(fā)模型——共享內(nèi)存(同步鎖)不同,它將程序分為許多獨(dú)立的計(jì)算單元——Actor,每個(gè)Actor獨(dú)立管理自己的資源,不同Actor之間通過(guò)消息傳遞來(lái)交互。它的好處是全異步執(zhí)行,不會(huì)造成線程阻塞,從而提升CPU使用率,另外由于線程之間是異步交互,所以也不用考慮加鎖和線程同步的問(wèn)題。
Actor模型在業(yè)界有許多應(yīng)用,例如游戲服務(wù)器框架Skynet、編程語(yǔ)言Erlang。
因?yàn)闅v史原因,Java下的Actor模型應(yīng)用較少,知名的只有基于Scala的Akka。而且Actor模型也不是萬(wàn)能的,異步編程會(huì)需要編寫(xiě)更多的回調(diào)代碼,原本的一步需要拆分成若干步來(lái)處理,無(wú)疑增加了代碼編寫(xiě)復(fù)雜度(callback hell)。
本文以學(xué)習(xí)和研究為目的,使用Java實(shí)現(xiàn)一個(gè)簡(jiǎn)單Actor模型,功能上模仿Skynet,支持的功能包括:
- Actor基礎(chǔ)功能:消息發(fā)送接收、異步處理等。
- 集群功能:支持多節(jié)點(diǎn)之間通信。
- 非阻塞的sleep和網(wǎng)絡(luò)通信。
完整的源代碼在可以在Github獲取。以下是部分關(guān)鍵代碼以及設(shè)計(jì)思路講解。
Actor
Actor是Actor模型中的核心概念,每個(gè)Actor獨(dú)立管理自己的資源,與其他Actor之間通信通過(guò)Message。
這里的每個(gè)Actor由單線程驅(qū)動(dòng),相當(dāng)于Skynet中的服務(wù)。Actor不斷從mailbox中獲取尚未處理的Message,mailbox使用的結(jié)構(gòu)是無(wú)界阻塞的LinkedBlockingQueue。
Actor類是抽象類,其中處理消息的handleMessage方法為抽象方法,需要每個(gè)具體類來(lái)重載實(shí)現(xiàn)。
public abstract class Actor { private Node node; private String name; private final BlockingQueue<Message> mailbox = new LinkedBlockingQueue<>(); private Thread actorThread; public Node getNode() { return node; } public void setNode(Node node) { this.node = node; } public void setName(String name) { this.name = name; } public String getName() { return name; } public void start() { actorThread = new Thread(() -> { ActorSystem.setThreadLocalActor(this); for(;;) { try { Message message = mailbox.take(); try { handleMessage(message); } catch (Exception e) { e.printStackTrace(); } } catch (InterruptedException ignore) { // ignore } } }); actorThread.start(); } public void act(Message msg) { mailbox.offer(msg); } protected abstract void handleMessage(Message message); }
Node
Node代表節(jié)點(diǎn),與Skynet中節(jié)點(diǎn)意義相同。它是一個(gè)獨(dú)立的Java進(jìn)程,有自己的IP和端口,Node之間通過(guò)異步的網(wǎng)絡(luò)通信發(fā)送和接收消息。一個(gè)Node中可以運(yùn)行多個(gè)Actor,一個(gè)Actor僅可與一個(gè)Node綁定。
Node的唯一標(biāo)識(shí)也是它的name,與Actor的name稍有不同,Node的name是全局唯一,而Actor的name是Node內(nèi)唯一。
public abstract class Node { /** * 名字 * 需要是唯一的,按名字查找 */ private String name; private InetSocketAddress address; public String getName() { return name; } public void setName(String nodeName) { name = nodeName; } public void setAddress(InetSocketAddress address) { this.address = address; } }
ActorSystem
ActorSystem是Actor的管理系統(tǒng),也是外部調(diào)用API的主要入口,提供本框架中的主要功能:創(chuàng)建Actor、發(fā)送消息、休眠Actor、網(wǎng)絡(luò)通信等。下面分別詳細(xì)說(shuō)明。
ActorSystem初始化
分為以下三步:
首先是調(diào)用conf方法讀取集群配置,包括每個(gè)Node的name和address。
其次是調(diào)用bindNode方法綁定當(dāng)前Node。
最后是調(diào)用start方法初始化自身,包括對(duì)定時(shí)器的初始化和Netty服務(wù)端的初始化。之所以引入定時(shí)器,是因?yàn)闊o(wú)阻塞sleep需要用到,這個(gè)具體后面再說(shuō),另外也可以用于擴(kuò)展實(shí)現(xiàn)通用的定時(shí)任務(wù)功能。Node之間發(fā)送消息都是異步的,客戶端和服務(wù)端都使用了Netty做異步網(wǎng)絡(luò)通信。
public class ActorSystem { private static Map<String, InetSocketAddress> clusterConfig; /** * 當(dāng)前綁定到的節(jié)點(diǎn) */ private static Node currNode; private final static Map<String, Actor> actors = new HashMap<>(); /** * 維護(hù)線程與Actor的對(duì)應(yīng)關(guān)系 */ private final static ThreadLocal<Actor> currThreadActor = new ThreadLocal<>(); /** * 客戶端Netty bootstrap */ private static Bootstrap clientBootstrap; /** * 維護(hù)節(jié)點(diǎn)與通道的對(duì)應(yīng)關(guān)系 */ private final static Map<String, Channel> channels = new ConcurrentHashMap<>(); private static void startNettyBootstrap() { try { // 先啟動(dòng)服務(wù)端bootstrap EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))) .addLast(new ObjectEncoder()) .addLast(new ServerHandler()); } }); InetSocketAddress address = clusterConfig.get(currNode.getName()); b.bind(address).sync(); // 再啟動(dòng)客戶端bootstrap EventLoopGroup group = new NioEventLoopGroup(); clientBootstrap = new Bootstrap(); clientBootstrap.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))) .addLast(new ObjectEncoder()) .addLast(new ClientHandler()); } }); } catch (Exception e) { throw new RuntimeException("actor system start fail", e); } } public static void start() { // 啟動(dòng)定時(shí)器 Timer.start(); // 啟動(dòng)Netty bootstrap startNettyBootstrap(); } public static void conf(Map<String, InetSocketAddress> config) { clusterConfig = config; } /** * 將當(dāng)前系統(tǒng)綁定到某個(gè)節(jié)點(diǎn) */ public static void bindNode(Class<? extends Node> nodeClass, String nodeName) { InetSocketAddress address = clusterConfig.get(nodeName); try { Constructor<? extends Node> constructor = nodeClass.getDeclaredConstructor(); Node node = constructor.newInstance(); node.setName(nodeName); currNode = node; } catch (Exception e) { throw new RuntimeException("create node fail", e); } }
創(chuàng)建Actor
創(chuàng)建Actor調(diào)用newActor方法,指定要?jiǎng)?chuàng)建的Actor具體類和Actor name,Actor name需要Node內(nèi)部唯一。
創(chuàng)建Actor時(shí),先綁定當(dāng)前Node,再調(diào)用Actor的start方法初始化,然后將name與Actor的映射關(guān)系加入到actors中。
/** * 啟動(dòng)新的Actor */ public static void newActor(Class<? extends Actor> actorClass, String name) { try { Constructor<? extends Actor> constructor = actorClass.getDeclaredConstructor(); Actor actor = constructor.newInstance(); actor.setName(name); actor.setNode(currNode); actor.start(); actors.put(name, actor); } catch (Exception e) { throw new RuntimeException("create actor fail", e); } } }
發(fā)送消息
核心是send方法,指定目標(biāo)Node name、目標(biāo)Actor name、命令名和參數(shù)后發(fā)送消息,也可以把這些信息包裝在Message中發(fā)出。
消息的來(lái)源Node和來(lái)源Actor保存在一個(gè)ThreadLocal變量currThreadActor中。它的作用是在Actor創(chuàng)建時(shí),將Actor線程與Actor綁定在一起,這樣當(dāng)調(diào)用send方法發(fā)送消息時(shí),無(wú)需再顯式指定來(lái)源Node和來(lái)源Actor,因?yàn)槿绻茿ctor線程本身調(diào)用的send方法,那么直接從currThreadActor中取值即可;否則取不到值,那么來(lái)源Node和來(lái)源Actor都是null。
如果消息的目標(biāo)Node與來(lái)源Node相同,那么直接找到對(duì)應(yīng)的Actor添加消息即可;否則,需要走網(wǎng)絡(luò)通信。這里的網(wǎng)絡(luò)通信實(shí)際上就是一個(gè)簡(jiǎn)單的RPC通信,此處使用了Netty的ObjectEncoder和ObjectDecoder做消息的序列化和反序列化(注意:ObjectEncoder和ObjectDecoder在Netty的最新版本中已被廢棄,因?yàn)镴ava序列化具有很大的安全隱患,這里仍然使用它們僅是為了演示方便)。
當(dāng)走網(wǎng)絡(luò)通信發(fā)送消息時(shí),先判斷到目標(biāo)Node的Channel是否有效,若是,則直接發(fā)送消息;否則,先重新創(chuàng)建好Channel,再異步發(fā)送。這里實(shí)際上會(huì)有一個(gè)多線程同步的問(wèn)題,就是多個(gè)線程同時(shí)嘗試創(chuàng)建Channel,那么后面創(chuàng)建的Channel會(huì)把前面的覆蓋掉,最后只會(huì)保留最后創(chuàng)建的一個(gè)。優(yōu)化方法有兩種:一是允許多個(gè)線程同時(shí)嘗試創(chuàng)建Channel,但是當(dāng)創(chuàng)建Channel成功時(shí),如果發(fā)現(xiàn)已經(jīng)有創(chuàng)建好的Channel引用了(來(lái)自別的線程創(chuàng)建),那么不保留這次創(chuàng)建的Channel,發(fā)送也通過(guò)已有的Channel引用;二是每次嘗試創(chuàng)建Channel時(shí)都禁止別的線程做同樣的操作。兩種優(yōu)化方法各有優(yōu)劣,限于時(shí)間,這里沒(méi)有用優(yōu)化方法做具體實(shí)現(xiàn)。
public static void send(Message msg) { String destNodeName = msg.getDestNode(); String destActorName = msg.getDestActor(); if (destNodeName.equals(currNode.getName())) { Actor destActor = actors.get(destActorName); destActor.act(msg); } else { sendToAnotherNode(msg); } } private static void sendToAnotherNode(Message msg) { try { String destNodeName = msg.getDestNode(); // 如果沒(méi)有連接,那么先建立連接 Channel channel = getChannel(destNodeName); if (!isChannelValid(channel)) { InetSocketAddress address = clusterConfig.get(destNodeName); // TODO 有可能出現(xiàn)多線程同時(shí)嘗試建立連接的情況,這里會(huì)保留最后一個(gè) // 優(yōu)化方法有兩種: // 1. 允許多次嘗試,當(dāng)建立連接成功后,如果已有成功連接的引用,那么不保留這次創(chuàng)建的連接 // 2. 嘗試時(shí)阻塞其他嘗試 clientBootstrap.connect(address).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { setChannel(destNodeName, future.channel()); future.channel().writeAndFlush(msg); } }); } else { // 否則直接發(fā)送消息 channel.writeAndFlush(msg); } } catch (Exception e) { throw new RuntimeException("send to another node fail"); } } public static void send(String destNodeName, String destActorName, String command, Object... params) { Actor srcActor = currThreadActor.get(); String srcActorName = srcActor == null ? null : srcActor.getName(); String srcNodeName = srcActor == null ? null : srcActor.getNode().getName(); Message msg = new Message(command, srcNodeName, srcActorName, destNodeName, destActorName, params); send(msg); } public static boolean isChannelValid(Channel channel) { return channel != null && channel.isActive() && channel.isWritable(); } public static Channel getChannel(String destNodeName) { return channels.get(destNodeName); } public static void setChannel(String destNodeName, Channel channel) { channels.put(destNodeName, channel); } /** * Actor發(fā)送給自己 */ public static void sendSelf(String command, Object... params) { Actor selfActor = currThreadActor.get(); if (selfActor == null) { throw new RuntimeException("not in an actor, send fail"); } send(selfActor.getNode().getName(), selfActor.getName(), command, params); } public static void setThreadLocalActor(Actor actor) { currThreadActor.set(actor); }
休眠Actor
休眠Actor調(diào)用sleep方法實(shí)現(xiàn),它制定了需要休眠的毫秒數(shù),休眠完后回調(diào)的命令及參數(shù)。
sleep方法對(duì)應(yīng)于Skynet中的skynet.sleep,它們都是阻塞任務(wù)但是不阻塞線程。不同的是,skynet.sleep使用了Lua的協(xié)程yield/resume,在實(shí)現(xiàn)上更加優(yōu)雅,對(duì)用戶是透明的,用戶無(wú)需指定回調(diào)函數(shù),就能在sleep到期時(shí)自動(dòng)切換回當(dāng)前任務(wù)繼續(xù)執(zhí)行。而Java沒(méi)有這種特性,所以此處乞丐版的實(shí)現(xiàn)需要指定回調(diào)方法。
這里的sleep方法和skynet.sleep一樣,底層都是通過(guò)定時(shí)任務(wù)來(lái)實(shí)現(xiàn)。具體來(lái)說(shuō),sleep調(diào)用后會(huì)添加一個(gè)TimerTask,封裝了過(guò)期時(shí)間和回調(diào)命令及參數(shù),待任務(wù)到期后將命令封裝成Message發(fā)送給當(dāng)前Actor自身。
public static void sleep(long millis, String command, Object... params) { String destActorName = currThreadActor.get().getName(); Timer.addTimeTask(new TimerTask(System.currentTimeMillis() + millis, () -> { ActorSystem.send(currNode.getName(), destActorName, command, params); })); }
定時(shí)器
上面說(shuō)到sleep方法依賴定時(shí)器的實(shí)現(xiàn)。定時(shí)器在Timer類中實(shí)現(xiàn),它在start方法中啟動(dòng)一個(gè)線程不斷輪詢處理定時(shí)任務(wù),并提供了addTimeTask方法添加新的定時(shí)任務(wù)。
Timer使用優(yōu)先級(jí)隊(duì)列作為存儲(chǔ)定時(shí)任務(wù)的數(shù)據(jù)結(jié)構(gòu),這樣在插入任務(wù)時(shí)可以達(dá)到O(logN)的時(shí)間復(fù)雜度。
為性能考慮,Timer主線程非采用每隔一小段時(shí)間不斷輪詢的方式,而是在當(dāng)前沒(méi)有任務(wù)需要執(zhí)行時(shí)保持阻塞。為此需要考慮兩個(gè)喚醒阻塞條件,一是任務(wù)隊(duì)列由空到非空時(shí)喚醒,二是當(dāng)下個(gè)定時(shí)任務(wù)還沒(méi)到期而阻塞時(shí),插入一個(gè)到期時(shí)間更早的定時(shí)任務(wù),需要重新設(shè)定阻塞時(shí)間,因此先喚醒主線程。
public class Timer { /** * 基于優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的定時(shí)任務(wù)隊(duì)列 */ private static final PriorityQueue<TimerTask> timerTasks = new PriorityQueue<>(); private static final ReentrantLock lock = new ReentrantLock(); /** * 喚醒阻塞條件一:隊(duì)列非空 */ private static final Condition notEmpty = lock.newCondition(); /** * 喚醒阻塞條件二:當(dāng)前時(shí)刻有任務(wù)需要執(zhí)行 */ private static final Condition hasCurrTask = lock.newCondition(); /** * 添加新的定時(shí)任務(wù) */ public static void addTimeTask(TimerTask task) { lock.lock(); if (timerTasks.isEmpty()) { notEmpty.signal(); } TimerTask firstTask = timerTasks.peek(); timerTasks.offer(task); if (firstTask != null && task.getExecTime() < firstTask.getExecTime()) { hasCurrTask.signal(); } lock.unlock(); } /** * 啟動(dòng)定時(shí)器 */ public static void start() { Executor executor = Executors.newSingleThreadExecutor(); executor.execute(() -> { while (true) { TimerTask firstTask; lock.lock(); if (timerTasks.isEmpty()) { try { notEmpty.await(); } catch (InterruptedException ignore) { // ignore } } firstTask = timerTasks.peek(); long currDeadlineMillis = firstTask.getExecTime(); long currTime = System.currentTimeMillis(); long delay = currDeadlineMillis - currTime; if (delay > 0) { try { hasCurrTask.await(delay, TimeUnit.MILLISECONDS); } catch (InterruptedException ignore) { // ignore } } else { firstTask = timerTasks.poll(); } lock.unlock(); if (firstTask != null) { firstTask.run(); } } }); } }
程序運(yùn)行
示例程序放在test包下面,涉及到的類說(shuō)明:
ActorPing:每隔固定間隔向ActorPong發(fā)送消息,并接收回包。
ActorPong:接收ActorPing發(fā)送的消息并原樣返回。
Cluster:包含NodeA和NodeB兩個(gè)節(jié)點(diǎn)的配置。
NodeA:?jiǎn)?dòng)時(shí)創(chuàng)建兩個(gè)ActorPing,分別命名為ping1和ping2,分別以1s和5s的間隔向NodeB上的pong發(fā)送消息。
NodeB:?jiǎn)?dòng)時(shí)創(chuàng)建一個(gè)ActorPong,命名為pong。
運(yùn)行時(shí),先啟動(dòng)NodeB,再啟動(dòng)NodeA,NodeA下面會(huì)打印帶時(shí)間戳的如下信息:
[time:8, srcActor:null, destActor:ping1]command:start,params:[1000]
[time:8, srcActor:null, destActor:ping2]command:start,params:[5000]
[time:9, srcActor:ping1, destActor:ping1]command:ping,params:[1000]
[time:9, srcActor:ping2, destActor:ping2]command:ping,params:[5000]
[time:22, taskId:2]addTask
[time:22, taskId:1]addTask
[time:143, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
[time:143, srcActor:pong, destActor:ping2]command:receivePong,params:[msg]
[time:1026, taskId:2]execTask
[time:1026, srcActor:null, destActor:ping1]command:ping,params:[1000]
[time:1029, taskId:3]addTask
[time:1035, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
[time:2033, taskId:3]execTask
[time:2034, srcActor:null, destActor:ping1]command:ping,params:[1000]
[time:2034, taskId:4]addTask
[time:2037, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
[time:3036, taskId:4]execTask
[time:3036, srcActor:null, destActor:ping1]command:ping,params:[1000]
[time:3036, taskId:5]addTask
[time:3039, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
[time:4041, taskId:5]execTask
[time:4042, srcActor:null, destActor:ping1]command:ping,params:[1000]
[time:4042, taskId:6]addTask
[time:4044, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
[time:5022, taskId:1]execTask
[time:5022, srcActor:null, destActor:ping2]command:ping,params:[5000]
[time:5022, taskId:7]addTask
NodeB下面會(huì)打印如下信息:
[time:1938, srcActor:ping2, destActor:pong]command:pong,params:[msg]
[time:1940, srcActor:ping1, destActor:pong]command:pong,params:[msg]
[time:2855, srcActor:ping1, destActor:pong]command:pong,params:[msg]
[time:3856, srcActor:ping1, destActor:pong]command:pong,params:[msg]
[time:4856, srcActor:ping1, destActor:pong]command:pong,params:[msg]
[time:5860, srcActor:ping1, destActor:pong]command:pong,params:[msg]
[time:6850, srcActor:ping2, destActor:pong]command:pong,params:[msg]
小結(jié)
本文總結(jié)了使用Java實(shí)現(xiàn)一個(gè)簡(jiǎn)單Actor模型的完整流程。由于時(shí)間所限,本文只實(shí)現(xiàn)了Actor模型的基礎(chǔ)功能。不過(guò)造輪子的目的主要是為了深入掌握Actor模型的核心概念,作為演示和研究的用途。對(duì)于并發(fā)模型來(lái)說(shuō),不管用哪種語(yǔ)言來(lái)實(shí)現(xiàn),原理才是主要的、相通的,語(yǔ)言只不過(guò)是實(shí)現(xiàn)的工具。相信筆者的這篇文章也會(huì)幫助讀者對(duì)Actor模型有更為深入的了解。
以上就是基于Java實(shí)現(xiàn)Actor模型的詳細(xì)內(nèi)容,更多關(guān)于Java Actor模型的資料請(qǐng)關(guān)注其它相關(guān)文章!
原文地址:https://blog.csdn.net/needmorecode/article/details/130457322