zephyr/sys/sync/
semaphore.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Copyright (c) 2024 Linaro LTD
// SPDX-License-Identifier: Apache-2.0

//! Zephyr Semaphore support
//!
//! This is a thin wrapper around Zephyr's `k_sem`.  This is one of the few of the `sys` primitives
//! in Zephyr that is actually perfectly usable on its own, without needing additional wrappers.
//!
//! Zephyr implements counting semaphores, with both an upper and lower bound on the count.  Note
//! that calling 'give' on a semaphore that is at the maximum count will discard the 'give'
//! operation, which in situation where counting is actually desired, will result in the count being
//! incorrect.

use core::ffi::c_uint;
use core::fmt;
#[cfg(CONFIG_RUST_ALLOC)]
use core::future::Future;
#[cfg(CONFIG_RUST_ALLOC)]
use core::pin::Pin;
#[cfg(CONFIG_RUST_ALLOC)]
use core::task::{Context, Poll};

#[cfg(CONFIG_RUST_ALLOC)]
use zephyr_sys::ETIMEDOUT;

#[cfg(CONFIG_RUST_ALLOC)]
use crate::kio::ContextExt;
use crate::object::{ObjectInit, ZephyrObject};
#[cfg(CONFIG_RUST_ALLOC)]
use crate::time::NoWait;
use crate::{
    error::{to_result_void, Result},
    raw::{k_sem, k_sem_count_get, k_sem_give, k_sem_init, k_sem_reset, k_sem_take},
    time::Timeout,
};

pub use crate::raw::K_SEM_MAX_LIMIT;

/// General Zephyr Semaphores
pub struct Semaphore(pub(crate) ZephyrObject<k_sem>);

/// By nature, Semaphores are both Sync and Send.  Safety is handled by the underlying Zephyr
/// implementation (which is why Clone is also implemented).
unsafe impl Sync for Semaphore {}
unsafe impl Send for Semaphore {}

impl Semaphore {
    /// Create a new semaphore.
    ///
    /// Create a new dynamically allocated Semaphore.  This semaphore can only be used from system
    /// threads.  The arguments are as described in [the
    /// docs](https://docs.zephyrproject.org/latest/kernel/services/synchronization/semaphores.html).
    ///
    /// Note that this API has changed, and it now doesn't return a Result, since the Result time
    /// generally doesn't work (in stable rust) with const.
    #[cfg(CONFIG_RUST_ALLOC)]
    pub const fn new(initial_count: c_uint, limit: c_uint) -> Semaphore {
        // Due to delayed init, we need to replicate the object checks in the C `k_sem_init`.

        if limit == 0 || initial_count > limit {
            panic!("Invalid semaphore initialization");
        }

        let this = <ZephyrObject<k_sem>>::new_raw();

        unsafe {
            let addr = this.get_uninit();
            (*addr).count = initial_count;
            (*addr).limit = limit;
        }

        // to_result_void(k_sem_init(item.get(), initial_count, limit))?;
        Semaphore(this)
    }

    /// Take a semaphore.
    ///
    /// Can be called from ISR if called with [`NoWait`].
    ///
    /// [`NoWait`]: crate::time::NoWait
    pub fn take<T>(&self, timeout: T) -> Result<()>
    where
        T: Into<Timeout>,
    {
        let timeout: Timeout = timeout.into();
        let ret = unsafe { k_sem_take(self.0.get(), timeout.0) };
        to_result_void(ret)
    }

    /// Take a semaphore, async version.
    ///
    /// Returns a future that either waits for the semaphore, or returns status.
    #[cfg(CONFIG_RUST_ALLOC)]
    pub fn take_async<'a>(
        &'a self,
        timeout: impl Into<Timeout>,
    ) -> impl Future<Output = Result<()>> + 'a {
        SemTake {
            sem: self,
            timeout: timeout.into(),
            ran: false,
        }
    }

    /// Give a semaphore.
    ///
    /// This routine gives to the semaphore, unless the semaphore is already at its maximum
    /// permitted count.
    pub fn give(&self) {
        unsafe { k_sem_give(self.0.get()) }
    }

    /// Resets a semaphor's count to zero.
    ///
    /// This resets the count to zero.  Any outstanding [`take`] calls will be aborted with
    /// `Error(EAGAIN)`.
    ///
    /// [`take`]: Self::take
    pub fn reset(&self) {
        unsafe { k_sem_reset(self.0.get()) }
    }

    /// Get a semaphore's count.
    ///
    /// Returns the current count.
    pub fn count_get(&self) -> usize {
        unsafe { k_sem_count_get(self.0.get()) as usize }
    }
}

impl ObjectInit<k_sem> for ZephyrObject<k_sem> {
    fn init(item: *mut k_sem) {
        // SAFEFY: Get the initial values used in new.  The address may have changed, but only due
        // to a move.
        unsafe {
            let count = (*item).count;
            let limit = (*item).limit;

            if k_sem_init(item, count, limit) != 0 {
                // Note that with delayed init, we cannot do anything with invalid values.  We're
                // replicated the check in `new` above, so would only catch semantic changes in the
                // implementation of `k_sem_init`.
                unreachable!();
            }
        }
    }
}

/// The async 'take' Future
#[cfg(CONFIG_RUST_ALLOC)]
struct SemTake<'a> {
    /// The semaphore we're waiting on.
    sem: &'a Semaphore,
    /// The timeout to use.
    timeout: Timeout,
    /// Set after we've waited once.
    ran: bool,
}

#[cfg(CONFIG_RUST_ALLOC)]
impl<'a> Future for SemTake<'a> {
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Always check if data is available.
        if let Ok(()) = self.sem.take(NoWait) {
            return Poll::Ready(Ok(()));
        }

        if self.ran {
            // If we ran once, and still don't have any data, indicate this as a timeout.
            return Poll::Ready(Err(crate::Error(ETIMEDOUT)));
        }

        // TODO: Clean this up.
        cx.add_semaphore(self.sem, self.timeout);
        self.ran = true;

        Poll::Pending
    }
}

impl fmt::Debug for Semaphore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "sys::Semaphore")
    }
}