咨询热线:18551979889 联系人:刘俊格 地址:大渡口区新街道翠园路115-68号
Akka源码分析-ActorSystem
来源:必威体育是做什么的 发布时间:2019-11-28 点击量:448
由于本人对Akka比较感兴趣,也用Akka开发了一些系统,但对Akka的源码还没有具体分析过,希望研究源码的同时写一点博客跟大家分享。有不当之处还请指正。我准备采取Debug的方式来研究Akka的运行过程,从入口开始,直至分析Akka是如何运转的。这样虽然会有点乱,但比较直接,大家凑合着看吧。
使用Akka首先要创建的一个对象就是ActorSystem,那么我们就先分析这个类及其相关的技术细节。
val system = ActorSystem("WhilePattern1",ConfigFactory.load())
第一步就是创建ActorSystem,很明显,这是调用了ActorSystem伴生对象的apply方法。ActorSystem的伴生对象并不复杂,有很多的apply和create方法来创建ActorSystem的实例。apply/create分别供scala和java开发使用。其他字段都是一些环境变量,例如version、envHome、systemHome。还有一个内部类Settings,主要是用来给ActorSystem提供参数配置。
下面我们来看ActorSystem类,这是一个抽象类,它继承了ActorRefFactory特质,下面是源码中对该特质的描述。很明显,这个特质是用来创建Actor实例的。我们常用的actorFor和actorSelection是该特质提供的比较重要的方法,当然还有与创建actor有关的其他函数和字段。ActorSystem是一个抽象类,除了继承ActorRefFactory特质的函数和字段之外,定义了一些其他字段和方法,但也都没有具体的实现。
/** * Interface implemented by ActorSystem and ActorContext, the only two places * from which you can get fresh actors. */
通过跟踪AcotSystem的apply我们发现最终调用了以下代码,主要涉及了两个对象:ActorSystemSetup、ActorSystemImpl。其中源码中对ActorSystemSetup的描述是“A set of setup settings for programmatic configuration of the actor system.”很明显主要是提供一些可编程的配置,我们不再深入这个类。ActorSystemImpl则是我们需要关心的类,因为ActorSystem.apply最终创建了这个类的实例。而ActorSystemImpl由继承了ExtendedActorSystem,ExtendedActorSystem抽象类提供了有限的几个函数,暴露了ActorRefFactory中本来是protected的函数,也并没有具体的实现,我们也暂时忽略。
/** * Scala API: Creates a new actor system with the specified name and settings * The core actor system settings are defined in [[BootstrapSetup]] */ def apply(name: String, setup: ActorSystemSetup): ActorSystem = { val bootstrapSettings = setup.get[BootstrapSetup] val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader()) val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl)) val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext) new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start() }
由于ActorSystemImpl代码比较多,如果从头到尾读一遍代码效率比较低。而且从上面代码可以看出,apply在创建ActorSystemImpl实例之后,调用了start函数,那么我们就从start切入,看看做了哪些操作。
private lazy val _start: this.type = try { registerOnTermination(stopScheduler()) // the provider is expected to start default loggers, LocalActorRefProvider does this provider.init(this) // at this point it should be initialized "enough" for most extensions that we might want to guard against otherwise _initialized = true if (settings.LogDeadLetters > 0) logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener")) eventStream.startUnsubscriber() loadExtensions() if (LogConfigOnStart) logConfiguration() this } catch { case NonFatal(e) ⇒ try terminate() catch { case NonFatal(_) ⇒ Try(stopScheduler()) } throw e }
其实start的代码还是比较清晰的,首先用registerOnTermination注册了stopScheduler(),也就是给ActorSystem的退出注册了一个回调函数stopScheduler(),这一点也不再具体分析。而provider.init(this)这段代码比较重要,从provider的类型来看,它是一个ActorRefProvider,前面我们已经分析过,这是一个用来创建actor的工厂类。provider初始化完成意味着就可以创建actor了,源码注释中也明确的说明了这一点。
val provider: ActorRefProvider = try { val arguments = Vector( classOf[String] → name, classOf[Settings] → settings, classOf[EventStream] → eventStream, classOf[DynamicAccess] → dynamicAccess) dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get } catch { case NonFatal(e) ⇒ Try(stopScheduler()) throw e }
上面是provider的创建过程,最重要的一段代码是dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get,它使用DynamicAccess创建了ActorRefProvider对象的实例。跟踪dynamicAccess创建我们发现这是一个ReflectiveDynamicAccess实例,其实这个类也比较简单,就是从ClassLoader中根据ProviderClass字段加载对应的类并创建对应的实例。ProviderClass定义如下,这是配置文件中经常看到的配置。目前的provider一共有三种:LocalActorRefProvider、akka.remote.RemoteActorRefProvider、akka.cluster.ClusterActorRefProvider,当然我们也可以自定义。
final val ProviderClass: String = setup.get[BootstrapSetup] .flatMap(_.actorRefProvider).map(_.identifier) .getOrElse(getString("akka.actor.provider")) match { case "local" ⇒ classOf[LocalActorRefProvider].getName // these two cannot be referenced by class as they may not be on the classpath case "remote" ⇒ "akka.remote.RemoteActorRefProvider" case "cluster" ⇒ "akka.cluster.ClusterActorRefProvider" case fqcn ⇒ fqcn }
自此provider创建结束,简单来说就是根据配置,通过Class.forName加载了对应的ActorRefProvider实现类,并把当前的参数传给它,调用对应的构造函数,完成实例的创建。provider创建完成后调用init完成初始化,就可以创建actor了。
start函数还创建了一个DeadLetterListener类型的actor,这也是我们经常会遇到的。如果给一个不存在的目标actor发消息,或者发送消息超时,都会把消息转发给这个DeadLetter。这就是一个普通的actor,主要用来接收没有发送成功的消息,并把消息打印出来。后面还调用了eventStream.startUnsubscriber(),由于eventStream也不是我们关注的重点,先忽略。loadExtensions()功能也比较单一,就是根据配置加载ActorSystem的扩展类,并进行注册,关于Extensions也不再深入分析。
private def loadExtensions() { /** * @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility) */ def loadExtensions(key: String, throwOnLoadFail: Boolean): Unit = { immutableSeq(settings.config.getStringList(key)) foreach { fqcn ⇒ dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil) } match { case Success(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()) case Success(p: ExtensionId[_]) ⇒ registerExtension(p) case Success(other) ⇒ if (!throwOnLoadFail) log.error("[{}] is not an "ExtensionIdProvider" or "ExtensionId", skipping...", fqcn) else throw new RuntimeException(s"[$fqcn] is not an "ExtensionIdProvider" or "ExtensionId"") case Failure(problem) ⇒ if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn) else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem) } } } // eager initialization of CoordinatedShutdown CoordinatedShutdown(this) loadExtensions("akka.library-extensions", throwOnLoadFail = true) loadExtensions("akka.extensions", throwOnLoadFail = false) }
至此,我们就对ActorSystem的创建和启动分析完毕,但还有一些细节需要说明,在start之前还是有一些其他字段的初始化。由于这些字段同样重要,且初始化的顺序没有太大关联,我就按照代码结构从上至下依次分析几个重要的字段。
final val threadFactory: MonitorableThreadFactory = MonitorableThreadFactory(name, settings.Daemonicity, Option(classLoader), uncaughtExceptionHandler)
threadFactory这是一个线程工厂类,默认是MonitorableThreadFactory,我们只需要记住这是一个线程工厂类,默认创建ForkJoinWorkerThread的线程就好了。
val scheduler: Scheduler = createScheduler()
scheduler是一个调度器,主要用来定时发送一些消息,这个我们也会经常遇到,但不是此次分析的重点,略过就好。
val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess, deadLetters)
mailboxes是一个非常重要的字段,它是Mailboxes一个实例,用来创建对应的Mailbox,Mailbox用来接收消息,并通过dispatcher分发给对应的actor。
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext)) val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher
dispatchers是Dispatchers的一个实例,它用来创建、查询对应的MessageDispatcher。它有一个默认的全局dispatcher,从代码来看,它从配置中读取akka.actor.default-dispatcher,并创建MessageDispatcher实例。MessageDispatcher也是一个非常重要的类,我们后面再具体分析。
/** * The one and only default dispatcher. */ def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)
object Dispatchers { /** * The id of the default dispatcher, also the full key of the * configuration of the default dispatcher. */ final val DefaultDispatcherId = "akka.actor.default-dispatcher"}
到这里我们就算分析完了ActorSystem的创建过程及其技术细节,当然ActorSystem创建只是第一步,后面需要创建actor,actor如何收到dispatcher的消息,还是需要进一步研究的。
相关产品
-
“要把职业体育和全运会的体系分开,竞赛体系也要做相应改革。全运怎么玩?联赛怎么玩?要分别考虑。我们也得客观认识排球项目的不足,不能完全拿排球去跟足球和篮球比。但是,在排球项目自身发展的基础上更进一步,应该是做得到的。”
-
这对于谷歌中国尤其难。在过去的几年里,谷歌中国的高管们,只能(或者只愿意)谈论与“营收”有关的话题,而很少触及与“技术”有关的讨论。这让谷歌中国和谷歌之间,越发产生区隔:那些新产品、新技术,似乎与谷歌中国毫无关系。
-
1、鲜花生泥土多,先加入一勺盐和一大勺面粉,搅拌均匀,泡十分钟,然后使劲搓洗清洗干净,这样比直接清洗要容易清洗干净。
-
鹿晗告诉记者,相比《重返二十岁》中的“项前进”,此次饰演的林冲更像自己一些,“第一眼看到这个角色就爱上了他,尤其是林冲身上的痞气很吸引我,而且我小时候也是轮滑队的。”
-
2009年10月29日晚22点左右, YU女士及其母亲和女儿被被告ZHANG先生拳打脚踢,三人头部和身体多处遭到击伤。YU女士与女儿夺路逃出家中,老人遭到殴打致死。
-
在“春季恋歌三部曲”变奏曲《天使的城》里,李晨和马苏饰演了一对欢喜冤家。两人曾在《北京青年》里饰演的也是一对情侣,“迷茫青年”何东(李晨饰)在领结婚证时反悔了,虐得“乖乖女博士”权筝(马苏饰)抓心挠肝。而此次在《天使的城》中,马苏可是逮着了报仇的机会,单是片花中,李晨便被马苏狂扇了三记耳光,甚至还扯住李晨的衣领疯狂厮打,看得网友直喊心疼。网友“天天爱晨么么哒”表示:“放开李晨的领子,不许打他的脸!”
-
TPP生效需要签署国中最大经济体美国的批准,但特朗普的就任使生效前景难以预料,日本在野党集中炮轰执政党与政府急于使其获批。
-
据《日经亚洲评论》报道,正当外界期待特朗普亚洲政策更加明确之际,美国副总统彭斯将于下月访问东南亚最大经济体印度尼西亚。但印尼首席安全部长卫兰托在会见美国驻印尼大使后表示,此访的具体日期尚未确定。
热点资讯
- 郑州婚纱照摄影工作室哪家好【朵拉】排行前十名品质担当强势来袭!2019-11-12
- 囧科技:Siri生死恋2019-06-26
- 高通与阿里将合作推出国内OS芯片产品——IT新闻uuuuuuu2019-06-26
- 权限系统缓存设计知多少2019-06-26
- 江苏宿迁市:醉美梨园湾花开振兴图2019-11-18
- 8月份暴雨台风仍将多发登陆我国的台风强度较强2019-06-26
- 记者在土耳其遇害,苹果AppleWatch记下被害全过程2019-11-24
- 车险业务陷两难,存依赖利宝保险年内三获股东施援2019-11-22