1#[macro_use]
14extern crate tracing;
15extern crate rmp;
16extern crate serde;
17extern crate serde_json;
18extern crate tokio;
19extern crate tokio_tungstenite;
20extern crate quanta;
21
22mod datatype;
23mod error;
24mod messages;
25
26pub use error::{NtError, Result};
27use tokio::time::{interval, Interval};
28
29use std::collections::{BTreeMap, HashMap};
30use std::marker::PhantomData;
31use std::sync::Arc;
32use std::time::Duration;
33
34use datatype::{BsInt, Data, DataWrap};
35use futures_util::stream::{SplitSink, SplitStream};
36use messages::*;
37
38use futures_util::{SinkExt, StreamExt};
39use rmp::decode::Bytes;
40use tokio::net::TcpStream;
41use tokio::sync::{watch, RwLock};
42use tokio::{
43 sync::{mpsc, Mutex},
44 task::AbortHandle,
45};
46use tokio_tungstenite::tungstenite::{
47 client::IntoClientRequest,
48 http::{header, HeaderValue},
49 Error as TungsteniteError, Message,
50};
51use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
52use quanta::{Clock, Instant};
53
54async fn reconnector(
57 mut rx: mpsc::Receiver<()>,
58 server: String,
59 client_ident: String,
60 sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
61 sock_wr: Arc<RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
62) -> Result<()> {
63 let mut req = format!("ws://{server}:5810/nt/{client_ident}").into_client_request()?;
65
66 req.headers_mut().append(
68 header::SEC_WEBSOCKET_PROTOCOL,
69 HeaderValue::from_static("v4.1.networktables.first.wpi.edu"),
70 );
71
72 loop {
73 rx.recv().await;
74 {
75 let mut sock_rd = sock_rd.write().await;
76 let mut sock_wr = sock_wr.write().await;
77
78 *sock_rd = None;
79 if let Some(sock_wr) = sock_wr.as_mut() {
80 sock_wr.close().await.unwrap();
81 }
82
83 loop {
85 match tokio_tungstenite::connect_async(req.clone()).await {
86 Ok((sock, _)) => {
87 let (sock_wr_, sock_rd_) = sock.split();
88
89 {
90 *sock_rd = Some(sock_rd_);
91 *sock_wr = Some(sock_wr_);
92 }
93
94 info!("connected");
95 break;
96 }
97 Err(err) => {
98 error!("failed to connect: {err:?}");
99 }
100 }
101 tokio::time::sleep(Duration::from_millis(100)).await;
102 }
103 }
104
105 while let Some(_) = rx.recv().await {}
107 }
108}
109
110pub struct NtConn {
112 start_time: Instant,
113 offset: Arc<RwLock<Duration>>,
114
115 reconnect_tx: mpsc::Sender<()>,
116
117 next_id: Arc<Mutex<i32>>,
119
120 c2s_tx: mpsc::UnboundedSender<Message>,
122
123 incoming_abort: Arc<RwLock<Option<AbortHandle>>>,
125 outgoing_abort: Arc<RwLock<Option<AbortHandle>>>,
127 task_abort: Arc<RwLock<Option<AbortHandle>>>,
128
129 sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
130 sock_wr: Arc<RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
131
132 server_client: Arc<RwLock<HashMap<i32, i32>>>,
133 client_server: Arc<RwLock<HashMap<i32, i32>>>,
134
135 server_topics: Arc<RwLock<HashMap<String, (i32, String)>>>,
137
138 values: Arc<RwLock<HashMap<i32, watch::Receiver<(u64, Data)>>>>,
139 value_tx: Arc<RwLock<HashMap<i32, watch::Sender<(u64, Data)>>>>,
140}
141impl NtConn {
142 pub async fn new(server: impl Into<String>, client_ident: impl Into<String>) -> Result<Self> {
163 let server = server.into();
164 let client_ident = client_ident.into();
165
166 let client_server = Arc::new(RwLock::new(HashMap::new()));
167 let server_client = Arc::new(RwLock::new(HashMap::new()));
168 let sock_wr: Arc<
169 RwLock<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
170 > = Arc::new(RwLock::new(None));
171 let sock_rd: Arc<RwLock<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>> =
172 Arc::new(RwLock::new(None));
173
174 let (c2s_tx, c2s_rx) = mpsc::unbounded_channel::<Message>();
176
177 let (reconnect_tx, reconnect_rx) = mpsc::channel::<()>(1);
178 let sock_rd_ = sock_rd.clone();
179 let sock_wr_ = sock_wr.clone();
180 tokio::spawn(async move {
181 reconnector(reconnect_rx, server, client_ident, sock_rd_, sock_wr_)
182 .await
183 .unwrap();
184 });
185
186 let server_topics = Arc::new(RwLock::new(HashMap::new()));
187 let values = Arc::new(RwLock::new(HashMap::new()));
188 let value_tx = Arc::new(RwLock::new(HashMap::new()));
189
190 let start_time = Instant::now();
191
192 let conn = Self {
193 start_time,
194 offset: Arc::new(RwLock::new(Duration::ZERO)),
195
196 reconnect_tx,
197 next_id: Arc::new(Mutex::const_new(0)),
198
199 client_server,
200 server_client,
201
202 c2s_tx,
203
204 sock_rd,
205 sock_wr,
206
207 incoming_abort: Arc::new(RwLock::new(None)),
208 outgoing_abort: Arc::new(RwLock::new(None)),
209 task_abort: Arc::new(RwLock::new(None)),
210
211 server_topics,
212 values,
213 value_tx,
214 };
215
216 trace!("initializing background event loop...");
217 conn.init_background_event_loops(c2s_rx).await;
218 trace!("initialized background event loop...");
219
220 conn.reconnect_tx.send(()).await.unwrap();
221
222 Ok(conn)
223 }
224
225 async fn init_background_event_loops(&self, mut c2s_rx: mpsc::UnboundedReceiver<Message>) {
227 let mut incoming_abort = self.incoming_abort.write().await;
230 let mut outgoing_abort = self.outgoing_abort.write().await;
231 let mut task_abort = self.task_abort.write().await;
232
233 if (*incoming_abort).is_none() {
234 let conn = self.clone();
235
236 let jh = tokio::spawn(async move {
237 loop {
238 if let Some(sock_rd) = conn.sock_rd.write().await.as_mut() {
239 while let Some(msg) = sock_rd.next().await {
240 match conn.handle_incoming_msg(msg).await {
241 Err(NtError::NeedReconnect) => {
242 conn.reconnect_tx.send(()).await;
243 break;
244 }
245 _ => {}
246 }
247 }
248 }
249
250 tokio::task::yield_now().await;
251 }
252 });
253
254 *incoming_abort = Some(jh.abort_handle());
255 }
256
257 if (*outgoing_abort).is_none() {
259 let conn = self.clone();
260
261 let jh = tokio::spawn(async move {
262 loop {
263 while let Some(outgoing) = c2s_rx.recv().await {
264 trace!("sending {outgoing:?}");
265
266 if let Some(sock_wr) = conn.sock_wr.write().await.as_mut() {
267 match sock_wr.send(outgoing).await {
268 Ok(()) => {
269 trace!("sent outgoing message successfully");
270 }
271 Err(TungsteniteError::ConnectionClosed)
272 | Err(TungsteniteError::Io(_))
273 | Err(TungsteniteError::AlreadyClosed)
274 | Err(TungsteniteError::Protocol(_)) => {
275 conn.reconnect_tx.send(()).await;
276 break;
277 }
278 Err(err) => {
279 error!("error writing outgoing message: {err:?}");
280 }
281 }
282
283 trace!("sent");
284 }
285 }
286
287 tokio::task::yield_now().await;
288 }
289 });
290
291 *outgoing_abort = Some(jh.abort_handle());
292 }
293
294 if (*task_abort).is_none() {
295 let conn = self.clone();
296
297 let jh = tokio::spawn(async move {
298 let mut ping_interval = interval(Duration::from_millis(500));
299 let mut time_correct_interval = interval(Duration::from_secs(3));
300
301 loop {
302 tokio::select! {
303 _ = ping_interval.tick() => conn.ping().await.unwrap(),
304 _ = time_correct_interval.tick() => conn.time_correct().await.unwrap(),
305 }
306
307 tokio::task::yield_now().await;
329 }
330 });
331
332 *task_abort = Some(jh.abort_handle());
333 }
334 }
335
336 async fn ping(&self) -> Result<()> {
337 self.c2s_tx.send(Message::Ping(tungstenite::Bytes::from_static(b"sigma sigma boy"))).unwrap();
338
339 Ok(())
340 }
341 async fn time_correct(&self) -> Result<()> {
342 self.write_bin_frame::<BsInt>(-1, Duration::ZERO.as_micros() as u64, (self.start_time.elapsed().as_micros() as u64).into()).unwrap();
343
344 Ok(())
345 }
346
347 async fn handle_incoming_msg(
348 &self,
349 msg: core::result::Result<Message, TungsteniteError>,
350 ) -> Result<()> {
351 match msg {
352 Ok(Message::Text(json)) => {
353 let messages: Vec<ServerMsg> = serde_json::from_str(&json).unwrap();
354
355 for msg in messages {
356 match msg {
357 ServerMsg::Announce {
358 name,
359 id,
360 r#type,
361 pubuid,
362 ..
363 } => {
364 self.server_topics
366 .write()
367 .await
368 .insert(name.clone(), (id, r#type.clone()));
369
370 if let Some(pubuid) = pubuid {
371 (*self.client_server.write().await).insert(pubuid, id);
372 (*self.server_client.write().await).insert(id, pubuid);
373
374 debug!(
375 "{name} ({type}): published successfully with topic id {id}"
376 );
377 } else {
378 debug!("{name} ({type}): announced with topic id {id}");
379 }
380 }
381 ServerMsg::Unannounce {
382 name,
383 id: server_id,
384 } => {
385 if let Some(pubuid) = self.server_client.read().await.get(&server_id) {
386 self.client_server.write().await.remove(pubuid);
387 }
388 self.server_client.write().await.remove(&server_id);
389
390 debug!("{name}: unannounced");
391 }
392 _ => unimplemented!(),
393 }
394 }
395 }
396 Ok(Message::Binary(bin)) => match Self::read_bin_frame(bin.to_vec()) {
397 Ok((topic_id, timestamp, data)) => {
398 trace!(
399 "received binary frame with topic_id {}, ts={}",
400 topic_id,
401 timestamp
402 );
403
404 if topic_id == -1 {
405 let curr_ts = self.start_time.elapsed();
406 if let Data::Int(BsInt::U64(pre_ts)) = data {
407 let rtt = curr_ts - Duration::from_micros(pre_ts);
408 *self.offset.write().await = Duration::from_micros(timestamp) + (rtt/2);
409 }
410 }
411
412
413 if let Some(value_tx) = self.value_tx.write().await.get(&(topic_id as i32)) {
414 if let Err(err) = value_tx.send((timestamp, data)) {
415 error!("failed to send value to subscriber: {err:?}");
416 }
417 }
418 }
419 Err(err) => {
420 error!("Failed to parse binary frame: {}", err);
421 }
422 },
423 Ok(msg) => warn!("unhandled incoming message: {msg:?}"),
424 Err(TungsteniteError::ConnectionClosed)
425 | Err(TungsteniteError::Io(_))
426 | Err(TungsteniteError::AlreadyClosed)
427 | Err(TungsteniteError::Protocol(_)) => {
428 return Err(NtError::NeedReconnect);
429 }
430 Err(err) => error!("error reading incoming message: {err:?}"),
431 }
432
433 Ok(())
434 }
435
436 async fn next_id(&self) -> i32 {
437 let next = &mut *self.next_id.lock().await;
438 let curr = (*next).clone();
439 *next += 1;
440
441 curr
442 }
443
444 pub async fn publish<T: DataWrap>(&self, name: impl Into<String>) -> Result<NtTopic<T>> {
473 let pubuid = self.next_id().await;
474 let name = name.into();
475
476 trace!("publishing {name} with pubuid {pubuid}");
477
478 self.publish_::<T>(name.clone(), pubuid).await?;
479
480 Ok(NtTopic {
481 conn: &*self,
482 name,
483 pubuid,
484 _marker: PhantomData,
485 })
486 }
487 async fn publish_<T: DataWrap>(&self, name: String, pubuid: i32) -> Result<()> {
488 trace!("publishing {name} with pubuid {pubuid}");
489
490 let buf = serde_json::to_string(&[ClientMsg::Publish {
491 pubuid,
492 name: name.clone(),
493 r#type: T::STRING.to_string(),
494 properties: Some(PublishProps {
495 persistent: Some(false),
496 retained: Some(false),
497 }),
498 }])?;
499
500 self.c2s_tx
501 .send(Message::Text(buf.into()))
502 .map_err(|e| NtError::SendError(e.to_string()))
503 .unwrap();
504
505 debug!(
506 "{name} ({data_type}): publishing with pubuid {pubuid}",
507 data_type = T::STRING.to_string()
508 );
509
510 Ok(())
511 }
512
513 fn unpublish(&self, pubuid: i32) -> Result<()> {
517 let buf = serde_json::to_string(&[ClientMsg::Unpublish { pubuid }])?;
518 self.c2s_tx
519 .send(Message::Text(buf.into()))
520 .map_err(|e| NtError::SendError(e.to_string()))?;
521
522 Ok(())
523 }
524
525 pub async fn subscribe(&self, topic: &str) -> Result<NtSubscription> {
548 let subuid = self.next_id().await;
549
550 let buf = serde_json::to_string(&[ClientMsg::Subscribe {
551 topics: Vec::from_iter([topic.to_string()]),
552 subuid,
553 options: BTreeMap::new(),
554 }])?;
555 self.c2s_tx
556 .send(Message::Text(buf.into()))
557 .map_err(|e| NtError::SendError(e.to_string()))?;
558
559 Ok(NtSubscription {
560 conn: &*self,
561 subuid,
562 })
563 }
564
565 fn unsubscribe(&self, subuid: i32) -> Result<()> {
569 let buf = serde_json::to_string(&[ClientMsg::Unsubscribe { subuid }])?;
570 self.c2s_tx
571 .send(Message::Text(buf.into()))
572 .map_err(|e| NtError::SendError(e.to_string()))?;
573
574 Ok(())
575 }
576
577 fn read_bin_frame(buf: Vec<u8>) -> Result<(i32, u64, Data)> {
583 let mut bytes = Bytes::new(&buf);
584 let len = rmp::decode::read_array_len(&mut bytes)?;
585
586 if len == 4 {
587 let uid = rmp::decode::read_i32(&mut bytes)?;
588 let ts = rmp::decode::read_u64(&mut bytes)?;
589 let data_type = rmp::decode::read_u8(&mut bytes)?;
590 let data = Data::from(&mut bytes, data_type)
591 .map_err(|_| NtError::MessagePackError("Failed to parse data value".to_string()))?;
592
593 Ok((uid, ts, data))
594 } else {
595 Err(NtError::BinaryFrameError)
596 }
597 }
598
599 fn write_bin_frame<T: DataWrap>(&self, uid: i32, ts: u64, value: T) -> Result<()> {
603 let mut buf = Vec::new();
604 rmp::encode::write_array_len(&mut buf, 4)?;
605
606 rmp::encode::write_i32(&mut buf, uid)?;
607 rmp::encode::write_uint(&mut buf, ts)?;
608 rmp::encode::write_u8(&mut buf, T::MSGPCK)?;
609 T::encode(&mut buf, value).map_err(|_| {
610 NtError::MessagePackError("Failed to encode value to MessagePack format.".to_string())
611 })?;
612
613 self.c2s_tx
614 .send(Message::Binary(buf.into()))
615 .map_err(|e| NtError::SendError(e.to_string()))?;
616
617 Ok(())
618 }
619
620 pub async fn stop(self) {
625 if let Some(ah) = self.incoming_abort.read().await.as_ref() {
628 ah.abort();
629 }
630 if let Some(ah) = self.outgoing_abort.read().await.as_ref() {
631 ah.abort();
632 }
633 }
634}
635impl Clone for NtConn {
636 fn clone(&self) -> Self {
637 Self {
638 start_time: self.start_time.clone(),
639 offset: self.offset.clone(),
640
641 reconnect_tx: self.reconnect_tx.clone(),
642 next_id: self.next_id.clone(),
643
644 incoming_abort: self.incoming_abort.clone(),
645 outgoing_abort: self.outgoing_abort.clone(),
646 task_abort: self.task_abort.clone(),
647
648 c2s_tx: self.c2s_tx.clone(),
649
650 sock_wr: self.sock_wr.clone(),
651 sock_rd: self.sock_rd.clone(),
652
653 client_server: self.client_server.clone(),
654 server_client: self.server_client.clone(),
655
656 server_topics: self.server_topics.clone(),
657 values: self.values.clone(),
658 value_tx: self.value_tx.clone(),
659 }
660 }
661}
662
663pub struct NtTopic<'nt, T: DataWrap> {
668 conn: &'nt NtConn,
669 name: String,
670 pubuid: i32,
671 _marker: PhantomData<T>,
672}
673impl<T: DataWrap + std::fmt::Debug> NtTopic<'_, T> {
674 pub async fn set(&mut self, val: T) -> Result<()> {
700 debug!(
701 "{name} ({data_type}): set to {val:?}",
702 name = self.name,
703 data_type = T::STRING.to_string()
704 );
705
706 if !self
707 .conn
708 .client_server
709 .read()
710 .await
711 .contains_key(&self.pubuid)
712 {
713 self.conn
714 .publish_::<T>(self.name.clone(), self.pubuid)
715 .await?;
716 }
717
718 trace!("writing binary frame");
719 match (*self.conn).write_bin_frame(self.pubuid, 0, val) {
720 Err(_) => {}
721 _ => {}
722 }
723
724 Ok(())
725 }
726}
727impl<T: DataWrap> Drop for NtTopic<'_, T> {
728 fn drop(&mut self) {
729 if let Err(e) = self.conn.unpublish(self.pubuid) {
730 error!("Failed to unpublish topic: {}", e);
731 }
732 }
733}
734
735pub struct NtSubscription<'nt> {
740 conn: &'nt NtConn,
741 subuid: i32,
742}
743impl NtSubscription<'_> {
744 pub async fn get(&self) -> Result<Option<watch::Receiver<(u64, Data)>>> {
745 Ok(self.conn.values.read().await.get(&self.subuid).cloned())
746 }
747}
748impl Drop for NtSubscription<'_> {
749 fn drop(&mut self) {
750 self.conn.unsubscribe(self.subuid).unwrap();
751 }
752}