#!/usr/bin/env nu
|
|||
|
|
|||
|
# Polars DataFrame Integration for Provisioning System
|
|||
|
# High-performance data processing for logs, metrics, and infrastructure state
|
|||
|
|
|||
|
use ../lib_provisioning/utils/settings.nu *
|
|||
|
|
|||
|
# Check if Polars plugin is available
|
|||
|
# Report whether a Polars plugin is registered with this Nushell instance.
# Accepts either registration name: "polars" or "nu_plugin_polars".
export def check_polars_available []: nothing -> bool {
    let registered = (plugin list | get name)
    ("polars" in $registered) or ("nu_plugin_polars" in $registered)
}
|
|||
|
|
|||
|
# Initialize Polars plugin if available
|
|||
|
# Initialize the Polars plugin if it is registered.
# Returns true when the plugin loads successfully, false otherwise
# (with an informational message in either failure case).
export def init_polars []: nothing -> bool {
    if (check_polars_available) {
        # Fixed: `do {...} | complete` only captures exit codes of external
        # commands; for the internal `plugin use` command, try/catch is the
        # correct way to detect a load failure.
        try {
            plugin use polars
            true
        } catch {
            print "⚠️ Warning: Polars plugin found but failed to load"
            false
        }
    } else {
        print "ℹ️ Polars plugin not available, using native Nushell operations"
        false
    }
}
|
|||
|
|
|||
|
# Create DataFrame from infrastructure data
|
|||
|
# Build a DataFrame (Polars when available, enhanced Nushell table otherwise)
# from a list of infrastructure records, optionally stamping each row with
# the current time.
# NOTE(review): the --source flag is accepted but never used — confirm whether
# each row should be tagged with it before removing the flag.
export def create_infra_dataframe [
    data: list
    --source: string = "infrastructure"
    --timestamp = true
]: list -> any {
    let use_polars = init_polars

    # Stamp every row with "now" unless the caller opted out
    let rows = if $timestamp {
        $data | each {|row| $row | upsert timestamp (date now) }
    } else {
        $data
    }

    if $use_polars {
        $rows | polars into-df
    } else {
        $rows | enhance_nushell_table
    }
}
|
|||
|
|
|||
|
# Process logs into DataFrame format
|
|||
|
# Collect the given log files, parse each one, and normalize the records into
# a common schema (timestamp/level/message/source/service/metadata).
# Returns a Polars DataFrame when the plugin is usable, a plain list otherwise.
export def process_logs_to_dataframe [
    log_files: list<string>
    --format: string = "auto"  # auto, json, csv, syslog, custom
    --time_column: string = "timestamp"
    --level_column: string = "level"
    --message_column: string = "message"
]: list<string> -> any {
    let use_polars = init_polars

    # Parse every file that exists; missing files contribute no records
    let parsed_logs = ($log_files | each {|file|
        if ($file | path exists) {
            parse_log_file $file --format $format
        } else {
            []
        }
    } | flatten)

    if ($parsed_logs | is-empty) {
        if $use_polars { [] | polars into-df } else { [] }
    } else {
        # Map heterogeneous log records onto the standard column set;
        # leftover fields are preserved under `metadata`.
        let standardized = ($parsed_logs | each {|log|
            {
                timestamp: (standardize_timestamp ($log | get $time_column))
                level: ($log | get $level_column)
                message: ($log | get $message_column)
                source: ($log.source? | default "unknown")
                service: ($log.service? | default "provisioning")
                metadata: ($log | reject $time_column $level_column $message_column)
            }
        })

        if $use_polars {
            $standardized | polars into-df
        } else {
            $standardized | enhance_nushell_table
        }
    }
}
|
|||
|
|
|||
|
# Parse individual log file based on format
|
|||
|
# Parse a single log file into a list of records according to --format.
# "auto" picks json/csv/syslog from the file extension; unknown formats are
# wrapped line-by-line as plain-text records. Missing files yield [].
def parse_log_file [
    file_path: string
    --format: string = "auto"
]: string -> list {
    if not ($file_path | path exists) {
        return []
    }

    let content = (open $file_path --raw)

    match $format {
        "json" => {
            # One JSON record per line; malformed lines are kept as raw text.
            # Fixed: the old `do {...} | complete` + `$in.stdout` pattern
            # returned the (empty) stdout string of an internal command
            # instead of the parsed value — try/catch yields the value itself.
            $content | lines | each {|line|
                try {
                    $line | from json
                } catch {
                    {
                        timestamp: (date now)
                        level: "unknown"
                        message: $line
                        raw: true
                    }
                }
            }
        }
        "csv" => {
            # Same fix as the json branch: return the parsed table directly
            try { $content | from csv } catch { [] }
        }
        "syslog" => {
            $content | lines | each {|line|
                parse_syslog_line $line
            }
        }
        "auto" => {
            # Auto-detect format from the file extension
            if ($file_path | str ends-with ".json") {
                parse_log_file $file_path --format "json"
            } else if ($file_path | str ends-with ".csv") {
                parse_log_file $file_path --format "csv"
            } else {
                parse_log_file $file_path --format "syslog"
            }
        }
        _ => {
            # Custom/unknown format — treat each line as plain text
            $content | lines | each {|line|
                {
                    timestamp: (date now)
                    level: "info"
                    message: $line
                    source: $file_path
                }
            }
        }
    }
}
|
|||
|
|
|||
|
# Parse syslog format line
|
|||
|
# Parse one syslog-style line ("Mon DD HH:MM:SS host service: message") into a
# record. Lines that do not match fall back to a generic unknown-level record.
def parse_syslog_line [line: string]: string -> record {
    let captured = ($line | parse --regex '(?P<timestamp>\w+\s+\d+\s+\d+:\d+:\d+)\s+(?P<host>\S+)\s+(?P<service>\S+):\s*(?P<message>.*)')

    if ($captured | is-empty) {
        {
            timestamp: (date now)
            level: "unknown"
            message: $line
        }
    } else {
        let fields = $captured.0
        {
            timestamp: $fields.timestamp
            level: "info"  # syslog priority is not captured; default to info
            message: $fields.message
            host: $fields.host
            service: $fields.service
        }
    }
}
|
|||
|
|
|||
|
# Standardize timestamp formats
|
|||
|
# Coerce an arbitrary timestamp value to a datetime.
# Strings are parsed with `into datetime`; unparseable strings and any
# non-string, non-datetime value fall back to the current time.
def standardize_timestamp [ts: any]: any -> datetime {
    match ($ts | describe) {
        "string" => {
            # Fixed: `do {...} | complete` + `$in.stdout` does not carry the
            # parsed datetime of an internal command; try/catch returns it.
            try { $ts | into datetime } catch { date now }
        }
        "datetime" => $ts,
        _ => (date now)
    }
}
|
|||
|
|
|||
|
# Enhance Nushell table with DataFrame-like operations
|
|||
|
# Wrap a plain Nushell table so it can carry DataFrame-like behavior.
# Currently a pass-through to the metadata hook below.
def enhance_nushell_table []: list -> list {
    $in | add_dataframe_methods
}
|
|||
|
|
|||
|
# Placeholder hook where DataFrame-like commands would be attached to the
# table. For now the input passes through unchanged.
def add_dataframe_methods []: list -> list {
    $in
}
|
|||
|
|
|||
|
# Query DataFrame with SQL-like syntax
|
|||
|
# Run a SQL-like query against a DataFrame. Uses the Polars engine only when
# explicitly requested AND available; otherwise falls back to the native
# Nushell interpreter.
export def query_dataframe [
    df: any
    query: string
    --use_polars = false
]: any -> any {
    let polars_ok = ($use_polars and (check_polars_available))

    if $polars_ok {
        $df | polars query $query
    } else {
        query_with_nushell $df $query
    }
}
|
|||
|
|
|||
|
# Minimal SQL-like SELECT interpreter for native Nushell tables.
# Supports `select <cols> from <table> [where ...]`; any other query (or a
# malformed SELECT) returns the input unchanged.
def query_with_nushell [df: any, query: string]: nothing -> any {
    if not ($query | str downcase | str starts-with "select") {
        return $df
    }

    # Fixed: the old pattern was written as "(?i)select\\\\s+" in a
    # double-quoted string, i.e. a literal backslash followed by "s+" — it
    # never stripped the keyword. A single-quoted raw regex matches whitespace.
    let parts = ($query | str replace --regex '(?i)^select\s+' '' | split row " from ")
    if ($parts | length) < 2 {
        return $df
    }

    let columns = ($parts.0 | split row "," | each {|c| $c | str trim })
    # Fixed: the WHERE clause lives in the text after "from" ($parts.1 and
    # beyond), not in a third " from " segment as the old code assumed.
    let tail = ($parts | skip 1 | str join " from ")

    mut result = $df

    if $columns != ["*"] {
        $result = ($result | select ...$columns)
    }

    if ($tail | str downcase | str contains "where") {
        # Basic WHERE clause processing (delegated to the stub evaluator)
        $result = (process_where_clause $result $tail)
    }

    $result
}
|
|||
|
|
|||
|
# Stub WHERE-clause evaluator: returns the data unfiltered.
# A production version would parse `conditions` and apply the predicate.
def process_where_clause [data: any, conditions: string]: nothing -> any {
    $data
}
|
|||
|
|
|||
|
# Aggregate data with common operations
|
|||
|
# Aggregate a DataFrame with common grouped operations, dispatching to the
# Polars implementation when the plugin is usable and to native Nushell
# otherwise.
export def aggregate_dataframe [
    df: any
    --group_by: list<string> = []
    --operations: record = {}     # {column: operation}
    --time_bucket: string = "1h"  # for time-based aggregations
]: any -> any {
    let use_polars = init_polars

    if ($use_polars and (check_polars_available)) {
        aggregate_with_polars $df $group_by $operations $time_bucket
    } else {
        aggregate_with_nushell $df $group_by $operations $time_bucket
    }
}
|
|||
|
|
|||
|
# Polars-backed aggregation: sum/mean/count of the "value" column per group.
# Ungrouped input is returned unchanged.
# NOTE(review): `operations` and `time_bucket` are accepted but not yet
# applied — confirm before extending.
def aggregate_with_polars [
    df: any
    group_cols: list<string>
    operations: record
    time_bucket: string
]: nothing -> any {
    if ($group_cols | is-empty) {
        $df
    } else {
        $df | polars group-by $group_cols | polars agg [
            (polars col "value" | polars sum)
            (polars col "value" | polars mean)
            (polars col "value" | polars count)
        ]
    }
}
|
|||
|
|
|||
|
# Native Nushell aggregation fallback: groups rows by the given columns.
# Ungrouped input is returned unchanged.
# NOTE(review): `operations` and `time_bucket` are accepted but not yet
# applied — confirm before extending.
def aggregate_with_nushell [
    df: any
    group_cols: list<string>
    operations: record
    time_bucket: string
]: nothing -> any {
    if ($group_cols | length) > 0 {
        # Fixed: joining the names into one space-separated string made
        # group-by look for a single nonexistent column; spreading passes
        # each column as its own grouping key.
        $df | group-by ...$group_cols
    } else {
        $df
    }
}
|
|||
|
|
|||
|
# Time series analysis operations
|
|||
|
# Time-series analysis entry point: dispatches to the Polars implementation
# when the plugin is usable, otherwise to the native Nushell fallback.
export def time_series_analysis [
    df: any
    --time_column: string = "timestamp"
    --value_column: string = "value"
    --window: string = "1h"
    --operations: list<string> = ["mean", "sum", "count"]
]: any -> any {
    let use_polars = init_polars

    if ($use_polars and (check_polars_available)) {
        time_series_with_polars $df $time_column $value_column $window $operations
    } else {
        time_series_with_nushell $df $time_column $value_column $window $operations
    }
}
|
|||
|
|
|||
|
# Polars time-series aggregation: mean/sum/count of the value column grouped
# by the raw time column.
# NOTE(review): `window` and `ops` are accepted but not yet applied — the
# grouping is on exact time values, not on windows.
def time_series_with_polars [
    df: any
    time_col: string
    value_col: string
    window: string
    ops: list<string>
]: nothing -> any {
    $df | polars group-by $time_col | polars agg [
        (polars col $value_col | polars mean)
        (polars col $value_col | polars sum)
        (polars col $value_col | polars count)
    ]
}
|
|||
|
|
|||
|
# Native Nushell time-series fallback: buckets rows into hourly windows and
# computes mean/sum/count of the value column per window.
# NOTE(review): `window` and `ops` are accepted but not applied — the bucket
# size is fixed at one hour.
def time_series_with_nushell [
    df: any
    time_col: string
    value_col: string
    window: string
    ops: list<string>
]: nothing -> any {
    $df
    | group-by {|row|
        # Fixed: `format date` replaces the deprecated `date format` command
        ($row | get $time_col) | format date "%Y-%m-%d %H:00:00"
    }
    # Fixed: group-by returns a record (window -> rows), which `each` cannot
    # iterate directly; transpose turns it into a list of window/rows pairs.
    | transpose window rows
    | each {|bucket|
        let values = ($bucket.rows | get $value_col)
        {
            time_window: $bucket.window  # actual bucket label, not a placeholder
            mean: ($values | math avg)
            sum: ($values | math sum)
            count: ($values | length)
        }
    }
}
|
|||
|
|
|||
|
# Export DataFrame to various formats
|
|||
|
# Export a DataFrame to disk in the requested format.
# csv: Polars save when usable, otherwise `to csv`; parquet: requires Polars
# (errors otherwise); json: native `to json`. Unknown formats raise an error.
export def export_dataframe [
    df: any
    output_path: string
    --format: string = "csv"  # csv, parquet, json
]: any -> nothing {
    let use_polars = init_polars

    match $format {
        "csv" => {
            if ($use_polars and (check_polars_available)) {
                $df | polars save $output_path
            } else {
                $df | to csv | save --force $output_path
            }
        }
        "parquet" => {
            if ($use_polars and (check_polars_available)) {
                $df | polars save $output_path
            } else {
                error make { msg: "Parquet format requires Polars plugin" }
            }
        }
        "json" => {
            $df | to json | save --force $output_path
        }
        _ => {
            error make { msg: $"Unsupported format: ($format)" }
        }
    }

    # Fixed: unescaped parens inside an interpolated string are evaluated as
    # an expression — `(format: ($format))` was a runtime error. Escaping with
    # \( \) prints literal parentheses.
    print $"✅ DataFrame exported to: ($output_path) \(format: ($format)\)"
}
|
|||
|
|
|||
|
# Performance comparison: Polars vs Nushell
|
|||
|
# Benchmark Polars vs native Nushell on synthetic data.
# Returns a record with per-engine timings and, when Polars is available,
# the relative performance gain (nushell_ms / polars_ms).
export def benchmark_operations [
    data_size: int = 10000
    operations: list<string> = ["filter", "group", "aggregate"]
]: int -> record {
    print $"🔬 Benchmarking operations with ($data_size) records..."

    # Generate synthetic rows.
    # Fixed: `0..$data_size` is inclusive and produced data_size + 1 records;
    # the half-open range yields exactly data_size.
    let test_data = (0..<$data_size | each {|i|
        {
            id: $i
            value: (random int 1..100)
            category: (random int 1..5 | into string)
            timestamp: (date now)
        }
    })

    # Fixed: `let results = {}` was immutable and every `insert` result was
    # discarded, so the function always returned an empty record. A mutable
    # record with reassignment accumulates the timings.
    mut results = {}

    # Native Nushell timing
    let nushell_start = (date now)
    let nushell_result = (benchmark_nushell_operations $test_data $operations)
    let nushell_duration = ((date now) - $nushell_start)

    $results = ($results | insert nushell {
        duration_ms: ($nushell_duration | into int)
        operations_per_sec: ($data_size / ($nushell_duration | into int) * 1000)
    })

    # Polars timing (only when the plugin is registered)
    if (check_polars_available) {
        let polars_start = (date now)
        let polars_result = (benchmark_polars_operations $test_data $operations)
        let polars_duration = ((date now) - $polars_start)

        $results = ($results | insert polars {
            duration_ms: ($polars_duration | into int)
            operations_per_sec: ($data_size / ($polars_duration | into int) * 1000)
        })

        $results = ($results | insert performance_gain (
            $results.nushell.duration_ms / $results.polars.duration_ms
        ))
    }

    $results
}
|
|||
|
|
|||
|
# Run the requested pipeline of operations with native Nushell commands.
# "filter" keeps rows with value > 50; "group" groups by category;
# "aggregate" (expects "group" to have run first) summarizes each group.
def benchmark_nushell_operations [data: list, ops: list<string>]: nothing -> any {
    mut result = $data

    if "filter" in $ops {
        $result = ($result | where value > 50)
    }

    if "group" in $ops {
        $result = ($result | group-by category)
    }

    if "aggregate" in $ops {
        # Fixed: group-by returns a record, which `each {|g| $g.0 ...}` cannot
        # iterate; transpose yields category/rows pairs.
        $result = ($result | transpose category rows | each {|g| {
            category: $g.category
            count: ($g.rows | length)
            avg_value: ($g.rows | get value | math avg)
        }})
    }

    $result
}
|
|||
|
|
|||
|
# Run the requested pipeline of operations through the Polars plugin.
# Mirrors benchmark_nushell_operations so the two timings are comparable.
def benchmark_polars_operations [data: list, ops: list<string>]: nothing -> any {
    mut df = ($data | polars into-df)

    if "filter" in $ops {
        # Fixed: `polars filter (polars col value)` used a bareword column
        # name and no predicate; this mirrors the native `value > 50` filter.
        $df = ($df | polars filter ((polars col "value") > 50))
    }

    if "group" in $ops {
        $df = ($df | polars group-by "category")
    }

    if "aggregate" in $ops {
        $df = ($df | polars agg [
            (polars col "id" | polars count)
            (polars col "value" | polars mean)
        ])
    }

    $df
}
|