嵌入式技术论坛
直播中

张静

7年用户 1464经验值
私信 关注
[经验]

关于RTT中scheduler线程调度的学习

RTT中的scheduler并不是以一个类的形式存在,更类似传统的过程编程。个人认为这一点在编程风格上和其他的组件是不够统一的。

下面引用一段RTT官网上,关于RTT线程调度的介绍。

RTT中提供的线程调度器是基于优先级的全抢占式调度:在系统中除了中断处理函数、调度器上锁部分的代码和禁止中断的代码是不可抢占的之外,系统的其他部分都是可以抢占的,包括线程调度器自身。系统总共支持256个优先级(0 ~ 255,数值越小的优先级越高,0为最高优先级,255分配给空闲线程使用,一般用户不使用。在一些资源比较紧张的系统中,可以根据实际情况选择只支持8个或32个优先级的系统配置)。在系统中,当有比当前线程优先级更高的线程就绪时,当前线程将立刻被换出,高优先级线程抢占处理器运行。如下图所示,在RT-Thread调度器的实现中,包含了一个共256个优先级队列的数组(如果系统最大支持32个优先级,那么这里将是一个包含了32个优先级队列的数组),每个数组元素中放置相同优先级链表的表头。这些相同优先级的列表形成一个双向环形链表,最低优先级线程链表一般只包含一个idle线程。

在优先级队列1#和2#中,可以看到三个线程:线程A、线程B和线程C。由于线程A、B的优先级比线程C的高,所以此时线程C得不到运行,必须要等待优先级队列1#的中所有线程(因为阻塞)都让出处理器后才能得到执行。一个操作系统如果只是具备了高优先级任务能够“立即”获得处理器并得到执行的特点,那么它仍然不算是实时操作系统。因为这个查找最高优先级线程的过程决定了调度时间是否具有确定性,例如一个包含n个就绪任务的系统中,如果仅仅从头找到尾,那么这个时间将直接和n相关,而下一个就绪线程抉择时间的长短将会极大的影响系统的实时性。当所有就绪线程都链接在它们对应的优先级队列中时,抉择过程就将演变为在优先级数组中寻找具有最高优先级线程的非空链表。RT-Thread内核中采用了基于位图的优先级算法(时间复杂度O(1),即与就绪线程的多少无关),通过位图的定位快速的获得优先级最高的线程。RT-Thread内核中也允许创建相同优先级的线程。相同优先级的线程采用时间片轮转方式进行调度(也就是通常说的分时调度器),时间片轮转调度仅在当前系统中无更高优先级就绪线程存在的情况下才有效。例如在上图中,我们假设线程A和线程B一次最大允许运行的时间片分别是10个时钟节拍和7个时钟节拍。那么线程B将在线程A的时间片结束(10个时钟节拍)后才能运行,但如果中途线程A被挂起了,即线程A在运行的途中,因为试图去持有不可用的资源,而导致线程状态从就绪状态更改为阻塞状态,那么线程B会因为其优先级成为系统中就绪线程中最高的而马上运行。每个线程的时间片大小都可以在初始化或创建这个线程时指定。因为RT-Thread调度器的实现是采用优先级链表的方式,所以系统中的总线程数不受限制,只和系统所能提供的内存资源相关。为了保证系统的实时性,系统尽最大可能地保证高优先级的线程得以运行。线程调度的原则是一旦任务状态发生了改变,并且当前运行的线程优先级小于优先级队列组中线程最高优先级时,立刻进行线程切换(除非当前系统处于中断处理程序中或禁止线程切换的状态)。

以上应该来说把调度器相关的内容说的比较明确了,下面我们就来看看函数的具体实现吧。
```void rt_system_scheduler_init(void);
void rt_system_scheduler_start(void);

void rt_schedule(void);
void rt_schedule_insert_thread(struct rt_thread thread);
void rt_schedule_remove_thread(struct rt_thread thread);

void rt_enter_critical(void);
void rt_exit_critical(void);
rt_uint16_t rt_critical_level(void);

rt_list_t rt_thread_priority_table[RT_THREAD_PRIORITY_MAX];

if RT_THREAD_PRIORITY_MAX > 32
/ Maximum priority level, 256 /
rt_uint32_t rt_thread_ready_priority_group;
rt_uint8_t rt_thread_ready_table[32];

else
/ Maximum priority level, 32 /
rt_uint32_t rt_thread_ready_priority_group;

endif```
变量作用分析:

rt_thread_priority_table[RT_THREAD_PRIORITY_MAX]

