Skip to main content

conmonrs/
server.rs

1#![deny(missing_docs)]
2
3use crate::{
4    child_reaper::ChildReaper,
5    config::{Commands, Config, LogDriver, LogLevel, Verbosity},
6    container_io::{ContainerIO, ContainerIOType},
7    fd_socket::FdSocket,
8    init::{DefaultInit, Init},
9    journal::Journal,
10    listener::{DefaultListener, Listener},
11    pause::Pause,
12    streaming_server::StreamingServer,
13    telemetry::Telemetry,
14    version::Version,
15};
16use anyhow::{Context, Result, format_err};
17use capnp::text_list::Reader;
18use capnp_rpc::{RpcSystem, rpc_twoparty_capnp::Side, twoparty};
19use clap::crate_name;
20use conmon_common::conmon_capnp::conmon::{self, CgroupManager};
21use futures::{AsyncReadExt, FutureExt};
22use libc::_exit;
23use nix::{
24    errno::Errno,
25    sys::signal::Signal,
26    unistd::{ForkResult, fork},
27};
28use opentelemetry::trace::{FutureExt as OpenTelemetryFutureExt, TracerProvider};
29use opentelemetry_sdk::trace::SdkTracerProvider;
30use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc};
31use tokio::{
32    fs,
33    runtime::{Builder, Handle},
34    signal::unix::{SignalKind, signal},
35    sync::{RwLock, oneshot},
36    task::{self, LocalSet},
37};
38use tokio_util::compat::TokioAsyncReadCompatExt;
39use tracing::{Instrument, debug, debug_span, info};
40use tracing_opentelemetry::OpenTelemetrySpanExt;
41use tracing_subscriber::{filter::LevelFilter, layer::SubscriberExt, prelude::*};
42use twoparty::VatNetwork;
43
44#[derive(Debug)]
45/// The main server structure.
46pub struct Server {
47    /// Server configuration.
48    config: Arc<Config>,
49
50    /// Child reaper instance.
51    reaper: Arc<ChildReaper>,
52
53    /// Fd socket instance.
54    fd_socket: Arc<FdSocket>,
55
56    /// OpenTelemetry tracer instance.
57    tracer: Option<SdkTracerProvider>,
58
59    /// Streaming server instance.
60    streaming_server: Arc<RwLock<StreamingServer>>,
61}
62
63impl Server {
64    /// Server configuration.
65    pub(crate) fn config(&self) -> &Arc<Config> {
66        &self.config
67    }
68
69    /// Child reaper instance.
70    pub(crate) fn reaper(&self) -> &Arc<ChildReaper> {
71        &self.reaper
72    }
73
74    /// Fd socket instance.
75    pub(crate) fn fd_socket(&self) -> &Arc<FdSocket> {
76        &self.fd_socket
77    }
78
79    /// OpenTelemetry tracer instance.
80    pub(crate) fn tracer(&self) -> &Option<SdkTracerProvider> {
81        &self.tracer
82    }
83
84    /// Streaming server instance.
85    pub(crate) fn streaming_server(&self) -> &Arc<RwLock<StreamingServer>> {
86        &self.streaming_server
87    }
88
89    /// Create a new `Server` instance.
90    pub fn new() -> Result<Self> {
91        let server = Self {
92            config: Arc::new(Config::default()),
93            reaper: Default::default(),
94            fd_socket: Default::default(),
95            tracer: Default::default(),
96            streaming_server: Default::default(),
97        };
98
99        if let Some(v) = server.config().version() {
100            Version::new(v == Verbosity::Full).print();
101            process::exit(0);
102        }
103
104        if let Some(v) = server.config().version_json() {
105            Version::new(v == Verbosity::Full).print_json()?;
106            process::exit(0);
107        }
108
109        if let Some(Commands::Pause {
110            base_path,
111            pod_id,
112            ipc,
113            pid,
114            net,
115            user,
116            uts,
117            uid_mappings,
118            gid_mappings,
119        }) = server.config().command()
120        {
121            Pause::run(
122                base_path,
123                pod_id,
124                *ipc,
125                *pid,
126                *net,
127                *user,
128                *uts,
129                uid_mappings,
130                gid_mappings,
131            )
132            .context("run pause")?;
133            process::exit(0);
134        }
135
136        server.config().validate().context("validate config")?;
137
138        Self::init().context("init self")?;
139        Ok(server)
140    }
141
142    /// Start the `Server` instance and consume it.
143    pub fn start(self) -> Result<()> {
144        // We need to fork as early as possible, especially before setting up tokio.
145        // If we don't, the child will have a strange thread space and we're at risk of deadlocking.
146        // We also have to treat the parent as the child (as described in [1]) to ensure we don't
147        // interrupt the child's execution.
148        // 1: https://docs.rs/nix/0.23.0/nix/unistd/fn.fork.html#safety
149        if !self.config().skip_fork() {
150            match unsafe { fork()? } {
151                ForkResult::Parent { child, .. } => {
152                    write!(File::create(self.config().conmon_pidfile())?, "{child}")?;
153                    unsafe { _exit(0) };
154                }
155                ForkResult::Child => (),
156            }
157        }
158
159        // now that we've forked, set self to childreaper
160        let ret = unsafe { libc::prctl(libc::PR_SET_CHILD_SUBREAPER, 1, 0, 0, 0) };
161        if ret != 0 {
162            return Err(Errno::last()).context("set child subreaper");
163        }
164
165        let tracer = self.tracer().clone();
166
167        let worker_threads = std::thread::available_parallelism()
168            .map(|n| n.get())
169            .unwrap_or(1)
170            .min(4);
171        debug!(
172            "Configuring Tokio runtime with {} worker threads",
173            worker_threads
174        );
175        let rt = Builder::new_multi_thread()
176            .worker_threads(worker_threads)
177            .thread_stack_size(512 * 1024)
178            .enable_all()
179            .build()?;
180        rt.block_on(self.spawn_tasks())?;
181
182        if let Some(tracer) = tracer {
183            tracer.shutdown().context("shutdown tracer")?;
184        }
185
186        rt.shutdown_timeout(std::time::Duration::from_secs(15));
187        Ok(())
188    }
189
190    fn init() -> Result<()> {
191        let init = Init::<DefaultInit>::default();
192        init.unset_locale()?;
193        init.set_default_umask();
194        // While we could configure this, standard practice has it as -1000,
195        // so it may be YAGNI to add configuration.
196        init.set_oom_score("-1000")
197    }
198
199    fn init_logging(&mut self) -> Result<()> {
200        let level = LevelFilter::from_str(self.config().log_level().as_ref())
201            .context("convert log level filter")?;
202
203        let telemetry_layer = if self.config().enable_tracing() {
204            let tracer = Telemetry::layer(self.config().tracing_endpoint())
205                .context("build telemetry layer")?;
206
207            self.tracer = Some(tracer.clone());
208
209            tracing_opentelemetry::layer()
210                .with_tracer(tracer.tracer(crate_name!()))
211                .into()
212        } else {
213            None
214        };
215
216        let registry = tracing_subscriber::registry().with(telemetry_layer);
217
218        match self.config().log_driver() {
219            LogDriver::None => {}
220            LogDriver::Stdout => {
221                let layer = tracing_subscriber::fmt::layer()
222                    .with_target(true)
223                    .with_line_number(true)
224                    .with_filter(level);
225                registry
226                    .with(layer)
227                    .try_init()
228                    .context("init stdout fmt layer")?;
229                info!("Using stdout logger");
230            }
231            LogDriver::Systemd => {
232                let layer = tracing_subscriber::fmt::layer()
233                    .with_target(true)
234                    .with_line_number(true)
235                    .without_time()
236                    .with_writer(Journal)
237                    .with_filter(level);
238                registry
239                    .with(layer)
240                    .try_init()
241                    .context("init journald fmt layer")?;
242                info!("Using systemd/journald logger");
243            }
244        }
245        info!("Set log level to: {}", self.config().log_level());
246        Ok(())
247    }
248
249    /// Spawns all required tokio tasks.
250    async fn spawn_tasks(mut self) -> Result<()> {
251        self.init_logging().context("init logging")?;
252
253        let (shutdown_tx, shutdown_rx) = oneshot::channel();
254        let socket = self.config().socket();
255        let fd_socket = self.config().fd_socket();
256        let reaper = self.reaper.clone();
257
258        let signal_handler_span = debug_span!("signal_handler");
259        task::spawn(
260            Self::start_signal_handler(reaper, socket, fd_socket, shutdown_tx)
261                .with_context(signal_handler_span.context())
262                .instrument(signal_handler_span),
263        );
264
265        let backend_span = debug_span!("backend");
266        task::spawn_blocking(move || {
267            Handle::current().block_on(
268                LocalSet::new()
269                    .run_until(self.start_backend(shutdown_rx))
270                    .with_context(backend_span.context())
271                    .instrument(backend_span),
272            )
273        })
274        .await?
275    }
276
277    async fn start_signal_handler<T: AsRef<Path>>(
278        reaper: Arc<ChildReaper>,
279        socket: T,
280        fd_socket: T,
281        shutdown_tx: oneshot::Sender<()>,
282    ) -> Result<()> {
283        let mut sigterm = signal(SignalKind::terminate())?;
284        let mut sigint = signal(SignalKind::interrupt())?;
285
286        tokio::select! {
287            _ = sigterm.recv() => {
288                info!("Received SIGTERM");
289            }
290            _ = sigint.recv() => {
291                info!("Received SIGINT");
292            }
293        }
294
295        if let Some(pause) = Pause::maybe_shared() {
296            pause.stop();
297        }
298
299        debug!("Starting grandchildren cleanup task");
300        // Always use SIGKILL to ensure immediate termination of container processes
301        reaper
302            .kill_grandchildren(Signal::SIGKILL)
303            .await
304            .context("unable to kill grandchildren")?;
305
306        debug!("Sending shutdown message");
307        shutdown_tx
308            .send(())
309            .map_err(|_| format_err!("unable to send shutdown message"))?;
310
311        debug!("Removing socket file {}", socket.as_ref().display());
312        fs::remove_file(socket)
313            .await
314            .context("remove existing socket file")?;
315
316        debug!("Removing fd socket file {}", fd_socket.as_ref().display());
317        fs::remove_file(fd_socket)
318            .await
319            .or_else(|err| {
320                if err.kind() == std::io::ErrorKind::NotFound {
321                    Ok(())
322                } else {
323                    Err(err)
324                }
325            })
326            .context("remove existing fd socket file")
327    }
328
329    async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
330        let listener =
331            Listener::<DefaultListener>::default().bind_long_path(self.config().socket())?;
332        let client: conmon::Client = capnp_rpc::new_client(self);
333
334        loop {
335            let stream = tokio::select! {
336                _ = &mut shutdown_rx => {
337                    debug!("Received shutdown message");
338                    return Ok(())
339                }
340                stream = listener.accept() => {
341                    stream?.0
342                },
343            };
344            let (reader, writer) = TokioAsyncReadCompatExt::compat(stream).split();
345            let network = Box::new(VatNetwork::new(
346                reader,
347                writer,
348                Side::Server,
349                Default::default(),
350            ));
351            let rpc_system = RpcSystem::new(network, Some(client.clone().client));
352            task::spawn_local(Box::pin(rpc_system.map(|_| ())));
353        }
354    }
355}
356
357pub(crate) struct GenerateRuntimeArgs<'a> {
358    pub(crate) config: &'a Config,
359    pub(crate) id: &'a str,
360    pub(crate) container_io: &'a ContainerIO,
361    pub(crate) pidfile: &'a Path,
362    pub(crate) cgroup_manager: CgroupManager,
363}
364
365impl GenerateRuntimeArgs<'_> {
366    const SYSTEMD_CGROUP_ARG: &'static str = "--systemd-cgroup";
367    const RUNTIME_CRUN: &'static str = "crun";
368    const LOG_LEVEL_FLAG_CRUN: &'static str = "--log-level";
369
370    /// Generate the OCI runtime CLI arguments from the provided parameters.
371    pub fn create_args(
372        self,
373        bundle_path: &Path,
374        global_args: Reader,
375        command_args: Reader,
376    ) -> Result<Vec<String>> {
377        // Pre-allocate capacity for typical arg count to reduce reallocations
378        let mut args = Vec::with_capacity(16);
379        args.extend(self.default_args().context("build default runtime args")?);
380
381        if let Some(rr) = self.config.runtime_root() {
382            args.push(format!("--root={}", rr.display()));
383        }
384
385        if self.cgroup_manager == CgroupManager::Systemd {
386            args.push(Self::SYSTEMD_CGROUP_ARG.into());
387        }
388
389        for arg in global_args {
390            args.push(arg?.to_string()?);
391        }
392
393        // Use static strings where possible to avoid allocations
394        args.push("create".into());
395        args.push("--bundle".into());
396        args.push(bundle_path.display().to_string());
397        args.push("--pid-file".into());
398        args.push(self.pidfile.display().to_string());
399
400        for arg in command_args {
401            args.push(arg?.to_string()?);
402        }
403
404        if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
405            args.push(format!("--console-socket={}", terminal.path().display()));
406        }
407
408        args.push(self.id.into());
409
410        debug!("Runtime args {:?}", args.join(" "));
411        Ok(args)
412    }
413
414    /// Generate the OCI runtime CLI arguments from the provided parameters.
415    pub(crate) fn exec_sync_args(&self, command: Reader) -> Result<Vec<String>> {
416        let mut args = self
417            .exec_sync_args_without_command()
418            .context("exec sync args without command")?;
419
420        for arg in command {
421            args.push(arg?.to_string()?);
422        }
423
424        debug!("Exec args {:?}", args.join(" "));
425        Ok(args)
426    }
427
428    pub(crate) fn exec_sync_args_without_command(&self) -> Result<Vec<String>> {
429        // Pre-allocate capacity for typical arg count
430        let mut args = Vec::with_capacity(12);
431        args.extend(self.default_args().context("build default runtime args")?);
432
433        if let Some(rr) = self.config.runtime_root() {
434            args.push(format!("--root={}", rr.display()));
435        }
436
437        if self.cgroup_manager == CgroupManager::Systemd {
438            args.push(Self::SYSTEMD_CGROUP_ARG.into());
439        }
440
441        // Use static strings to avoid allocations
442        args.push("exec".into());
443        args.push("-d".into());
444
445        if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
446            args.push(format!("--console-socket={}", terminal.path().display()));
447            args.push("--tty".into());
448        }
449
450        args.push(format!("--pid-file={}", self.pidfile.display()));
451        args.push(self.id.into());
452
453        Ok(args)
454    }
455
456    /// Build the default arguments for any provided runtime.
457    fn default_args(&self) -> Result<Vec<String>> {
458        let mut args = vec![];
459
460        if self
461            .config
462            .runtime()
463            .file_name()
464            .context("no filename in path")?
465            == Self::RUNTIME_CRUN
466        {
467            debug!("Found crun used as runtime");
468            args.push(format!("--log=journald:{}", self.id));
469
470            match self.config.log_level() {
471                &LogLevel::Debug | &LogLevel::Error => args.push(format!(
472                    "{}={}",
473                    Self::LOG_LEVEL_FLAG_CRUN,
474                    self.config.log_level()
475                )),
476                &LogLevel::Warn => args.push(format!("{}=warning", Self::LOG_LEVEL_FLAG_CRUN)),
477                _ => {}
478            }
479        }
480
481        if let Some(rr) = self.config.runtime_root() {
482            args.push(format!("--root={}", rr.display()));
483        }
484
485        if self.cgroup_manager == CgroupManager::Systemd {
486            args.push(Self::SYSTEMD_CGROUP_ARG.into());
487        }
488
489        Ok(args)
490    }
491}