1#![deny(missing_docs)]
2
3use crate::{
4 child_reaper::ChildReaper,
5 config::{Commands, Config, LogDriver, LogLevel, Verbosity},
6 container_io::{ContainerIO, ContainerIOType},
7 fd_socket::FdSocket,
8 init::{DefaultInit, Init},
9 journal::Journal,
10 listener::{DefaultListener, Listener},
11 pause::Pause,
12 streaming_server::StreamingServer,
13 telemetry::Telemetry,
14 version::Version,
15};
16use anyhow::{Context, Result, format_err};
17use capnp::text_list::Reader;
18use capnp_rpc::{RpcSystem, rpc_twoparty_capnp::Side, twoparty};
19use clap::crate_name;
20use conmon_common::conmon_capnp::conmon::{self, CgroupManager};
21use futures::{AsyncReadExt, FutureExt};
22use getset::Getters;
23use nix::{
24 errno::Errno,
25 libc::_exit,
26 sys::signal::Signal,
27 unistd::{ForkResult, fork},
28};
29use opentelemetry::trace::{FutureExt as OpenTelemetryFutureExt, TracerProvider};
30use opentelemetry_sdk::trace::SdkTracerProvider;
31use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc};
32use tokio::{
33 fs,
34 runtime::{Builder, Handle},
35 signal::unix::{SignalKind, signal},
36 sync::{RwLock, oneshot},
37 task::{self, LocalSet},
38};
39use tokio_util::compat::TokioAsyncReadCompatExt;
40use tracing::{Instrument, debug, debug_span, info};
41use tracing_opentelemetry::OpenTelemetrySpanExt;
42use tracing_subscriber::{filter::LevelFilter, layer::SubscriberExt, prelude::*};
43use twoparty::VatNetwork;
44
45#[derive(Debug, Getters)]
46pub struct Server {
48 #[getset(get = "pub(crate)")]
50 config: Config,
51
52 #[getset(get = "pub(crate)")]
54 reaper: Arc<ChildReaper>,
55
56 #[getset(get = "pub(crate)")]
58 fd_socket: Arc<FdSocket>,
59
60 #[getset(get = "pub(crate)")]
62 tracer: Option<SdkTracerProvider>,
63
64 #[getset(get = "pub(crate)")]
66 streaming_server: Arc<RwLock<StreamingServer>>,
67}
68
69impl Server {
70 pub fn new() -> Result<Self> {
72 let server = Self {
73 config: Default::default(),
74 reaper: Default::default(),
75 fd_socket: Default::default(),
76 tracer: Default::default(),
77 streaming_server: Default::default(),
78 };
79
80 if let Some(v) = server.config().version() {
81 Version::new(v == Verbosity::Full).print();
82 process::exit(0);
83 }
84
85 if let Some(Commands::Pause {
86 base_path,
87 pod_id,
88 ipc,
89 pid,
90 net,
91 user,
92 uts,
93 uid_mappings,
94 gid_mappings,
95 }) = server.config().command()
96 {
97 Pause::run(
98 base_path,
99 pod_id,
100 *ipc,
101 *pid,
102 *net,
103 *user,
104 *uts,
105 uid_mappings,
106 gid_mappings,
107 )
108 .context("run pause")?;
109 process::exit(0);
110 }
111
112 server.config().validate().context("validate config")?;
113
114 Self::init().context("init self")?;
115 Ok(server)
116 }
117
118 pub fn start(self) -> Result<()> {
120 if !self.config().skip_fork() {
126 match unsafe { fork()? } {
127 ForkResult::Parent { child, .. } => {
128 let child_str = format!("{child}");
129 File::create(self.config().conmon_pidfile())?
130 .write_all(child_str.as_bytes())?;
131 unsafe { _exit(0) };
132 }
133 ForkResult::Child => (),
134 }
135 }
136
137 prctl::set_child_subreaper(true)
139 .map_err(Errno::from_raw)
140 .context("set child subreaper")?;
141
142 let tracer = self.tracer().clone();
143
144 let rt = Builder::new_multi_thread().enable_all().build()?;
145 rt.block_on(self.spawn_tasks())?;
146
147 if let Some(tracer) = tracer {
148 tracer.shutdown().context("shutdown tracer")?;
149 }
150
151 rt.shutdown_background();
152 Ok(())
153 }
154
155 fn init() -> Result<()> {
156 let init = Init::<DefaultInit>::default();
157 init.unset_locale()?;
158 init.set_default_umask();
159 init.set_oom_score("-1000")
162 }
163
164 fn init_logging(&mut self) -> Result<()> {
165 let level = LevelFilter::from_str(self.config().log_level().as_ref())
166 .context("convert log level filter")?;
167
168 let telemetry_layer = if self.config().enable_tracing() {
169 let tracer = Telemetry::layer(self.config().tracing_endpoint())
170 .context("build telemetry layer")?;
171
172 self.tracer = Some(tracer.clone());
173
174 tracing_opentelemetry::layer()
175 .with_tracer(tracer.tracer(crate_name!()))
176 .into()
177 } else {
178 None
179 };
180
181 let registry = tracing_subscriber::registry().with(telemetry_layer);
182
183 match self.config().log_driver() {
184 LogDriver::Stdout => {
185 let layer = tracing_subscriber::fmt::layer()
186 .with_target(true)
187 .with_line_number(true)
188 .with_filter(level);
189 registry
190 .with(layer)
191 .try_init()
192 .context("init stdout fmt layer")?;
193 info!("Using stdout logger");
194 }
195 LogDriver::Systemd => {
196 let layer = tracing_subscriber::fmt::layer()
197 .with_target(true)
198 .with_line_number(true)
199 .without_time()
200 .with_writer(Journal)
201 .with_filter(level);
202 registry
203 .with(layer)
204 .try_init()
205 .context("init journald fmt layer")?;
206 info!("Using systemd/journald logger");
207 }
208 }
209 info!("Set log level to: {}", self.config().log_level());
210 Ok(())
211 }
212
213 async fn spawn_tasks(mut self) -> Result<()> {
215 self.init_logging().context("init logging")?;
216
217 let (shutdown_tx, shutdown_rx) = oneshot::channel();
218 let socket = self.config().socket();
219 let fd_socket = self.config().fd_socket();
220 let reaper = self.reaper.clone();
221
222 let signal_handler_span = debug_span!("signal_handler");
223 task::spawn(
224 Self::start_signal_handler(reaper, socket, fd_socket, shutdown_tx)
225 .with_context(signal_handler_span.context())
226 .instrument(signal_handler_span),
227 );
228
229 let backend_span = debug_span!("backend");
230 task::spawn_blocking(move || {
231 Handle::current().block_on(
232 LocalSet::new()
233 .run_until(self.start_backend(shutdown_rx))
234 .with_context(backend_span.context())
235 .instrument(backend_span),
236 )
237 })
238 .await?
239 }
240
241 async fn start_signal_handler<T: AsRef<Path>>(
242 reaper: Arc<ChildReaper>,
243 socket: T,
244 fd_socket: T,
245 shutdown_tx: oneshot::Sender<()>,
246 ) -> Result<()> {
247 let mut sigterm = signal(SignalKind::terminate())?;
248 let mut sigint = signal(SignalKind::interrupt())?;
249 let handled_sig: Signal;
250
251 tokio::select! {
252 _ = sigterm.recv() => {
253 info!("Received SIGTERM");
254 handled_sig = Signal::SIGTERM;
255 }
256 _ = sigint.recv() => {
257 info!("Received SIGINT");
258 handled_sig = Signal::SIGINT;
259 }
260 }
261
262 if let Some(pause) = Pause::maybe_shared() {
263 pause.stop();
264 }
265
266 debug!("Starting grandchildren cleanup task");
267 reaper
268 .kill_grandchildren(handled_sig)
269 .context("unable to kill grandchildren")?;
270
271 debug!("Sending shutdown message");
272 shutdown_tx
273 .send(())
274 .map_err(|_| format_err!("unable to send shutdown message"))?;
275
276 debug!("Removing socket file {}", socket.as_ref().display());
277 fs::remove_file(socket)
278 .await
279 .context("remove existing socket file")?;
280
281 debug!("Removing fd socket file {}", fd_socket.as_ref().display());
282 fs::remove_file(fd_socket)
283 .await
284 .or_else(|err| {
285 if err.kind() == std::io::ErrorKind::NotFound {
286 Ok(())
287 } else {
288 Err(err)
289 }
290 })
291 .context("remove existing fd socket file")
292 }
293
294 async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
295 let listener =
296 Listener::<DefaultListener>::default().bind_long_path(self.config().socket())?;
297 let client: conmon::Client = capnp_rpc::new_client(self);
298
299 loop {
300 let stream = tokio::select! {
301 _ = &mut shutdown_rx => {
302 debug!("Received shutdown message");
303 return Ok(())
304 }
305 stream = listener.accept() => {
306 stream?.0
307 },
308 };
309 let (reader, writer) = TokioAsyncReadCompatExt::compat(stream).split();
310 let network = Box::new(VatNetwork::new(
311 reader,
312 writer,
313 Side::Server,
314 Default::default(),
315 ));
316 let rpc_system = RpcSystem::new(network, Some(client.clone().client));
317 task::spawn_local(Box::pin(rpc_system.map(|_| ())));
318 }
319 }
320}
321
322pub(crate) struct GenerateRuntimeArgs<'a> {
323 pub(crate) config: &'a Config,
324 pub(crate) id: &'a str,
325 pub(crate) container_io: &'a ContainerIO,
326 pub(crate) pidfile: &'a Path,
327 pub(crate) cgroup_manager: CgroupManager,
328}
329
330impl GenerateRuntimeArgs<'_> {
331 const SYSTEMD_CGROUP_ARG: &'static str = "--systemd-cgroup";
332 const RUNTIME_CRUN: &'static str = "crun";
333 const LOG_LEVEL_FLAG_CRUN: &'static str = "--log-level";
334
335 pub fn create_args(
337 self,
338 bundle_path: &Path,
339 global_args: Reader,
340 command_args: Reader,
341 ) -> Result<Vec<String>> {
342 let mut args = self.default_args().context("build default runtime args")?;
343
344 if let Some(rr) = self.config.runtime_root() {
345 args.push(format!("--root={}", rr.display()));
346 }
347
348 if self.cgroup_manager == CgroupManager::Systemd {
349 args.push(Self::SYSTEMD_CGROUP_ARG.into());
350 }
351
352 for arg in global_args {
353 args.push(arg?.to_string()?);
354 }
355
356 args.extend([
357 "create".to_string(),
358 "--bundle".to_string(),
359 bundle_path.display().to_string(),
360 "--pid-file".to_string(),
361 self.pidfile.display().to_string(),
362 ]);
363
364 for arg in command_args {
365 args.push(arg?.to_string()?);
366 }
367
368 if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
369 args.push(format!("--console-socket={}", terminal.path().display()));
370 }
371
372 args.push(self.id.into());
373
374 debug!("Runtime args {:?}", args.join(" "));
375 Ok(args)
376 }
377
378 pub(crate) fn exec_sync_args(&self, command: Reader) -> Result<Vec<String>> {
380 let mut args = self
381 .exec_sync_args_without_command()
382 .context("exec sync args without command")?;
383
384 for arg in command {
385 args.push(arg?.to_string()?);
386 }
387
388 debug!("Exec args {:?}", args.join(" "));
389 Ok(args)
390 }
391
392 pub(crate) fn exec_sync_args_without_command(&self) -> Result<Vec<String>> {
393 let mut args = self.default_args().context("build default runtime args")?;
394
395 if let Some(rr) = self.config.runtime_root() {
396 args.push(format!("--root={}", rr.display()));
397 }
398
399 if self.cgroup_manager == CgroupManager::Systemd {
400 args.push(Self::SYSTEMD_CGROUP_ARG.into());
401 }
402
403 args.push("exec".to_string());
404 args.push("-d".to_string());
405
406 if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
407 args.push(format!("--console-socket={}", terminal.path().display()));
408 args.push("--tty".to_string());
409 }
410
411 args.push(format!("--pid-file={}", self.pidfile.display()));
412 args.push(self.id.into());
413
414 Ok(args)
415 }
416
417 fn default_args(&self) -> Result<Vec<String>> {
419 let mut args = vec![];
420
421 if self
422 .config
423 .runtime()
424 .file_name()
425 .context("no filename in path")?
426 == Self::RUNTIME_CRUN
427 {
428 debug!("Found crun used as runtime");
429 args.push(format!("--log=journald:{}", self.id));
430
431 match self.config.log_level() {
432 &LogLevel::Debug | &LogLevel::Error => args.push(format!(
433 "{}={}",
434 Self::LOG_LEVEL_FLAG_CRUN,
435 self.config.log_level()
436 )),
437 &LogLevel::Warn => args.push(format!("{}=warning", Self::LOG_LEVEL_FLAG_CRUN)),
438 _ => {}
439 }
440 }
441
442 if let Some(rr) = self.config.runtime_root() {
443 args.push(format!("--root={}", rr.display()));
444 }
445
446 if self.cgroup_manager == CgroupManager::Systemd {
447 args.push(Self::SYSTEMD_CGROUP_ARG.into());
448 }
449
450 Ok(args)
451 }
452}