用来存储每个优先级的列表头。

所有函数中关键函数是
void rt_schedule(void);

优先级调度算法就在里面实现,先来看看函数。
```void rt_schedule(void)
{
rt_base_t level;
struct rt_thread to_thread;
struct rt_thread from_thread;

/* disable interrupt */
level = rt_hw_interrupt_disable();
/* check the scheduler is enabled or not */
if (rt_scheduler_lock_nest == 0)
{
    register rt_ubase_t highest_ready_priority;
if RT_THREAD_PRIORITY_MAX <= 32
    highest_ready_priority = __rt_ffs(rt_thread_ready_priority_group) - 1;
else
    register rt_ubase_t number;
    number = __rt_ffs(rt_thread_ready_priority_group) - 1;
    highest_ready_priority = (number << 3) + __rt_ffs(rt_thread_ready_table[number]) - 1;
endif
    /* get switch to thread */
    to_thread = rt_list_entry(rt_thread_priority_table[highest_ready_priority].next,
                              struct rt_thread,
                              tlist);
    /* if the destination thread is not the same as current thread */
    if (to_thread != rt_current_thread)
    {
        rt_current_priority = (rt_uint8_t)highest_ready_priority;
        from_thread         = rt_current_thread;
        rt_current_thread   = to_thread;
        RT_OBJECT_HOOK_CALL(rt_scheduler_hook, (from_thread, to_thread));
        /* switch to new thread */
        RT_DEBUG_LOG(RT_DEBUG_SCHEDULER,
                     ("[%d]switch to priority#%d "
                      "thread:%.*s(sp:0x%p), "
                      "from thread:%.*s(sp: 0x%p)
",
                      rt_interrupt_nest, highest_ready_priority,
                      RT_NAME_MAX, to_thread->name, to_thread->sp,
                      RT_NAME_MAX, from_thread->name, from_thread->sp));
ifdef RT_USING_OVERFLOW_CHECK
        _rt_scheduler_stack_check(to_thread);
endif
        if (rt_interrupt_nest == 0)
        {
            extern void rt_thread_handle_sig(rt_bool_t clean_state);
            rt_hw_context_switch((rt_uint32_t)&from_thread->sp,
                                 (rt_uint32_t)&to_thread->sp);
            /* enable interrupt */
            rt_hw_interrupt_enable(level);
ifdef RT_USING_SIGNALS
            /* check signal status */
            rt_thread_handle_sig(RT_TRUE);
endif
        }
        else
        {
            RT_DEBUG_LOG(RT_DEBUG_SCHEDULER, ("switch in interrupt
"));
            rt_hw_context_switch_interrupt((rt_uint32_t)&from_thread->sp,
                                           (rt_uint32_t)&to_thread->sp);
            /* enable interrupt */
            rt_hw_interrupt_enable(level);
        }
    }
    else
    {
        /* enable interrupt */
        rt_hw_interrupt_enable(level);
    }
}
else
{
    /* enable interrupt */
    rt_hw_interrupt_enable(level);
}
}```

这里调度算法有一个变种,当最大优先级大于32时,使用了一个变种算法。这里先来分析最大优先级不大于32的情况。

首先来说,RTT使用的线程调度算法叫做位图调度算法,该算法的好处是,在任意多的优先级以及任意多线程的情况下,都可以在O(1)的时间复杂度情况下完成线程调度(既都为一个固定的时间),这也就为系统的实时性提供了保障。

在这我们先从,最大优先级小于32的情况,来分析整个算法。此时,调度算法实现相关的数据结构有以下几个
rt_list_t rt_thread_priority_table[RT_THREAD_PRIORITY_MAX]; rt_uint32_t rt_thread_ready_priority_group;

首先是rt_thread_priority_table[RT_THREAD_PRIORITY_MAX],该数组的长度为RT_THREAD_PRIORITY_MAX,它是一个列表数组,功能如开始的图所示,其为每一个处在ready状态,且不同优先级的线程,都创建了一个列表。这个数组内就是各个列表的head地址。

其次来看看rt_thread_ready_priority_group这个变量,该变量为一个32bit的数据,每一个位对应的该优先级是否有ready状态的线程等待调度。举个例子说明:

若rt_thread_ready_priority_group = 0x76543210;则说明:

优先级为4,9,12,13,18,20,22,25,26,28,29,30的线程都已准备就绪。

此时rt_thread_priority_table[4],rt_thread_priority_table[9]….等等列表中就链接着已经准备就绪等待调度的线程。

