Skip to main content

conmonrs/
rpc.rs

1use crate::{
2    capnp_util,
3    child::Child,
4    container_io::{ContainerIO, SharedContainerIO},
5    container_log::ContainerLog,
6    pause::Pause,
7    server::{GenerateRuntimeArgs, Server},
8    telemetry::Telemetry,
9    version::Version,
10};
11use anyhow::{Context, format_err};
12use capnp::{Error, capability::Promise};
13use capnp_rpc::pry;
14use conmon_common::conmon_capnp::conmon;
15use std::{
16    path::{Path, PathBuf},
17    process,
18    rc::Rc,
19    str,
20    time::Duration,
21};
22use tokio::time::Instant;
23use tracing::{Instrument, debug, debug_span, error};
24use uuid::Uuid;
25
/// Unwraps a fallible expression inside a function that returns a `Promise`.
///
/// The error side is first converted into a `capnp::Error` by `capnp_err!`;
/// the result is then passed to `pry!`, which yields the success value or
/// returns early from the enclosing function.
macro_rules! pry_err {
    ($result:expr_2021) => {
        pry!(capnp_err!($result))
    };
}
31
/// Converts the error side of a `Result` into a failed `capnp::Error`.
///
/// The original error is rendered with the alternate (`{:#}`) display format
/// and used as the description of the new error; the success value is left
/// untouched.
macro_rules! capnp_err {
    ($result:expr_2021) => {
        $result.map_err(|err| Error::failed(format!("{:#}", err)))
    };
}
37
/// Creates a debug-level tracing span with the given name.
///
/// The span records the provided container ID in its `container_id` field and
/// a freshly generated v4 UUID in its `uuid` field.
macro_rules! new_root_span {
    ($span_name:expr_2021, $id:expr_2021) => {
        debug_span!($span_name, container_id = $id, uuid = %Uuid::new_v4())
    };
}
47
/// Turns a fallible Cap'n Proto text list accessor into a `Vec` of its
/// elements.
///
/// Both reading the list and reading any single element may fail; in either
/// case `pry!` returns early from the enclosing function.
macro_rules! capnp_text_list {
    ($list:expr_2021) => {{
        let reader = pry!($list);
        pry!(reader.iter().collect::<Result<Vec<_>, _>>())
    }};
}
54
/// Reads a Cap'n Proto text list and converts every element with
/// `to_string()`, collecting the results into a `Vec`.
///
/// Returns early from the enclosing function (via `pry!`) if the list cannot
/// be read or any element fails to convert.
macro_rules! capnp_vec_str {
    ($list:expr_2021) => {{
        let texts = capnp_text_list!($list);
        pry!(
            texts
                .iter()
                .map(|text| text.to_string())
                .collect::<Result<Vec<_>, _>>()
        )
    }};
}
65
/// Reads a Cap'n Proto text list and converts every element into a `PathBuf`.
///
/// Returns early from the enclosing function (via `pry!`) if the list cannot
/// be read or any element fails its `to_str()` conversion.
macro_rules! capnp_vec_path {
    ($list:expr_2021) => {{
        let texts = capnp_text_list!($list);
        pry!(
            texts
                .iter()
                .map(|text| text.to_str().map(PathBuf::from))
                .collect::<Result<Vec<_>, _>>()
        )
    }};
}
76
77#[allow(refining_impl_trait_reachable)]
78impl conmon::Server for Server {
79    /// Retrieve version information from the server.
80    fn version(
81        self: Rc<Server>,
82        params: conmon::VersionParams,
83        mut results: conmon::VersionResults,
84    ) -> Promise<(), capnp::Error> {
85        debug!("Got a version request");
86        let req = pry!(pry!(params.get()).get_request());
87
88        let span = debug_span!("version", uuid = %Uuid::new_v4());
89        let _enter = span.enter();
90        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
91
92        let version = Version::new(req.get_verbose());
93        let mut response = results.get().init_response();
94        response.set_process_id(process::id());
95        response.set_version(version.version());
96        response.set_tag(version.tag());
97        response.set_commit(version.commit());
98        response.set_build_date(version.build_date());
99        response.set_target(version.target());
100        response.set_rust_version(version.rust_version());
101        response.set_cargo_version(version.cargo_version());
102        response.set_cargo_tree(version.cargo_tree());
103
104        Promise::ok(())
105    }
106
107    /// Create a new container for the provided parameters.
108    fn create_container(
109        self: Rc<Server>,
110        params: conmon::CreateContainerParams,
111        mut results: conmon::CreateContainerResults,
112    ) -> Promise<(), capnp::Error> {
113        let req = pry!(pry!(params.get()).get_request());
114        let id = pry!(pry!(req.get_id()).to_string());
115
116        let span = new_root_span!("create_container", id.as_str());
117        let _enter = span.enter();
118        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
119
120        let cleanup_cmd: Vec<String> = capnp_vec_str!(req.get_cleanup_cmd());
121
122        debug!("Got a create container request");
123
124        let log_drivers = pry!(req.get_log_drivers());
125        let container_log = pry_err!(ContainerLog::from(log_drivers));
126        let mut container_io =
127            pry_err!(ContainerIO::new(req.get_terminal(), container_log.clone()));
128
129        let bundle_path = Path::new(pry!(pry!(req.get_bundle_path()).to_str()));
130        let pidfile = bundle_path.join("pidfile");
131        debug!("PID file is {}", pidfile.display());
132
133        let child_reaper = self.reaper().clone();
134        let global_args = pry!(req.get_global_args());
135        let command_args = pry!(req.get_command_args());
136        let cgroup_manager = pry!(req.get_cgroup_manager());
137        let args = GenerateRuntimeArgs {
138            config: self.config(),
139            id: &id,
140            container_io: &container_io,
141            pidfile: &pidfile,
142            cgroup_manager,
143        };
144        let args = pry_err!(args.create_args(bundle_path, global_args, command_args));
145        let stdin = req.get_stdin();
146        let runtime = self.config().runtime().clone();
147        let exit_paths = capnp_vec_path!(req.get_exit_paths());
148        let oom_exit_paths = capnp_vec_path!(req.get_oom_exit_paths());
149        let env_vars = pry!(req.get_env_vars().and_then(capnp_util::into_map));
150
151        let additional_fds = pry_err!(self.fd_socket().take_all(pry!(req.get_additional_fds())));
152        let leak_fds = pry_err!(self.fd_socket().take_all(pry!(req.get_leak_fds())));
153
154        Promise::from_future(
155            async move {
156                capnp_err!(container_log.write().await.init().await)?;
157
158                let (grandchild_pid, token) = capnp_err!(match child_reaper
159                    .create_child(
160                        runtime,
161                        args,
162                        stdin,
163                        &mut container_io,
164                        &pidfile,
165                        env_vars,
166                        additional_fds,
167                    )
168                    .await
169                {
170                    Err(e) => {
171                        // Attach the stderr output to the error message
172                        let (_, stderr, _) =
173                            capnp_err!(container_io.read_all_with_timeout(None).await)?;
174                        if !stderr.is_empty() {
175                            let stderr_str = str::from_utf8(&stderr)?;
176                            Err(format_err!("{:#}: {}", e, stderr_str))
177                        } else {
178                            Err(e)
179                        }
180                    }
181                    res => res,
182                })?;
183
184                // register grandchild with server
185                let io = SharedContainerIO::new(container_io);
186                let child = Child::new(
187                    id,
188                    grandchild_pid,
189                    exit_paths,
190                    oom_exit_paths,
191                    None,
192                    io,
193                    cleanup_cmd,
194                    token,
195                );
196                capnp_err!(child_reaper.watch_grandchild(child, leak_fds))?;
197
198                results
199                    .get()
200                    .init_response()
201                    .set_container_pid(grandchild_pid);
202                Ok(())
203            }
204            .instrument(debug_span!("promise")),
205        )
206    }
207
208    /// Execute a command in sync inside of a container.
209    fn exec_sync_container(
210        self: Rc<Server>,
211        params: conmon::ExecSyncContainerParams,
212        mut results: conmon::ExecSyncContainerResults,
213    ) -> Promise<(), capnp::Error> {
214        let req = pry!(pry!(params.get()).get_request());
215        let id = pry!(pry!(req.get_id()).to_string());
216
217        let span = new_root_span!("exec_sync_container", id.as_str());
218        let _enter = span.enter();
219        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
220
221        let timeout = req.get_timeout_sec();
222
223        let pidfile =
224            ContainerIO::temp_file_name(Some(self.config().runtime_dir()), "exec_sync", "pid");
225
226        debug!("Got exec sync container request with timeout {}", timeout);
227
228        let runtime = self.config().runtime().clone();
229        let child_reaper = self.reaper().clone();
230
231        let logger = ContainerLog::new();
232        let mut container_io = pry_err!(ContainerIO::new(req.get_terminal(), logger));
233
234        let command = pry!(req.get_command());
235        let env_vars = pry!(req.get_env_vars().and_then(capnp_util::into_map));
236        let cgroup_manager = pry!(req.get_cgroup_manager());
237
238        let args = GenerateRuntimeArgs {
239            config: self.config(),
240            id: &id,
241            container_io: &container_io,
242            pidfile: &pidfile,
243            cgroup_manager,
244        };
245        let args = pry_err!(args.exec_sync_args(command));
246
247        Promise::from_future(
248            async move {
249                match child_reaper
250                    .create_child(
251                        &runtime,
252                        &args,
253                        false,
254                        &mut container_io,
255                        &pidfile,
256                        env_vars,
257                        vec![],
258                    )
259                    .await
260                {
261                    Ok((grandchild_pid, token)) => {
262                        let time_to_timeout = if timeout > 0 {
263                            Some(Instant::now() + Duration::from_secs(timeout))
264                        } else {
265                            None
266                        };
267                        let mut resp = results.get().init_response();
268                        // register grandchild with server
269                        let io = SharedContainerIO::new(container_io);
270                        let io_clone = io.clone();
271                        let child = Child::new(
272                            id,
273                            grandchild_pid,
274                            vec![],
275                            vec![],
276                            time_to_timeout,
277                            io_clone,
278                            vec![],
279                            token.clone(),
280                        );
281
282                        let mut exit_rx = capnp_err!(child_reaper.watch_grandchild(child, vec![]))?;
283
284                        let (stdout, stderr, timed_out) =
285                            capnp_err!(io.read_all_with_timeout(time_to_timeout).await)?;
286
287                        let exit_data = capnp_err!(exit_rx.recv().await)?;
288                        resp.set_stdout(&stdout);
289                        resp.set_stderr(&stderr);
290                        resp.set_exit_code(exit_data.exit_code);
291                        if timed_out || exit_data.timed_out {
292                            resp.set_timed_out(true);
293                        }
294                    }
295                    Err(e) => {
296                        error!("Unable to create child: {:#}", e);
297                        let mut resp = results.get().init_response();
298                        resp.set_exit_code(-2);
299                    }
300                }
301                Ok(())
302            }
303            .instrument(debug_span!("promise")),
304        )
305    }
306
307    /// Attach to a running container.
308    fn attach_container(
309        self: Rc<Server>,
310        params: conmon::AttachContainerParams,
311        _: conmon::AttachContainerResults,
312    ) -> Promise<(), capnp::Error> {
313        let req = pry!(pry!(params.get()).get_request());
314        let id = pry_err!(pry_err!(req.get_id()).to_str());
315
316        let span = new_root_span!("attach_container", id);
317        let _enter = span.enter();
318        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
319
320        debug!("Got a attach container request",);
321
322        let exec_session_id = pry_err!(pry_err!(req.get_exec_session_id()).to_str());
323        if !exec_session_id.is_empty() {
324            debug!("Using exec session id {}", exec_session_id);
325        }
326
327        let socket_path = pry!(pry!(req.get_socket_path()).to_string());
328        let child = pry_err!(self.reaper().get(id));
329        let stop_after_stdin_eof = req.get_stop_after_stdin_eof();
330
331        Promise::from_future(
332            async move {
333                capnp_err!(
334                    child
335                        .io()
336                        .attach()
337                        .await
338                        .add(&socket_path, child.token().clone(), stop_after_stdin_eof)
339                        .await
340                )
341            }
342            .instrument(debug_span!("promise")),
343        )
344    }
345
346    /// Rotate all log drivers for a running container.
347    fn reopen_log_container(
348        self: Rc<Server>,
349        params: conmon::ReopenLogContainerParams,
350        _: conmon::ReopenLogContainerResults,
351    ) -> Promise<(), capnp::Error> {
352        let req = pry!(pry!(params.get()).get_request());
353        let id = pry_err!(pry_err!(req.get_id()).to_str());
354
355        let span = new_root_span!("reopen_log_container", id);
356        let _enter = span.enter();
357        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
358
359        debug!("Got a reopen container log request");
360
361        let child = pry_err!(self.reaper().get(id));
362
363        Promise::from_future(
364            async move { capnp_err!(child.io().logger().await.write().await.reopen().await) }
365                .instrument(debug_span!("promise")),
366        )
367    }
368
369    /// Adjust the window size of a container running inside of a terminal.
370    fn set_window_size_container(
371        self: Rc<Server>,
372        params: conmon::SetWindowSizeContainerParams,
373        _: conmon::SetWindowSizeContainerResults,
374    ) -> Promise<(), capnp::Error> {
375        let req = pry!(pry!(params.get()).get_request());
376        let id = pry_err!(pry_err!(req.get_id()).to_str());
377
378        let span = new_root_span!("set_window_size_container", id);
379        let _enter = span.enter();
380        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
381
382        debug!("Got a set window size container request");
383
384        let child = pry_err!(self.reaper().get(id));
385        let width = req.get_width();
386        let height = req.get_height();
387
388        Promise::from_future(
389            async move { capnp_err!(child.io().resize(width, height).await) }
390                .instrument(debug_span!("promise")),
391        )
392    }
393
394    /// Create a new set of namespaces.
395    fn create_namespaces(
396        self: Rc<Server>,
397        params: conmon::CreateNamespacesParams,
398        mut results: conmon::CreateNamespacesResults,
399    ) -> Promise<(), capnp::Error> {
400        debug!("Got a create namespaces request");
401        let req = pry!(pry!(params.get()).get_request());
402        let pod_id = pry_err!(pry_err!(req.get_pod_id()).to_str());
403
404        if pod_id.is_empty() {
405            return Promise::err(Error::failed("no pod ID provided".into()));
406        }
407
408        let span = new_root_span!("create_namespaces", pod_id);
409        let _enter = span.enter();
410        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
411
412        let pause = pry_err!(Pause::init_shared(
413            pry!(pry!(req.get_base_path()).to_str()),
414            pod_id,
415            pry!(req.get_namespaces()),
416            capnp_vec_str!(req.get_uid_mappings()),
417            capnp_vec_str!(req.get_gid_mappings()),
418        ));
419
420        let response = results.get().init_response();
421        let mut namespaces =
422            response.init_namespaces(pry_err!(pause.namespaces().len().try_into()));
423
424        for (idx, namespace) in pause.namespaces().iter().enumerate() {
425            let mut ns = namespaces.reborrow().get(pry_err!(idx.try_into()));
426            ns.set_path(
427                namespace
428                    .path(pause.base_path(), pod_id)
429                    .display()
430                    .to_string(),
431            );
432            ns.set_type(namespace.to_capnp_namespace());
433        }
434
435        Promise::ok(())
436    }
437
438    fn start_fd_socket(
439        self: Rc<Server>,
440        params: conmon::StartFdSocketParams,
441        mut results: conmon::StartFdSocketResults,
442    ) -> Promise<(), capnp::Error> {
443        let req = pry!(pry!(params.get()).get_request());
444
445        let span = debug_span!(
446            "start_fd_socket",
447            uuid = %Uuid::new_v4()
448        );
449        let _enter = span.enter();
450        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
451
452        debug!("Got a start fd socket request");
453
454        let path = self.config().fd_socket();
455        let fd_socket = self.fd_socket().clone();
456
457        Promise::from_future(
458            async move {
459                let path = capnp_err!(fd_socket.start(path).await)?;
460
461                let mut resp = results.get().init_response();
462                resp.set_path(capnp_err!(path.to_str().context("fd_socket path to str"))?);
463
464                Ok(())
465            }
466            .instrument(debug_span!("promise")),
467        )
468    }
469
470    fn serve_exec_container(
471        self: Rc<Server>,
472        params: conmon::ServeExecContainerParams,
473        mut results: conmon::ServeExecContainerResults,
474    ) -> Promise<(), capnp::Error> {
475        debug!("Got a serve exec container request");
476        let req = pry!(pry!(params.get()).get_request());
477
478        let span = debug_span!(
479            "serve_exec_container",
480            uuid = %Uuid::new_v4()
481        );
482        let _enter = span.enter();
483        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
484
485        let id = pry_err!(pry_err!(req.get_id()).to_string());
486
487        // Validate that the container actually exists
488        pry_err!(self.reaper().get(&id));
489
490        let command = capnp_vec_str!(req.get_command());
491        let (tty, stdin, stdout, stderr) = (
492            req.get_tty(),
493            req.get_stdin(),
494            req.get_stdout(),
495            req.get_stderr(),
496        );
497
498        let streaming_server = self.streaming_server().clone();
499        let child_reaper = self.reaper().clone();
500        let container_io = pry_err!(ContainerIO::new(tty, ContainerLog::new()));
501        let config = self.config().clone();
502        let cgroup_manager = pry!(req.get_cgroup_manager());
503
504        Promise::from_future(
505            async move {
506                capnp_err!(
507                    streaming_server
508                        .write()
509                        .await
510                        .start_if_required()
511                        .await
512                        .context("start streaming server if required")
513                )?;
514
515                let url = streaming_server
516                    .read()
517                    .await
518                    .exec_url(
519                        child_reaper,
520                        container_io,
521                        config,
522                        cgroup_manager,
523                        id.into_boxed_str(),
524                        command,
525                        stdin,
526                        stdout,
527                        stderr,
528                    )
529                    .await;
530
531                results.get().init_response().set_url(&url);
532                Ok(())
533            }
534            .instrument(debug_span!("promise")),
535        )
536    }
537
538    fn serve_attach_container(
539        self: Rc<Server>,
540        params: conmon::ServeAttachContainerParams,
541        mut results: conmon::ServeAttachContainerResults,
542    ) -> Promise<(), capnp::Error> {
543        debug!("Got a serve attach container request");
544        let req = pry!(pry!(params.get()).get_request());
545
546        let span = debug_span!(
547            "serve_attach_container",
548            uuid = %Uuid::new_v4()
549        );
550        let _enter = span.enter();
551        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
552
553        let id = pry_err!(pry_err!(req.get_id()).to_str());
554        let (stdin, stdout, stderr) = (req.get_stdin(), req.get_stdout(), req.get_stderr());
555
556        let streaming_server = self.streaming_server().clone();
557        let child = pry_err!(self.reaper().get(id));
558
559        Promise::from_future(
560            async move {
561                capnp_err!(
562                    streaming_server
563                        .write()
564                        .await
565                        .start_if_required()
566                        .await
567                        .context("start streaming server")
568                )?;
569
570                let url = streaming_server
571                    .read()
572                    .await
573                    .attach_url(child, stdin, stdout, stderr)
574                    .await;
575
576                results.get().init_response().set_url(&url);
577                Ok(())
578            }
579            .instrument(debug_span!("promise")),
580        )
581    }
582
583    fn serve_port_forward_container(
584        self: Rc<Server>,
585        params: conmon::ServePortForwardContainerParams,
586        mut results: conmon::ServePortForwardContainerResults,
587    ) -> Promise<(), capnp::Error> {
588        debug!("Got a serve port forward container request");
589        let req = pry!(pry!(params.get()).get_request());
590
591        let span = debug_span!(
592            "serve_port_forward_container",
593            uuid = %Uuid::new_v4()
594        );
595        let _enter = span.enter();
596        pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
597
598        let net_ns_path = pry_err!(pry_err!(req.get_net_ns_path()).to_string());
599        let streaming_server = self.streaming_server().clone();
600
601        Promise::from_future(
602            async move {
603                capnp_err!(
604                    streaming_server
605                        .write()
606                        .await
607                        .start_if_required()
608                        .await
609                        .context("start streaming server if required")
610                )?;
611
612                let url = streaming_server
613                    .read()
614                    .await
615                    .port_forward_url(net_ns_path)
616                    .await;
617
618                results.get().init_response().set_url(&url);
619                Ok(())
620            }
621            .instrument(debug_span!("promise")),
622        )
623    }
624}