123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- use super::*;
- use crate::{
- loom::{
- atomic::{self, Ordering},
- sync::Arc,
- thread::{self, Thread},
- },
- Ref, ThingBuf,
- };
- use core::fmt;
- pub fn channel<T>(thingbuf: ThingBuf<T>) -> (Sender<T>, Receiver<T>) {
- let inner = Arc::new(Inner::new(thingbuf));
- let tx = Sender {
- inner: inner.clone(),
- };
- let rx = Receiver { inner };
- (tx, rx)
- }
- #[derive(Debug)]
- pub struct Sender<T> {
- inner: Arc<Inner<T, Thread>>,
- }
- #[derive(Debug)]
- pub struct Receiver<T> {
- inner: Arc<Inner<T, Thread>>,
- }
- impl_send_ref! {
- pub struct SendRef<Thread>;
- }
- impl_recv_ref! {
- pub struct RecvRef<Thread>;
- }
- impl<T: Default> Sender<T> {
- pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
- self.inner.try_send_ref().map(SendRef)
- }
- pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
- self.inner.try_send(val)
- }
- pub fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
- loop {
-
- if let Poll::Ready(result) = self.inner.poll_send_ref(thread::current) {
- return result.map(SendRef);
- }
-
- thread::park();
- }
- }
- pub fn send(&self, val: T) -> Result<(), Closed<T>> {
- match self.send_ref() {
- Err(Closed(())) => Err(Closed(val)),
- Ok(mut slot) => {
- slot.with_mut(|slot| *slot = val);
- Ok(())
- }
- }
- }
- }
- impl<T> Clone for Sender<T> {
- fn clone(&self) -> Self {
- test_dbg!(self.inner.tx_count.fetch_add(1, Ordering::Relaxed));
- Self {
- inner: self.inner.clone(),
- }
- }
- }
- impl<T> Drop for Sender<T> {
- fn drop(&mut self) {
- if test_dbg!(self.inner.tx_count.fetch_sub(1, Ordering::Release)) > 1 {
- return;
- }
-
- test_dbg!(atomic::fence(Ordering::SeqCst));
- if self.inner.thingbuf.core.close() {
- self.inner.rx_wait.close_tx();
- }
- }
- }
- impl<T: Default> Receiver<T> {
- pub fn recv_ref(&self) -> Option<RecvRef<'_, T>> {
- loop {
- match self.inner.poll_recv_ref(thread::current) {
- Poll::Ready(r) => {
- return r.map(|slot| RecvRef {
- slot,
- inner: &*self.inner,
- })
- }
- Poll::Pending => {
- test_println!("parking ({:?})", thread::current());
- thread::park();
- }
- }
- }
- }
- pub fn try_recv_ref(&self) -> Option<Ref<'_, T>> {
- self.inner.thingbuf.pop_ref()
- }
- pub fn recv(&self) -> Option<T> {
- let val = self.recv_ref()?.with_mut(core::mem::take);
- Some(val)
- }
- pub fn is_closed(&self) -> bool {
- test_dbg!(self.inner.tx_count.load(Ordering::SeqCst)) <= 1
- }
- }
- impl<'a, T: Default> Iterator for &'a Receiver<T> {
- type Item = RecvRef<'a, T>;
- fn next(&mut self) -> Option<Self::Item> {
- self.recv_ref()
- }
- }
- impl<T> Drop for Receiver<T> {
- fn drop(&mut self) {
- self.inner.close_rx();
- }
- }
|