这里说起来也就很简单了,在rt_thread_ready_priority_group中找出最低的一个非0位,然后在对应的就绪优先级列表中取出线程并调度就好。这里就引出了接下来一个问题如何在rt_thread_ready_priority_group中找出最低的非0位,最简单的,我们可能想出了这么做:

int get_lowest_ready_priority(rt_uint32_t rt_thread_ready_priority_group) { uint8_t i; uint32_t m=1; if(rt_thread_ready_priority_group==0) return -1; for(i=0;i<32;i++) { if(m&rt_thread_ready_priority_group) return i; m<<=1; } }

这个算法虽然简单,但有个问题,可以看到中间个for循环,所以这个算法执行的时间和优先级数量N成正比,时间复杂度为O(N)。那么位图算法是如何解决这个问题的呢?通过空间换时间,引入了一个数组。
const rt_uint8_t __lowest_bit_bitmap[] = { /* 00 */ 0, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 10 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 20 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 30 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 40 */ 6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 50 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 60 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 70 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 80 */ 7, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 90 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* A0 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* B0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* C0 */ 6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* D0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* E0 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* F0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0 };

这个数组在rt_thread_ready_priority_group为8bit的时候,也就是最大优先级为8的时候可以立刻得出就绪状态的最小优先级的线程的优先级。

__lowest_bit_bitmap[rt_thread_ready_priority_group]这就是就绪状态优先级最小的线程的优先级。

那么当最大优先级大于8了又如何呢?继续扩大这个矩阵是不可取的,因为,这个矩阵的大小为2^N,当最大优先级提升以后,矩阵将迅速扩大,所占空间变得不可接受。所以使用了另一个函数:
```int __rt_ffs(int value)
{
if (value == 0) return 0;

if (value & 0xff)
    return __lowest_bit_bitmap[value & 0xff] + 1;
if (value & 0xff00)
    return __lowest_bit_bitmap[(value & 0xff00) >> 8] + 9;
if (value & 0xff0000)
    return __lowest_bit_bitmap[(value & 0xff0000) >> 16] + 17;
return __lowest_bit_bitmap[(value & 0xff000000) >> 24] + 25;
}```

将32个优先级拆成了4组,先判断每一组是否有就绪的任务,然后再去查表,这样通过2次查表,就完成搜索最低优先级就绪任务列表的功能。将原来O(n)的时间复杂度大幅降低到一个可接受的范围。

但请注意上面的函数严格来说并不能算是O(1)的复杂度,因为当最大优先级为N的时候,上面函数是无法直接写出的,实际上无论分成多少组,仍然是与N成正比关系。比如分4组就是O(N/4)分成3组就是O(N/3)。

分析完最大32个优先级的情况,现在就来看看256个优先级的情况。根据刚才的分析,256个优先级的情况如果按照之前每8位判断一次的方法,那么最坏的情况将需要判断32次,这个时间将不可忽略,所以不能无脑的把int rt_ffs(int value)这个函数扩展。当然因为空间原因,也不能随意扩展const rt_uint8_t lowest_bit_bitmap[]这个数组。

所以针对这种情况,RTT又引入了一个数据:
rt_uint8_t rt_thread_ready_table[32];

同时采用如下算法:
```register rt_ubase_t number;

number = rt_ffs(rt_thread_ready_priority_group) - 1;
highest_ready_priority = (number << 3) + rt_ffs(rt_thread_ready_table[number]) - 1;```

1.先把256个优先级分为32组,每组就是8个,就是一个8bit数据。

2.使用rt_thread_ready_priority_group这个变量的每个位记录这32组中哪一组中有就绪状态的线程队列。

3.使用__rt_ffs()查找出优先级最低的组。

4.再次使用__rt_ffs()查找出该组中优先级最低的就绪队列。

所以除了使用了一次查表,之后随着最大优先级的提升,就是不断的做除法,以减少查找所消耗的时间,把查找所消耗的时间减少到一定的范围之内。

整个RTT线程的核心调度算法就分析完毕了。据说ucos也使用的是类似的算法,没有具体比较过也就不好说到底谁优谁劣,但是该算法既然在256个优先级的情况下,做到了O(1),在不继续增大最大优先级的情况下,其实在改进上其实基本算是到顶了。继续的优化看起来也只能是扩大__lowest_bit_bitmap数组,使用更多的空间来换时间(不划算)。


原作者:yushigengyu

更多回帖

发帖
×
20
完善资料,
赚取积分