1#![deny(missing_docs)]
2
3use crate::{
4 child_reaper::ChildReaper,
5 config::{Commands, Config, LogDriver, LogLevel, Verbosity},
6 container_io::{ContainerIO, ContainerIOType},
7 fd_socket::FdSocket,
8 init::{DefaultInit, Init},
9 journal::Journal,
10 listener::{DefaultListener, Listener},
11 pause::Pause,
12 streaming_server::StreamingServer,
13 telemetry::Telemetry,
14 version::Version,
15};
16use anyhow::{Context, Result, format_err};
17use capnp::text_list::Reader;
18use capnp_rpc::{RpcSystem, rpc_twoparty_capnp::Side, twoparty};
19use clap::crate_name;
20use conmon_common::conmon_capnp::conmon::{self, CgroupManager};
21use futures::{AsyncReadExt, FutureExt};
22use getset::Getters;
23use nix::{
24 errno::Errno,
25 libc::_exit,
26 sys::signal::Signal,
27 unistd::{ForkResult, fork},
28};
29use opentelemetry::trace::{FutureExt as OpenTelemetryFutureExt, TracerProvider};
30use opentelemetry_sdk::trace::SdkTracerProvider;
31use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc};
32use tokio::{
33 fs,
34 runtime::{Builder, Handle},
35 signal::unix::{SignalKind, signal},
36 sync::{RwLock, oneshot},
37 task::{self, LocalSet},
38};
39use tokio_util::compat::TokioAsyncReadCompatExt;
40use tracing::{Instrument, debug, debug_span, info};
41use tracing_opentelemetry::OpenTelemetrySpanExt;
42use tracing_subscriber::{filter::LevelFilter, layer::SubscriberExt, prelude::*};
43use twoparty::VatNetwork;
44
45#[derive(Debug, Getters)]
46pub struct Server {
48 #[getset(get = "pub(crate)")]
50 config: Config,
51
52 #[getset(get = "pub(crate)")]
54 reaper: Arc<ChildReaper>,
55
56 #[getset(get = "pub(crate)")]
58 fd_socket: Arc<FdSocket>,
59
60 #[getset(get = "pub(crate)")]
62 tracer: Option<SdkTracerProvider>,
63
64 #[getset(get = "pub(crate)")]
66 streaming_server: Arc<RwLock<StreamingServer>>,
67}
68
69impl Server {
70 pub fn new() -> Result<Self> {
72 let server = Self {
73 config: Default::default(),
74 reaper: Default::default(),
75 fd_socket: Default::default(),
76 tracer: Default::default(),
77 streaming_server: Default::default(),
78 };
79
80 if let Some(v) = server.config().version() {
81 Version::new(v == Verbosity::Full).print();
82 process::exit(0);
83 }
84
85 if let Some(v) = server.config().version_json() {
86 Version::new(v == Verbosity::Full).print_json()?;
87 process::exit(0);
88 }
89
90 if let Some(Commands::Pause {
91 base_path,
92 pod_id,
93 ipc,
94 pid,
95 net,
96 user,
97 uts,
98 uid_mappings,
99 gid_mappings,
100 }) = server.config().command()
101 {
102 Pause::run(
103 base_path,
104 pod_id,
105 *ipc,
106 *pid,
107 *net,
108 *user,
109 *uts,
110 uid_mappings,
111 gid_mappings,
112 )
113 .context("run pause")?;
114 process::exit(0);
115 }
116
117 server.config().validate().context("validate config")?;
118
119 Self::init().context("init self")?;
120 Ok(server)
121 }
122
123 pub fn start(self) -> Result<()> {
125 if !self.config().skip_fork() {
131 match unsafe { fork()? } {
132 ForkResult::Parent { child, .. } => {
133 let child_str = format!("{child}");
134 File::create(self.config().conmon_pidfile())?
135 .write_all(child_str.as_bytes())?;
136 unsafe { _exit(0) };
137 }
138 ForkResult::Child => (),
139 }
140 }
141
142 prctl::set_child_subreaper(true)
144 .map_err(Errno::from_raw)
145 .context("set child subreaper")?;
146
147 let tracer = self.tracer().clone();
148
149 let rt = Builder::new_multi_thread().enable_all().build()?;
150 rt.block_on(self.spawn_tasks())?;
151
152 if let Some(tracer) = tracer {
153 tracer.shutdown().context("shutdown tracer")?;
154 }
155
156 rt.shutdown_background();
157 Ok(())
158 }
159
160 fn init() -> Result<()> {
161 let init = Init::<DefaultInit>::default();
162 init.unset_locale()?;
163 init.set_default_umask();
164 init.set_oom_score("-1000")
167 }
168
169 fn init_logging(&mut self) -> Result<()> {
170 let level = LevelFilter::from_str(self.config().log_level().as_ref())
171 .context("convert log level filter")?;
172
173 let telemetry_layer = if self.config().enable_tracing() {
174 let tracer = Telemetry::layer(self.config().tracing_endpoint())
175 .context("build telemetry layer")?;
176
177 self.tracer = Some(tracer.clone());
178
179 tracing_opentelemetry::layer()
180 .with_tracer(tracer.tracer(crate_name!()))
181 .into()
182 } else {
183 None
184 };
185
186 let registry = tracing_subscriber::registry().with(telemetry_layer);
187
188 match self.config().log_driver() {
189 LogDriver::None => {}
190 LogDriver::Stdout => {
191 let layer = tracing_subscriber::fmt::layer()
192 .with_target(true)
193 .with_line_number(true)
194 .with_filter(level);
195 registry
196 .with(layer)
197 .try_init()
198 .context("init stdout fmt layer")?;
199 info!("Using stdout logger");
200 }
201 LogDriver::Systemd => {
202 let layer = tracing_subscriber::fmt::layer()
203 .with_target(true)
204 .with_line_number(true)
205 .without_time()
206 .with_writer(Journal)
207 .with_filter(level);
208 registry
209 .with(layer)
210 .try_init()
211 .context("init journald fmt layer")?;
212 info!("Using systemd/journald logger");
213 }
214 }
215 info!("Set log level to: {}", self.config().log_level());
216 Ok(())
217 }
218
219 async fn spawn_tasks(mut self) -> Result<()> {
221 self.init_logging().context("init logging")?;
222
223 let (shutdown_tx, shutdown_rx) = oneshot::channel();
224 let socket = self.config().socket();
225 let fd_socket = self.config().fd_socket();
226 let reaper = self.reaper.clone();
227
228 let signal_handler_span = debug_span!("signal_handler");
229 task::spawn(
230 Self::start_signal_handler(reaper, socket, fd_socket, shutdown_tx)
231 .with_context(signal_handler_span.context())
232 .instrument(signal_handler_span),
233 );
234
235 let backend_span = debug_span!("backend");
236 task::spawn_blocking(move || {
237 Handle::current().block_on(
238 LocalSet::new()
239 .run_until(self.start_backend(shutdown_rx))
240 .with_context(backend_span.context())
241 .instrument(backend_span),
242 )
243 })
244 .await?
245 }
246
247 async fn start_signal_handler<T: AsRef<Path>>(
248 reaper: Arc<ChildReaper>,
249 socket: T,
250 fd_socket: T,
251 shutdown_tx: oneshot::Sender<()>,
252 ) -> Result<()> {
253 let mut sigterm = signal(SignalKind::terminate())?;
254 let mut sigint = signal(SignalKind::interrupt())?;
255 let handled_sig: Signal;
256
257 tokio::select! {
258 _ = sigterm.recv() => {
259 info!("Received SIGTERM");
260 handled_sig = Signal::SIGTERM;
261 }
262 _ = sigint.recv() => {
263 info!("Received SIGINT");
264 handled_sig = Signal::SIGINT;
265 }
266 }
267
268 if let Some(pause) = Pause::maybe_shared() {
269 pause.stop();
270 }
271
272 debug!("Starting grandchildren cleanup task");
273 reaper
274 .kill_grandchildren(handled_sig)
275 .context("unable to kill grandchildren")?;
276
277 debug!("Sending shutdown message");
278 shutdown_tx
279 .send(())
280 .map_err(|_| format_err!("unable to send shutdown message"))?;
281
282 debug!("Removing socket file {}", socket.as_ref().display());
283 fs::remove_file(socket)
284 .await
285 .context("remove existing socket file")?;
286
287 debug!("Removing fd socket file {}", fd_socket.as_ref().display());
288 fs::remove_file(fd_socket)
289 .await
290 .or_else(|err| {
291 if err.kind() == std::io::ErrorKind::NotFound {
292 Ok(())
293 } else {
294 Err(err)
295 }
296 })
297 .context("remove existing fd socket file")
298 }
299
300 async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
301 let listener =
302 Listener::<DefaultListener>::default().bind_long_path(self.config().socket())?;
303 let client: conmon::Client = capnp_rpc::new_client(self);
304
305 loop {
306 let stream = tokio::select! {
307 _ = &mut shutdown_rx => {
308 debug!("Received shutdown message");
309 return Ok(())
310 }
311 stream = listener.accept() => {
312 stream?.0
313 },
314 };
315 let (reader, writer) = TokioAsyncReadCompatExt::compat(stream).split();
316 let network = Box::new(VatNetwork::new(
317 reader,
318 writer,
319 Side::Server,
320 Default::default(),
321 ));
322 let rpc_system = RpcSystem::new(network, Some(client.clone().client));
323 task::spawn_local(Box::pin(rpc_system.map(|_| ())));
324 }
325 }
326}
327
328pub(crate) struct GenerateRuntimeArgs<'a> {
329 pub(crate) config: &'a Config,
330 pub(crate) id: &'a str,
331 pub(crate) container_io: &'a ContainerIO,
332 pub(crate) pidfile: &'a Path,
333 pub(crate) cgroup_manager: CgroupManager,
334}
335
336impl GenerateRuntimeArgs<'_> {
337 const SYSTEMD_CGROUP_ARG: &'static str = "--systemd-cgroup";
338 const RUNTIME_CRUN: &'static str = "crun";
339 const LOG_LEVEL_FLAG_CRUN: &'static str = "--log-level";
340
341 pub fn create_args(
343 self,
344 bundle_path: &Path,
345 global_args: Reader,
346 command_args: Reader,
347 ) -> Result<Vec<String>> {
348 let mut args = self.default_args().context("build default runtime args")?;
349
350 if let Some(rr) = self.config.runtime_root() {
351 args.push(format!("--root={}", rr.display()));
352 }
353
354 if self.cgroup_manager == CgroupManager::Systemd {
355 args.push(Self::SYSTEMD_CGROUP_ARG.into());
356 }
357
358 for arg in global_args {
359 args.push(arg?.to_string()?);
360 }
361
362 args.extend([
363 "create".to_string(),
364 "--bundle".to_string(),
365 bundle_path.display().to_string(),
366 "--pid-file".to_string(),
367 self.pidfile.display().to_string(),
368 ]);
369
370 for arg in command_args {
371 args.push(arg?.to_string()?);
372 }
373
374 if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
375 args.push(format!("--console-socket={}", terminal.path().display()));
376 }
377
378 args.push(self.id.into());
379
380 debug!("Runtime args {:?}", args.join(" "));
381 Ok(args)
382 }
383
384 pub(crate) fn exec_sync_args(&self, command: Reader) -> Result<Vec<String>> {
386 let mut args = self
387 .exec_sync_args_without_command()
388 .context("exec sync args without command")?;
389
390 for arg in command {
391 args.push(arg?.to_string()?);
392 }
393
394 debug!("Exec args {:?}", args.join(" "));
395 Ok(args)
396 }
397
398 pub(crate) fn exec_sync_args_without_command(&self) -> Result<Vec<String>> {
399 let mut args = self.default_args().context("build default runtime args")?;
400
401 if let Some(rr) = self.config.runtime_root() {
402 args.push(format!("--root={}", rr.display()));
403 }
404
405 if self.cgroup_manager == CgroupManager::Systemd {
406 args.push(Self::SYSTEMD_CGROUP_ARG.into());
407 }
408
409 args.push("exec".to_string());
410 args.push("-d".to_string());
411
412 if let ContainerIOType::Terminal(terminal) = self.container_io.typ() {
413 args.push(format!("--console-socket={}", terminal.path().display()));
414 args.push("--tty".to_string());
415 }
416
417 args.push(format!("--pid-file={}", self.pidfile.display()));
418 args.push(self.id.into());
419
420 Ok(args)
421 }
422
423 fn default_args(&self) -> Result<Vec<String>> {
425 let mut args = vec![];
426
427 if self
428 .config
429 .runtime()
430 .file_name()
431 .context("no filename in path")?
432 == Self::RUNTIME_CRUN
433 {
434 debug!("Found crun used as runtime");
435 args.push(format!("--log=journald:{}", self.id));
436
437 match self.config.log_level() {
438 &LogLevel::Debug | &LogLevel::Error => args.push(format!(
439 "{}={}",
440 Self::LOG_LEVEL_FLAG_CRUN,
441 self.config.log_level()
442 )),
443 &LogLevel::Warn => args.push(format!("{}=warning", Self::LOG_LEVEL_FLAG_CRUN)),
444 _ => {}
445 }
446 }
447
448 if let Some(rr) = self.config.runtime_root() {
449 args.push(format!("--root={}", rr.display()));
450 }
451
452 if self.cgroup_manager == CgroupManager::Systemd {
453 args.push(Self::SYSTEMD_CGROUP_ARG.into());
454 }
455
456 Ok(args)
457 }
458}