1#[macro_use]
14extern crate tracing;
15extern crate quanta;
16extern crate rmp;
17extern crate serde;
18extern crate serde_json;
19extern crate tokio;
20extern crate tokio_tungstenite;
21
22mod datatype;
23mod error;
24mod messages;
25
26pub use error::{NtError, Result};
27use tokio::time::{interval, Interval};
28
29use std::collections::{BTreeMap, HashMap};
30use std::marker::PhantomData;
31use std::sync::Arc;
32use std::time::Duration;
33
34use datatype::{BsInt, Data, DataWrap};
35use futures_util::stream::{SplitSink, SplitStream};
36use messages::*;
37
38use futures_util::{SinkExt, StreamExt};
39use quanta::{Clock, Instant};
40use rmp::decode::Bytes;
41use tokio::net::TcpStream;
42use tokio::sync::{watch, RwLock};
43use tokio::{
44 sync::{mpsc, Mutex},
45 task::AbortHandle,
46};
47use tokio_tungstenite::tungstenite::{
48 client::IntoClientRequest,
49 http::{header, HeaderValue},
50 Error as TungsteniteError, Message,
51};
52use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
53
54async fn reconnector(
57 mut rx: mpsc::Receiver<()>,
58 server: String,
59 client_ident: String,
60 sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
61 sock_wr: Arc<RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
62) -> Result<()> {
63 let mut req = format!("ws://{server}:5810/nt/{client_ident}").into_client_request()?;
65
66 req.headers_mut().append(
68 header::SEC_WEBSOCKET_PROTOCOL,
69 HeaderValue::from_static("v4.1.networktables.first.wpi.edu"),
70 );
71
72 loop {
73 rx.recv().await;
74 {
75 let mut sock_rd = sock_rd.write().await;
76 let mut sock_wr = sock_wr.write().await;
77
78 *sock_rd = None;
79 if let Some(sock_wr) = sock_wr.as_mut() {
80 sock_wr.close().await.unwrap();
81 }
82
83 loop {
85 match tokio_tungstenite::connect_async(req.clone()).await {
86 Ok((sock, _)) => {
87 let (sock_wr_, sock_rd_) = sock.split();
88
89 {
90 *sock_rd = Some(sock_rd_);
91 *sock_wr = Some(sock_wr_);
92 }
93
94 info!("connected");
95 break;
96 }
97 Err(err) => {
98 error!("failed to connect: {err:?}");
99 }
100 }
101 tokio::time::sleep(Duration::from_millis(100)).await;
102 }
103 }
104
105 while let Some(_) = rx.recv().await {}
107 }
108}
109
/// Shared state of one client connection.
///
/// Every field is either cheaply cloneable or wrapped in an `Arc`, so clones of
/// an `NtConn` (see the `Clone` impl) all refer to the same connection state.
pub struct NtConn {
    /// Instant captured when the connection object was created; local
    /// timestamps are measured as time elapsed since this instant.
    start_time: Instant,
    /// Value derived from the server's reply to a time-sync frame (topic id
    /// `-1`): the server timestamp plus half the measured round-trip time.
    offset: Arc<RwLock<Duration>>,

    /// Sends `()` to the reconnector task to request a (re)connect.
    reconnect_tx: mpsc::Sender<()>,

    /// Counter handed out by `next_id` for both publisher and subscriber uids.
    next_id: Arc<Mutex<i32>>,

    /// Queue of outgoing WebSocket messages, drained by the outgoing task.
    c2s_tx: mpsc::UnboundedSender<Message>,

    /// Abort handle of the task that reads incoming messages, once spawned.
    incoming_abort: Arc<RwLock<Option<AbortHandle>>>,
    /// Abort handle of the task that writes outgoing messages, once spawned.
    outgoing_abort: Arc<RwLock<Option<AbortHandle>>>,
    /// Abort handle of the periodic ping / time-sync task, once spawned.
    task_abort: Arc<RwLock<Option<AbortHandle>>>,

    /// Read half of the WebSocket; `None` until the reconnector has connected.
    sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
    /// Write half of the WebSocket; `None` until the reconnector has connected.
    sock_wr: Arc<RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,

