// zephyr/work.rs
//! Zephyr Work Queues
//!
//! # Zephyr Work Queues and Work
//!
//! Zephyr has a mechanism called a
//! [Workqueue](https://docs.zephyrproject.org/latest/kernel/services/threads/workqueue.html).
//!
//! Each work queue is backed by a single Zephyr thread, and has its own stack. The work queue
//! consists of a FIFO queue of work items that will be run consecutively on that thread. The
//! underlying types are `k_work_q` for the work queue itself, and `k_work` for the work items.
//!
//! In addition to simple schedulable work, Zephyr also has two additional types of work:
//! `k_work_delayable`, which can be scheduled to run in the future, and `k_work_poll`, described
//! as triggered work in the docs. Triggered work can be scheduled to run when various items within
//! Zephyr become available, and it also has a timeout. In this sense, triggered work is a superset
//! of the other types of work. Both delayable and triggered work are implemented by embedding a
//! `k_work` in their structure, and Zephyr schedules the work when the given reason happens.
//!
//! At this time, only the basic work queue type is supported.
//!
//! Zephyr's work queues can be used in different ways:
//!
//! - Work can be scheduled as needed. For example, an IRQ handler can queue a work item to process
//!   data it has received from a device.
//! - Work can be scheduled periodically.
//!
//! Because most C code using Zephyr allocates work items statically, they are typically
//! rescheduled when the work is complete. The work queue scheduling functions are designed, and
//! intended, for a given work item to be able to reschedule itself, and such usage is common.
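//!
//! As a rough host-side analogy (plain `std`, not the Zephyr API), a work queue behaves like a
//! single thread draining a FIFO of jobs, and a job may push itself back onto the queue. The
//! `Job`, `run_queue`, `countdown` and `demo` names below are hypothetical and exist only for
//! illustration. Note that resubmitted work goes to the back of the FIFO, behind work that was
//! already waiting.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! // Hypothetical stand-in for a work item: a one-shot closure that may push
//! // new work (including a fresh copy of itself) onto the queue it runs on.
//! struct Job(Box<dyn FnOnce(&mut VecDeque<Job>, &mut Vec<u32>)>);
//!
//! // Drain the queue in FIFO order, one job at a time, the way a single work
//! // queue thread runs its items consecutively. Returns what the jobs logged.
//! fn run_queue(mut queue: VecDeque<Job>) -> Vec<u32> {
//!     let mut log = Vec::new();
//!     while let Some(job) = queue.pop_front() {
//!         (job.0)(&mut queue, &mut log);
//!     }
//!     log
//! }
//!
//! // A job that logs `remaining` and reschedules itself until it reaches 0.
//! fn countdown(remaining: u32) -> Job {
//!     Job(Box::new(move |queue: &mut VecDeque<Job>, log: &mut Vec<u32>| {
//!         log.push(remaining);
//!         if remaining > 0 {
//!             queue.push_back(countdown(remaining - 1));
//!         }
//!     }))
//! }
//!
//! // One self-rescheduling job plus one plain job; the log is [2, 100, 1, 0].
//! fn demo() -> Vec<u32> {
//!     let mut queue = VecDeque::new();
//!     queue.push_back(countdown(2));
//!     queue.push_back(Job(Box::new(|_: &mut VecDeque<Job>, log: &mut Vec<u32>| {
//!         log.push(100)
//!     })));
//!     run_queue(queue)
//! }
//! ```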
//!
//! ## Ownership
//!
//! The remaining challenge with implementing `k_work` for Rust is that of ownership. The model
//! taken here is that the work items are held in a pinned `Arc` that is effectively owned by the
//! work itself. When the work item is submitted to Zephyr, one reference held by that `Arc` is
//! handed off to C, and when the work item is called, the `Arc` is reconstructed. This repeats
//! until the work is no longer needed, at which point the work will be dropped.
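//!
//! The hand-off can be sketched on the host with `Arc::into_raw` and `Arc::from_raw`. The `Work`
//! type in this module does the same thing with a pinned `Arc`. In this sketch, a hypothetical
//! `FakeCQueue` holding raw pointers stands in for Zephyr, and `Payload` stands in for the work
//! data. Each submission gives one strong reference to the "C" side, and running the item takes
//! it back, so the reference count returns to where it started.
//!
//! ```rust
//! use std::sync::Arc;
//!
//! // Hypothetical work data.
//! struct Payload {
//!     value: u32,
//! }
//!
//! // Hypothetical stand-in for the C side: it only stores raw pointers and
//! // never moves or frees what they point to.
//! struct FakeCQueue {
//!     pending: Vec<*const Payload>,
//! }
//!
//! impl FakeCQueue {
//!     // "Submit": hand one strong reference to the queue as a raw pointer.
//!     fn submit(&mut self, work: Arc<Payload>) {
//!         self.pending.push(Arc::into_raw(work));
//!     }
//!
//!     // "Run" each pending item, rebuilding its `Arc` so the reference is
//!     // released when the handler is done. Returns the values seen.
//!     fn run_all(&mut self) -> Vec<u32> {
//!         self.pending
//!             .drain(..)
//!             .map(|ptr| {
//!                 // SAFETY: each pointer came from `Arc::into_raw` in
//!                 // `submit` and is reclaimed exactly once, here.
//!                 let work = unsafe { Arc::from_raw(ptr) };
//!                 work.value
//!             })
//!             .collect()
//!     }
//! }
//!
//! // Returns (values handled, strong count while queued, strong count after).
//! fn demo() -> (Vec<u32>, usize, usize) {
//!     let work = Arc::new(Payload { value: 7 });
//!     let mut queue = FakeCQueue { pending: Vec::new() };
//!     queue.submit(work.clone());
//!     let queued = Arc::strong_count(&work);
//!     let seen = queue.run_all();
//!     (seen, queued, Arc::strong_count(&work))
//! }
//! ```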
//!
//! There are two common ways the lifecycle of work can be managed in an embedded system:
//!
//! - A set of `Future`s are allocated once at the start, and these never return a value. Work
//!   Futures inside of these (which correspond to `.await` in async code) can run to completion
//!   and return values, but the main loops will not return values or be dropped. Embedded
//!   Futures will typically not be boxed.
//!
//! One consequence of the ownership being passed through to C code is that if the work
//! cancellation mechanism is used on a work queue, the work items themselves will be leaked.
//!
//! These work items are also pinned (`Pin`), to ensure that the work actions are not moved.
//!
//! ## The work queues themselves
//!
//! Work queues themselves are built using [`WorkQueueBuilder`]. This needs a statically defined
//! stack. Typical usage will be along the lines of:
//! ```rust
//! kobj_define! {
//!     WORKER_STACK: ThreadStack<2048>;
//! }
//! // ...
//! let main_worker = Box::new(
//!     WorkQueueBuilder::new()
//!         .set_priority(2)
//!         .set_name(c"mainloop")
//!         .set_no_yield(true)
//!         .start(WORKER_STACK.init_once(()).unwrap())
//! );
//!
//! let _ = zephyr::kio::spawn(
//!     mainloop(), // Async or function returning Future.
//!     &main_worker,
//!     c"w:mainloop",
//! );
//!
//! // ...
//!
//! // Leak the Box so that the worker is never freed.
//! let _ = Box::leak(main_worker);
//! ```
//!
//! It is important that a `WorkQueue` is never dropped. Its `Drop` implementation panics, because
//! Zephyr provides no mechanism to stop work queue threads, so dropping one would result in
//! undefined behavior.
//!
//! # Current Status
//!
//! Although Zephyr has three types of work, the `k_work_poll` is sufficient to implement all of
//! the behavior, and this implementation only implements this type. Non-`Future` work could be
//! built around the other work types.
//!
//! As such, manually constructed work is still built using `Future`. The `_async` primitives
//! throughout this crate can be used just as readily by hand-written Futures as by async code.
//! Notably, the use of [`Signal`] will likely be common, along with possible timeouts.
//!

extern crate alloc;

use core::{
    cell::UnsafeCell,
    ffi::{c_int, c_uint, CStr},
    mem,
    pin::Pin,
};

use zephyr_sys::{
    k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise,
    k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init,
    k_work_queue_start, k_work_submit, k_work_submit_to_queue,
};

use crate::{
    error::to_result_void,
    object::Fixed,
    simpletls::SimpleTls,
    sync::{Arc, Mutex},
    sys::thread::ThreadStack,
};

/// A builder for work queues themselves.
///
/// A work queue is a Zephyr thread that, instead of directly running a piece of code, manages a
/// queue of work items. Various types of `Work` can be submitted to these queues, along with
/// various types of triggering conditions.
pub struct WorkQueueBuilder {
    /// The "config" value passed in.
    config: k_work_queue_config,
    /// Priority for the work queue thread.
    priority: c_int,
}

impl WorkQueueBuilder {
    /// Construct a new WorkQueueBuilder with default values.
    pub fn new() -> Self {
        Self {
            config: Default::default(),
            priority: 0,
        }
    }

    /// Set the name for the WorkQueue thread.
    ///
    /// This name shows up in debuggers and some analysis tools.
    pub fn set_name(&mut self, name: &'static CStr) -> &mut Self {
        self.config.name = name.as_ptr();
        self
    }

    /// Set the "no yield" flag for the created worker.
    ///
    /// If this is not set, the work queue will call `k_yield` between each enqueued work item. For
    /// non-preemptible threads, this will allow other threads to run. For preemptible threads,
    /// this will allow other threads at the same priority to run.
    ///
    /// This method has a negative in the name, which goes against typical conventions. This is
    /// done to match the field in the Zephyr config.
    pub fn set_no_yield(&mut self, value: bool) -> &mut Self {
        self.config.no_yield = value;
        self
    }

    /// Set the "essential" flag for the created worker.
    ///
    /// This sets the essential flag on the running thread. The system considers the termination of
    /// an essential thread to be a fatal error.
    pub fn set_essential(&mut self, value: bool) -> &mut Self {
        self.config.essential = value;
        self
    }

    /// Set the priority for the worker thread.
    ///
    /// See the Zephyr docs for the meaning of priority.
    pub fn set_priority(&mut self, value: c_int) -> &mut Self {
        self.priority = value;
        self
    }

    /// Start the given work queue thread.
    ///
    /// TODO: Implement a 'start' that works from a static work queue.
    pub fn start(&self, stack: ThreadStack) -> WorkQueue {
        let item: Fixed<k_work_q> = Fixed::new(unsafe { mem::zeroed() });
        unsafe {
            // SAFETY: Initialize zeroed memory.
            k_work_queue_init(item.get());

            // SAFETY: This associates the workqueue with the thread ID that runs it. The thread is
            // a pointer into this work item, which will not move, because of the Fixed.
            let this = &mut *item.get();
            WORK_QUEUES
                .lock()
                .unwrap()
                .insert(&this.thread, WorkQueueRef(item.get()));

            // SAFETY: Start work queue thread. The main issue here is that the work queue cannot
            // be deallocated once the thread has started. We enforce this by making Drop panic.
            k_work_queue_start(
                item.get(),
                stack.base,
                stack.size,
                self.priority,
                &self.config,
            );
        }

        WorkQueue { item }
    }
}

/// A running work queue thread.
///
/// # Panics
///
/// Allowing a work queue to drop will result in a panic. There are two ways to handle this,
/// depending on whether the `WorkQueue` is in a `Box` or an `Arc`:
/// ```
/// // Leak a work queue in an Arc.
/// let wq = Arc::new(WorkQueueBuilder::new().start(...));
/// // If the Arc is used after this:
/// let _ = Arc::into_raw(wq.clone());
/// // If the Arc is no longer needed:
/// let _ = Arc::into_raw(wq);
///
/// // Leak a work queue in a Box.
/// let wq = Box::new(WorkQueueBuilder::new().start(...));
/// let _ = Box::leak(wq);
/// ```
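///
/// A host-side sketch shows why leaking avoids the panic. It uses a hypothetical `NeverDrop`
/// type whose `Drop` panics, like `WorkQueue`'s does. `Box::leak` gives up ownership entirely.
/// `Arc::into_raw` on a clone permanently holds one strong reference. In neither case does the
/// value's `drop` ever run.
///
/// ```rust
/// use std::sync::Arc;
///
/// // Hypothetical stand-in for `WorkQueue`: dropping it is a bug.
/// struct NeverDrop(u32);
///
/// impl Drop for NeverDrop {
///     fn drop(&mut self) {
///         panic!("NeverDrop must not be dropped");
///     }
/// }
///
/// // Leak a boxed value; the `&'static` reference stays valid forever.
/// fn leak_boxed(id: u32) -> &'static NeverDrop {
///     Box::leak(Box::new(NeverDrop(id)))
/// }
///
/// // Keep using the `Arc`, but give up one strong reference for good, so the
/// // count can never reach zero.
/// fn leak_arc(id: u32) -> Arc<NeverDrop> {
///     let wq = Arc::new(NeverDrop(id));
///     let _ = Arc::into_raw(wq.clone());
///     wq
/// }
/// ```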
pub struct WorkQueue {
    #[allow(dead_code)]
    item: Fixed<k_work_q>,
}

/// Work queues can be referenced from multiple threads, and thus are Send and Sync.
unsafe impl Send for WorkQueue {}
unsafe impl Sync for WorkQueue {}

impl Drop for WorkQueue {
    fn drop(&mut self) {
        panic!("WorkQueues must not be dropped");
    }
}

/// A simple mapping to get the current work queue from the currently running thread.
///
/// This assumes that Zephyr's work queues have a 1:1 mapping between the work queue and the
/// thread.
///
/// # Safety
///
/// This map is protected with a sync Mutex (which uses an underlying Zephyr mutex). It is, in
/// general, not a good idea to use a mutex in a work queue, as deadlock can happen. So it is
/// important both to never `.await` while holding the lock, and to make sure operations within it
/// are relatively fast. In this case, `insert` and `get` on the SimpleTls are reasonably fast, and
/// `insert` is usually done just at startup as well.
///
/// This is a little bit messy as we don't have a lazy mechanism, so we have to handle this a bit
/// manually right now.
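///
/// A host-side analogue of this lookup, with two assumptions. `std::thread::ThreadId` is the key
/// in place of the Zephyr thread pointer, and a plain `Vec` takes the place of `SimpleTls`. The
/// `QUEUES` registry and the numeric queue ids are hypothetical. Each worker registers itself once
/// at startup, and the lock is only held for the short insert or lookup.
///
/// ```rust
/// use std::sync::Mutex;
/// use std::thread::{self, ThreadId};
///
/// // Hypothetical registry mapping a thread to the id of the queue it runs.
/// static QUEUES: Mutex<Vec<(ThreadId, u32)>> = Mutex::new(Vec::new());
///
/// // Called once by each worker thread when it starts.
/// fn register_current(queue_id: u32) {
///     QUEUES.lock().unwrap().push((thread::current().id(), queue_id));
/// }
///
/// // Look up the queue running on the current thread, if any.
/// fn current_queue() -> Option<u32> {
///     let me = thread::current().id();
///     QUEUES
///         .lock()
///         .unwrap()
///         .iter()
///         .find(|(tid, _)| *tid == me)
///         .map(|(_, id)| *id)
/// }
///
/// // Returns (lookup from a registered worker, lookup from the calling thread).
/// fn demo() -> (Option<u32>, Option<u32>) {
///     let worker = thread::spawn(|| {
///         register_current(5);
///         current_queue()
///     });
///     let from_worker = worker.join().unwrap();
///     (from_worker, current_queue())
/// }
/// ```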
static WORK_QUEUES: Mutex<SimpleTls<WorkQueueRef>> = Mutex::new(SimpleTls::new());

/// For the queue mapping, we need a simple wrapper around the underlying pointer, one that doesn't
/// implement `Drop`.
#[derive(Copy, Clone)]
struct WorkQueueRef(*mut k_work_q);

// SAFETY: The work queue reference is also safe for both Send and Sync per Zephyr semantics.
unsafe impl Send for WorkQueueRef {}
unsafe impl Sync for WorkQueueRef {}

/// Retrieve the current work queue, if we are running within one.
pub fn get_current_workq() -> Option<*mut k_work_q> {
    WORK_QUEUES.lock().unwrap().get().map(|wq| wq.0)
}

/// A Rust wrapper for `k_poll_signal`.
///
/// A signal in Zephyr is an event mechanism that can be used to trigger actions in event queues to
/// run. It works somewhat like half of a binary semaphore. The signaling is robust in the
/// direction of the event happening, in that a blocked task will definitely wake when the signal
/// happens. However, the clearing of the signal is racy. Generally, there are two ways to handle
/// this:
///
/// - A work action can clear the signal as soon as it wakes up, before it starts processing any
///   data the signal was meant to indicate. If the race happens, the processing will handle the
///   extra data.
/// - A work action can clear the signal after it does its processing. This is useful for things
///   like periodic timers, where if it is still processing when an additional timer tick comes in,
///   that timer tick will be ignored. This suits periodic events where it is better to skip a
///   tick than to let ticks "stack up" and fall behind.
///
/// Notably, as long as the `reset` method is only ever called by the worker that is waiting upon
/// it, there shouldn't ever be a race in the `wait_async` itself.
///
/// Signals can pass a `c_int` from the signalling task to the task that is waiting for the signal.
/// The Zephyr documentation does not specify what value will be passed if `raise` is called
/// multiple times before a task waits upon a signal. The current implementation will return the
/// most recent raised `result` value.
///
/// For most other use cases, channels or semaphores are likely to be better solutions.
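///
/// The behavior described above can be modelled on the host with a hypothetical `ModelSignal`
/// built on `std::sync::Mutex`. This is not how Zephyr implements `k_poll_signal`. It only
/// mirrors the semantics documented here, assuming that a later `raise` overwrites the result of
/// an earlier one. `run_periodic` follows the second strategy (reset after processing), so ticks
/// that arrive while the work is busy are merged into the current run.
///
/// ```rust
/// use std::sync::Mutex;
///
/// // Hypothetical host model of a poll signal: `None` means "not signaled".
/// struct ModelSignal {
///     state: Mutex<Option<i32>>,
/// }
///
/// impl ModelSignal {
///     fn new() -> Self {
///         ModelSignal { state: Mutex::new(None) }
///     }
///
///     // Latch the signal; a later raise replaces the earlier result.
///     fn raise(&self, result: i32) {
///         *self.state.lock().unwrap() = Some(result);
///     }
///
///     // Read the state without clearing it.
///     fn check(&self) -> Option<i32> {
///         *self.state.lock().unwrap()
///     }
///
///     // Return to the unsignaled state.
///     fn reset(&self) {
///         *self.state.lock().unwrap() = None;
///     }
/// }
///
/// // Reset-after-processing: raises that land while the work is still busy
/// // are wiped by the final reset, so they are skipped rather than queued.
/// // Returns how many times the work ran.
/// fn run_periodic(signal: &ModelSignal, ticks_while_busy: u32) -> u32 {
///     let mut runs = 0;
///     while signal.check().is_some() {
///         runs += 1;
///         // Simulate timer ticks arriving during this run.
///         for tick in 0..ticks_while_busy {
///             signal.raise(tick as i32);
///         }
///         signal.reset();
///     }
///     runs
/// }
/// ```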
pub struct Signal {
    /// The raw Zephyr `k_poll_signal`.
    pub(crate) item: Fixed<k_poll_signal>,
}

// SAFETY: Zephyr's API maintains thread safety.
unsafe impl Send for Signal {}
unsafe impl Sync for Signal {}

impl Signal {
    /// Create a new `Signal`.
    ///
    /// The Signal will be in the non-signaled state.
    pub fn new() -> Signal {
        // SAFETY: The memory is zero initialized, and Fixed ensures that it never changes
        // address.
        let item: Fixed<k_poll_signal> = Fixed::new(unsafe { mem::zeroed() });
        unsafe {
            k_poll_signal_init(item.get());
        }
        Signal { item }
    }

    /// Reset the Signal.
    ///
    /// This resets the signal state to unsignaled.
    ///
    /// Please see the [`Signal`] documentation on how to handle the races that this implies.
    pub fn reset(&self) {
        // SAFETY: This is safe with a non-mut reference, as the purpose of the Zephyr API is to
        // coordinate this information between threads.
        unsafe {
            k_poll_signal_reset(self.item.get());
        }
    }

    /// Check the status of a signal.
    ///
    /// This reads the status of the signal. If the state is "signaled", this will return
    /// `Some(result)` where the `result` is the result value given to [`raise`].
    ///
    /// [`raise`]: Self::raise
    pub fn check(&self) -> Option<c_int> {
        let mut signaled: c_uint = 0;
        let mut result: c_int = 0;
        unsafe {
            // SAFETY: Zephyr's signal API coordinates access across threads.
            k_poll_signal_check(self.item.get(), &mut signaled, &mut result);
        }

        if signaled != 0 {
            Some(result)
        } else {
            None
        }
    }

    /// Raise a signal object.
    ///
    /// This will signal to any worker that is waiting on this object that the event has happened.
    /// The `result` will be returned from the worker's `wait` call.
    ///
    /// As per the Zephyr docs, this could return an EAGAIN error if the polling thread is in the
    /// process of expiring. The implication is that the signal will not be raised in this case.
    /// ...
    pub fn raise(&self, result: c_int) -> crate::Result<()> {
        to_result_void(unsafe { k_poll_signal_raise(self.item.get(), result) })
    }
}

impl Default for Signal {
    fn default() -> Self {
        Signal::new()
    }
}

/// Possible returns from work queue submission.
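///
/// The underlying Zephyr submit calls return 0, 1 or 2 for the three variants below, or a
/// negative errno on failure. A self-contained sketch of that mapping uses a hypothetical
/// `Submit` enum and `classify` function. The raw negative code stands in for the crate's error
/// type:
///
/// ```rust
/// // Hypothetical mirror of `SubmitResult`.
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum Submit {
///     AlreadySubmitted,
///     Enqueued,
///     WasRunning,
/// }
///
/// impl Submit {
///     // True when this call put the work on a queue.
///     fn enqueued(self) -> bool {
///         matches!(self, Submit::Enqueued | Submit::WasRunning)
///     }
/// }
///
/// // Classify a raw return value; negative values are errno-style failures.
/// fn classify(code: i32) -> Result<Submit, i32> {
///     match code {
///         0 => Ok(Submit::AlreadySubmitted),
///         1 => Ok(Submit::Enqueued),
///         2 => Ok(Submit::WasRunning),
///         c if c < 0 => Err(c),
///         c => panic!("unexpected result {} from work submission", c),
///     }
/// }
/// ```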
#[derive(Debug, Clone, Copy)]
pub enum SubmitResult {
    /// This work was already in a queue.
    AlreadySubmitted,
    /// The work has been added to the specified queue.
    Enqueued,
    /// The work was running, and has been queued again to the queue that was running it.
    WasRunning,
}

impl SubmitResult {
    /// Does this result indicate that the work was enqueued?
    pub fn enqueued(self) -> bool {
        matches!(self, Self::Enqueued | Self::WasRunning)
    }

    /// Convert an int result from a work submit function.
    fn to_result(value: c_int) -> crate::Result<Self> {
        crate::error::to_result(value).map(|code| match code {
            0 => Self::AlreadySubmitted,
            1 => Self::Enqueued,
            2 => Self::WasRunning,
            _ => panic!("Unexpected result {} from Zephyr work submission", code),
        })
    }
}

/// A simple action that just does something with its data.
///
/// This is similar to a Future, except there is no concept of it completing. It manages its
/// associated data however it wishes, and is responsible for re-queuing as needed.
///
/// Note, specifically, that `act` does not take a mutable reference. This is because the `Work`
/// below uses an `Arc`, so this data can be shared.
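///
/// Because `act` only receives `Pin<&Self>`, any state the action updates must use interior
/// mutability. The host-side sketch below uses a local trait with the same shape, named
/// `SimpleActionSketch` here so that it builds without this crate. It also uses a hypothetical
/// `Counter` action:
///
/// ```rust
/// use std::pin::Pin;
/// use std::sync::atomic::{AtomicU32, Ordering};
///
/// // Same shape as `SimpleAction`, redeclared so the sketch is self-contained.
/// trait SimpleActionSketch {
///     fn act(self: Pin<&Self>);
/// }
///
/// // Hypothetical action that counts how many times it has run.
/// struct Counter {
///     runs: AtomicU32,
/// }
///
/// impl SimpleActionSketch for Counter {
///     fn act(self: Pin<&Self>) {
///         // Only a shared reference is available, so update through an atomic.
///         self.runs.fetch_add(1, Ordering::Relaxed);
///     }
/// }
///
/// // Run the action twice through a pinned box and return the final count.
/// fn demo() -> u32 {
///     let counter = Box::pin(Counter { runs: AtomicU32::new(0) });
///     counter.as_ref().act();
///     counter.as_ref().act();
///     counter.runs.load(Ordering::Relaxed)
/// }
/// ```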
pub trait SimpleAction {
    /// Perform the action.
    fn act(self: Pin<&Self>);
}

/// A basic Zephyr work item.
///
/// Holds a `k_work`, along with the data associated with that work. When the queued work runs,
/// the `act` method will be called on the provided `SimpleAction`.
pub struct Work<T> {
    work: UnsafeCell<k_work>,
    action: T,
}

/// SAFETY: Work items can be sent as long as the action itself can be.
unsafe impl<F> Send for Work<F>
where
    F: SimpleAction,
    F: Send,
{
}

/// SAFETY: Work items are Sync when the action is.
unsafe impl<F> Sync for Work<F>
where
    F: SimpleAction,
    F: Sync,
{
}

impl<T: SimpleAction + Send> Work<T> {
    /// Construct a new Work from the given action.
    ///
    /// Note that the data will be moved into the pinned Work. The data is internal, and only
    /// accessible to the work thread (the `act` method). If shared data is needed, normal
    /// inter-thread sharing mechanisms are needed.
    ///
    /// TODO: Can we come up with a way to allow sharing on the same worker using Rc instead of Arc?
    pub fn new(action: T) -> Pin<Arc<Self>> {
        let this = Arc::pin(Self {
            // SAFETY: will be initialized below, after this is pinned.
            work: unsafe { mem::zeroed() },
            action,
        });

        // SAFETY: Initializes the above zero-initialized struct.
        unsafe {
            k_work_init(this.work.get(), Some(Self::handler));
        }

        this
    }

    /// Submit this work to the system work queue.
    ///
    /// This can return several possible `Ok` results. See the docs on [`SubmitResult`] for an
    /// explanation of them.
    pub fn submit(this: Pin<Arc<Self>>) -> crate::Result<SubmitResult> {
        // We "leak" the Arc, so that when the handler runs, it can be safely turned back into an
        // Arc, and the drop on the Arc will then run.
        let work = this.work.get();

        // SAFETY: The C code does not move the data, and the `from_raw` in the handler puts it
        // back into a Pin when it reconstructs the Arc.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        let _ = Arc::into_raw(this);

        // SAFETY: The Pin ensures this will not move, and the reference leaked above keeps the
        // data alive until the handler reclaims it.
        let result = SubmitResult::to_result(unsafe { k_work_submit(work) });

        // If this call did not queue the work (it was already queued, or submission failed), the
        // handler will not consume the reference leaked above, so reclaim it here.
        if !matches!(result, Ok(r) if r.enqueued()) {
            // SAFETY: `work` points into the Arc leaked above, which is reclaimed exactly once.
            drop(unsafe { Self::from_raw(work) });
        }

        result
    }

    /// Submit this work to a specified work queue.
    ///
    /// TODO: Change when we have better wrappers for work queues.
    pub fn submit_to_queue(this: Pin<Arc<Self>>, queue: &WorkQueue) -> crate::Result<SubmitResult> {
        let work = this.work.get();

        // "Leak" the Arc to give to C. We'll reconstruct it in the handler.
        // SAFETY: The C code does not move the data, and the `from_raw` in the handler puts it
        // back into a Pin when it reconstructs the Arc.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        let _ = Arc::into_raw(this);

        // SAFETY: The Pin ensures this will not move, and the reference leaked above keeps the
        // data alive until the handler reclaims it.
        let result =
            SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) });

        // As in `submit`, reclaim the leaked reference if the handler will not consume it.
        if !matches!(result, Ok(r) if r.enqueued()) {
            // SAFETY: `work` points into the Arc leaked above, which is reclaimed exactly once.
            drop(unsafe { Self::from_raw(work) });
        }

        result
    }

    /// Callback, through C, but bound by a specific type.
    extern "C" fn handler(work: *mut k_work) {
        // We want to avoid needing a `repr(C)` on our struct, so the `k_work` pointer is not
        // necessarily at the beginning of the struct.
        // SAFETY: Converts the raw pointer to the work back into the `Arc`.
        let this = unsafe { Self::from_raw(work) };

        // Access the action within, still pinned.
        // SAFETY: It is safe to keep the pin on the interior.
        let action = unsafe { this.as_ref().map_unchecked(|p| &p.action) };

        action.act();
    }

    /*
    /// Consume this Arc, returning the internal pointer. Needs to have a complementary `from_raw`
    /// called to avoid leaking the item.
    fn into_raw(this: Pin<Arc<Self>>) -> *const Self {
        // SAFETY: This removes the Pin guarantee, but is given as a raw pointer to C, which doesn't
        // generally use move.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        Arc::into_raw(this)
    }
    */

    /// Given a pointer to the `k_work` buried within, recover the pinned `Arc` containing our
    /// data.
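    ///
    /// This is the C `container_of` pattern. Below is a host-side sketch of the same pointer
    /// arithmetic, which requires Rust 1.77 or later for `mem::offset_of!`. A hypothetical `Item`
    /// with an `inner` field stands in for `Self` and its embedded `k_work`:
    ///
    /// ```rust
    /// use std::mem;
    /// use std::ptr;
    /// use std::sync::Arc;
    ///
    /// // Hypothetical container; `inner` plays the role of the embedded `k_work`.
    /// struct Item {
    ///     label: u32,
    ///     inner: u64,
    /// }
    ///
    /// // Leak an `Arc<Item>` and return only a pointer to its `inner` field,
    /// // which is all a C callback would be given.
    /// fn leak_field(item: Arc<Item>) -> *const u64 {
    ///     let whole = Arc::into_raw(item);
    ///     // SAFETY: `whole` points to a live `Item`; only the field address is taken.
    ///     unsafe { ptr::addr_of!((*whole).inner) }
    /// }
    ///
    /// // Step back from the field to the start of `Item` and rebuild the `Arc`.
    /// //
    /// // SAFETY: `field` must come from `leak_field`, and be reclaimed only once.
    /// unsafe fn reclaim(field: *const u64) -> Arc<Item> {
    ///     unsafe {
    ///         let whole = field
    ///             .cast::<u8>()
    ///             .sub(mem::offset_of!(Item, inner))
    ///             .cast::<Item>();
    ///         Arc::from_raw(whole)
    ///     }
    /// }
    ///
    /// // Returns (label, inner, strong count while both handles are alive).
    /// fn demo() -> (u32, u64, usize) {
    ///     let item = Arc::new(Item { label: 9, inner: 42 });
    ///     let field = leak_field(item.clone());
    ///     // SAFETY: `field` came from `leak_field` and is reclaimed once here.
    ///     let back = unsafe { reclaim(field) };
    ///     (back.label, back.inner, Arc::strong_count(&item))
    /// }
    /// ```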
    unsafe fn from_raw(ptr: *const k_work) -> Pin<Arc<Self>> {
        // SAFETY: This fixes the pointer back to the beginning of Self. This also assumes the
        // pointer is valid.
        let ptr = ptr
            .cast::<u8>()
            .sub(mem::offset_of!(Self, work))
            .cast::<Self>();
        let this = Arc::from_raw(ptr);
        Pin::new_unchecked(this)
    }

    /// Access the inner action.
    pub fn action(&self) -> &T {
        &self.action
    }
543}