use std::fmt; use std::collections::HashMap; use serde::{Serialize,Deserialize,Deserializer}; use anyhow::{anyhow,Result}; use std::path::Path; use app_env::{ appenv::{AppEnv}, DataStore, applogs::AppLogs, // config::{Config} }; use crate::defs::{ProviderName}; use kloud::utils::load_fs_content; use crate::clouds::defs::{ Cloud }; use crate::monitor::utils::{run_command,write_str_data}; fn parse_target(target: String, val: &str) -> String { let srvr: String; if val.contains("$target") { srvr = val.replace("$target", &target); } else if val.contains("$server") { srvr = val.replace("$server", &target); } else { srvr = val.to_owned(); } srvr } #[derive(Eq, PartialEq, Clone, Serialize, Debug, Deserialize)] pub enum RuleContext { None, Server(String), Service(String), Ip(String), } impl Default for RuleContext { fn default() -> Self { RuleContext::None } } impl fmt::Display for RuleContext { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { RuleContext::Server(c) => write!(f,"server: {}",c), RuleContext::None => write!(f,"none"), RuleContext::Service(c) => write!(f,"service: {}",c), RuleContext::Ip(c) => write!(f,"ip: {}",c), } //write!(f, "{:?}", self) } } impl RuleContext { pub fn get_value(&self) -> String { match self { RuleContext::Server(c) => format!("{}",c), RuleContext::None => format!(""), RuleContext::Service(c) => format!("{}",c), RuleContext::Ip(c) => format!("{}",c), } } } #[derive(Eq, PartialEq, Clone, Serialize, Debug, Deserialize)] pub enum MonitorAction { None, Start(String,String,String), Stop(String,String,String), Restart(String,String,String), Notify(String,String,String), } impl Default for MonitorAction { fn default() -> Self { MonitorAction::None } } impl fmt::Display for MonitorAction { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { MonitorAction::None => write!(f,"none"), MonitorAction::Start(v,p,a) => write!(f,"start: {} {} {}",v,p,a), MonitorAction::Stop(v,p,a) => write!(f,"stop: {} {} {}",v,p,a), MonitorAction::Restart(v,p,a) => write!(f,"restart: {} {} {}",v,p,a), MonitorAction::Notify(v,p,a) => write!(f,"notify: {} {} {}",v,p,a), } //write!(f, "{:?}", self) } } impl MonitorAction { pub fn get_value(&self) -> String { match self { MonitorAction::None => format!(""), MonitorAction::Start(v,p,a) => format!("{} {} {}",v,p,a), MonitorAction::Stop(v,p,a) => format!("{} {} {}",v,p,a), MonitorAction::Restart(v,p,a) => format!("{} {} {}",v,p,a), MonitorAction::Notify(v,p,a) => format!("{} {} {}",v,p,a), } } pub fn is_notify(&self) -> bool { match self { MonitorAction::Notify(_,_,_) => true, _ => false } } pub async fn on(&self, target: String, rule: &MonitorRule) -> Result<()> { match self { MonitorAction::None => return Ok(()), MonitorAction::Start(v,p,a) => self.on_server("start",parse_target(target, v),p,a,&rule).await?, MonitorAction::Stop(v,p,a) => self.on_server("stop",parse_target(target, v),p,a,&rule).await?, MonitorAction::Restart(v,p,a) => self.on_server("restart",parse_target(target, v),p,a,&rule).await?, MonitorAction::Notify(v,p,a) => self.on_notify(&v,parse_target(target ,p),a,&rule).await?, }; Ok(()) } pub async fn on_server(&self, task: &str, srvr: String, provider: &str, args: &str, _rule: &MonitorRule) -> Result<()> { let debug = envmnt::get_isize("DEBUG",0); let provider_name = ProviderName::set_provider(provider.to_owned()); match provider_name { ProviderName::upcloud => { let cmd = format!("upclapi -c {}server -id {} {}",&task,&srvr,&args); if debug > 1 { println!("action {}: {} on {} {}",&task,&srvr,&provider_name,&args); } let res = run_command(&cmd).unwrap_or_else(|e|{ eprintln!("Error {}: {}",&cmd,e); String::from("") }); if debug > 0 { println!("{}",&res); } Ok(()) }, _ => Err(anyhow!("Provider {} not defined", &provider_name)) } } pub async fn on_notify(&self, chnl: &str, msg: String, args: &str, rule: &MonitorRule) -> Result<()> { let notificator_path = envmnt::get_or("NOTIFICATOR_BIN",""); if notificator_path.is_empty() || ! Path::new(¬ificator_path).exists() { return Err(anyhow!("Error notificator:")); } let message=format!("Monitor {}: {} {}",&rule.id,&msg,&args); let cmd = format!("{} {} {}",¬ificator_path,&chnl,&message); let _ = run_command(&cmd).unwrap_or_else(|e|{ eprintln!("Error {}: {}",&cmd,e); String::from("") }); Ok(()) } } #[derive(Eq, PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] pub enum RuleSchedule { None, Check, OnDemand, } impl Default for RuleSchedule { fn default() -> Self { RuleSchedule::None } } impl fmt::Display for RuleSchedule { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { RuleSchedule::None => write!(f,"none"), RuleSchedule::Check=> write!(f,"chechk"), RuleSchedule::OnDemand=> write!(f,"ondemand"), } } } #[derive(Eq, PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] pub enum RuleOperator { None, Equal, NotEqual, Contains, NotContains, } impl Default for RuleOperator { fn default() -> Self { RuleOperator::None } } impl RuleOperator { pub fn on(&self, target: &str, value: &str) -> bool { match self { RuleOperator::None => true, RuleOperator::Equal => target == value, RuleOperator::NotEqual=> target != value, RuleOperator::Contains => target.contains(value), RuleOperator::NotContains => !target.contains(value), } } } impl fmt::Display for RuleOperator { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { RuleOperator::None => write!(f,"none"), RuleOperator::Equal => write!(f,"equal"), RuleOperator::NotEqual=> write!(f,"not_equal"), RuleOperator::Contains => write!(f,"matches"), RuleOperator::NotContains => write!(f,"not_matches"), } } } #[allow(clippy::missing_docs_in_private_items)] #[allow(non_snake_case)] #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct MonitorStates { pub when: String, pub id: String, pub data: String, // to save JSON info } impl MonitorStates { pub fn load(root: &str, path: &str, format: &str) -> Self { println!("Load Montior States ..."); let data_rules = load_fs_content(&root, &path, &format); let res: MonitorStates = serde_json::from_str(&data_rules).unwrap_or_else(|e|{ eprintln!("Error loading MonitorStates {}/{}.{}: {}",&root,&path,&format,e); MonitorStates::default() }); res } pub fn dump(&self, root: &str, path: &str, format: &str) -> Result<()> { println!("Dump Montior States ..."); let data_str = serde_json::to_string(&self).unwrap_or_else(|e| { eprintln!("Error dump MonitorRules {}/{}.{}: {}",&root,&path,&format,e); String::from("") }); if ! data_str.is_empty() { println!("Dump MonitorStates to {}/{}.{}",&root,&path,&format); let output_path = format!("{}/{}.{}",&root,&path,&format); write_str_data(output_path, data_str, "Monitor states dump".to_string())?; } Ok(()) } } fn deserialize_rulecontext<'de, D>(deserializer: D) -> Result where D: Deserializer<'de> { let buf = String::deserialize(deserializer)?; // let res = String::from_str(&buf).unwrap_or_else(|e|{ // eprintln!("deserialize rulecontext error: {}",e); // String::from("") // }); // println!("buf: {}",&buf); let v: Vec<&str> = buf.split(':').collect(); if v.len() > 1 { Ok(match v[0] { "Service" => RuleContext::Service(v[1].to_owned()), "Server" => RuleContext::Server(v[1].to_owned()), "Ip" => RuleContext::Ip(v[1].to_owned()), "None"|"" => RuleContext::None, _ => { eprintln!("rulecontext not defined for: {} use None",v[0]); RuleContext::None } }) } else { Ok(RuleContext::None) } } fn deserialize_ruleactions<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de> { let buf = Vec::deserialize(deserializer)?; let mut actions = Vec::new(); buf.iter().for_each(|it: &String| { let v: Vec<&str> = it.split(':').collect(); if v.len() > 1 { let arg: Vec<&str> = v[1].split(',').collect(); if arg.len() > 1 { actions.push(match v[0] { "Start" => MonitorAction::Start(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), "Stop" => MonitorAction::Stop(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), "Restart" => MonitorAction::Restart(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), "Notify" => MonitorAction::Notify(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), "None"|"" => MonitorAction::None, _ => { eprintln!("monitor action not defined for: {} use None",v[0]); MonitorAction::None } }) } } }); Ok(actions) } #[allow(clippy::missing_docs_in_private_items)] #[allow(non_snake_case)] #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct MonitorRule { pub id: String, pub name: String, pub description: String, #[serde(deserialize_with = "deserialize_rulecontext")] pub context: RuleContext, pub command: String, pub selector: HashMap, pub operator: RuleOperator, pub value: String, pub result: bool, pub use_in_state: bool, pub wait_checks: isize, pub schedule: RuleSchedule, #[serde(deserialize_with = "deserialize_ruleactions")] pub actions: Vec, } impl MonitorRule { pub fn on_selector(&self) -> Result { let ctx_val = self.context.get_value(); let target: Vec<&str> = ctx_val.split("/").collect(); let srv_path = match target.len() { 2 => String::from(target[1]), _ => String::from(""), }; let mut cmd = match &self.context { RuleContext::Server(v) => String::from(v), RuleContext::Service(v) => if v.contains("kubernetes") { String::from("kubectl") } else { String::from(v) } , RuleContext::Ip(v) => String::from(v), RuleContext::None => String::from(""), }; if let Some(ctxpath) = self.selector.get("ctxpath") { cmd = format!("{}{} {}",&ctxpath,&srv_path,&cmd); } if let Some(run) = self.selector.get("run") { cmd = format!("{} {}",&run,&cmd); } if let Some(typ) = self.selector.get("type") { cmd = format!("{} get {}",&cmd,&typ); } if let Some(ns) = self.selector.get("ns") { cmd = format!("{} -n {}",&cmd,&ns); } if let Some(custom) = self.selector.get("custom") { cmd = format!("{} {}",&cmd,&custom); } if let Some(pat) = self.selector.get("match") { cmd = format!("{} | grep {}",&cmd,&pat); } if let Some(awk) = self.selector.get("awk") { cmd = format!("{} | awk '{}'",&cmd,&awk); } if let Some(sort) = self.selector.get("sort") { cmd = format!("{} | sort {}",&cmd,&sort); } if cmd.len() > 0 { run_command(&cmd) } else { Err(anyhow!("Selector: failed:\n {}",&self.id)) } } pub async fn log_state(&self, app_env: &AppEnv, env_name: &str, env_state: isize, msg: &str, out_cmd: &str) -> Result<()> { let now = format!("{}", chrono::Utc::now().timestamp()); let log_entry = AppLogs { id: format!("{}_{}", self.id,&now), name: format!("self: {}", self.name), when: String::from("run_monitor_self"), source: format!("{}", &env_name), target: format!("{}", &out_cmd), state: format!("{}", &env_state), msg: format!("{}", &msg), }; if app_env.config.logs_store == DataStore::File { log_entry.write_applogs_entry(format!("{}/monitor.log",&app_env.config.logs_path).as_str()).await?; } Ok(()) } pub async fn on_server(&self,_srvr: &str, _out_cmd: &str) -> Result<()> { Ok(()) } pub async fn on_service(&self, srvc: &str, out_cmd: &str) -> Result<()> { let debug = envmnt::get_isize("DEBUG",0); if debug > 1 { println!("Service: {} -> {}",&srvc,&out_cmd); } if !out_cmd.is_empty() { let mut targets = out_cmd.split_whitespace(); while let Some(target) = targets.next() { // println!("target: {}",&target); for action in &self.actions { action.on(target.to_owned(), self).await?; } }; } Ok(()) } pub async fn on_ip(&self,_ip: &str, _out_cmd: &str) -> Result<()> { // tokio::spawn(async move {run_on_rule(rule.to_owned()).await; }); Ok(()) } pub async fn on_context(&self, out_cmd: &str) -> String { let target: String; let res = match &self.context { RuleContext::Server(v) => { target = v.to_owned(); self.on_server(&v, &out_cmd).await }, RuleContext::None => return String::from(""), RuleContext::Service(v) => { target = v.to_owned(); self.on_service(&v, &out_cmd).await }, RuleContext::Ip(v) => { target = v.to_owned(); self.on_ip(&v, &out_cmd).await }, }; match res { Ok(_) => String::from(""), Err(e) => { format!("Error rule {} on {}: {}",&self.id,target,e) } } } pub async fn notify(&self, msg: String) { for item in self.actions.iter() { if item.is_notify() { let _ = item.on(msg.to_owned(),&self).await; } } } pub async fn run(&self,monitor_rules: &MonitorRules, idx: usize, app_env: &AppEnv) -> Result<()> { let debug = envmnt::get_isize("DEBUG",0); if debug > 0 { println!("{} [{}] {}: {}",&idx,&self.id,&self.name,&self.description); } let out_cmd: String; if self.command.is_empty() { out_cmd=self.on_selector().unwrap_or_else(|e| { eprintln!("Error on selector {}: {}",&self.id,e); String::from("?") }); } else { out_cmd=run_command(&self.command).unwrap_or_else(|e| { eprintln!("Error on command {}: {}",&self.command,e); String::from("?") }); } if out_cmd.as_str() == "?" { return Ok(()); } let monitor_name = format!("{}_MONITOR",&monitor_rules.name.to_uppercase().replace(" ","_")); let monitor_name_value = envmnt::get_or(&monitor_name,""); let env_name = format!("MONITOR_{}",&self.id); let env_state = envmnt::get_isize(&env_name,0); if ! &self.operator.on(&out_cmd, &self.value) { if env_state > 0 { let msg = format!("{} Ok in state: {}",&env_name,&env_state); if debug > 0 { println!("{}",&msg); } envmnt::set_isize(&env_name, 0); self.log_state(&app_env, &env_name, env_state, &msg, &out_cmd).await?; self.notify(format!("Ok state from -> {}",&env_state)).await; if monitor_name_value == self.id { envmnt::set(&monitor_name,""); } } return Ok(()); } if ! monitor_name_value.is_empty() { let msg = format!("Monitor {}: rule {} is active, waiting for {}",&monitor_rules.name,&monitor_name_value,&self.id); if debug > 0 { println!("{}",&msg); } self.log_state(&app_env, &env_name, env_state, &msg, &out_cmd).await?; return Ok(()); } if env_state > 0 && env_state < self.wait_checks { let msg = format!("{} is already set: {} increment counter",&env_name,&env_state); envmnt::increment(&env_name); if debug > 0 { println!("{}",&msg); } self.log_state(&app_env, &env_name, env_state, &msg, &out_cmd).await?; return Ok(()); } let result_rule_context = self.on_context(&out_cmd).await; let msg = match result_rule_context.is_empty() { true => if self.operator == RuleOperator::None { format!("{}: done",&env_name) } else { format!("{}: 1 done",&env_name) }, false => format!("{} {}",&env_name,&result_rule_context), }; if self.operator == RuleOperator::None { envmnt::set(&monitor_name,""); } else { envmnt::set(&monitor_name,format!("{}",&self.id)); envmnt::set_isize(&env_name,1); } if debug > 0 { println!("{}",&msg); } self.log_state(&app_env, &env_name, env_state, &msg, &out_cmd).await?; Ok(()) } } #[allow(clippy::missing_docs_in_private_items)] #[allow(non_snake_case)] #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct MonitorRules { pub name: String, pub description: String, pub rules: Vec, } impl MonitorRules { pub fn new(name: String, description: String, rules: Vec ) -> Self { Self { name, description, rules, } } pub fn load(root: &str, path: &str, format: &str) -> Self { let data_rules = load_fs_content(&root, &path, &format); let err_load = |e|{ eprintln!("Error loading MonitorRules {}/{}.{}: {}",&root,&path,&format,e); MonitorRules::default() }; let res: MonitorRules; if data_rules.is_empty() { res = MonitorRules::default(); } else { // println!("Parse Montior Rules as: {}",&format); res = match format { "json" => serde_json::from_str(&data_rules).unwrap_or_else(|e| err_load(format!("{}",e))), "yaml" => serde_yaml::from_str(&data_rules).unwrap_or_else(|e| err_load(format!("{}",e))), "toml" => toml::from_str(&data_rules).unwrap_or_else(|e| err_load(format!("{}",e))), _ => MonitorRules::default(), }; } res } pub fn dump(&self, root: &str, path: &str, format: &str) -> Result<()> { let err_load = |e| { eprintln!("Error dump MonitorRules {}/{}.{}: {}",&root,&path,&format,e); String::from("") }; let data_str = match format { "json" => serde_json::to_string(&self).unwrap_or_else(|e| err_load(format!("{}",e))), "yaml" => serde_yaml::to_string(&self).unwrap_or_else(|e| err_load(format!("{}",e))), "toml" => toml::to_string(&self).unwrap_or_else(|e| err_load(format!("{}",e))), _ => String::from(""), }; if ! data_str.is_empty() { if envmnt::get_isize("DEBUG",0) > 0 { println!("Dump loading MonitorRules to {}/{}.{}",&root,&path,&format); } let output_path = format!("{}/{}.{}",&root,&path,&format); write_str_data(output_path, data_str, "Monitor rules dump".to_string())?; } Ok(()) } pub async fn run(&self, _cloud: Cloud, app_env: AppEnv) -> Result<()> { // println!("Run {}: {}",&monitor_rules.name,&monitor_rules.description); for (idx,rule) in self.rules.iter().enumerate() { match rule.schedule { RuleSchedule::Check => { // dbg!(&rule); let mut err = String::from(""); rule.run(&self,idx,&app_env).await.unwrap_or_else(|e| { err = format!("Error rule {}: {}",&rule.id,e); eprintln!("{}",&err) }); match err.is_empty() { true => continue, false => rule.log_state(&app_env, &rule.name, -1, &err, "").await.unwrap_or_else(|e| eprintln!("Unable to log monitor entry: {}",e) ), }; }, RuleSchedule::OnDemand => continue, RuleSchedule::None => continue, }; } Ok(()) } }