chalkydri/subsystems/python/
mod.rs

1mod api;
2
3use std::{collections::HashMap, ffi::CStr, sync::Arc};
4
5use gstreamer::{Caps, Element, ElementFactory, prelude::GstBinExtManual};
6use minint::NtTopic;
7use numpy::ndarray;
8use tokio::sync::RwLock;
9
10use crate::{
11    Cfg, Nt, cameras::pipeline::Preprocessor, config, error::Error, subsystems::Subsystem,
12};
13
14use pyo3::prelude::*;
15
16use super::frame_proc_loop;
17
18#[derive(Clone)]
19pub struct PythonSubsys;
20impl Subsystem for PythonSubsys {
21    const NAME: &'static str = "python";
22
23    type Error = Error;
24    type Config = Vec<config::CustomSubsystem>;
25    type Preproc = PythonPreproc;
26    type Output = ();
27
28    async fn init() -> Result<Self, Self::Error> {
29        Ok::<Self, Self::Error>(PythonSubsys)
30    }
31
32    async fn process(
33        &self,
34        nt: minint::NtConn,
35        cam_config: config::Camera,
36        rx: tokio::sync::watch::Receiver<
37            Option<<<Self as Subsystem>::Preproc as Preprocessor>::Frame>,
38        >,
39    ) -> Result<Self::Output, Self::Error> {
40        let handle = tokio::runtime::Handle::current();
41
42        Python::with_gil(|py| -> PyResult<()> {
43            let mut modules = Vec::new();
44
45            for camera in futures_executor::block_on(Cfg.read())
46                .cameras
47                .clone()
48                .unwrap()
49            {
50                for subsys in camera.subsystems.custom {
51                    // Read custom subsystems from the configuration
52                    let subsystems = futures_executor::block_on(Cfg.read())
53                        .custom_subsystems
54                        .clone();
55                    if let Some(subsys) = subsystems.get(&subsys) {
56                        // Add a null terminator to the end of all of these things
57                        let code = [subsys.code.as_bytes(), &[0u8]].concat();
58                        let file_name = [b"custom_code.py".as_slice(), &[0u8]].concat();
59                        let module_name = [b"custom_code".as_slice(), &[0u8]].concat();
60
61                        // Convert them all to CStrs
62                        let code = CStr::from_bytes_with_nul(&code).unwrap();
63                        let file_name = CStr::from_bytes_with_nul(&file_name).unwrap();
64                        let module_name = CStr::from_bytes_with_nul(&module_name).unwrap();
65
66                        // Load the code in
67                        let module = PyModule::from_code(py, code, file_name, module_name).unwrap();
68                        // Unbind the module from Python's GIL
69                        let module = module.unbind();
70
71                        // Save It for Later :)
72                        modules.push(module);
73                    }
74                }
75            }
76
77            py.allow_threads(move || {
78                let handle_ = handle.clone();
79                handle.spawn(async move {
80                    let topics = Arc::new(RwLock::new(HashMap::<String, NtTopic<f64>>::new()));
81                    frame_proc_loop::<Self::Preproc, _>(rx, async move |buf| {
82                        if let Some(settings) = &cam_config.settings {
83                            let py_ret = Python::with_gil(|py| -> PyResult<()> {
84                                let nparr = numpy::PyArray::from_array(py, &buf);
85
86                                for module in &modules {
87                                    let ret: HashMap<String, f64> = module
88                                        .getattr(py, "run")?
89                                        .call1(py, (nparr.clone(),))?
90                                        .extract(py)?;
91
92                                    for (k, v) in ret {
93                                        let (k, v) = (k.clone(), v.clone());
94                                        let topics = topics.clone();
95                                        handle_.spawn(async move {
96                                            let topic_name = format!("/chalkydri/subsystems/{k}");
97
98                                            let mut topics = topics.write().await;
99
100                                            if let Some(topic) = topics.get_mut(&k) {
101                                                topic.set(v).await.unwrap();
102                                            } else {
103                                                let mut topic = Nt
104                                                    .publish::<f64>(topic_name.clone())
105                                                    .await
106                                                    .unwrap();
107                                                topic.set(v).await.unwrap();
108                                                topics.insert(topic_name, topic);
109                                            }
110                                        });
111                                    }
112                                }
113
114                                Ok(())
115                            });
116
117                            if let Err(err) = py_ret {
118                                error!("{err}");
119                            }
120                        }
121                    })
122                    .await;
123                });
124            });
125
126            Ok(())
127        })
128        .unwrap();
129
130        Ok::<Self::Output, Self::Error>(())
131    }
132}
133
134pub struct PythonPreproc {
135    videoconvertscale: Arc<Element>,
136    filter: Arc<Element>,
137}
138impl Preprocessor for PythonPreproc {
139    type Frame = ndarray::Array3<u8>;
140    type Subsys = PythonSubsys;
141
142    fn new(pipeline: &gstreamer::Pipeline) -> Self {
143        // Create the elements
144        let videoconvertscale = ElementFactory::make("videoconvertscale").build().unwrap();
145        let filter = ElementFactory::make("capsfilter")
146            .property(
147                "caps",
148                &Caps::builder("video/x-raw").field("format", "BGR").build(),
149            )
150            .build()
151            .unwrap();
152
153        // Add them to the pipeline
154        pipeline.add_many([&videoconvertscale, &filter]).unwrap();
155
156        Self {
157            videoconvertscale: videoconvertscale.into(),
158            filter: filter.into(),
159        }
160    }
161    fn link(&self, src: Element, sink: Element) {
162        Element::link_many([&src, &self.videoconvertscale, &self.filter, &sink]).unwrap();
163    }
164    fn unlink(&self, src: Element, sink: Element) {
165        Element::unlink_many([&src, &self.videoconvertscale, &self.filter, &sink]);
166    }
167    fn sampler(
168        appsink: &gstreamer_app::AppSink,
169        tx: tokio::sync::watch::Sender<Option<Arc<Self::Frame>>>,
170    ) -> Result<Option<()>, Error> {
171        let sample = appsink
172            .pull_sample()
173            .map_err(|_| Error::FailedToPullSample)?;
174        let buf = sample.buffer().unwrap();
175        let buf = buf
176            .to_owned()
177            .into_mapped_buffer_readable()
178            .unwrap()
179            .to_vec();
180
181        let arr = ndarray::Array::from_shape_vec(
182            //(settings.height as usize, settings.width as usize, 3usize),
183            (1280, 720, 3),
184            buf,
185        )
186        .expect("something is really braken");
187
188        tx.send(Arc::new(Some(arr))).unwrap();
189
190        Ok(Some(()))
191    }
192}