conmonrs/server.rs

1#![deny(missing_docs)]
2
3use crate::{
4    child_reaper::ChildReaper,
5    config::{Commands, Config, LogDriver, LogLevel, Verbosity},
6    container_io::{ContainerIO, ContainerIOType},
7    fd_socket::FdSocket,
8    init::{DefaultInit, Init},
9    journal::Journal,
10    listener::{DefaultListener, Listener},
11    pause::Pause,
12    streaming_server::StreamingServer,
13    telemetry::Telemetry,
14    version::Version,
15};
16use anyhow::{Context, Result, format_err};
17use capnp::text_list::Reader;
18use capnp_rpc::{RpcSystem, rpc_twoparty_capnp::Side, twoparty};
19use clap::crate_name;
20use conmon_common::conmon_capnp::conmon::{self, CgroupManager};
21use futures::{AsyncReadExt, FutureExt};
22use getset::Getters;
23use nix::{
24    errno::Errno,
25    libc::_exit,
26    sys::signal::Signal,
27    unistd::{ForkResult, fork},
28};
29use opentelemetry::trace::{FutureExt as OpenTelemetryFutureExt, TracerProvider};
30use opentelemetry_sdk::trace::SdkTracerProvider;
31use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc};
32use tokio::{
33    fs,
34    runtime::{Builder, Handle},
35    signal::unix::{SignalKind, signal},
36    sync::{RwLock, oneshot},
37    task::{self, LocalSet},
38};
39use tokio_util::compat::TokioAsyncReadCompatExt;
40use tracing::{Instrument, debug, debug_span, info};
41use tracing_opentelemetry::OpenTelemetrySpanExt;
42use tracing_subscriber::{filter::LevelFilter, layer::SubscriberExt, prelude::*};
43use twoparty::VatNetwork;
44
45#[derive(Debug, Getters)]
46/// The main server structure.
47pub struct Server {
48    /// Server configuration.
49    #[getset(get = "pub(crate)")]
50    config: Config,
51
52    /// Child reaper instance.
53    #[getset(get = "pub(crate)")]
54    reaper: Arc<ChildReaper>,
55
56    /// Fd socket instance.
57    #[getset(get = "pub(crate)")]
58    fd_socket: Arc<FdSocket>,
59
60    /// OpenTelemetry tracer instance.
61    #[getset(get = "pub(crate)")]
62    tracer: Option<SdkTracerProvider>,
63
64    /// Streaming server instance.
65    #[getset(get = "pub(crate)")]
66    streaming_server: Arc<RwLock<StreamingServer>>,
67}
68
69impl Server {
70    /// Create a new `Server` instance.
71    pub fn new() -> Result<Self> {
72        let server = Self {
73            config: Default::default(),
74            reaper: Default::default(),
75            fd_socket: Default::default(),
76            tracer: Default::default(),
77            streaming_server: Default::default(),
78        };
79
80        if let Some(v) = server.config().version() {
81            Version::new(v == Verbosity::Full).print();
82            process::exit(0);
83        }
84
85        if let Some(Commands::Pause {
86            base_path,
87            pod_id,
88            ipc,
89            pid,
90            net,
91            user,
92            uts,
93            uid_mappings,
94            gid_mappings,
95        }) = server.config().command()
96        {
97            Pause::run(
98                base_path,
99                pod_id,
100                *ipc,
101                *pid,
102                *net,
103                *user,
104                *uts,
105                uid_mappings,
106                gid_mappings,
107            )
108            .context("run pause")?;
109            process::exit(0);
110        }
111
112        server.config().validate().context("validate config")?;
113
114        Self::init().context("init self")?;
115        Ok(server)
116    }
117
118    /// Start the `Server` instance and consume it.
119    pub fn start(self) -> Result<()> {
120        // We need to fork as early as possible, especially before setting up tokio.
121        // If we don't, the child will have a strange thread space and we're at risk of deadlocking.
122        // We also have to treat the parent as the child (as described in [1]) to ensure we don't
123        // interrupt the child's execution.
124        // 1: https://docs.rs/nix/0.23.0/nix/unistd/fn.fork.html#safety
125        if !self.config().skip_fork() {
126            match unsafe { fork()? } {
127                ForkResult::Parent { child, .. } => {
128                    let child_str = format!("{child}");
129                    File::create(self.config().conmon_pidfile())?
130                        .write_all(child_str.as_bytes())?;
131                    unsafe { _exit(0) };
132                }
133                ForkResult::Child => (),
134            }
135        }
136
137        // now that we've forked, set self to childreaper
138        prctl::set_child_subreaper(true)
139            .map_err(Errno::from_raw)
140            .context("set child subreaper")?;
141
142        let tracer = self.tracer().clone();
143
144        let rt = Builder::new_multi_thread().enable_all().build()?;
145        rt.block_on(self.spawn_tasks())?;
146
147        if let Some(tracer) = tracer {
148            tracer.shutdown().context("shutdown tracer")?;
149        }
150
151        rt.shutdown_background();
152        Ok(())
153    }
154
155    fn init() -> Result<()> {
156        let init = Init::<DefaultInit>::default();
157        init.unset_locale()?;
158        init.set_default_umask();
159        // While we could configure this, standard practice has it as -1000,
160        // so it may be YAGNI to add configuration.
161        init.set_oom_score("-1000")
162    }
163
164    fn init_logging(&mut self) -> Result<()> {
165        let level = LevelFilter::from_str(self.config().log_level().as_ref())
166            .context("convert log level filter")?;
167
168        let telemetry_layer = if self.config().enable_tracing() {
169            let tracer = Telemetry::layer(self.config().tracing_endpoint())
170                .context("build telemetry layer")?;
171
172            self.tracer = Some(tracer.clone());
173
174            tracing_opentelemetry::layer()
175                .with_tracer(tracer.tracer(crate_name!()))
176                .into()
177        } else {
178            None
179        };
180
181        let registry = tracing_subscriber::registry().with(telemetry_layer);
182
183        match self.config().log_driver() {
184            LogDriver::Stdout => {
185                let layer = tracing_subscriber::fmt::layer()
186                    .with_target(true)
187                    .with_line_number(true)
188                    .with_filter(level);
189                registry
190                    .with(layer)
191                    .try_init()
192                    .context("init stdout fmt layer")?;
193                info!("Using stdout logger");
194            }
195            LogDriver::Systemd => {
196                let layer = tracing_subscriber::fmt::layer()
197                    .with_target(true)
198                    .with_line_number(true)
199                    .without_time()
200                    .with_writer(Journal)
201                    .with_filter(level);
202                registry
203                    .with(layer)
204                    .try_init()
205                    .context("init journald fmt layer")?;
206                info!("Using systemd/journald logger");
207            }
208        }
209        info!("Set log level to: {}", self.config().log_level());
210        Ok(())
211    }
212
213    /// Spawns all required tokio tasks.
214    async fn spawn_tasks(mut self) -> Result<()> {
215        self.init_logging().context("init logging")?;
216
217        let (shutdown_tx, shutdown_rx) = oneshot::channel();
218        let socket = self.config().socket();
219        let fd_socket = self.config().fd_socket();
220        let reaper = self.reaper.clone();
221
222        let signal_handler_span = debug_span!("signal_handler");
223        task::spawn(
224            Self::start_signal_handler(reaper, socket, fd_socket, shutdown_tx)
225                .with_context(signal_handler_span.context())
226                .instrument(signal_handler_span),
227        );
228
229        let backend_span = debug_span!("backend");
230        task::spawn_blocking(move || {
231            Handle::current().block_on(
232                LocalSet::new()
233                    .run_until(self.start_backend(shutdown_rx))
234                    .with_context(backend_span.context())
235                    .instrument(backend_span),
236            )
237        })
238        .await?
239    }
240
241    async fn start_signal_handler<T: AsRef<Path>>(
242        reaper: Arc<ChildReaper>,
243        socket: T,
244        fd_socket: T,
245        shutdown_tx: oneshot::Sender<()>,
246    ) -> Result<()> {
247        let mut sigterm = signal(SignalKind::terminate())?;
248        let mut sigint = signal(SignalKind::interrupt())?;
249        let handled_sig: Signal;
250
251        tokio::select! {
252            _ = sigterm.recv() => {
253                info!("Received SIGTERM");
254                handled_sig = Signal::SIGTERM;
255            }
256            _ = sigint.recv() => {
257                info!("Received SIGINT");
258                handled_sig = Signal::SIGINT;
259            }
260        }
261
262        if let Some(pause) = Pause::maybe_shared() {
263            pause.stop();
264        }
265
266        debug!("Starting grandchildren cleanup task");
267        reaper
268            .kill_grandchildren(handled_sig)
269            .context("unable to kill grandchildren")?;
270
271        debug!("Sending shutdown message");
272        shutdown_tx
273            .send(())
274            .map_err(|_| format_err!("unable to send shutdown message"))?;
275
276        debug!("Removing socket file {}", socket.as_ref().display());
277        fs::remove_file(socket)
278            .await
279            .context("remove existing socket file")?;
280
281        debug!("Removing fd socket file {}", fd_socket.as_ref().display());
282        fs::remove_file(fd_socket)
283            .await
284            .or_else(|err| {
285                if err.kind() == std::io::ErrorKind::NotFound {
286                    Ok(())
287                } else {
288                    Err(err)
289                }
290            })
291            .context("remove existing fd socket file")
292    }
293
294    async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
295        let listener =
296            Listener::<DefaultListener>::default().bind_long_path(self.config().socket())?;
297        let client: conmon::Client = capnp_rpc::new_client(self);
298
299        loop {
300            let stream = tokio::select! {
301                _ = &mut shutdown_rx => {
302                    debug!("Received shutdown message");
303                    return Ok(())
304                }
305                stream = listener.accept() => {
306                    stream?.0
307                },
308            };
309            let (reader, writer) = TokioAsyncReadCompatExt::compat(stream).split();
310            let network = Box::new(VatNetwork::new(
311                reader,
312                writer,
313                Side::Server,
314                Default::default(),
315            ));
316            let rpc_system = RpcSystem::new(network, Some(client.clone().client));
317            task::spawn_local(Box::pin(rpc_system.map(|_| ())));
318        }
319    }
320}
321
322pub(crate) struct GenerateRuntimeArgs<'a> {
323    pub(crate) config: &'a Config,
324    pub(crate) id: &'a str,
325    pub(crate) container_io: &'a ContainerIO,
326    pub(crate) pidfile: &'a Path,
327    pub(crate) cgroup_manager: CgroupManager,
328}
329
330impl GenerateRuntimeArgs<'_> {
331    const SYSTEMD_CGROUP_ARG: &'static str = "--systemd-cgroup";
332    const RUNTIME_CRUN: &'static str = "crun";
333    const LOG_LEVEL_FLAG_CRUN: &'static str = "--log-level";
334
335    /// Generate the OCI runtime CLI arguments from the provided parameters.
336    pub fn create_args(
337        self,
338        bundle_path: &Path,
339        global_args: Reader,
340        command_args: Reader,
341    ) -> Result<Vec<String>> {
342        let mut args = self.default_args().context("build default runtime args")?;
343
344        if let Some(rr) = self.config.runtime_root() {
345            args.push(format!("--root={}", rr.display()));
346        }
347
348        if self.cgroup_manager == CgroupManager::Systemd {
349            args.push(Self::SYSTEMD_CGROUP_ARG.into());
350        }
351
352        for arg in global_args {
353            args.push(arg?.to_string()?);
354        }
355
356        args.extend([
357            "create".to_string(),
358            "--bundle".to_string(),
359            bundle_path.display().to_string(),
360            "--pid-file".to_string(),
361            self.pidfile.display().to_string(),
362        ]);
363
364        for arg in command_args {
365            args.push(arg?.to_string()?);
366        }
367
368        if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
369            args.push(format!("--console-socket={}", terminal.path().display()));
370        }
371
372        args.push(self.id.into());
373
374        debug!("Runtime args {:?}", args.join(" "));
375        Ok(args)
376    }
377
378    /// Generate the OCI runtime CLI arguments from the provided parameters.
379    pub(crate) fn exec_sync_args(&self, command: Reader) -> Result<Vec<String>> {
380        let mut args = self
381            .exec_sync_args_without_command()
382            .context("exec sync args without command")?;
383
384        for arg in command {
385            args.push(arg?.to_string()?);
386        }
387
388        debug!("Exec args {:?}", args.join(" "));
389        Ok(args)
390    }
391
392    pub(crate) fn exec_sync_args_without_command(&self) -> Result<Vec<String>> {
393        let mut args = self.default_args().context("build default runtime args")?;
394
395        if let Some(rr) = self.config.runtime_root() {
396            args.push(format!("--root={}", rr.display()));
397        }
398
399        if self.cgroup_manager == CgroupManager::Systemd {
400            args.push(Self::SYSTEMD_CGROUP_ARG.into());
401        }
402
403        args.push("exec".to_string());
404        args.push("-d".to_string());
405
406        if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
407            args.push(format!("--console-socket={}", terminal.path().display()));
408            args.push("--tty".to_string());
409        }
410
411        args.push(format!("--pid-file={}", self.pidfile.display()));
412        args.push(self.id.into());
413
414        Ok(args)
415    }
416
417    /// Build the default arguments for any provided runtime.
418    fn default_args(&self) -> Result<Vec<String>> {
419        let mut args = vec![];
420
421        if self
422            .config
423            .runtime()
424            .file_name()
425            .context("no filename in path")?
426            == Self::RUNTIME_CRUN
427        {
428            debug!("Found crun used as runtime");
429            args.push(format!("--log=journald:{}", self.id));
430
431            match self.config.log_level() {
432                &LogLevel::Debug | &LogLevel::Error => args.push(format!(
433                    "{}={}",
434                    Self::LOG_LEVEL_FLAG_CRUN,
435                    self.config.log_level()
436                )),
437                &LogLevel::Warn => args.push(format!("{}=warning", Self::LOG_LEVEL_FLAG_CRUN)),
438                _ => {}
439            }
440        }
441
442        if let Some(rr) = self.config.runtime_root() {
443            args.push(format!("--root={}", rr.display()));
444        }
445
446        if self.cgroup_manager == CgroupManager::Systemd {
447            args.push(Self::SYSTEMD_CGROUP_ARG.into());
448        }
449
450        Ok(args)
451    }
452}