zephyr/sync/
channel.rs

1//! Close-to-Zephyr channels
2//!
3//! This module attempts to provide a mechanism as close as possible to `crossbeam-channel` as we
4//! can get, directly using Zephyr primitives.
5//!
6//! The channels are built around `k_queue` in Zephyr.  As is the case with most Zephyr types,
7//! these are typically statically allocated.  Similar to the other close-to-zephyr primitives,
8//! this means that there is a constructor that can directly take one of these primitives.
9//!
10//! In other words, `zephyr::sys::Queue` is a Rust friendly implementation of `k_queue` in Zephyr.
11//! This module provides `Sender` and `Receiver`, which can be cloned and behave as if they had an
12//! internal `Arc` inside them, but without the overhead of an actual Arc.
13//!
14//! ## IRQ safety
15//!
16//! These channels are usable from IRQ context on Zephyr in very limited situations.  Notably, all
17//! of the following must be true:
18//! - The channel has been created with `bounded()`, which pre-allocates all of the messages.
19//! - If the type `T` has a Drop implementation, this implementation can be called from IRQ context.
20//! - Only `try_send` or `try_recv` are used on the channel.
21//!
22//! The requirement for Drop is only strictly true if the IRQ handler calls `try_recv` and drops
23//! received message.  If the message is *always* sent over another channel or otherwise not
24//! dropped, it *might* be safe to use these messages.
25//!
26//! ## Dropping of Sender/Receiver
27//!
28//! Crossbeam channels support detecting when all senders or all receivers have been dropped on a
29//! channel, which will cause the handles on the other end to error, including waking up current
30//! threads waiting on those channels.
31//!
32//! At this time, this isn't implementable in Zephyr, as there is no API to wake up all threads
33//! blocked on a given `k_queue`.  As such, this scenario is not supported.  What actually happens
34//! is that when all senders or receivers on a channel are dropped, operations on the other end of
35//! the channel may just block (or queue forever with unbounded queues).  If all handles (both
36//! sender and receiver) are dropped, the last drop will cause a panic.  It may be better to just
37//! leak the entire channel, as any data associated with the channels would be leaked at this point,
38//! including the underlying Zephyr `k_queue`.  Until APIs are added to Zephyr to allow the channel
39//! information to be safely freed, these can't actually be freed.
40
41extern crate alloc;
42
43use alloc::boxed::Box;
44
45use core::cell::UnsafeCell;
46use core::ffi::c_void;
47use core::fmt;
48use core::marker::PhantomData;
49use core::mem::MaybeUninit;
50use core::pin::Pin;
51
52use crate::sys::queue::Queue;
53use crate::time::{Forever, NoWait, Timeout};
54
55mod counter;
56
57// The Zephyr queue does not allocate or manage the data of the messages, so we need to handle
58// allocation ourselves.  However, we don't need to manage anything else, so it is sufficient to
59// simply Box the message, leak it out of the box, and give it to Zephyr, and then on receipt, wrap
60// it back into a Box, and give it to the recipient.
61
62/// Create a multi-producer multi-consumer channel of unbounded capacity, using an existing Queue
63/// object.
64///
65/// The messages are allocated individually as "Box", and the queue is managed by the underlying
66/// Zephyr queue.
67pub fn unbounded_from<T>(queue: Queue) -> (Sender<T>, Receiver<T>) {
68    let (s, r) = counter::new(queue);
69    let s = Sender {
70        flavor: SenderFlavor::Unbounded {
71            queue: s,
72            _phantom: PhantomData,
73        },
74    };
75    let r = Receiver {
76        flavor: ReceiverFlavor::Unbounded {
77            queue: r,
78            _phantom: PhantomData,
79        },
80    };
81    (s, r)
82}
83
84/// Create a multi-producer multi-consumer channel of unbounded capacity.
85///
86/// The messages are allocated individually as "Box".  The underlying Zephyr queue will be
87/// dynamically allocated.
88///
89/// **Note**: Currently Drop is not propertly supported on Zephyr.  If all senders are dropped, any
90/// receivers will likely be blocked forever.  Any data that has been queued and not received will
91/// be leaked when all receivers have been droped.
92pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
93    unbounded_from(Queue::new())
94}
95
96/// Create a multi-producer multi-consumer channel with bounded capacity.
97///
98/// The messages are allocated at channel creation time.  If there are no messages at `send` time,
99/// send will block (possibly waiting for a timeout).
100///
101/// At this time, Zephyr does not support crossbeam's 0 capacity queues, which are also called
102/// a rendezvous, where both threads wait until in the same region.  `bounded` will panic if called
103/// with a capacity of zero.
104pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
105    if cap == 0 {
106        panic!("Zero capacity queues are not supported on Zephyr");
107    }
108
109    let (s, r) = counter::new(Bounded::new(cap));
110    let s = Sender {
111        flavor: SenderFlavor::Bounded(s),
112    };
113    let r = Receiver {
114        flavor: ReceiverFlavor::Bounded(r),
115    };
116    (s, r)
117}
118
/// Envelope wrapped around every payload that passes through Zephyr's [`Queue`].
///
/// The channel implementation uses this internally.  It has no use in safe code, but can serve
/// as a building block for other kinds of message queues.
#[repr(C)]
pub struct Message<T> {
    /// Word reserved for the kernel's own bookkeeping while the message is enqueued.  It must
    /// stay the first field, hence the `repr(C)` layout.
    _private: usize,
    /// The payload carried by this message.
    data: T,
}

