minint/
lib.rs

1//!
2//! # MiniNT
3//!
4//! A simple NetworkTables library implemented in Rust
5//!
6//! NetworkTables is a pub-sub messaging system used for FRC.
7//!
8//! The entrypoint is [NtConn].
9//!
10
11// TODO: this needs some cleanup
12
13#[macro_use]
14extern crate log;
15extern crate rmp;
16extern crate serde;
17extern crate serde_json;
18extern crate tokio;
19extern crate tokio_tungstenite;
20
21mod datatype;
22mod error;
23mod messages;
24
25pub use error::{NtError, Result};
26
27use std::collections::{BTreeMap, HashMap};
28use std::marker::PhantomData;
29use std::net::IpAddr;
30use std::ops::DerefMut;
31use std::sync::Arc;
32use std::time::Duration;
33
34use datatype::{Data, DataWrap};
35use futures_util::stream::{SplitSink, SplitStream};
36use messages::*;
37
38use futures_util::{SinkExt, StreamExt};
39use rmp::decode::Bytes;
40use tokio::net::TcpStream;
41use tokio::sync::{watch, RwLock, Semaphore};
42use tokio::{
43    sync::{mpsc, Mutex},
44    task::AbortHandle,
45};
46use tokio_tungstenite::tungstenite::{
47    client::IntoClientRequest,
48    http::{header, HeaderValue},
49    Error as TungsteniteError, Message,
50};
51use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
52
53// I wanna keep my face on today
54
55/// A NetworkTables connection
56pub struct NtConn {
57    reconnect: Arc<Semaphore>,
58
59    /// Next sequential ID
60    next_id: Arc<Mutex<i32>>,
61
62    /// Incoming request receiver event loop abort handle
63    incoming_abort: Arc<RwLock<Option<AbortHandle>>>,
64    /// Outgoing request sender event loop abort handle
65    outgoing_abort: Arc<RwLock<Option<AbortHandle>>>,
66
67    /// Outgoing client-to-server message queue
68    c2s_tx: mpsc::UnboundedSender<Message>,
69
70    client_ident: Arc<RwLock<String>>,
71    server: Arc<RwLock<String>>,
72    sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
73    sock_wr: Arc<RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
74
75    server_client: Arc<RwLock<HashMap<i32, i32>>>,
76    client_server: Arc<RwLock<HashMap<i32, i32>>>,
77
78    /// Mapping from topic names to topic IDs for topics we've received from server
79    server_topics: Arc<RwLock<HashMap<String, (i32, String)>>>,
80
81    values: Arc<RwLock<HashMap<i32, watch::Receiver<(u64, Data)>>>>,
82    value_tx: Arc<RwLock<HashMap<i32, watch::Sender<(u64, Data)>>>>,
83}
84impl NtConn {
85    /// Connect to a NetworkTables server
86    ///
87    /// # Arguments
88    ///
89    /// * `server` - The IP address of the NetworkTables server.
90    /// * `client_ident` - The client identifier to use for this connection.
91    ///
92    /// # Examples
93    ///
94    /// ```
95    /// use minint::NtConn;
96    ///
97    /// #[tokio::main]
98    /// async fn main() {
99    ///     // Connect to the NetworkTables server at 10.0.0.2
100    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
101    ///
102    ///     // ...
103    /// }
104    /// ```
105    pub async fn new(server: impl Into<IpAddr>, client_ident: impl Into<String>) -> Result<Self> {
106        let client_server = Arc::new(RwLock::new(HashMap::new()));
107        let server_client = Arc::new(RwLock::new(HashMap::new()));
108        let sock_wr: Arc<
109            RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
110        > = Arc::new(RwLock::new(None));
111        let sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>> =
112            Arc::new(RwLock::new(None));
113
114        // Setup channels for control
115        let (c2s_tx, c2s_rx) = mpsc::unbounded_channel::<Message>();
116
117        let server_topics = Arc::new(RwLock::new(HashMap::new()));
118        let values = Arc::new(RwLock::new(HashMap::new()));
119        let value_tx = Arc::new(RwLock::new(HashMap::new()));
120
121        let conn = Self {
122            reconnect: Arc::new(Semaphore::const_new(1)),
123            next_id: Arc::new(Mutex::const_new(0)),
124            c2s_tx,
125
126            client_server,
127            server_client,
128
129            client_ident: Arc::new(RwLock::new(client_ident.into())),
130            server: Arc::new(RwLock::new(server.into().to_string())),
131            sock_rd,
132            sock_wr,
133
134            incoming_abort: Arc::new(RwLock::new(None)),
135            outgoing_abort: Arc::new(RwLock::new(None)),
136
137            server_topics,
138            values,
139            value_tx,
140        };
141
142        trace!("initializing background event loop...");
143        conn.init_background_event_loops(c2s_rx).await;
144        trace!("initialized background event loop...");
145
146        conn.connect().await?;
147
148        Ok(conn)
149    }
150
151    async fn init_background_event_loops(&self, mut c2s_rx: mpsc::UnboundedReceiver<Message>) {
152        // Spawn event loop to read and process incoming messages
153
154        let mut incoming_abort = self.incoming_abort.write().await;
155        let mut outgoing_abort = self.outgoing_abort.write().await;
156
157        if (*incoming_abort).is_none() {
158            let conn = self.clone();
159
160            let jh = tokio::spawn(async move {
161                let mut reconnect = false;
162
163                loop {
164                    if let Some(sock_rd) = conn.sock_rd.write().await.as_mut() {
165                        while let Some(msg) = sock_rd.next().await {
166                            match conn.handle_incoming_msg(msg).await {
167                                Err(NtError::NeedReconnect) => {
168                                    reconnect = true;
169                                    break;
170                                }
171                                _ => {}
172                            }
173                        }
174                    }
175
176                    if reconnect {
177                        match conn.reconnect.try_acquire() {
178                            Ok(_) => {
179                                while let Err(err) = conn.connect().await {
180                                    error!("failed to reconnect: {err:?}");
181                                    tokio::time::sleep(Duration::from_millis(20)).await;
182                                }
183                            }
184                            _ => {}
185                        }
186                        reconnect = false;
187                    }
188
189                    tokio::task::yield_now().await;
190                }
191            });
192
193            *incoming_abort = Some(jh.abort_handle());
194        }
195
196        // Spawn event loop to send outgoing messages
197        if (*outgoing_abort).is_none() {
198            let conn = self.clone();
199
200            let jh = tokio::spawn(async move {
201                let mut reconnect = false;
202                loop {
203                    while let Some(outgoing) = c2s_rx.recv().await {
204                        trace!("sending {outgoing:?}");
205
206                        if let Some(sock_wr) = conn.sock_wr.write().await.as_mut() {
207                            match sock_wr.send(outgoing).await {
208                                Ok(()) => {
209                                    trace!("sent outgoing message successfully");
210                                }
211                                Err(TungsteniteError::ConnectionClosed)
212                                | Err(TungsteniteError::Io(_))
213                                | Err(TungsteniteError::AlreadyClosed)
214                                | Err(TungsteniteError::Protocol(_)) => {
215                                    reconnect = true;
216                                    break;
217                                }
218                                Err(err) => {
219                                    error!("error writing outgoing message: {err:?}");
220                                }
221                            }
222
223                            trace!("sent");
224                        }
225                    }
226
227                    if reconnect {
228                        match conn.reconnect.try_acquire() {
229                            Ok(_) => {
230                                while let Err(err) = conn.connect().await {
231                                    error!("failed to reconnect: {err:?}");
232                                    tokio::time::sleep(Duration::from_millis(20)).await;
233                                }
234                            }
235                            _ => {}
236                        }
237                        reconnect = false;
238                    }
239
240                    tokio::task::yield_now().await;
241                }
242            });
243
244            *outgoing_abort = Some(jh.abort_handle());
245        }
246    }
247
248    async fn handle_incoming_msg(
249        &self,
250        msg: core::result::Result<Message, TungsteniteError>,
251    ) -> Result<()> {
252        match msg {
253            Ok(Message::Text(json)) => {
254                let messages: Vec<ServerMsg> = serde_json::from_str(&json).unwrap();
255
256                for msg in messages {
257                    match msg {
258                        ServerMsg::Announce {
259                            name,
260                            id,
261                            r#type,
262                            pubuid,
263                            ..
264                        } => {
265                            // Store server topic info
266                            self.server_topics
267                                .write()
268                                .await
269                                .insert(name.clone(), (id, r#type.clone()));
270
271                            if let Some(pubuid) = pubuid {
272                                (*self.client_server.write().await).insert(pubuid, id);
273                                (*self.server_client.write().await).insert(id, pubuid);
274
275                                debug!(
276                                    "{name} ({type}): published successfully with topic id {id}"
277                                );
278                            } else {
279                                debug!("{name} ({type}): announced with topic id {id}");
280                            }
281                        }
282                        ServerMsg::Unannounce { name, id } => {
283                            let topic_pubuids = self.server_client.read().await;
284                            let mut pubuid_topics = self.client_server.write().await;
285
286                            if let Some(pubuid) = (*topic_pubuids).get(&id) {
287                                (*pubuid_topics).remove(pubuid);
288                            }
289
290                            drop(pubuid_topics);
291                            drop(topic_pubuids);
292
293                            debug!("{name}: unannounced");
294                        }
295                        _ => unimplemented!(),
296                    }
297                }
298            }
299            Ok(Message::Binary(bin)) => match Self::read_bin_frame(bin.to_vec()) {
300                Ok((topic_id, timestamp, data)) => {
301                    trace!(
302                        "received binary frame with topic_id {}, ts={}",
303                        topic_id,
304                        timestamp
305                    );
306
307                    if let Some(value_tx) = self.value_tx.write().await.get(&(topic_id as i32)) {
308                        if let Err(err) = value_tx.send((timestamp, data)) {
309                            error!("failed to send value to subscriber: {err:?}");
310                        }
311                    }
312                }
313                Err(err) => {
314                    error!("Failed to parse binary frame: {}", err);
315                }
316            },
317            Ok(msg) => warn!("unhandled incoming message: {msg:?}"),
318            Err(TungsteniteError::ConnectionClosed)
319            | Err(TungsteniteError::Io(_))
320            | Err(TungsteniteError::AlreadyClosed)
321            | Err(TungsteniteError::Protocol(_)) => {
322                return Err(NtError::NeedReconnect);
323            }
324            Err(err) => error!("error reading incoming message: {err:?}"),
325        }
326
327        Ok(())
328    }
329
330    async fn connect(&self) -> Result<()> {
331        // Get server and client_ident into something we can work with
332        let server = self.server.read().await.clone();
333        let client_ident = self.client_ident.read().await.clone();
334
335        // Build the WebSocket URL and turn it into tungstenite's client req type
336        let mut req = format!("ws://{server}:5810/nt/{client_ident}").into_client_request()?;
337
338        // Add header as specified in WPILib's spec
339        req.headers_mut().append(
340            header::SEC_WEBSOCKET_PROTOCOL,
341            HeaderValue::from_static("v4.1.networktables.first.wpi.edu"),
342        );
343
344        // Connect to the server and split into read and write
345        loop {
346            match tokio_tungstenite::connect_async(req.clone()).await {
347                Ok((sock, _)) => {
348                    let (sock_wr, sock_rd) = sock.split();
349
350                    {
351                        let mut sock_rd_ = self.sock_rd.write().await;
352                        let mut sock_wr_ = self.sock_wr.write().await;
353                        if let Some(sock_wr) = sock_wr_.deref_mut() {
354                            sock_wr.close().await.unwrap();
355                        }
356                        *sock_rd_ = Some(sock_rd);
357                        *sock_wr_ = Some(sock_wr);
358                    }
359
360                    self.server_client.write().await.clear();
361                    self.client_server.write().await.clear();
362
363                    break;
364                }
365                Err(err) => {
366                    error!("failed to connect: {err:?}");
367                }
368            }
369            tokio::time::sleep(Duration::from_millis(100)).await;
370        }
371
372        Ok(())
373    }
374
375    async fn next_id(&self) -> i32 {
376        let next = &mut *self.next_id.lock().await;
377        let curr = (*next).clone();
378        *next += 1;
379
380        curr
381    }
382
383    /// Publish a topic
384    ///
385    /// The topic will be unpublished when the [NtTopic] is dropped.
386    ///
387    /// # Arguments
388    ///
389    /// * `name` - The name of the topic to publish.
390    ///
391    /// # Type Parameters
392    ///
393    /// * `T` - The type of data to be published on the topic. Must implement the `DataType` trait.
394    ///
395    /// # Examples
396    ///
397    /// ```
398    /// use minint::{NtConn, datatype::DataType};
399    ///
400    /// #[tokio::main]
401    /// async fn main() {
402    ///     // Connect to the NetworkTables server
403    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
404    ///
405    ///     // Publish a new topic named "my_topic" with data type f64
406    ///     let mut topic = conn.publish::<f64>("my_topic").await.unwrap();
407    ///
408    ///     // ...
409    /// }
410    /// ```
411    pub async fn publish<T: DataWrap>(&self, name: impl Into<String>) -> Result<NtTopic<T>> {
412        let pubuid = self.next_id().await;
413        let name = name.into();
414
415        trace!("publishing {name} with pubuid {pubuid}");
416
417        self.publish_::<T>(name.clone(), pubuid).await?;
418
419        Ok(NtTopic {
420            conn: &*self,
421            name,
422            pubuid,
423            _marker: PhantomData,
424        })
425    }
426    async fn publish_<T: DataWrap>(&self, name: String, pubuid: i32) -> Result<()> {
427        trace!("publishing {name} with pubuid {pubuid}");
428
429        let buf = serde_json::to_string(&[ClientMsg::Publish {
430            pubuid,
431            name: name.clone(),
432            r#type: T::STRING.to_string(),
433            properties: Some(PublishProps {
434                persistent: Some(false),
435                retained: Some(false),
436            }),
437        }])?;
438
439        self.c2s_tx
440            .send(Message::Text(buf.into()))
441            .map_err(|e| NtError::SendError(e.to_string()))
442            .unwrap();
443
444        debug!(
445            "{name} ({data_type}): publishing with pubuid {pubuid}",
446            data_type = T::STRING.to_string()
447        );
448
449        Ok(())
450    }
451
452    /// Unpublish topic
453    ///
454    /// This method is typically called when an `NtTopic` is dropped.
455    fn unpublish(&self, pubuid: i32) -> Result<()> {
456        let buf = serde_json::to_string(&[ClientMsg::Unpublish { pubuid }])?;
457        self.c2s_tx
458            .send(Message::Text(buf.into()))
459            .map_err(|e| NtError::SendError(e.to_string()))?;
460
461        Ok(())
462    }
463
464    /// Subscribe to a topic
465    ///
466    /// # Arguments
467    ///
468    /// * `topic` - The name of the topic to subscribe to.
469    ///
470    /// # Examples
471    ///
472    /// ```
473    /// use minint::NtConn;
474    ///
475    /// #[tokio::main]
476    /// async fn main() {
477    ///     // Connect to the NetworkTables server
478    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
479    ///
480    ///     // Subscribe to the topic named "my_topic"
481    ///     let subscription = conn.subscribe("my_topic").await.unwrap();
482    ///
483    ///     // ...
484    /// }
485    /// ```
486    pub async fn subscribe(&self, topic: &str) -> Result<NtSubscription> {
487        let subuid = self.next_id().await;
488
489        let buf = serde_json::to_string(&[ClientMsg::Subscribe {
490            topics: Vec::from_iter([topic.to_string()]),
491            subuid,
492            options: BTreeMap::new(),
493        }])?;
494        self.c2s_tx
495            .send(Message::Text(buf.into()))
496            .map_err(|e| NtError::SendError(e.to_string()))?;
497
498        Ok(NtSubscription {
499            conn: &*self,
500            subuid,
501        })
502    }
503
504    /// Unsubscribe from a topic
505    ///
506    /// This method is typically called when an `NtSubscription` is dropped.
507    fn unsubscribe(&self, subuid: i32) -> Result<()> {
508        let buf = serde_json::to_string(&[ClientMsg::Unsubscribe { subuid }])?;
509        self.c2s_tx
510            .send(Message::Text(buf.into()))
511            .map_err(|e| NtError::SendError(e.to_string()))?;
512
513        Ok(())
514    }
515
516    /// Read/parse a binary frame
517    ///
518    /// This method is used internally to process incoming data values for subscribed topics.
519    ///
520    /// Returns `(uid, timestamp, data)`
521    fn read_bin_frame(buf: Vec<u8>) -> Result<(u64, u64, Data)> {
522        let mut bytes = Bytes::new(&buf);
523        let len = rmp::decode::read_array_len(&mut bytes)?;
524
525        if len == 4 {
526            let uid = rmp::decode::read_u64(&mut bytes)?;
527            let ts = rmp::decode::read_u64(&mut bytes)?;
528            let data_type = rmp::decode::read_u8(&mut bytes)?;
529            let data = Data::from(&mut bytes, data_type)
530                .map_err(|_| NtError::MessagePackError("Failed to parse data value".to_string()))?;
531
532            Ok((uid, ts, data))
533        } else {
534            Err(NtError::BinaryFrameError)
535        }
536    }
537
538    /// Write a binary frame
539    ///
540    /// This method is used internally to send data values to the NetworkTables server.
541    fn write_bin_frame<T: DataWrap>(&self, uid: i32, ts: u64, value: T) -> Result<()> {
542        let mut buf = Vec::new();
543        rmp::encode::write_array_len(&mut buf, 4)?;
544
545        rmp::encode::write_i32(&mut buf, uid)?;
546        rmp::encode::write_uint(&mut buf, ts)?;
547        rmp::encode::write_u8(&mut buf, T::MSGPCK)?;
548        T::encode(&mut buf, value).map_err(|_| {
549            NtError::MessagePackError("Failed to encode value to MessagePack format.".to_string())
550        })?;
551
552        self.c2s_tx
553            .send(Message::Binary(buf.into()))
554            .map_err(|e| NtError::SendError(e.to_string()))?;
555
556        Ok(())
557    }
558
559    /// Shutdown the connection
560    ///
561    /// This method stops the event loops for sending and receiving messages. All `NtTopic`
562    /// instances associated with this connection must be dropped before calling this method.
563    pub async fn stop(self) {
564        // Attempt to unwrap and use incoming and outgoing abort handles
565
566        if let Some(ah) = self.incoming_abort.read().await.as_ref() {
567            ah.abort();
568        }
569        if let Some(ah) = self.outgoing_abort.read().await.as_ref() {
570            ah.abort();
571        }
572    }
573}
574impl Clone for NtConn {
575    fn clone(&self) -> Self {
576        Self {
577            reconnect: self.reconnect.clone(),
578            next_id: self.next_id.clone(),
579
580            incoming_abort: self.incoming_abort.clone(),
581            outgoing_abort: self.outgoing_abort.clone(),
582
583            c2s_tx: self.c2s_tx.clone(),
584
585            client_ident: self.client_ident.clone(),
586            server: self.server.clone(),
587            sock_wr: self.sock_wr.clone(),
588            sock_rd: self.sock_rd.clone(),
589
590            client_server: self.client_server.clone(),
591            server_client: self.server_client.clone(),
592
593            server_topics: self.server_topics.clone(),
594            values: self.values.clone(),
595            value_tx: self.value_tx.clone(),
596        }
597    }
598}
599
600/// A NetworkTables topic
601///
602/// This structure represents a published topic on the NetworkTables server. It allows you to set
603/// the value of the topic. The topic is automatically unpublished when this structure is dropped.
604pub struct NtTopic<'nt, T: DataWrap> {
605    conn: &'nt NtConn,
606    name: String,
607    pubuid: i32,
608    _marker: PhantomData<T>,
609}
610impl<T: DataWrap + std::fmt::Debug> NtTopic<'_, T> {
611    /// Set the value of the topic.
612    ///
613    /// # Arguments
614    ///
615    /// * `val` - The new value to set the topic to.
616    ///
617    /// # Examples
618    ///
619    /// ```
620    /// use minint::{NtConn, datatype::DataType};
621    ///
622    /// #[tokio::main]
623    /// async fn main() {
624    ///     // Connect to the NetworkTables server
625    ///     let conn = NtConn::new("10.0.0.2", "my_client").await.unwrap();
626    ///
627    ///     // Publish a new topic
628    ///     let mut topic = conn.publish::<f64>("my_topic").await.unwrap();
629    ///
630    ///     // Set the value of the topic
631    ///     topic.set(3.14159).await.unwrap();
632    ///
633    ///     // ...
634    /// }
635    /// ```
636    pub async fn set(&mut self, val: T) -> Result<()> {
637        debug!(
638            "{name} ({data_type}): set to {val:?}",
639            name = self.name,
640            data_type = T::STRING.to_string()
641        );
642
643        if !self
644            .conn
645            .client_server
646            .read()
647            .await
648            .contains_key(&self.pubuid)
649        {
650            self.conn
651                .publish_::<T>(self.name.clone(), self.pubuid)
652                .await?;
653        }
654
655        trace!("writing binary frame");
656        match (*self.conn).write_bin_frame(self.pubuid, 0, val) {
657            Err(_) => {}
658            _ => {}
659        }
660
661        Ok(())
662    }
663}
664impl<T: DataWrap> Drop for NtTopic<'_, T> {
665    fn drop(&mut self) {
666        if let Err(e) = self.conn.unpublish(self.pubuid) {
667            error!("Failed to unpublish topic: {}", e);
668        }
669    }
670}
671
672/// A NetworkTables subscription
673///
674/// This structure represents a subscription to a topic on the NetworkTables server. It is
675/// automatically unsubscribed when this structure is dropped.
676pub struct NtSubscription<'nt> {
677    conn: &'nt NtConn,
678    subuid: i32,
679}
680impl NtSubscription<'_> {
681    pub async fn get(&self) -> Result<Option<watch::Receiver<(u64, Data)>>> {
682        Ok(self.conn.values.read().await.get(&self.subuid).cloned())
683    }
684}
685impl Drop for NtSubscription<'_> {
686    fn drop(&mut self) {
687        self.conn.unsubscribe(self.subuid).unwrap();
688    }
689}