每一个支持多进程(线程)的系统都会有一个滴答时钟(系统时钟),这个时钟就好比系统的“心脏”,线程的休眠(延时)和时间片轮转调度都需要用到它。
Cortex-M系列的内核都有一个systick时钟,这个时钟就是设计用来支持操作系统的,是一个24位的自动重装载向下计数器,中断入口就位于中断向量表里面,定义在zephyr-zephyr-v1.13.0\arch\arm\core\cortex_m\vector_table.S:
1 SECTION_SUBSEC_FUNC(exc_vector_table,_vector_table_section,_vector_table)2 3 /*4 * setting the _very_ early boot on the main stack allows to use memset5 * on the interrupt stack when CONFIG_INIT_STACKS is enabled before6 * switching to the interrupt stack for the rest of the early boot7 */8 .word _main_stack + CONFIG_MAIN_STACK_SIZE9 10 .word __reset11 .word __nmi1213 .word __hard_fault14 .word __mpu_fault15 .word __bus_fault16 .word __usage_fault17 .word __reserved18 .word __reserved19 .word __reserved20 .word __reserved21 .word __svc22 .word __debug_monitor2324 .word __reserved25 .word __pendsv26#if defined(CONFIG_CORTEX_M_SYSTICK)27 .word _timer_int_handler28#else29 .word __reserved30#endif
第27行,_timer_int_handler()就是systick时钟的中断入口函数。
那么问题来了,前面的随笔里并没有分析到systick时钟是何时被初始化的,事实上systick也是通过设备宏定义的方式进行初始化的,定义在zephyr-zephyr-v1.13.0\drivers\timer\sys_clock_init.c:
SYS_DEVICE_DEFINE("sys_clock", _sys_clock_driver_init, sys_clock_device_ctrl, PRE_KERNEL_2, CONFIG_SYSTEM_CLOCK_INIT_PRIORITY);
可知,系统时钟属于PRE_KERNEL_2类设备,同一类设备也是有分优先级的,优先级高的先初始化,初始化函数为_sys_clock_driver_init(),定义在zephyr-zephyr-v1.13.0\drivers\timer\cortex_m_systick.c:
1 int _sys_clock_driver_init(struct device *device)2 {3 /* enable counter, interrupt and set clock src to system clock */4 u32_t ctrl = SysTick_CTRL_ENABLE_Msk | SysTick_CTRL_TICKINT_Msk |5 SysTick_CTRL_CLKSOURCE_Msk;6 7 ARG_UNUSED(device);8 9 /*10 * Determine the reload value to achieve the configured tick rate.11 */1213 /* systick supports 24-bit H/W counter */14 __ASSERT(sys_clock_hw_cycles_per_tick <= (1 << 24),15 "sys_clock_hw_cycles_per_tick too large");16 sysTickReloadSet(sys_clock_hw_cycles_per_tick - 1);1718 NVIC_SetPriority(SysTick_IRQn, _IRQ_PRIO_OFFSET);1920 SysTick->CTRL = ctrl;2122 SysTick->VAL = 0; /* triggers immediate reload of count */2324 return 0;25}
系统时钟不一定要使用systick,像Nordic的SOC用的是硬件RTC作为系统时钟的,只是不过systick是一个通用的时钟。
第16行,参数sys_clock_hw_cycles_per_tick的含义是多少个systick时钟计数产生一个中断,这里CPU时钟为72MHz(systick时钟源来自CPU),系统时钟中断周期为10ms(100Hz,1秒产生100个中断),所以sys_clock_hw_cycles_per_tick = 72000000 / 100 = 720000。sysTickReloadSet()函数定义在zephyr-zephyr-v1.13.0\drivers\timer\cortex_m_systick.c:
1 static ALWAYS_INLINE void sysTickReloadSet(2 u32_t count /* count from which timer is to count down */3 )4 {5 /*6 * Write the reload value and clear the current value in preparation7 * for enabling the timer.8 * The countflag in the control/status register is also cleared by9 * this operation.10 */11 SysTick->LOAD = count;12 SysTick->VAL = 0; /* also clears the countflag */13 }
第11行,设置重装载寄存器。
第12行,将计数值置0,在使能systick后就会马上触发中断。
回到_sys_clock_driver_init()函数,第18行,设置systick的中断优先级,这里_IRQ_PRIO_OFFSET的值为1,因此systick的中断优先级就为1。
第20行,使能systick。
第22行,马上触发systick中断,并自动重装计数值。
接下来看systick中断执行函数_timer_int_handler(),定义在zephyr-zephyr-v1.13.0\drivers\timer\cortex_m_systick.c:
1 void _timer_int_handler(void *unused)2 {3 ARG_UNUSED(unused);4 5 sys_trace_isr_enter();6 7 /* accumulate total counter value */8 clock_accumulated_count += sys_clock_hw_cycles_per_tick;9 10 /*11 * one more tick has occurred -- don't need to do anything special since12 * timer is already configured to interrupt on the following tick13 */14 _sys_clock_tick_announce();15 16 extern void _ExcExit(void);17 _ExcExit();18 }
第8行,累加系统启动后经历了多少个时钟计数,注意这里不是累加系统ticks的个数,因为累加时钟计数会更加精确。
第14行,调用_sys_clock_tick_announce()函数,定义在zephyr-zephyr-v1.13.0\include\drivers\ system_timer.h:
#define _sys_clock_tick_announce() \ _nano_sys_clock_tick_announce(_sys_idle_elapsed_ticks)
在没有使能TICKLESS_KERNEL配置的情况下参数_sys_idle_elapsed_ticks的值为1,实际上调用的是_nano_sys_clock_tick_announce()函数,定义在zephyr-zephyr-v1.13.0\kernel\ sys_clock.c:
1 void _nano_sys_clock_tick_announce(s32_t ticks)2 {3 unsigned int key;4 5 K_DEBUG("ticks: %d\n", ticks);6 7 /* 64-bit value, ensure atomic access with irq lock */8 key = irq_lock();9 _sys_clock_tick_count += ticks;10 irq_unlock(key);1112 handle_timeouts(ticks);1314 /* time slicing is basically handled like just yet another timeout */15 handle_time_slicing(ticks);16}
第9行,累加系统启动后所经历的ticks个数。
在分析第12行的handle_timeouts()函数之前,先说一下线程加入到超时队列的过程。线程通过调用k_sleep()等函数后,系统会将该线程加入到超时队列里,然后调度其他线程。k_sleep()对应的实现函数为_impl_k_sleep(),定义在zephyr-zephyr-v1.13.0\kernel\ sched.c:
1 void _impl_k_sleep(s32_t duration)2 {3 /* volatile to guarantee that irq_lock() is executed after ticks is4 * populated5 */6 volatile s32_t ticks;7 unsigned int key;8 9 __ASSERT(!_is_in_isr(), "");10 __ASSERT(duration != K_FOREVER, "");1112 K_DEBUG("thread %p for %d ns\n", _current, duration);1314 /* wait of 0 ms is treated as a 'yield' */15 if (duration == 0) {16 k_yield();17 return;18 }1920 ticks = _TICK_ALIGN + _ms_to_ticks(duration);21 key = irq_lock();2223 _remove_thread_from_ready_q(_current);24 _add_thread_timeout(_current, NULL, ticks);2526 _Swap(key);27}
第15行,如果传进来的时参数为0,则直接调用k_yield()函数,切换到其他线程,具体实现的话在下一篇随笔里再分析。
第20行,_TICK_ALIGN的值为1,即将睡眠时间以tick为单位补齐。
第23行,调用_remove_thread_from_ready_q()函数,定义在zephyr-zephyr-v1.13.0\kernel\ sched.c:
1 void _remove_thread_from_ready_q(struct k_thread *thread)2 {3 LOCKED(&sched_lock) {4 if (_is_thread_queued(thread)) {5 _priq_run_remove(&_kernel.ready_q.runq, thread);6 _mark_thread_as_not_queued(thread);7 update_cache(thread == _current);8 }9 }10 }
第4行,线程能够运行,那它的状态必须是已经_THREAD_QUEUED了的。
第5行,将线程从运行队列移除,那么线程就不会参与线程调度了。
第6行,设置线程状态不为_THREAD_QUEUED。
第7行,调用update_cache()函数,在上一篇随笔已经分析过了,这里不再重复。
回到_impl_k_sleep()函数,第24行,调用_add_thread_timeout()函数,定义在zephyr-zephyr-v1.13.0\kernel\include\timeout_q.h:
1 static inline void _add_thread_timeout(struct k_thread *thread,2 _wait_q_t *wait_q,3 s32_t timeout_in_ticks)4 {5 _add_timeout(thread, &thread->base.timeout, wait_q, timeout_in_ticks);6 }
实际上调用的是_add_timeout()函数,定义在zephyr-zephyr-v1.13.0\kernel\include\timeout_q.h:
1 static inline void _add_timeout(struct k_thread *thread,2 struct _timeout *timeout,3 _wait_q_t *wait_q,4 s32_t timeout_in_ticks)5 {6 __ASSERT(timeout_in_ticks >= 0, "");7 8 timeout->delta_ticks_from_prev = timeout_in_ticks;9 timeout->thread = thread;10 timeout->wait_q = (sys_dlist_t *)wait_q;1112 K_DEBUG("before adding timeout %p\n", timeout);1314 /* If timer is submitted to expire ASAP with15 * timeout_in_ticks (duration) as zero value,16 * then handle timeout immedately without going17 * through timeout queue.18 */19 if (!timeout_in_ticks) {20 _handle_one_expired_timeout(timeout);21 return;22 }2324 s32_t *delta = &timeout->delta_ticks_from_prev;25 struct _timeout *in_q;2627 SYS_DLIST_FOR_EACH_CONTAINER(&_timeout_q, in_q, node) {28 if (*delta <= in_q->delta_ticks_from_prev) {29 in_q->delta_ticks_from_prev -= *delta;30 sys_dlist_insert_before(&_timeout_q, &in_q->node,31 &timeout->node);32 goto inserted;33 }3435 *delta -= in_q->delta_ticks_from_prev;36 }3738 sys_dlist_append(&_timeout_q, &timeout->node);3940inserted:41 K_DEBUG("after adding timeout %p\n", timeout);42}
第19行,很明显timeout_in_ticks的值不为0。
第27~38行, 按delta_ticks_from_prev的值由小到大插入到_timeout_q超时队列里。由此可知,超时队列里存放的是与前一个线程的时间的差值,而不是绝对值。
回到_impl_k_sleep()函数,第26行,调用_Swap()函数,把线程切换出去,这在下一篇随笔再分析。
好了,有了这些基础之后,现在回到_nano_sys_clock_tick_announce()函数,第12行,调用handle_timeouts()函数,定义在zephyr-zephyr-v1.13.0\kernel\ sys_clock.c:
1 static inline void handle_timeouts(s32_t ticks)2 {3 sys_dlist_t expired;4 unsigned int key;5 6 /* init before locking interrupts */7 sys_dlist_init(&expired);8 9 key = irq_lock();10 11 sys_dnode_t *next = sys_dlist_peek_head(&_timeout_q);12 struct _timeout *timeout = (struct _timeout *)next;13 14 K_DEBUG("head: %p, delta: %d\n",15 timeout, timeout ? timeout->delta_ticks_from_prev : -2112);16 17 if (!next) {18 irq_unlock(key);19 return;20 }21 22 /*23 * Dequeue all expired timeouts from _timeout_q, relieving irq lock24 * pressure between each of them, allowing handling of higher priority25 * interrupts. We know that no new timeout will be prepended in front26 * of a timeout which delta is 0, since timeouts of 0 ticks are27 * prohibited.28 */29 30 while (next) {31 32 /*33 * In the case where ticks number is greater than the first34 * timeout delta of the list, the lag produced by this initial35 * difference must also be applied to others timeouts in list36 * until it was entirely consumed.37 */38 39 s32_t tmp = timeout->delta_ticks_from_prev;40 41 if (timeout->delta_ticks_from_prev < ticks) {42 timeout->delta_ticks_from_prev = 0;43 } else {44 timeout->delta_ticks_from_prev -= ticks;45 }46 47 ticks -= tmp;48 49 next = sys_dlist_peek_next(&_timeout_q, next);50 51 if (timeout->delta_ticks_from_prev == 0) {52 sys_dnode_t *node = &timeout->node;53 54 sys_dlist_remove(node);55 56 /*57 * Reverse the order that that were queued in the58 * timeout_q: timeouts expiring on the same ticks are59 * queued in the reverse order, time-wise, that they are60 * added to shorten the amount of time with interrupts61 * locked while walking the timeout_q. By reversing the62 * order _again_ when building the expired queue, they63 * end up being processed in the same order they were64 * added, time-wise.65 */66 67 sys_dlist_prepend(&expired, node);68 69 timeout->delta_ticks_from_prev = _EXPIRED;70 71 } else if (ticks <= 0) {72 break;73 }74 75 irq_unlock(key);76 key = irq_lock();77 78 timeout = (struct _timeout *)next;79 }80 81 irq_unlock(key);82 83 _handle_expired_timeouts(&expired);84 }
代码有点多,但是原理比较简单。
第7行,初始化一个超时双向链表,用于后面存放已经超时(到期)的线程。
第11行,取出超时队列的头节点。
第17行,即如果超时队列为空(没有超时任务要处理),则直接返回。
第30行,遍历超时队列。
第41行,如果取出的线程剩余的超时时间小于ticks(这里是1),则说面线程到期了,第42行将线程的超时时间置为0。否则,第44行,将超时时间减ticks。
第47行,剩下的ticks个数,其值可能为负数。
第49行,取出下一个节点。
第51行,如果当前线程的超时时间已经到了,则if条件成立。
第54行,将当前线程从超时队列移除。
第67行,将当前线程加入到临时队列里,后面会统一处理这个队列里的线程。
第69行,将当前线程的超时时间置为_EXPIRED。
如此循环,直到ticks用完(其值小于等于0),然后跳出循环,调用83行的_handle_expired_timeouts()函数,定义在zephyr-zephyr-v1.13.0\kernel\include\timeout_q.h:
1 static inline void _handle_expired_timeouts(sys_dlist_t *expired)2 {3 struct _timeout *timeout;4 5 SYS_DLIST_FOR_EACH_CONTAINER(expired, timeout, node) {6 _handle_one_expired_timeout(timeout);7 }8 }
即遍历临时队列,每次调用_handle_one_expired_timeout()函数,定义在zephyr-zephyr-v1.13.0\kernel\include\timeout_q.h:
1 static inline void _handle_one_expired_timeout(struct _timeout *timeout)2 {3 struct k_thread *thread = timeout->thread;4 unsigned int key = irq_lock();5 6 timeout->delta_ticks_from_prev = _INACTIVE;7 8 K_DEBUG("timeout %p\n", timeout);9 if (thread) {10 _unpend_thread_timing_out(thread, timeout);11 _mark_thread_as_started(thread);12 _ready_thread(thread);13 irq_unlock(key);14 } else {15 irq_unlock(key);16 if (timeout->func) {17 timeout->func(timeout);18 }19 }20 }
第6行,将超时时间置为_INACTIVE。
超时的方式有两种,一是线程调用k_sleep()等函数后将自己挂起导致的超时,二是线程调用软件定时器k_timer_start()函数导致的超时,线程本身不会挂起,只是开启了一个定时器。所以就有了第9行和第14行两种不同路径。
先看第一种方式,第10行,调用_unpend_thread_timing_out()函数,定义在zephyr-zephyr-v1.13.0\kernel\include\timeout_q.h:
1 static inline void _unpend_thread_timing_out(struct k_thread *thread,2 struct _timeout *timeout_obj)3 {4 if (timeout_obj->wait_q) {5 _unpend_thread_no_timeout(thread);6 thread->base.timeout.wait_q = NULL;7 }8 }
第5行,调用_unpend_thread_no_timeout()函数,定义在zephyr-zephyr-v1.13.0\kernel\sched.c:
1 void _unpend_thread_no_timeout(struct k_thread *thread)2 {3 LOCKED(&sched_lock) {4 _priq_wait_remove(&pended_on(thread)->waitq, thread);5 _mark_thread_as_not_pending(thread);6 }7 }
第4行,实际上调用的是_priq_dumb_remove()函数,定义在zephyr-zephyr-v1.13.0\kernel\sched.c:
void _priq_dumb_remove(sys_dlist_t *pq, struct k_thread *thread){ __ASSERT_NO_MSG(!_is_idle(thread)); sys_dlist_remove(&thread->base.qnode_dlist);}
将线程从队列移除。
回到_unpend_thread_no_timeout()函数,第5行,将线程状态设置为不是_THREAD_PENDING。
回到_handle_one_expired_timeout()函数,第11~12行这两个函数在上一篇随笔里已经分析过了。第16~17行,如果定时器超时函数不为空,则调用定时器超时函数。
至此,handle_timeouts()函数分析完了。
回到_nano_sys_clock_tick_announce()函数,第15行,调用handle_time_slicing()函数,定义在zephyr-zephyr-v1.13.0\kernel\sys_clock.c:
1 static void handle_time_slicing(s32_t ticks)2 {3 if (!_is_thread_time_slicing(_current)) {4 return;5 }6 7 _time_slice_elapsed += ticks;8 if (_time_slice_elapsed >= _time_slice_duration) {9 10 unsigned int key;1112 _time_slice_elapsed = 0;1314 key = irq_lock();15 _move_thread_to_end_of_prio_q(_current);16 irq_unlock(key);17 }18}
第3行,调用_is_thread_time_slicing()函数,定义在zephyr-zephyr-v1.13.0\kernel\sched.c:
1 int _is_thread_time_slicing(struct k_thread *thread)2 {3 int ret = 0;4 5 /* Should fix API. Doesn't make sense for non-running threads6 * to call this7 */8 __ASSERT_NO_MSG(thread == _current);9 10 if (_time_slice_duration <= 0 || !_is_preempt(thread) ||11 _is_prio_higher(thread->base.prio, _time_slice_prio_ceiling)) {12 return 0;13 }14 15 16 LOCKED(&sched_lock) {17 struct k_thread *next = _priq_run_best(&_kernel.ready_q.runq);18 19 if (next) {20 ret = thread->base.prio == next->base.prio;21 }22 }23 24 return ret;25 }
第10~13行,_time_slice_duration的值在系统启动时就设置了。_is_preempt()函数:
static inline int _is_preempt(struct k_thread *thread){ /* explanation in kernel_struct.h */ return thread->base.preempt <= _PREEMPT_THRESHOLD;}
_PREEMPT_THRESHOLD的值为127。即如果线程的优先级小于128则_is_preempt()返回1。
_is_prio_higher()比较当前线程的优先级是否高于_time_slice_prio_ceiling的值(也是在系统启动时就设置了),如果这三个条件有一个成立了,则不会处理时间片相关的内容。
第17行,调用_priq_run_best()函数取出运行队列的头节点,即优先级最高的线程。只有运行队列的头节点的优先级与当前线程的优先级相等才会继续往下处理。
回到handle_time_slicing()函数,第7行,累加ticks个数。
第8行,如果累加的ticks个数大于等于配置的时间片数,则if条件成立。
第12行,将累加的ticks个数清0。
第15行,调用_move_thread_to_end_of_prio_q()函数,定义在zephyr-zephyr-v1.13.0\kernel\sched.c:
1 void _move_thread_to_end_of_prio_q(struct k_thread *thread)2 {3 LOCKED(&sched_lock) {4 _priq_run_remove(&_kernel.ready_q.runq, thread);5 _priq_run_add(&_kernel.ready_q.runq, thread);6 _mark_thread_as_queued(thread);7 update_cache(0);8 }9 }
第4~7行,这几个函数前面都已经分析过了。
到这里就可以知道,要使用时间片轮转的调度方式,需要以下设置:
1.配置时间片大小(大于0)和优先级;
2.所有创建的线程的优先级要相同,并且优先级要比1中的优先级高;
仔细思考会发现目前这种超时处理机制对延时(休眠)的时间是不准确的,因此这种机制总是以tick为单位进行延时(休眠),也即时间只能精确到tick。那有没有其他方法可以准确延时(休眠)呢?肯定是有的,就是需要打开TICKLESS_KERNEL配置,其原理就是不以tick(假如10ms)为固定时间进行定时,而是每次根据需要延时(休眠)的最小时间进行定时,这样就能实现精确的延时(休眠),zephyr是支持这种精确定时方式的,感兴趣的可以去研究研究。