conmonrs/
rpc.rs

1use crate::{
2    capnp_util,
3    child::Child,
4    container_io::{ContainerIO, SharedContainerIO},
5    container_log::ContainerLog,
6    pause::Pause,
7    server::{GenerateRuntimeArgs, Server},
8    telemetry::Telemetry,
9    version::Version,
10};
11use anyhow::{format_err, Context};
12use capnp::{capability::Promise, Error};
13use capnp_rpc::pry;
14use conmon_common::conmon_capnp::conmon;
15use std::{
16    path::{Path, PathBuf},
17    process, str,
18    time::Duration,
19};
20use tokio::time::Instant;
21use tracing::{debug, debug_span, error, Instrument};
22use uuid::Uuid;
23
24macro_rules! pry_err {
25    ($x:expr) => {
26        pry!(capnp_err!($x))
27    };
28}
29
30macro_rules! capnp_err {
31    ($x:expr) => {
32        $x.map_err(|e| Error::failed(format!("{:#}", e)))
33    };
34}
35
36macro_rules! new_root_span {
37    ($name:expr, $container_id:expr) => {
38        debug_span!(
39            $name,
40            container_id = $container_id,
41            uuid = Uuid::new_v4().to_string().as_str()
42        )
43    };
44}
45
46/// capnp_text_list takes text_list as an input and outputs list of text.
47macro_rules! capnp_text_list {
48    ($x:expr) => {
49        pry!(pry!($x).iter().collect::<Result<Vec<_>, _>>())
50    };
51}
52
53macro_rules! capnp_vec_str {
54    ($x:expr) => {
55        pry!(capnp_text_list!($x)
56            .iter()
57            .map(|s| s.to_string())
58            .collect::<Result<Vec<_>, _>>())
59    };
60}
61
62macro_rules! capnp_vec_path {
63    ($x:expr) => {
64        pry!(capnp_text_list!($x)
65            .iter()
66            .map(|s| s.to_str().map(|x| PathBuf::from(x)))
67            .collect::<Result<Vec<_>, _>>())
68    };
69}
70
71impl conmon::Server for Server {
72    /// Retrieve version information from the server.
73    fn version(
74        &mut self,
75        params: conmon::VersionParams,
76        mut results: conmon::VersionResults,
77    ) -> Promise<(), capnp::Error> {
78        debug!("Got a version request");
79        let req = pry!(pry!(params.get()).get_request());
80
81        let span = debug_span!("version", uuid = Uuid::new_v4().to_string().as_str());
82        let _enter = span.enter();
83        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
84
85        let version = Version::new(req.get_verbose());
86        let mut response = results.get().init_response();
87        response.set_process_id(process::id());
88        response.set_version(version.version());
89        response.set_tag(version.tag());
90        response.set_commit(version.commit());
91        response.set_build_date(version.build_date());
92        response.set_target(version.target());
93        response.set_rust_version(version.rust_version());
94        response.set_cargo_version(version.cargo_version());
95        response.set_cargo_tree(version.cargo_tree());
96
97        Promise::ok(())
98    }
99
100    /// Create a new container for the provided parameters.
101    fn create_container(
102        &mut self,
103        params: conmon::CreateContainerParams,
104        mut results: conmon::CreateContainerResults,
105    ) -> Promise<(), capnp::Error> {
106        let req = pry!(pry!(params.get()).get_request());
107        let id = pry!(pry!(req.get_id()).to_string());
108
109        let span = new_root_span!("create_container", id.as_str());
110        let _enter = span.enter();
111        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
112
113        let cleanup_cmd: Vec<String> = capnp_vec_str!(req.get_cleanup_cmd());
114
115        debug!("Got a create container request");
116
117        let log_drivers = pry!(req.get_log_drivers());
118        let container_log = pry_err!(ContainerLog::from(log_drivers));
119        let mut container_io =
120            pry_err!(ContainerIO::new(req.get_terminal(), container_log.clone()));
121
122        let bundle_path = Path::new(pry!(pry!(req.get_bundle_path()).to_str()));
123        let pidfile = bundle_path.join("pidfile");
124        debug!("PID file is {}", pidfile.display());
125
126        let child_reaper = self.reaper().clone();
127        let global_args = pry!(req.get_global_args());
128        let command_args = pry!(req.get_command_args());
129        let cgroup_manager = pry!(req.get_cgroup_manager());
130        let args = GenerateRuntimeArgs {
131            config: self.config(),
132            id: &id,
133            container_io: &container_io,
134            pidfile: &pidfile,
135            cgroup_manager,
136        };
137        let args = pry_err!(args.create_args(bundle_path, global_args, command_args));
138        let stdin = req.get_stdin();
139        let runtime = self.config().runtime().clone();
140        let exit_paths = capnp_vec_path!(req.get_exit_paths());
141        let oom_exit_paths = capnp_vec_path!(req.get_oom_exit_paths());
142        let env_vars = pry!(req.get_env_vars().and_then(capnp_util::into_map));
143
144        let additional_fds = pry_err!(self.fd_socket().take_all(pry!(req.get_additional_fds())));
145        let leak_fds = pry_err!(self.fd_socket().take_all(pry!(req.get_leak_fds())));
146
147        Promise::from_future(
148            async move {
149                capnp_err!(container_log.write().await.init().await)?;
150
151                let (grandchild_pid, token) = capnp_err!(match child_reaper
152                    .create_child(
153                        runtime,
154                        args,
155                        stdin,
156                        &mut container_io,
157                        &pidfile,
158                        env_vars,
159                        additional_fds,
160                    )
161                    .await
162                {
163                    Err(e) => {
164                        // Attach the stderr output to the error message
165                        let (_, stderr, _) =
166                            capnp_err!(container_io.read_all_with_timeout(None).await)?;
167                        if !stderr.is_empty() {
168                            let stderr_str = str::from_utf8(&stderr)?;
169                            Err(format_err!("{:#}: {}", e, stderr_str))
170                        } else {
171                            Err(e)
172                        }
173                    }
174                    res => res,
175                })?;
176
177                // register grandchild with server
178                let io = SharedContainerIO::new(container_io);
179                let child = Child::new(
180                    id,
181                    grandchild_pid,
182                    exit_paths,
183                    oom_exit_paths,
184                    None,
185                    io,
186                    cleanup_cmd,
187                    token,
188                );
189                capnp_err!(child_reaper.watch_grandchild(child, leak_fds))?;
190
191                results
192                    .get()
193                    .init_response()
194                    .set_container_pid(grandchild_pid);
195                Ok(())
196            }
197            .instrument(debug_span!("promise")),
198        )
199    }
200
201    /// Execute a command in sync inside of a container.
202    fn exec_sync_container(
203        &mut self,
204        params: conmon::ExecSyncContainerParams,
205        mut results: conmon::ExecSyncContainerResults,
206    ) -> Promise<(), capnp::Error> {
207        let req = pry!(pry!(params.get()).get_request());
208        let id = pry!(pry!(req.get_id()).to_string());
209
210        let span = new_root_span!("exec_sync_container", id.as_str());
211        let _enter = span.enter();
212        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
213
214        let timeout = req.get_timeout_sec();
215
216        let pidfile = pry_err!(ContainerIO::temp_file_name(
217            Some(self.config().runtime_dir()),
218            "exec_sync",
219            "pid"
220        ));
221
222        debug!("Got exec sync container request with timeout {}", timeout);
223
224        let runtime = self.config().runtime().clone();
225        let child_reaper = self.reaper().clone();
226
227        let logger = ContainerLog::new();
228        let mut container_io = pry_err!(ContainerIO::new(req.get_terminal(), logger));
229
230        let command = pry!(req.get_command());
231        let env_vars = pry!(req.get_env_vars().and_then(capnp_util::into_map));
232        let cgroup_manager = pry!(req.get_cgroup_manager());
233
234        let args = GenerateRuntimeArgs {
235            config: self.config(),
236            id: &id,
237            container_io: &container_io,
238            pidfile: &pidfile,
239            cgroup_manager,
240        };
241        let args = pry_err!(args.exec_sync_args(command));
242
243        Promise::from_future(
244            async move {
245                match child_reaper
246                    .create_child(
247                        &runtime,
248                        &args,
249                        false,
250                        &mut container_io,
251                        &pidfile,
252                        env_vars,
253                        vec![],
254                    )
255                    .await
256                {
257                    Ok((grandchild_pid, token)) => {
258                        let time_to_timeout = if timeout > 0 {
259                            Some(Instant::now() + Duration::from_secs(timeout))
260                        } else {
261                            None
262                        };
263                        let mut resp = results.get().init_response();
264                        // register grandchild with server
265                        let io = SharedContainerIO::new(container_io);
266                        let io_clone = io.clone();
267                        let child = Child::new(
268                            id,
269                            grandchild_pid,
270                            vec![],
271                            vec![],
272                            time_to_timeout,
273                            io_clone,
274                            vec![],
275                            token.clone(),
276                        );
277
278                        let mut exit_rx = capnp_err!(child_reaper.watch_grandchild(child, vec![]))?;
279
280                        let (stdout, stderr, timed_out) =
281                            capnp_err!(io.read_all_with_timeout(time_to_timeout).await)?;
282
283                        let exit_data = capnp_err!(exit_rx.recv().await)?;
284                        resp.set_stdout(&stdout);
285                        resp.set_stderr(&stderr);
286                        resp.set_exit_code(*exit_data.exit_code());
287                        if timed_out || exit_data.timed_out {
288                            resp.set_timed_out(true);
289                        }
290                    }
291                    Err(e) => {
292                        error!("Unable to create child: {:#}", e);
293                        let mut resp = results.get().init_response();
294                        resp.set_exit_code(-2);
295                    }
296                }
297                Ok(())
298            }
299            .instrument(debug_span!("promise")),
300        )
301    }
302
303    /// Attach to a running container.
304    fn attach_container(
305        &mut self,
306        params: conmon::AttachContainerParams,
307        _: conmon::AttachContainerResults,
308    ) -> Promise<(), capnp::Error> {
309        let req = pry!(pry!(params.get()).get_request());
310        let id = pry_err!(pry_err!(req.get_id()).to_str());
311
312        let span = new_root_span!("attach_container", id);
313        let _enter = span.enter();
314        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
315
316        debug!("Got a attach container request",);
317
318        let exec_session_id = pry_err!(pry_err!(req.get_exec_session_id()).to_str());
319        if !exec_session_id.is_empty() {
320            debug!("Using exec session id {}", exec_session_id);
321        }
322
323        let socket_path = pry!(pry!(req.get_socket_path()).to_string());
324        let child = pry_err!(self.reaper().get(id));
325        let stop_after_stdin_eof = req.get_stop_after_stdin_eof();
326
327        Promise::from_future(
328            async move {
329                capnp_err!(
330                    child
331                        .io()
332                        .attach()
333                        .await
334                        .add(&socket_path, child.token().clone(), stop_after_stdin_eof)
335                        .await
336                )
337            }
338            .instrument(debug_span!("promise")),
339        )
340    }
341
342    /// Rotate all log drivers for a running container.
343    fn reopen_log_container(
344        &mut self,
345        params: conmon::ReopenLogContainerParams,
346        _: conmon::ReopenLogContainerResults,
347    ) -> Promise<(), capnp::Error> {
348        let req = pry!(pry!(params.get()).get_request());
349        let id = pry_err!(pry_err!(req.get_id()).to_str());
350
351        let span = new_root_span!("reopen_log_container", id);
352        let _enter = span.enter();
353        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
354
355        debug!("Got a reopen container log request");
356
357        let child = pry_err!(self.reaper().get(id));
358
359        Promise::from_future(
360            async move { capnp_err!(child.io().logger().await.write().await.reopen().await) }
361                .instrument(debug_span!("promise")),
362        )
363    }
364
365    /// Adjust the window size of a container running inside of a terminal.
366    fn set_window_size_container(
367        &mut self,
368        params: conmon::SetWindowSizeContainerParams,
369        _: conmon::SetWindowSizeContainerResults,
370    ) -> Promise<(), capnp::Error> {
371        let req = pry!(pry!(params.get()).get_request());
372        let id = pry_err!(pry_err!(req.get_id()).to_str());
373
374        let span = new_root_span!("set_window_size_container", id);
375        let _enter = span.enter();
376        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
377
378        debug!("Got a set window size container request");
379
380        let child = pry_err!(self.reaper().get(id));
381        let width = req.get_width();
382        let height = req.get_height();
383
384        Promise::from_future(
385            async move { capnp_err!(child.io().resize(width, height).await) }
386                .instrument(debug_span!("promise")),
387        )
388    }
389
390    /// Create a new set of namespaces.
391    fn create_namespaces(
392        &mut self,
393        params: conmon::CreateNamespacesParams,
394        mut results: conmon::CreateNamespacesResults,
395    ) -> Promise<(), capnp::Error> {
396        debug!("Got a create namespaces request");
397        let req = pry!(pry!(params.get()).get_request());
398        let pod_id = pry_err!(pry_err!(req.get_pod_id()).to_str());
399
400        if pod_id.is_empty() {
401            return Promise::err(Error::failed("no pod ID provided".into()));
402        }
403
404        let span = new_root_span!("create_namespaces", pod_id);
405        let _enter = span.enter();
406        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
407
408        let pause = pry_err!(Pause::init_shared(
409            pry!(pry!(req.get_base_path()).to_str()),
410            pod_id,
411            pry!(req.get_namespaces()),
412            capnp_vec_str!(req.get_uid_mappings()),
413            capnp_vec_str!(req.get_gid_mappings()),
414        ));
415
416        let response = results.get().init_response();
417        let mut namespaces =
418            response.init_namespaces(pry_err!(pause.namespaces().len().try_into()));
419
420        for (idx, namespace) in pause.namespaces().iter().enumerate() {
421            let mut ns = namespaces.reborrow().get(pry_err!(idx.try_into()));
422            ns.set_path(
423                namespace
424                    .path(pause.base_path(), pod_id)
425                    .display()
426                    .to_string(),
427            );
428            ns.set_type(namespace.to_capnp_namespace());
429        }
430
431        Promise::ok(())
432    }
433
434    fn start_fd_socket(
435        &mut self,
436        params: conmon::StartFdSocketParams,
437        mut results: conmon::StartFdSocketResults,
438    ) -> Promise<(), capnp::Error> {
439        let req = pry!(pry!(params.get()).get_request());
440
441        let span = debug_span!(
442            "start_fd_socket",
443            uuid = Uuid::new_v4().to_string().as_str()
444        );
445        let _enter = span.enter();
446        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
447
448        debug!("Got a start fd socket request");
449
450        let path = self.config().fd_socket();
451        let fd_socket = self.fd_socket().clone();
452
453        Promise::from_future(
454            async move {
455                let path = capnp_err!(fd_socket.start(path).await)?;
456
457                let mut resp = results.get().init_response();
458                resp.set_path(capnp_err!(path.to_str().context("fd_socket path to str"))?);
459
460                Ok(())
461            }
462            .instrument(debug_span!("promise")),
463        )
464    }
465}