1#![deny(missing_docs)]
2
3use crate::{
4 child_reaper::ChildReaper,
5 config::{Commands, Config, LogDriver, LogLevel, Verbosity},
6 container_io::{ContainerIO, ContainerIOType},
7 fd_socket::FdSocket,
8 init::{DefaultInit, Init},
9 journal::Journal,
10 listener::{DefaultListener, Listener},
11 pause::Pause,
12 streaming_server::StreamingServer,
13 telemetry::Telemetry,
14 version::Version,
15};
16use anyhow::{Context, Result, format_err};
17use capnp::text_list::Reader;
18use capnp_rpc::{RpcSystem, rpc_twoparty_capnp::Side, twoparty};
19use clap::crate_name;
20use conmon_common::conmon_capnp::conmon::{self, CgroupManager};
21use futures::{AsyncReadExt, FutureExt};
22use libc::_exit;
23use nix::{
24 errno::Errno,
25 sys::signal::Signal,
26 unistd::{ForkResult, fork},
27};
28use opentelemetry::trace::{FutureExt as OpenTelemetryFutureExt, TracerProvider};
29use opentelemetry_sdk::trace::SdkTracerProvider;
30use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc};
31use tokio::{
32 fs,
33 runtime::{Builder, Handle},
34 signal::unix::{SignalKind, signal},
35 sync::{RwLock, oneshot},
36 task::{self, LocalSet},
37};
38use tokio_util::compat::TokioAsyncReadCompatExt;
39use tracing::{Instrument, debug, debug_span, info};
40use tracing_opentelemetry::OpenTelemetrySpanExt;
41use tracing_subscriber::{filter::LevelFilter, layer::SubscriberExt, prelude::*};
42use twoparty::VatNetwork;
43
/// The main server structure.
///
/// Bundles the configuration with the shared state used by the signal
/// handler and by the RPC backend.
#[derive(Debug)]
pub struct Server {
    /// Server configuration, shared behind an `Arc`.
    config: Arc<Config>,

    /// Child reaper instance; the signal handler uses it to kill
    /// grandchildren during shutdown.
    reaper: Arc<ChildReaper>,

    /// Fd socket instance.
    fd_socket: Arc<FdSocket>,

    /// OpenTelemetry tracer provider. Starts out as the default (`None`) and
    /// is populated by `init_logging` when tracing is enabled.
    tracer: Option<SdkTracerProvider>,

