minint/
lib.rs

1//!
2//! # MiniNT
3//!
4//! A simple NetworkTables library implemented in Rust
5//!
6//! NetworkTables is a pub-sub messaging system used for FRC.
7//!
8//! The entrypoint is [NtConn].
9//!
10
11// TODO: this needs some cleanup
12
13#[macro_use]
14extern crate tracing;
15extern crate rmp;
16extern crate serde;
17extern crate serde_json;
18extern crate tokio;
19extern crate tokio_tungstenite;
20extern crate quanta;
21
22mod datatype;
23mod error;
24mod messages;
25
26pub use error::{NtError, Result};
27use tokio::time::{interval, Interval};
28
29use std::collections::{BTreeMap, HashMap};
30use std::marker::PhantomData;
31use std::sync::Arc;
32use std::time::Duration;
33
34use datatype::{BsInt, Data, DataWrap};
35use futures_util::stream::{SplitSink, SplitStream};
36use messages::*;
37
38use futures_util::{SinkExt, StreamExt};
39use rmp::decode::Bytes;
40use tokio::net::TcpStream;
41use tokio::sync::{watch, RwLock};
42use tokio::{
43    sync::{mpsc, Mutex},
44    task::AbortHandle,
45};
46use tokio_tungstenite::tungstenite::{
47    client::IntoClientRequest,
48    http::{header, HeaderValue},
49    Error as TungsteniteError, Message,
50};
51use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
52use quanta::{Clock, Instant};
53
54// I wanna keep my face on today
55
56async fn reconnector(
57    mut rx: mpsc::Receiver<()>,
58    server: String,
59    client_ident: String,
60    sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
61    sock_wr: Arc<RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
62) -> Result<()> {
63    // Build the WebSocket URL and turn it into tungstenite's client req type
64    let mut req = format!("ws://{server}:5810/nt/{client_ident}").into_client_request()?;
65
66    // Add header as specified in WPILib's spec
67    req.headers_mut().append(
68        header::SEC_WEBSOCKET_PROTOCOL,
69        HeaderValue::from_static("v4.1.networktables.first.wpi.edu"),
70    );
71
72    loop {
73        rx.recv().await;
74        {
75            let mut sock_rd = sock_rd.write().await;
76            let mut sock_wr = sock_wr.write().await;
77
78            *sock_rd = None;
79            if let Some(sock_wr) = sock_wr.as_mut() {
80                sock_wr.close().await.unwrap();
81            }
82
83            // Repeatedly attempt to connect to the server
84            loop {
85                match tokio_tungstenite::connect_async(req.clone()).await {
86                    Ok((sock, _)) => {
87                        let (sock_wr_, sock_rd_) = sock.split();
88
89                        {
90                            *sock_rd = Some(sock_rd_);
91                            *sock_wr = Some(sock_wr_);
92                        }
93
94                        info!("connected");
95                        break;
96                    }
97                    Err(err) => {
98                        error!("failed to connect: {err:?}");
99                    }
100                }
101                tokio::time::sleep(Duration::from_millis(100)).await;
102            }
103        }
104
105        // Clear remaining trash
106        while let Some(_) = rx.recv().await {}
107    }
108}
109
110/// A NetworkTables connection
111pub struct NtConn {
112    start_time: Instant,
113    offset: Arc<RwLock<Duration>>,
114
115    reconnect_tx: mpsc::Sender<()>,
116
117    /// Next sequential ID
118    next_id: Arc<Mutex<i32>>,
119
120    /// Outgoing client-to-server message queue
121    c2s_tx: mpsc::UnboundedSender<Message>,
122
123    /// Incoming request receiver event loop abort handle
124    incoming_abort: Arc<RwLock<Option<AbortHandle>>>,
125    /// Outgoing request sender event loop abort handle
126    outgoing_abort: Arc<RwLock<Option<AbortHandle>>>,
127    task_abort: Arc<RwLock<Option<AbortHandle>>>,
128
129    sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
130    sock_wr: Arc<RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
131
132    server_client: Arc<RwLock<HashMap<i32, i32>>>,
133    client_server: Arc<RwLock<HashMap<i32, i32>>>,
134
135    /// Mapping from topic names to topic IDs for topics we've received from server
136    server_topics: Arc<RwLock<HashMap<String, (i32, String)>>>,
137
138    values: Arc<RwLock<HashMap<i32, watch::Receiver<(u64, Data)>>>>,
139    value_tx: Arc<RwLock<HashMap<i32, watch::Sender<(u64, Data)>>>>,
140}
141impl NtConn {
142    /// Connect to a NetworkTables server
143    ///
144    /// # Arguments
145    ///
146    /// * `server` - The IP address of the NetworkTables server.
147    /// * `client_ident` - The client identifier to use for this connection.
148    ///
149    /// # Examples
150    ///
151    /// ```
152    /// use minint::NtConn;
153    ///
154    /// #[tokio::main]
155    /// async fn main() {
156    ///     // Connect to the NetworkTables server at 10.0.0.2
157    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
158    ///
159    ///     // ...
160    /// }
161    /// ```
162    pub async fn new(server: impl Into<String>, client_ident: impl Into<String>) -> Result<Self> {
163        let server = server.into();
164        let client_ident = client_ident.into();
165
166        let client_server = Arc::new(RwLock::new(HashMap::new()));
167        let server_client = Arc::new(RwLock::new(HashMap::new()));
168        let sock_wr: Arc<
169            RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
170        > = Arc::new(RwLock::new(None));
171        let sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>> =
172            Arc::new(RwLock::new(None));
173
174        // Setup channels for control
175        let (c2s_tx, c2s_rx) = mpsc::unbounded_channel::<Message>();
176
177        let (reconnect_tx, reconnect_rx) = mpsc::channel::<()>(1);
178        let sock_rd_ = sock_rd.clone();
179        let sock_wr_ = sock_wr.clone();
180        tokio::spawn(async move {
181            reconnector(reconnect_rx, server, client_ident, sock_rd_, sock_wr_)
182                .await
183                .unwrap();
184        });
185
186        let server_topics = Arc::new(RwLock::new(HashMap::new()));
187        let values = Arc::new(RwLock::new(HashMap::new()));
188        let value_tx = Arc::new(RwLock::new(HashMap::new()));
189
190        let start_time = Instant::now();
191
192        let conn = Self {
193            start_time,
194            offset: Arc::new(RwLock::new(Duration::ZERO)),
195
196            reconnect_tx,
197            next_id: Arc::new(Mutex::const_new(0)),
198
199            client_server,
200            server_client,
201
202            c2s_tx,
203
204            sock_rd,
205            sock_wr,
206
207            incoming_abort: Arc::new(RwLock::new(None)),
208            outgoing_abort: Arc::new(RwLock::new(None)),
209            task_abort: Arc::new(RwLock::new(None)),
210
211            server_topics,
212            values,
213            value_tx,
214        };
215
216        trace!("initializing background event loop...");
217        conn.init_background_event_loops(c2s_rx).await;
218        trace!("initialized background event loop...");
219
220        conn.reconnect_tx.send(()).await.unwrap();
221
222        Ok(conn)
223    }
224
225    /// Initialize the background event loops
226    async fn init_background_event_loops(&self, mut c2s_rx: mpsc::UnboundedReceiver<Message>) {
227        // Spawn event loop to read and process incoming messages
228
229        let mut incoming_abort = self.incoming_abort.write().await;
230        let mut outgoing_abort = self.outgoing_abort.write().await;
231        let mut task_abort = self.task_abort.write().await;
232
233        if (*incoming_abort).is_none() {
234            let conn = self.clone();
235
236            let jh = tokio::spawn(async move {
237                loop {
238                    if let Some(sock_rd) = conn.sock_rd.write().await.as_mut() {
239                        while let Some(msg) = sock_rd.next().await {
240                            match conn.handle_incoming_msg(msg).await {
241                                Err(NtError::NeedReconnect) => {
242                                    conn.reconnect_tx.send(()).await;
243                                    break;
244                                }
245                                _ => {}
246                            }
247                        }
248                    }
249
250                    tokio::task::yield_now().await;
251                }
252            });
253
254            *incoming_abort = Some(jh.abort_handle());
255        }
256
257        // Spawn event loop to send outgoing messages
258        if (*outgoing_abort).is_none() {
259            let conn = self.clone();
260
261            let jh = tokio::spawn(async move {
262                loop {
263                    while let Some(outgoing) = c2s_rx.recv().await {
264                        trace!("sending {outgoing:?}");
265
266                        if let Some(sock_wr) = conn.sock_wr.write().await.as_mut() {
267                            match sock_wr.send(outgoing).await {
268                                Ok(()) => {
269                                    trace!("sent outgoing message successfully");
270                                }
271                                Err(TungsteniteError::ConnectionClosed)
272                                | Err(TungsteniteError::Io(_))
273                                | Err(TungsteniteError::AlreadyClosed)
274                                | Err(TungsteniteError::Protocol(_)) => {
275                                    conn.reconnect_tx.send(()).await;
276                                    break;
277                                }
278                                Err(err) => {
279                                    error!("error writing outgoing message: {err:?}");
280                                }
281                            }
282
283                            trace!("sent");
284                        }
285                    }
286
287                    tokio::task::yield_now().await;
288                }
289            });
290
291            *outgoing_abort = Some(jh.abort_handle());
292        }
293
294        if (*task_abort).is_none() {
295            let conn = self.clone();
296
297            let jh = tokio::spawn(async move {
298                let mut ping_interval = interval(Duration::from_millis(500));
299                let mut time_correct_interval = interval(Duration::from_secs(3));
300
301                loop {
302                    tokio::select! {
303                        _ = ping_interval.tick() => conn.ping().await.unwrap(),
304                        _ = time_correct_interval.tick() => conn.time_correct().await.unwrap(),
305                    }
306
307                    //    if let Some(sock_wr) = conn.sock_wr.write().await.as_mut() {
308                    //        match sock_wr.send(outgoing).await {
309                    //            Ok(()) => {
310                    //                trace!("sent outgoing message successfully");
311                    //            }
312                    //            Err(TungsteniteError::ConnectionClosed)
313                    //            | Err(TungsteniteError::Io(_))
314                    //            | Err(TungsteniteError::AlreadyClosed)
315                    //            | Err(TungsteniteError::Protocol(_)) => {
316                    //                conn.reconnect_tx.send(()).await;
317                    //                break;
318                    //            }
319                    //            Err(err) => {
320                    //                error!("error writing outgoing message: {err:?}");
321                    //            }
322                    //        }
323
324                    //        trace!("sent");
325                    //    }
326                    //}
327
328                    tokio::task::yield_now().await;
329                }
330            });
331
332            *task_abort = Some(jh.abort_handle());
333        }
334    }
335
336    async fn ping(&self) -> Result<()> {
337        self.c2s_tx.send(Message::Ping(tungstenite::Bytes::from_static(b"sigma sigma boy"))).unwrap();
338
339        Ok(())
340    }
341    async fn time_correct(&self) -> Result<()> {
342        self.write_bin_frame::<BsInt>(-1, Duration::ZERO.as_micros() as u64, (self.start_time.elapsed().as_micros() as u64).into()).unwrap();
343
344        Ok(())
345    }
346
347    async fn handle_incoming_msg(
348        &self,
349        msg: core::result::Result<Message, TungsteniteError>,
350    ) -> Result<()> {
351        match msg {
352            Ok(Message::Text(json)) => {
353                let messages: Vec<ServerMsg> = serde_json::from_str(&json).unwrap();
354
355                for msg in messages {
356                    match msg {
357                        ServerMsg::Announce {
358                            name,
359                            id,
360                            r#type,
361                            pubuid,
362                            ..
363                        } => {
364                            // Store server topic info
365                            self.server_topics
366                                .write()
367                                .await
368                                .insert(name.clone(), (id, r#type.clone()));
369
370                            if let Some(pubuid) = pubuid {
371                                (*self.client_server.write().await).insert(pubuid, id);
372                                (*self.server_client.write().await).insert(id, pubuid);
373
374                                debug!(
375                                    "{name} ({type}): published successfully with topic id {id}"
376                                );
377                            } else {
378                                debug!("{name} ({type}): announced with topic id {id}");
379                            }
380                        }
381                        ServerMsg::Unannounce {
382                            name,
383                            id: server_id,
384                        } => {
385                            if let Some(pubuid) = self.server_client.read().await.get(&server_id) {
386                                self.client_server.write().await.remove(pubuid);
387                            }
388                            self.server_client.write().await.remove(&server_id);
389
390                            debug!("{name}: unannounced");
391                        }
392                        _ => unimplemented!(),
393                    }
394                }
395            }
396            Ok(Message::Binary(bin)) => match Self::read_bin_frame(bin.to_vec()) {
397                Ok((topic_id, timestamp, data)) => {
398                    trace!(
399                        "received binary frame with topic_id {}, ts={}",
400                        topic_id,
401                        timestamp
402                    );
403
404                    if topic_id == -1 {
405                        let curr_ts = self.start_time.elapsed();
406                        if let Data::Int(BsInt::U64(pre_ts)) = data {
407                            let rtt = curr_ts - Duration::from_micros(pre_ts);
408                            *self.offset.write().await = Duration::from_micros(timestamp) + (rtt/2);
409                        }
410                    }
411
412
413                    if let Some(value_tx) = self.value_tx.write().await.get(&(topic_id as i32)) {
414                        if let Err(err) = value_tx.send((timestamp, data)) {
415                            error!("failed to send value to subscriber: {err:?}");
416                        }
417                    }
418                }
419                Err(err) => {
420                    error!("Failed to parse binary frame: {}", err);
421                }
422            },
423            Ok(msg) => warn!("unhandled incoming message: {msg:?}"),
424            Err(TungsteniteError::ConnectionClosed)
425            | Err(TungsteniteError::Io(_))
426            | Err(TungsteniteError::AlreadyClosed)
427            | Err(TungsteniteError::Protocol(_)) => {
428                return Err(NtError::NeedReconnect);
429            }
430            Err(err) => error!("error reading incoming message: {err:?}"),
431        }
432
433        Ok(())
434    }
435
436    async fn next_id(&self) -> i32 {
437        let next = &mut *self.next_id.lock().await;
438        let curr = (*next).clone();
439        *next += 1;
440
441        curr
442    }
443
444    /// Publish a topic
445    ///
446    /// The topic will be unpublished when the [NtTopic] is dropped.
447    ///
448    /// # Arguments
449    ///
450    /// * `name` - The name of the topic to publish.
451    ///
452    /// # Type Parameters
453    ///
454    /// * `T` - The type of data to be published on the topic. Must implement the `DataType` trait.
455    ///
456    /// # Examples
457    ///
458    /// ```
459    /// use minint::{NtConn, datatype::DataType};
460    ///
461    /// #[tokio::main]
462    /// async fn main() {
463    ///     // Connect to the NetworkTables server
464    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
465    ///
466    ///     // Publish a new topic named "my_topic" with data type f64
467    ///     let mut topic = conn.publish::<f64>("my_topic").await.unwrap();
468    ///
469    ///     // ...
470    /// }
471    /// ```
472    pub async fn publish<T: DataWrap>(&self, name: impl Into<String>) -> Result<NtTopic<T>> {
473        let pubuid = self.next_id().await;
474        let name = name.into();
475
476        trace!("publishing {name} with pubuid {pubuid}");
477
478        self.publish_::<T>(name.clone(), pubuid).await?;
479
480        Ok(NtTopic {
481            conn: &*self,
482            name,
483            pubuid,
484            _marker: PhantomData,
485        })
486    }
487    async fn publish_<T: DataWrap>(&self, name: String, pubuid: i32) -> Result<()> {
488        trace!("publishing {name} with pubuid {pubuid}");
489
490        let buf = serde_json::to_string(&[ClientMsg::Publish {
491            pubuid,
492            name: name.clone(),
493            r#type: T::STRING.to_string(),
494            properties: Some(PublishProps {
495                persistent: Some(false),
496                retained: Some(false),
497            }),
498        }])?;
499
500        self.c2s_tx
501            .send(Message::Text(buf.into()))
502            .map_err(|e| NtError::SendError(e.to_string()))
503            .unwrap();
504
505        debug!(
506            "{name} ({data_type}): publishing with pubuid {pubuid}",
507            data_type = T::STRING.to_string()
508        );
509
510        Ok(())
511    }
512
513    /// Unpublish topic
514    ///
515    /// This method is typically called when an `NtTopic` is dropped.
516    fn unpublish(&self, pubuid: i32) -> Result<()> {
517        let buf = serde_json::to_string(&[ClientMsg::Unpublish { pubuid }])?;
518        self.c2s_tx
519            .send(Message::Text(buf.into()))
520            .map_err(|e| NtError::SendError(e.to_string()))?;
521
522        Ok(())
523    }
524
525    /// Subscribe to a topic
526    ///
527    /// # Arguments
528    ///
529    /// * `topic` - The name of the topic to subscribe to.
530    ///
531    /// # Examples
532    ///
533    /// ```
534    /// use minint::NtConn;
535    ///
536    /// #[tokio::main]
537    /// async fn main() {
538    ///     // Connect to the NetworkTables server
539    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
540    ///
541    ///     // Subscribe to the topic named "my_topic"
542    ///     let subscription = conn.subscribe("my_topic").await.unwrap();
543    ///
544    ///     // ...
545    /// }
546    /// ```
547    pub async fn subscribe(&self, topic: &str) -> Result<NtSubscription> {
548        let subuid = self.next_id().await;
549
550        let buf = serde_json::to_string(&[ClientMsg::Subscribe {
551            topics: Vec::from_iter([topic.to_string()]),
552            subuid,
553            options: BTreeMap::new(),
554        }])?;
555        self.c2s_tx
556            .send(Message::Text(buf.into()))
557            .map_err(|e| NtError::SendError(e.to_string()))?;
558
559        Ok(NtSubscription {
560            conn: &*self,
561            subuid,
562        })
563    }
564
565    /// Unsubscribe from a topic
566    ///
567    /// This method is typically called when an `NtSubscription` is dropped.
568    fn unsubscribe(&self, subuid: i32) -> Result<()> {
569        let buf = serde_json::to_string(&[ClientMsg::Unsubscribe { subuid }])?;
570        self.c2s_tx
571            .send(Message::Text(buf.into()))
572            .map_err(|e| NtError::SendError(e.to_string()))?;
573
574        Ok(())
575    }
576
577    /// Read/parse a binary frame
578    ///
579    /// This method is used internally to process incoming data values for subscribed topics.
580    ///
581    /// Returns `(uid, timestamp, data)`
582    fn read_bin_frame(buf: Vec<u8>) -> Result<(i32, u64, Data)> {
583        let mut bytes = Bytes::new(&buf);
584        let len = rmp::decode::read_array_len(&mut bytes)?;
585
586        if len == 4 {
587            let uid = rmp::decode::read_i32(&mut bytes)?;
588            let ts = rmp::decode::read_u64(&mut bytes)?;
589            let data_type = rmp::decode::read_u8(&mut bytes)?;
590            let data = Data::from(&mut bytes, data_type)
591                .map_err(|_| NtError::MessagePackError("Failed to parse data value".to_string()))?;
592
593            Ok((uid, ts, data))
594        } else {
595            Err(NtError::BinaryFrameError)
596        }
597    }
598
599    /// Write a binary frame
600    ///
601    /// This method is used internally to send data values to the NetworkTables server.
602    fn write_bin_frame<T: DataWrap>(&self, uid: i32, ts: u64, value: T) -> Result<()> {
603        let mut buf = Vec::new();
604        rmp::encode::write_array_len(&mut buf, 4)?;
605
606        rmp::encode::write_i32(&mut buf, uid)?;
607        rmp::encode::write_uint(&mut buf, ts)?;
608        rmp::encode::write_u8(&mut buf, T::MSGPCK)?;
609        T::encode(&mut buf, value).map_err(|_| {
610            NtError::MessagePackError("Failed to encode value to MessagePack format.".to_string())
611        })?;
612
613        self.c2s_tx
614            .send(Message::Binary(buf.into()))
615            .map_err(|e| NtError::SendError(e.to_string()))?;
616
617        Ok(())
618    }
619
620    /// Shutdown the connection
621    ///
622    /// This method stops the event loops for sending and receiving messages. All `NtTopic`
623    /// instances associated with this connection must be dropped before calling this method.
624    pub async fn stop(self) {
625        // Attempt to unwrap and use incoming and outgoing abort handles
626
627        if let Some(ah) = self.incoming_abort.read().await.as_ref() {
628            ah.abort();
629        }
630        if let Some(ah) = self.outgoing_abort.read().await.as_ref() {
631            ah.abort();
632        }
633    }
634}
635impl Clone for NtConn {
636    fn clone(&self) -> Self {
637        Self {
638            start_time: self.start_time.clone(),
639            offset: self.offset.clone(),
640
641            reconnect_tx: self.reconnect_tx.clone(),
642            next_id: self.next_id.clone(),
643
644            incoming_abort: self.incoming_abort.clone(),
645            outgoing_abort: self.outgoing_abort.clone(),
646            task_abort: self.task_abort.clone(),
647
648            c2s_tx: self.c2s_tx.clone(),
649
650            sock_wr: self.sock_wr.clone(),
651            sock_rd: self.sock_rd.clone(),
652
653            client_server: self.client_server.clone(),
654            server_client: self.server_client.clone(),
655
656            server_topics: self.server_topics.clone(),
657            values: self.values.clone(),
658            value_tx: self.value_tx.clone(),
659        }
660    }
661}
662
663/// A NetworkTables topic
664///
665/// This structure represents a published topic on the NetworkTables server. It allows you to set
666/// the value of the topic. The topic is automatically unpublished when this structure is dropped.
667pub struct NtTopic<'nt, T: DataWrap> {
668    conn: &'nt NtConn,
669    name: String,
670    pubuid: i32,
671    _marker: PhantomData<T>,
672}
673impl<T: DataWrap + std::fmt::Debug> NtTopic<'_, T> {
674    /// Set the value of the topic.
675    ///
676    /// # Arguments
677    ///
678    /// * `val` - The new value to set the topic to.
679    ///
680    /// # Examples
681    ///
682    /// ```
683    /// use minint::{NtConn, datatype::DataType};
684    ///
685    /// #[tokio::main]
686    /// async fn main() {
687    ///     // Connect to the NetworkTables server
688    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
689    ///
690    ///     // Publish a new topic
691    ///     let mut topic = conn.publish::<f64>("my_topic").await.unwrap();
692    ///
693    ///     // Set the value of the topic
694    ///     topic.set(3.14159).await.unwrap();
695    ///
696    ///     // ...
697    /// }
698    /// ```
699    pub async fn set(&mut self, val: T) -> Result<()> {
700        debug!(
701            "{name} ({data_type}): set to {val:?}",
702            name = self.name,
703            data_type = T::STRING.to_string()
704        );
705
706        if !self
707            .conn
708            .client_server
709            .read()
710            .await
711            .contains_key(&self.pubuid)
712        {
713            self.conn
714                .publish_::<T>(self.name.clone(), self.pubuid)
715                .await?;
716        }
717
718        trace!("writing binary frame");
719        match (*self.conn).write_bin_frame(self.pubuid, 0, val) {
720            Err(_) => {}
721            _ => {}
722        }
723
724        Ok(())
725    }
726}
727impl<T: DataWrap> Drop for NtTopic<'_, T> {
728    fn drop(&mut self) {
729        if let Err(e) = self.conn.unpublish(self.pubuid) {
730            error!("Failed to unpublish topic: {}", e);
731        }
732    }
733}
734
735/// A NetworkTables subscription
736///
737/// This structure represents a subscription to a topic on the NetworkTables server. It is
738/// automatically unsubscribed when this structure is dropped.
739pub struct NtSubscription<'nt> {
740    conn: &'nt NtConn,
741    subuid: i32,
742}
743impl NtSubscription<'_> {
744    pub async fn get(&self) -> Result<Option<watch::Receiver<(u64, Data)>>> {
745        Ok(self.conn.values.read().await.get(&self.subuid).cloned())
746    }
747}
748impl Drop for NtSubscription<'_> {
749    fn drop(&mut self) {
750        self.conn.unsubscribe(self.subuid).unwrap();
751    }
752}