1use crate::{
2 capnp_util,
3 child::Child,
4 container_io::{ContainerIO, SharedContainerIO},
5 container_log::ContainerLog,
6 pause::Pause,
7 server::{GenerateRuntimeArgs, Server},
8 telemetry::Telemetry,
9 version::Version,
10};
11use anyhow::{format_err, Context};
12use capnp::{capability::Promise, Error};
13use capnp_rpc::pry;
14use conmon_common::conmon_capnp::conmon;
15use std::{
16 path::{Path, PathBuf},
17 process, str,
18 time::Duration,
19};
20use tokio::time::Instant;
21use tracing::{debug, debug_span, error, Instrument};
22use uuid::Uuid;
23
/// Converts the error of a fallible expression with `capnp_err!` and then
/// hands the result to `pry!`, so the value is unwrapped on success.
macro_rules! pry_err {
    ($result:expr) => {
        pry!(capnp_err!($result))
    };
}
29
/// Maps the error of a `Result` to `Error::failed`, using the error's
/// alternate (`{:#}`) rendering as the failure message.
macro_rules! capnp_err {
    ($result:expr) => {
        $result.map_err(|err| Error::failed(format!("{:#}", err)))
    };
}
35
/// Builds a debug-level span with the given name that records the provided
/// container ID together with a freshly generated v4 UUID.
macro_rules! new_root_span {
    ($span_name:expr, $id:expr) => {
        debug_span!(
            $span_name,
            container_id = $id,
            uuid = Uuid::new_v4().to_string().as_str()
        )
    };
}
45
/// Takes a fallible capnp text list and collects its (individually fallible)
/// entries into a `Vec`; both failure levels go through `pry!`.
macro_rules! capnp_text_list {
    ($list:expr) => {
        pry!(pry!($list).iter().collect::<Result<Vec<_>, _>>())
    };
}
52
/// Reads a capnp text list via `capnp_text_list!` and converts every entry
/// with `to_string()`, collecting the results into a `Vec`.
macro_rules! capnp_vec_str {
    ($list:expr) => {
        pry!(capnp_text_list!($list)
            .iter()
            .map(|text| text.to_string())
            .collect::<Result<Vec<_>, _>>())
    };
}
61
/// Reads a capnp text list via `capnp_text_list!` and turns every entry into
/// a `PathBuf`, collecting the results into a `Vec`.
macro_rules! capnp_vec_path {
    ($list:expr) => {
        pry!(capnp_text_list!($list)
            .iter()
            .map(|text| text.to_str().map(PathBuf::from))
            .collect::<Result<Vec<_>, _>>())
    };
}
70
71impl conmon::Server for Server {
72 fn version(
74 &mut self,
75 params: conmon::VersionParams,
76 mut results: conmon::VersionResults,
77 ) -> Promise<(), capnp::Error> {
78 debug!("Got a version request");
79 let req = pry!(pry!(params.get()).get_request());
80
81 let span = debug_span!("version", uuid = Uuid::new_v4().to_string().as_str());
82 let _enter = span.enter();
83 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
84
85 let version = Version::new(req.get_verbose());
86 let mut response = results.get().init_response();
87 response.set_process_id(process::id());
88 response.set_version(version.version());
89 response.set_tag(version.tag());
90 response.set_commit(version.commit());
91 response.set_build_date(version.build_date());
92 response.set_target(version.target());
93 response.set_rust_version(version.rust_version());
94 response.set_cargo_version(version.cargo_version());
95 response.set_cargo_tree(version.cargo_tree());
96
97 Promise::ok(())
98 }
99
100 fn create_container(
102 &mut self,
103 params: conmon::CreateContainerParams,
104 mut results: conmon::CreateContainerResults,
105 ) -> Promise<(), capnp::Error> {
106 let req = pry!(pry!(params.get()).get_request());
107 let id = pry!(pry!(req.get_id()).to_string());
108
109 let span = new_root_span!("create_container", id.as_str());
110 let _enter = span.enter();
111 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
112
113 let cleanup_cmd: Vec<String> = capnp_vec_str!(req.get_cleanup_cmd());
114
115 debug!("Got a create container request");
116
117 let log_drivers = pry!(req.get_log_drivers());
118 let container_log = pry_err!(ContainerLog::from(log_drivers));
119 let mut container_io =
120 pry_err!(ContainerIO::new(req.get_terminal(), container_log.clone()));
121
122 let bundle_path = Path::new(pry!(pry!(req.get_bundle_path()).to_str()));
123 let pidfile = bundle_path.join("pidfile");
124 debug!("PID file is {}", pidfile.display());
125
126 let child_reaper = self.reaper().clone();
127 let global_args = pry!(req.get_global_args());
128 let command_args = pry!(req.get_command_args());
129 let cgroup_manager = pry!(req.get_cgroup_manager());
130 let args = GenerateRuntimeArgs {
131 config: self.config(),
132 id: &id,
133 container_io: &container_io,
134 pidfile: &pidfile,
135 cgroup_manager,
136 };
137 let args = pry_err!(args.create_args(bundle_path, global_args, command_args));
138 let stdin = req.get_stdin();
139 let runtime = self.config().runtime().clone();
140 let exit_paths = capnp_vec_path!(req.get_exit_paths());
141 let oom_exit_paths = capnp_vec_path!(req.get_oom_exit_paths());
142 let env_vars = pry!(req.get_env_vars().and_then(capnp_util::into_map));
143
144 let additional_fds = pry_err!(self.fd_socket().take_all(pry!(req.get_additional_fds())));
145 let leak_fds = pry_err!(self.fd_socket().take_all(pry!(req.get_leak_fds())));
146
147 Promise::from_future(
148 async move {
149 capnp_err!(container_log.write().await.init().await)?;
150
151 let (grandchild_pid, token) = capnp_err!(match child_reaper
152 .create_child(
153 runtime,
154 args,
155 stdin,
156 &mut container_io,
157 &pidfile,
158 env_vars,
159 additional_fds,
160 )
161 .await
162 {
163 Err(e) => {
164 let (_, stderr, _) =
166 capnp_err!(container_io.read_all_with_timeout(None).await)?;
167 if !stderr.is_empty() {
168 let stderr_str = str::from_utf8(&stderr)?;
169 Err(format_err!("{:#}: {}", e, stderr_str))
170 } else {
171 Err(e)
172 }
173 }
174 res => res,
175 })?;
176
177 let io = SharedContainerIO::new(container_io);
179 let child = Child::new(
180 id,
181 grandchild_pid,
182 exit_paths,
183 oom_exit_paths,
184 None,
185 io,
186 cleanup_cmd,
187 token,
188 );
189 capnp_err!(child_reaper.watch_grandchild(child, leak_fds))?;
190
191 results
192 .get()
193 .init_response()
194 .set_container_pid(grandchild_pid);
195 Ok(())
196 }
197 .instrument(debug_span!("promise")),
198 )
199 }
200
201 fn exec_sync_container(
203 &mut self,
204 params: conmon::ExecSyncContainerParams,
205 mut results: conmon::ExecSyncContainerResults,
206 ) -> Promise<(), capnp::Error> {
207 let req = pry!(pry!(params.get()).get_request());
208 let id = pry!(pry!(req.get_id()).to_string());
209
210 let span = new_root_span!("exec_sync_container", id.as_str());
211 let _enter = span.enter();
212 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
213
214 let timeout = req.get_timeout_sec();
215
216 let pidfile = pry_err!(ContainerIO::temp_file_name(
217 Some(self.config().runtime_dir()),
218 "exec_sync",
219 "pid"
220 ));
221
222 debug!("Got exec sync container request with timeout {}", timeout);
223
224 let runtime = self.config().runtime().clone();
225 let child_reaper = self.reaper().clone();
226
227 let logger = ContainerLog::new();
228 let mut container_io = pry_err!(ContainerIO::new(req.get_terminal(), logger));
229
230 let command = pry!(req.get_command());
231 let env_vars = pry!(req.get_env_vars().and_then(capnp_util::into_map));
232 let cgroup_manager = pry!(req.get_cgroup_manager());
233
234 let args = GenerateRuntimeArgs {
235 config: self.config(),
236 id: &id,
237 container_io: &container_io,
238 pidfile: &pidfile,
239 cgroup_manager,
240 };
241 let args = pry_err!(args.exec_sync_args(command));
242
243 Promise::from_future(
244 async move {
245 match child_reaper
246 .create_child(
247 &runtime,
248 &args,
249 false,
250 &mut container_io,
251 &pidfile,
252 env_vars,
253 vec![],
254 )
255 .await
256 {
257 Ok((grandchild_pid, token)) => {
258 let time_to_timeout = if timeout > 0 {
259 Some(Instant::now() + Duration::from_secs(timeout))
260 } else {
261 None
262 };
263 let mut resp = results.get().init_response();
264 let io = SharedContainerIO::new(container_io);
266 let io_clone = io.clone();
267 let child = Child::new(
268 id,
269 grandchild_pid,
270 vec![],
271 vec![],
272 time_to_timeout,
273 io_clone,
274 vec![],
275 token.clone(),
276 );
277
278 let mut exit_rx = capnp_err!(child_reaper.watch_grandchild(child, vec![]))?;
279
280 let (stdout, stderr, timed_out) =
281 capnp_err!(io.read_all_with_timeout(time_to_timeout).await)?;
282
283 let exit_data = capnp_err!(exit_rx.recv().await)?;
284 resp.set_stdout(&stdout);
285 resp.set_stderr(&stderr);
286 resp.set_exit_code(*exit_data.exit_code());
287 if timed_out || exit_data.timed_out {
288 resp.set_timed_out(true);
289 }
290 }
291 Err(e) => {
292 error!("Unable to create child: {:#}", e);
293 let mut resp = results.get().init_response();
294 resp.set_exit_code(-2);
295 }
296 }
297 Ok(())
298 }
299 .instrument(debug_span!("promise")),
300 )
301 }
302
303 fn attach_container(
305 &mut self,
306 params: conmon::AttachContainerParams,
307 _: conmon::AttachContainerResults,
308 ) -> Promise<(), capnp::Error> {
309 let req = pry!(pry!(params.get()).get_request());
310 let id = pry_err!(pry_err!(req.get_id()).to_str());
311
312 let span = new_root_span!("attach_container", id);
313 let _enter = span.enter();
314 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
315
316 debug!("Got a attach container request",);
317
318 let exec_session_id = pry_err!(pry_err!(req.get_exec_session_id()).to_str());
319 if !exec_session_id.is_empty() {
320 debug!("Using exec session id {}", exec_session_id);
321 }
322
323 let socket_path = pry!(pry!(req.get_socket_path()).to_string());
324 let child = pry_err!(self.reaper().get(id));
325 let stop_after_stdin_eof = req.get_stop_after_stdin_eof();
326
327 Promise::from_future(
328 async move {
329 capnp_err!(
330 child
331 .io()
332 .attach()
333 .await
334 .add(&socket_path, child.token().clone(), stop_after_stdin_eof)
335 .await
336 )
337 }
338 .instrument(debug_span!("promise")),
339 )
340 }
341
342 fn reopen_log_container(
344 &mut self,
345 params: conmon::ReopenLogContainerParams,
346 _: conmon::ReopenLogContainerResults,
347 ) -> Promise<(), capnp::Error> {
348 let req = pry!(pry!(params.get()).get_request());
349 let id = pry_err!(pry_err!(req.get_id()).to_str());
350
351 let span = new_root_span!("reopen_log_container", id);
352 let _enter = span.enter();
353 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
354
355 debug!("Got a reopen container log request");
356
357 let child = pry_err!(self.reaper().get(id));
358
359 Promise::from_future(
360 async move { capnp_err!(child.io().logger().await.write().await.reopen().await) }
361 .instrument(debug_span!("promise")),
362 )
363 }
364
365 fn set_window_size_container(
367 &mut self,
368 params: conmon::SetWindowSizeContainerParams,
369 _: conmon::SetWindowSizeContainerResults,
370 ) -> Promise<(), capnp::Error> {
371 let req = pry!(pry!(params.get()).get_request());
372 let id = pry_err!(pry_err!(req.get_id()).to_str());
373
374 let span = new_root_span!("set_window_size_container", id);
375 let _enter = span.enter();
376 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
377
378 debug!("Got a set window size container request");
379
380 let child = pry_err!(self.reaper().get(id));
381 let width = req.get_width();
382 let height = req.get_height();
383
384 Promise::from_future(
385 async move { capnp_err!(child.io().resize(width, height).await) }
386 .instrument(debug_span!("promise")),
387 )
388 }
389
390 fn create_namespaces(
392 &mut self,
393 params: conmon::CreateNamespacesParams,
394 mut results: conmon::CreateNamespacesResults,
395 ) -> Promise<(), capnp::Error> {
396 debug!("Got a create namespaces request");
397 let req = pry!(pry!(params.get()).get_request());
398 let pod_id = pry_err!(pry_err!(req.get_pod_id()).to_str());
399
400 if pod_id.is_empty() {
401 return Promise::err(Error::failed("no pod ID provided".into()));
402 }
403
404 let span = new_root_span!("create_namespaces", pod_id);
405 let _enter = span.enter();
406 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
407
408 let pause = pry_err!(Pause::init_shared(
409 pry!(pry!(req.get_base_path()).to_str()),
410 pod_id,
411 pry!(req.get_namespaces()),
412 capnp_vec_str!(req.get_uid_mappings()),
413 capnp_vec_str!(req.get_gid_mappings()),
414 ));
415
416 let response = results.get().init_response();
417 let mut namespaces =
418 response.init_namespaces(pry_err!(pause.namespaces().len().try_into()));
419
420 for (idx, namespace) in pause.namespaces().iter().enumerate() {
421 let mut ns = namespaces.reborrow().get(pry_err!(idx.try_into()));
422 ns.set_path(
423 namespace
424 .path(pause.base_path(), pod_id)
425 .display()
426 .to_string(),
427 );
428 ns.set_type(namespace.to_capnp_namespace());
429 }
430
431 Promise::ok(())
432 }
433
434 fn start_fd_socket(
435 &mut self,
436 params: conmon::StartFdSocketParams,
437 mut results: conmon::StartFdSocketResults,
438 ) -> Promise<(), capnp::Error> {
439 let req = pry!(pry!(params.get()).get_request());
440
441 let span = debug_span!(
442 "start_fd_socket",
443 uuid = Uuid::new_v4().to_string().as_str()
444 );
445 let _enter = span.enter();
446 pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
447
448 debug!("Got a start fd socket request");
449
450 let path = self.config().fd_socket();
451 let fd_socket = self.fd_socket().clone();
452
453 Promise::from_future(
454 async move {
455 let path = capnp_err!(fd_socket.start(path).await)?;
456
457 let mut resp = results.get().init_response();
458 resp.set_path(capnp_err!(path.to_str().context("fd_socket path to str"))?);
459
460 Ok(())
461 }
462 .instrument(debug_span!("promise")),
463 )
464 }
465}