    /// Streaming server instance, guarded by an async `RwLock`.
    streaming_server: Arc<RwLock<StreamingServer>>,
}
62
63impl Server {
    /// Returns a reference to the shared server configuration.
    pub(crate) fn config(&self) -> &Arc<Config> {
        &self.config
    }
68
    /// Returns a reference to the shared child reaper.
    pub(crate) fn reaper(&self) -> &Arc<ChildReaper> {
        &self.reaper
    }
73
    /// Returns a reference to the shared fd socket.
    pub(crate) fn fd_socket(&self) -> &Arc<FdSocket> {
        &self.fd_socket
    }
78
    /// Returns the tracer provider slot.
    ///
    /// The slot is `None` unless `init_logging` has run with tracing enabled.
    pub(crate) fn tracer(&self) -> &Option<SdkTracerProvider> {
        &self.tracer
    }
83
    /// Returns a reference to the shared, lock-guarded streaming server.
    pub(crate) fn streaming_server(&self) -> &Arc<RwLock<StreamingServer>> {
        &self.streaming_server
    }
88
89 pub fn new() -> Result<Self> {
91 let server = Self {
92 config: Arc::new(Config::default()),
93 reaper: Default::default(),
94 fd_socket: Default::default(),
95 tracer: Default::default(),
96 streaming_server: Default::default(),
97 };
98
99 if let Some(v) = server.config().version() {
100 Version::new(v == Verbosity::Full).print();
101 process::exit(0);
102 }
103
104 if let Some(v) = server.config().version_json() {
105 Version::new(v == Verbosity::Full).print_json()?;
106 process::exit(0);
107 }
108
109 if let Some(Commands::Pause {
110 base_path,
111 pod_id,
112 ipc,
113 pid,
114 net,
115 user,
116 uts,
117 uid_mappings,
118 gid_mappings,
119 }) = server.config().command()
120 {
121 Pause::run(
122 base_path,
123 pod_id,
124 *ipc,
125 *pid,
126 *net,
127 *user,
128 *uts,
129 uid_mappings,
130 gid_mappings,
131 )
132 .context("run pause")?;
133 process::exit(0);
134 }
135
136 server.config().validate().context("validate config")?;
137
138 Self::init().context("init self")?;
139 Ok(server)
140 }
141
142 pub fn start(self) -> Result<()> {
144 if !self.config().skip_fork() {
150 match unsafe { fork()? } {
151 ForkResult::Parent { child, .. } => {
152 write!(File::create(self.config().conmon_pidfile())?, "{child}")?;
153 unsafe { _exit(0) };
154 }
155 ForkResult::Child => (),
156 }
157 }
158
159 let ret = unsafe { libc::prctl(libc::PR_SET_CHILD_SUBREAPER, 1, 0, 0, 0) };
161 if ret != 0 {
162 return Err(Errno::last()).context("set child subreaper");
163 }
164
165 let tracer = self.tracer().clone();
166
167 let worker_threads = std::thread::available_parallelism()
168 .map(|n| n.get())
169 .unwrap_or(1)
170 .min(4);
171 debug!(
172 "Configuring Tokio runtime with {} worker threads",
173 worker_threads
174 );
175 let rt = Builder::new_multi_thread()
176 .worker_threads(worker_threads)
177 .thread_stack_size(512 * 1024)
178 .enable_all()
179 .build()?;
180 rt.block_on(self.spawn_tasks())?;
181
182 if let Some(tracer) = tracer {
183 tracer.shutdown().context("shutdown tracer")?;
184 }
185
186 rt.shutdown_timeout(std::time::Duration::from_secs(15));
187 Ok(())
188 }
189
190 fn init() -> Result<()> {
191 let init = Init::<DefaultInit>::default();
192 init.unset_locale()?;
193 init.set_default_umask();
194 init.set_oom_score("-1000")
197 }
198
199 fn init_logging(&mut self) -> Result<()> {
200 let level = LevelFilter::from_str(self.config().log_level().as_ref())
201 .context("convert log level filter")?;
202
203 let telemetry_layer = if self.config().enable_tracing() {
204 let tracer = Telemetry::layer(self.config().tracing_endpoint())
205 .context("build telemetry layer")?;
206
207 self.tracer = Some(tracer.clone());
208
209 tracing_opentelemetry::layer()
210 .with_tracer(tracer.tracer(crate_name!()))
211 .into()
212 } else {
213 None
214 };
215
216 let registry = tracing_subscriber::registry().with(telemetry_layer);
217
218 match self.config().log_driver() {
219 LogDriver::None => {}
220 LogDriver::Stdout => {
221 let layer = tracing_subscriber::fmt::layer()
222 .with_target(true)
223 .with_line_number(true)
224 .with_filter(level);
225 registry
226 .with(layer)
227 .try_init()
228 .context("init stdout fmt layer")?;
229 info!("Using stdout logger");
230 }
231 LogDriver::Systemd => {
232 let layer = tracing_subscriber::fmt::layer()
233 .with_target(true)
234 .with_line_number(true)
235 .without_time()
236 .with_writer(Journal)
237 .with_filter(level);
238 registry
239 .with(layer)
240 .try_init()
241 .context("init journald fmt layer")?;
242 info!("Using systemd/journald logger");
243 }
244 }
245 info!("Set log level to: {}", self.config().log_level());
246 Ok(())
247 }
248
249 async fn spawn_tasks(mut self) -> Result<()> {
251 self.init_logging().context("init logging")?;
252
253 let (shutdown_tx, shutdown_rx) = oneshot::channel();
254 let socket = self.config().socket();
255 let fd_socket = self.config().fd_socket();
256 let reaper = self.reaper.clone();
257
258 let signal_handler_span = debug_span!("signal_handler");
259 task::spawn(
260 Self::start_signal_handler(reaper, socket, fd_socket, shutdown_tx)
261 .with_context(signal_handler_span.context())
262 .instrument(signal_handler_span),
263 );
264
265 let backend_span = debug_span!("backend");
266 task::spawn_blocking(move || {
267 Handle::current().block_on(
268 LocalSet::new()
269 .run_until(self.start_backend(shutdown_rx))
270 .with_context(backend_span.context())
271 .instrument(backend_span),
272 )
273 })
274 .await?
275 }
276
277 async fn start_signal_handler<T: AsRef<Path>>(
278 reaper: Arc<ChildReaper>,
279 socket: T,
280 fd_socket: T,
281 shutdown_tx: oneshot::Sender<()>,
282 ) -> Result<()> {
283 let mut sigterm = signal(SignalKind::terminate())?;
284 let mut sigint = signal(SignalKind::interrupt())?;
285
286 tokio::select! {
287 _ = sigterm.recv() => {
288 info!("Received SIGTERM");
289 }
290 _ = sigint.recv() => {
291 info!("Received SIGINT");
292 }
293 }
294
295 if let Some(pause) = Pause::maybe_shared() {
296 pause.stop();
297 }
298
299 debug!("Starting grandchildren cleanup task");
300 reaper
302 .kill_grandchildren(Signal::SIGKILL)
303 .await
304 .context("unable to kill grandchildren")?;
305
306 debug!("Sending shutdown message");
307 shutdown_tx
308 .send(())
309 .map_err(|_| format_err!("unable to send shutdown message"))?;
310
311 debug!("Removing socket file {}", socket.as_ref().display());
312 fs::remove_file(socket)
313 .await
314 .context("remove existing socket file")?;
315
316 debug!("Removing fd socket file {}", fd_socket.as_ref().display());
317 fs::remove_file(fd_socket)
318 .await
319 .or_else(|err| {
320 if err.kind() == std::io::ErrorKind::NotFound {
321 Ok(())
322 } else {
323 Err(err)
324 }
325 })
326 .context("remove existing fd socket file")
327 }
328
329 async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
330 let listener =
331 Listener::<DefaultListener>::default().bind_long_path(self.config().socket())?;
332 let client: conmon::Client = capnp_rpc::new_client(self);
333
334 loop {
335 let stream = tokio::select! {
336 _ = &mut shutdown_rx => {
337 debug!("Received shutdown message");
338 return Ok(())
339 }
340 stream = listener.accept() => {
341 stream?.0
342 },
343 };
344 let (reader, writer) = TokioAsyncReadCompatExt::compat(stream).split();
345 let network = Box::new(VatNetwork::new(
346 reader,
347 writer,
348 Side::Server,
349 Default::default(),
350 ));
351 let rpc_system = RpcSystem::new(network, Some(client.clone().client));
352 task::spawn_local(Box::pin(rpc_system.map(|_| ())));
353 }
354 }
355}
356
/// Inputs needed to build the command line arguments for the OCI runtime.
pub(crate) struct GenerateRuntimeArgs<'a> {
    /// Server configuration (runtime path, runtime root, log level).
    pub(crate) config: &'a Config,

    /// Container ID; passed as the last positional argument.
    pub(crate) id: &'a str,

    /// Container IO; a terminal type adds the console socket argument.
    pub(crate) container_io: &'a ContainerIO,

    /// Path handed to the runtime as its PID file.
    pub(crate) pidfile: &'a Path,

    /// Cgroup manager; `Systemd` adds the systemd cgroup flag.
    pub(crate) cgroup_manager: CgroupManager,
}
364
365impl GenerateRuntimeArgs<'_> {
    /// Flag added when the cgroup manager is `CgroupManager::Systemd`.
    const SYSTEMD_CGROUP_ARG: &'static str = "--systemd-cgroup";
    /// File name of the runtime binary that gets crun-specific arguments.
    const RUNTIME_CRUN: &'static str = "crun";
    /// Name of the log level flag passed to crun.
    const LOG_LEVEL_FLAG_CRUN: &'static str = "--log-level";
