chore: add current provisioning state before migration

This commit is contained in:
Jesús Pérez 2025-09-22 23:11:41 +01:00
parent a9703b4748
commit 50745b0f22
660 changed files with 88126 additions and 0 deletions

View file

@ -0,0 +1,547 @@
#!/usr/bin/env nu
# Log Processing Module for Provisioning System
# Advanced log collection, parsing, and analysis using DataFrames
use polars_integration.nu *
use ../lib_provisioning/utils/settings.nu *
# Log sources configuration
export def get_log_sources []: nothing -> record {
{
system: {
paths: ["/var/log/syslog", "/var/log/messages"]
format: "syslog"
enabled: true
}
provisioning: {
paths: [
($env.PROVISIONING_PATH? | default "/usr/local/provisioning" | path join "logs")
"~/.provisioning/logs"
]
format: "json"
enabled: true
}
containers: {
paths: [
"/var/log/containers"
"/var/lib/docker/containers"
]
format: "json"
enabled: ($env.DOCKER_HOST? | is-not-empty)
}
kubernetes: {
command: "kubectl logs"
format: "json"
enabled: ((which kubectl | length) > 0)
}
cloud_providers: {
aws: {
cloudwatch: true
s3_logs: []
enabled: ($env.AWS_PROFILE? | is-not-empty)
}
gcp: {
stackdriver: true
enabled: ($env.GOOGLE_CLOUD_PROJECT? | is-not-empty)
}
}
}
}
# Collect logs from all configured sources
export def collect_logs [
--since: string = "1h"
--sources: list<string> = []
--output_format: string = "dataframe"
--filter_level: string = "info"
--include_metadata = true
]: nothing -> any {
print $"📊 Collecting logs from the last ($since)..."
let log_sources = get_log_sources
let enabled_sources = if ($sources | is-empty) {
$log_sources | transpose source config | where {|row| $row.config.enabled} | get source
} else {
$sources
}
print $"🔍 Enabled sources: ($enabled_sources | str join ', ')"
let collected_logs = ($enabled_sources | each {|source|
print $"📥 Collecting from: ($source)"
collect_from_source $source $log_sources.$source --since $since
} | flatten)
print $"📋 Collected ($collected_logs | length) log entries"
# Filter by log level
let filtered_logs = (filter_by_level $collected_logs $filter_level)
# Process into requested format
match $output_format {
"dataframe" => {
create_infra_dataframe $filtered_logs --source "logs"
}
"json" => {
$filtered_logs | to json
}
"csv" => {
$filtered_logs | to csv
}
_ => {
$filtered_logs
}
}
}
def collect_from_source [
source: string
config: record
--since: string = "1h"
]: nothing -> list {
match $source {
"system" => {
collect_system_logs $config --since $since
}
"provisioning" => {
collect_provisioning_logs $config --since $since
}
"containers" => {
collect_container_logs $config --since $since
}
"kubernetes" => {
collect_kubernetes_logs $config --since $since
}
_ => {
print $"⚠️ Unknown log source: ($source)"
[]
}
}
}
def collect_system_logs [
config: record
--since: string = "1h"
]: record -> list {
$config.paths | each {|path|
if ($path | path exists) {
let content = (read_recent_logs $path --since $since)
$content | each {|line|
parse_system_log_line $line $path
}
} else {
[]
}
} | flatten
}
def collect_provisioning_logs [
config: record
--since: string = "1h"
]: record -> list {
$config.paths | each {|log_dir|
if ($log_dir | path exists) {
let log_files = (ls ($log_dir | path join "*.log") | get name)
$log_files | each {|file|
if ($file | str ends-with ".json") {
collect_json_logs $file --since $since
} else {
collect_text_logs $file --since $since
}
} | flatten
} else {
[]
}
} | flatten
}
def collect_container_logs [
config: record
--since: string = "1h"
]: record -> list {
if ((which docker | length) > 0) {
collect_docker_logs --since $since
} else {
print "⚠️ Docker not available for container log collection"
[]
}
}
def collect_kubernetes_logs [
config: record
--since: string = "1h"
]: record -> list {
if ((which kubectl | length) > 0) {
collect_k8s_logs --since $since
} else {
print "⚠️ kubectl not available for Kubernetes log collection"
[]
}
}
def read_recent_logs [
file_path: string
--since: string = "1h"
]: string -> list {
let since_timestamp = ((date now) - (parse_duration $since))
if ($file_path | path exists) {
# Use tail with approximate line count based on time
let estimated_lines = match $since {
"1m" => 100
"5m" => 500
"1h" => 3600
"1d" => 86400
_ => 1000
}
(tail -n $estimated_lines $file_path | lines)
} else {
[]
}
}
def parse_system_log_line [
line: string
source_file: string
]: nothing -> record {
# Parse standard syslog format
let syslog_pattern = '(?P<timestamp>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(?P<hostname>\S+)\s+(?P<process>\S+?)(\[(?P<pid>\d+)\])?:\s*(?P<message>.*)'
let parsed = ($line | parse --regex $syslog_pattern)
if ($parsed | length) > 0 {
let entry = $parsed.0
{
timestamp: (parse_syslog_timestamp $entry.timestamp)
level: (extract_log_level $entry.message)
message: $entry.message
hostname: $entry.hostname
process: $entry.process
pid: ($entry.pid? | default "")
source: $source_file
raw: $line
}
} else {
{
timestamp: (date now)
level: "unknown"
message: $line
source: $source_file
raw: $line
}
}
}
def collect_json_logs [
file_path: string
--since: string = "1h"
]: string -> list {
let lines = (read_recent_logs $file_path --since $since)
$lines | each {|line|
do {
let parsed = ($line | from json)
{
timestamp: (standardize_timestamp ($parsed.timestamp? | default (date now)))
level: ($parsed.level? | default "info")
message: ($parsed.message? | default $line)
service: ($parsed.service? | default "provisioning")
source: $file_path
metadata: ($parsed | reject timestamp level message service?)
raw: $line
}
} | complete | if ($in.exit_code == 0) {
$in.stdout
} else {
{
timestamp: (date now)
level: "error"
message: $"Failed to parse JSON: ($line)"
source: $file_path
raw: $line
}
}
}
}
def collect_text_logs [
file_path: string
--since: string = "1h"
]: string -> list {
let lines = (read_recent_logs $file_path --since $since)
$lines | each {|line|
{
timestamp: (date now)
level: (extract_log_level $line)
message: $line
source: $file_path
raw: $line
}
}
}
def collect_docker_logs [
--since: string = "1h"
]: nothing -> list {
do {
let containers = (docker ps --format "{{.Names}}" | lines)
$containers | each {|container|
let logs = (^docker logs --since $since $container | complete | get stdout | lines)
$logs | each {|line|
{
timestamp: (date now)
level: (extract_log_level $line)
message: $line
container: $container
source: "docker"
raw: $line
}
}
} | flatten
} | complete | if ($in.exit_code == 0) {
$in.stdout
} else {
print "⚠️ Failed to collect Docker logs"
[]
}
}
def collect_k8s_logs [
--since: string = "1h"
]: nothing -> list {
do {
let pods = (kubectl get pods -o jsonpath='{.items[*].metadata.name}' | split row " ")
$pods | each {|pod|
let logs = (kubectl logs --since=$since $pod 2>/dev/null | lines)
$logs | each {|line|
{
timestamp: (date now)
level: (extract_log_level $line)
message: $line
pod: $pod
source: "kubernetes"
raw: $line
}
}
} | flatten
} | complete | if ($in.exit_code == 0) {
$in.stdout
} else {
print "⚠️ Failed to collect Kubernetes logs"
[]
}
}
def parse_syslog_timestamp [ts: string]: string -> datetime {
do {
# Parse syslog timestamp format: "Jan 16 10:30:15"
let current_year = (date now | date format "%Y")
$"($current_year) ($ts)" | into datetime --format "%Y %b %d %H:%M:%S"
} | complete | if ($in.exit_code == 0) {
$in.stdout
} else {
date now
}
}
def extract_log_level [message: string]: string -> string {
let level_patterns = {
"FATAL": "fatal"
"ERROR": "error"
"WARN": "warn"
"WARNING": "warning"
"INFO": "info"
"DEBUG": "debug"
"TRACE": "trace"
}
let upper_message = ($message | str upcase)
for level_key in ($level_patterns | columns) {
if ($upper_message | str contains $level_key) {
return ($level_patterns | get $level_key)
}
}
"info" # default level
}
def filter_by_level [
logs: list
level: string
]: nothing -> list {
let level_order = ["trace", "debug", "info", "warn", "warning", "error", "fatal"]
let min_index = ($level_order | enumerate | where {|row| $row.item == $level} | get index.0)
$logs | where {|log|
let log_level_index = ($level_order | enumerate | where {|row| $row.item == $log.level} | get index.0? | default 2)
$log_level_index >= $min_index
}
}
def parse_duration [duration: string]: string -> duration {
match $duration {
$dur if ($dur | str ends-with "m") => {
let minutes = ($dur | str replace "m" "" | into int)
$minutes * 60 * 1000 * 1000 * 1000 # nanoseconds
}
$dur if ($dur | str ends-with "h") => {
let hours = ($dur | str replace "h" "" | into int)
$hours * 60 * 60 * 1000 * 1000 * 1000 # nanoseconds
}
$dur if ($dur | str ends-with "d") => {
let days = ($dur | str replace "d" "" | into int)
$days * 24 * 60 * 60 * 1000 * 1000 * 1000 # nanoseconds
}
_ => {
3600 * 1000 * 1000 * 1000 # 1 hour default
}
} | into duration
}
# Analyze logs using DataFrame operations
export def analyze_logs [
logs_df: any
--analysis_type: string = "summary" # summary, errors, patterns, performance
--time_window: string = "1h"
--group_by: list<string> = ["service", "level"]
]: any -> any {
match $analysis_type {
"summary" => {
analyze_log_summary $logs_df $group_by
}
"errors" => {
analyze_log_errors $logs_df
}
"patterns" => {
analyze_log_patterns $logs_df $time_window
}
"performance" => {
analyze_log_performance $logs_df $time_window
}
_ => {
error make { msg: $"Unknown analysis type: ($analysis_type)" }
}
}
}
def analyze_log_summary [logs_df: any, group_cols: list<string>]: nothing -> any {
aggregate_dataframe $logs_df --group_by $group_cols --operations {
count: "count"
first_seen: "min"
last_seen: "max"
}
}
def analyze_log_errors [logs_df: any]: any -> any {
# Filter error logs and analyze patterns
query_dataframe $logs_df "SELECT * FROM logs_df WHERE level IN ('error', 'fatal', 'warn')"
}
def analyze_log_patterns [logs_df: any, time_window: string]: nothing -> any {
# Time series analysis of log patterns
time_series_analysis $logs_df --time_column "timestamp" --value_column "level" --window $time_window
}
def analyze_log_performance [logs_df: any, time_window: string]: nothing -> any {
# Analyze performance-related logs
query_dataframe $logs_df "SELECT * FROM logs_df WHERE message LIKE '%performance%' OR message LIKE '%slow%'"
}
# Generate log analysis report
export def generate_log_report [
logs_df: any
--output_path: string = "log_report.md"
--include_charts = false
]: any -> nothing {
let summary = analyze_logs $logs_df --analysis_type "summary"
let errors = analyze_logs $logs_df --analysis_type "errors"
let report = $"
# Log Analysis Report
Generated: (date now | date format '%Y-%m-%d %H:%M:%S')
## Summary
Total log entries: (query_dataframe $logs_df 'SELECT COUNT(*) as count FROM logs_df')
### Log Levels Distribution
(analyze_log_summary $logs_df ['level'] | to md --pretty)
### Services Overview
(analyze_log_summary $logs_df ['service'] | to md --pretty)
## Error Analysis
(analyze_log_errors $logs_df | to md --pretty)
## Recommendations
Based on the log analysis:
1. **Error Patterns**: Review services with high error rates
2. **Performance**: Investigate slow operations
3. **Monitoring**: Set up alerts for critical error patterns
---
Report generated by Provisioning System Log Analyzer
"
$report | save --force $output_path
print $"📊 Log analysis report saved to: ($output_path)"
}
# Real-time log monitoring
export def monitor_logs [
--follow = true
--alert_level: string = "error"
--callback: string = ""
]: nothing -> nothing {
print $"👀 Starting real-time log monitoring (alert level: ($alert_level))..."
if $follow {
# Start continuous monitoring
while true {
let recent_logs = collect_logs --since "1m" --filter_level $alert_level
if ($recent_logs | length) > 0 {
print $"🚨 Found ($recent_logs | length) ($alert_level) entries:"
$recent_logs | each {|log|
print $"[($log.timestamp)] ($log.level | str upcase): ($log.message)"
if ($callback | is-not-empty) {
# Execute callback command for alerts
do {
nu -c $callback
} | complete | if ($in.exit_code != 0) {
print $"⚠️ Failed to execute callback: ($callback)"
}
}
}
}
sleep 60sec # Check every minute
}
}
}

