chalkydri/cameras/
mjpeg.rs

1use std::task::Poll;
2
3use actix_web::web::Bytes;
4use futures_core::Stream;
5use tokio::sync::watch;
6
7use crate::error::Error;
8
9/// Wrapper over frame buffer receiver
10#[derive(Clone)]
11pub struct MjpegStream {
12    pub(super) rx: watch::Receiver<Option<Vec<u8>>>,
13}
14impl Stream for MjpegStream {
15    type Item = Result<Bytes, Error>;
16
17    fn poll_next(
18        self: std::pin::Pin<&mut Self>,
19        _cx: &mut std::task::Context<'_>,
20    ) -> std::task::Poll<Option<Self::Item>> {
21        loop {
22            match self.rx.has_changed() {
23                Ok(true) => {
24                    info!("working!!!");
25
26                    let bytes = if let Some(frame) = self.get_mut().rx.borrow_and_update().as_deref() {
27                        [
28                            b"--frame\r\nContent-Length: ",
29                            frame.len().to_string().as_bytes(),
30                            b"\r\nContent-Type: image/jpeg\r\n\r\n",
31                            frame,
32                        ]
33                        .concat()
34                    } else {
35                        Vec::new()
36                    };
37
38                    return Poll::Ready(Some(Ok(bytes.into())));
39                }
40                Ok(false) => {}
41                Err(err) => {
42                    error!("error getting frame: {err:?}");
43
44                    return Poll::Ready(None);
45                }
46            }
47        }
48    }
49}