#![feature(unrestricted_attribute_tokens)] #![feature(custom_attribute)] #![feature(try_from)] mod filter; mod message; mod monitor; mod monitors; mod output; mod outputs; extern crate getopts; extern crate globset; extern crate log; extern crate serde_yaml; extern crate simplelog; use getopts::Options; use serde::Deserialize; use std::collections::HashMap; use std::env; use std::fs::File; use std::io::prelude::*; use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use crate::filter::*; use crate::message::*; #[derive(Debug, Deserialize)] pub struct AppConfig { pub monitors: Vec>, pub outputs: Vec>, } fn main() { let args: Vec = env::args().collect(); // Parse command line args let mut opts = Options::new(); opts.optopt( "c", "config", "Path to config.yaml. Default: ./config.yaml", "config.yaml", ); opts.optflag("h", "help", "print this help menu"); opts.optflag("v", "verbose", "Switch to DEBUG output"); opts.optflag("q", "quiet", "Switch to WARNING output"); let matches = match opts.parse(&args[1..]) { Ok(m) => m, Err(f) => panic!(f.to_string()), }; if matches.opt_present("h") { let brief = format!("Usage: {} [options]", args[0]); print!("{}", opts.usage(&brief)); return; } let level; if matches.opt_present("v") && matches.opt_present("q") { panic!("-v and -q are mutually exclusive"); } if matches.opt_present("v") { level = simplelog::LevelFilter::Debug; } else if matches.opt_present("q") { level = simplelog::LevelFilter::Warn; } else { level = simplelog::LevelFilter::Info; } simplelog::TermLogger::init( level, simplelog::Config { time: None, level: Some(simplelog::Level::Error), target: Some(simplelog::Level::Info), location: Some(simplelog::Level::Debug), time_format: None, }, ) .unwrap(); // Parse config let config_path = matches.opt_str("c").unwrap_or("./config.yaml".to_owned()); let mut config_file = File::open(config_path).expect("Could not open configuration file"); let mut config_content = String::new(); config_file .read_to_string(&mut config_content) .expect("Could not read configuration file"); let config: AppConfig = serde_yaml::from_str(&config_content).expect("Invalid config file content"); let thread_count = config.monitors.len() + config.outputs.len(); let barrier = Arc::new(Barrier::new(thread_count + 1)); // Monitors -> dispatcher channel let (mon_sender, mon_receiver) = mpsc::channel(); // Instantiate monitor threads and structs let mut mon_threads = vec![]; for mon_node in config.monitors { let mon_type = mon_node .get("type") .expect("Missing `type` key for monitor") .as_str() .expect("Key `type` for monitor is not a string") .to_owned(); let mon_config = mon_node .get("config") .unwrap_or(&serde_yaml::Mapping::new().into()) .clone(); let snd = mon_sender.clone(); let bar = barrier.clone(); mon_threads.push(thread::spawn(move || { log::info!("+> monitor: {}", mon_type); match monitors::factory(&mon_type, mon_config) { Ok(mut mon) => { bar.wait(); loop { mon.run(&snd); } } Err(e) => log::error!("Cannot instantiate monitor {}: {}", mon_type, e), } })); } // Instantiate output threads and structs let mut output_senders: Vec> = vec![]; let mut output_threads = vec![]; for out_node in config.outputs { let out_type = out_node .get("type") .expect("Missing `type` key for output") .as_str() .expect("Key `type` for output is not a string") .to_owned(); let out_config = out_node .get("config") .unwrap_or(&serde_yaml::Mapping::new().into()) .clone(); let out_filter = out_node .get("filter") .unwrap_or(&serde_yaml::Mapping::new().into()) .clone(); let (out_sender, out_receiver) = mpsc::channel(); output_senders.push(out_sender); let filter = Filter::new(&out_filter); let bar = barrier.clone(); output_threads.push(thread::spawn(move || { log::info!("+> output: {}", out_type); match outputs::factory(&out_type, out_config) { Ok(mut output) => { bar.wait(); loop { let message = out_receiver.recv().unwrap(); if filter.is_message_allowed(&message) { output.process_message(message); } } } Err(e) => log::error!("Cannot instantiate output {}: {}", out_type, e), } })); } let output_senders = output_senders; // Start all the threads barrier.wait(); log::info!("Started rnetmon"); for t in &mon_threads { t.thread().unpark(); } for t in &output_threads { t.thread().unpark(); } // Dispatch messages loop { let message = mon_receiver.recv().unwrap(); for out in &output_senders { out.send(message.clone()).unwrap(); } } }