1#![deny(missing_docs)]
2
3use crate::{
4 child_reaper::ChildReaper,
5 config::{Commands, Config, LogDriver, LogLevel, Verbosity},
6 container_io::{ContainerIO, ContainerIOType},
7 fd_socket::FdSocket,
8 init::{DefaultInit, Init},
9 journal::Journal,
10 listener::{DefaultListener, Listener},
11 pause::Pause,
12 telemetry::Telemetry,
13 version::Version,
14};
15use anyhow::{format_err, Context, Result};
16use capnp::text_list::Reader;
17use capnp_rpc::{rpc_twoparty_capnp::Side, twoparty, RpcSystem};
18use conmon_common::conmon_capnp::conmon::{self, CgroupManager};
19use futures::{AsyncReadExt, FutureExt};
20use getset::Getters;
21use nix::{
22 errno,
23 libc::_exit,
24 sys::signal::Signal,
25 unistd::{fork, ForkResult},
26};
27use opentelemetry::trace::FutureExt as OpenTelemetryFutureExt;
28use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc};
29use tokio::{
30 fs,
31 runtime::{Builder, Handle},
32 signal::unix::{signal, SignalKind},
33 sync::oneshot,
34 task::{self, LocalSet},
35};
36use tokio_util::compat::TokioAsyncReadCompatExt;
37use tracing::{debug, debug_span, info, Instrument};
38use tracing_opentelemetry::OpenTelemetrySpanExt;
39use tracing_subscriber::{filter::LevelFilter, layer::SubscriberExt, prelude::*};
40use twoparty::VatNetwork;
41
42#[derive(Debug, Getters)]
43pub struct Server {
45 #[getset(get = "pub(crate)")]
47 config: Config,
48
49 #[getset(get = "pub(crate)")]
51 reaper: Arc<ChildReaper>,
52
53 #[getset(get = "pub(crate)")]
55 fd_socket: Arc<FdSocket>,
56}
57
58impl Server {
59 pub fn new() -> Result<Self> {
61 let server = Self {
62 config: Default::default(),
63 reaper: Default::default(),
64 fd_socket: Default::default(),
65 };
66
67 if let Some(v) = server.config().version() {
68 Version::new(v == Verbosity::Full).print();
69 process::exit(0);
70 }
71
72 if let Some(Commands::Pause {
73 base_path,
74 pod_id,
75 ipc,
76 pid,
77 net,
78 user,
79 uts,
80 uid_mappings,
81 gid_mappings,
82 }) = server.config().command()
83 {
84 Pause::run(
85 base_path,
86 pod_id,
87 *ipc,
88 *pid,
89 *net,
90 *user,
91 *uts,
92 uid_mappings,
93 gid_mappings,
94 )
95 .context("run pause")?;
96 process::exit(0);
97 }
98
99 server.config().validate().context("validate config")?;
100
101 Self::init().context("init self")?;
102 Ok(server)
103 }
104
105 pub fn start(self) -> Result<()> {
107 if !self.config().skip_fork() {
113 match unsafe { fork()? } {
114 ForkResult::Parent { child, .. } => {
115 let child_str = format!("{child}");
116 File::create(self.config().conmon_pidfile())?
117 .write_all(child_str.as_bytes())?;
118 unsafe { _exit(0) };
119 }
120 ForkResult::Child => (),
121 }
122 }
123
124 prctl::set_child_subreaper(true)
126 .map_err(errno::from_i32)
127 .context("set child subreaper")?;
128
129 let enable_tracing = self.config().enable_tracing();
130
131 let rt = Builder::new_multi_thread().enable_all().build()?;
132 rt.block_on(self.spawn_tasks())?;
133
134 if enable_tracing {
135 Telemetry::shutdown();
136 }
137
138 rt.shutdown_background();
139 Ok(())
140 }
141
142 fn init() -> Result<()> {
143 let init = Init::<DefaultInit>::default();
144 init.unset_locale()?;
145 init.set_default_umask();
146 init.set_oom_score("-1000")
149 }
150
151 fn init_logging(&self) -> Result<()> {
152 let level = LevelFilter::from_str(self.config().log_level().as_ref())
153 .context("convert log level filter")?;
154
155 let telemetry_layer = if self.config().enable_tracing() {
156 Telemetry::layer(self.config().tracing_endpoint())
157 .context("build telemetry layer")?
158 .into()
159 } else {
160 None
161 };
162
163 let registry = tracing_subscriber::registry().with(telemetry_layer);
164
165 match self.config().log_driver() {
166 LogDriver::Stdout => {
167 let layer = tracing_subscriber::fmt::layer()
168 .with_target(true)
169 .with_line_number(true)
170 .with_filter(level);
171 registry
172 .with(layer)
173 .try_init()
174 .context("init stdout fmt layer")?;
175 info!("Using stdout logger");
176 }
177 LogDriver::Systemd => {
178 let layer = tracing_subscriber::fmt::layer()
179 .with_target(true)
180 .with_line_number(true)
181 .without_time()
182 .with_writer(Journal)
183 .with_filter(level);
184 registry
185 .with(layer)
186 .try_init()
187 .context("init journald fmt layer")?;
188 info!("Using systemd/journald logger");
189 }
190 }
191 info!("Set log level to: {}", self.config().log_level());
192 Ok(())
193 }
194
195 async fn spawn_tasks(self) -> Result<()> {
197 self.init_logging().context("init logging")?;
198
199 let (shutdown_tx, shutdown_rx) = oneshot::channel();
200 let socket = self.config().socket();
201 let fd_socket = self.config().fd_socket();
202 let reaper = self.reaper.clone();
203
204 let signal_handler_span = debug_span!("signal_handler");
205 task::spawn(
206 Self::start_signal_handler(reaper, socket, fd_socket, shutdown_tx)
207 .with_context(signal_handler_span.context())
208 .instrument(signal_handler_span),
209 );
210
211 let backend_span = debug_span!("backend");
212 task::spawn_blocking(move || {
213 Handle::current().block_on(
214 LocalSet::new()
215 .run_until(self.start_backend(shutdown_rx))
216 .with_context(backend_span.context())
217 .instrument(backend_span),
218 )
219 })
220 .await?
221 }
222
223 async fn start_signal_handler<T: AsRef<Path>>(
224 reaper: Arc<ChildReaper>,
225 socket: T,
226 fd_socket: T,
227 shutdown_tx: oneshot::Sender<()>,
228 ) -> Result<()> {
229 let mut sigterm = signal(SignalKind::terminate())?;
230 let mut sigint = signal(SignalKind::interrupt())?;
231 let handled_sig: Signal;
232
233 tokio::select! {
234 _ = sigterm.recv() => {
235 info!("Received SIGTERM");
236 handled_sig = Signal::SIGTERM;
237 }
238 _ = sigint.recv() => {
239 info!("Received SIGINT");
240 handled_sig = Signal::SIGINT;
241 }
242 }
243
244 if let Some(pause) = Pause::maybe_shared() {
245 pause.stop();
246 }
247
248 debug!("Starting grandchildren cleanup task");
249 reaper
250 .kill_grandchildren(handled_sig)
251 .context("unable to kill grandchildren")?;
252
253 debug!("Sending shutdown message");
254 shutdown_tx
255 .send(())
256 .map_err(|_| format_err!("unable to send shutdown message"))?;
257
258 debug!("Removing socket file {}", socket.as_ref().display());
259 fs::remove_file(socket)
260 .await
261 .context("remove existing socket file")?;
262
263 debug!("Removing fd socket file {}", fd_socket.as_ref().display());
264 fs::remove_file(fd_socket)
265 .await
266 .or_else(|err| {
267 if err.kind() == std::io::ErrorKind::NotFound {
268 Ok(())
269 } else {
270 Err(err)
271 }
272 })
273 .context("remove existing fd socket file")
274 }
275
276 async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
277 let listener =
278 Listener::<DefaultListener>::default().bind_long_path(self.config().socket())?;
279 let client: conmon::Client = capnp_rpc::new_client(self);
280
281 loop {
282 let stream = tokio::select! {
283 _ = &mut shutdown_rx => {
284 debug!("Received shutdown message");
285 return Ok(())
286 }
287 stream = listener.accept() => {
288 stream?.0
289 },
290 };
291 let (reader, writer) = TokioAsyncReadCompatExt::compat(stream).split();
292 let network = Box::new(VatNetwork::new(
293 reader,
294 writer,
295 Side::Server,
296 Default::default(),
297 ));
298 let rpc_system = RpcSystem::new(network, Some(client.clone().client));
299 task::spawn_local(Box::pin(rpc_system.map(|_| ())));
300 }
301 }
302}
303
/// Borrowed inputs from which the command line arguments for the container
/// runtime get generated.
pub(crate) struct GenerateRuntimeArgs<'a> {
    /// Server configuration; supplies the runtime path, the optional runtime
    /// root and the log level.
    pub(crate) config: &'a Config,

    /// Identifier which is passed to the runtime as positional argument.
    pub(crate) id: &'a str,

    /// Container IO; a terminal variant adds a `--console-socket` argument.
    pub(crate) container_io: &'a ContainerIO,

    /// Path passed to the runtime as PID file.
    pub(crate) pidfile: &'a Path,

    /// Cgroup manager; the systemd variant adds the `--systemd-cgroup` flag.
    pub(crate) cgroup_manager: CgroupManager,
}
311
312impl GenerateRuntimeArgs<'_> {
313 const SYSTEMD_CGROUP_ARG: &'static str = "--systemd-cgroup";
314 const RUNTIME_CRUN: &'static str = "crun";
315 const LOG_LEVEL_FLAG_CRUN: &'static str = "--log-level";
316
317 pub fn create_args(
319 self,
320 bundle_path: &Path,
321 global_args: Reader,
322 command_args: Reader,
323 ) -> Result<Vec<String>> {
324 let mut args = self.default_args().context("build default runtime args")?;
325
326 if let Some(rr) = self.config.runtime_root() {
327 args.push(format!("--root={}", rr.display()));
328 }
329
330 if self.cgroup_manager == CgroupManager::Systemd {
331 args.push(Self::SYSTEMD_CGROUP_ARG.into());
332 }
333
334 for arg in global_args {
335 args.push(arg?.to_string()?);
336 }
337
338 args.extend([
339 "create".to_string(),
340 "--bundle".to_string(),
341 bundle_path.display().to_string(),
342 "--pid-file".to_string(),
343 self.pidfile.display().to_string(),
344 ]);
345
346 for arg in command_args {
347 args.push(arg?.to_string()?);
348 }
349
350 if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
351 args.push(format!("--console-socket={}", terminal.path().display()));
352 }
353
354 args.push(self.id.into());
355
356 debug!("Runtime args {:?}", args.join(" "));
357 Ok(args)
358 }
359
360 pub(crate) fn exec_sync_args(&self, command: Reader) -> Result<Vec<String>> {
362 let mut args = self.default_args().context("build default runtime args")?;
363
364 args.push("exec".to_string());
365 args.push("-d".to_string());
366
367 if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
368 args.push(format!("--console-socket={}", terminal.path().display()));
369 args.push("--tty".to_string());
370 }
371
372 args.push(format!("--pid-file={}", self.pidfile.display()));
373 args.push(self.id.into());
374
375 for arg in command {
376 args.push(arg?.to_string()?);
377 }
378
379 debug!("Exec args {:?}", args.join(" "));
380 Ok(args)
381 }
382
383 fn default_args(&self) -> Result<Vec<String>> {
385 let mut args = vec![];
386
387 if self
388 .config
389 .runtime()
390 .file_name()
391 .context("no filename in path")?
392 == Self::RUNTIME_CRUN
393 {
394 debug!("Found crun used as runtime");
395 args.push(format!("--log=journald:{}", self.id));
396
397 match self.config.log_level() {
398 &LogLevel::Debug | &LogLevel::Error => args.push(format!(
399 "{}={}",
400 Self::LOG_LEVEL_FLAG_CRUN,
401 self.config.log_level()
402 )),
403 &LogLevel::Warn => args.push(format!("{}=warning", Self::LOG_LEVEL_FLAG_CRUN)),
404 _ => {}
405 }
406 }
407
408 if let Some(rr) = self.config.runtime_root() {
409 args.push(format!("--root={}", rr.display()));
410 }
411
412 if self.cgroup_manager == CgroupManager::Systemd {
413 args.push(Self::SYSTEMD_CGROUP_ARG.into());
414 }
415
416 Ok(args)
417 }
418}