文章转载自:liangkz
在继续往下分析之前,我们先来简单了解一下分布式任务调度子系统中的线程/进程概念和它们之间的通信模型。
在鸿蒙系统中,线程(Thread)和任务(Task)是同一个概念,可以等同混用。
线程是竞争系统资源的最小运行单元,它可以等待或使用CPU、内存等硬件资源,并独立于其它线程运行。线程一共有32个优先级(0-31),最高优先级为0,最低优先级为31。在创建线程的时候,通过线程的属性参数指定线程的优先级和其他一些必要的属性,如线程名称、运行的主体函数和线程栈的大小等等。内核对线程的调度采用抢占式调度机制,同时支持时间片轮转调度和FIFO调度方式。
轻量系统(Hi3861平台上),只有线程概念,没有进程概念。所以你在Hi3861工程代码上是找不到进程Process相关的东西的,各种服务和程序,都是以线程的形式在跑,比如系统启动时,每个service是一个线程,开发者自己编写的程序,在SYS_RUN的时候,也是一个或多个线程在跑。
小型系统和标准系统,则有进程和线程概念,每个进程内的线程独立运行、独立调度,当前进程内线程的调度不受其它进程内线程的影响。在当前进程内高优先级的线程可抢占当前进程内低优先级线程,当前进程内低优先级线程必须在当前进程内高优先级线程阻塞或结束后才能得到调度。
轻量系统的线程按cmsis接口标准来实现,如创建线程用:osThreadNew()接口。
小型系统的线程按posix接口标准来实现,如创建线程用:pthread_create()接口。
在分布式任务调度子系统中,使用封装了的线程相关接口来屏蔽底层实现标准的差异,如THREAD_Create()分别使用了cmsis接口和posix接口来实现线程的创建:
更详细的适配代码见:
//foundation/distributedschedule/samgr_lite/samgr/adapter/
轻量系统的分布式任务调度子系统,基本上只在 //foundation/distributedschedule/samgr_lite/ 目录下的 samgr 和communication两个子目录代码内就实现了,只涉及线程间通信。
小型系统和标准系统的分布式任务调度子系统,包含了整个//foundation/distributedschedule/目录,除了线程和线程间通信,还涉及到进程间通信。
在只有线程概念的轻量系统中,或者在小型系统的同一个进程内部,线程之间其实是位于相同的地址空间内的,互相之间通信相对简单,也有很多成熟的标准方法,本文不打算都介绍,请网络搜索进行学习,这里仅结合代码,简单说一下samgr模块用到的消息队列机制和相关流程。
前文《Hi3861的SAMGR--系统服务框架子系统-2》提到,轻量系统中,SamgrLiteImpl g_samgrImpl 管理着一张本系统内所有service+feature的关键信息(如Sid,Fid,Qid,Tid等)的树状图。要向某个service发消息,只要向g_samgrImpl查询service的名字,就可以获得对应的Identity信息(包含了Qid),并对其发送消息,而每一个service有一个线程监控着自己的Qid消息队列,随时从中读取消息进行处理和回复应答。
主要的代码都在task_manager.c和message.c两个文件里了。
task_manager.c定义了一组函数:
- SAMGR_CreateFixedTaskPool():为每一个服务创建Queue和TaskPool
- SAMGR_StartTaskPool():创建并运行监控线程TaskEntry()
- TaskEntry():监控线程本体,循环监控消息队列,并调用相关的API对消息进行处理
- ProcResponse():处理MSG_ACK类型的应答消息
- ProcDirectRequest():处理MSG_DIRECT类型的直接请求消息,不需要应答
- ProcRequest(); 处理MSG_NON/MSG_CON类型的请求消息,要或者不要应答
- 其他函数略。
message.c 定义了一组对消息的管理函数,包括对不同类型消息的打包、发送、收取、解包、转发等等。
我们看一下主要的相关结构体。
- //这是在线程消息队列间来回交换的信息结构体:
- struct Exchange {
- //消息接收者的身份信息结构体,包括了Sid/Fid/Qid,见下面的解释
- Identity id; /**< The target service or feature identity. */
- Request request; //请求消息的主体,见下面的解释
- Response response; //应答消息的主体,见下面的解释
- short type; /**< The exchange type. */ //消息的类型,见下面的ExchangeType 的解释
- //收到应答消息后的回调处理函数,为空则表示不需要或不处理应答消息
- Handler handler; /**< async response or immediately request callback function */
- //当发送的Request中的*data或者Response中的*data 不为空时,也就是*data可以在发送者和
- //接收者之间共享,这个指针*sharedRef指向的值(也是在发送者和接收者之间共享的),
- //就记录了*data被引用的次数,当引用次数降到0时,就会释放掉*data指向的内存块。
- uint32 *sharedRef; /**< use to share the request and response for saving memory */
- };
- struct Identity {
- int16 serviceId; /** Service ID */ //消息接收者的Sid
- //消息接收者下面可能会没有或者有多个feature,这个id就是要处理本消息的feature的Fid
- //如果这个id不存在,则默认会调用service的MessageHandle 来处理消息
- int16 featureId; /** Feature ID */
- //在发送消息前这里写的是消息接收者的Qid,如果要求接收者处理消息后,返回应答消息,
- //那就在发送消息时,将这个值修改成消息发送者的Qid,这样接收者就知道该向谁回复应答消息了。
- MQueueId queueId; /** Message queue ID */
- };
- //发送消息的具体类型定义
- enum ExchangeType {
- //终止消息处理任务:线程往自己的消息队列发送一个这种消息来自杀
- MSG_EXIT = -1,
- //不需要回应的请求消息,service收到该消息调用MessageHandle/OnMessage进行处理即可
- MSG_NON = 0,
- //需要回应的请求消息,service收到该消息调用MessageHandle/OnMessage进行处理,需要回复应答消息
- MSG_CON = 1,
- //应答消息
- MSG_ACK = 2,
- //同步消息, 未见到使用之处
- MSG_SYNC = 3,
- //不需要回应的直接请求消息,需要消息接收者调用消息中指定的 handler 来处理消息,
- //也就是借用消息接收者线程去执行发送者指定的函数
- MSG_DIRECT = 4,
- };
- //发送的请求消息本体
- struct Request {
- /** Message ID */
- int16 msgId; //具体消息的ID,消息接收者据此区分并处理消息
- /** Data length */
- int16 len; //本消息传递的数据的长度
- /** Data content */
- //指向本消息传递的具体数据所在内存位置的指针,意味着发送方和接收方看到的是同一块内存中的数据,
- //线程虽然不同,但它们所在的虚拟地址空间是相同的,所以这个*data指向的内存空间跨线程是可访问的,
- //但两个线程如果位于不同的地址空间(在不同的进程空间内),就不能这样共享同一个内存块了。
- void *data;
- /** Message value, which is defined by developers */
- //由开发人员自定义的Message value字段,实际的消息交换中,消息都比较简单,只要msgId和msgValue
- //就可以表达完整的意思了,所以很多时候根本用不到上面的len和*data两个字段
- uint32 msgValue;
- };
- //消息接收者返回给消息发送者的应答消息
- struct Response {
- /** Data content */
- //指向本应答消息传递的具体数据所在内存位置的指针,与上面的Request中的*data一样,可共享内存块
- void *data;
- /** Data length */
- int16 len; //本应答消息传递的数据的长度
- };
复制代码
消息的接收和分发处理,见监控线程本体:
首先是从消息队列中提取Exchange消息,没有消息则继续循环监控,有消息且类型是MSG_EXIT意味着本消息队列和监控线程要注销和退出了。
正常的消息将会有下面三个ProcXxx()中的一个来对消息进行处理,三个消息处理函数是互斥的,某个消息只能由三个中的一个来处理。
- ProcResponse()只处理应答消息,实际上是直接由exchange内handler指向的函数来处理该应答内容。
- ProcDirectRequest()只处理不需要回应的直接请求消息,实际上是直接由exchange内handler指向的函数来处理该消息。
- ProcRequest()则处理其他类型的消息,由service/feature自己的MessageHandle来响应,如果需要返回应答消息,那就直接在发送一个Response消息就可以了。
基本上,线程间以消息队列方式进行通信的东西就这么多了,更具体的处理流程,以及消息的重新打包发送,请自行阅读相关代码进行理解即可。
进程是系统资源管理的独立单元,它可以等待或使用CPU、内存等硬件资源,并独立于其它进程运行。进程一共有32个优先级(0-31),用户态进程可配置的优先级有22个(10-31),最高优先级为10,最低优先级为31。进程调度采用抢占式调度机制,支持时间片轮转调度方式。高优先级的进程可抢占低优先级进程,低优先级进程必须在高优先级进程阻塞或结束后才能得到调度。
鸿蒙系统的进程,可分为两大类:应用程序进程和系统进程,而系统进程又可再分为内核态系统进程和用户态系统进程。
应用程序进程属于最上层的应用开发概念,本文不涉及,但每一个应用程序进程作为一个独立的进程,也同样受到系统进程管理模块的调度和管理。
内核态系统进程在操作系统的内核启动阶段创建并一直在内核层工作,为上层的所有进程/线程提供管理、调度、通信等基础的运行保障,本文也不涉及,详情可以去看前文《鸿蒙系统的启动流程v2.0》或者其他人的鸿蒙系统内核的分析文章。
用户态系统进程则是承下启上,在系统内核进程提供的基础保障之上,再为上层应用进程提供必要的各种的基础服务,帮助应用开发者更方便快捷有效地实现上层的业务逻辑,最终呈现给用户的是数以万计的各种APP和丰富多彩的用户体验。
本文主要分析用户态的系统进程。
所有的用户态系统进程,都是由用户态根进程init创建的,它读取并分析 /etc/init.cfg 配置文件(它是//vendor/hisilicon/hispark_taurus/init_configs/init_liteos_a_3516dv300.cfg 的副本),根据上面的记录,按顺序启动几个关键的用户态系统服务进程。每一个系统服务进程均拥有自己独立的进程空间,相互之间不可直接访问,实现进程间的隔离。每一个系统服务进程内部,又由若干个互相独立的线程组成,各自对内或对外提供具体的服务和功能。既然要对外部进程提供服务,那就不可避免要跨进程进行通信和API的调用。
小型系统和标准系统中,进程内部的多线程,按上面的消息队列方式进行通信。进程间的通信方式,也有很多标准方法,但应该是基于通信效率的考虑(鸿蒙系统的进程间通信效率是非常之高的),分布式任务调度子系统采用的是共享内存机制作为进程间通信方式。
我们知道,不同的进程,都在各自独立的虚拟内存地址空间内工作,进程不能直接访问物理内存,需要经过内核的内存管理模块进行映射,才能对应到具体的物理内存地址上去。如果进程A的一块虚拟内存地址Aa,和进程B的一块虚拟内存地址Bb,经过内核内存管理模块的针对性地处理,同时映射到了同一块物理内存Mm上,这样进程A和进程B就可以同时访问这块物理内存Mm了,只要它们按预先约定的相同的规则去读写各自的虚拟内存Aa或Bb(也就是Mm这块共享的物理内存),那就是说两个进程可以通信了。如果多个进程都共享这块内存,那这多个进程之间也是可以互相通信的了。
这几句话说起来很简单,但实现起来还是很复杂的,涉及到非常多的东西,需要深入内核去理解,这里也不展开分析了,想了解的话,请搜索内核分析的相关文章进行阅读理解。
进程间通信有个非常基本的问题,就是如何获悉对方进程的通信地址?比如上面进程A写入共享内存中的msg,怎么知道是写给进程B的还是写给进程C的。
如前面所说,线程是操作系统进行资源调度的最小单元,进程间的通信,实际上的往来消息是由线程来接收和发送的。进程A的若干个线程中的a线程与进程B的若干个线程中的b线程建立了通信,我们就可以说进程A和进程B建立了通信,所以进程的通信地址,实际上是进程内某个线程a/b的通信地址,也就是该线程的ID。
为了顺利获得进程的通信地址和高效地完成进程间通信,这里引入了两个概念:
Router和
Endpoint。
Router 是进程内对外(别的进程)提供服务或功能的一个单位,一个Router 一般是一个service(如果它没有feature的话)、或者service的一个feature(如果service对外提供多个feature,那每个feature就是一个Router)。进程内不对外提供服务的service/feature,不会加入到Router列表中去的。
Endpoint(EP)就是通信终端,每个独立的进程,都被定义为一个EP,每个EP都创建一个专门的boss线程来跟别的EP进行IPC通信。
但EP与EP之间,仍然存在基本的通信地址的问题,这时候,名字为“samgr”的EP就起到了关键的作用了。
“samgr”EP是管理者(SamgrServer g_server)的EP,是全系统唯一的,它的通信地址也是人为设定的、其它EP都肯定知道的一个地址,所以称它为知名EP。其他所有的EP名字都是“ipc client”,它们都会主动向管理者的知名EP注册自己,不仅注册自己的通信地址(boss线程的Tid),也注册了本EP对外提供的服务列表(Router),管理者g_server把这些信息记录在相应的结构体中。进程A想要调用进程B的某项服务,它需要先发送消息到管理者的EP,向管理者查询登记在册的进程B提供的对应的服务接口,从而获得进程B的通信地址(handle)和对应服务(Router)的id(token),这样进程A就可以向进程B发送IPC消息,调用它的服务了。
下面我们通过一张图(附件是原图)来简单看一下几个重要的结构体以及它们之间的关系,后面再详细梳理这些关系的建立过程和使用过程。
- struct Endpoint {
- const char *name; //EP名字,"samgr" or "ipc client"
- IpcContext *context; //IPC 通信通道环境 "/dev/lite_ipc"
- Vector routers; //routers.data[idx] = Router *router -> {Sname, Fname}+IUnknown
- ThreadId boss; //本EP接收其它EP的IPC信息的监控线程
- uint32 deadId;
- int running;
- //handle: kernel mapped boss_Tid to this handle
- //token : idx to routers.data[idx]
- //cookie: --
- SvcIdentity identity; //本EP的对外身份信息,包含的handle非常重要
-
- // "samgr": RegisterSamgrEndpoint()
- //"ipc client": RegisterRemoteEndpoint()
- RegisterEndpoint registerEP; //本EP向管理者注册EP的方法
-
- TokenBucket bucket;
- };
复制代码
Vector routers; 字段是一个Vector,它的 **data 是一个指针数组,每一个data[x] 都是一个指向Router结构体对象的指针,某个Router结构体记录了具体的某个对外提供的服务的一些关键信息。
- typedef struct Router
- {
- SaName saName; //{Service name, Feature name}
- Identity identity; //{Sid, Fid, Qid}
- IServerProxy *proxy; //Service 或Feature 的IUnknown *iUnknown
- PolicyTrans *policy;
- uint32 policyNum;
- } Router;
复制代码
SvcIdentity identity 是本EP的对外身份信息,包含的handle非常重要
- typedef struct {
- uint32_t handle; //本EP的boss线程ID经过内核的映射得到的一个线程序号
- uint32_t token; //别的EP要访问本EP中的routers列表中具体router的编号routers.data[token]
- uint32_t cookie; //未见使用
- #ifdef __LINUX__
- IpcContext* ipcContext;
- #endif
- } SvcIdentity;
复制代码
RegisterEndpoint registerEP; //本EP向管理者注册EP的方法。
对于知名EP使用RegisterSamgrEndpoint() 来注册,而其他的"ipc client" 则使用RegisterRemoteEndpoint()来注册。