1#[macro_use]
14extern crate log;
15extern crate rmp;
16extern crate serde;
17extern crate serde_json;
18extern crate tokio;
19extern crate tokio_tungstenite;
20
21mod datatype;
22mod error;
23mod messages;
24
25pub use error::{NtError, Result};
26
27use std::collections::{BTreeMap, HashMap};
28use std::marker::PhantomData;
29use std::net::IpAddr;
30use std::ops::DerefMut;
31use std::sync::Arc;
32use std::time::Duration;
33
34use datatype::{Data, DataWrap};
35use futures_util::stream::{SplitSink, SplitStream};
36use messages::*;
37
38use futures_util::{SinkExt, StreamExt};
39use rmp::decode::Bytes;
40use tokio::net::TcpStream;
41use tokio::sync::{watch, RwLock, Semaphore};
42use tokio::{
43 sync::{mpsc, Mutex},
44 task::AbortHandle,
45};
46use tokio_tungstenite::tungstenite::{
47 client::IntoClientRequest,
48 http::{header, HeaderValue},
49 Error as TungsteniteError, Message,
50};
51use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
52
53pub struct NtConn {
57 reconnect: Arc<Semaphore>,
58
59 next_id: Arc<Mutex<i32>>,
61
62 incoming_abort: Arc<RwLock<Option<AbortHandle>>>,
64 outgoing_abort: Arc<RwLock<Option<AbortHandle>>>,
66
67 c2s_tx: mpsc::UnboundedSender<Message>,
69
70 client_ident: Arc<RwLock<String>>,
71 server: Arc<RwLock<String>>,
72 sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
73 sock_wr: Arc<RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
74
75 server_client: Arc<RwLock<HashMap<i32, i32>>>,
76 client_server: Arc<RwLock<HashMap<i32, i32>>>,
77
78 server_topics: Arc<RwLock<HashMap<String, (i32, String)>>>,
80
81 values: Arc<RwLock<HashMap<i32, watch::Receiver<(u64, Data)>>>>,
82 value_tx: Arc<RwLock<HashMap<i32, watch::Sender<(u64, Data)>>>>,
83}
84impl NtConn {
85 pub async fn new(server: impl Into<IpAddr>, client_ident: impl Into<String>) -> Result<Self> {
106 let client_server = Arc::new(RwLock::new(HashMap::new()));
107 let server_client = Arc::new(RwLock::new(HashMap::new()));
108 let sock_wr: Arc<
109 RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
110 > = Arc::new(RwLock::new(None));
111 let sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>> =
112 Arc::new(RwLock::new(None));
113
114 let (c2s_tx, c2s_rx) = mpsc::unbounded_channel::<Message>();
116
117 let server_topics = Arc::new(RwLock::new(HashMap::new()));
118 let values = Arc::new(RwLock::new(HashMap::new()));
119 let value_tx = Arc::new(RwLock::new(HashMap::new()));
120
121 let conn = Self {
122 reconnect: Arc::new(Semaphore::const_new(1)),
123 next_id: Arc::new(Mutex::const_new(0)),
124 c2s_tx,
125
126 client_server,
127 server_client,
128
129 client_ident: Arc::new(RwLock::new(client_ident.into())),
130 server: Arc::new(RwLock::new(server.into().to_string())),
131 sock_rd,
132 sock_wr,
133
134 incoming_abort: Arc::new(RwLock::new(None)),
135 outgoing_abort: Arc::new(RwLock::new(None)),
136
137 server_topics,
138 values,
139 value_tx,
140 };
141
142 trace!("initializing background event loop...");
143 conn.init_background_event_loops(c2s_rx).await;
144 trace!("initialized background event loop...");
145
146 conn.connect().await?;
147
148 Ok(conn)
149 }
150
151 async fn init_background_event_loops(&self, mut c2s_rx: mpsc::UnboundedReceiver<Message>) {
152 let mut incoming_abort = self.incoming_abort.write().await;
155 let mut outgoing_abort = self.outgoing_abort.write().await;
156
157 if (*incoming_abort).is_none() {
158 let conn = self.clone();
159
160 let jh = tokio::spawn(async move {
161 let mut reconnect = false;
162
163 loop {
164 if let Some(sock_rd) = conn.sock_rd.write().await.as_mut() {
165 while let Some(msg) = sock_rd.next().await {
166 match conn.handle_incoming_msg(msg).await {
167 Err(NtError::NeedReconnect) => {
168 reconnect = true;
169 break;
170 }
171 _ => {}
172 }
173 }
174 }
175
176 if reconnect {
177 match conn.reconnect.try_acquire() {
178 Ok(_) => {
179 while let Err(err) = conn.connect().await {
180 error!("failed to reconnect: {err:?}");
181 tokio::time::sleep(Duration::from_millis(20)).await;
182 }
183 }
184 _ => {}
185 }
186 reconnect = false;
187 }
188
189 tokio::task::yield_now().await;
190 }
191 });
192
193 *incoming_abort = Some(jh.abort_handle());
194 }
195
196 if (*outgoing_abort).is_none() {
198 let conn = self.clone();
199
200 let jh = tokio::spawn(async move {
201 let mut reconnect = false;
202 loop {
203 while let Some(outgoing) = c2s_rx.recv().await {
204 trace!("sending {outgoing:?}");
205
206 if let Some(sock_wr) = conn.sock_wr.write().await.as_mut() {
207 match sock_wr.send(outgoing).await {
208 Ok(()) => {
209 trace!("sent outgoing message successfully");
210 }
211 Err(TungsteniteError::ConnectionClosed)
212 | Err(TungsteniteError::Io(_))
213 | Err(TungsteniteError::AlreadyClosed)
214 | Err(TungsteniteError::Protocol(_)) => {
215 reconnect = true;
216 break;
217 }
218 Err(err) => {
219 error!("error writing outgoing message: {err:?}");
220 }
221 }
222
223 trace!("sent");
224 }
225 }
226
227 if reconnect {
228 match conn.reconnect.try_acquire() {
229 Ok(_) => {
230 while let Err(err) = conn.connect().await {
231 error!("failed to reconnect: {err:?}");
232 tokio::time::sleep(Duration::from_millis(20)).await;
233 }
234 }
235 _ => {}
236 }
237 reconnect = false;
238 }
239
240 tokio::task::yield_now().await;
241 }
242 });
243
244 *outgoing_abort = Some(jh.abort_handle());
245 }
246 }
247
248 async fn handle_incoming_msg(
249 &self,
250 msg: core::result::Result<Message, TungsteniteError>,
251 ) -> Result<()> {
252 match msg {
253 Ok(Message::Text(json)) => {
254 let messages: Vec<ServerMsg> = serde_json::from_str(&json).unwrap();
255
256 for msg in messages {
257 match msg {
258 ServerMsg::Announce {
259 name,
260 id,
261 r#type,
262 pubuid,
263 ..
264 } => {
265 self.server_topics
267 .write()
268 .await
269 .insert(name.clone(), (id, r#type.clone()));
270
271 if let Some(pubuid) = pubuid {
272 (*self.client_server.write().await).insert(pubuid, id);
273 (*self.server_client.write().await).insert(id, pubuid);
274
275 debug!(
276 "{name} ({type}): published successfully with topic id {id}"
277 );
278 } else {
279 debug!("{name} ({type}): announced with topic id {id}");
280 }
281 }
282 ServerMsg::Unannounce { name, id } => {
283 let topic_pubuids = self.server_client.read().await;
284 let mut pubuid_topics = self.client_server.write().await;
285
286 if let Some(pubuid) = (*topic_pubuids).get(&id) {
287 (*pubuid_topics).remove(pubuid);
288 }
289
290 drop(pubuid_topics);
291 drop(topic_pubuids);
292
293 debug!("{name}: unannounced");
294 }
295 _ => unimplemented!(),
296 }
297 }
298 }
299 Ok(Message::Binary(bin)) => match Self::read_bin_frame(bin.to_vec()) {
300 Ok((topic_id, timestamp, data)) => {
301 trace!(
302 "received binary frame with topic_id {}, ts={}",
303 topic_id,
304 timestamp
305 );
306
307 if let Some(value_tx) = self.value_tx.write().await.get(&(topic_id as i32)) {
308 if let Err(err) = value_tx.send((timestamp, data)) {
309 error!("failed to send value to subscriber: {err:?}");
310 }
311 }
312 }
313 Err(err) => {
314 error!("Failed to parse binary frame: {}", err);
315 }
316 },
317 Ok(msg) => warn!("unhandled incoming message: {msg:?}"),
318 Err(TungsteniteError::ConnectionClosed)
319 | Err(TungsteniteError::Io(_))
320 | Err(TungsteniteError::AlreadyClosed)
321 | Err(TungsteniteError::Protocol(_)) => {
322 return Err(NtError::NeedReconnect);
323 }
324 Err(err) => error!("error reading incoming message: {err:?}"),
325 }
326
327 Ok(())
328 }
329
330 async fn connect(&self) -> Result<()> {
331 let server = self.server.read().await.clone();
333 let client_ident = self.client_ident.read().await.clone();
334
335 let mut req = format!("ws://{server}:5810/nt/{client_ident}").into_client_request()?;
337
338 req.headers_mut().append(
340 header::SEC_WEBSOCKET_PROTOCOL,
341 HeaderValue::from_static("v4.1.networktables.first.wpi.edu"),
342 );
343
344 loop {
346 match tokio_tungstenite::connect_async(req.clone()).await {
347 Ok((sock, _)) => {
348 let (sock_wr, sock_rd) = sock.split();
349
350 {
351 let mut sock_rd_ = self.sock_rd.write().await;
352 let mut sock_wr_ = self.sock_wr.write().await;
353 if let Some(sock_wr) = sock_wr_.deref_mut() {
354 sock_wr.close().await.unwrap();
355 }
356 *sock_rd_ = Some(sock_rd);
357 *sock_wr_ = Some(sock_wr);
358 }
359
360 self.server_client.write().await.clear();
361 self.client_server.write().await.clear();
362
363 break;
364 }
365 Err(err) => {
366 error!("failed to connect: {err:?}");
367 }
368 }
369 tokio::time::sleep(Duration::from_millis(100)).await;
370 }
371
372 Ok(())
373 }
374
375 async fn next_id(&self) -> i32 {
376 let next = &mut *self.next_id.lock().await;
377 let curr = (*next).clone();
378 *next += 1;
379
380 curr
381 }
382
383 pub async fn publish<T: DataWrap>(&self, name: impl Into<String>) -> Result<NtTopic<T>> {
412 let pubuid = self.next_id().await;
413 let name = name.into();
414
415 trace!("publishing {name} with pubuid {pubuid}");
416
417 self.publish_::<T>(name.clone(), pubuid).await?;
418
419 Ok(NtTopic {
420 conn: &*self,
421 name,
422 pubuid,
423 _marker: PhantomData,
424 })
425 }
426 async fn publish_<T: DataWrap>(&self, name: String, pubuid: i32) -> Result<()> {
427 trace!("publishing {name} with pubuid {pubuid}");
428
429 let buf = serde_json::to_string(&[ClientMsg::Publish {
430 pubuid,
431 name: name.clone(),
432 r#type: T::STRING.to_string(),
433 properties: Some(PublishProps {
434 persistent: Some(false),
435 retained: Some(false),
436 }),
437 }])?;
438
439 self.c2s_tx
440 .send(Message::Text(buf.into()))
441 .map_err(|e| NtError::SendError(e.to_string()))
442 .unwrap();
443
444 debug!(
445 "{name} ({data_type}): publishing with pubuid {pubuid}",
446 data_type = T::STRING.to_string()
447 );
448
449 Ok(())
450 }
451
452 fn unpublish(&self, pubuid: i32) -> Result<()> {
456 let buf = serde_json::to_string(&[ClientMsg::Unpublish { pubuid }])?;
457 self.c2s_tx
458 .send(Message::Text(buf.into()))
459 .map_err(|e| NtError::SendError(e.to_string()))?;
460
461 Ok(())
462 }
463
464 pub async fn subscribe(&self, topic: &str) -> Result<NtSubscription> {
487 let subuid = self.next_id().await;
488
489 let buf = serde_json::to_string(&[ClientMsg::Subscribe {
490 topics: Vec::from_iter([topic.to_string()]),
491 subuid,
492 options: BTreeMap::new(),
493 }])?;
494 self.c2s_tx
495 .send(Message::Text(buf.into()))
496 .map_err(|e| NtError::SendError(e.to_string()))?;
497
498 Ok(NtSubscription {
499 conn: &*self,
500 subuid,
501 })
502 }
503
504 fn unsubscribe(&self, subuid: i32) -> Result<()> {
508 let buf = serde_json::to_string(&[ClientMsg::Unsubscribe { subuid }])?;
509 self.c2s_tx
510 .send(Message::Text(buf.into()))
511 .map_err(|e| NtError::SendError(e.to_string()))?;
512
513 Ok(())
514 }
515
516 fn read_bin_frame(buf: Vec<u8>) -> Result<(u64, u64, Data)> {
522 let mut bytes = Bytes::new(&buf);
523 let len = rmp::decode::read_array_len(&mut bytes)?;
524
525 if len == 4 {
526 let uid = rmp::decode::read_u64(&mut bytes)?;
527 let ts = rmp::decode::read_u64(&mut bytes)?;
528 let data_type = rmp::decode::read_u8(&mut bytes)?;
529 let data = Data::from(&mut bytes, data_type)
530 .map_err(|_| NtError::MessagePackError("Failed to parse data value".to_string()))?;
531
532 Ok((uid, ts, data))
533 } else {
534 Err(NtError::BinaryFrameError)
535 }
536 }
537
538 fn write_bin_frame<T: DataWrap>(&self, uid: i32, ts: u64, value: T) -> Result<()> {
542 let mut buf = Vec::new();
543 rmp::encode::write_array_len(&mut buf, 4)?;
544
545 rmp::encode::write_i32(&mut buf, uid)?;
546 rmp::encode::write_uint(&mut buf, ts)?;
547 rmp::encode::write_u8(&mut buf, T::MSGPCK)?;
548 T::encode(&mut buf, value).map_err(|_| {
549 NtError::MessagePackError("Failed to encode value to MessagePack format.".to_string())
550 })?;
551
552 self.c2s_tx
553 .send(Message::Binary(buf.into()))
554 .map_err(|e| NtError::SendError(e.to_string()))?;
555
556 Ok(())
557 }
558
559 pub async fn stop(self) {
564 if let Some(ah) = self.incoming_abort.read().await.as_ref() {
567 ah.abort();
568 }
569 if let Some(ah) = self.outgoing_abort.read().await.as_ref() {
570 ah.abort();
571 }
572 }
573}
574impl Clone for NtConn {
575 fn clone(&self) -> Self {
576 Self {
577 reconnect: self.reconnect.clone(),
578 next_id: self.next_id.clone(),
579
580 incoming_abort: self.incoming_abort.clone(),
581 outgoing_abort: self.outgoing_abort.clone(),
582
583 c2s_tx: self.c2s_tx.clone(),
584
585 client_ident: self.client_ident.clone(),
586 server: self.server.clone(),
587 sock_wr: self.sock_wr.clone(),
588 sock_rd: self.sock_rd.clone(),
589
590 client_server: self.client_server.clone(),
591 server_client: self.server_client.clone(),
592
593 server_topics: self.server_topics.clone(),
594 values: self.values.clone(),
595 value_tx: self.value_tx.clone(),
596 }
597 }
598}
599
600pub struct NtTopic<'nt, T: DataWrap> {
605 conn: &'nt NtConn,
606 name: String,
607 pubuid: i32,
608 _marker: PhantomData<T>,
609}
610impl<T: DataWrap + std::fmt::Debug> NtTopic<'_, T> {
611 pub async fn set(&mut self, val: T) -> Result<()> {
637 debug!(
638 "{name} ({data_type}): set to {val:?}",
639 name = self.name,
640 data_type = T::STRING.to_string()
641 );
642
643 if !self
644 .conn
645 .client_server
646 .read()
647 .await
648 .contains_key(&self.pubuid)
649 {
650 self.conn
651 .publish_::<T>(self.name.clone(), self.pubuid)
652 .await?;
653 }
654
655 trace!("writing binary frame");
656 match (*self.conn).write_bin_frame(self.pubuid, 0, val) {
657 Err(_) => {}
658 _ => {}
659 }
660
661 Ok(())
662 }
663}
664impl<T: DataWrap> Drop for NtTopic<'_, T> {
665 fn drop(&mut self) {
666 if let Err(e) = self.conn.unpublish(self.pubuid) {
667 error!("Failed to unpublish topic: {}", e);
668 }
669 }
670}
671
672pub struct NtSubscription<'nt> {
677 conn: &'nt NtConn,
678 subuid: i32,
679}
680impl NtSubscription<'_> {
681 pub async fn get(&self) -> Result<Option<watch::Receiver<(u64, Data)>>> {
682 Ok(self.conn.values.read().await.get(&self.subuid).cloned())
683 }
684}
685impl Drop for NtSubscription<'_> {
686 fn drop(&mut self) {
687 self.conn.unsubscribe(self.subuid).unwrap();
688 }
689}