# provisioning/taskservs/nushell/observability/process.nu
# Log Processing and Analysis Scripts for Nushell Infrastructure
# Advanced log parsing, filtering, and transformation capabilities
# Parse structured logs from various formats
export def parse-logs [
    --format(-f): string = "auto"  # json, syslog, apache, nginx, auto
    --filter: string               # Filter expression (Nushell snippet run per record)
    --transform: string            # Transform expression (Nushell snippet run per record)
]: string -> list<record> {
    let input_data = $in

    # Auto-detect format if not specified
    let detected_format = if $format == "auto" {
        let first_line = ($input_data | lines | first)
        if ($first_line | str starts-with "{") {
            "json"
        } else if ($first_line | str starts-with "<") {
            # RFC3164 syslog lines begin with a <priority> field
            "syslog"
        } else {
            "text"
        }
    } else {
        $format
    }
    # Parse based on format
    mut parsed_logs = match $detected_format {
        "json" => {
            $input_data | lines | where $it != "" | each { |line|
                try {
                    $line | from json
                } catch {
                    {raw: $line, parse_error: true}
                }
            }
        }
        "syslog" => {
            $input_data | lines | each { |line|
                # RFC3164 syslog format: <priority>timestamp hostname tag: message
                let syslog_pattern = '<(?P<priority>\d+)>(?P<timestamp>\w+\s+\d+\s+\d+:\d+:\d+)\s+(?P<hostname>\S+)\s+(?P<tag>\S+):\s*(?P<message>.*)'
                try {
                    let matches = ($line | parse -r $syslog_pattern)
                    if ($matches | length) > 0 {
                        $matches | first
                    } else {
                        {raw: $line, format: "syslog"}
                    }
                } catch {
                    {raw: $line, parse_error: true}
                }
            }
        }
        "apache" => {
            $input_data | lines | each { |line|
                # Apache Combined Log Format
                let apache_pattern = '(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>[^\]]+)\]\s+"(?P<method>\S+)\s+(?P<url>\S+)\s+(?P<protocol>[^"]+)"\s+(?P<status>\d+)\s+(?P<size>\d+|-)\s+"(?P<referer>[^"]*)"\s+"(?P<user_agent>[^"]*)"'
                try {
                    let matches = ($line | parse -r $apache_pattern)
                    if ($matches | length) > 0 {
                        $matches | first
                    } else {
                        {raw: $line, format: "apache"}
                    }
                } catch {
                    {raw: $line, parse_error: true}
                }
            }
        }
        "nginx" => {
            $input_data | lines | each { |line|
                # Nginx default log format
                let nginx_pattern = '(?P<ip>\S+)\s+-\s+-\s+\[(?P<timestamp>[^\]]+)\]\s+"(?P<method>\S+)\s+(?P<url>\S+)\s+(?P<protocol>[^"]+)"\s+(?P<status>\d+)\s+(?P<size>\d+)\s+"(?P<referer>[^"]*)"\s+"(?P<user_agent>[^"]*)"'
                try {
                    let matches = ($line | parse -r $nginx_pattern)
                    if ($matches | length) > 0 {
                        $matches | first
                    } else {
                        {raw: $line, format: "nginx"}
                    }
                } catch {
                    {raw: $line, parse_error: true}
                }
            }
        }
        _ => {
            $input_data | lines | enumerate | each { |item|
                {
                    line_number: $item.index
                    message: $item.item
                    timestamp: (date now | format date "%Y-%m-%d %H:%M:%S")
                }
            }
        }
    }
    # Apply filter if specified: serialize each record to NUON and evaluate
    # the filter snippet against it in a subshell
    if ($filter | is-not-empty) {
        $parsed_logs = ($parsed_logs | where { |log|
            try {
                (nu -c $"($log | to nuon) | ($filter)" | str trim) == "true"
            } catch {
                false
            }
        })
    }

    # Apply transformation if specified, round-tripping through NUON so the
    # subshell result comes back as structured data
    if ($transform | is-not-empty) {
        $parsed_logs = ($parsed_logs | each { |log|
            try {
                nu -c $"($log | to nuon) | ($transform) | to nuon" | from nuon
            } catch {
                $log
            }
        })
    }

    return $parsed_logs
}
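
# A minimal usage sketch (hypothetical file paths and filter snippet; the
# filter runs against each parsed record in a subshell, as wired above):
#   open /var/log/app.log | parse-logs --format json
#   open /var/log/nginx/access.log | parse-logs -f nginx --filter '$in.status == "500"'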
# Aggregate logs by time windows
export def aggregate-by-time [
    logs: list<record>
    --window(-w): string = "1h"        # Time window: 1m, 5m, 1h, 1d
    --field(-f): string = "timestamp"  # Timestamp field name
    --metric(-m): string = "count"     # Aggregation metric: count, sum, avg
    --group(-g): string                # Group-by field
]: nothing -> list<record> {
    # Parse time window into seconds
    let window_duration = match $window {
        "1m" => 60
        "5m" => 300
        "1h" => 3600
        "1d" => 86400
        _ => 3600  # Default to 1 hour
    }
# Convert timestamps to epoch and create time buckets
mut processed_logs = ($logs | each { |log|
let timestamp_value = ($log | get -i $field | default (date now))
let epoch = ($timestamp_value | date to-timezone UTC | format date "%s" | into int)
let bucket = (($epoch / $window_duration) * $window_duration)
$log | insert time_bucket $bucket | insert epoch $epoch
})
# Group by time bucket and optional field
let grouped = if ($group | is-not-empty) {
$processed_logs | group-by time_bucket $group
} else {
$processed_logs | group-by time_bucket
}
# Aggregate based on metric
$grouped | transpose bucket logs | each { |bucket_data|
let bucket_timestamp = ($bucket_data.bucket | into int | into datetime | format date "%Y-%m-%d %H:%M:%S")
let logs_in_bucket = $bucket_data.logs
match $metric {
"count" => {
{
timestamp: $bucket_timestamp
window: $window
count: ($logs_in_bucket | length)
}
}
"sum" => {
# Requires a numeric field to sum
{
timestamp: $bucket_timestamp
window: $window
sum: ($logs_in_bucket | get value | math sum)
}
}
"avg" => {
{
timestamp: $bucket_timestamp
window: $window
average: ($logs_in_bucket | get value | math avg)
}
}
_ => {
{
timestamp: $bucket_timestamp
window: $window
count: ($logs_in_bucket | length)
logs: $logs_in_bucket
}
}
}
} | sort-by timestamp
}
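
# Example usage (hypothetical $parsed from parse-logs; --group assumes the
# records carry a `source` field):
#   let parsed = (open /var/log/app.log | parse-logs -f json)
#   aggregate-by-time $parsed --window "5m" --metric count --group source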
# Detect anomalies in log patterns
export def detect-anomalies [
    logs: list<record>
    --field(-f): string = "message"  # Field to analyze
    --threshold(-t): float = 2.0     # Standard deviation threshold
    --window(-w): string = "1h"      # Time window for baseline
]: nothing -> list<record> {
    # Calculate baseline window in seconds
    let baseline_window = match $window {
        "1m" => 60
        "5m" => 300
        "1h" => 3600
        "1d" => 86400
        _ => 3600
    }
    let now = (date now)
    let baseline_start = ($now - ($baseline_window * 1sec))

    # Filter logs for the baseline period (timestamps may be strings, so
    # coerce to datetime before comparing)
    let baseline_logs = ($logs | where {|log|
        let log_time = ($log | get -i timestamp | default $now | into datetime)
        $log_time >= $baseline_start and $log_time <= $now
    })
    if ($baseline_logs | length) == 0 {
        return []
    }
    # Count occurrences in 5-minute buckets (aggregate-by-time takes the
    # logs as a positional argument)
    let time_series = (aggregate-by-time $baseline_logs --window "5m" --field timestamp --metric count)

    # Calculate baseline statistics
    let counts = ($time_series | get count)
    let mean = ($counts | math avg)
    let std_dev = ($counts | math stddev)

    # Find anomalies (values beyond threshold standard deviations)
    let anomaly_threshold_high = ($mean + ($threshold * $std_dev))
    let anomaly_threshold_low = ($mean - ($threshold * $std_dev))
    let anomalies = ($time_series | where {|bucket|
        $bucket.count > $anomaly_threshold_high or $bucket.count < $anomaly_threshold_low
    })

    return ($anomalies | each { |anomaly|
        $anomaly | merge {
            anomaly_type: (if $anomaly.count > $anomaly_threshold_high { "spike" } else { "drop" })
            severity: (if (($anomaly.count - $mean) | math abs) > (3 * $std_dev) { "high" } else { "medium" })
            baseline_mean: $mean
            baseline_stddev: $std_dev
        }
    })
}
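
# Example usage (same hypothetical $parsed as above; buckets beyond the
# threshold of the 5-minute baseline come back tagged spike/drop):
#   detect-anomalies $parsed --threshold 2.0 --window "1h"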
# Extract patterns and insights from logs
export def extract-patterns [
    logs: list<record>
    --field(-f): string = "message"       # Field to analyze
    --pattern-type(-t): string = "error"  # error, ip, url, email, custom
    --custom-regex(-r): string            # Custom regex pattern (needs a capture group)
    --min-frequency(-m): int = 2          # Minimum pattern frequency
]: nothing -> list<record> {
    # Tolerate records that lack the field
    let field_values = ($logs | get -i $field | where $it != null)
    let patterns = match $pattern_type {
        "error" => {
            # Common error patterns
            let error_regexes = [
                'error:?\s*(.+)',
                'exception:?\s*(.+)',
                'failed:?\s*(.+)',
                'timeout:?\s*(.+)',
                'connection\s*refused:?\s*(.+)'
            ]
            mut all_matches = []
            for regex in $error_regexes {
                let matches = ($field_values | each { |value|
                    try {
                        $value | parse -r $regex | get capture0
                    } catch {
                        []
                    }
                } | flatten)
                $all_matches = ($all_matches | append $matches)
            }
            $all_matches
        }
"ip" => {
# IP address pattern
let ip_regex = '\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
$field_values | each { |value|
try {
$value | parse -r $ip_regex
} catch {
[]
}
} | flatten
}
"url" => {
# URL pattern
let url_regex = 'https?://[^\s<>"]+'
$field_values | each { |value|
try {
$value | parse -r $url_regex
} catch {
[]
}
} | flatten
}
"email" => {
# Email pattern
let email_regex = '\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
$field_values | each { |value|
try {
$value | parse -r $email_regex
} catch {
[]
}
} | flatten
}
"custom" => {
if ($custom_regex | is-not-empty) {
$field_values | each { |value|
try {
$value | parse -r $custom_regex
} catch {
[]
}
} | flatten
} else {
[]
}
}
_ => []
}
    # Count pattern frequencies
    let pattern_counts = ($patterns | group-by {|x| $x} | transpose pattern occurrences | each { |item|
        {
            pattern: $item.pattern
            frequency: ($item.occurrences | length)
            examples: ($item.occurrences | first 3)
        }
    } | where frequency >= $min_frequency | sort-by frequency -r)
    return $pattern_counts
}
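
# Example usage (hypothetical $parsed; a custom regex must contain at least
# one capture group, e.g. to pull usernames out of messages):
#   extract-patterns $parsed --pattern-type error --min-frequency 5
#   extract-patterns $parsed -t custom -r 'user=(\w+)' -m 2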
# Generate log summary report
export def generate-summary [
    logs: list<record>
    --timeframe(-t): string = "24h"  # Timeframe label for the report
    --include-patterns(-p)           # Include pattern analysis
    --include-anomalies(-a)          # Include anomaly detection
]: nothing -> record {
    let total_logs = ($logs | length)
    let start_time = (date now | format date "%Y-%m-%d %H:%M:%S")
    if $total_logs == 0 {
        return {
            summary: "No logs to analyze"
            timestamp: $start_time
            total_logs: 0
        }
    }
    # Basic statistics (skip records without a timestamp, coerce the rest)
    let time_range = ($logs | get -i timestamp | where $it != null | each { |ts| $ts | into datetime })
    let earliest = ($time_range | math min)
    let latest = ($time_range | math max)

    # Log level distribution
    let level_distribution = ($logs | get -i level | where $it != null | group-by {|x| $x} | transpose level count | each { |item|
        {level: $item.level, count: ($item.count | length)}
    } | sort-by count -r)

    # Source distribution
    let source_distribution = ($logs | get -i source | where $it != null | group-by {|x| $x} | transpose source count | each { |item|
        {source: $item.source, count: ($item.count | length)}
    } | sort-by count -r)
    mut summary_report = {
        analysis_timestamp: $start_time
        timeframe: $timeframe
        total_logs: $total_logs
        time_range: {
            earliest: ($earliest | format date "%Y-%m-%d %H:%M:%S")
            latest: ($latest | format date "%Y-%m-%d %H:%M:%S")
            duration_hours: ((($latest - $earliest) / 1hr) | math round --precision 2)
        }
        distribution: {
            by_level: $level_distribution
            by_source: $source_distribution
        }
        statistics: {
            logs_per_hour: (($total_logs / (($latest - $earliest) / 1hr)) | math round --precision 2)
            unique_sources: ($source_distribution | length)
            error_rate: (($logs | where {|log| ($log | get -i level | default "") =~ "error|critical|fatal"} | length) / $total_logs * 100 | math round --precision 2)
        }
    }
    # Add pattern analysis if requested
    if $include_patterns {
        let error_patterns = (extract-patterns $logs --pattern-type error --min-frequency 2)
        let ip_patterns = (extract-patterns $logs --pattern-type ip --min-frequency 3)
        $summary_report = ($summary_report | insert patterns {
            errors: $error_patterns
            ip_addresses: ($ip_patterns | first 10)
        })
    }

    # Add anomaly detection if requested
    if $include_anomalies {
        let anomalies = (detect-anomalies $logs --threshold 2.0 --window "1h")
        $summary_report = ($summary_report | insert anomalies {
            detected: ($anomalies | length)
            high_severity: ($anomalies | where severity == "high" | length)
            details: ($anomalies | first 5)
        })
    }

    return $summary_report
}
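
# End-to-end example (hypothetical path; the switches enable the optional
# pattern and anomaly sections of the report):
#   let parsed = (open /var/log/app.log | parse-logs -f json)
#   generate-summary $parsed --include-patterns --include-anomalies | to yaml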