conmonrs/
server.rs

1#![deny(missing_docs)]
2
3use crate::{
4    child_reaper::ChildReaper,
5    config::{Commands, Config, LogDriver, LogLevel, Verbosity},
6    container_io::{ContainerIO, ContainerIOType},
7    fd_socket::FdSocket,
8    init::{DefaultInit, Init},
9    journal::Journal,
10    listener::{DefaultListener, Listener},
11    pause::Pause,
12    streaming_server::StreamingServer,
13    telemetry::Telemetry,
14    version::Version,
15};
16use anyhow::{Context, Result, format_err};
17use capnp::text_list::Reader;
18use capnp_rpc::{RpcSystem, rpc_twoparty_capnp::Side, twoparty};
19use clap::crate_name;
20use conmon_common::conmon_capnp::conmon::{self, CgroupManager};
21use futures::{AsyncReadExt, FutureExt};
22use getset::Getters;
23use nix::{
24    errno::Errno,
25    libc::_exit,
26    sys::signal::Signal,
27    unistd::{ForkResult, fork},
28};
29use opentelemetry::trace::{FutureExt as OpenTelemetryFutureExt, TracerProvider};
30use opentelemetry_sdk::trace::SdkTracerProvider;
31use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc};
32use tokio::{
33    fs,
34    runtime::{Builder, Handle},
35    signal::unix::{SignalKind, signal},
36    sync::{RwLock, oneshot},
37    task::{self, LocalSet},
38};
39use tokio_util::compat::TokioAsyncReadCompatExt;
40use tracing::{Instrument, debug, debug_span, info};
41use tracing_opentelemetry::OpenTelemetrySpanExt;
42use tracing_subscriber::{filter::LevelFilter, layer::SubscriberExt, prelude::*};
43use twoparty::VatNetwork;
44
#[derive(Debug, Getters)]
/// The main server structure.
///
/// All fields are initialized from their `Default` implementations in
/// [`Server::new`] and exposed crate-internally through generated getters.
pub struct Server {
    /// Server configuration; consulted for the version flags, the optional
    /// sub-command, fork behavior, socket paths and logging/tracing settings.
    #[getset(get = "pub(crate)")]
    config: Config,

    /// Child reaper instance, shared with the signal handler task which uses
    /// it to kill grandchildren on SIGTERM/SIGINT.
    #[getset(get = "pub(crate)")]
    reaper: Arc<ChildReaper>,

    /// Fd socket instance.
    #[getset(get = "pub(crate)")]
    fd_socket: Arc<FdSocket>,

    /// OpenTelemetry tracer provider. Starts out as `None` and is only set by
    /// `init_logging` when tracing is enabled in the configuration.
    #[getset(get = "pub(crate)")]
    tracer: Option<SdkTracerProvider>,

