chalkydri/subsystems/python/
mod.rs

1mod api;
2
3use std::{any::Any, collections::HashMap, ffi::CStr, sync::Arc};
4
5use gstreamer::{
6    Caps, Element, ElementFactory,
7    prelude::{GstBinExt, GstBinExtManual},
8};
9use minint::NtTopic;
10use numpy::{
11    ndarray::{self, ShapeBuilder},
12};
13use tokio::sync::RwLock;
14
15use crate::{Cfg, Nt, config, error::Error, subsystems::Subsystem};
16
17use pyo3::prelude::*;
18
19use super::frame_proc_loop;
20
21#[derive(Clone)]
22pub struct PythonSubsys;
23impl Subsystem for PythonSubsys {
24    const NAME: &'static str = "python";
25
26    type Error = Error;
27    type Config = Vec<config::CustomSubsystem>;
28    type Output = ();
29
30    async fn init() -> Result<Self, Self::Error> {
31        Ok::<Self, Self::Error>(PythonSubsys)
32    }
33
34    async fn process(
35        &self,
36        manager: super::SubsysManager,
37        nt: minint::NtConn,
38        cam_config: crate::config::Camera,
39        rx: tokio::sync::watch::Receiver<Option<Vec<u8>>>,
40    ) -> Result<Self::Output, Self::Error> {
41        let handle = tokio::runtime::Handle::current();
42
43        Python::with_gil(|py| -> PyResult<()> {
44            let mut modules = Vec::new();
45
46            for camera in futures_executor::block_on(Cfg.read())
47                .cameras
48                .clone()
49                .unwrap()
50            {
51                for subsys in camera.subsystems.custom {
52                    // Read custom subsystems from the configuration
53                    let subsystems = futures_executor::block_on(Cfg.read())
54                        .custom_subsystems
55                        .clone();
56                    if let Some(subsys) = subsystems.get(&subsys) {
57                        // Add a null terminator to the end of all of these things
58                        let code = [subsys.code.as_bytes(), &[0u8]].concat();
59                        let file_name = [b"custom_code.py".as_slice(), &[0u8]].concat();
60                        let module_name = [b"custom_code".as_slice(), &[0u8]].concat();
61
62                        // Convert them all to CStrs
63                        let code = CStr::from_bytes_with_nul(&code).unwrap();
64                        let file_name = CStr::from_bytes_with_nul(&file_name).unwrap();
65                        let module_name = CStr::from_bytes_with_nul(&module_name).unwrap();
66
67                        // Load the code in
68                        let module = PyModule::from_code(py, code, file_name, module_name).unwrap();
69                        // Unbind the module from Python's GIL
70                        let module = module.unbind();
71
72                        // Save It for Later :)
73                        modules.push(module);
74                    }
75                }
76            }
77
78            py.allow_threads(move || {
79                let handle_ = handle.clone();
80                handle.spawn(async move {
81                    let topics = Arc::new(RwLock::new(HashMap::<String, NtTopic<f64>>::new()));
82                    frame_proc_loop(rx, async move |buf| {
83                        if let Some(settings) = &cam_config.settings {
84                            let py_ret = Python::with_gil(|py| -> PyResult<()> {
85                                let arr = ndarray::Array::from_shape_vec(
86                                    (settings.height as usize, settings.width as usize, 3usize),
87                                    buf,
88                                )
89                                .expect("something is really braken");
90                                let nparr = numpy::PyArray::from_array(py, &arr);
91
92                                for module in &modules {
93                                    let ret: HashMap<String, f64> = module
94                                        .getattr(py, "run")?
95                                        .call1(py, (nparr.clone(),))?
96                                        .extract(py)?;
97
98                                    for (k, v) in ret {
99                                        let (k, v) = (k.clone(), v.clone());
100                                        let topics = topics.clone();
101                                        handle_.spawn(async move {
102                                            let topic_name = format!("/chalkydri/subsystems/{k}");
103
104                                            let mut topics = topics.write().await;
105
106                                            if let Some(topic) = topics.get_mut(&k) {
107                                                topic.set(v).await.unwrap();
108                                            } else {
109                                                let mut topic = Nt
110                                                    .publish::<f64>(topic_name.clone())
111                                                    .await
112                                                    .unwrap();
113                                                topic.set(v).await.unwrap();
114                                                topics.insert(topic_name, topic);
115                                            }
116                                        });
117                                    }
118                                }
119
120                                Ok(())
121                            });
122
123                            if let Err(err) = py_ret {
124                                error!("{err}");
125                            }
126                        }
127                    })
128                    .await;
129                });
130            });
131
132            Ok(())
133        })
134        .unwrap();
135
136        Ok::<Self::Output, Self::Error>(())
137    }
138
139    fn preproc(
140        config: crate::config::Camera,
141        pipeline: &gstreamer::Pipeline,
142    ) -> Result<(gstreamer::Element, gstreamer::Element), Self::Error> {
143        // Create the elements
144        let videoconvertscale = ElementFactory::make("videoconvertscale").build().unwrap();
145        let filter = ElementFactory::make("capsfilter")
146            .property(
147                "caps",
148                &Caps::builder("video/x-raw").field("format", "BGR").build(),
149            )
150            .build()
151            .unwrap();
152
153        // Add them to the pipeline
154        pipeline.add_many([&videoconvertscale, &filter]).unwrap();
155
156        // Link them
157        Element::link_many([&videoconvertscale, &filter]).unwrap();
158
159        Ok((videoconvertscale, filter))
160    }
161}