chalkydri/cameras/
mjpeg.rs

1use std::{sync::Arc, task::Poll};
2
3use actix_web::web::Bytes;
4use futures_core::Stream;
5use gstreamer::{
6    Caps, Element, ElementFactory, FlowSuccess, Pipeline, glib::object::Cast,
7    prelude::GstBinExtManual,
8};
9use gstreamer_app::{AppSink, AppSinkCallbacks};
10use tokio::sync::watch;
11
12use crate::{error::Error, subsystems::NoopSubsys};
13
14use super::pipeline::Preprocessor;
15
16// /// Wrapper over frame buffer receiver
17
18#[derive(Clone)]
19pub struct MjpegProc {
20    videorate: Arc<Element>,
21    videoconvertscale: Arc<Element>,
22    filter: Arc<Element>,
23    tx: watch::Sender<Option<Vec<u8>>>,
24    rx: watch::Receiver<Option<Vec<u8>>>,
25}
26impl Preprocessor for MjpegProc {
27    type Subsys = NoopSubsys<Self>;
28    type Frame = Vec<u8>;
29
30    fn new(pipeline: &Pipeline) -> Self {
31        let videorate = ElementFactory::make("videorate")
32            .property("max-rate", 20)
33            .property("drop-only", true)
34            .build()
35            .unwrap();
36
37        let videoconvertscale = ElementFactory::make("videoconvertscale")
38            .property_from_str("method", "nearest-neighbour")
39            .build()
40            .unwrap();
41
42        let filter = ElementFactory::make("capsfilter")
43            .property(
44                "caps",
45                &Caps::builder("video/x-raw")
46                    .field("width", &640)
47                    .field("height", &480)
48                    .field("format", "RGB")
49                    .build(),
50            )
51            .build()
52            .unwrap();
53
54        pipeline
55            .add_many([&videorate, &videoconvertscale, &filter])
56            .unwrap();
57
58        let (tx, rx) = watch::channel(None);
59
60        MjpegProc {
61            videorate: videorate.into(),
62            videoconvertscale: videoconvertscale.into(),
63            filter: filter.into(),
64            tx,
65            rx,
66        }
67    }
68    fn link(&self, src: Element, sink: Element) {
69        Element::link_many([
70            &src,
71            &self.videorate,
72            &self.videoconvertscale,
73            &self.filter,
74            &sink,
75        ])
76        .unwrap();
77    }
78    fn unlink(&self, src: Element, sink: Element) {
79        Element::unlink_many([
80            &src,
81            &self.videorate,
82            &self.videoconvertscale,
83            &self.filter,
84            &sink,
85        ]);
86    }
87    fn sampler(
88        appsink: &AppSink,
89        tx: watch::Sender<Option<Arc<Self::Frame>>>,
90    ) -> Result<Option<()>, Error> {
91        let sample = appsink
92            .pull_sample()
93            .map_err(|_| Error::FailedToPullSample)
94            .unwrap();
95        match sample.buffer() {
96            Some(buf) => {
97                let jpeg = turbojpeg::compress(
98                    turbojpeg::Image {
99                        width: 640,
100                        height: 480,
101                        pitch: 640 * 3,
102                        format: turbojpeg::PixelFormat::RGB,
103                        pixels: buf
104                            .to_owned()
105                            .into_mapped_buffer_readable()
106                            .unwrap()
107                            .to_vec()
108                            .as_slice(),
109                    },
110                    50,
111                    turbojpeg::Subsamp::None,
112                )
113                .unwrap();
114                while let Err(err) = tx.send(Some(jpeg.to_vec().into())) {
115                    error!("error sending frame: {err:?}");
116                }
117            }
118            None => {
119                error!("failed to get buffer");
120            }
121        }
122        Ok(Some(()))
123    }
124}
125impl Stream for MjpegProc {
126    type Item = Result<Bytes, Error>;
127
128    fn poll_next(
129        self: std::pin::Pin<&mut Self>,
130        _cx: &mut std::task::Context<'_>,
131    ) -> std::task::Poll<Option<Self::Item>> {
132        loop {
133            match self.rx.has_changed() {
134                Ok(true) => {
135                    info!("working!!!");
136
137                    let bytes =
138                        if let Some(frame) = self.get_mut().rx.borrow_and_update().as_deref() {
139                            [
140                                b"--frame\r\nContent-Length: ",
141                                frame.len().to_string().as_bytes(),
142                                b"\r\nContent-Type: image/jpeg\r\n\r\n",
143                                frame,
144                            ]
145                            .concat()
146                        } else {
147                            Vec::new()
148                        };
149
150                    return Poll::Ready(Some(Ok(bytes.into())));
151                }
152                Ok(false) => {}
153                Err(err) => {
154                    error!("error getting frame: {err:?}");
155
156                    return Poll::Ready(None);
157                }
158            }
159        }
160    }
161}