    /// Server topic id -> this client's pubuid, filled from announce messages.
    server_client: Arc<RwLock<HashMap<i32, i32>>>,
    /// This client's pubuid -> server topic id, filled from announce messages.
    client_server: Arc<RwLock<HashMap<i32, i32>>>,

    /// Topic name -> (server topic id, type string) for announced topics.
    server_topics: Arc<RwLock<HashMap<String, (i32, String)>>>,

    /// Receivers of `(timestamp, value)` pairs, looked up by subuid in
    /// `NtSubscription::get`.
    /// NOTE(review): nothing visible in this file inserts into this map.
    values: Arc<RwLock<HashMap<i32, watch::Receiver<(u64, Data)>>>>,
    /// Senders of `(timestamp, value)` pairs, looked up by the topic id of an
    /// incoming binary frame.
    /// NOTE(review): nothing visible in this file inserts into this map.
    value_tx: Arc<RwLock<HashMap<i32, watch::Sender<(u64, Data)>>>>,
}
141impl NtConn {
142 pub async fn new(server: impl Into<String>, client_ident: impl Into<String>) -> Result<Self> {
163 let server = server.into();
164 let client_ident = client_ident.into();
165
166 let client_server = Arc::new(RwLock::new(HashMap::new()));
167 let server_client = Arc::new(RwLock::new(HashMap::new()));
168 let sock_wr: Arc<
169 RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
170 > = Arc::new(RwLock::new(None));
171 let sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>> =
172 Arc::new(RwLock::new(None));
173
174 let (c2s_tx, c2s_rx) = mpsc::unbounded_channel::<Message>();
176
177 let (reconnect_tx, reconnect_rx) = mpsc::channel::<()>(1);
178 let sock_rd_ = sock_rd.clone();
179 let sock_wr_ = sock_wr.clone();
180 tokio::spawn(async move {
181 reconnector(reconnect_rx, server, client_ident, sock_rd_, sock_wr_)
182 .await
183 .unwrap();
184 });
185
186 let server_topics = Arc::new(RwLock::new(HashMap::new()));
187 let values = Arc::new(RwLock::new(HashMap::new()));
188 let value_tx = Arc::new(RwLock::new(HashMap::new()));
189
190 let start_time = Instant::now();
191
192 let conn = Self {
193 start_time,
194 offset: Arc::new(RwLock::new(Duration::ZERO)),
195
196 reconnect_tx,
197 next_id: Arc::new(Mutex::const_new(0)),
198
199 client_server,
200 server_client,
201
202 c2s_tx,
203
204 sock_rd,
205 sock_wr,
206
207 incoming_abort: Arc::new(RwLock::new(None)),
208 outgoing_abort: Arc::new(RwLock::new(None)),
209 task_abort: Arc::new(RwLock::new(None)),
210
211 server_topics,
212 values,
213 value_tx,
214 };
215
216 trace!("initializing background event loop...");
217 conn.init_background_event_loops(c2s_rx).await;
218 trace!("initialized background event loop...");
219
220 conn.reconnect_tx.send(()).await.unwrap();
221
222 Ok(conn)
223 }
224
225 async fn init_background_event_loops(&self, mut c2s_rx: mpsc::UnboundedReceiver<Message>) {
227 let mut incoming_abort = self.incoming_abort.write().await;
230 let mut outgoing_abort = self.outgoing_abort.write().await;
231 let mut task_abort = self.task_abort.write().await;
232
233 if (*incoming_abort).is_none() {
234 let conn = self.clone();
235
236 let jh = tokio::spawn(async move {
237 loop {
238 if let Some(sock_rd) = conn.sock_rd.write().await.as_mut() {
239 while let Some(msg) = sock_rd.next().await {
240 match conn.handle_incoming_msg(msg).await {
241 Err(NtError::NeedReconnect) => {
242 conn.reconnect_tx.send(()).await;
243 break;
244 }
245 _ => {}
246 }
247 }
248 }
249
250 tokio::task::yield_now().await;
251 }
252 });
253
254 *incoming_abort = Some(jh.abort_handle());
255 }
256
257 if (*outgoing_abort).is_none() {
259 let conn = self.clone();
260
261 let jh = tokio::spawn(async move {
262 loop {
263 while let Some(outgoing) = c2s_rx.recv().await {
264 trace!("sending {outgoing:?}");
265
266 if let Some(sock_wr) = conn.sock_wr.write().await.as_mut() {
267 match sock_wr.send(outgoing).await {
268 Ok(()) => {
269 trace!("sent outgoing message successfully");
270 }
271 Err(TungsteniteError::ConnectionClosed)
272 | Err(TungsteniteError::Io(_))
273 | Err(TungsteniteError::AlreadyClosed)
274 | Err(TungsteniteError::Protocol(_)) => {
275 conn.reconnect_tx.send(()).await;
276 break;
277 }
278 Err(err) => {
279 error!("error writing outgoing message: {err:?}");
280 }
281 }
282
283 trace!("sent");
284 }
285 }
286
287 tokio::task::yield_now().await;
288 }
289 });
290
291 *outgoing_abort = Some(jh.abort_handle());
292 }
293
294 if (*task_abort).is_none() {
295 let conn = self.clone();
296
297 let jh = tokio::spawn(async move {
298 let mut ping_interval = interval(Duration::from_millis(500));
299 let mut time_correct_interval = interval(Duration::from_secs(3));
300
301 loop {
302 tokio::select! {
303 _ = ping_interval.tick() => conn.ping().await.unwrap(),
304 _ = time_correct_interval.tick() => conn.time_correct().await.unwrap(),
305 }
306
307 tokio::task::yield_now().await;
329 }
330 });
331
332 *task_abort = Some(jh.abort_handle());
333 }
334 }
335
336 async fn ping(&self) -> Result<()> {
337 self.c2s_tx
338 .send(Message::Ping(tungstenite::Bytes::from_static(
339 b"sigma sigma boy",
340 )))
341 .unwrap();
342
343 Ok(())
344 }
345 async fn time_correct(&self) -> Result<()> {
346 self.write_bin_frame::<BsInt>(
347 -1,
348 Duration::ZERO.as_micros() as u64,
349 (self.start_time.elapsed().as_micros() as u64).into(),
350 )
351 .unwrap();
352
353 Ok(())
354 }
355
356 async fn handle_incoming_msg(
357 &self,
358 msg: core::result::Result<Message, TungsteniteError>,
359 ) -> Result<()> {
360 match msg {
361 Ok(Message::Text(json)) => {
362 let messages: Vec<ServerMsg> = serde_json::from_str(&json).unwrap();
363
364 for msg in messages {
365 match msg {
366 ServerMsg::Announce {
367 name,
368 id,
369 r#type,
370 pubuid,
371 ..
372 } => {
373 self.server_topics
375 .write()
376 .await
377 .insert(name.clone(), (id, r#type.clone()));
378
379 if let Some(pubuid) = pubuid {
380 (*self.client_server.write().await).insert(pubuid, id);
381 (*self.server_client.write().await).insert(id, pubuid);
382
383 debug!(
384 "{name} ({type}): published successfully with topic id {id}"
385 );
386 } else {
387 debug!("{name} ({type}): announced with topic id {id}");
388 }
389 }
390 ServerMsg::Unannounce {
391 name,
392 id: server_id,
393 } => {
394 if let Some(pubuid) = self.server_client.read().await.get(&server_id) {
395 self.client_server.write().await.remove(pubuid);
396 }
397 self.server_client.write().await.remove(&server_id);
398
399 debug!("{name}: unannounced");
400 }
401 _ => unimplemented!(),
402 }
403 }
404 }
405 Ok(Message::Binary(bin)) => match Self::read_bin_frame(bin.to_vec()) {
406 Ok((topic_id, timestamp, data)) => {
407 trace!(
408 "received binary frame with topic_id {}, ts={}",
409 topic_id,
410 timestamp
411 );
412
413 if topic_id == -1 {
414 let curr_ts = self.start_time.elapsed();
415 if let Data::Int(BsInt::U64(pre_ts)) = data {
416 let rtt = curr_ts - Duration::from_micros(pre_ts);
417 *self.offset.write().await =
418 Duration::from_micros(timestamp) + (rtt / 2);
419 }
420 }
421
422 if let Some(value_tx) = self.value_tx.write().await.get(&(topic_id as i32)) {
423 if let Err(err) = value_tx.send((timestamp, data)) {
424 error!("failed to send value to subscriber: {err:?}");
425 }
426 }
427 }
428 Err(err) => {
429 error!("Failed to parse binary frame: {}", err);
430 }
431 },
432 Ok(msg) => warn!("unhandled incoming message: {msg:?}"),
433 Err(TungsteniteError::ConnectionClosed)
434 | Err(TungsteniteError::Io(_))
435 | Err(TungsteniteError::AlreadyClosed)
436 | Err(TungsteniteError::Protocol(_)) => {
437 return Err(NtError::NeedReconnect);
438 }
439 Err(err) => error!("error reading incoming message: {err:?}"),
440 }
441
442 Ok(())
443 }
444
445 async fn next_id(&self) -> i32 {
446 let next = &mut *self.next_id.lock().await;
447 let curr = (*next).clone();
448 *next += 1;
449
450 curr
451 }
452
453 pub async fn publish<T: DataWrap>(&self, name: impl Into<String>) -> Result<NtTopic<T>> {
482 let pubuid = self.next_id().await;
483 let name = name.into();
484
485 trace!("publishing {name} with pubuid {pubuid}");
486
487 self.publish_::<T>(name.clone(), pubuid).await?;
488
489 Ok(NtTopic {
490 conn: &*self,
491 name,
492 pubuid,
493 _marker: PhantomData,
494 })
495 }
496 async fn publish_<T: DataWrap>(&self, name: String, pubuid: i32) -> Result<()> {
497 trace!("publishing {name} with pubuid {pubuid}");
498
499 let buf = serde_json::to_string(&[ClientMsg::Publish {
500 pubuid,
501 name: name.clone(),
502 r#type: T::STRING.to_string(),
503 properties: Some(PublishProps {
504 persistent: Some(false),
505 retained: Some(false),
506 }),
507 }])?;
508
509 self.c2s_tx
510 .send(Message::Text(buf.into()))
511 .map_err(|e| NtError::SendError(e.to_string()))
512 .unwrap();
513
514 debug!(
515 "{name} ({data_type}): publishing with pubuid {pubuid}",
516 data_type = T::STRING.to_string()
517 );
518
519 Ok(())
520 }
521
522 fn unpublish(&self, pubuid: i32) -> Result<()> {
526 let buf = serde_json::to_string(&[ClientMsg::Unpublish { pubuid }])?;
527 self.c2s_tx
528 .send(Message::Text(buf.into()))
529 .map_err(|e| NtError::SendError(e.to_string()))?;
530
531 Ok(())
532 }
533
534 pub async fn subscribe(&self, topic: &str) -> Result<NtSubscription> {
557 let subuid = self.next_id().await;
558
559 let buf = serde_json::to_string(&[ClientMsg::Subscribe {
560 topics: Vec::from_iter([topic.to_string()]),
561 subuid,
562 options: BTreeMap::new(),
563 }])?;
564 self.c2s_tx
565 .send(Message::Text(buf.into()))
566 .map_err(|e| NtError::SendError(e.to_string()))?;
567
568 Ok(NtSubscription {
569 conn: &*self,
570 subuid,
571 })
572 }
573
574 fn unsubscribe(&self, subuid: i32) -> Result<()> {
578 let buf = serde_json::to_string(&[ClientMsg::Unsubscribe { subuid }])?;
579 self.c2s_tx
580 .send(Message::Text(buf.into()))
581 .map_err(|e| NtError::SendError(e.to_string()))?;
582
583 Ok(())
584 }
585
586 fn read_bin_frame(buf: Vec<u8>) -> Result<(i32, u64, Data)> {
592 let mut bytes = Bytes::new(&buf);
593 let len = rmp::decode::read_array_len(&mut bytes)?;
594
595 if len == 4 {
596 let uid = rmp::decode::read_i32(&mut bytes)?;
597 let ts = rmp::decode::read_u64(&mut bytes)?;
598 let data_type = rmp::decode::read_u8(&mut bytes)?;
599 let data = Data::from(&mut bytes, data_type)
600 .map_err(|_| NtError::MessagePackError("Failed to parse data value".to_string()))?;
601
602 Ok((uid, ts, data))
603 } else {
604 Err(NtError::BinaryFrameError)
605 }
606 }
607
608 fn write_bin_frame<T: DataWrap>(&self, uid: i32, ts: u64, value: T) -> Result<()> {
612 let mut buf = Vec::new();
613 rmp::encode::write_array_len(&mut buf, 4)?;
614
615 rmp::encode::write_i32(&mut buf, uid)?;
616 rmp::encode::write_uint(&mut buf, ts)?;
617 rmp::encode::write_u8(&mut buf, T::MSGPCK)?;
618 T::encode(&mut buf, value).map_err(|_| {
619 NtError::MessagePackError("Failed to encode value to MessagePack format.".to_string())
620 })?;
621
622 self.c2s_tx
623 .send(Message::Binary(buf.into()))
624 .map_err(|e| NtError::SendError(e.to_string()))?;
625
626 Ok(())
627 }
628
629 pub async fn stop(self) {
634 if let Some(ah) = self.incoming_abort.read().await.as_ref() {
637 ah.abort();
638 }
639 if let Some(ah) = self.outgoing_abort.read().await.as_ref() {
640 ah.abort();
641 }
642 }
643}
644impl Clone for NtConn {
645 fn clone(&self) -> Self {
646 Self {
647 start_time: self.start_time.clone(),
648 offset: self.offset.clone(),
649
650 reconnect_tx: self.reconnect_tx.clone(),
651 next_id: self.next_id.clone(),
652
653 incoming_abort: self.incoming_abort.clone(),
654 outgoing_abort: self.outgoing_abort.clone(),
655 task_abort: self.task_abort.clone(),
656
657 c2s_tx: self.c2s_tx.clone(),
658
659 sock_wr: self.sock_wr.clone(),
660 sock_rd: self.sock_rd.clone(),
661
662 client_server: self.client_server.clone(),
663 server_client: self.server_client.clone(),
664
665 server_topics: self.server_topics.clone(),
666 values: self.values.clone(),
667 value_tx: self.value_tx.clone(),
668 }
669 }
670}
671
/// Handle for a topic published through an [`NtConn`], carrying values of
/// type `T`.
pub struct NtTopic<'nt, T: DataWrap> {
    /// Connection the topic was published on.
    conn: &'nt NtConn,
    /// Topic name, kept so the publish request can be sent again.
    name: String,
    /// Publisher uid this client allocated for the topic.
    pubuid: i32,
    /// Ties the handle to the value type without storing a value.
    _marker: PhantomData<T>,
}
682impl<T: DataWrap + std::fmt::Debug> NtTopic<'_, T> {
683 pub async fn set(&mut self, val: T) -> Result<()> {
709 debug!(
710 "{name} ({data_type}): set to {val:?}",
711 name = self.name,
712 data_type = T::STRING.to_string()
713 );
714
715 if !self
716 .conn
717 .client_server
718 .read()
719 .await
720 .contains_key(&self.pubuid)
721 {
722 self.conn
723 .publish_::<T>(self.name.clone(), self.pubuid)
724 .await?;
725 }
726
727 trace!("writing binary frame");
728 match (*self.conn).write_bin_frame(self.pubuid, 0, val) {
729 Err(_) => {}
730 _ => {}
731 }
732
733 Ok(())
734 }
735}
736impl<T: DataWrap> Drop for NtTopic<'_, T> {
737 fn drop(&mut self) {
738 if let Err(e) = self.conn.unpublish(self.pubuid) {
739 error!("Failed to unpublish topic: {}", e);
740 }
741 }
742}
743
/// Handle for a subscription created through an [`NtConn`].
pub struct NtSubscription<'nt> {
    /// Connection the subscription was created on.
    conn: &'nt NtConn,
    /// Subscriber uid this client allocated for the subscription.
    subuid: i32,
}
752impl NtSubscription<'_> {
753 pub async fn get(&self) -> Result<Option<watch::Receiver<(u64, Data)>>> {
754 Ok(self.conn.values.read().await.get(&self.subuid).cloned())
755 }
756}
757impl Drop for NtSubscription<'_> {
758 fn drop(&mut self) {
759 self.conn.unsubscribe(self.subuid).unwrap();
760 }
761}