View file

@ -0,0 +1,513 @@
#!/usr/bin/env nu
# Polars DataFrame Integration for Provisioning System
# High-performance data processing for logs, metrics, and infrastructure state
use ../lib_provisioning/utils/settings.nu *
# Check if Polars plugin is available
export def check_polars_available []: nothing -> bool {
let plugins = (plugin list)
($plugins | any {|p| $p.name == "polars" or $p.name == "nu_plugin_polars"})
}
# Initialize Polars plugin if available
export def init_polars []: nothing -> bool {
if (check_polars_available) {
# Try to load polars plugin
do {
plugin use polars
true
} | complete | if ($in.exit_code == 0) {
true
} else {
print "⚠️ Warning: Polars plugin found but failed to load"
false
}
} else {
print " Polars plugin not available, using native Nushell operations"
false
}
}
# Create DataFrame from infrastructure data
export def create_infra_dataframe [
data: list
--source: string = "infrastructure"
--timestamp = true
]: list -> any {
let use_polars = init_polars
mut processed_data = $data
if $timestamp {
$processed_data = ($processed_data | each {|row|
$row | upsert timestamp (date now)
})
}
if $use_polars {
# Use Polars DataFrame
$processed_data | polars into-df
} else {
# Return enhanced Nushell table with DataFrame-like operations
$processed_data | enhance_nushell_table
}
}
# Process logs into DataFrame format
export def process_logs_to_dataframe [
log_files: list<string>
--format: string = "auto" # auto, json, csv, syslog, custom
--time_column: string = "timestamp"
--level_column: string = "level"
--message_column: string = "message"
]: list<string> -> any {
let use_polars = init_polars
# Collect and parse all log files
let parsed_logs = ($log_files | each {|file|
if ($file | path exists) {
parse_log_file $file --format $format
} else {
[]
}
} | flatten)
if ($parsed_logs | length) == 0 {
if $use_polars {
[] | polars into-df
} else {
[]
}
} else {
# Standardize log format
let standardized = ($parsed_logs | each {|log|
{
timestamp: (standardize_timestamp ($log | get $time_column))
level: ($log | get $level_column)
message: ($log | get $message_column)
source: ($log.source? | default "unknown")
service: ($log.service? | default "provisioning")
metadata: ($log | reject $time_column $level_column $message_column)
}
})
if $use_polars {
$standardized | polars into-df
} else {
$standardized | enhance_nushell_table
}
}
}
# Parse individual log file based on format
def parse_log_file [
file_path: string
--format: string = "auto"
]: string -> list {
if not ($file_path | path exists) {
return []
}
let content = (open $file_path --raw)
match $format {
"json" => {
# Parse JSON logs
$content | lines | each {|line|
do {
$line | from json
} | complete | if ($in.exit_code == 0) {
$in.stdout
} else {
{
timestamp: (date now)
level: "unknown"
message: $line
raw: true
}
}
}
}
"csv" => {
# Parse CSV logs
do {
$content | from csv
} | complete | if ($in.exit_code == 0) {
$in.stdout
} else {
[]
}
}
"syslog" => {
# Parse syslog format
$content | lines | each {|line|
parse_syslog_line $line
}
}
"auto" => {
# Auto-detect format
if ($file_path | str ends-with ".json") {
parse_log_file $file_path --format "json"
} else if ($file_path | str ends-with ".csv") {
parse_log_file $file_path --format "csv"
} else {
parse_log_file $file_path --format "syslog"
}
}
_ => {
# Custom format - treat as plain text
$content | lines | each {|line|
{
timestamp: (date now)
level: "info"
message: $line
source: $file_path
}
}
}
}
}
# Parse syslog format line
def parse_syslog_line [line: string]: string -> record {
# Basic syslog parsing - can be enhanced
let parts = ($line | parse --regex '(?P<timestamp>\w+\s+\d+\s+\d+:\d+:\d+)\s+(?P<host>\S+)\s+(?P<service>\S+):\s*(?P<message>.*)')
if ($parts | length) > 0 {
let parsed = $parts.0
{
timestamp: $parsed.timestamp
level: "info" # Default level
message: $parsed.message
host: $parsed.host
service: $parsed.service
}
} else {
{
timestamp: (date now)
level: "unknown"
message: $line
}
}
}
# Standardize timestamp formats
def standardize_timestamp [ts: any]: any -> datetime {
match ($ts | describe) {
"string" => {
do {
$ts | into datetime
} | complete | if ($in.exit_code == 0) {
$in.stdout
} else {
date now
}
}
"datetime" => $ts,
_ => (date now)
}
}
# Enhance Nushell table with DataFrame-like operations
def enhance_nushell_table []: list -> list {
let data = $in
# Add DataFrame-like methods through custom commands
$data | add_dataframe_methods
}
def add_dataframe_methods []: list -> list {
# This function adds metadata to enable DataFrame-like operations
# In a real implementation, we'd add custom commands to the scope
$in
}
# Query DataFrame with SQL-like syntax
export def query_dataframe [
df: any
query: string
--use_polars = false
]: any -> any {
if $use_polars and (check_polars_available) {
# Use Polars query capabilities
$df | polars query $query
} else {
# Fallback to Nushell operations
query_with_nushell $df $query
}
}
def query_with_nushell [df: any, query: string]: nothing -> any {
# Simple SQL-like query parser for Nushell
# This is a basic implementation - can be significantly enhanced
if ($query | str downcase | str starts-with "select") {
let parts = ($query | str replace --regex "(?i)select\\\\s+" "" | split row " from ")
if ($parts | length) >= 2 {
let columns = ($parts.0 | split row ",")
let conditions = if ($parts | length) > 2 { $parts.2 } else { "" }
mut result = $df
if $columns != ["*"] {
$result = ($result | select ($columns | each {|c| $c | str trim}))
}
if ($conditions | str contains "where") {
# Basic WHERE clause processing
$result = (process_where_clause $result $conditions)
}
$result
} else {
$df
}
} else {
$df
}
}
def process_where_clause [data: any, conditions: string]: nothing -> any {
# Basic WHERE clause implementation
# This would need significant enhancement for production use
$data
}
# Aggregate data with common operations
export def aggregate_dataframe [
df: any
--group_by: list<string> = []
--operations: record = {} # {column: operation}
--time_bucket: string = "1h" # For time-based aggregations
]: any -> any {
let use_polars = init_polars
if $use_polars and (check_polars_available) {
# Use Polars aggregation
aggregate_with_polars $df $group_by $operations $time_bucket
} else {
# Use Nushell aggregation
aggregate_with_nushell $df $group_by $operations $time_bucket
}
}
def aggregate_with_polars [
df: any
group_cols: list<string>
operations: record
time_bucket: string
]: nothing -> any {
# Polars aggregation implementation
if ($group_cols | length) > 0 {
$df | polars group-by $group_cols | polars agg [
(polars col "value" | polars sum)
(polars col "value" | polars mean)
(polars col "value" | polars count)
]
} else {
$df
}
}
def aggregate_with_nushell [
df: any
group_cols: list<string>
operations: record
time_bucket: string
]: nothing -> any {
# Nushell aggregation implementation
if ($group_cols | length) > 0 {
$df | group-by ($group_cols | str join " ")
} else {
$df
}
}
# Time series analysis operations
export def time_series_analysis [
df: any
--time_column: string = "timestamp"
--value_column: string = "value"
--window: string = "1h"
--operations: list<string> = ["mean", "sum", "count"]
]: any -> any {
let use_polars = init_polars
if $use_polars and (check_polars_available) {
time_series_with_polars $df $time_column $value_column $window $operations
} else {
time_series_with_nushell $df $time_column $value_column $window $operations
}
}
def time_series_with_polars [
df: any
time_col: string
value_col: string
window: string
ops: list<string>
]: nothing -> any {
# Polars time series operations
$df | polars group-by $time_col | polars agg [
(polars col $value_col | polars mean)
(polars col $value_col | polars sum)
(polars col $value_col | polars count)
]
}
def time_series_with_nushell [
df: any
time_col: string
value_col: string
window: string
ops: list<string>
]: nothing -> any {
# Nushell time series - basic implementation
$df | group-by {|row|
# Group by time windows - simplified
($row | get $time_col) | date format "%Y-%m-%d %H:00:00"
} | each {|group_data|
let values = ($group_data | get $value_col)
{
time_window: "grouped"
mean: ($values | math avg)
sum: ($values | math sum)
count: ($values | length)
}
}
}
# Export DataFrame to various formats
export def export_dataframe [
df: any
output_path: string
--format: string = "csv" # csv, parquet, json, excel
]: any -> nothing {
let use_polars = init_polars
match $format {
"csv" => {
if $use_polars and (check_polars_available) {
$df | polars save $output_path
} else {
$df | to csv | save --force $output_path
}
}
"parquet" => {
if $use_polars and (check_polars_available) {
$df | polars save $output_path
} else {
error make { msg: "Parquet format requires Polars plugin" }
}
}
"json" => {
$df | to json | save --force $output_path
}
_ => {
error make { msg: $"Unsupported format: ($format)" }
}
}
print $"✅ DataFrame exported to: ($output_path) (format: ($format))"
}
# Performance comparison: Polars vs Nushell
export def benchmark_operations [
data_size: int = 10000
operations: list<string> = ["filter", "group", "aggregate"]
]: int -> record {
print $"🔬 Benchmarking operations with ($data_size) records..."
# Generate test data
let test_data = (0..$data_size | each {|i|
{
id: $i
value: (random int 1..100)
category: (random int 1..5 | into string)
timestamp: (date now)
}
})
let results = {}
# Benchmark with Nushell
let nushell_start = (date now)
let nushell_result = (benchmark_nushell_operations $test_data $operations)
let nushell_duration = ((date now) - $nushell_start)
$results | insert nushell {
duration_ms: ($nushell_duration | into int)
operations_per_sec: ($data_size / ($nushell_duration | into int) * 1000)
}
# Benchmark with Polars (if available)
if (check_polars_available) {
let polars_start = (date now)
let polars_result = (benchmark_polars_operations $test_data $operations)
let polars_duration = ((date now) - $polars_start)
$results | insert polars {
duration_ms: ($polars_duration | into int)
operations_per_sec: ($data_size / ($polars_duration | into int) * 1000)
}
$results | insert performance_gain (
($results.nushell.duration_ms / $results.polars.duration_ms)
)
}
$results
}
def benchmark_nushell_operations [data: list, ops: list<string>]: nothing -> any {
mut result = $data
if "filter" in $ops {
$result = ($result | where value > 50)
}
if "group" in $ops {
$result = ($result | group-by category)
}
if "aggregate" in $ops {
$result = ($result | each {|group| {
category: $group.0
count: ($group.1 | length)
avg_value: ($group.1 | get value | math avg)
}})
}
$result
}
def benchmark_polars_operations [data: list, ops: list<string>]: nothing -> any {
mut df = ($data | polars into-df)
if "filter" in $ops {
$df = ($df | polars filter (polars col value))
}
if "group" in $ops {
$df = ($df | polars group-by "category")
}
if "aggregate" in $ops {
$df = ($df | polars agg [
(polars col "id" | polars count)
(polars col "value" | polars mean)
])
}
$df
}