conmonrs/server.rs

1#![deny(missing_docs)]
2
3use crate::{
4    child_reaper::ChildReaper,
5    config::{Commands, Config, LogDriver, LogLevel, Verbosity},
6    container_io::{ContainerIO, ContainerIOType},
7    fd_socket::FdSocket,
8    init::{DefaultInit, Init},
9    journal::Journal,
10    listener::{DefaultListener, Listener},
11    pause::Pause,
12    telemetry::Telemetry,
13    version::Version,
14};
15use anyhow::{format_err, Context, Result};
16use capnp::text_list::Reader;
17use capnp_rpc::{rpc_twoparty_capnp::Side, twoparty, RpcSystem};
18use conmon_common::conmon_capnp::conmon::{self, CgroupManager};
19use futures::{AsyncReadExt, FutureExt};
20use getset::Getters;
21use nix::{
22    errno,
23    libc::_exit,
24    sys::signal::Signal,
25    unistd::{fork, ForkResult},
26};
27use opentelemetry::trace::FutureExt as OpenTelemetryFutureExt;
28use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc};
29use tokio::{
30    fs,
31    runtime::{Builder, Handle},
32    signal::unix::{signal, SignalKind},
33    sync::oneshot,
34    task::{self, LocalSet},
35};
36use tokio_util::compat::TokioAsyncReadCompatExt;
37use tracing::{debug, debug_span, info, Instrument};
38use tracing_opentelemetry::OpenTelemetrySpanExt;
39use tracing_subscriber::{filter::LevelFilter, layer::SubscriberExt, prelude::*};
40use twoparty::VatNetwork;
41
42#[derive(Debug, Getters)]
43/// The main server structure.
44pub struct Server {
45    /// Server configuration.
46    #[getset(get = "pub(crate)")]
47    config: Config,
48
49    /// Child reaper instance.
50    #[getset(get = "pub(crate)")]
51    reaper: Arc<ChildReaper>,
52
53    /// Fd socket instance.
54    #[getset(get = "pub(crate)")]
55    fd_socket: Arc<FdSocket>,
56}
57
58impl Server {
59    /// Create a new `Server` instance.
60    pub fn new() -> Result<Self> {
61        let server = Self {
62            config: Default::default(),
63            reaper: Default::default(),
64            fd_socket: Default::default(),
65        };
66
67        if let Some(v) = server.config().version() {
68            Version::new(v == Verbosity::Full).print();
69            process::exit(0);
70        }
71
72        if let Some(Commands::Pause {
73            base_path,
74            pod_id,
75            ipc,
76            pid,
77            net,
78            user,
79            uts,
80            uid_mappings,
81            gid_mappings,
82        }) = server.config().command()
83        {
84            Pause::run(
85                base_path,
86                pod_id,
87                *ipc,
88                *pid,
89                *net,
90                *user,
91                *uts,
92                uid_mappings,
93                gid_mappings,
94            )
95            .context("run pause")?;
96            process::exit(0);
97        }
98
99        server.config().validate().context("validate config")?;
100
101        Self::init().context("init self")?;
102        Ok(server)
103    }
104
105    /// Start the `Server` instance and consume it.
106    pub fn start(self) -> Result<()> {
107        // We need to fork as early as possible, especially before setting up tokio.
108        // If we don't, the child will have a strange thread space and we're at risk of deadlocking.
109        // We also have to treat the parent as the child (as described in [1]) to ensure we don't
110        // interrupt the child's execution.
111        // 1: https://docs.rs/nix/0.23.0/nix/unistd/fn.fork.html#safety
112        if !self.config().skip_fork() {
113            match unsafe { fork()? } {
114                ForkResult::Parent { child, .. } => {
115                    let child_str = format!("{child}");
116                    File::create(self.config().conmon_pidfile())?
117                        .write_all(child_str.as_bytes())?;
118                    unsafe { _exit(0) };
119                }
120                ForkResult::Child => (),
121            }
122        }
123
124        // now that we've forked, set self to childreaper
125        prctl::set_child_subreaper(true)
126            .map_err(errno::from_i32)
127            .context("set child subreaper")?;
128
129        let enable_tracing = self.config().enable_tracing();
130
131        let rt = Builder::new_multi_thread().enable_all().build()?;
132        rt.block_on(self.spawn_tasks())?;
133
134        if enable_tracing {
135            Telemetry::shutdown();
136        }
137
138        rt.shutdown_background();
139        Ok(())
140    }
141
142    fn init() -> Result<()> {
143        let init = Init::<DefaultInit>::default();
144        init.unset_locale()?;
145        init.set_default_umask();
146        // While we could configure this, standard practice has it as -1000,
147        // so it may be YAGNI to add configuration.
148        init.set_oom_score("-1000")
149    }
150
151    fn init_logging(&self) -> Result<()> {
152        let level = LevelFilter::from_str(self.config().log_level().as_ref())
153            .context("convert log level filter")?;
154
155        let telemetry_layer = if self.config().enable_tracing() {
156            Telemetry::layer(self.config().tracing_endpoint())
157                .context("build telemetry layer")?
158                .into()
159        } else {
160            None
161        };
162
163        let registry = tracing_subscriber::registry().with(telemetry_layer);
164
165        match self.config().log_driver() {
166            LogDriver::Stdout => {
167                let layer = tracing_subscriber::fmt::layer()
168                    .with_target(true)
169                    .with_line_number(true)
170                    .with_filter(level);
171                registry
172                    .with(layer)
173                    .try_init()
174                    .context("init stdout fmt layer")?;
175                info!("Using stdout logger");
176            }
177            LogDriver::Systemd => {
178                let layer = tracing_subscriber::fmt::layer()
179                    .with_target(true)
180                    .with_line_number(true)
181                    .without_time()
182                    .with_writer(Journal)
183                    .with_filter(level);
184                registry
185                    .with(layer)
186                    .try_init()
187                    .context("init journald fmt layer")?;
188                info!("Using systemd/journald logger");
189            }
190        }
191        info!("Set log level to: {}", self.config().log_level());
192        Ok(())
193    }
194
195    /// Spawns all required tokio tasks.
196    async fn spawn_tasks(self) -> Result<()> {
197        self.init_logging().context("init logging")?;
198
199        let (shutdown_tx, shutdown_rx) = oneshot::channel();
200        let socket = self.config().socket();
201        let fd_socket = self.config().fd_socket();
202        let reaper = self.reaper.clone();
203
204        let signal_handler_span = debug_span!("signal_handler");
205        task::spawn(
206            Self::start_signal_handler(reaper, socket, fd_socket, shutdown_tx)
207                .with_context(signal_handler_span.context())
208                .instrument(signal_handler_span),
209        );
210
211        let backend_span = debug_span!("backend");
212        task::spawn_blocking(move || {
213            Handle::current().block_on(
214                LocalSet::new()
215                    .run_until(self.start_backend(shutdown_rx))
216                    .with_context(backend_span.context())
217                    .instrument(backend_span),
218            )
219        })
220        .await?
221    }
222
223    async fn start_signal_handler<T: AsRef<Path>>(
224        reaper: Arc<ChildReaper>,
225        socket: T,
226        fd_socket: T,
227        shutdown_tx: oneshot::Sender<()>,
228    ) -> Result<()> {
229        let mut sigterm = signal(SignalKind::terminate())?;
230        let mut sigint = signal(SignalKind::interrupt())?;
231        let handled_sig: Signal;
232
233        tokio::select! {
234            _ = sigterm.recv() => {
235                info!("Received SIGTERM");
236                handled_sig = Signal::SIGTERM;
237            }
238            _ = sigint.recv() => {
239                info!("Received SIGINT");
240                handled_sig = Signal::SIGINT;
241            }
242        }
243
244        if let Some(pause) = Pause::maybe_shared() {
245            pause.stop();
246        }
247
248        debug!("Starting grandchildren cleanup task");
249        reaper
250            .kill_grandchildren(handled_sig)
251            .context("unable to kill grandchildren")?;
252
253        debug!("Sending shutdown message");
254        shutdown_tx
255            .send(())
256            .map_err(|_| format_err!("unable to send shutdown message"))?;
257
258        debug!("Removing socket file {}", socket.as_ref().display());
259        fs::remove_file(socket)
260            .await
261            .context("remove existing socket file")?;
262
263        debug!("Removing fd socket file {}", fd_socket.as_ref().display());
264        fs::remove_file(fd_socket)
265            .await
266            .or_else(|err| {
267                if err.kind() == std::io::ErrorKind::NotFound {
268                    Ok(())
269                } else {
270                    Err(err)
271                }
272            })
273            .context("remove existing fd socket file")
274    }
275
276    async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
277        let listener =
278            Listener::<DefaultListener>::default().bind_long_path(self.config().socket())?;
279        let client: conmon::Client = capnp_rpc::new_client(self);
280
281        loop {
282            let stream = tokio::select! {
283                _ = &mut shutdown_rx => {
284                    debug!("Received shutdown message");
285                    return Ok(())
286                }
287                stream = listener.accept() => {
288                    stream?.0
289                },
290            };
291            let (reader, writer) = TokioAsyncReadCompatExt::compat(stream).split();
292            let network = Box::new(VatNetwork::new(
293                reader,
294                writer,
295                Side::Server,
296                Default::default(),
297            ));
298            let rpc_system = RpcSystem::new(network, Some(client.clone().client));
299            task::spawn_local(Box::pin(rpc_system.map(|_| ())));
300        }
301    }
302}
303
304pub(crate) struct GenerateRuntimeArgs<'a> {
305    pub(crate) config: &'a Config,
306    pub(crate) id: &'a str,
307    pub(crate) container_io: &'a ContainerIO,
308    pub(crate) pidfile: &'a Path,
309    pub(crate) cgroup_manager: CgroupManager,
310}
311
312impl GenerateRuntimeArgs<'_> {
313    const SYSTEMD_CGROUP_ARG: &'static str = "--systemd-cgroup";
314    const RUNTIME_CRUN: &'static str = "crun";
315    const LOG_LEVEL_FLAG_CRUN: &'static str = "--log-level";
316
317    /// Generate the OCI runtime CLI arguments from the provided parameters.
318    pub fn create_args(
319        self,
320        bundle_path: &Path,
321        global_args: Reader,
322        command_args: Reader,
323    ) -> Result<Vec<String>> {
324        let mut args = self.default_args().context("build default runtime args")?;
325
326        if let Some(rr) = self.config.runtime_root() {
327            args.push(format!("--root={}", rr.display()));
328        }
329
330        if self.cgroup_manager == CgroupManager::Systemd {
331            args.push(Self::SYSTEMD_CGROUP_ARG.into());
332        }
333
334        for arg in global_args {
335            args.push(arg?.to_string()?);
336        }
337
338        args.extend([
339            "create".to_string(),
340            "--bundle".to_string(),
341            bundle_path.display().to_string(),
342            "--pid-file".to_string(),
343            self.pidfile.display().to_string(),
344        ]);
345
346        for arg in command_args {
347            args.push(arg?.to_string()?);
348        }
349
350        if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
351            args.push(format!("--console-socket={}", terminal.path().display()));
352        }
353
354        args.push(self.id.into());
355
356        debug!("Runtime args {:?}", args.join(" "));
357        Ok(args)
358    }
359
360    /// Generate the OCI runtime CLI arguments from the provided parameters.
361    pub(crate) fn exec_sync_args(&self, command: Reader) -> Result<Vec<String>> {
362        let mut args = self.default_args().context("build default runtime args")?;
363
364        args.push("exec".to_string());
365        args.push("-d".to_string());
366
367        if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
368            args.push(format!("--console-socket={}", terminal.path().display()));
369            args.push("--tty".to_string());
370        }
371
372        args.push(format!("--pid-file={}", self.pidfile.display()));
373        args.push(self.id.into());
374
375        for arg in command {
376            args.push(arg?.to_string()?);
377        }
378
379        debug!("Exec args {:?}", args.join(" "));
380        Ok(args)
381    }
382
383    /// Build the default arguments for any provided runtime.
384    fn default_args(&self) -> Result<Vec<String>> {
385        let mut args = vec![];
386
387        if self
388            .config
389            .runtime()
390            .file_name()
391            .context("no filename in path")?
392            == Self::RUNTIME_CRUN
393        {
394            debug!("Found crun used as runtime");
395            args.push(format!("--log=journald:{}", self.id));
396
397            match self.config.log_level() {
398                &LogLevel::Debug | &LogLevel::Error => args.push(format!(
399                    "{}={}",
400                    Self::LOG_LEVEL_FLAG_CRUN,
401                    self.config.log_level()
402                )),
403                &LogLevel::Warn => args.push(format!("{}=warning", Self::LOG_LEVEL_FLAG_CRUN)),
404                _ => {}
405            }
406        }
407
408        if let Some(rr) = self.config.runtime_root() {
409            args.push(format!("--root={}", rr.display()));
410        }
411
412        if self.cgroup_manager == CgroupManager::Systemd {
413            args.push(Self::SYSTEMD_CGROUP_ARG.into());
414        }
415
416        Ok(args)
417    }
418}