From 26b2f7730001e2542663e4f43e195af4de22ea9b Mon Sep 17 00:00:00 2001 From: JesusPerez Date: Sun, 12 Sep 2021 17:07:58 +0100 Subject: [PATCH] chore: move tasks to job schedule, move main to async, result for tasks, clean-up --- src/main.rs | 224 +++++++++++++++++++--------------------------------- 1 file changed, 82 insertions(+), 142 deletions(-) diff --git a/src/main.rs b/src/main.rs index 5397cf5..58050b4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,48 +1,32 @@ // use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio_cron_scheduler::{JobScheduler, Job}; +pub type BxDynResult<T> = std::result::Result<T, Box<dyn std::error::Error>>; use app_env::{ AppStore, appenv::AppEnv, appinfo::AppInfo, appdata::AppData, - config::{Config,WebServer} + config::{Config} }; use clds::clouds::{ monitor_rules::{MonitorRules}, }; use clds::monitor::run_monitor; - use app_auth::AuthStore; use reject_filters::{handle_rejection}; -// use zterton::models::{Terton}; -//use std::fs; //, io}; -// use std::fs::OpenOptions; -// use std::io::Write; -//use std::path::Path; -// use serde_yaml::Value; use anyhow::{Result}; -// use std::env; -//use warp::{http::Response as HttpResponse, Filter, filters::BoxedFilter}; use warp::{ // http::{StatusCode}, http::{method::Method, HeaderMap}, Filter, }; -// use warp::filters::header::headers_cloned; -// use warp::path::FullPath; -// use warp::http::{Uri, HeaderMap, HeaderValue}; - -//use crate::utils::set_ta_data; use crate::defs::{DataDBs,CollsData,load_cloud_env}; use clds::clouds::defs::{ Cloud, }; -// use clds::app_env::config::Config; use clds::clouds::on_clouds::{make_cloud_cache,run_clouds_check}; use reqenv::ReqEnv; -// #[macro_use] -// extern crate kloud_entries_macro_derive; - // static WEBSERVER: AtomicUsize = AtomicUsize::new(0); const PKG_VERSION: &'static str = env!("CARGO_PKG_VERSION"); // const PKG_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION"); @@ -55,9 +39,6 @@ pub mod graphql; pub mod filters; pub mod handlers; -// pub const MODEL_PATH:
&String = String::from("./auth/auth_model.conf"); -// pub const POLICY_PATH: &String = String::from("./auth/policy.csv"); - async fn create_auth_store(app_env: &AppEnv,verbose: &str) -> AuthStore { let config = app_env.get_curr_websrvr_config(); let model_path = config.st_auth_model_path(); @@ -87,31 +68,10 @@ async fn up_web_server(webpos: usize) -> Result<()> { if verbose != "quiet" { println!("ZTerton web services at {}",&zterton_env); } - // envmnt::set("ZTERTON", "WEBSERVER"); return Ok(()); - /* - let app = Zterton::new( - app_env.config.srv_protocol.to_owned(), - app_env.config.srv_host.to_owned(), - app_env.config.srv_port, - ); - let serverstring = format!("{}:{}",&srv_host,&srv_port); - match std::net::TcpStream::connect(&serverstring) { - Ok(_serverstream) => { - Ok(()) - Err(anyhow!("Source {}: Connection to '{}' for tsksrvc '{}' failed: {}",&source,&serverstring,&tsk_name,&e)) - }, - Err(e) => { - // handle_input(serverstream); - } - } - */ } // WEBSERVER.store(1,Ordering::Relaxed); - // TODO pass root file-name frmt from AppEnv Config - // if init at load - // set_ta_data(&app_env).await?; println!("Loading webserver: {} ({})",&config.name,&app_env.curr_web); let (app, socket) = zterton::start_web(&mut app_env).await; @@ -122,8 +82,6 @@ async fn up_web_server(webpos: usize) -> Result<()> { // As static casbin println!("Load auth store ..."); let auth_store = create_auth_store(&app_env,"").await; - // dbg!(&auth_store.users.read().await); - // dbg!(&auth_store.shadows.read().await); println!("Load data store ..."); @@ -133,8 +91,6 @@ async fn up_web_server(webpos: usize) -> Result<()> { auth: auth_store.to_owned(), }; println!("Load web filters ..."); - // let store = warp::any().map(move || ta_store.clone()); - //let routes = warp::any().map(|| "Hello, World!"); // let us get some static boxes from config values: let log_name = app_env.config.st_log_name(); @@ -160,7 +116,7 @@ async fn up_web_server(webpos: usize) -> Result<()> { 
app_auth_filters::auth(app_store.clone(),auth_store.clone(),cors.clone()); // .with(cors.clone()); let gqli_api = - // // If not graphiQL comment/remove next line Interface GiQL MUST BEFORE graphql post with schema + // If not graphiQL comment/remove next line Interface GiQL MUST BEFORE graphql post with schema // app_api.to_owned() graphql::graphiql(gql_path, giql_path, data_dbs.clone()).await; if giql_path.len() > 0 { @@ -179,14 +135,6 @@ async fn up_web_server(webpos: usize) -> Result<()> { let kloud_api = filters::CollFilters::new("kloud") .filters_config(data_dbs.clone(),cloud.clone(),cors.clone()); - // let ta_api = - // filters::CollFilters::new("ta").filters(&config, data_dbs.clone(),cors.clone()); - -// let tp_api = filters::CollFilters::new("tp").filters(&config, data_dbs.clone(),cors.clone()); -// .or(tracking_point::load_tp_filters().filters(&config, data_dbs.clone())) -// .or(topographic_anatomy::filters::ta(&config, data_dbs.clone())) -// .or(tracking_point::filters::tp(&config, data_dbs.clone())) - let file_api = app_file_filters::files(app_store.clone(),auth_store.clone()).with(cors.clone()); // Path for static files, better to be LAST let fs_api = warp::fs::dir(html_path).with(warp::compression::gzip()); @@ -194,16 +142,13 @@ async fn up_web_server(webpos: usize) -> Result<()> { let app_api = auth_api .or(gqli_api).or(gql_api) .or(kloud_api) - // .or(ta_api) - // .or(tp_api) .or(file_api) .or(fs_api) .recover(move | error: warp::Rejection| handle_rejection(error, app_store.clone())) .boxed(); // Wrap routes with log to get info let routes = app_api.with(warp::log(log_name)); -// let routes = app_api.with(cors).with(warp::log(log_name)); - + //let routes = app_api.with(cors).with(warp::log(log_name)); println!( "Starting http server: {}://{}:{}", &app.protocol, &app.host, &app.port @@ -226,7 +171,7 @@ async fn up_web_server(webpos: usize) -> Result<()> { } Ok(()) } -pub async fn run_cache_clouds() { +pub async fn run_cache_clouds() -> Result<()> { 
let args: Vec<String> = std::env::args().collect(); let mut arg_cfg_path = String::from(""); let mut arg_env_path = String::from(""); @@ -243,7 +188,7 @@ pub async fn run_cache_clouds() { let mut app_env = AppEnv::default(); let config_content = Config::load_file_content("quiet", &arg_cfg_path); if ! config_content.contains("run_mode") { - return; + return Ok(()); } app_env.config = Config::new(config_content,"quiet"); let app_store = AppStore::new(AppData::new(app_env.to_owned())); @@ -257,11 +202,12 @@ pub async fn run_cache_clouds() { Method::GET, "/config", "config", "kloud" ); - let _ = make_cloud_cache(&reqenv,&cloud).await; + let res = make_cloud_cache(&reqenv,&cloud).await; println!("Cache service on Clouds: done {} __________ ",chrono::Utc::now().timestamp()); + res } -pub async fn run_check_clouds() { +pub async fn run_check_clouds() -> Result<()> { let args: Vec<String> = std::env::args().collect(); let mut arg_cfg_path = String::from(""); let mut arg_env_path = String::from(""); @@ -278,7 +224,7 @@ pub async fn run_check_clouds() { let mut app_env = AppEnv::default(); let config_content = Config::load_file_content("quiet",&arg_cfg_path); if !
config_content.contains("run_mode") { - return; + return Ok(()); } app_env.config = Config::new(config_content,"quiet"); let app_store = AppStore::new(AppData::new(app_env.to_owned())); @@ -292,10 +238,11 @@ pub async fn run_check_clouds() { Method::GET, "/config", "config", "kloud" ); - let _ = run_clouds_check(&reqenv,&cloud).await; + let res = run_clouds_check(&reqenv,&cloud).await; println!("Check Cloud service: done {} __________ ",chrono::Utc::now().timestamp()); + res } -pub async fn run_clouds_monitor() { +pub async fn run_clouds_monitor() -> Result<()> { let args: Vec<String> = std::env::args().collect(); let mut arg_cfg_path = String::from(""); let mut arg_env_path = String::from(""); @@ -312,24 +259,19 @@ pub async fn run_clouds_monitor() { let mut app_env = AppEnv::default(); let config_content = Config::load_file_content("quiet",&arg_cfg_path); if ! config_content.contains("run_mode") { - return; + return Ok(()); } app_env.config = Config::new(config_content,"quiet"); - - let monitor_rules = MonitorRules::load(&app_env.config.monitor_rules_path,&app_env.config.monitor_rules_file,&app_env.config.monitor_rules_format); - // monitor_rules.rules[0].context = RuleContext::Service(String::from("kubernetes")); + //let monitor_sched_task = app_env.config.get_schedtask("monitor"); + let monitor_rules = MonitorRules::load(&app_env.config.monitor_rules_path,&app_env.config.monitor_rules_file,&app_env.config.monitor_rules_format); if monitor_rules.rules.len() > 0 { - let _ = run_monitor(monitor_rules,cloud,app_env).await; - } - println!("Monitor Cloud: done {} __________ ",chrono::Utc::now().timestamp()); + run_monitor(monitor_rules,cloud,app_env).await?
+ } + Ok(()) } -// for standalone server & async use -// #[tokio::main] -// pub async fn main() -> Result<()> { - -pub fn main() -> Result<()> { +#[tokio::main] +pub async fn main() -> BxDynResult<()> { //std::io::Result<()> { let args: Vec<String> = std::env::args().collect(); - // println!("I got {:?} arguments: {:?}.", args.len() - 1, &args[1..]); if args.len() > 1 { match args[1].as_str() { "-h" | "--help" => @@ -341,6 +283,7 @@ pub fn main() -> Result<()> { _ => println!("{}",PKG_NAME), } } + let mut sched = JobScheduler::new(); let mut arg_cfg_path = String::from(""); let mut arg_env_path = String::from(""); args.iter().enumerate().for_each(|(idx,arg)| { @@ -355,73 +298,70 @@ pub fn main() -> Result<()> { dotenv::from_path(env_path)?; } pretty_env_logger::init(); -// assert!(output.is_ok()); - let loop_duration: u64; - let run_websrvrs: bool; - let run_cache: bool; - let run_check: bool; - let run_monitor: bool; - let websrvrs: Vec<WebServer>; - { - let config_content = Config::load_file_content("quiet", &arg_cfg_path); - if config_content.contains("run_mode") { - let config = Config::new(config_content,"quiet"); - loop_duration = config.loop_duration; - // loop_duration = 10; - run_websrvrs = config.run_websrvrs; - run_cache = config.run_cache; - run_check = config.run_check; - run_monitor = config.run_monitor; - websrvrs = config.websrvrs; - if run_cache { - println!("Running 'cloud_cache' every {} seconds in LOOP",&loop_duration); - } - if run_check { - println!("Running 'cloud_check' every {} seconds in LOOP",&loop_duration); + let config_content = Config::load_file_content("quiet", &arg_cfg_path); + if !config_content.contains("run_mode") { + panic!("Error no run_mode found"); + } + let config = Config::new(config_content,"quiet"); + if config.run_schedtasks { + for it in &config.schedtasks { + if !
it.on_start { + continue; } - } else { - loop_duration = 0; - run_websrvrs = false; - run_cache = false; - run_check = false; - run_monitor = false; - websrvrs = Vec::new(); + match it.name.as_str() { + "monitor" => tokio::spawn(async {run_clouds_monitor().await}), + "check" => tokio::spawn(async {run_check_clouds().await}), + "cache" => tokio::spawn(async {run_cache_clouds().await}), + _ => { + eprintln!("Error task {} not defined",&it.name); + continue; + }, + }; } } - - // println!("content: {}",&config_content); - let rt = tokio::runtime::Runtime::new().unwrap_or_else(|e| - panic!("Error create tokio runtime {}",e) - ); - if run_websrvrs { - websrvrs.iter().enumerate().for_each(|(pos,it)| { - rt.block_on(async move { - println!("{} -> {}",it.name,pos); - tokio::spawn(async move {up_web_server(pos).await}); - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - }); - }); + if config.run_websrvrs { + for (pos,it) in config.websrvrs.iter().enumerate() { + println!("{} -> {}",it.name,pos); + tokio::spawn(async move {up_web_server(pos).await}); + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + } } - loop { - rt.block_on(async move { - if run_monitor { - tokio::spawn(async {run_clouds_monitor().await }); // For async task + if config.run_schedtasks { + for it in config.schedtasks { + if it.schedule.is_empty() { + eprintln!("Task {} no schedule defined",&it.name); + continue; } - if run_check { - tokio::spawn(async {run_check_clouds().await }); // For async task - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + let res = match it.name.as_str() { + "monitor" => + sched.add(Job::new(&it.schedule.to_owned(), move |uuid, _l| { + println!("Schedule {} {}: {}",&it.name,&it.schedule,uuid); + tokio::spawn(async {run_clouds_monitor().await}); + })?), + "check" => + sched.add(Job::new(&it.schedule.to_owned(), move |uuid, _l| { + println!("Schedule {} {}: {}",&it.name,&it.schedule,uuid); + tokio::spawn(async 
{run_check_clouds().await}); + })?), + "cache" => + sched.add(Job::new(&it.schedule.to_owned(), move |uuid, _l| { + println!("Schedule {} {}: {}",&it.name,&it.schedule,uuid); + tokio::spawn(async {run_cache_clouds().await}); + })?), + _ => { + eprintln!("Error task {} not defined",&it.name); + continue; + }, + }; + match res { + Ok(_) => { continue; }, + Err(e) => { + eprintln!("Error scheduling task {}",e); + continue; + }, } - // { - // // For blocking task: - // let join_handle = tokio::task::spawn_blocking(|| do_my_task()); - // tokio::spawn(async { do_my_task().await; }); - // join_handle.await; // TODO: should handle error here - // } - if run_cache { - tokio::spawn(async {run_cache_clouds().await }); // For async task - } - println!("LOOP: {} __________",chrono::Utc::now().timestamp()); - tokio::time::sleep(tokio::time::Duration::from_secs(loop_duration)).await; - }); + }; + let _= sched.start().await; } + Ok(()) }