    /// Streaming server instance, shared behind an async read/write lock.
    #[getset(get = "pub(crate)")]
    streaming_server: Arc<RwLock<StreamingServer>>,
}
68
69impl Server {
70    /// Create a new `Server` instance.
71    pub fn new() -> Result<Self> {
72        let server = Self {
73            config: Default::default(),
74            reaper: Default::default(),
75            fd_socket: Default::default(),
76            tracer: Default::default(),
77            streaming_server: Default::default(),
78        };
79
80        if let Some(v) = server.config().version() {
81            Version::new(v == Verbosity::Full).print();
82            process::exit(0);
83        }
84
85        if let Some(v) = server.config().version_json() {
86            Version::new(v == Verbosity::Full).print_json()?;
87            process::exit(0);
88        }
89
90        if let Some(Commands::Pause {
91            base_path,
92            pod_id,
93            ipc,
94            pid,
95            net,
96            user,
97            uts,
98            uid_mappings,
99            gid_mappings,
100        }) = server.config().command()
101        {
102            Pause::run(
103                base_path,
104                pod_id,
105                *ipc,
106                *pid,
107                *net,
108                *user,
109                *uts,
110                uid_mappings,
111                gid_mappings,
112            )
113            .context("run pause")?;
114            process::exit(0);
115        }
116
117        server.config().validate().context("validate config")?;
118
119        Self::init().context("init self")?;
120        Ok(server)
121    }
122
123    /// Start the `Server` instance and consume it.
124    pub fn start(self) -> Result<()> {
125        // We need to fork as early as possible, especially before setting up tokio.
126        // If we don't, the child will have a strange thread space and we're at risk of deadlocking.
127        // We also have to treat the parent as the child (as described in [1]) to ensure we don't
128        // interrupt the child's execution.
129        // 1: https://docs.rs/nix/0.23.0/nix/unistd/fn.fork.html#safety
130        if !self.config().skip_fork() {
131            match unsafe { fork()? } {
132                ForkResult::Parent { child, .. } => {
133                    let child_str = format!("{child}");
134                    File::create(self.config().conmon_pidfile())?
135                        .write_all(child_str.as_bytes())?;
136                    unsafe { _exit(0) };
137                }
138                ForkResult::Child => (),
139            }
140        }
141
142        // now that we've forked, set self to childreaper
143        prctl::set_child_subreaper(true)
144            .map_err(Errno::from_raw)
145            .context("set child subreaper")?;
146
147        let tracer = self.tracer().clone();
148
149        let rt = Builder::new_multi_thread().enable_all().build()?;
150        rt.block_on(self.spawn_tasks())?;
151
152        if let Some(tracer) = tracer {
153            tracer.shutdown().context("shutdown tracer")?;
154        }
155
156        rt.shutdown_background();
157        Ok(())
158    }
159
160    fn init() -> Result<()> {
161        let init = Init::<DefaultInit>::default();
162        init.unset_locale()?;
163        init.set_default_umask();
164        // While we could configure this, standard practice has it as -1000,
165        // so it may be YAGNI to add configuration.
166        init.set_oom_score("-1000")
167    }
168
169    fn init_logging(&mut self) -> Result<()> {
170        let level = LevelFilter::from_str(self.config().log_level().as_ref())
171            .context("convert log level filter")?;
172
173        let telemetry_layer = if self.config().enable_tracing() {
174            let tracer = Telemetry::layer(self.config().tracing_endpoint())
175                .context("build telemetry layer")?;
176
177            self.tracer = Some(tracer.clone());
178
179            tracing_opentelemetry::layer()
180                .with_tracer(tracer.tracer(crate_name!()))
181                .into()
182        } else {
183            None
184        };
185
186        let registry = tracing_subscriber::registry().with(telemetry_layer);
187
188        match self.config().log_driver() {
189            LogDriver::None => {}
190            LogDriver::Stdout => {
191                let layer = tracing_subscriber::fmt::layer()
192                    .with_target(true)
193                    .with_line_number(true)
194                    .with_filter(level);
195                registry
196                    .with(layer)
197                    .try_init()
198                    .context("init stdout fmt layer")?;
199                info!("Using stdout logger");
200            }
201            LogDriver::Systemd => {
202                let layer = tracing_subscriber::fmt::layer()
203                    .with_target(true)
204                    .with_line_number(true)
205                    .without_time()
206                    .with_writer(Journal)
207                    .with_filter(level);
208                registry
209                    .with(layer)
210                    .try_init()
211                    .context("init journald fmt layer")?;
212                info!("Using systemd/journald logger");
213            }
214        }
215        info!("Set log level to: {}", self.config().log_level());
216        Ok(())
217    }
218
219    /// Spawns all required tokio tasks.
220    async fn spawn_tasks(mut self) -> Result<()> {
221        self.init_logging().context("init logging")?;
222
223        let (shutdown_tx, shutdown_rx) = oneshot::channel();
224        let socket = self.config().socket();
225        let fd_socket = self.config().fd_socket();
226        let reaper = self.reaper.clone();
227
228        let signal_handler_span = debug_span!("signal_handler");
229        task::spawn(
230            Self::start_signal_handler(reaper, socket, fd_socket, shutdown_tx)
231                .with_context(signal_handler_span.context())
232                .instrument(signal_handler_span),
233        );
234
235        let backend_span = debug_span!("backend");
236        task::spawn_blocking(move || {
237            Handle::current().block_on(
238                LocalSet::new()
239                    .run_until(self.start_backend(shutdown_rx))
240                    .with_context(backend_span.context())
241                    .instrument(backend_span),
242            )
243        })
244        .await?
245    }
246
247    async fn start_signal_handler<T: AsRef<Path>>(
248        reaper: Arc<ChildReaper>,
249        socket: T,
250        fd_socket: T,
251        shutdown_tx: oneshot::Sender<()>,
252    ) -> Result<()> {
253        let mut sigterm = signal(SignalKind::terminate())?;
254        let mut sigint = signal(SignalKind::interrupt())?;
255        let handled_sig: Signal;
256
257        tokio::select! {
258            _ = sigterm.recv() => {
259                info!("Received SIGTERM");
260                handled_sig = Signal::SIGTERM;
261            }
262            _ = sigint.recv() => {
263                info!("Received SIGINT");
264                handled_sig = Signal::SIGINT;
265            }
266        }
267
268        if let Some(pause) = Pause::maybe_shared() {
269            pause.stop();
270        }
271
272        debug!("Starting grandchildren cleanup task");
273        reaper
274            .kill_grandchildren(handled_sig)
275            .context("unable to kill grandchildren")?;
276
277        debug!("Sending shutdown message");
278        shutdown_tx
279            .send(())
280            .map_err(|_| format_err!("unable to send shutdown message"))?;
281
282        debug!("Removing socket file {}", socket.as_ref().display());
283        fs::remove_file(socket)
284            .await
285            .context("remove existing socket file")?;
286
287        debug!("Removing fd socket file {}", fd_socket.as_ref().display());
288        fs::remove_file(fd_socket)
289            .await
290            .or_else(|err| {
291                if err.kind() == std::io::ErrorKind::NotFound {
292                    Ok(())
293                } else {
294                    Err(err)
295                }
296            })
297            .context("remove existing fd socket file")
298    }
299
300    async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
301        let listener =
302            Listener::<DefaultListener>::default().bind_long_path(self.config().socket())?;
303        let client: conmon::Client = capnp_rpc::new_client(self);
304
305        loop {
306            let stream = tokio::select! {
307                _ = &mut shutdown_rx => {
308                    debug!("Received shutdown message");
309                    return Ok(())
310                }
311                stream = listener.accept() => {
312                    stream?.0
313                },
314            };
315            let (reader, writer) = TokioAsyncReadCompatExt::compat(stream).split();
316            let network = Box::new(VatNetwork::new(
317                reader,
318                writer,
319                Side::Server,
320                Default::default(),
321            ));
322            let rpc_system = RpcSystem::new(network, Some(client.clone().client));
323            task::spawn_local(Box::pin(rpc_system.map(|_| ())));
324        }
325    }
326}
327
/// Borrowed inputs from which the OCI runtime command lines are assembled.
pub(crate) struct GenerateRuntimeArgs<'a> {
    /// Server configuration; supplies the runtime path, the optional runtime
    /// root and the log level.
    pub(crate) config: &'a Config,
    /// Identifier passed as the final runtime argument and used in the crun
    /// journald log target (presumably the container ID — confirm with callers).
    pub(crate) id: &'a str,
    /// Container IO; when it is a terminal, its path is passed to the runtime
    /// as `--console-socket`.
    pub(crate) container_io: &'a ContainerIO,
    /// Path handed to the runtime as its PID file.
    pub(crate) pidfile: &'a Path,
    /// Cgroup manager; `Systemd` adds the `--systemd-cgroup` argument.
    pub(crate) cgroup_manager: CgroupManager,
}
335
336impl GenerateRuntimeArgs<'_> {
    /// Runtime argument added when the cgroup manager is `Systemd`.
    const SYSTEMD_CGROUP_ARG: &'static str = "--systemd-cgroup";
    /// File name of the runtime binary for which crun-specific arguments are added.
    const RUNTIME_CRUN: &'static str = "crun";
    /// Flag name used to build the crun `--log-level=<level>` argument.
    const LOG_LEVEL_FLAG_CRUN: &'static str = "--log-level";
