From 648babaf4ccffcfe1327eac857f8b0cbe290319c Mon Sep 17 00:00:00 2001 From: JesusPerez Date: Sat, 11 Sep 2021 19:28:11 +0100 Subject: [PATCH] chore: monitor cloud definition and process --- Cargo.toml | 4 +- src/clouds.rs | 1 + src/clouds/monitor_rules.rs | 263 ++++++++++++++++++++++++++++++++++++ src/clouds/on_clouds.rs | 1 + src/defs.rs | 14 ++ src/lib.rs | 1 + src/monitor.rs | 237 ++++++++++++++++++++++++++++++++ 7 files changed, 520 insertions(+), 1 deletion(-) create mode 100644 src/clouds/monitor_rules.rs create mode 100644 src/monitor.rs diff --git a/Cargo.toml b/Cargo.toml index efcda5a..8d3a33f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,6 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] - anyhow = "1.0.40" chrono = "0.4.19" envmnt = "0.9.0" @@ -18,9 +17,12 @@ serde = { version = "1.0", features = ["derive"] } serde_derive = "1.0" serde_json = "1.0" serde_yaml = "0.8.17" +toml = "0.5.8" tempfile = "3.2.0" tar = "0.4.33" tera = "1.8.0" tokio = { version = "1.5.0", features = ["full"] } rfm = "0.8.0" +app_env = { version = "0.1.0", path = "../defs/app_env" } reqenv = { version = "0.1.0", path = "../handlers/reqenv" } +kloud = { version = "0.1.0", path = "../defs/kloud" } diff --git a/src/clouds.rs b/src/clouds.rs index 460bf65..cfeb4fe 100644 --- a/src/clouds.rs +++ b/src/clouds.rs @@ -1,3 +1,4 @@ pub mod defs; pub mod on_clouds; pub mod upcloud; +pub mod monitor_rules; diff --git a/src/clouds/monitor_rules.rs b/src/clouds/monitor_rules.rs new file mode 100644 index 0000000..a1f0a06 --- /dev/null +++ b/src/clouds/monitor_rules.rs @@ -0,0 +1,263 @@ +use std::collections::HashMap; +use serde::{Serialize,Deserialize,Deserializer}; +use std::fmt; +use anyhow::{Result}; + +use kloud::utils::load_fs_content; +use crate::monitor::write_str_data; + +#[derive(Eq, PartialEq, Clone, Serialize, Debug, Deserialize)] +pub enum RuleContext { + None, + Server(String), + Service(String), + Ip(String), +} +impl Default for RuleContext { + fn default() -> Self { + RuleContext::None + } +} +impl fmt::Display for RuleContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RuleContext::Server(c) => write!(f,"server: {}",c), + RuleContext::None => write!(f,"none"), + RuleContext::Service(c) => write!(f,"service: {}",c), + RuleContext::Ip(c) => write!(f,"ip: {}",c), + } + //write!(f, "{:?}", self) + } +} + +impl RuleContext { + pub fn get_value(&self) -> String { + match self { + RuleContext::Server(c) => format!("{}",c), + RuleContext::None => format!(""), + RuleContext::Service(c) => format!("{}",c), + RuleContext::Ip(c) => format!("{}",c), + } + } +} + +#[derive(Eq, PartialEq, Clone, Serialize, Debug, Deserialize)] +pub enum MonitorAction { + None, + Start(String,String,String), + Stop(String,String,String), + Restart(String,String,String), + Notify(String,String,String), +} +impl Default for MonitorAction { + fn default() -> Self { + MonitorAction::None + } +} +impl fmt::Display for MonitorAction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + MonitorAction::None => write!(f,"none"), + MonitorAction::Start(v,p,a) => write!(f,"start: {} {} {}",v,p,a), + MonitorAction::Stop(v,p,a) => write!(f,"stop: {} {} {}",v,p,a), + MonitorAction::Restart(v,p,a) => write!(f,"restart: {} {} {}",v,p,a), + MonitorAction::Notify(v,p,a) => write!(f,"notify: {} {} {}",v,p,a), + } + //write!(f, "{:?}", self) + } +} + +impl MonitorAction { + pub fn get_value(&self) -> String { + match self { + MonitorAction::None => format!(""), + MonitorAction::Start(v,p,a) => format!("{} {} {}",v,p,a), + MonitorAction::Stop(v,p,a) => format!("{} {} {}",v,p,a), + MonitorAction::Restart(v,p,a) => format!("{} {} {}",v,p,a), + MonitorAction::Notify(v,p,a) => format!("{} {} {}",v,p,a), + } + } +} + +#[derive(Eq, PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] +pub enum RuleOperator { + None, + Equal, + NotEqual, + Contains, + NotContains, +} + +impl Default for RuleOperator { + fn default() -> Self { + RuleOperator::None + } +} + +impl fmt::Display for RuleOperator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RuleOperator::None => write!(f,"none"), + RuleOperator::Equal => write!(f,"equal"), + RuleOperator::NotEqual=> write!(f,"not_equal"), + RuleOperator::Contains => write!(f,"matches"), + RuleOperator::NotContains => write!(f,"not_matches"), + } + } +} + +fn deserialize_rulecontext<'de, D>(deserializer: D) -> Result +where D: Deserializer<'de> { + let buf = String::deserialize(deserializer)?; + // let res = String::from_str(&buf).unwrap_or_else(|e|{ + // eprintln!("deserialize rulecontext error: {}",e); + // String::from("") + // }); + // println!("buf: {}",&buf); + let v: Vec<&str> = buf.split(':').collect(); + if v.len() > 1 { + Ok(match v[0] { + "Service" => RuleContext::Service(v[1].to_owned()), + "Server" => RuleContext::Server(v[1].to_owned()), + "Ip" => RuleContext::Ip(v[1].to_owned()), + _ => RuleContext::None + + }) + } else { + Ok(RuleContext::None) + } +} +fn deserialize_ruleactions<'de, D>(deserializer: D) -> Result, D::Error> +where D: Deserializer<'de> { + let buf = Vec::deserialize(deserializer)?; + let mut actions = Vec::new(); + buf.iter().for_each(|it: &String| { + let v: Vec<&str> = it.split(':').collect(); + if v.len() > 1 { + let arg: Vec<&str> = v[1].split(',').collect(); + if arg.len() > 1 { + actions.push(match v[0] { + "Start" => MonitorAction::Start(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), + "Stop" => MonitorAction::Stop(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), + "Restart" => MonitorAction::Restart(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), + "Notify" => MonitorAction::Notify(arg[0].to_owned(),arg[1].to_owned(),arg[2].to_owned()), + _ => MonitorAction::None + }) + } + } + }); + Ok(actions) +} +// in kubernetes on wuji-cp-0 if pod with ns rook-ceph and name match "osd-0" is not in "Running state" && is in last "save state info" +// context target(server) selector(type pod - ns - match name - state !Running) + +#[allow(clippy::missing_docs_in_private_items)] +#[allow(non_snake_case)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct MonitorRule { + pub id: String, + pub name: String, + pub description: String, + #[serde(deserialize_with = "deserialize_rulecontext")] + pub context: RuleContext, + pub command: String, + pub selector: HashMap, + pub operator: RuleOperator, + pub value: String, + pub result: bool, + pub use_state: bool, + pub schedule: String, + #[serde(deserialize_with = "deserialize_ruleactions")] + pub actions: Vec, +} + +#[allow(clippy::missing_docs_in_private_items)] +#[allow(non_snake_case)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct MonitorRules { + pub name: String, + pub description: String, + pub rules: Vec, +} +impl MonitorRules { + pub fn new(name: String, description: String, rules: Vec ) -> Self { + Self { + name, + description, + rules, + } + } + pub fn load(root: &str, path: &str, format: &str) -> Self { + println!("Load Montior Rules ..."); + let data_rules = load_fs_content(&root, &path, &format); + let err_load = |e|{ + eprintln!("Error loading MonitorRules {}/{}.{}: {}",&root,&path,&format,e); + MonitorRules::default() + }; + let res: MonitorRules; + if data_rules.is_empty() { + res = MonitorRules::default(); + } else { + println!("Parse Montior Rules as: {}",&format); + res = match format { + "json" => serde_json::from_str(&data_rules).unwrap_or_else(|e| err_load(format!("{}",e))), + "yaml" => serde_yaml::from_str(&data_rules).unwrap_or_else(|e| err_load(format!("{}",e))), + "toml" => toml::from_str(&data_rules).unwrap_or_else(|e| err_load(format!("{}",e))), + _ => MonitorRules::default(), + }; + } + res + } + pub fn dump(&self, root: &str, path: &str, format: &str) -> Result<()> { + println!("Dump Montior Rules ..."); + let err_load = |e| { + eprintln!("Error dump MonitorRules {}/{}.{}: {}",&root,&path,&format,e); + String::from("") + }; + let data_str = match format { + "json" => serde_json::to_string(&self).unwrap_or_else(|e| err_load(format!("{}",e))), + "yaml" => serde_yaml::to_string(&self).unwrap_or_else(|e| err_load(format!("{}",e))), + "toml" => toml::to_string(&self).unwrap_or_else(|e| err_load(format!("{}",e))), + _ => String::from(""), + }; + if ! data_str.is_empty() { + println!("Dump loading MonitorRules to {}/{}.{}",&root,&path,&format); + let output_path = format!("{}/{}.{}",&root,&path,&format); + write_str_data(output_path, data_str, "Monitor rules dump".to_string())?; + } + Ok(()) + } +} + +#[allow(clippy::missing_docs_in_private_items)] +#[allow(non_snake_case)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct MonitorStates { + pub when: String, + pub id: String, + pub data: String, // to save JSON info +} +impl MonitorStates { + pub fn load(root: &str, path: &str, format: &str) -> Self { + println!("Load Montior States ..."); + let data_rules = load_fs_content(&root, &path, &format); + let res: MonitorStates = serde_json::from_str(&data_rules).unwrap_or_else(|e|{ + eprintln!("Error loading MonitorStates {}/{}.{}: {}",&root,&path,&format,e); + MonitorStates::default() + }); + res + } + pub fn dump(&self, root: &str, path: &str, format: &str) -> Result<()> { + println!("Dump Montior States ..."); + let data_str = serde_json::to_string(&self).unwrap_or_else(|e| { + eprintln!("Error dump MonitorRules {}/{}.{}: {}",&root,&path,&format,e); + String::from("") + }); + if ! data_str.is_empty() { + println!("Dump MonitorStates to {}/{}.{}",&root,&path,&format); + let output_path = format!("{}/{}.{}",&root,&path,&format); + write_str_data(output_path, data_str, "Monitor states dump".to_string())?; + } + Ok(()) + } +} diff --git a/src/clouds/on_clouds.rs b/src/clouds/on_clouds.rs index e9051f3..7770de6 100644 --- a/src/clouds/on_clouds.rs +++ b/src/clouds/on_clouds.rs @@ -626,6 +626,7 @@ pub async fn run_clouds_check(reqenv: &ReqEnv,cloud: &Cloud) -> Result<()> { let now = chrono::Utc::now().timestamp(); envmnt::set(format!("LAST_CHECK{}",&output_path), format!("{}",&now)); let result = on_cloud_req("check_job",&cloud,&reqenv,"monitor,liveness","","*").await; + // println!("{}",&output_path); if Path::new(&output_path).exists() { fs::remove_file(&output_path)?; } diff --git a/src/defs.rs b/src/defs.rs index 020e5cd..f2d6c62 100644 --- a/src/defs.rs +++ b/src/defs.rs @@ -115,6 +115,7 @@ pub enum TskSrvcName { devel, scale, klouds, + web, pause, } @@ -151,6 +152,7 @@ impl fmt::Display for TskSrvcName { TskSrvcName::klouds => write!(f,"klouds"), TskSrvcName::cloudmandala => write!(f,"cloudmandala"), TskSrvcName::zterton => write!(f,"zterton"), + TskSrvcName::web => write!(f,"web"), TskSrvcName::nfs => write!(f,"nfs"), TskSrvcName::pause => write!(f,"pause"), } @@ -185,6 +187,7 @@ impl TskSrvcName { "devel" => TskSrvcName::devel, "scale" => TskSrvcName::scale, "klouds" => TskSrvcName::klouds, + "web" => TskSrvcName::web, "pause" => TskSrvcName::pause, &_ => TskSrvcName::default(), } @@ -274,6 +277,17 @@ impl fmt::Display for ProviderName { } } +impl ProviderName { + pub fn set_provider(provider: String) -> ProviderName { + match provider.as_str() { + "none" => ProviderName::none, + "manual" => ProviderName::manual, + "upcloud" => ProviderName::upcloud, + _ => ProviderName::none, + } + } +} + #[derive(Eq, PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] #[allow(non_camel_case_types)] pub enum InfaceMode { diff --git a/src/lib.rs b/src/lib.rs index ba3bbe4..f22abfc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,3 +5,4 @@ pub mod tsksrvcs; pub mod pkgs; pub mod cmds; pub mod utils; +pub mod monitor; diff --git a/src/monitor.rs b/src/monitor.rs new file mode 100644 index 0000000..1f644ae --- /dev/null +++ b/src/monitor.rs @@ -0,0 +1,237 @@ +use std::{fs}; //,io}; +use std::fs::OpenOptions; +use std::path::Path; +use std::io::{Write}; +use std::str; +use anyhow::{anyhow,Result}; +// use tokio::runtime::Handle; +use std::process::{Command}; + +use app_env::{ + appenv::AppEnv, + // config::{Config} +}; +use crate::clouds::defs::{ + Cloud, +}; + +use crate::defs::ProviderName; + +use crate::clouds::monitor_rules::{ + MonitorRules, + MonitorRule, + RuleContext, + RuleOperator, + MonitorAction, +}; + +pub fn write_str_data(output_path: String, data_str: String, msg: String) -> Result<()> { + if Path::new(&output_path).exists() { + fs::remove_file(&output_path)?; + } + let mut file = OpenOptions::new().write(true).create(true).open(&output_path)?; + file.write_all(data_str.as_bytes())?; + if ! msg.is_empty() { + let now = chrono::Utc::now().timestamp(); + let out = format!("{}: [{}] -> {}\n",&now,&msg,&output_path); + println!("{}",&out); + } + Ok(()) +} +pub async fn on_action_server(task: &str, srvr: &str, provider: &str, args: &str, rule: &MonitorRule) -> Result<()> { + let provider_name = ProviderName::set_provider(provider.to_owned()); + match provider_name { + ProviderName::upcloud => { + let cmd = format!("upclapi -c {}server -id {} {}",&task,&srvr,&args); + println!("action {}: {} on {} {}",&task,&srvr,&provider_name,&args); + let res = run_command(&cmd).unwrap_or_else(|e|{ + eprintln!("Error {}: {}",&cmd,e); + String::from("") + }); + println!("{}",&res); + Ok(()) + }, + _ => Err(anyhow!("Provider {} not defined", &provider_name)) + } +} +pub async fn on_action_notify(chnl: &str, msg: &str, args: &str, rule: &MonitorRule) -> Result<()> { + Ok(()) +} +fn get_server(target: &str, val: &str) -> String { + let srvr: String; + if val == "$target" || val == "$server" { + srvr = target.to_owned(); + } else { + srvr = val.to_owned(); + } + srvr +} +pub async fn on_monitor_action(action: &MonitorAction, target: &str, rule: &MonitorRule) -> Result<()> { + match action { + MonitorAction::None => return Ok(()), + MonitorAction::Start(v,p,a) => + on_action_server("start",&get_server(target, v),&p,&a,&rule).await?, + MonitorAction::Stop(v,p,a) => + on_action_server("stop",&get_server(target, v),&p,&a,&rule).await?, + MonitorAction::Restart(v,p,a) => + on_action_server("restart",&get_server(target, v),&p,&a,&rule).await?, + MonitorAction::Notify(v,p,a) => on_action_notify(&v,&p,&a,&rule).await?, + }; + Ok(()) +} +pub fn on_monitor_operator(operator: &RuleOperator, target: &str, value: &str) -> bool { + match operator { + RuleOperator::None => true, + RuleOperator::Equal => target == value, + RuleOperator::NotEqual=> target != value, + RuleOperator::Contains => target.contains(value), + RuleOperator::NotContains => !target.contains(value), + } +} +pub async fn on_server_rule(srvr: &str, out_cmd: &str, rule: &MonitorRule) -> Result<()> { + Ok(()) +} +pub async fn on_service_rule(srvc: &str, out_cmd: &str, rule: &MonitorRule) -> Result<()> { + println!("Service: {} -> {}",&srvc,&out_cmd); + if !out_cmd.is_empty() { + let mut targets = out_cmd.split_whitespace(); + while let Some(target) = targets.next() { + // println!("target: {}",&target); + for action in &rule.actions { + on_monitor_action(action, &target, &rule).await?; + } + }; + } + Ok(()) +} +pub async fn on_ip_rule(ip: &str, out_cmd: &str, rule: &MonitorRule) -> Result<()> { + + // tokio::spawn(async move {run_on_rule(rule.to_owned()).await; }); + Ok(()) +} +pub fn run_command(str_cmd: &str) -> Result { + let mut args: Vec<&str> = str_cmd.split(" ").collect(); + let cmd = args[0].to_owned(); + args.remove(0); + // println!("cmd: {}, args: {:?}",&cmd,&args); + let result = Command::new(&cmd) + .args(&args) + .output()?; + // dbg!(&result); + match &result.status.code() { + Some(code) => + if format!("{}",code) == "0" { + let mut res=String::from(str::from_utf8(&result.stdout).unwrap_or_else(|_| "")); + if res.ends_with('\n') { + res.pop(); + } + res = res.replace('\n'," "); + Ok(res) + } else { + let err = str::from_utf8(&result.stderr).unwrap_or_else(|_| ""); + Err(anyhow!("Running {}: failed:\n {}",&cmd,&err)) + } + None => Ok(String::from("")) + } +} +pub fn run_on_selector(rule: &MonitorRule) -> Result { + let ctx_val = rule.context.get_value(); + let target: Vec<&str> = ctx_val.split("/").collect(); + let srv_path = match target.len() { + 2 => String::from(target[1]), + _ => String::from(""), + }; + let mut cmd = match &rule.context { + RuleContext::Server(v) => String::from(v), + RuleContext::Service(v) => + if v.contains("kubernetes") { + String::from("kubectl") + } else { + String::from(v) + } + , + RuleContext::Ip(v) => String::from(v), + RuleContext::None => String::from(""), + }; + if let Some(ctxpath) = rule.selector.get("ctxpath") { + cmd = format!("{}{} {}",&ctxpath,&srv_path,&cmd); + } + if let Some(run) = rule.selector.get("run") { + cmd = format!("{} {}",&run,&cmd); + } + if let Some(typ) = rule.selector.get("type") { + cmd = format!("{} get {}",&cmd,&typ); + } + if let Some(ns) = rule.selector.get("ns") { + cmd = format!("{} -n {}",&cmd,&ns); + } + if let Some(custom) = rule.selector.get("custom") { + cmd = format!("{} {}",&cmd,&custom); + } + if let Some(pat) = rule.selector.get("match") { + cmd = format!("{} | grep {}",&cmd,&pat); + } + if let Some(awk) = rule.selector.get("awk") { + cmd = format!("{} | awk '{}'",&cmd,&awk); + } + if let Some(sort) = rule.selector.get("sort") { + cmd = format!("{} | sort {}",&cmd,&sort); + } + if cmd.len() > 0 { + run_command(&cmd) + } else { + Err(anyhow!("Selector: failed:\n {}",&rule.id)) + } +} +pub async fn run_monitor(monitor_rules: MonitorRules, _cloud: Cloud, _app_env: AppEnv) -> Result<()> { + // dbg!(&monitor_rules); + println!("Run {}: {}",&monitor_rules.name,&monitor_rules.description); + for (idx,rule) in monitor_rules.rules.iter().enumerate() { + match rule.schedule.as_str() { + "inmediate" => { + // dbg!(&rule); + println!("{} [{}] {}: {}",&idx,&rule.id,&rule.name,&rule.description); + let out_cmd: String; + if rule.command.is_empty() { + out_cmd=run_on_selector(&rule).unwrap_or_else(|e| { + eprintln!("Error on selector {}: {}",&rule.id,e); + String::from("?") + }); + } else { + out_cmd=run_command(&rule.command).unwrap_or_else(|e| { + eprintln!("Error on command {}: {}",&rule.command,e); + String::from("?") + }); + } + if out_cmd.as_str() == "?" { + continue; + } + if ! on_monitor_operator(&rule.operator,&out_cmd, &rule.value) { + continue; + } + match &rule.context { + RuleContext::Server(v) => { + on_server_rule(&v, &out_cmd, &rule).await.unwrap_or_else(|e|{ + eprintln!("Error rule {} on {}: {}",&rule.id,&v,e); + }); + }, + RuleContext::None => { continue + }, + RuleContext::Service(v) => { + on_service_rule(&v, &out_cmd, &rule).await.unwrap_or_else(|e|{ + eprintln!("Error rule {} on {}: {}",&rule.id,&v,e); + }); + }, + RuleContext::Ip(v) => { + on_ip_rule(&v, &out_cmd, &rule).await.unwrap_or_else(|e|{ + eprintln!("Error rule {} on {}: {}",&rule.id,&v,e); + }); + }, + } + }, + _ => continue, + }; + } + + Ok(()) +} \ No newline at end of file