provisioning/core/nulib/dataframes/polars_integration.nu
2025-09-22 23:11:41 +01:00

513 lines
14 KiB
Plaintext
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env nu
# Polars DataFrame Integration for Provisioning System
# High-performance data processing for logs, metrics, and infrastructure state
use ../lib_provisioning/utils/settings.nu *
# Report whether the Polars plugin is registered with this Nushell instance.
# An entry from `plugin list` counts when its name is either "polars" or
# "nu_plugin_polars".
export def check_polars_available []: nothing -> bool {
    plugin list | any {|entry| $entry.name in ["polars" "nu_plugin_polars"] }
}
# Make the Polars commands usable, returning true on success.
#
# Returns false (after printing a notice) when the plugin is not registered,
# or when loading it raises an error.
#
# NOTE: `plugin use` is a parser keyword, so it takes effect when this file is
# parsed rather than when this command runs. The previous
# `do { ... } | complete` wrapper could not work: `complete` only accepts
# external-command output and raised on the closure's `true`. As a result this
# command errored whenever the plugin was present. `try`/`catch` handles
# internal errors correctly.
export def init_polars []: nothing -> bool {
    if not (check_polars_available) {
        print " Polars plugin not available, using native Nushell operations"
        return false
    }
    try {
        plugin use polars
        true
    } catch {
        print "⚠️ Warning: Polars plugin found but failed to load"
        false
    }
}
# Build a DataFrame-like value from a list of infrastructure records.
#
# When `--timestamp` is true (the default), every record gets a `timestamp`
# field set to the current time, overwriting any existing one.
# Returns a Polars DataFrame when the plugin initialises, otherwise the
# (pass-through) enhanced Nushell table. `--source` is currently unused.
export def create_infra_dataframe [
    data: list
    --source: string = "infrastructure"
    --timestamp = true
]: list -> any {
    let polars_enabled = init_polars
    let rows = if $timestamp {
        $data | each {|record| $record | upsert timestamp (date now) }
    } else {
        $data
    }
    match $polars_enabled {
        true => ($rows | polars into-df)
        false => ($rows | enhance_nushell_table)
    }
}
# Parse the given log files and normalise every entry to a common schema.
#
# Each output row has: timestamp (datetime), level, message, source, service,
# and metadata (all remaining fields of the parsed entry, in original order).
# Paths that do not exist are skipped.
#
# Entries that lack the configured time/level/message columns no longer abort
# the whole run. They get `date now` / "unknown" / "" instead. Previously
# `get` raised on the missing column, and `reject` failed on absent columns too.
#
# Returns a Polars DataFrame when the plugin initialises, otherwise a Nushell
# table.
export def process_logs_to_dataframe [
    log_files: list<string>
    --format: string = "auto" # auto, json, csv, syslog, custom
    --time_column: string = "timestamp"
    --level_column: string = "level"
    --message_column: string = "message"
]: list<string> -> any {
    let use_polars = init_polars
    # Collect and parse all log files; missing paths contribute nothing.
    let parsed_logs = ($log_files | each {|file|
        if ($file | path exists) {
            parse_log_file $file --format $format
        } else {
            []
        }
    } | flatten)
    if ($parsed_logs | is-empty) {
        return (if $use_polars { [] | polars into-df } else { [] })
    }
    let standard_columns = [$time_column $level_column $message_column]
    let standardized = ($parsed_logs | each {|log|
        let present = ($log | columns)
        # Everything except the three standard columns, preserving order.
        let metadata = ($present
            | where {|col| $col not-in $standard_columns }
            | reduce --fold {} {|col, acc| $acc | insert $col ($log | get $col) })
        let raw_time = (if $time_column in $present { $log | get $time_column } else { null })
        {
            timestamp: (standardize_timestamp $raw_time)
            level: (if $level_column in $present { $log | get $level_column } else { "unknown" })
            message: (if $message_column in $present { $log | get $message_column } else { "" })
            source: ($log.source? | default "unknown")
            service: ($log.service? | default "provisioning")
            metadata: $metadata
        }
    })
    if $use_polars {
        $standardized | polars into-df
    } else {
        $standardized | enhance_nushell_table
    }
}
# Parse one log file into a list of records according to `--format`.
#
# Formats:
#   json   - one JSON value per line. Blank lines are skipped. Lines that do
#            not parse to a JSON object become
#            {timestamp: now, level: "unknown", message: <line>, raw: true}.
#   csv    - the whole file via `from csv`; an unparsable file yields [].
#   syslog - one `parse_syslog_line` record per line.
#   auto   - json for .json/.jsonl/.ndjson, csv for .csv, otherwise syslog
#            (extension match is case-insensitive).
#   other  - each line becomes {timestamp: now, level: "info", message, source}.
# A path that does not exist yields [].
#
# Errors are caught with `try`/`catch`. The previous `do { ... } | complete`
# pattern only works for external commands, so parse failures were never
# caught.
def parse_log_file [
    file_path: string
    --format: string = "auto"
]: string -> list {
    if not ($file_path | path exists) {
        return []
    }
    # Resolve "auto" before reading, so the file is opened only once.
    if $format == "auto" {
        let extension = ($file_path | path parse | get extension | str downcase)
        let detected = (match $extension {
            "json" | "jsonl" | "ndjson" => "json"
            "csv" => "csv"
            _ => "syslog"
        })
        return (parse_log_file $file_path --format $detected)
    }
    let content = (open $file_path --raw)
    match $format {
        "json" => {
            $content | lines | where {|line| ($line | str trim) != "" } | each {|line|
                let parsed = (try { $line | from json } catch { null })
                # Only JSON objects fit the record-based pipeline downstream.
                if ($parsed | describe | str starts-with "record") {
                    $parsed
                } else {
                    {
                        timestamp: (date now)
                        level: "unknown"
                        message: $line
                        raw: true
                    }
                }
            }
        }
        "csv" => {
            try { $content | from csv } catch { [] }
        }
        "syslog" => {
            $content | lines | each {|line| parse_syslog_line $line }
        }
        _ => {
            # Custom format - treat as plain text
            $content | lines | each {|line|
                {
                    timestamp: (date now)
                    level: "info"
                    message: $line
                    source: $file_path
                }
            }
        }
    }
}
# Split a classic syslog line ("<Mon> <day> <hh:mm:ss> <host> <service>: <msg>")
# into a record with timestamp (kept as the matched string), level "info",
# message, host and service.
# Lines that do not match yield {timestamp: now, level: "unknown", message}.
def parse_syslog_line [line: string]: string -> record {
    let matches = ($line | parse --regex '(?P<timestamp>\w+\s+\d+\s+\d+:\d+:\d+)\s+(?P<host>\S+)\s+(?P<service>\S+):\s*(?P<message>.*)')
    if ($matches | is-empty) {
        return {
            timestamp: (date now)
            level: "unknown"
            message: $line
        }
    }
    let hit = ($matches | first)
    {
        timestamp: $hit.timestamp
        level: "info"
        message: $hit.message
        host: $hit.host
        service: $hit.service
    }
}
# Coerce a timestamp value to a datetime.
#
# - datetime values are returned unchanged. `describe` reports them as "date";
#   the old "datetime" arm never matched, so real timestamps were replaced
#   by `date now`.
# - strings are parsed with `into datetime`; unparsable strings fall back to
#   `date now`. A `try` is used here because `do {...} | complete` only
#   works for external commands.
# - anything else (null, numbers, records, ...) becomes `date now`.
def standardize_timestamp [ts: any]: any -> datetime {
    match ($ts | describe) {
        "date" | "datetime" => $ts
        "string" => (try { $ts | into datetime } catch { date now })
        _ => (date now)
    }
}
# Prepare a plain Nushell table to stand in for a DataFrame when Polars is
# unavailable. Delegates entirely to `add_dataframe_methods`.
def enhance_nushell_table []: list -> list {
    $in | add_dataframe_methods
}
# Hook for attaching DataFrame-style helpers to a Nushell table.
# Currently a placeholder: the input is returned unchanged.
def add_dataframe_methods []: list -> list {
    let table = $in
    $table
}
# Run a SQL-like query against `df`.
#
# With `--use_polars` and the plugin available, the query goes to
# `polars query`. Otherwise it falls back to the minimal Nushell interpreter
# in `query_with_nushell`.
export def query_dataframe [
    df: any
    query: string
    --use_polars = false
]: any -> any {
    if not ($use_polars and (check_polars_available)) {
        return (query_with_nushell $df $query)
    }
    $df | polars query $query
}
# Minimal SQL-like query evaluator over a Nushell table.
#
# Understands "SELECT <cols|*> FROM <source> [WHERE <conditions>]", with
# case-insensitive keywords. Selected columns are projected with `select`.
# The text after WHERE is handed to `process_where_clause`. Anything that is
# not a SELECT, or has no FROM, returns `df` unchanged.
#
# Fixes vs. the earlier version:
# - The keyword regex was "\\\\s+" in a double-quoted string, which matched a
#   literal backslash, so "select" was never stripped.
# - FROM/WHERE splitting was case-sensitive.
# - The WHERE text was looked for in `$parts.2`, which only exists when the
#   query contains two FROMs.
def query_with_nushell [df: any, query: string]: nothing -> any {
    let statement = ($query | str trim)
    if not ($statement | str downcase | str starts-with "select") {
        return $df
    }
    # Single-quoted (raw) pattern so `\s` reaches the regex engine intact.
    let body = ($statement | str replace --regex '(?i)^select\s+' '')
    let parts = ($body | split row --regex '(?i)\s+from\s+')
    if ($parts | length) < 2 {
        return $df
    }
    let columns = ($parts.0 | split row "," | each {|c| $c | str trim } | where {|c| $c != "" })
    let tail = ($parts | skip 1 | str join " from ")
    let where_parts = ($tail | split row --regex '(?i)\s+where\s+')
    let conditions = if ($where_parts | length) > 1 {
        $where_parts | skip 1 | str join " where "
    } else {
        ""
    }
    mut result = $df
    if (($columns | length) > 0) and ($columns != ["*"]) {
        $result = ($result | select $columns)
    }
    if $conditions != "" {
        $result = (process_where_clause $result $conditions)
    }
    $result
}
# Apply a WHERE clause to `data`.
# Placeholder: `conditions` is accepted but not yet interpreted, so `data` is
# always returned unfiltered.
def process_where_clause [data: any, conditions: string]: nothing -> any {
    let unfiltered = $data
    $unfiltered
}
# Aggregate `df`, routing to the Polars backend when the plugin initialises
# and is available, and to the plain Nushell backend otherwise.
#   --group_by    columns to group on (no grouping when empty)
#   --operations  {column: operation} map forwarded to the backend
#   --time_bucket bucket size forwarded to the backend (e.g. "1h")
export def aggregate_dataframe [
    df: any
    --group_by: list<string> = []
    --operations: record = {} # {column: operation}
    --time_bucket: string = "1h" # For time-based aggregations
]: any -> any {
    let polars_ready = ((init_polars) and (check_polars_available))
    match $polars_ready {
        true => (aggregate_with_polars $df $group_by $operations $time_bucket)
        false => (aggregate_with_nushell $df $group_by $operations $time_bucket)
    }
}
# Group a Polars DataFrame and aggregate it.
#
# `operations` maps column -> operation (sum, mean/avg, count, min, max). Each
# result column is aliased "<column>_<operation>". When `operations` is empty
# the historical default applies: sum, mean and count of "value". Previously
# all three outputs were named "value", which produces duplicate output
# columns, and `operations` was ignored.
# Without group columns `df` is returned unchanged. `time_bucket` is currently
# unused.
def aggregate_with_polars [
    df: any
    group_cols: list<string>
    operations: record
    time_bucket: string
]: nothing -> any {
    if ($group_cols | is-empty) {
        return $df
    }
    let specs = if ($operations | is-empty) {
        [[column op]; [value sum] [value mean] [value count]]
    } else {
        $operations | transpose column op
    }
    let aggregations = ($specs | each {|spec|
        let column_expr = (polars col $spec.column)
        let aggregated = (match $spec.op {
            "sum" => ($column_expr | polars sum)
            "mean" | "avg" => ($column_expr | polars mean)
            "count" => ($column_expr | polars count)
            "min" => ($column_expr | polars min)
            "max" => ($column_expr | polars max)
            _ => (error make { msg: $"Unsupported aggregation: ($spec.op)" })
        })
        $aggregated | polars as $"($spec.column)_($spec.op)"
    })
    $df | polars group-by ...$group_cols | polars agg $aggregations
}
# Group `df` by the given columns using plain Nushell (no aggregation yet).
#
# Returns the `group-by` record (group key -> rows), or `df` unchanged when no
# group columns are given.
# - With one column, the key is that column's value.
# - With several columns, the key is their values joined by "|" (e.g. "web|eu").
#   Previously the column names were joined into a single nonexistent column
#   name such as "a b", which fails.
# `operations` and `time_bucket` are accepted for parity with the Polars path
# but are currently unused.
def aggregate_with_nushell [
    df: any
    group_cols: list<string>
    operations: record
    time_bucket: string
]: nothing -> any {
    match ($group_cols | length) {
        0 => $df
        1 => ($df | group-by ($group_cols | first))
        _ => ($df | group-by {|row|
            $group_cols | each {|col| $row | get $col | into string } | str join "|"
        })
    }
}
# Summarise `df` over time windows.
#
# Uses the Polars backend when the plugin initialises and is available, and
# the Nushell backend otherwise. All column, window and operation options are
# forwarded unchanged.
export def time_series_analysis [
    df: any
    --time_column: string = "timestamp"
    --value_column: string = "value"
    --window: string = "1h"
    --operations: list<string> = ["mean", "sum", "count"]
]: any -> any {
    if not ((init_polars) and (check_polars_available)) {
        return (time_series_with_nushell $df $time_column $value_column $window $operations)
    }
    time_series_with_polars $df $time_column $value_column $window $operations
}
# Aggregate `value_col` per distinct `time_col` value with Polars.
#
# `ops` selects the aggregations (mean/avg, sum, count, min, max). Each output
# column is aliased to its operation name, matching the Nushell backend's
# mean/sum/count fields. An empty `ops` falls back to mean, sum and count.
# Previously every output was named `value_col`, which produces duplicate
# output columns, and `ops` was ignored.
# NOTE(review): `window` is still unused; rows are grouped by exact timestamp.
def time_series_with_polars [
    df: any
    time_col: string
    value_col: string
    window: string
    ops: list<string>
]: nothing -> any {
    let requested = if ($ops | is-empty) { ["mean" "sum" "count"] } else { $ops }
    let aggregations = ($requested | each {|op|
        let column_expr = (polars col $value_col)
        let aggregated = (match $op {
            "mean" | "avg" => ($column_expr | polars mean)
            "sum" => ($column_expr | polars sum)
            "count" => ($column_expr | polars count)
            "min" => ($column_expr | polars min)
            "max" => ($column_expr | polars max)
            _ => (error make { msg: $"Unsupported time series operation: ($op)" })
        })
        $aggregated | polars as $op
    })
    $df | polars group-by $time_col | polars agg $aggregations
}
# Bucket `df` into hourly windows by `time_col` and summarise `value_col`.
#
# Returns one row per window, in first-seen order:
#   {time_window: "YYYY-MM-DD HH:00:00", mean, sum, count}
#
# Fixes vs. the earlier version:
# - `group-by` returns a record of window -> rows, and `each` over that record
#   did not visit the individual groups. It is now transposed first.
# - `time_window` was a "grouped" placeholder; it now holds the actual key.
# - Uses `format date` (the replacement for the removed `date format`).
# NOTE(review): `window` and `ops` are currently unused; buckets are always
# hourly, in each timestamp's own UTC offset.
def time_series_with_nushell [
    df: any
    time_col: string
    value_col: string
    window: string
    ops: list<string>
]: nothing -> any {
    $df
    | group-by {|row| $row | get $time_col | format date "%Y-%m-%d %H:00:00" }
    | transpose time_window rows
    | each {|bucket|
        let values = ($bucket.rows | get $value_col)
        {
            time_window: $bucket.time_window
            mean: ($values | math avg)
            sum: ($values | math sum)
            count: ($values | length)
        }
    }
}
# Write `df` to `output_path` in the requested format, then print a
# confirmation.
#
# Formats:
#   csv     - `polars save` when the plugin is usable, otherwise `to csv`
#   parquet - requires the Polars plugin (raises an error otherwise)
#   json    - always via `to json`
# Any other format raises an error. "excel" was previously advertised but
# never implemented. The Nushell code paths overwrite existing files
# (`save --force`).
export def export_dataframe [
    df: any
    output_path: string
    --format: string = "csv" # csv, parquet, json
]: any -> nothing {
    let use_polars = init_polars
    match $format {
        "csv" => {
            if $use_polars and (check_polars_available) {
                $df | polars save $output_path
            } else {
                $df | to csv | save --force $output_path
            }
        }
        "parquet" => {
            if $use_polars and (check_polars_available) {
                $df | polars save $output_path
            } else {
                error make { msg: "Parquet format requires Polars plugin" }
            }
        }
        "json" => {
            $df | to json | save --force $output_path
        }
        _ => {
            error make { msg: $"Unsupported format: ($format)" }
        }
    }
    # The literal "(" must be escaped. Inside `$"..."` an unescaped
    # `(format: ...)` is parsed as a subexpression calling a command named
    # `format:`, which failed after the file had already been written.
    print $"✅ DataFrame exported to: ($output_path) \(format: ($format))"
}
# Time the same filter/group/aggregate pipeline in Nushell and (if available)
# Polars over `data_size` randomly generated rows.
#
# Returns:
#   {nushell: {duration_ms, operations_per_sec},
#    polars?: {duration_ms, operations_per_sec},
#    performance_gain?: nushell_ms / polars_ms}
# `operations_per_sec` and `performance_gain` are null when the measured time
# is zero.
#
# Fixes vs. the earlier version:
# - Results were inserted into an immutable `{}` and discarded, so `{}` was
#   always returned.
# - `0..$n` generated n + 1 rows.
# - `duration | into int` is nanoseconds, but it was reported as milliseconds.
export def benchmark_operations [
    data_size: int = 10000
    operations: list<string> = ["filter", "group", "aggregate"]
]: int -> record {
    print $"🔬 Benchmarking operations with ($data_size) records..."
    # Half-open range: exactly `data_size` rows.
    let test_data = (0..<$data_size | each {|i|
        {
            id: $i
            value: (random int 1..100)
            category: (random int 1..5 | into string)
            timestamp: (date now)
        }
    })
    # Convert an elapsed duration into milliseconds and rows per second.
    let to_stats = {|elapsed|
        let nanos = ($elapsed | into int)
        {
            duration_ms: ($nanos / 1000000)
            operations_per_sec: (if $nanos > 0 { $data_size * 1000000000 / $nanos } else { null })
        }
    }
    let nushell_start = (date now)
    benchmark_nushell_operations $test_data $operations | ignore
    let nushell_stats = (do $to_stats ((date now) - $nushell_start))
    mut results = { nushell: $nushell_stats }
    if (check_polars_available) {
        let polars_start = (date now)
        benchmark_polars_operations $test_data $operations | ignore
        let polars_stats = (do $to_stats ((date now) - $polars_start))
        let gain = if $polars_stats.duration_ms > 0 {
            $nushell_stats.duration_ms / $polars_stats.duration_ms
        } else {
            null
        }
        $results = ($results | insert polars $polars_stats | insert performance_gain $gain)
    }
    $results
}
# Nushell side of the benchmark: apply the requested steps in order.
#   filter    - keep rows with value > 50
#   group     - group rows by `category` (returns the group-by record)
#   aggregate - one {category, count, avg_value} row per category; it groups
#               first if "group" was not requested
# Previously "aggregate" indexed `$group.0` on the group-by record (or on
# plain rows), which raised an error. It now iterates the groups.
def benchmark_nushell_operations [data: list, ops: list<string>]: nothing -> any {
    mut result = $data
    if "filter" in $ops {
        $result = ($result | where value > 50)
    }
    if "group" in $ops {
        $result = ($result | group-by category)
    }
    if "aggregate" in $ops {
        let groups = if "group" in $ops { $result } else { $result | group-by category }
        $result = ($groups | transpose category rows | each {|g|
            {
                category: $g.category
                count: ($g.rows | length)
                avg_value: ($g.rows | get value | math avg)
            }
        })
    }
    $result
}
# Polars side of the benchmark, mirroring `benchmark_nushell_operations`.
#   filter    - keep rows with value > 50 (previously the predicate was just
#               `col value`, not a comparison)
#   group     - group by "category"
#   aggregate - count of "id" (as "count") and mean of "value" (as
#               "avg_value") per category; it groups first if "group" was not
#               requested, since `polars agg` expects a group-by
def benchmark_polars_operations [data: list, ops: list<string>]: nothing -> any {
    mut df = ($data | polars into-df)
    if "filter" in $ops {
        $df = ($df | polars filter ((polars col value) > 50))
    }
    if ("group" in $ops) or ("aggregate" in $ops) {
        $df = ($df | polars group-by "category")
    }
    if "aggregate" in $ops {
        $df = ($df | polars agg [
            (polars col "id" | polars count | polars as "count")
            (polars col "value" | polars mean | polars as "avg_value")
        ])
    }
    $df
}