340
341    /// Generate the OCI runtime CLI arguments from the provided parameters.
342    pub fn create_args(
343        self,
344        bundle_path: &Path,
345        global_args: Reader,
346        command_args: Reader,
347    ) -> Result<Vec<String>> {
348        let mut args = self.default_args().context("build default runtime args")?;
349
350        if let Some(rr) = self.config.runtime_root() {
351            args.push(format!("--root={}", rr.display()));
352        }
353
354        if self.cgroup_manager == CgroupManager::Systemd {
355            args.push(Self::SYSTEMD_CGROUP_ARG.into());
356        }
357
358        for arg in global_args {
359            args.push(arg?.to_string()?);
360        }
361
362        args.extend([
363            "create".to_string(),
364            "--bundle".to_string(),
365            bundle_path.display().to_string(),
366            "--pid-file".to_string(),
367            self.pidfile.display().to_string(),
368        ]);
369
370        for arg in command_args {
371            args.push(arg?.to_string()?);
372        }
373
374        if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
375            args.push(format!("--console-socket={}", terminal.path().display()));
376        }
377
378        args.push(self.id.into());
379
380        debug!("Runtime args {:?}", args.join(" "));
381        Ok(args)
382    }
383
384    /// Generate the OCI runtime CLI arguments from the provided parameters.
385    pub(crate) fn exec_sync_args(&self, command: Reader) -> Result<Vec<String>> {
386        let mut args = self
387            .exec_sync_args_without_command()
388            .context("exec sync args without command")?;
389
390        for arg in command {
391            args.push(arg?.to_string()?);
392        }
393
394        debug!("Exec args {:?}", args.join(" "));
395        Ok(args)
396    }
397
398    pub(crate) fn exec_sync_args_without_command(&self) -> Result<Vec<String>> {
399        let mut args = self.default_args().context("build default runtime args")?;
400
401        if let Some(rr) = self.config.runtime_root() {
402            args.push(format!("--root={}", rr.display()));
403        }
404
405        if self.cgroup_manager == CgroupManager::Systemd {
406            args.push(Self::SYSTEMD_CGROUP_ARG.into());
407        }
408
409        args.push("exec".to_string());
410        args.push("-d".to_string());
411
412        if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
413            args.push(format!("--console-socket={}", terminal.path().display()));
414            args.push("--tty".to_string());
415        }
416
417        args.push(format!("--pid-file={}", self.pidfile.display()));
418        args.push(self.id.into());
419
420        Ok(args)
421    }
422
423    /// Build the default arguments for any provided runtime.
424    fn default_args(&self) -> Result<Vec<String>> {
425        let mut args = vec![];
426
427        if self
428            .config
429            .runtime()
430            .file_name()
431            .context("no filename in path")?
432            == Self::RUNTIME_CRUN
433        {
434            debug!("Found crun used as runtime");
435            args.push(format!("--log=journald:{}", self.id));
436
437            match self.config.log_level() {
438                &LogLevel::Debug | &LogLevel::Error => args.push(format!(
439                    "{}={}",
440                    Self::LOG_LEVEL_FLAG_CRUN,
441                    self.config.log_level()
442                )),
443                &LogLevel::Warn => args.push(format!("{}=warning", Self::LOG_LEVEL_FLAG_CRUN)),
444                _ => {}
445            }
446        }
447
448        if let Some(rr) = self.config.runtime_root() {
449            args.push(format!("--root={}", rr.display()));
450        }
451
452        if self.cgroup_manager == CgroupManager::Systemd {
453            args.push(Self::SYSTEMD_CGROUP_ARG.into());
454        }
455
456        Ok(args)
457    }
458}