1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use tokio::sync::{broadcast, mpsc};
/// Sending side of a shutdown notification.
///
/// `R` is the type of the reason broadcast to listeners and `A` is the type
/// of the value listeners send back as an acknowledgement. Both default to
/// `()`.
pub struct Broadcaster<R = (), A = ()>
where
    R: Clone,
{
    /// Broadcast sender used to deliver the shutdown reason to every listener.
    broadcaster: broadcast::Sender<R>,
    /// Receiving end of the acknowledgement channel; handed to the caller by
    /// `signal_shutdown`.
    shutdown_watcher: mpsc::UnboundedReceiver<A>,
    /// Acknowledgement sender kept around so it can be cloned into each new
    /// listener.
    mpsc_copy: mpsc::UnboundedSender<A>,
}
impl<R: Clone, A> Broadcaster<R, A> {
    /// Creates a broadcaster with no listeners attached.
    ///
    /// The broadcast channel has a capacity of one message; the receiver that
    /// comes with it is discarded immediately, listeners are created through
    /// [`Broadcaster::new_listener`] instead.
    pub fn new() -> Self {
        let (broadcaster, _) = broadcast::channel(1);
        let (mpsc_copy, shutdown_watcher) = mpsc::unbounded_channel();
        Self {
            broadcaster,
            shutdown_watcher,
            mpsc_copy,
        }
    }

    /// Creates a new [`Listener`] subscribed to this broadcaster.
    ///
    /// The listener gets its own broadcast receiver and a clone of the
    /// acknowledgement sender, and starts without a recorded shutdown reason.
    pub fn new_listener(&self) -> Listener<R, A> {
        Listener {
            b_receiver: self.broadcaster.subscribe(),
            mpsc_sender: self.mpsc_copy.clone(),
            shutdown_reason: None,
        }
    }

    /// Returns the receiver count reported by the underlying broadcast sender.
    pub fn num_listeners(&self) -> usize {
        self.broadcaster.receiver_count()
    }

    /// Consumes the broadcaster and signals shutdown.
    ///
    /// When `reason` is `Some`, it is broadcast to the listeners; a failed
    /// send is deliberately ignored. When it is `None`, nothing is sent.
    /// Either way the broadcast sender and the stored acknowledgement sender
    /// are dropped when this method returns, because `self` is consumed.
    ///
    /// Returns the receiving end of the acknowledgement channel.
    pub fn signal_shutdown(self, reason: Option<R>) -> mpsc::UnboundedReceiver<A> {
        if let Some(why) = reason {
            // Best effort: the send result is intentionally discarded.
            let _ = self.broadcaster.send(why);
        }
        self.shutdown_watcher
    }
}
impl<R: Clone, A> Default for Broadcaster<R, A> {
fn default() -> Self {
Self::new()
}
}
/// Receiving side of a shutdown notification.
///
/// `R` is the type of the shutdown reason and `A` is the type of the value
/// sent back when acknowledging. Both default to `()`.
pub struct Listener<R = (), A = ()>
where
    R: Clone,
{
    /// Broadcast receiver on which the shutdown reason arrives.
    b_receiver: broadcast::Receiver<R>,
    /// Sender used by `acknowledge` to report back.
    mpsc_sender: mpsc::UnboundedSender<A>,
    /// Reason recorded by `recv` / `try_recv`; `None` until one is observed.
    shutdown_reason: Option<ShutdownReason<R>>,
}
impl<R: Clone, I> Listener<R, I> {
pub async fn recv(&mut self) -> &ShutdownReason<R> {
if let Some(ref r) = self.shutdown_reason {
return r;
}
let reason = match self.b_receiver.recv().await {
Ok(r) => ShutdownReason::Reason(r),
Err(broadcast::error::RecvError::Closed) => {
ShutdownReason::BroadcasterClosed
}
Err(_) => unreachable!(
"we shouldn't be able to lag, only 1 shutdown is ever sent."
),
};
self.shutdown_reason = Some(reason);
self.shutdown_reason.as_ref().unwrap()
}
pub fn try_recv(&mut self) -> Option<&ShutdownReason<R>> {
let reason = match self.b_receiver.try_recv() {
Ok(r) => ShutdownReason::Reason(r),
Err(broadcast::error::TryRecvError::Closed) => {
ShutdownReason::BroadcasterClosed
}
Err(broadcast::error::TryRecvError::Empty) => return None,
Err(broadcast::error::TryRecvError::Lagged(_)) => {
unreachable!(
"we shouldn't be able to lag, only 1 shutdown is ever sent."
)
}
};
self.shutdown_reason = Some(reason);
self.shutdown_reason.as_ref()
}
pub fn into_reason(self) -> Option<ShutdownReason<R>> {
self.shutdown_reason
}
pub fn acknowledge(self, info: I) -> Option<ShutdownReason<R>> {
self.mpsc_sender.send(info).ok();
self.shutdown_reason
}
}
/// Why a listener was told to shut down.
///
/// `Debug`, `PartialEq` and `Eq` are derived in addition to `Clone` so the
/// value can be logged and compared; each impl only applies when `R`
/// implements the corresponding trait.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ShutdownReason<R: Clone> {
    /// The broadcast channel was closed without a reason being received.
    BroadcasterClosed,
    /// A reason value was received from the broadcaster.
    Reason(R),
}