1use crate::{
2 capnp_util,
3 child::Child,
4 container_io::{ContainerIO, SharedContainerIO},
5 container_log::ContainerLog,
6 pause::Pause,
7 server::{GenerateRuntimeArgs, Server},
8 telemetry::Telemetry,
9 version::Version,
10};
11use anyhow::{Context, format_err};
12use capnp::{Error, capability::Promise};
13use capnp_rpc::pry;
14use conmon_common::conmon_capnp::conmon;
15use std::{
16 path::{Path, PathBuf},
17 process,
18 rc::Rc,
19 str,
20 time::Duration,
21};
22use tokio::time::Instant;
23use tracing::{Instrument, debug, debug_span, error};
24use uuid::Uuid;
25
/// Unwrap a `Result` inside a function that returns a `Promise`.
///
/// The expression is first passed through `capnp_err!` to turn its error into
/// a capnp `Error`, and the outcome is then handed to `pry!`.
macro_rules! pry_err {
    ($result:expr_2021) => {
        pry!(capnp_err!($result))
    };
}
31
/// Map the error side of a `Result` into `Error::failed`.
///
/// The original error is rendered with the alternate (`{:#}`) formatter and
/// the resulting string becomes the message of the new error. The `Ok` value
/// is passed through untouched.
macro_rules! capnp_err {
    ($result:expr_2021) => {
        $result.map_err(|cause| Error::failed(format!("{:#}", cause)))
    };
}
37
/// Build a debug-level tracing span for a request.
///
/// The span gets the given name, a `container_id` field holding the second
/// argument and a `uuid` field holding a freshly generated v4 UUID.
macro_rules! new_root_span {
    ($span_name:expr_2021, $id:expr_2021) => {
        debug_span!($span_name, container_id = $id, uuid = %Uuid::new_v4())
    };
}
47
/// Collect the items of a fallible list accessor into a `Vec`.
///
/// The accessor result is unwrapped with `pry!`, its iterator is collected
/// into a `Result<Vec<_>, _>`, and that result is unwrapped with `pry!` again,
/// so the first failing item aborts the surrounding function.
macro_rules! capnp_text_list {
    ($list:expr_2021) => {
        pry!(pry!($list).iter().collect::<Result<Vec<_>, _>>())
    };
}
54
/// Turn a fallible text list accessor into a `Vec` of converted strings.
///
/// Items are gathered via `capnp_text_list!`, each one is converted with its
/// fallible `to_string()`, and the first conversion error aborts the
/// surrounding function through `pry!`.
macro_rules! capnp_vec_str {
    ($list:expr_2021) => {
        pry!(
            capnp_text_list!($list)
                .iter()
                .map(|text| text.to_string())
                .collect::<Result<Vec<_>, _>>()
        )
    };
}
65
/// Turn a fallible text list accessor into a `Vec<PathBuf>`.
///
/// Items are gathered via `capnp_text_list!`, each one is converted with its
/// fallible `to_str()` and wrapped in a `PathBuf`. The first conversion error
/// aborts the surrounding function through `pry!`.
macro_rules! capnp_vec_path {
    ($list:expr_2021) => {
        pry!(
            capnp_text_list!($list)
                .iter()
                .map(|text| text.to_str().map(PathBuf::from))
                .collect::<Result<Vec<_>, _>>()
        )
    };
}
76
77#[allow(refining_impl_trait_reachable)]
78impl conmon::Server for Server {
79 fn version(
81 self: Rc<Server>,
82 params: conmon::VersionParams,
83 mut results: conmon::VersionResults,
84 ) -> Promise<(), capnp::Error> {
85 debug!("Got a version request");
86 let req = pry!(pry!(params.get()).get_request());
87
88 let span = debug_span!("version", uuid = %Uuid::new_v4());
89 let _enter = span.enter();
90 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
91
92 let version = Version::new(req.get_verbose());
93 let mut response = results.get().init_response();
94 response.set_process_id(process::id());
95 response.set_version(version.version());
96 response.set_tag(version.tag());
97 response.set_commit(version.commit());
98 response.set_build_date(version.build_date());
99 response.set_target(version.target());
100 response.set_rust_version(version.rust_version());
101 response.set_cargo_version(version.cargo_version());
102 response.set_cargo_tree(version.cargo_tree());
103
104 Promise::ok(())
105 }
106
107 fn create_container(
109 self: Rc<Server>,
110 params: conmon::CreateContainerParams,
111 mut results: conmon::CreateContainerResults,
112 ) -> Promise<(), capnp::Error> {
113 let req = pry!(pry!(params.get()).get_request());
114 let id = pry!(pry!(req.get_id()).to_string());
115
116 let span = new_root_span!("create_container", id.as_str());
117 let _enter = span.enter();
118 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
119
120 let cleanup_cmd: Vec<String> = capnp_vec_str!(req.get_cleanup_cmd());
121
122 debug!("Got a create container request");
123
124 let log_drivers = pry!(req.get_log_drivers());
125 let container_log = pry_err!(ContainerLog::from(log_drivers));
126 let mut container_io =
127 pry_err!(ContainerIO::new(req.get_terminal(), container_log.clone()));
128
129 let bundle_path = Path::new(pry!(pry!(req.get_bundle_path()).to_str()));
130 let pidfile = bundle_path.join("pidfile");
131 debug!("PID file is {}", pidfile.display());
132
133 let child_reaper = self.reaper().clone();
134 let global_args = pry!(req.get_global_args());
135 let command_args = pry!(req.get_command_args());
136 let cgroup_manager = pry!(req.get_cgroup_manager());
137 let args = GenerateRuntimeArgs {
138 config: self.config(),
139 id: &id,
140 container_io: &container_io,
141 pidfile: &pidfile,
142 cgroup_manager,
143 };
144 let args = pry_err!(args.create_args(bundle_path, global_args, command_args));
145 let stdin = req.get_stdin();
146 let runtime = self.config().runtime().clone();
147 let exit_paths = capnp_vec_path!(req.get_exit_paths());
148 let oom_exit_paths = capnp_vec_path!(req.get_oom_exit_paths());
149 let env_vars = pry!(req.get_env_vars().and_then(capnp_util::into_map));
150
151 let additional_fds = pry_err!(self.fd_socket().take_all(pry!(req.get_additional_fds())));
152 let leak_fds = pry_err!(self.fd_socket().take_all(pry!(req.get_leak_fds())));
153
154 Promise::from_future(
155 async move {
156 capnp_err!(container_log.write().await.init().await)?;
157
158 let (grandchild_pid, token) = capnp_err!(match child_reaper
159 .create_child(
160 runtime,
161 args,
162 stdin,
163 &mut container_io,
164 &pidfile,
165 env_vars,
166 additional_fds,
167 )
168 .await
169 {
170 Err(e) => {
171 let (_, stderr, _) =
173 capnp_err!(container_io.read_all_with_timeout(None).await)?;
174 if !stderr.is_empty() {
175 let stderr_str = str::from_utf8(&stderr)?;
176 Err(format_err!("{:#}: {}", e, stderr_str))
177 } else {
178 Err(e)
179 }
180 }
181 res => res,
182 })?;
183
184 let io = SharedContainerIO::new(container_io);
186 let child = Child::new(
187 id,
188 grandchild_pid,
189 exit_paths,
190 oom_exit_paths,
191 None,
192 io,
193 cleanup_cmd,
194 token,
195 );
196 capnp_err!(child_reaper.watch_grandchild(child, leak_fds))?;
197
198 results
199 .get()
200 .init_response()
201 .set_container_pid(grandchild_pid);
202 Ok(())
203 }
204 .instrument(debug_span!("promise")),
205 )
206 }
207
208 fn exec_sync_container(
210 self: Rc<Server>,
211 params: conmon::ExecSyncContainerParams,
212 mut results: conmon::ExecSyncContainerResults,
213 ) -> Promise<(), capnp::Error> {
214 let req = pry!(pry!(params.get()).get_request());
215 let id = pry!(pry!(req.get_id()).to_string());
216
217 let span = new_root_span!("exec_sync_container", id.as_str());
218 let _enter = span.enter();
219 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
220
221 let timeout = req.get_timeout_sec();
222
223 let pidfile =
224 ContainerIO::temp_file_name(Some(self.config().runtime_dir()), "exec_sync", "pid");
225
226 debug!("Got exec sync container request with timeout {}", timeout);
227
228 let runtime = self.config().runtime().clone();
229 let child_reaper = self.reaper().clone();
230
231 let logger = ContainerLog::new();
232 let mut container_io = pry_err!(ContainerIO::new(req.get_terminal(), logger));
233
234 let command = pry!(req.get_command());
235 let env_vars = pry!(req.get_env_vars().and_then(capnp_util::into_map));
236 let cgroup_manager = pry!(req.get_cgroup_manager());
237
238 let args = GenerateRuntimeArgs {
239 config: self.config(),
240 id: &id,
241 container_io: &container_io,
242 pidfile: &pidfile,
243 cgroup_manager,
244 };
245 let args = pry_err!(args.exec_sync_args(command));
246
247 Promise::from_future(
248 async move {
249 match child_reaper
250 .create_child(
251 &runtime,
252 &args,
253 false,
254 &mut container_io,
255 &pidfile,
256 env_vars,
257 vec![],
258 )
259 .await
260 {
261 Ok((grandchild_pid, token)) => {
262 let time_to_timeout = if timeout > 0 {
263 Some(Instant::now() + Duration::from_secs(timeout))
264 } else {
265 None
266 };
267 let mut resp = results.get().init_response();
268 let io = SharedContainerIO::new(container_io);
270 let io_clone = io.clone();
271 let child = Child::new(
272 id,
273 grandchild_pid,
274 vec![],
275 vec![],
276 time_to_timeout,
277 io_clone,
278 vec![],
279 token.clone(),
280 );
281
282 let mut exit_rx = capnp_err!(child_reaper.watch_grandchild(child, vec![]))?;
283
284 let (stdout, stderr, timed_out) =
285 capnp_err!(io.read_all_with_timeout(time_to_timeout).await)?;
286
287 let exit_data = capnp_err!(exit_rx.recv().await)?;
288 resp.set_stdout(&stdout);
289 resp.set_stderr(&stderr);
290 resp.set_exit_code(exit_data.exit_code);
291 if timed_out || exit_data.timed_out {
292 resp.set_timed_out(true);
293 }
294 }
295 Err(e) => {
296 error!("Unable to create child: {:#}", e);
297 let mut resp = results.get().init_response();
298 resp.set_exit_code(-2);
299 }
300 }
301 Ok(())
302 }
303 .instrument(debug_span!("promise")),
304 )
305 }
306
307 fn attach_container(
309 self: Rc<Server>,
310 params: conmon::AttachContainerParams,
311 _: conmon::AttachContainerResults,
312 ) -> Promise<(), capnp::Error> {
313 let req = pry!(pry!(params.get()).get_request());
314 let id = pry_err!(pry_err!(req.get_id()).to_str());
315
316 let span = new_root_span!("attach_container", id);
317 let _enter = span.enter();
318 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
319
320 debug!("Got a attach container request",);
321
322 let exec_session_id = pry_err!(pry_err!(req.get_exec_session_id()).to_str());
323 if !exec_session_id.is_empty() {
324 debug!("Using exec session id {}", exec_session_id);
325 }
326
327 let socket_path = pry!(pry!(req.get_socket_path()).to_string());
328 let child = pry_err!(self.reaper().get(id));
329 let stop_after_stdin_eof = req.get_stop_after_stdin_eof();
330
331 Promise::from_future(
332 async move {
333 capnp_err!(
334 child
335 .io()
336 .attach()
337 .await
338 .add(&socket_path, child.token().clone(), stop_after_stdin_eof)
339 .await
340 )
341 }
342 .instrument(debug_span!("promise")),
343 )
344 }
345
346 fn reopen_log_container(
348 self: Rc<Server>,
349 params: conmon::ReopenLogContainerParams,
350 _: conmon::ReopenLogContainerResults,
351 ) -> Promise<(), capnp::Error> {
352 let req = pry!(pry!(params.get()).get_request());
353 let id = pry_err!(pry_err!(req.get_id()).to_str());
354
355 let span = new_root_span!("reopen_log_container", id);
356 let _enter = span.enter();
357 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
358
359 debug!("Got a reopen container log request");
360
361 let child = pry_err!(self.reaper().get(id));
362
363 Promise::from_future(
364 async move { capnp_err!(child.io().logger().await.write().await.reopen().await) }
365 .instrument(debug_span!("promise")),
366 )
367 }
368
369 fn set_window_size_container(
371 self: Rc<Server>,
372 params: conmon::SetWindowSizeContainerParams,
373 _: conmon::SetWindowSizeContainerResults,
374 ) -> Promise<(), capnp::Error> {
375 let req = pry!(pry!(params.get()).get_request());
376 let id = pry_err!(pry_err!(req.get_id()).to_str());
377
378 let span = new_root_span!("set_window_size_container", id);
379 let _enter = span.enter();
380 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
381
382 debug!("Got a set window size container request");
383
384 let child = pry_err!(self.reaper().get(id));
385 let width = req.get_width();
386 let height = req.get_height();
387
388 Promise::from_future(
389 async move { capnp_err!(child.io().resize(width, height).await) }
390 .instrument(debug_span!("promise")),
391 )
392 }
393
394 fn create_namespaces(
396 self: Rc<Server>,
397 params: conmon::CreateNamespacesParams,
398 mut results: conmon::CreateNamespacesResults,
399 ) -> Promise<(), capnp::Error> {
400 debug!("Got a create namespaces request");
401 let req = pry!(pry!(params.get()).get_request());
402 let pod_id = pry_err!(pry_err!(req.get_pod_id()).to_str());
403
404 if pod_id.is_empty() {
405 return Promise::err(Error::failed("no pod ID provided".into()));
406 }
407
408 let span = new_root_span!("create_namespaces", pod_id);
409 let _enter = span.enter();
410 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
411
412 let pause = pry_err!(Pause::init_shared(
413 pry!(pry!(req.get_base_path()).to_str()),
414 pod_id,
415 pry!(req.get_namespaces()),
416 capnp_vec_str!(req.get_uid_mappings()),
417 capnp_vec_str!(req.get_gid_mappings()),
418 ));
419
420 let response = results.get().init_response();
421 let mut namespaces =
422 response.init_namespaces(pry_err!(pause.namespaces().len().try_into()));
423
424 for (idx, namespace) in pause.namespaces().iter().enumerate() {
425 let mut ns = namespaces.reborrow().get(pry_err!(idx.try_into()));
426 ns.set_path(
427 namespace
428 .path(pause.base_path(), pod_id)
429 .display()
430 .to_string(),
431 );
432 ns.set_type(namespace.to_capnp_namespace());
433 }
434
435 Promise::ok(())
436 }
437
438 fn start_fd_socket(
439 self: Rc<Server>,
440 params: conmon::StartFdSocketParams,
441 mut results: conmon::StartFdSocketResults,
442 ) -> Promise<(), capnp::Error> {
443 let req = pry!(pry!(params.get()).get_request());
444
445 let span = debug_span!(
446 "start_fd_socket",
447 uuid = %Uuid::new_v4()
448 );
449 let _enter = span.enter();
450 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
451
452 debug!("Got a start fd socket request");
453
454 let path = self.config().fd_socket();
455 let fd_socket = self.fd_socket().clone();
456
457 Promise::from_future(
458 async move {
459 let path = capnp_err!(fd_socket.start(path).await)?;
460
461 let mut resp = results.get().init_response();
462 resp.set_path(capnp_err!(path.to_str().context("fd_socket path to str"))?);
463
464 Ok(())
465 }
466 .instrument(debug_span!("promise")),
467 )
468 }
469
470 fn serve_exec_container(
471 self: Rc<Server>,
472 params: conmon::ServeExecContainerParams,
473 mut results: conmon::ServeExecContainerResults,
474 ) -> Promise<(), capnp::Error> {
475 debug!("Got a serve exec container request");
476 let req = pry!(pry!(params.get()).get_request());
477
478 let span = debug_span!(
479 "serve_exec_container",
480 uuid = %Uuid::new_v4()
481 );
482 let _enter = span.enter();
483 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
484
485 let id = pry_err!(pry_err!(req.get_id()).to_string());
486
487 pry_err!(self.reaper().get(&id));
489
490 let command = capnp_vec_str!(req.get_command());
491 let (tty, stdin, stdout, stderr) = (
492 req.get_tty(),
493 req.get_stdin(),
494 req.get_stdout(),
495 req.get_stderr(),
496 );
497
498 let streaming_server = self.streaming_server().clone();
499 let child_reaper = self.reaper().clone();
500 let container_io = pry_err!(ContainerIO::new(tty, ContainerLog::new()));
501 let config = self.config().clone();
502 let cgroup_manager = pry!(req.get_cgroup_manager());
503
504 Promise::from_future(
505 async move {
506 capnp_err!(
507 streaming_server
508 .write()
509 .await
510 .start_if_required()
511 .await
512 .context("start streaming server if required")
513 )?;
514
515 let url = streaming_server
516 .read()
517 .await
518 .exec_url(
519 child_reaper,
520 container_io,
521 config,
522 cgroup_manager,
523 id.into_boxed_str(),
524 command,
525 stdin,
526 stdout,
527 stderr,
528 )
529 .await;
530
531 results.get().init_response().set_url(&url);
532 Ok(())
533 }
534 .instrument(debug_span!("promise")),
535 )
536 }
537
538 fn serve_attach_container(
539 self: Rc<Server>,
540 params: conmon::ServeAttachContainerParams,
541 mut results: conmon::ServeAttachContainerResults,
542 ) -> Promise<(), capnp::Error> {
543 debug!("Got a serve attach container request");
544 let req = pry!(pry!(params.get()).get_request());
545
546 let span = debug_span!(
547 "serve_attach_container",
548 uuid = %Uuid::new_v4()
549 );
550 let _enter = span.enter();
551 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
552
553 let id = pry_err!(pry_err!(req.get_id()).to_str());
554 let (stdin, stdout, stderr) = (req.get_stdin(), req.get_stdout(), req.get_stderr());
555
556 let streaming_server = self.streaming_server().clone();
557 let child = pry_err!(self.reaper().get(id));
558
559 Promise::from_future(
560 async move {
561 capnp_err!(
562 streaming_server
563 .write()
564 .await
565 .start_if_required()
566 .await
567 .context("start streaming server")
568 )?;
569
570 let url = streaming_server
571 .read()
572 .await
573 .attach_url(child, stdin, stdout, stderr)
574 .await;
575
576 results.get().init_response().set_url(&url);
577 Ok(())
578 }
579 .instrument(debug_span!("promise")),
580 )
581 }
582
583 fn serve_port_forward_container(
584 self: Rc<Server>,
585 params: conmon::ServePortForwardContainerParams,
586 mut results: conmon::ServePortForwardContainerResults,
587 ) -> Promise<(), capnp::Error> {
588 debug!("Got a serve port forward container request");
589 let req = pry!(pry!(params.get()).get_request());
590
591 let span = debug_span!(
592 "serve_port_forward_container",
593 uuid = %Uuid::new_v4()
594 );
595 let _enter = span.enter();
596 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
597
598 let net_ns_path = pry_err!(pry_err!(req.get_net_ns_path()).to_string());
599 let streaming_server = self.streaming_server().clone();
600
601 Promise::from_future(
602 async move {
603 capnp_err!(
604 streaming_server
605 .write()
606 .await
607 .start_if_required()
608 .await
609 .context("start streaming server if required")
610 )?;
611
612 let url = streaming_server
613 .read()
614 .await
615 .port_forward_url(net_ns_path)
616 .await;
617
618 results.get().init_response().set_url(&url);
619 Ok(())
620 }
621 .instrument(debug_span!("promise")),
622 )
623 }
624}