diff --git a/src/monitor.rs b/src/monitor.rs index dbd3334..61f32b6 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -1,316 +1,2 @@ -use std::{fs}; //,io}; -use std::fs::OpenOptions; -use std::path::Path; -use std::io::{Write}; -use std::str; -use anyhow::{anyhow,Result}; -// use tokio::runtime::Handle; -use std::process::{Command}; - -use app_env::{ - appenv::{AppEnv}, - DataStore, - applogs::AppLogs, - // config::{Config} -}; -use crate::clouds::defs::{ - Cloud, -}; - -use crate::defs::{ProviderName}; - -use crate::clouds::monitor_rules::{ - MonitorRules, - MonitorRule, - RuleContext, - RuleSchedule, - RuleOperator, - MonitorAction, -}; - -pub fn write_str_data(output_path: String, data_str: String, msg: String) -> Result<()> { - if Path::new(&output_path).exists() { - fs::remove_file(&output_path)?; - } - let mut file = OpenOptions::new().write(true).create(true).open(&output_path)?; - file.write_all(data_str.as_bytes())?; - if ! msg.is_empty() { - let now = chrono::Utc::now().timestamp(); - let out = format!("{}: [{}] -> {}\n",&now,&msg,&output_path); - println!("{}",&out); - } - Ok(()) -} -pub async fn on_action_server(task: &str, srvr: &str, provider: &str, args: &str, _rule: &MonitorRule) -> Result<()> { - let provider_name = ProviderName::set_provider(provider.to_owned()); - match provider_name { - ProviderName::upcloud => { - let cmd = format!("upclapi -c {}server -id {} {}",&task,&srvr,&args); - println!("action {}: {} on {} {}",&task,&srvr,&provider_name,&args); - let res = run_command(&cmd).unwrap_or_else(|e|{ - eprintln!("Error {}: {}",&cmd,e); - String::from("") - }); - println!("{}",&res); - Ok(()) - }, - _ => Err(anyhow!("Provider {} not defined", &provider_name)) - } -} -pub async fn on_action_notify(chnl: &str, msg: &str, args: &str, rule: &MonitorRule) -> Result<()> { - let notificator_path = envmnt::get_or("NOTIFICATOR_BIN",""); - if notificator_path.is_empty() || ! Path::new(¬ificator_path).exists() { - return Err(anyhow!("Error notificator:")); - } - let message=format!("Monitor {}: {} {}",&rule.id,&msg,&args); - let cmd = format!("{} {} {}",¬ificator_path,&chnl,&message); - let _ = run_command(&cmd).unwrap_or_else(|e|{ - eprintln!("Error {}: {}",&cmd,e); - String::from("") - }); - Ok(()) -} -fn parse_target(target: &str, val: &str) -> String { - let srvr: String; - if val.contains("$target") { - srvr = val.replace("$target", &target.to_owned()); - } else if val.contains("$server") { - srvr = val.replace("$server", &target.to_owned()); - } else { - srvr = val.to_owned(); - } - srvr -} -pub async fn on_monitor_action(action: &MonitorAction, target: &str, rule: &MonitorRule) -> Result<()> { - match action { - MonitorAction::None => return Ok(()), - MonitorAction::Start(v,p,a) => - on_action_server("start",&parse_target(target, v),&p,&a,&rule).await?, - MonitorAction::Stop(v,p,a) => - on_action_server("stop",&parse_target(target, v),&p,&a,&rule).await?, - MonitorAction::Restart(v,p,a) => - on_action_server("restart",&parse_target(target, v),&p,&a,&rule).await?, - MonitorAction::Notify(v,p,a) => on_action_notify(&v,&parse_target(target,&p),&a,&rule).await?, - }; - Ok(()) -} -pub fn on_monitor_operator(operator: &RuleOperator, target: &str, value: &str) -> bool { - match operator { - RuleOperator::None => true, - RuleOperator::Equal => target == value, - RuleOperator::NotEqual=> target != value, - RuleOperator::Contains => target.contains(value), - RuleOperator::NotContains => !target.contains(value), - } -} -pub async fn on_server_rule(_srvr: &str, _out_cmd: &str, _rule: &MonitorRule) -> Result<()> { - Ok(()) -} -pub async fn on_service_rule(srvc: &str, out_cmd: &str, rule: &MonitorRule) -> Result<()> { - println!("Service: {} -> {}",&srvc,&out_cmd); - if !out_cmd.is_empty() { - let mut targets = out_cmd.split_whitespace(); - while let Some(target) = targets.next() { - // println!("target: {}",&target); - for action in &rule.actions { - on_monitor_action(action, &target, &rule).await?; - } - }; - } - Ok(()) -} -pub async fn on_ip_rule(_ip: &str, _out_cmd: &str, _rule: &MonitorRule) -> Result<()> { - - // tokio::spawn(async move {run_on_rule(rule.to_owned()).await; }); - Ok(()) -} -pub fn run_command(str_cmd: &str) -> Result { - let mut args: Vec<&str> = str_cmd.split(" ").collect(); - let cmd = args[0].to_owned(); - args.remove(0); - // println!("cmd: {}, args: {:?}",&cmd,&args); - let result = Command::new(&cmd) - .args(&args) - .output()?; - // dbg!(&result); - match &result.status.code() { - Some(code) => - if format!("{}",code) == "0" { - let mut res=String::from(str::from_utf8(&result.stdout).unwrap_or_else(|_| "")); - if res.ends_with('\n') { - res.pop(); - } - res = res.replace('\n'," "); - Ok(res) - } else { - let err = str::from_utf8(&result.stderr).unwrap_or_else(|_| ""); - Err(anyhow!("Running {}: failed:\n {}",&cmd,&err)) - } - None => Ok(String::from("")) - } -} -pub fn run_on_selector(rule: &MonitorRule) -> Result { - let ctx_val = rule.context.get_value(); - let target: Vec<&str> = ctx_val.split("/").collect(); - let srv_path = match target.len() { - 2 => String::from(target[1]), - _ => String::from(""), - }; - let mut cmd = match &rule.context { - RuleContext::Server(v) => String::from(v), - RuleContext::Service(v) => - if v.contains("kubernetes") { - String::from("kubectl") - } else { - String::from(v) - } - , - RuleContext::Ip(v) => String::from(v), - RuleContext::None => String::from(""), - }; - if let Some(ctxpath) = rule.selector.get("ctxpath") { - cmd = format!("{}{} {}",&ctxpath,&srv_path,&cmd); - } - if let Some(run) = rule.selector.get("run") { - cmd = format!("{} {}",&run,&cmd); - } - if let Some(typ) = rule.selector.get("type") { - cmd = format!("{} get {}",&cmd,&typ); - } - if let Some(ns) = rule.selector.get("ns") { - cmd = format!("{} -n {}",&cmd,&ns); - } - if let Some(custom) = rule.selector.get("custom") { - cmd = format!("{} {}",&cmd,&custom); - } - if let Some(pat) = rule.selector.get("match") { - cmd = format!("{} | grep {}",&cmd,&pat); - } - if let Some(awk) = rule.selector.get("awk") { - cmd = format!("{} | awk '{}'",&cmd,&awk); - } - if let Some(sort) = rule.selector.get("sort") { - cmd = format!("{} | sort {}",&cmd,&sort); - } - if cmd.len() > 0 { - run_command(&cmd) - } else { - Err(anyhow!("Selector: failed:\n {}",&rule.id)) - } -} -pub async fn log_monitor_state(rule: &MonitorRule, app_env: &AppEnv, env_name: &str, env_state: isize, msg: &str, out_cmd: &str) -> Result<()> { - let now = format!("{}", chrono::Utc::now().timestamp()); - let log_entry = AppLogs { - id: format!("{}_{}", &rule.id,&now), - name: format!("rule: {}", &rule.name), - when: String::from("run_monitor_rule"), - source: format!("{}", &env_name), - target: format!("{}", &out_cmd), - state: format!("{}", &env_state), - msg: format!("{}", &msg), - }; - if app_env.config.logs_store == DataStore::File { - log_entry.write_applogs_entry(format!("{}/monitor.log",&app_env.config.logs_path).as_str()).await?; - } - Ok(()) -} -pub async fn run_monitor_rule(idx: usize, rule: &MonitorRule, app_env: &AppEnv) -> Result<()> { - if envmnt::get_isize("DEBUG",0) > 0 { - println!("{} [{}] {}: {}",&idx,&rule.id,&rule.name,&rule.description); - } - let out_cmd: String; - if rule.command.is_empty() { - out_cmd=run_on_selector(&rule).unwrap_or_else(|e| { - eprintln!("Error on selector {}: {}",&rule.id,e); - String::from("?") - }); - } else { - out_cmd=run_command(&rule.command).unwrap_or_else(|e| { - eprintln!("Error on command {}: {}",&rule.command,e); - String::from("?") - }); - } - if out_cmd.as_str() == "?" { - return Ok(()); - } - let env_name = format!("MONITOR_{}",&rule.id); - let env_state = envmnt::get_isize(&env_name,0); - if ! on_monitor_operator(&rule.operator,&out_cmd, &rule.value) { - if env_state > 0 { - let msg = format!("{} Ok in state: {}",&env_name,&env_state); - println!("{}",&msg); - envmnt::set_isize(&env_name, 0); - log_monitor_state(&rule, &app_env, &env_name, env_state, &msg, &out_cmd).await?; - for item in rule.actions.iter() { - match item { - MonitorAction::Notify(v,p,a) => on_action_notify(&v,&parse_target(format!("Ok state from -> {}",&env_state).as_str(),&p),&a,&rule).await?, - _ => continue, - }; - } - } - return Ok(()); - } - if env_state > 0 && env_state < rule.wait_checks { - let msg = format!("{} is already set: {} increment counter",&env_name,&env_state); - envmnt::increment(&env_name); - println!("{}",&msg); - log_monitor_state(&rule, &app_env, &env_name, env_state, &msg, &out_cmd).await?; - return Ok(()); - } - let mut err = String::from(""); - match &rule.context { - RuleContext::Server(v) => { - on_server_rule(&v, &out_cmd, &rule).await.unwrap_or_else(|e| { - err = format!("Error rule {} on {}: {}",&rule.id,&v,e); - eprintln!("{}",err) - }); - }, - RuleContext::None => { - return Ok(()); - }, - RuleContext::Service(v) => { - on_service_rule(&v, &out_cmd, &rule).await.unwrap_or_else(|e| { - err = format!("Error rule {} on {}: {}",&rule.id,&v,e); - eprintln!("{}",err) - }); - }, - RuleContext::Ip(v) => { - on_ip_rule(&v, &out_cmd, &rule).await.unwrap_or_else(|e| { - err = format!("Error rule {} on {}: {}",&rule.id,&v,e); - eprintln!("{}",err) - }); - }, - } - let msg = match err.is_empty() { - true => format!("{}: 1 done",&env_name), - false => format!("{} error: {}",&env_name,&err), - }; - envmnt::set_isize(&env_name,1); - println!("{}",&msg); - log_monitor_state(&rule, &app_env, &env_name, env_state, &msg, &out_cmd).await?; - Ok(()) -} -pub async fn run_monitor(monitor_rules: MonitorRules, _cloud: Cloud, app_env: AppEnv) -> Result<()> { - // println!("Run {}: {}",&monitor_rules.name,&monitor_rules.description); - for (idx,rule) in monitor_rules.rules.iter().enumerate() { - match rule.schedule { - RuleSchedule::Check => { - // dbg!(&rule); - let mut err = String::from(""); - run_monitor_rule(idx,rule, &app_env).await.unwrap_or_else(|e| { - err = format!("Error rule {}: {}",&rule.id,e); - eprintln!("{}",&err) - }); - match err.is_empty() { - true => continue, - false => log_monitor_state(&rule, &app_env, &rule.name, -1, &err, "").await.unwrap_or_else(|e| - eprintln!("Unable to log monitor entry: {}",e) - ), - }; - }, - RuleSchedule::OnDemand => continue, - RuleSchedule::None => continue, - }; - } - Ok(()) -} \ No newline at end of file +pub mod utils; +pub mod defs; diff --git a/src/monitor/defs.rs b/src/monitor/defs.rs new file mode 100644 index 0000000..a422d2a --- /dev/null +++ b/src/monitor/defs.rs @@ -0,0 +1,578 @@ +use std::fmt; +use std::collections::HashMap; +use serde::{Serialize,Deserialize,Deserializer}; +use anyhow::{anyhow,Result}; +use std::path::Path; + +use app_env::{ + appenv::{AppEnv}, + DataStore, + applogs::AppLogs, + // config::{Config} +}; +use crate::defs::{ProviderName}; +use kloud::utils::load_fs_content; +use crate::clouds::defs::{ Cloud }; + +use crate::monitor::utils::{run_command,write_str_data}; + +fn parse_target(target: String, val: &str) -> String { + let srvr: String; + if val.contains("$target") { + srvr = val.replace("$target", &target); + } else if val.contains("$server") { + srvr = val.replace("$server", &target); + } else { + srvr = val.to_owned(); + } + srvr +} + +#[derive(Eq, PartialEq, Clone, Serialize, Debug, Deserialize)] +pub enum RuleContext { + None, + Server(String), + Service(String), + Ip(String), +} +impl Default for RuleContext { + fn default() -> Self { + RuleContext::None + } +} +impl fmt::Display for RuleContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RuleContext::Server(c) => write!(f,"server: {}",c), + RuleContext::None => write!(f,"none"), + RuleContext::Service(c) => write!(f,"service: {}",c), + RuleContext::Ip(c) => write!(f,"ip: {}",c), + } + //write!(f, "{:?}", self) + } +} + +impl RuleContext { + pub fn get_value(&self) -> String { + match self { + RuleContext::Server(c) => format!("{}",c), + RuleContext::None => format!(""), + RuleContext::Service(c) => format!("{}",c), + RuleContext::Ip(c) => format!("{}",c), + } + } + +} + +#[derive(Eq, PartialEq, Clone, Serialize, Debug, Deserialize)] +pub enum MonitorAction { + None, + Start(String,String,String), + Stop(String,String,String), + Restart(String,String,String), + Notify(String,String,String), +} +impl Default for MonitorAction { + fn default() -> Self { + MonitorAction::None + } +} +impl fmt::Display for MonitorAction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + MonitorAction::None => write!(f,"none"), + MonitorAction::Start(v,p,a) => write!(f,"start: {} {} {}",v,p,a), + MonitorAction::Stop(v,p,a) => write!(f,"stop: {} {} {}",v,p,a), + MonitorAction::Restart(v,p,a) => write!(f,"restart: {} {} {}",v,p,a), + MonitorAction::Notify(v,p,a) => write!(f,"notify: {} {} {}",v,p,a), + } + //write!(f, "{:?}", self) + } +} + +impl MonitorAction { + pub fn get_value(&self) -> String { + match self { + MonitorAction::None => format!(""), + MonitorAction::Start(v,p,a) => format!("{} {} {}",v,p,a), + MonitorAction::Stop(v,p,a) => format!("{} {} {}",v,p,a), + MonitorAction::Restart(v,p,a) => format!("{} {} {}",v,p,a), + MonitorAction::Notify(v,p,a) => format!("{} {} {}",v,p,a), + } + } + pub fn is_notify(&self) -> bool { + match self { + MonitorAction::Notify(_,_,_) => true, + _ => false + } + } + pub async fn on(&self, target: String, rule: &MonitorRule) -> Result<()> { + match self { + MonitorAction::None => return Ok(()), + MonitorAction::Start(v,p,a) => + self.on_server("start",parse_target(target, v),p,a,&rule).await?, + MonitorAction::Stop(v,p,a) => + self.on_server("stop",parse_target(target, v),p,a,&rule).await?, + MonitorAction::Restart(v,p,a) => + self.on_server("restart",parse_target(target, v),p,a,&rule).await?, + MonitorAction::Notify(v,p,a) => + self.on_notify(&v,parse_target(target ,p),a,&rule).await?, + }; + Ok(()) + } + pub async fn on_server(&self, task: &str, srvr: String, provider: &str, args: &str, _rule: &MonitorRule) -> Result<()> { + let provider_name = ProviderName::set_provider(provider.to_owned()); + match provider_name { + ProviderName::upcloud => { + let cmd = format!("upclapi -c {}server -id {} {}",&task,&srvr,&args); + println!("action {}: {} on {} {}",&task,&srvr,&provider_name,&args); + let res = run_command(&cmd).unwrap_or_else(|e|{ + eprintln!("Error {}: {}",&cmd,e); + String::from("") + }); + println!("{}",&res); + Ok(()) + }, + _ => Err(anyhow!("Provider {} not defined", &provider_name)) + } + } + pub async fn on_notify(&self, chnl: &str, msg: String, args: &str, rule: &MonitorRule) -> Result<()> { + let notificator_path = envmnt::get_or("NOTIFICATOR_BIN",""); + if notificator_path.is_empty() || ! Path::new(¬ificator_path).exists() { + return Err(anyhow!("Error notificator:")); + } + let message=format!("Monitor {}: {} {}",&rule.id,&msg,&args); + let cmd = format!("{} {} {}",¬ificator_path,&chnl,&message); + let _ = run_command(&cmd).unwrap_or_else(|e|{ + eprintln!("Error {}: {}",&cmd,e); + String::from("") + }); + Ok(()) + } +} + +#[derive(Eq, PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] +pub enum RuleSchedule { + None, + Check, + OnDemand, +} + +impl Default for RuleSchedule { + fn default() -> Self { + RuleSchedule::None + } +} + +impl fmt::Display for RuleSchedule { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RuleSchedule::None => write!(f,"none"), + RuleSchedule::Check=> write!(f,"chechk"), + RuleSchedule::OnDemand=> write!(f,"ondemand"), + } + } +} + +#[derive(Eq, PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] +pub enum RuleOperator { + None, + Equal, + NotEqual, + Contains, + NotContains, +} + +impl Default for RuleOperator { + fn default() -> Self { + RuleOperator::None + } +} +impl RuleOperator { + pub fn on(&self, target: &str, value: &str) -> bool { + match self { + RuleOperator::None => true, + RuleOperator::Equal => target == value, + RuleOperator::NotEqual=> target != value, + RuleOperator::Contains => target.contains(value), + RuleOperator::NotContains => !target.contains(value), + } + } +} + +impl fmt::Display for RuleOperator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RuleOperator::None => write!(f,"none"), + RuleOperator::Equal => write!(f,"equal"), + RuleOperator::NotEqual=> write!(f,"not_equal"), + RuleOperator::Contains => write!(f,"matches"), + RuleOperator::NotContains => write!(f,"not_matches"), + } + } +} + +#[allow(clippy::missing_docs_in_private_items)] +#[allow(non_snake_case)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct MonitorStates { + pub when: String, + pub id: String, + pub data: String, // to save JSON info +} +impl MonitorStates { + pub fn load(root: &str, path: &str, format: &str) -> Self { + println!("Load Montior States ..."); + let data_rules = load_fs_content(&root, &path, &format); + let res: MonitorStates = serde_json::from_str(&data_rules).unwrap_or_else(|e|{ + eprintln!("Error loading MonitorStates {}/{}.{}: {}",&root,&path,&format,e); + MonitorStates::default() + }); + res + } + pub fn dump(&self, root: &str, path: &str, format: &str) -> Result<()> { + println!("Dump Montior States ..."); + let data_str = serde_json::to_string(&self).unwrap_or_else(|e| { + eprintln!("Error dump MonitorRules {}/{}.{}: {}",&root,&path,&format,e); + String::from("") + }); + if ! data_str.is_empty() { + println!("Dump MonitorStates to {}/{}.{}",&root,&path,&format); + let output_path = format!("{}/{}.{}",&root,&path,&format); + write_str_data(output_path, data_str, "Monitor states dump".to_string())?; + } + Ok(()) + } +} + +fn deserialize_rulecontext<'de, D>(deserializer: D) -> Result +where D: Deserializer<'de> { + let buf = String::deserialize(deserializer)?; + // let res = String::from_str(&buf).unwrap_or_else(|e|{ + // eprintln!("deserialize rulecontext error: {}",e); + // String::from("") + // }); + // println!("buf: {}",&buf); + let v: Vec<&str> = buf.split(':').collect(); + if v.len() > 1 { + Ok(match v[0] { + "Service" => RuleContext::Service(v[1].to_owned()), + "Server" => RuleContext::Server(v[1].to_owned()), + "Ip" => RuleContext::Ip(v[1].to_owned()), + _ => RuleContext::None + + }) + } else { + Ok(RuleContext::None) + } +} +fn deserialize_ruleactions<'de, D>(deserializer: D) -> Result, D::Error> +where D: Deserializer<'de> { + let buf = Vec::deserialize(deserializer)?; + let mut actions = Vec::new(); + buf.iter().for_each(|it: &String| { + let v: Vec<&str> = it.split(':').collect(); + if v.len() > 1 { + let arg: Vec<&str> = v[1].split(',').collect(); + if arg.len() > 1 { + actions.push(match v[0] { + "Start" => MonitorAction::Start(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), + "Stop" => MonitorAction::Stop(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), + "Restart" => MonitorAction::Restart(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), + "Notify" => MonitorAction::Notify(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), + _ => MonitorAction::None + }) + } + } + }); + Ok(actions) +} +#[allow(clippy::missing_docs_in_private_items)] +#[allow(non_snake_case)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct MonitorRule { + pub id: String, + pub name: String, + pub description: String, + #[serde(deserialize_with = "deserialize_rulecontext")] + pub context: RuleContext, + pub command: String, + pub selector: HashMap, + pub operator: RuleOperator, + pub value: String, + pub result: bool, + pub use_in_state: bool, + pub wait_checks: isize, + pub schedule: RuleSchedule, + #[serde(deserialize_with = "deserialize_ruleactions")] + pub actions: Vec, +} + +impl MonitorRule { + pub fn on_selector(&self) -> Result { + let ctx_val = self.context.get_value(); + let target: Vec<&str> = ctx_val.split("/").collect(); + let srv_path = match target.len() { + 2 => String::from(target[1]), + _ => String::from(""), + }; + let mut cmd = match &self.context { + RuleContext::Server(v) => String::from(v), + RuleContext::Service(v) => + if v.contains("kubernetes") { + String::from("kubectl") + } else { + String::from(v) + } + , + RuleContext::Ip(v) => String::from(v), + RuleContext::None => String::from(""), + }; + if let Some(ctxpath) = self.selector.get("ctxpath") { + cmd = format!("{}{} {}",&ctxpath,&srv_path,&cmd); + } + if let Some(run) = self.selector.get("run") { + cmd = format!("{} {}",&run,&cmd); + } + if let Some(typ) = self.selector.get("type") { + cmd = format!("{} get {}",&cmd,&typ); + } + if let Some(ns) = self.selector.get("ns") { + cmd = format!("{} -n {}",&cmd,&ns); + } + if let Some(custom) = self.selector.get("custom") { + cmd = format!("{} {}",&cmd,&custom); + } + if let Some(pat) = self.selector.get("match") { + cmd = format!("{} | grep {}",&cmd,&pat); + } + if let Some(awk) = self.selector.get("awk") { + cmd = format!("{} | awk '{}'",&cmd,&awk); + } + if let Some(sort) = self.selector.get("sort") { + cmd = format!("{} | sort {}",&cmd,&sort); + } + if cmd.len() > 0 { + run_command(&cmd) + } else { + Err(anyhow!("Selector: failed:\n {}",&self.id)) + } + } + pub async fn log_state(&self, app_env: &AppEnv, env_name: &str, env_state: isize, msg: &str, out_cmd: &str) -> Result<()> { + let now = format!("{}", chrono::Utc::now().timestamp()); + let log_entry = AppLogs { + id: format!("{}_{}", self.id,&now), + name: format!("self: {}", self.name), + when: String::from("run_monitor_self"), + source: format!("{}", &env_name), + target: format!("{}", &out_cmd), + state: format!("{}", &env_state), + msg: format!("{}", &msg), + }; + if app_env.config.logs_store == DataStore::File { + log_entry.write_applogs_entry(format!("{}/monitor.log",&app_env.config.logs_path).as_str()).await?; + } + Ok(()) + } + pub async fn on_server(&self,_srvr: &str, _out_cmd: &str) -> Result<()> { + Ok(()) + } + pub async fn on_service(&self, srvc: &str, out_cmd: &str) -> Result<()> { + println!("Service: {} -> {}",&srvc,&out_cmd); + if !out_cmd.is_empty() { + let mut targets = out_cmd.split_whitespace(); + while let Some(target) = targets.next() { + // println!("target: {}",&target); + for action in &self.actions { + action.on(target.to_owned(), self).await?; + } + }; + } + Ok(()) + } + pub async fn on_ip(&self,_ip: &str, _out_cmd: &str) -> Result<()> { + + // tokio::spawn(async move {run_on_rule(rule.to_owned()).await; }); + Ok(()) + } + pub async fn on_context(&self, out_cmd: &str) -> String { + let target: String; + let res = match &self.context { + RuleContext::Server(v) => { + target = v.to_owned(); + self.on_server(&v, &out_cmd).await + }, + RuleContext::None => return String::from(""), + RuleContext::Service(v) => { + target = v.to_owned(); + self.on_service(&v, &out_cmd).await + }, + RuleContext::Ip(v) => { + target = v.to_owned(); + self.on_ip(&v, &out_cmd).await + }, + }; + match res { + Ok(_) => String::from(""), + Err(e) => { + format!("Error rule {} on {}: {}",&self.id,target,e) + } + } + } + pub async fn notify(&self, msg: String) { + for item in self.actions.iter() { + if item.is_notify() { + let _ = item.on(msg.to_owned(),&self).await; + } + } + } + pub async fn run(&self,monitor_rules: &MonitorRules, idx: usize, app_env: &AppEnv) -> Result<()> { + if envmnt::get_isize("DEBUG",0) > 0 { + println!("{} [{}] {}: {}",&idx,&self.id,&self.name,&self.description); + } + let out_cmd: String; + if self.command.is_empty() { + out_cmd=self.on_selector().unwrap_or_else(|e| { + eprintln!("Error on selector {}: {}",&self.id,e); + String::from("?") + }); + } else { + out_cmd=run_command(&self.command).unwrap_or_else(|e| { + eprintln!("Error on command {}: {}",&self.command,e); + String::from("?") + }); + } + if out_cmd.as_str() == "?" { + return Ok(()); + } + let monitor_name = format!("{}_MONITOR",&monitor_rules.name.to_uppercase().replace(" ","_")); + let monitor_name_value = envmnt::get_or(&monitor_name,""); + let env_name = format!("MONITOR_{}",&self.id); + let env_state = envmnt::get_isize(&env_name,0); + if ! &self.operator.on(&out_cmd, &self.value) { + if env_state > 0 { + let msg = format!("{} Ok in state: {}",&env_name,&env_state); + println!("{}",&msg); + envmnt::set_isize(&env_name, 0); + self.log_state(&app_env, &env_name, env_state, &msg, &out_cmd).await?; + self.notify(format!("Ok state from -> {}",&env_state)).await; + if monitor_name_value == self.id { + envmnt::set(&monitor_name,""); + } + } + return Ok(()); + } + if ! monitor_name_value.is_empty() { + let msg = format!("Monitor {}: rule {} is active, waiting for {}",&monitor_rules.name,&monitor_name_value,&self.id); + println!("{}",&msg); + self.log_state(&app_env, &env_name, env_state, &msg, &out_cmd).await?; + return Ok(()); + } + if env_state > 0 && env_state < self.wait_checks { + let msg = format!("{} is already set: {} increment counter",&env_name,&env_state); + envmnt::increment(&env_name); + println!("{}",&msg); + self.log_state(&app_env, &env_name, env_state, &msg, &out_cmd).await?; + return Ok(()); + } + let result_rule_context = self.on_context(&out_cmd).await; + let msg = match result_rule_context.is_empty() { + true => + if self.operator == RuleOperator::None { + format!("{}: done",&env_name) + } else { + format!("{}: 1 done",&env_name) + }, + false => format!("{} {}",&env_name,&result_rule_context), + }; + if self.operator == RuleOperator::None { + envmnt::set(&monitor_name,""); + } else { + envmnt::set(&monitor_name,format!("{}",&self.id)); + envmnt::set_isize(&env_name,1); + } + println!("{}",&msg); + self.log_state(&app_env, &env_name, env_state, &msg, &out_cmd).await?; + Ok(()) + } +} + +#[allow(clippy::missing_docs_in_private_items)] +#[allow(non_snake_case)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct MonitorRules { + pub name: String, + pub description: String, + pub rules: Vec, +} +impl MonitorRules { + pub fn new(name: String, description: String, rules: Vec ) -> Self { + Self { + name, + description, + rules, + } + } + pub fn load(root: &str, path: &str, format: &str) -> Self { + let data_rules = load_fs_content(&root, &path, &format); + let err_load = |e|{ + eprintln!("Error loading MonitorRules {}/{}.{}: {}",&root,&path,&format,e); + MonitorRules::default() + }; + let res: MonitorRules; + if data_rules.is_empty() { + res = MonitorRules::default(); + } else { + // println!("Parse Montior Rules as: {}",&format); + res = match format { + "json" => serde_json::from_str(&data_rules).unwrap_or_else(|e| err_load(format!("{}",e))), + "yaml" => serde_yaml::from_str(&data_rules).unwrap_or_else(|e| err_load(format!("{}",e))), + "toml" => toml::from_str(&data_rules).unwrap_or_else(|e| err_load(format!("{}",e))), + _ => MonitorRules::default(), + }; + } + res + } + pub fn dump(&self, root: &str, path: &str, format: &str) -> Result<()> { + let err_load = |e| { + eprintln!("Error dump MonitorRules {}/{}.{}: {}",&root,&path,&format,e); + String::from("") + }; + let data_str = match format { + "json" => serde_json::to_string(&self).unwrap_or_else(|e| err_load(format!("{}",e))), + "yaml" => serde_yaml::to_string(&self).unwrap_or_else(|e| err_load(format!("{}",e))), + "toml" => toml::to_string(&self).unwrap_or_else(|e| err_load(format!("{}",e))), + _ => String::from(""), + }; + if ! data_str.is_empty() { + println!("Dump loading MonitorRules to {}/{}.{}",&root,&path,&format); + let output_path = format!("{}/{}.{}",&root,&path,&format); + write_str_data(output_path, data_str, "Monitor rules dump".to_string())?; + } + Ok(()) + } + pub async fn run(&self, _cloud: Cloud, app_env: AppEnv) -> Result<()> { + // println!("Run {}: {}",&monitor_rules.name,&monitor_rules.description); + for (idx,rule) in self.rules.iter().enumerate() { + match rule.schedule { + RuleSchedule::Check => { + // dbg!(&rule); + let mut err = String::from(""); + rule.run(&self,idx,&app_env).await.unwrap_or_else(|e| { + err = format!("Error rule {}: {}",&rule.id,e); + eprintln!("{}",&err) + }); + match err.is_empty() { + true => continue, + false => rule.log_state(&app_env, &rule.name, -1, &err, "").await.unwrap_or_else(|e| + eprintln!("Unable to log monitor entry: {}",e) + ), + }; + }, + RuleSchedule::OnDemand => continue, + RuleSchedule::None => continue, + }; + } + Ok(()) + } +} diff --git a/src/monitor/utils.rs b/src/monitor/utils.rs new file mode 100644 index 0000000..16c88c9 --- /dev/null +++ b/src/monitor/utils.rs @@ -0,0 +1,47 @@ +use anyhow::{anyhow,Result}; +use std::{fs}; //,io}; +use std::fs::OpenOptions; +use std::path::Path; +use std::io::{Write}; +use std::process::{Command}; +use std::str; + +pub fn write_str_data(output_path: String, data_str: String, msg: String) -> Result<()> { + if Path::new(&output_path).exists() { + fs::remove_file(&output_path)?; + } + let mut file = OpenOptions::new().write(true).create(true).open(&output_path)?; + file.write_all(data_str.as_bytes())?; + if ! msg.is_empty() { + let now = chrono::Utc::now().timestamp(); + let out = format!("{}: [{}] -> {}\n",&now,&msg,&output_path); + println!("{}",&out); + } + Ok(()) +} + +pub fn run_command(str_cmd: &str) -> Result { + let mut args: Vec<&str> = str_cmd.split(" ").collect(); + let cmd = args[0].to_owned(); + args.remove(0); + // println!("cmd: {}, args: {:?}",&cmd,&args); + let result = Command::new(&cmd) + .args(&args) + .output()?; + // dbg!(&result); + match &result.status.code() { + Some(code) => + if format!("{}",code) == "0" { + let mut res=String::from(str::from_utf8(&result.stdout).unwrap_or_else(|_| "")); + if res.ends_with("\n") { + res.pop(); + } + res = res.replace("\n"," ").replace("",""); + Ok(res) + } else { + let err = str::from_utf8(&result.stderr).unwrap_or_else(|_| ""); + Err(anyhow!("Running {}: failed:\n {}",&cmd,&err)) + } + None => Ok(String::from("")) + } +}