chalkydri/cameras/
mjpeg.rs1use std::task::Poll;
2
3use actix_web::web::Bytes;
4use futures_core::Stream;
5use tokio::sync::watch;
6
7use crate::error::Error;
8
9#[derive(Clone)]
11pub struct MjpegStream {
12 pub(super) rx: watch::Receiver<Option<Vec<u8>>>,
13}
14impl Stream for MjpegStream {
15 type Item = Result<Bytes, Error>;
16
17 fn poll_next(
18 self: std::pin::Pin<&mut Self>,
19 _cx: &mut std::task::Context<'_>,
20 ) -> std::task::Poll<Option<Self::Item>> {
21 loop {
22 match self.rx.has_changed() {
23 Ok(true) => {
24 info!("working!!!");
25
26 let bytes = if let Some(frame) = self.get_mut().rx.borrow_and_update().as_deref() {
27 [
28 b"--frame\r\nContent-Length: ",
29 frame.len().to_string().as_bytes(),
30 b"\r\nContent-Type: image/jpeg\r\n\r\n",
31 frame,
32 ]
33 .concat()
34 } else {
35 Vec::new()
36 };
37
38 return Poll::Ready(Some(Ok(bytes.into())));
39 }
40 Ok(false) => {}
41 Err(err) => {
42 error!("error getting frame: {err:?}");
43
44 return Poll::Ready(None);
45 }
46 }
47 }
48 }
49}