impl<T> Message<T> {
    /// Wrap `data` in a message whose kernel-reserved word is zeroed.
    fn new(data: T) -> Self {
        Self { data, _private: 0 }
    }
}
136
137/// The sending side of a channel.
138pub struct Sender<T> {
139    flavor: SenderFlavor<T>,
140}
141
142// SAFETY: We implement Send and Sync for the Sender itself, as long as the underlying data can be
143// sent.  The underlying zephyr primitives used for the channel provide the Sync safety.
144unsafe impl<T: Send> Send for Sender<T> {}
145unsafe impl<T: Send> Sync for Sender<T> {}
146
147impl<T> Sender<T> {
148    /// Waits for a message to be sent into the channel, but only for a limited time.
149    ///
150    /// This call will block until the send operation can proceed or the operation times out.
151    ///
152    /// For unbounded channels, this will perform an allocation (and always send immediately).  For
153    /// bounded channels, no allocation will be performed.
154    pub fn send_timeout<D>(&self, msg: T, timeout: D) -> Result<(), SendError<T>>
155    where
156        D: Into<Timeout>,
157    {
158        match &self.flavor {
159            SenderFlavor::Unbounded { queue, .. } => {
160                let msg = Box::new(Message::new(msg));
161                let msg = Box::into_raw(msg);
162                // SAFETY: Zephyr requires, for as long as the message remains in the queue, that
163                // the first `usize` of the message be available for its use, and that the message
164                // not be moved.  The `into_raw` of the box consumes the box, so this is entirely a
165                // raw pointer with no references from the Rust code.  The item is not used until it
166                // has been removed from the queue.
167                unsafe {
168                    queue.send(msg as *mut c_void);
169                }
170            }
171            SenderFlavor::Bounded(chan) => {
172                // Retrieve a message buffer from the free list.
173                // SAFETY: Please see the safety discussion on `Bounded` on what makes this safe.
174                let buf = unsafe { chan.free.recv(timeout) };
175                if buf.is_null() {
176                    return Err(SendError(msg));
177                }
178                let buf = buf as *mut Message<T>;
179                unsafe {
180                    buf.write(Message::new(msg));
181                    chan.chan.send(buf as *mut c_void);
182                }
183            }
184        }
185        Ok(())
186    }
187
188    /// Sends a message over the given channel.  Waiting if necessary.
189    ///
190    /// For unbounded channels, this will allocate space for a message, and immediately send it.
191    /// For bounded channels, this will block until a message slot is available, and then send the
192    /// message.
193    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
194        self.send_timeout(msg, Forever)
195    }
196
197    /// Attempts to send a message into the channel without blocking.
198    ///
199    /// This message will either send a message into the channel immediately or return an error if
200    /// the channel is full.  The returned error contains the original message.
201    pub fn try_send(&self, msg: T) -> Result<(), SendError<T>> {
202        self.send_timeout(msg, NoWait)
203    }
204}
205
206impl<T> Drop for Sender<T> {
207    fn drop(&mut self) {
208        match &self.flavor {
209            SenderFlavor::Unbounded { queue, .. } => {
210                // SAFETY: It is not possible to free from Zephyr queues.  This means drop has to
211                // either leak or panic.  We will panic for now.
212                unsafe {
213                    queue.release(|_| {
214                        panic!("Unbounded queues cannot currently be dropped");
215                    })
216                }
217            }
218            SenderFlavor::Bounded(chan) => {
219                // SAFETY: It is not possible to free from Zephyr queues.  This means drop has to
220                // either leak or panic.  We will panic for now.
221                unsafe {
222                    chan.release(|_| {
223                        panic!("Bounded queues cannot currently be dropped");
224                    })
225                }
226            }
227        }
228    }
229}
230
231impl<T> Clone for Sender<T> {
232    fn clone(&self) -> Self {
233        let flavor = match &self.flavor {
234            SenderFlavor::Unbounded { queue, .. } => SenderFlavor::Unbounded {
235                queue: queue.acquire(),
236                _phantom: PhantomData,
237            },
238            SenderFlavor::Bounded(chan) => SenderFlavor::Bounded(chan.acquire()),
239        };
240
241        Sender { flavor }
242    }
243}
244
245/// The "flavor" of a sender.  This maps to the type of channel.
246enum SenderFlavor<T> {
247    /// An unbounded queue.  Messages are allocated with Box, and sent directly.
248    Unbounded {
249        queue: counter::Sender<Queue>,
250        _phantom: PhantomData<T>,
251    },
252    Bounded(counter::Sender<Bounded<T>>),
253}
254
255impl<T: fmt::Debug> fmt::Debug for Sender<T> {
256    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257        write!(f, "Sender")
258    }
259}
260
261/// The receiving side of a channel.
262pub struct Receiver<T> {
263    flavor: ReceiverFlavor<T>,
264}
265
266// SAFETY: We implement Send and Sync for the Receiver itself, as long as the underlying data can be
267// sent.  The underlying zephyr primitives used for the channel provide the Sync safety.
268unsafe impl<T: Send> Send for Receiver<T> {}
269unsafe impl<T: Send> Sync for Receiver<T> {}
270
271impl<T> Receiver<T> {
272    /// Waits for a message to be received from the channel, but only for a limited time.
273    ///
274    /// If the channel is empty and not disconnected, this call will block until the receive
275    /// operation can proceed or the operation times out.
276    /// wake up and return an error.
277    pub fn recv_timeout<D>(&self, timeout: D) -> Result<T, RecvError>
278    where
279        D: Into<Timeout>,
280    {
281        match &self.flavor {
282            ReceiverFlavor::Unbounded { queue, .. } => {
283                // SAFETY: Messages were sent by converting a Box through `into_raw()`.
284                let msg = unsafe {
285                    let msg = queue.recv(timeout);
286                    if msg.is_null() {
287                        return Err(RecvError);
288                    }
289                    msg
290                };
291                let msg = msg as *mut Message<T>;
292                // SAFETY: After receiving the message from the queue's `recv` method, Zephyr will
293                // no longer use the `usize` at the beginning, and it is safe for us to convert the
294                // message back into a box, copy the field out of it, an allow the Box itself to be
295                // freed.
296                let msg = unsafe { Box::from_raw(msg) };
297                Ok(msg.data)
298            }
299            ReceiverFlavor::Bounded(chan) => {
300                // SAFETY: Please see the safety discussion on Bounded.
301                let rawbuf = unsafe {
302                    let buf = chan.chan.recv(timeout);
303                    if buf.is_null() {
304                        return Err(RecvError);
305                    }
306                    buf
307                };
308                let buf = rawbuf as *mut Message<T>;
309                let msg: Message<T> = unsafe { buf.read() };
310                unsafe {
311                    chan.free.send(buf as *mut c_void);
312                }
313                Ok(msg.data)
314            }
315        }
316    }
317
318    /// Blocks the current thread until a message is received or the channel is empty and
319    /// disconnected.
320    ///
321    /// If the channel is empty and not disconnected, this call will block until the receive
322    /// operation can proceed.
323    pub fn recv(&self) -> Result<T, RecvError> {
324        self.recv_timeout(Forever)
325    }
326
327    /// Attempts to receive a message from the channel without blocking.
328    ///
329    /// This method will either receive a message from the channel immediately, or return an error
330    /// if the channel is empty.
331    ///
332    /// This method is safe to use from IRQ context, if and only if the channel was created as a
333    /// bounded channel.
334    pub fn try_recv(&self) -> Result<T, RecvError> {
335        self.recv_timeout(NoWait)
336    }
337}
338
339impl<T> Drop for Receiver<T> {
340    fn drop(&mut self) {
341        match &self.flavor {
342            ReceiverFlavor::Unbounded { queue, .. } => {
343                // SAFETY: As the Zephyr channel cannot be freed we must either leak or panic.
344                // Chose panic for now.
345                unsafe {
346                    queue.release(|_| {
347                        panic!("Unnbounded channel cannot be dropped");
348                    })
349                }
350            }
351            ReceiverFlavor::Bounded(chan) => {
352                // SAFETY: As the Zephyr channel cannot be freed we must either leak or panic.
353                // Chose panic for now.
354                unsafe {
355                    chan.release(|_| {
356                        panic!("Bounded channels cannot be dropped");
357                    })
358                }
359            }
360        }
361    }
362}
363
364impl<T> Clone for Receiver<T> {
365    fn clone(&self) -> Self {
366        let flavor = match &self.flavor {
367            ReceiverFlavor::Unbounded { queue, .. } => ReceiverFlavor::Unbounded {
368                queue: queue.acquire(),
369                _phantom: PhantomData,
370            },
371            ReceiverFlavor::Bounded(chan) => ReceiverFlavor::Bounded(chan.acquire()),
372        };
373
374        Receiver { flavor }
375    }
376}
377
378impl<T> fmt::Debug for Receiver<T> {
379    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380        write!(f, "Sender")
381    }
382}
383
384/// The "flavor" of a receiver.  This maps to the type of the channel.
385enum ReceiverFlavor<T> {
386    /// An unbounded queue.  Messages were allocated with Box, and will be freed upon receipt.
387    Unbounded {
388        queue: counter::Receiver<Queue>,
389        _phantom: PhantomData<T>,
390    },
391    Bounded(counter::Receiver<Bounded<T>>),
392}
393
394type Slot<T> = UnsafeCell<MaybeUninit<Message<T>>>;
395
396// SAFETY: A Bounded channel contains an array of messages that are allocated together in a Box.
397// This Box is held for an eventual future implementation that is able to free the messages, once
398// they have all been taken from Zephyr's knowledge.  For now, they could also be leaked.
399// It is a `Pin<Box<...>>` because it is important that the data never be moved, as we maintain
400// pointers to the items in Zephyr queues.
401//
402// There are two `Queue`s used here: `free` is the free list of messages that are not being sent,
403// and `chan` for messages that have been sent but not received.  Initially, all slots are placed on
404// the `free` queue.  At any time, outside of the calls in this module, each slot must live inside
405// of one of the two queues.  This means that the messages cannot be moved or accessed, except
406// inside of the individual send/receive operations.  Zephyr makes use of the initial `usize` field
407// at the beginning of each Slot.
408//
409// We use MaybeUninit for the messages to avoid needing to initialize the messages.  The individual
410// messages are accessed through pointers when they are retrieved from the Zephyr `Queue`, so these
411// values are never marked as initialized.
412/// Bounded channel implementation.
413struct Bounded<T> {
414    /// The messages themselves.  This Box owns the allocation of the messages, although it is
415    /// unsafe to drop this with any messages stored in either of the Zephyr queues.
416    ///
417    /// The UnsafeCell is needed to indicate that this data is handled outside of what Rust is aware
418    /// of.  MaybeUninit allows us to create this without allocation.
419    _slots: Pin<Box<[Slot<T>]>>,
420    /// The free queue, holds messages that aren't be used.  The free queue has to be in a box,
421    /// because it cannot move after the constructor runs.  The chan is fine to wait until first
422    /// use, when the object has settled.  As we are also boxing the messages, this isn't really
423    /// that costly.
424    free: Box<Queue>,
425    /// The channel queue.  These are messages that have been sent and are waiting to be received.
426    chan: Queue,
427}
428
429impl<T> Bounded<T> {
430    fn new(cap: usize) -> Self {
431        let slots: Box<[Slot<T>]> = (0..cap)
432            .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
433            .collect();
434        let slots = Box::into_pin(slots);
435
436        let free = Box::new(Queue::new());
437        let chan = Queue::new();
438
439        // Add each of the boxes to the free list.
440        for slot in slots.as_ref().iter() {
441            // SAFETY: See safety discussion on `Bounded`.
442            unsafe {
443                free.send(slot.get() as *mut c_void);
444            }
445        }
446
447        Bounded {
448            _slots: slots,
449            free,
450            chan,
451        }
452    }
453}
454
455// TODO: Move to err
456
/// The error produced by [`send`] and the other sending methods.
///
/// In this module it is returned when a bounded channel had no free message slot for the
/// message being sent.
///
/// The message that could not be sent is carried inside the error so it can be recovered.
///
/// [`send`]: Sender::send
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Debug for SendError<T> {
    /// Formats a fixed placeholder instead of the payload, so `T` need not implement `Debug`.
    ///
    /// The placeholder is formatted with the `Debug` implementation of `str`, so the output
    /// includes the surrounding double quotes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt("SendError(..)", f)
    }
}
472
/// The error produced by [`recv`] and the other receiving methods.
///
/// In this module it is returned when the underlying queue yielded no message.
///
/// [`recv`]: Receiver::recv
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvError;