minint/
lib.rs

1//!
2//! # MiniNT
3//!
4//! A simple NetworkTables library implemented in Rust
5//!
6//! NetworkTables is a pub-sub messaging system used for FRC.
7//!
8//! The entrypoint is [NtConn].
9//!
10
11// TODO: this needs some cleanup
12
13#[macro_use]
14extern crate tracing;
15extern crate quanta;
16extern crate rmp;
17extern crate serde;
18extern crate serde_json;
19extern crate tokio;
20extern crate tokio_tungstenite;
21
22mod datatype;
23mod error;
24mod messages;
25
26pub use error::{NtError, Result};
27use tokio::time::{interval, Interval};
28
29use std::collections::{BTreeMap, HashMap};
30use std::marker::PhantomData;
31use std::sync::Arc;
32use std::time::Duration;
33
34use datatype::{BsInt, Data, DataWrap};
35use futures_util::stream::{SplitSink, SplitStream};
36use messages::*;
37
38use futures_util::{SinkExt, StreamExt};
39use quanta::{Clock, Instant};
40use rmp::decode::Bytes;
41use tokio::net::TcpStream;
42use tokio::sync::{watch, RwLock};
43use tokio::{
44    sync::{mpsc, Mutex},
45    task::AbortHandle,
46};
47use tokio_tungstenite::tungstenite::{
48    client::IntoClientRequest,
49    http::{header, HeaderValue},
50    Error as TungsteniteError, Message,
51};
52use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
53
54// I wanna keep my face on today
55
56async fn reconnector(
57    mut rx: mpsc::Receiver<()>,
58    server: String,
59    client_ident: String,
60    sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
61    sock_wr: Arc<RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
62) -> Result<()> {
63    // Build the WebSocket URL and turn it into tungstenite's client req type
64    let mut req = format!("ws://{server}:5810/nt/{client_ident}").into_client_request()?;
65
66    // Add header as specified in WPILib's spec
67    req.headers_mut().append(
68        header::SEC_WEBSOCKET_PROTOCOL,
69        HeaderValue::from_static("v4.1.networktables.first.wpi.edu"),
70    );
71
72    loop {
73        rx.recv().await;
74        {
75            let mut sock_rd = sock_rd.write().await;
76            let mut sock_wr = sock_wr.write().await;
77
78            *sock_rd = None;
79            if let Some(sock_wr) = sock_wr.as_mut() {
80                sock_wr.close().await.unwrap();
81            }
82
83            // Repeatedly attempt to connect to the server
84            loop {
85                match tokio_tungstenite::connect_async(req.clone()).await {
86                    Ok((sock, _)) => {
87                        let (sock_wr_, sock_rd_) = sock.split();
88
89                        {
90                            *sock_rd = Some(sock_rd_);
91                            *sock_wr = Some(sock_wr_);
92                        }
93
94                        info!("connected");
95                        break;
96                    }
97                    Err(err) => {
98                        error!("failed to connect: {err:?}");
99                    }
100                }
101                tokio::time::sleep(Duration::from_millis(100)).await;
102            }
103        }
104
105        // Clear remaining trash
106        while let Some(_) = rx.recv().await {}
107    }
108}
109
110/// A NetworkTables connection
111pub struct NtConn {
112    start_time: Instant,
113    offset: Arc<RwLock<Duration>>,
114
115    reconnect_tx: mpsc::Sender<()>,
116
117    /// Next sequential ID
118    next_id: Arc<Mutex<i32>>,
119
120    /// Outgoing client-to-server message queue
121    c2s_tx: mpsc::UnboundedSender<Message>,
122
123    /// Incoming request receiver event loop abort handle
124    incoming_abort: Arc<RwLock<Option<AbortHandle>>>,
125    /// Outgoing request sender event loop abort handle
126    outgoing_abort: Arc<RwLock<Option<AbortHandle>>>,
127    task_abort: Arc<RwLock<Option<AbortHandle>>>,
128
129    sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
130    sock_wr: Arc<RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
131
132    server_client: Arc<RwLock<HashMap<i32, i32>>>,
133    client_server: Arc<RwLock<HashMap<i32, i32>>>,
134
135    /// Mapping from topic names to topic IDs for topics we've received from server
136    server_topics: Arc<RwLock<HashMap<String, (i32, String)>>>,
137
138    values: Arc<RwLock<HashMap<i32, watch::Receiver<(u64, Data)>>>>,
139    value_tx: Arc<RwLock<HashMap<i32, watch::Sender<(u64, Data)>>>>,
140}
141impl NtConn {
142    /// Connect to a NetworkTables server
143    ///
144    /// # Arguments
145    ///
146    /// * `server` - The IP address of the NetworkTables server.
147    /// * `client_ident` - The client identifier to use for this connection.
148    ///
149    /// # Examples
150    ///
151    /// ```
152    /// use minint::NtConn;
153    ///
154    /// #[tokio::main]
155    /// async fn main() {
156    ///     // Connect to the NetworkTables server at 10.0.0.2
157    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
158    ///
159    ///     // ...
160    /// }
161    /// ```
162    pub async fn new(server: impl Into<String>, client_ident: impl Into<String>) -> Result<Self> {
163        let server = server.into();
164        let client_ident = client_ident.into();
165
166        let client_server = Arc::new(RwLock::new(HashMap::new()));
167        let server_client = Arc::new(RwLock::new(HashMap::new()));
168        let sock_wr: Arc<
169            RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
170        > = Arc::new(RwLock::new(None));
171        let sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>> =
172            Arc::new(RwLock::new(None));
173
174        // Setup channels for control
175        let (c2s_tx, c2s_rx) = mpsc::unbounded_channel::<Message>();
176
177        let (reconnect_tx, reconnect_rx) = mpsc::channel::<()>(1);
178        let sock_rd_ = sock_rd.clone();
179        let sock_wr_ = sock_wr.clone();
180        tokio::spawn(async move {
181            reconnector(reconnect_rx, server, client_ident, sock_rd_, sock_wr_)
182                .await
183                .unwrap();
184        });
185
186        let server_topics = Arc::new(RwLock::new(HashMap::new()));
187        let values = Arc::new(RwLock::new(HashMap::new()));
188        let value_tx = Arc::new(RwLock::new(HashMap::new()));
189
190        let start_time = Instant::now();
191
192        let conn = Self {
193            start_time,
194            offset: Arc::new(RwLock::new(Duration::ZERO)),
195
196            reconnect_tx,
197            next_id: Arc::new(Mutex::const_new(0)),
198
199            client_server,
200            server_client,
201
202            c2s_tx,
203
204            sock_rd,
205            sock_wr,
206
207            incoming_abort: Arc::new(RwLock::new(None)),
208            outgoing_abort: Arc::new(RwLock::new(None)),
209            task_abort: Arc::new(RwLock::new(None)),
210
211            server_topics,
212            values,
213            value_tx,
214        };
215
216        trace!("initializing background event loop...");
217        conn.init_background_event_loops(c2s_rx).await;
218        trace!("initialized background event loop...");
219
220        conn.reconnect_tx.send(()).await.unwrap();
221
222        Ok(conn)
223    }
224
225    /// Initialize the background event loops
226    async fn init_background_event_loops(&self, mut c2s_rx: mpsc::UnboundedReceiver<Message>) {
227        // Spawn event loop to read and process incoming messages
228
229        let mut incoming_abort = self.incoming_abort.write().await;
230        let mut outgoing_abort = self.outgoing_abort.write().await;
231        let mut task_abort = self.task_abort.write().await;
232
233        if (*incoming_abort).is_none() {
234            let conn = self.clone();
235
236            let jh = tokio::spawn(async move {
237                loop {
238                    if let Some(sock_rd) = conn.sock_rd.write().await.as_mut() {
239                        while let Some(msg) = sock_rd.next().await {
240                            match conn.handle_incoming_msg(msg).await {
241                                Err(NtError::NeedReconnect) => {
242                                    conn.reconnect_tx.send(()).await;
243                                    break;
244                                }
245                                _ => {}
246                            }
247                        }
248                    }
249
250                    tokio::task::yield_now().await;
251                }
252            });
253
254            *incoming_abort = Some(jh.abort_handle());
255        }
256
257        // Spawn event loop to send outgoing messages
258        if (*outgoing_abort).is_none() {
259            let conn = self.clone();
260
261            let jh = tokio::spawn(async move {
262                loop {
263                    while let Some(outgoing) = c2s_rx.recv().await {
264                        trace!("sending {outgoing:?}");
265
266                        if let Some(sock_wr) = conn.sock_wr.write().await.as_mut() {
267                            match sock_wr.send(outgoing).await {
268                                Ok(()) => {
269                                    trace!("sent outgoing message successfully");
270                                }
271                                Err(TungsteniteError::ConnectionClosed)
272                                | Err(TungsteniteError::Io(_))
273                                | Err(TungsteniteError::AlreadyClosed)
274                                | Err(TungsteniteError::Protocol(_)) => {
275                                    conn.reconnect_tx.send(()).await;
276                                    break;
277                                }
278                                Err(err) => {
279                                    error!("error writing outgoing message: {err:?}");
280                                }
281                            }
282
283                            trace!("sent");
284                        }
285                    }
286
287                    tokio::task::yield_now().await;
288                }
289            });
290
291            *outgoing_abort = Some(jh.abort_handle());
292        }
293
294        if (*task_abort).is_none() {
295            let conn = self.clone();
296
297            let jh = tokio::spawn(async move {
298                let mut ping_interval = interval(Duration::from_millis(500));
299                let mut time_correct_interval = interval(Duration::from_secs(3));
300
301                loop {
302                    tokio::select! {
303                        _ = ping_interval.tick() => conn.ping().await.unwrap(),
304                        _ = time_correct_interval.tick() => conn.time_correct().await.unwrap(),
305                    }
306
307                    //    if let Some(sock_wr) = conn.sock_wr.write().await.as_mut() {
308                    //        match sock_wr.send(outgoing).await {
309                    //            Ok(()) => {
310                    //                trace!("sent outgoing message successfully");
311                    //            }
312                    //            Err(TungsteniteError::ConnectionClosed)
313                    //            | Err(TungsteniteError::Io(_))
314                    //            | Err(TungsteniteError::AlreadyClosed)
315                    //            | Err(TungsteniteError::Protocol(_)) => {
316                    //                conn.reconnect_tx.send(()).await;
317                    //                break;
318                    //            }
319                    //            Err(err) => {
320                    //                error!("error writing outgoing message: {err:?}");
321                    //            }
322                    //        }
323
324                    //        trace!("sent");
325                    //    }
326                    //}
327
328                    tokio::task::yield_now().await;
329                }
330            });
331
332            *task_abort = Some(jh.abort_handle());
333        }
334    }
335
336    async fn ping(&self) -> Result<()> {
337        self.c2s_tx
338            .send(Message::Ping(tungstenite::Bytes::from_static(
339                b"sigma sigma boy",
340            )))
341            .unwrap();
342
343        Ok(())
344    }
345    async fn time_correct(&self) -> Result<()> {
346        self.write_bin_frame::<BsInt>(
347            -1,
348            Duration::ZERO.as_micros() as u64,
349            (self.start_time.elapsed().as_micros() as u64).into(),
350        )
351        .unwrap();
352
353        Ok(())
354    }
355
356    async fn handle_incoming_msg(
357        &self,
358        msg: core::result::Result<Message, TungsteniteError>,
359    ) -> Result<()> {
360        match msg {
361            Ok(Message::Text(json)) => {
362                let messages: Vec<ServerMsg> = serde_json::from_str(&json).unwrap();
363
364                for msg in messages {
365                    match msg {
366                        ServerMsg::Announce {
367                            name,
368                            id,
369                            r#type,
370                            pubuid,
371                            ..
372                        } => {
373                            // Store server topic info
374                            self.server_topics
375                                .write()
376                                .await
377                                .insert(name.clone(), (id, r#type.clone()));
378
379                            if let Some(pubuid) = pubuid {
380                                (*self.client_server.write().await).insert(pubuid, id);
381                                (*self.server_client.write().await).insert(id, pubuid);
382
383                                debug!(
384                                    "{name} ({type}): published successfully with topic id {id}"
385                                );
386                            } else {
387                                debug!("{name} ({type}): announced with topic id {id}");
388                            }
389                        }
390                        ServerMsg::Unannounce {
391                            name,
392                            id: server_id,
393                        } => {
394                            if let Some(pubuid) = self.server_client.read().await.get(&server_id) {
395                                self.client_server.write().await.remove(pubuid);
396                            }
397                            self.server_client.write().await.remove(&server_id);
398
399                            debug!("{name}: unannounced");
400                        }
401                        _ => unimplemented!(),
402                    }
403                }
404            }
405            Ok(Message::Binary(bin)) => match Self::read_bin_frame(bin.to_vec()) {
406                Ok((topic_id, timestamp, data)) => {
407                    trace!(
408                        "received binary frame with topic_id {}, ts={}",
409                        topic_id,
410                        timestamp
411                    );
412
413                    if topic_id == -1 {
414                        let curr_ts = self.start_time.elapsed();
415                        if let Data::Int(BsInt::U64(pre_ts)) = data {
416                            let rtt = curr_ts - Duration::from_micros(pre_ts);
417                            *self.offset.write().await =
418                                Duration::from_micros(timestamp) + (rtt / 2);
419                        }
420                    }
421
422                    if let Some(value_tx) = self.value_tx.write().await.get(&(topic_id as i32)) {
423                        if let Err(err) = value_tx.send((timestamp, data)) {
424                            error!("failed to send value to subscriber: {err:?}");
425                        }
426                    }
427                }
428                Err(err) => {
429                    error!("Failed to parse binary frame: {}", err);
430                }
431            },
432            Ok(msg) => warn!("unhandled incoming message: {msg:?}"),
433            Err(TungsteniteError::ConnectionClosed)
434            | Err(TungsteniteError::Io(_))
435            | Err(TungsteniteError::AlreadyClosed)
436            | Err(TungsteniteError::Protocol(_)) => {
437                return Err(NtError::NeedReconnect);
438            }
439            Err(err) => error!("error reading incoming message: {err:?}"),
440        }
441
442        Ok(())
443    }
444
445    async fn next_id(&self) -> i32 {
446        let next = &mut *self.next_id.lock().await;
447        let curr = (*next).clone();
448        *next += 1;
449
450        curr
451    }
452
453    /// Publish a topic
454    ///
455    /// The topic will be unpublished when the [NtTopic] is dropped.
456    ///
457    /// # Arguments
458    ///
459    /// * `name` - The name of the topic to publish.
460    ///
461    /// # Type Parameters
462    ///
463    /// * `T` - The type of data to be published on the topic. Must implement the `DataType` trait.
464    ///
465    /// # Examples
466    ///
467    /// ```
468    /// use minint::{NtConn, datatype::DataType};
469    ///
470    /// #[tokio::main]
471    /// async fn main() {
472    ///     // Connect to the NetworkTables server
473    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
474    ///
475    ///     // Publish a new topic named "my_topic" with data type f64
476    ///     let mut topic = conn.publish::<f64>("my_topic").await.unwrap();
477    ///
478    ///     // ...
479    /// }
480    /// ```
481    pub async fn publish<T: DataWrap>(&self, name: impl Into<String>) -> Result<NtTopic<T>> {
482        let pubuid = self.next_id().await;
483        let name = name.into();
484
485        trace!("publishing {name} with pubuid {pubuid}");
486
487        self.publish_::<T>(name.clone(), pubuid).await?;
488
489        Ok(NtTopic {
490            conn: &*self,
491            name,
492            pubuid,
493            _marker: PhantomData,
494        })
495    }
496    async fn publish_<T: DataWrap>(&self, name: String, pubuid: i32) -> Result<()> {
497        trace!("publishing {name} with pubuid {pubuid}");
498
499        let buf = serde_json::to_string(&[ClientMsg::Publish {
500            pubuid,
501            name: name.clone(),
502            r#type: T::STRING.to_string(),
503            properties: Some(PublishProps {
504                persistent: Some(false),
505                retained: Some(false),
506            }),
507        }])?;
508
509        self.c2s_tx
510            .send(Message::Text(buf.into()))
511            .map_err(|e| NtError::SendError(e.to_string()))
512            .unwrap();
513
514        debug!(
515            "{name} ({data_type}): publishing with pubuid {pubuid}",
516            data_type = T::STRING.to_string()
517        );
518
519        Ok(())
520    }
521
522    /// Unpublish topic
523    ///
524    /// This method is typically called when an `NtTopic` is dropped.
525    fn unpublish(&self, pubuid: i32) -> Result<()> {
526        let buf = serde_json::to_string(&[ClientMsg::Unpublish { pubuid }])?;
527        self.c2s_tx
528            .send(Message::Text(buf.into()))
529            .map_err(|e| NtError::SendError(e.to_string()))?;
530
531        Ok(())
532    }
533
534    /// Subscribe to a topic
535    ///
536    /// # Arguments
537    ///
538    /// * `topic` - The name of the topic to subscribe to.
539    ///
540    /// # Examples
541    ///
542    /// ```
543    /// use minint::NtConn;
544    ///
545    /// #[tokio::main]
546    /// async fn main() {
547    ///     // Connect to the NetworkTables server
548    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
549    ///
550    ///     // Subscribe to the topic named "my_topic"
551    ///     let subscription = conn.subscribe("my_topic").await.unwrap();
552    ///
553    ///     // ...
554    /// }
555    /// ```
556    pub async fn subscribe(&self, topic: &str) -> Result<NtSubscription> {
557        let subuid = self.next_id().await;
558
559        let buf = serde_json::to_string(&[ClientMsg::Subscribe {
560            topics: Vec::from_iter([topic.to_string()]),
561            subuid,
562            options: BTreeMap::new(),
563        }])?;
564        self.c2s_tx
565            .send(Message::Text(buf.into()))
566            .map_err(|e| NtError::SendError(e.to_string()))?;
567
568        Ok(NtSubscription {
569            conn: &*self,
570            subuid,
571        })
572    }
573
574    /// Unsubscribe from a topic
575    ///
576    /// This method is typically called when an `NtSubscription` is dropped.
577    fn unsubscribe(&self, subuid: i32) -> Result<()> {
578        let buf = serde_json::to_string(&[ClientMsg::Unsubscribe { subuid }])?;
579        self.c2s_tx
580            .send(Message::Text(buf.into()))
581            .map_err(|e| NtError::SendError(e.to_string()))?;
582
583        Ok(())
584    }
585
586    /// Read/parse a binary frame
587    ///
588    /// This method is used internally to process incoming data values for subscribed topics.
589    ///
590    /// Returns `(uid, timestamp, data)`
591    fn read_bin_frame(buf: Vec<u8>) -> Result<(i32, u64, Data)> {
592        let mut bytes = Bytes::new(&buf);
593        let len = rmp::decode::read_array_len(&mut bytes)?;
594
595        if len == 4 {
596            let uid = rmp::decode::read_i32(&mut bytes)?;
597            let ts = rmp::decode::read_u64(&mut bytes)?;
598            let data_type = rmp::decode::read_u8(&mut bytes)?;
599            let data = Data::from(&mut bytes, data_type)
600                .map_err(|_| NtError::MessagePackError("Failed to parse data value".to_string()))?;
601
602            Ok((uid, ts, data))
603        } else {
604            Err(NtError::BinaryFrameError)
605        }
606    }
607
608    /// Write a binary frame
609    ///
610    /// This method is used internally to send data values to the NetworkTables server.
611    fn write_bin_frame<T: DataWrap>(&self, uid: i32, ts: u64, value: T) -> Result<()> {
612        let mut buf = Vec::new();
613        rmp::encode::write_array_len(&mut buf, 4)?;
614
615        rmp::encode::write_i32(&mut buf, uid)?;
616        rmp::encode::write_uint(&mut buf, ts)?;
617        rmp::encode::write_u8(&mut buf, T::MSGPCK)?;
618        T::encode(&mut buf, value).map_err(|_| {
619            NtError::MessagePackError("Failed to encode value to MessagePack format.".to_string())
620        })?;
621
622        self.c2s_tx
623            .send(Message::Binary(buf.into()))
624            .map_err(|e| NtError::SendError(e.to_string()))?;
625
626        Ok(())
627    }
628
629    /// Shutdown the connection
630    ///
631    /// This method stops the event loops for sending and receiving messages. All `NtTopic`
632    /// instances associated with this connection must be dropped before calling this method.
633    pub async fn stop(self) {
634        // Attempt to unwrap and use incoming and outgoing abort handles
635
636        if let Some(ah) = self.incoming_abort.read().await.as_ref() {
637            ah.abort();
638        }
639        if let Some(ah) = self.outgoing_abort.read().await.as_ref() {
640            ah.abort();
641        }
642    }
643}
644impl Clone for NtConn {
645    fn clone(&self) -> Self {
646        Self {
647            start_time: self.start_time.clone(),
648            offset: self.offset.clone(),
649
650            reconnect_tx: self.reconnect_tx.clone(),
651            next_id: self.next_id.clone(),
652
653            incoming_abort: self.incoming_abort.clone(),
654            outgoing_abort: self.outgoing_abort.clone(),
655            task_abort: self.task_abort.clone(),
656
657            c2s_tx: self.c2s_tx.clone(),
658
659            sock_wr: self.sock_wr.clone(),
660            sock_rd: self.sock_rd.clone(),
661
662            client_server: self.client_server.clone(),
663            server_client: self.server_client.clone(),
664
665            server_topics: self.server_topics.clone(),
666            values: self.values.clone(),
667            value_tx: self.value_tx.clone(),
668        }
669    }
670}
671
672/// A NetworkTables topic
673///
674/// This structure represents a published topic on the NetworkTables server. It allows you to set
675/// the value of the topic. The topic is automatically unpublished when this structure is dropped.
676pub struct NtTopic<'nt, T: DataWrap> {
677    conn: &'nt NtConn,
678    name: String,
679    pubuid: i32,
680    _marker: PhantomData<T>,
681}
682impl<T: DataWrap + std::fmt::Debug> NtTopic<'_, T> {
683    /// Set the value of the topic.
684    ///
685    /// # Arguments
686    ///
687    /// * `val` - The new value to set the topic to.
688    ///
689    /// # Examples
690    ///
691    /// ```
692    /// use minint::{NtConn, datatype::DataType};
693    ///
694    /// #[tokio::main]
695    /// async fn main() {
696    ///     // Connect to the NetworkTables server
697    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
698    ///
699    ///     // Publish a new topic
700    ///     let mut topic = conn.publish::<f64>("my_topic").await.unwrap();
701    ///
702    ///     // Set the value of the topic
703    ///     topic.set(3.14159).await.unwrap();
704    ///
705    ///     // ...
706    /// }
707    /// ```
708    pub async fn set(&mut self, val: T) -> Result<()> {
709        debug!(
710            "{name} ({data_type}): set to {val:?}",
711            name = self.name,
712            data_type = T::STRING.to_string()
713        );
714
715        if !self
716            .conn
717            .client_server
718            .read()
719            .await
720            .contains_key(&self.pubuid)
721        {
722            self.conn
723                .publish_::<T>(self.name.clone(), self.pubuid)
724                .await?;
725        }
726
727        trace!("writing binary frame");
728        match (*self.conn).write_bin_frame(self.pubuid, 0, val) {
729            Err(_) => {}
730            _ => {}
731        }
732
733        Ok(())
734    }
735}
736impl<T: DataWrap> Drop for NtTopic<'_, T> {
737    fn drop(&mut self) {
738        if let Err(e) = self.conn.unpublish(self.pubuid) {
739            error!("Failed to unpublish topic: {}", e);
740        }
741    }
742}
743
744/// A NetworkTables subscription
745///
746/// This structure represents a subscription to a topic on the NetworkTables server. It is
747/// automatically unsubscribed when this structure is dropped.
748pub struct NtSubscription<'nt> {
749    conn: &'nt NtConn,
750    subuid: i32,
751}
752impl NtSubscription<'_> {
753    pub async fn get(&self) -> Result<Option<watch::Receiver<(u64, Data)>>> {
754        Ok(self.conn.values.read().await.get(&self.subuid).cloned())
755    }
756}
757impl Drop for NtSubscription<'_> {
758    fn drop(&mut self) {
759        self.conn.unsubscribe(self.subuid).unwrap();
760    }
761}