In the ever-evolving landscape of DevOps, effectively monitoring and troubleshooting Kubernetes clusters remains a critical challenge. While there are numerous monitoring solutions available, sometimes you need tools tailored to your specific needs. In this article, we'll explore two custom Python scripts we've developed to enhance our Kubernetes monitoring capabilities: a Log Analyzer and a Resource Checker.
The Challenge
Managing a Kubernetes cluster, especially in production environments, can be complex. Two common pain points are:
- Quickly identifying and analyzing errors across multiple pods
- Getting a clear picture of resource utilization at both the pod and node level
Our application namespace, which handles critical batch jobs, was experiencing intermittent issues. We needed a way to rapidly diagnose problems and ensure our resources were allocated efficiently.
The Solution: Custom Monitoring Scripts
To address these challenges, we developed two Python scripts:
- k8s_log_analyzer.py: For analyzing ERROR logs across all pods in a namespace
- k8s_resource_checker.py: For checking resource utilization of pods and nodes
Let's dive into each of these tools and see how they can help streamline your Kubernetes monitoring.
Log Analyzer (k8s_log_analyzer.py)
This script focuses on ERROR-level logs, providing a quick way to identify issues across your entire namespace.
Key features:
- Retrieves ERROR logs from all pods in the specified namespace
- Provides a count of errors per pod
- Shows an error timeline, helping identify error patterns over time
- Displays detailed error messages for deeper analysis
Here's the full script:
import sys
import subprocess
import re
from collections import defaultdict
from datetime import datetime


def get_pods(namespace):
    # List all pod names in the namespace via kubectl's jsonpath output.
    try:
        result = subprocess.run(
            ['kubectl', 'get', 'pods', '-n', namespace,
             '-o', 'jsonpath={.items[*].metadata.name}'],
            capture_output=True, text=True, check=True)
        return result.stdout.split()
    except subprocess.CalledProcessError as e:
        print(f"Error getting pods: {e}")
        sys.exit(1)


def get_pod_error_logs(pod_name, namespace):
    # Pull up to the last 1,000,000 log lines and keep only lines containing ERROR.
    try:
        result = subprocess.run(
            ['kubectl', 'logs', '-n', namespace, pod_name, '--tail=1000000'],
            capture_output=True, text=True, check=True)
        return [line for line in result.stdout.splitlines() if 'ERROR' in line]
    except subprocess.CalledProcessError as e:
        print(f"Error getting logs for pod {pod_name}: {e}")
        return []


def parse_log_line(line):
    # Adjust this parser based on your actual log format.
    match = re.match(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z) ERROR (.+)', line)
    if match:
        timestamp, message = match.groups()
        return {'timestamp': timestamp, 'message': message.strip()}
    return None


def analyze_pod_logs(pod_name, namespace):
    errors = defaultdict(int)
    error_timeline = defaultdict(int)
    error_messages = defaultdict(list)
    logs = get_pod_error_logs(pod_name, namespace)
    for line in logs:
        entry = parse_log_line(line)
        if not entry:
            continue
        errors[pod_name] += 1
        # Bucket errors by hour to build the timeline.
        hour = datetime.strptime(entry['timestamp'],
                                 '%Y-%m-%dT%H:%M:%S.%fZ').strftime('%Y-%m-%d %H:00:00')
        error_timeline[hour] += 1
        error_messages[pod_name].append(entry['message'])
    return errors, error_timeline, error_messages


def analyze_logs(namespace):
    total_errors = defaultdict(int)
    total_error_timeline = defaultdict(int)
    total_error_messages = defaultdict(list)
    pods = get_pods(namespace)
    for pod in pods:
        errors, error_timeline, error_messages = analyze_pod_logs(pod, namespace)
        for pod_name, count in errors.items():
            total_errors[pod_name] += count
        for timestamp, count in error_timeline.items():
            total_error_timeline[timestamp] += count
        for pod_name, messages in error_messages.items():
            total_error_messages[pod_name].extend(messages)
    return total_errors, total_error_timeline, total_error_messages


def main(namespace):
    print(f"Analyzing ERROR logs for namespace: {namespace}")
    errors, error_timeline, error_messages = analyze_logs(namespace)

    print(f"\nTotal errors: {sum(errors.values())}")

    print("\nErrors by pod:")
    for pod, count in errors.items():
        print(f"  {pod}: {count}")

    print("\nErrors over time:")
    for timestamp, count in sorted(error_timeline.items()):
        print(f"  {timestamp}: {count}")

    print("\nError messages by pod:")
    for pod, messages in error_messages.items():
        print(f"\n  {pod}:")
        for i, message in enumerate(messages, 1):
            print(f"    {i}. {message}")
            print(f"    {'=' * 100}")  # Separator for readability


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python k8s_log_analyzer.py <namespace>")
        sys.exit(1)
    namespace = sys.argv[1]
    main(namespace)
Usage:
python k8s_log_analyzer.py <namespace>
Output example:
Analyzing ERROR logs for namespace: app-namespace

Total errors: 5

Errors by pod:
  app-backend-57659b4b78-lzg6z: 5

Errors over time:
  2024-09-02 11:00:00: 2
  2024-09-02 12:00:00: 1
  2024-09-04 08:00:00: 1
  2024-09-05 10:00:00: 1

Error messages by pod:

  app-backend-57659b4b78-lzg6z:
    1. [atchJobThread-2] o.s.batch.core.step.AbstractStep : Encountered an error executing step stepName in job jobName
    ====================================================================================================
...
This output quickly shows us that all errors are coming from a single pod and are related to batch job execution, giving us a clear starting point for investigation.
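One caveat: parse_log_line assumes an ISO-8601 timestamp followed by the ERROR level, which matched our logs but may not match yours. As a hedged illustration, here is a hypothetical variant for a Spring-Boot-style layout; the pattern and the sample line in the comment are assumptions, so adapt both the regex and the matching strptime format in analyze_pod_logs to whatever your applications actually emit.

import re

# Hypothetical parser for a layout such as:
#   2024-09-02 11:15:03.123 ERROR [BatchJobThread-2] o.s.batch... : message
# This is a sketch, not the format used by the script above.
SPRING_STYLE = re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+)\s+ERROR\s+(.+)')

def parse_log_line(line):
    match = SPRING_STYLE.match(line)
    if match:
        timestamp, message = match.groups()
        # analyze_pod_logs would then need the matching strptime format,
        # e.g. '%Y-%m-%d %H:%M:%S.%f' instead of '%Y-%m-%dT%H:%M:%S.%fZ'.
        return {'timestamp': timestamp, 'message': message.strip()}
    return None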
Resource Checker (k8s_resource_checker.py)
This script provides a comprehensive view of resource allocation and usage across your namespace and cluster.
Key features:
- Displays CPU and memory requests, limits, and current usage for each pod
- Shows node-level resource capacity and allocatable resources
- Helps identify potential resource bottlenecks or misconfigurations
Here's the full script:
import sys
import subprocess
import json
from tabulate import tabulate


def run_kubectl_command(command):
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        return result.stdout
    except subprocess.CalledProcessError as e:
        print(f"Error executing kubectl command: {e}")
        sys.exit(1)


def get_pod_resources(namespace):
    # Requests and limits per pod via kubectl custom columns.
    command = [
        'kubectl', 'get', 'pods', '-n', namespace,
        '-o', 'custom-columns=NAME:.metadata.name,'
              'CPU_REQUESTS:.spec.containers[*].resources.requests.cpu,'
              'CPU_LIMITS:.spec.containers[*].resources.limits.cpu,'
              'MEMORY_REQUESTS:.spec.containers[*].resources.requests.memory,'
              'MEMORY_LIMITS:.spec.containers[*].resources.limits.memory'
    ]
    return run_kubectl_command(command)


def get_pod_usage(namespace):
    # Current usage; 'kubectl top' requires the metrics API (metrics-server) in the cluster.
    command = ['kubectl', 'top', 'pods', '-n', namespace, '--no-headers']
    return run_kubectl_command(command)


def get_node_resources():
    command = ['kubectl', 'get', 'nodes', '-o', 'json']
    output = run_kubectl_command(command)
    return json.loads(output)


def parse_resource_output(output):
    lines = output.strip().split('\n')[1:]  # Skip the header row
    return [line.split() for line in lines]


def parse_usage_output(output):
    lines = output.strip().split('\n')
    return [line.split() for line in lines]


def merge_resource_and_usage(resources, usage):
    merged = {}
    for pod in resources:
        name = pod[0]
        merged[name] = pod[1:]
    for pod in usage:
        name = pod[0]
        if name in merged:
            merged[name].extend(pod[1:])
        else:
            merged[name] = ['N/A', 'N/A', 'N/A', 'N/A'] + pod[1:]
    return [['Name', 'CPU Requests', 'CPU Limits', 'Memory Requests', 'Memory Limits',
             'CPU Usage', 'Memory Usage']] + \
           [[name] + data for name, data in merged.items()]


def format_memory(memory_str):
    # Normalize memory figures to MiB for easier comparison.
    if memory_str.endswith('Ki'):
        return f"{int(memory_str[:-2]) / 1024:.2f}Mi"
    elif memory_str.endswith('Mi'):
        return memory_str
    elif memory_str.endswith('Gi'):
        return f"{float(memory_str[:-2]) * 1024:.2f}Mi"
    else:
        return memory_str


def analyze_node_resources(nodes_data):
    node_resources = []
    for node in nodes_data['items']:
        name = node['metadata']['name']
        capacity = node['status']['capacity']
        allocatable = node['status']['allocatable']
        cpu_capacity = capacity['cpu']
        memory_capacity = format_memory(capacity['memory'])
        cpu_allocatable = allocatable['cpu']
        memory_allocatable = format_memory(allocatable['memory'])
        node_resources.append([name, cpu_capacity, cpu_allocatable,
                               memory_capacity, memory_allocatable])
    return [['Node Name', 'CPU Capacity', 'CPU Allocatable',
             'Memory Capacity', 'Memory Allocatable']] + node_resources


def main(namespace):
    print(f"Analyzing resources for namespace: {namespace}")

    print("\nPod Resources and Usage:")
    resources_output = get_pod_resources(namespace)
    usage_output = get_pod_usage(namespace)
    resources = parse_resource_output(resources_output)
    usage = parse_usage_output(usage_output)
    merged_data = merge_resource_and_usage(resources, usage)
    print(tabulate(merged_data, headers="firstrow", tablefmt="grid"))

    print("\nNode Resources:")
    nodes_data = get_node_resources()
    node_resources = analyze_node_resources(nodes_data)
    print(tabulate(node_resources, headers="firstrow", tablefmt="grid"))


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python k8s_resource_checker.py <namespace>")
        sys.exit(1)
    namespace = sys.argv[1]
    main(namespace)
Usage:
python k8s_resource_checker.py <namespace>
Output example:
Analyzing resources for namespace: app-namespace

Pod Resources and Usage:
+------------------------------+--------------+------------+-----------------+---------------+-----------+--------------+
| Name                         | CPU Requests | CPU Limits | Memory Requests | Memory Limits | CPU Usage | Memory Usage |
+==============================+==============+============+=================+===============+===========+==============+
| app-backend-57659b4b78-lzg6z | 500m         | 1          | 512Mi           | 1Gi           | 150m      | 600Mi        |
+------------------------------+--------------+------------+-----------------+---------------+-----------+--------------+
...

Node Resources:
+-----------+--------------+-----------------+-----------------+--------------------+
| Node Name | CPU Capacity | CPU Allocatable | Memory Capacity | Memory Allocatable |
+===========+==============+=================+=================+====================+
| node-1    | 4            | 3.8             | 16Gi            | 15.5Gi             |
+-----------+--------------+-----------------+-----------------+--------------------+
...
This output allows us to quickly see if any pods are approaching their resource limits or if our nodes are under resource pressure.
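If you would rather have the script flag these cases than scan the table by eye, a small helper can compare usage against limits. The sketch below is not part of the script above: the parse_cpu/parse_memory helpers and the 80% threshold are our own assumptions, and it only handles the single-container value formats shown in the table (e.g. 500m, 1, 512Mi, 1Gi), skipping rows it cannot parse.

def parse_cpu(value):
    # Kubernetes CPU quantity -> millicores: "500m" -> 500.0, "1" -> 1000.0
    return float(value[:-1]) if value.endswith('m') else float(value) * 1000


def parse_memory(value):
    # Kubernetes memory quantity -> MiB: "512Mi" -> 512.0, "1Gi" -> 1024.0, "524288Ki" -> 512.0
    units = {'Ki': 1 / 1024, 'Mi': 1, 'Gi': 1024}
    for suffix, factor in units.items():
        if value.endswith(suffix):
            return float(value[:-2]) * factor
    return float(value) / (1024 ** 2)  # assume plain bytes


def flag_hot_pods(merged_data, threshold=0.8):
    # merged_data is the table built by merge_resource_and_usage (header row first).
    hot = []
    for row in merged_data[1:]:
        if len(row) != 7:
            continue  # kubectl top returned no usage for this pod
        name, _, cpu_limit, _, mem_limit, cpu_usage, mem_usage = row
        try:
            if cpu_limit not in ('N/A', '<none>') and \
                    parse_cpu(cpu_usage) / parse_cpu(cpu_limit) >= threshold:
                hot.append((name, 'cpu'))
            if mem_limit not in ('N/A', '<none>') and \
                    parse_memory(mem_usage) / parse_memory(mem_limit) >= threshold:
                hot.append((name, 'memory'))
        except ValueError:
            continue  # e.g. comma-separated values from multi-container pods
    return hot

Calling flag_hot_pods(merged_data) at the end of main and printing a warning per returned (pod, resource) pair is enough to turn the report into a simple alert.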
Benefits and Use Cases
These tools offer several advantages:
- Quick Diagnostics: Rapidly identify problematic pods and common error patterns.
- Resource Optimization: Easily spot over- or under-provisioned pods.
- Customization: Extend the scripts to look for specific issues relevant to your applications.
- Lightweight: No need to install heavy monitoring solutions for basic diagnostics.
- Automation Friendly: Easily incorporate into CI/CD pipelines or ChatOps tools (see the sketch after this list).
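To make the automation point concrete, here is a minimal sketch of a CI gate built on top of the log analyzer. The wrapper name ci_error_gate.py and the error budget are arbitrary, and it assumes k8s_log_analyzer.py sits next to it (or on PYTHONPATH) so analyze_logs can be imported.

# ci_error_gate.py (hypothetical wrapper): fail a pipeline step when a namespace
# has accumulated too many ERROR log lines. Assumes k8s_log_analyzer.py is importable.
import sys

from k8s_log_analyzer import analyze_logs

MAX_ERRORS = 10  # arbitrary error budget; tune for your workloads


def main(namespace):
    errors, _, _ = analyze_logs(namespace)
    total = sum(errors.values())
    print(f"{total} ERROR lines found in namespace {namespace}")
    if total > MAX_ERRORS:
        print("Error budget exceeded, failing this step.")
        sys.exit(1)


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python ci_error_gate.py <namespace>")
        sys.exit(1)
    main(sys.argv[1])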
We've found these scripts particularly useful for:
- Troubleshooting batch job failures
- Capacity planning and resource allocation
- Quick health checks during deployments
Conclusion
While these custom scripts don't replace comprehensive monitoring solutions, they provide a quick and effective way to gain insights into your Kubernetes clusters. By focusing on common pain points – error analysis and resource utilization – they've become valuable tools in our DevOps toolkit.
Remember, the key to effective DevOps is not just having the right tools, but knowing when and how to use them. These scripts serve as a starting point; feel free to adapt and extend them to meet your specific needs.
Happy monitoring!