zephyr/embassy/executor.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
//! An embassy executor tailored for Zephyr
use core::{marker::PhantomData, sync::atomic::Ordering};
use embassy_executor::{raw, Spawner};
use zephyr_sys::{k_current_get, k_thread_resume, k_thread_suspend, k_tid_t};
use crate::sync::atomic::AtomicBool;
/// Zephyr-thread based executor.
pub struct Executor {
inner: Option<raw::Executor>,
id: k_tid_t,
pend: AtomicBool,
not_send: PhantomData<*mut ()>,
}
impl Executor {
/// Create a new Executor.
pub fn new() -> Self {
let id = unsafe { k_current_get() };
Self {
inner: None,
pend: AtomicBool::new(false),
id,
not_send: PhantomData,
}
}
/// Run the executor.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
let context = self as *mut _ as *mut ();
self.inner.replace(raw::Executor::new(context));
let inner = self.inner.as_mut().unwrap();
init(inner.spawner());
loop {
unsafe {
// The raw executor's poll only runs things that were queued _before_ this poll
// itself is actually run. This means, specifically, that if the polled execution
// causes this, or other threads to enqueue, this will return without running them.
// `__pender` _will_ be called, but it isn't "sticky" like `wfe/sev` are. To
// simulate this, we will use the 'pend' atomic to count
inner.poll();
if !self.pend.swap(false, Ordering::SeqCst) {
// printkln!("_suspend");
k_thread_suspend(k_current_get());
}
}
}
}
}
impl Default for Executor {
fn default() -> Self {
Self::new()
}
}
#[export_name = "__pender"]
fn __pender(context: *mut ()) {
unsafe {
let myself = k_current_get();
let this = context as *const Executor;
let other = (*this).id;
// If the other is a different thread, resume it.
if other != myself {
// printkln!("_resume");
k_thread_resume(other);
}
// Otherwise, we need to make sure our own next suspend doesn't happen.
// We need to also prevent a suspend from happening in the case where the only running
// thread causes other work to become pending. The resume below will do nothing, as we
// are just running.
(*this).pend.store(true, Ordering::SeqCst);
}
}