194 lines
5.6 KiB
Rust
194 lines
5.6 KiB
Rust
#![feature(register_tool)]
|
|
#![feature(register_attr)]
|
|
|
|
mod filter;
|
|
mod message;
|
|
mod monitor;
|
|
mod monitors;
|
|
mod output;
|
|
mod outputs;
|
|
|
|
extern crate getopts;
|
|
extern crate globset;
|
|
extern crate log;
|
|
extern crate serde_yaml;
|
|
extern crate simplelog;
|
|
|
|
use getopts::Options;
|
|
use serde::Deserialize;
|
|
use std::collections::HashMap;
|
|
use std::env;
|
|
use std::fs::File;
|
|
use std::io::prelude::*;
|
|
use std::sync::mpsc;
|
|
use std::sync::{Arc, Barrier};
|
|
use std::thread;
|
|
|
|
use crate::filter::*;
|
|
use crate::message::*;
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
pub struct AppConfig {
|
|
pub monitors: Vec<HashMap<String, serde_yaml::Value>>,
|
|
pub outputs: Vec<HashMap<String, serde_yaml::Value>>,
|
|
}
|
|
|
|
fn main() {
|
|
let args: Vec<String> = env::args().collect();
|
|
|
|
// Parse command line args
|
|
let mut opts = Options::new();
|
|
opts.optopt(
|
|
"c",
|
|
"config",
|
|
"Path to config.yaml. Default: ./config.yaml",
|
|
"config.yaml",
|
|
);
|
|
opts.optflag("h", "help", "print this help menu");
|
|
opts.optflag("v", "verbose", "Switch to DEBUG output");
|
|
opts.optflag("q", "quiet", "Switch to WARNING output");
|
|
|
|
let matches = match opts.parse(&args[1..]) {
|
|
Ok(m) => m,
|
|
Err(f) => panic!(f.to_string()),
|
|
};
|
|
if matches.opt_present("h") {
|
|
let brief = format!("Usage: {} [options]", args[0]);
|
|
print!("{}", opts.usage(&brief));
|
|
return;
|
|
}
|
|
|
|
let level;
|
|
if matches.opt_present("v") && matches.opt_present("q") {
|
|
panic!("-v and -q are mutually exclusive");
|
|
}
|
|
if matches.opt_present("v") {
|
|
level = simplelog::LevelFilter::Debug;
|
|
} else if matches.opt_present("q") {
|
|
level = simplelog::LevelFilter::Warn;
|
|
} else {
|
|
level = simplelog::LevelFilter::Info;
|
|
}
|
|
|
|
simplelog::TermLogger::init(
|
|
level,
|
|
simplelog::Config {
|
|
time: None,
|
|
level: Some(simplelog::Level::Error),
|
|
target: Some(simplelog::Level::Info),
|
|
location: Some(simplelog::Level::Debug),
|
|
time_format: None,
|
|
},
|
|
)
|
|
.unwrap();
|
|
|
|
// Parse config
|
|
let config_path = matches.opt_str("c").unwrap_or("./config.yaml".to_owned());
|
|
|
|
let mut config_file = File::open(config_path).expect("Could not open configuration file");
|
|
let mut config_content = String::new();
|
|
config_file
|
|
.read_to_string(&mut config_content)
|
|
.expect("Could not read configuration file");
|
|
let config: AppConfig =
|
|
serde_yaml::from_str(&config_content).expect("Invalid config file content");
|
|
|
|
let thread_count = config.monitors.len() + config.outputs.len();
|
|
let barrier = Arc::new(Barrier::new(thread_count + 1));
|
|
|
|
// Monitors -> dispatcher channel
|
|
let (mon_sender, mon_receiver) = mpsc::channel();
|
|
|
|
// Instantiate monitor threads and structs
|
|
let mut mon_threads = vec![];
|
|
for mon_node in config.monitors {
|
|
let mon_type = mon_node
|
|
.get("type")
|
|
.expect("Missing `type` key for monitor")
|
|
.as_str()
|
|
.expect("Key `type` for monitor is not a string")
|
|
.to_owned();
|
|
let mon_config = mon_node
|
|
.get("config")
|
|
.unwrap_or(&serde_yaml::Mapping::new().into())
|
|
.clone();
|
|
|
|
let snd = mon_sender.clone();
|
|
let bar = barrier.clone();
|
|
|
|
mon_threads.push(thread::spawn(move || {
|
|
log::info!("+> monitor: {}", mon_type);
|
|
match monitors::factory(&mon_type, mon_config) {
|
|
Ok(mut mon) => {
|
|
bar.wait();
|
|
loop {
|
|
mon.run(&snd);
|
|
}
|
|
}
|
|
Err(e) => log::error!("Cannot instantiate monitor {}: {}", mon_type, e),
|
|
}
|
|
}));
|
|
}
|
|
|
|
// Instantiate output threads and structs
|
|
let mut output_senders: Vec<mpsc::Sender<Message>> = vec![];
|
|
let mut output_threads = vec![];
|
|
for out_node in config.outputs {
|
|
let out_type = out_node
|
|
.get("type")
|
|
.expect("Missing `type` key for output")
|
|
.as_str()
|
|
.expect("Key `type` for output is not a string")
|
|
.to_owned();
|
|
let out_config = out_node
|
|
.get("config")
|
|
.unwrap_or(&serde_yaml::Mapping::new().into())
|
|
.clone();
|
|
let out_filter = out_node
|
|
.get("filter")
|
|
.unwrap_or(&serde_yaml::Mapping::new().into())
|
|
.clone();
|
|
|
|
let (out_sender, out_receiver) = mpsc::channel();
|
|
output_senders.push(out_sender);
|
|
|
|
let filter = Filter::new(&out_filter);
|
|
let bar = barrier.clone();
|
|
|
|
output_threads.push(thread::spawn(move || {
|
|
log::info!("+> output: {}", out_type);
|
|
match outputs::factory(&out_type, out_config) {
|
|
Ok(mut output) => {
|
|
bar.wait();
|
|
loop {
|
|
let message = out_receiver.recv().unwrap();
|
|
if filter.is_message_allowed(&message) {
|
|
output.process_message(message);
|
|
}
|
|
}
|
|
}
|
|
Err(e) => log::error!("Cannot instantiate output {}: {}", out_type, e),
|
|
}
|
|
}));
|
|
}
|
|
let output_senders = output_senders;
|
|
|
|
// Start all the threads
|
|
barrier.wait();
|
|
log::info!("Started rnetmon");
|
|
for t in &mon_threads {
|
|
t.thread().unpark();
|
|
}
|
|
for t in &output_threads {
|
|
t.thread().unpark();
|
|
}
|
|
|
|
// Dispatch messages
|
|
loop {
|
|
let message = mon_receiver.recv().unwrap();
|
|
for out in &output_senders {
|
|
out.send(message.clone()).unwrap();
|
|
}
|
|
}
|
|
}
|