369
370 pub fn create_args(
372 self,
373 bundle_path: &Path,
374 global_args: Reader,
375 command_args: Reader,
376 ) -> Result<Vec<String>> {
377 let mut args = Vec::with_capacity(16);
379 args.extend(self.default_args().context("build default runtime args")?);
380
381 if let Some(rr) = self.config.runtime_root() {
382 args.push(format!("--root={}", rr.display()));
383 }
384
385 if self.cgroup_manager == CgroupManager::Systemd {
386 args.push(Self::SYSTEMD_CGROUP_ARG.into());
387 }
388
389 for arg in global_args {
390 args.push(arg?.to_string()?);
391 }
392
393 args.push("create".into());
395 args.push("--bundle".into());
396 args.push(bundle_path.display().to_string());
397 args.push("--pid-file".into());
398 args.push(self.pidfile.display().to_string());
399
400 for arg in command_args {
401 args.push(arg?.to_string()?);
402 }
403
404 if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
405 args.push(format!("--console-socket={}", terminal.path().display()));
406 }
407
408 args.push(self.id.into());
409
410 debug!("Runtime args {:?}", args.join(" "));
411 Ok(args)
412 }
413
414 pub(crate) fn exec_sync_args(&self, command: Reader) -> Result<Vec<String>> {
416 let mut args = self
417 .exec_sync_args_without_command()
418 .context("exec sync args without command")?;
419
420 for arg in command {
421 args.push(arg?.to_string()?);
422 }
423
424 debug!("Exec args {:?}", args.join(" "));
425 Ok(args)
426 }
427
428 pub(crate) fn exec_sync_args_without_command(&self) -> Result<Vec<String>> {
429 let mut args = Vec::with_capacity(12);
431 args.extend(self.default_args().context("build default runtime args")?);
432
433 if let Some(rr) = self.config.runtime_root() {
434 args.push(format!("--root={}", rr.display()));
435 }
436
437 if self.cgroup_manager == CgroupManager::Systemd {
438 args.push(Self::SYSTEMD_CGROUP_ARG.into());
439 }
440
441 args.push("exec".into());
443 args.push("-d".into());
444
445 if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
446 args.push(format!("--console-socket={}", terminal.path().display()));
447 args.push("--tty".into());
448 }
449
450 args.push(format!("--pid-file={}", self.pidfile.display()));
451 args.push(self.id.into());
452
453 Ok(args)
454 }
455
456 fn default_args(&self) -> Result<Vec<String>> {
458 let mut args = vec![];
459
460 if self
461 .config
462 .runtime()
463 .file_name()
464 .context("no filename in path")?
465 == Self::RUNTIME_CRUN
466 {
467 debug!("Found crun used as runtime");
468 args.push(format!("--log=journald:{}", self.id));
469
470 match self.config.log_level() {
471 &LogLevel::Debug | &LogLevel::Error => args.push(format!(
472 "{}={}",
473 Self::LOG_LEVEL_FLAG_CRUN,
474 self.config.log_level()
475 )),
476 &LogLevel::Warn => args.push(format!("{}=warning", Self::LOG_LEVEL_FLAG_CRUN)),
477 _ => {}
478 }
479 }
480
481 if let Some(rr) = self.config.runtime_root() {
482 args.push(format!("--root={}", rr.display()));
483 }
484
485 if self.cgroup_manager == CgroupManager::Systemd {
486 args.push(Self::SYSTEMD_CGROUP_ARG.into());
487 }
488
489 Ok(args)
490 }
491}