

รู้จักกับ Python Rich Site Reliability Engineering (SRE) ในปี 2026
ในยุคที่ระบบดิจิทัลกลายเป็นหัวใจหลักของทุกธุรกิจ การทำให้ระบบทำงานได้อย่างต่อเนื่องและมีประสิทธิภาพสูงสุดจึงเป็นสิ่งที่หลีกเลี่ยงไม่ได้ Site Reliability Engineering (SRE) ซึ่งถูกพัฒนาขึ้นครั้งแรกโดย Google ได้กลายเป็นแนวทางปฏิบัติที่สำคัญสำหรับทีมวิศวกรทั่วโลก และเมื่อนำ Python ซึ่งเป็นภาษาโปรแกรมมิ่งที่ทรงพลังและยืดหยุ่นสูง มารวมกับแนวคิด SRE เราจะได้เครื่องมือที่ทรงพลังสำหรับการจัดการระบบขนาดใหญ่
บทความนี้จะพาคุณไปสำรวจโลกของ Python Rich SRE อย่างละเอียด ตั้งแต่พื้นฐานไปจนถึงเทคนิคขั้นสูงที่ใช้ในปี 2026 เราจะพูดถึงเครื่องมือ ไลบรารีสำคัญ วิธีการออกแบบระบบ การตรวจสอบ และการตอบสนองต่อเหตุการณ์แบบอัตโนมัติ โดยเน้นการใช้งานจริงที่สามารถนำไปปรับใช้ได้ทันที
1. หลักการพื้นฐานของ SRE และบทบาทของ Python
1.1 SRE คืออะไร? ทำไมถึงสำคัญ?
SRE คือแนวทางที่นำหลักการทางวิศวกรรมซอฟต์แวร์มาประยุกต์ใช้กับการจัดการโครงสร้างพื้นฐานและระบบปฏิบัติการ เป้าหมายหลักคือการสร้างระบบที่:
- มีความน่าเชื่อถือสูง (Reliability) – ระบบต้องทำงานได้ตามที่คาดหวัง
- สามารถปรับขนาดได้ (Scalability) – รองรับการเติบโตของปริมาณงาน
- มีประสิทธิภาพ (Efficiency) – ใช้ทรัพยากรอย่างคุ้มค่า
- สามารถกู้คืนได้เร็ว (Recovery) – เมื่อเกิดปัญหา ระบบต้องกลับมาทำงานได้เร็ว
1.2 ทำไมต้อง Python สำหรับ SRE?
Python กลายเป็นภาษาหลักของทีม SRE ด้วยเหตุผลหลายประการ:
- อ่านง่ายและเรียนรู้เร็ว – ลดระยะเวลาในการพัฒนาและบำรุงรักษา
- ระบบนิเวศที่สมบูรณ์ – มีไลบรารีสำหรับทุกความต้องการ ตั้งแต่การสื่อสารกับ API ไปจนถึงการประมวลผลข้อมูลขนาดใหญ่
- รองรับการทำงานแบบอัตโนมัติ – สคริปต์ Python สามารถทำงานซ้ำๆ ได้อย่างมีประสิทธิภาพ
- การทำงานร่วมกับคลาวด์ – มี SDK สำหรับผู้ให้บริการคลาวด์รายใหญ่ทุกราย
- การวิเคราะห์ข้อมูลและ Machine Learning – สามารถใช้สร้างระบบตรวจจับความผิดปกติอัจฉริยะได้
2. เครื่องมือและไลบรารีสำคัญสำหรับ Python SRE ในปี 2026
2.1 ไลบรารีสำหรับการตรวจสอบและแจ้งเตือน
| ไลบรารี | ฟังก์ชันหลัก | กรณีการใช้งาน |
|---|---|---|
prometheus-client |
สร้างและจัดการ metrics สำหรับ Prometheus | ตรวจสอบ CPU, Memory, Request Rate |
statsd |
ส่ง metrics ไปยัง Graphite/StatsD | ติดตาม latency ของ API |
opentelemetry-sdk |
การติดตามแบบกระจาย (Distributed Tracing) | วิเคราะห์ bottleneck ใน microservices |
slack-sdk |
ส่งการแจ้งเตือนไปยัง Slack | แจ้งเตือนเมื่อเกิดเหตุการณ์สำคัญ |
2.2 ไลบรารีสำหรับการทำงานอัตโนมัติ
การทำงานอัตโนมัติเป็นหัวใจของ SRE ไลบรารีเหล่านี้จะช่วยให้คุณสร้างระบบที่จัดการตัวเองได้:
- Fabric / Invoke – สำหรับการรันคำสั่ง SSH และการจัดการเซิร์ฟเวอร์
- Ansible Runner – การทำงานร่วมกับ Ansible สำหรับการจัดการ configuration
- Kubernetes Client (k8s-client) – จัดการ Kubernetes cluster โดยตรง
- Boto3 (AWS SDK) – จัดการทรัพยากร AWS ทั้งหมด
- Celery – สำหรับงานแบบ asynchronous และ scheduled tasks
3. การสร้างระบบตรวจสอบอัจฉริยะด้วย Python
3.1 การออกแบบระบบตรวจสอบที่มีประสิทธิภาพ
ระบบตรวจสอบที่ดีควรมีองค์ประกอบ 4 ส่วนหลัก:
- การเก็บข้อมูล (Data Collection) – รวบรวม metrics, logs, traces
- การประมวลผล (Processing) – วิเคราะห์และกรองข้อมูล
- การตรวจจับ (Detection) – ระบุความผิดปกติ
- การตอบสนอง (Response) – ดำเนินการแก้ไขอัตโนมัติ
3.2 ตัวอย่างการสร้าง custom exporter สำหรับ Prometheus
ต่อไปนี้คือตัวอย่างการสร้าง exporter ที่ตรวจสอบสุขภาพของ API และส่ง metrics ไปยัง Prometheus:
import time
import requests
from prometheus_client import start_http_server, Gauge, Counter, Histogram
# สร้าง metrics
API_HEALTH = Gauge('api_health_status', 'API health status (1=healthy, 0=unhealthy)', ['endpoint'])
API_LATENCY = Histogram('api_request_latency_seconds', 'API request latency in seconds', ['endpoint'])
ERROR_COUNT = Counter('api_error_total', 'Total number of API errors', ['endpoint', 'error_type'])
ENDPOINTS = [
'https://api.example.com/health',
'https://api.example.com/status',
'https://api.example.com/ready'
]
def check_endpoint(endpoint):
"""ตรวจสอบ endpoint และบันทึก metrics"""
try:
start_time = time.time()
response = requests.get(endpoint, timeout=5)
latency = time.time() - start_time
# บันทึก latency
API_LATENCY.labels(endpoint=endpoint).observe(latency)
if response.status_code == 200:
API_HEALTH.labels(endpoint=endpoint).set(1)
return True
else:
API_HEALTH.labels(endpoint=endpoint).set(0)
ERROR_COUNT.labels(endpoint=endpoint, error_type='http_error').inc()
return False
except requests.exceptions.Timeout:
API_HEALTH.labels(endpoint=endpoint).set(0)
ERROR_COUNT.labels(endpoint=endpoint, error_type='timeout').inc()
return False
except requests.exceptions.ConnectionError:
API_HEALTH.labels(endpoint=endpoint).set(0)
ERROR_COUNT.labels(endpoint=endpoint, error_type='connection_error').inc()
return False
def main():
# เริ่ม HTTP server สำหรับ Prometheus
start_http_server(8000)
print("Prometheus exporter started on port 8000")
while True:
for endpoint in ENDPOINTS:
check_endpoint(endpoint)
time.sleep(30) # ตรวจสอบทุก 30 วินาที
if __name__ == '__main__':
main()
3.3 การตรวจจับความผิดปกติด้วย Machine Learning
ในปี 2026 การใช้ ML เพื่อตรวจจับความผิดปกติเป็นมาตรฐาน ตัวอย่างการใช้ Isolation Forest สำหรับตรวจจับ anomaly:
import numpy as np
from sklearn.ensemble import IsolationForest
import pandas as pd
from datetime import datetime, timedelta
class AnomalyDetector:
def __init__(self, contamination=0.1):
self.model = IsolationForest(
contamination=contamination,
random_state=42,
n_estimators=100
)
self.trained = False
def prepare_features(self, metrics_data):
"""แปลงข้อมูล metrics เป็น feature vector"""
df = pd.DataFrame(metrics_data)
# สร้าง features จาก time series
features = []
for col in ['cpu_usage', 'memory_usage', 'request_rate', 'error_rate']:
if col in df.columns:
# ค่าเฉลี่ย, std, min, max ในช่วงเวลาที่กำหนด
features.extend([
df[col].mean(),
df[col].std(),
df[col].min(),
df[col].max(),
df[col].diff().mean() # อัตราการเปลี่ยนแปลง
])
return np.array(features).reshape(1, -1)
def train(self, historical_data):
"""ฝึกโมเดลด้วยข้อมูลประวัติ"""
X_train = []
for data_point in historical_data:
features = self.prepare_features([data_point])
X_train.append(features[0])
self.model.fit(X_train)
self.trained = True
print(f"Model trained with {len(X_train)} samples")
def predict(self, current_metrics):
"""ทำนายว่าข้อมูลปัจจุบันผิดปกติหรือไม่"""
if not self.trained:
return False, 0.0
features = self.prepare_features(current_metrics)
prediction = self.model.predict(features)[0]
score = self.model.score_samples(features)[0]
# prediction = -1 หมายถึง anomaly
is_anomaly = prediction == -1
return is_anomaly, score
# ตัวอย่างการใช้งาน
detector = AnomalyDetector(contamination=0.05)
# ฝึกโมเดลด้วยข้อมูลประวัติ
historical_data = [
{'cpu_usage': 45.2, 'memory_usage': 60.1, 'request_rate': 1000, 'error_rate': 0.01},
{'cpu_usage': 48.7, 'memory_usage': 62.3, 'request_rate': 1050, 'error_rate': 0.02},
# ... ข้อมูลเพิ่มเติม
]
detector.train(historical_data)
# ตรวจสอบข้อมูลปัจจุบัน
current_metrics = [
{'cpu_usage': 95.8, 'memory_usage': 90.2, 'request_rate': 3000, 'error_rate': 0.15}
]
is_anomaly, score = detector.predict(current_metrics)
print(f"Anomaly detected: {is_anomaly}, Score: {score:.4f}")
4. การจัดการเหตุการณ์และการตอบสนองอัตโนมัติ
4.1 การออกแบบระบบ Incident Response
ระบบตอบสนองต่อเหตุการณ์ที่ดีควรมีขั้นตอนดังนี้:
- Detection – ตรวจจับปัญหาโดยอัตโนมัติ
- Notification – แจ้งทีมที่เกี่ยวข้อง
- Diagnosis – วิเคราะห์สาเหตุ
- Mitigation – ดำเนินการแก้ไขเบื้องต้น
- Resolution – แก้ไขปัญหาแบบถาวร
- Post-mortem – วิเคราะห์และปรับปรุง
4.2 ตัวอย่างระบบ Auto-remediation
ต่อไปนี้คือระบบที่สามารถตรวจสอบและแก้ไขปัญหาเบื้องต้นโดยอัตโนมัติ:
import asyncio
import aiohttp
from kubernetes import client, config
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AutoRemediationSystem:
def __init__(self):
# โหลด Kubernetes config
config.load_incluster_config()
self.k8s_apps = client.AppsV1Api()
self.k8s_core = client.CoreV1Api()
# กำหนด remediation actions
self.remediation_actions = {
'high_cpu': self.scale_up_deployment,
'memory_leak': self.restart_pod,
'unhealthy_node': self.cordon_node,
'high_error_rate': self.rollback_deployment
}
async def monitor_and_remediate(self):
"""ตรวจสอบและดำเนินการแก้ไขอย่างต่อเนื่อง"""
while True:
try:
# ตรวจสอบ metrics จาก Prometheus
metrics = await self.get_current_metrics()
for issue in metrics['alerts']:
if issue['type'] in self.remediation_actions:
logger.info(f"Detected issue: {issue['type']} on {issue['resource']}")
# ดำเนินการแก้ไข
action = self.remediation_actions[issue['type']]
result = await action(issue['resource'])
if result['success']:
logger.info(f"Successfully remediated: {issue['type']}")
await self.send_notification(
f"Auto-remediation succeeded for {issue['type']}",
'success'
)
else:
logger.error(f"Failed to remediate: {issue['type']}")
await self.send_notification(
f"Auto-remediation failed for {issue['type']}: {result['error']}",
'critical'
)
await asyncio.sleep(60) # ตรวจสอบทุก 1 นาที
except Exception as e:
logger.error(f"Error in monitoring loop: {e}")
await asyncio.sleep(30)
async def get_current_metrics(self):
"""ดึง metrics ล่าสุดจาก Prometheus"""
# ตัวอย่างข้อมูลจำลอง
return {
'alerts': [
{'type': 'high_cpu', 'resource': 'default/api-server', 'severity': 'critical'},
{'type': 'memory_leak', 'resource': 'default/cache-service', 'severity': 'warning'}
]
}
async def scale_up_deployment(self, deployment_name):
"""เพิ่มจำนวน replicas ของ deployment"""
try:
namespace, name = deployment_name.split('/')
deployment = self.k8s_apps.read_namespaced_deployment(name, namespace)
current_replicas = deployment.spec.replicas
new_replicas = min(current_replicas * 2, 10) # ไม่เกิน 10 replicas
deployment.spec.replicas = new_replicas
self.k8s_apps.patch_namespaced_deployment(name, namespace, deployment)
return {'success': True, 'action': f'Scaled from {current_replicas} to {new_replicas}'}
except Exception as e:
return {'success': False, 'error': str(e)}
async def restart_pod(self, pod_name):
"""รีสตาร์ท pod ที่มีปัญหา"""
try:
namespace, name = pod_name.split('/')
# ลบ pod เพื่อให้ Kubernetes สร้างใหม่
self.k8s_core.delete_namespaced_pod(name, namespace)
return {'success': True, 'action': f'Pod {name} restarted'}
except Exception as e:
return {'success': False, 'error': str(e)}
async def send_notification(self, message, severity):
"""ส่งการแจ้งเตือนไปยัง Slack"""
async with aiohttp.ClientSession() as session:
webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
payload = {
"text": f"[{severity.upper()}] {message}",
"attachments": [{
"color": "danger" if severity == 'critical' else "warning",
"fields": [
{"title": "Timestamp", "value": datetime.now().isoformat(), "short": True},
{"title": "Severity", "value": severity, "short": True}
]
}]
}
await session.post(webhook_url, json=payload)
# เริ่มระบบ
async def main():
system = AutoRemediationSystem()
await system.monitor_and_remediate()
if __name__ == '__main__':
asyncio.run(main())
5. การจัดการ Configuration และ Secrets
5.1 แนวทางปฏิบัติที่ดีสำหรับ Configuration Management
การจัดการ configuration เป็นสิ่งสำคัญที่มักถูกมองข้าม ต่อไปนี้คือแนวทางปฏิบัติที่ดี:
- ใช้ Environment Variables – สำหรับค่าที่เปลี่ยนแปลงตามสภาพแวดล้อม
- แยก Configuration ออกจาก Code – ใช้ไฟล์ .env หรือ config files
- ใช้ Vault หรือ Cloud Secret Manager – สำหรับ secrets ที่สำคัญ
- Version Control Configuration – เก็บไว้ใน Git แต่ไม่รวม secrets
- Validate Configuration – ตรวจสอบความถูกต้องก่อนใช้งาน
5.2 การเปรียบเทียบเครื่องมือจัดการ Secrets
| เครื่องมือ | ข้อดี | ข้อเสีย | กรณีการใช้งาน |
|---|---|---|---|
| HashiCorp Vault | Dynamic secrets, Audit logging, Multi-cloud | ซับซ้อนในการตั้งค่า, ต้องการทรัพยากรสูง | องค์กรขนาดใหญ่, ต้องการ compliance |
| AWS Secrets Manager | ใช้งานง่าย, บูรณาการกับ AWS services | ล็อคอินกับ AWS, ค่าใช้จ่ายตามการเรียกใช้ | ทีมที่ใช้ AWS เป็นหลัก |
| Google Secret Manager | ราคาถูก, รองรับ multi-region | ฟีเจอร์น้อยกว่า Vault | ทีมที่ใช้ GCP |
| Kubernetes Secrets | ฟรี, ใช้งานกับ K8s ได้ทันที | ความปลอดภัยต่ำ (base64 เท่านั้น) | การพัฒนาและทดสอบ |
5.3 ตัวอย่างการจัดการ Configuration ด้วย Python
ต่อไปนี้คือคลาสสำหรับจัดการ configuration ที่ปลอดภัย:
import os
import json
from typing import Any, Dict, Optional
from pathlib import Path
import hvac # HashiCorp Vault client
class ConfigurationManager:
def __init__(self, env: str = None):
self.env = env or os.getenv('APP_ENV', 'development')
self.config = {}
self.vault_client = None
# โหลด configuration
self.load_config()
def load_config(self):
"""โหลด configuration จากหลายแหล่ง"""
# 1. ค่าเริ่มต้นจากไฟล์
default_config = self._load_file('config/default.yaml')
# 2. ค่าเฉพาะ environment
env_config = self._load_file(f'config/{self.env}.yaml')
# 3. Environment variables
env_vars = self._load_env_vars()
# 4. Secrets จาก Vault
vault_secrets = self._load_vault_secrets()
# รวม configuration (ลำดับความสำคัญจากน้อยไปมาก)
self.config = {
**default_config,
**env_config,
**env_vars,
**vault_secrets
}
# Validate configuration
self._validate_config()
def _load_file(self, path: str) -> Dict[str, Any]:
"""โหลดไฟล์ configuration"""
config_path = Path(path)
if not config_path.exists():
return {}
if path.endswith('.yaml') or path.endswith('.yml'):
import yaml
with open(config_path) as f:
return yaml.safe_load(f)
elif path.endswith('.json'):
with open(config_path) as f:
return json.load(f)
return {}
def _load_env_vars(self) -> Dict[str, Any]:
"""โหลดจาก environment variables"""
env_config = {}
prefix = 'APP_'
for key, value in os.environ.items():
if key.startswith(prefix):
# แปลง APP_DATABASE_HOST -> database_host
config_key = key[len(prefix):].lower().replace('_', '.')
env_config[config_key] = self._parse_value(value)
return env_config
def _parse_value(self, value: str) -> Any:
"""แปลง string เป็น type ที่เหมาะสม"""
if value.lower() in ('true', 'yes', '1'):
return True
elif value.lower() in ('false', 'no', '0'):
return False
try:
return int(value)
except ValueError:
pass
try:
return float(value)
except ValueError:
pass
return value
def _load_vault_secrets(self) -> Dict[str, Any]:
"""โหลด secrets จาก Vault"""
vault_addr = os.getenv('VAULT_ADDR')
vault_token = os.getenv('VAULT_TOKEN')
if not vault_addr or not vault_token:
return {}
try:
self.vault_client = hvac.Client(url=vault_addr, token=vault_token)
if not self.vault_client.is_authenticated():
logger.warning("Vault authentication failed")
return {}
# อ่าน secrets ตาม environment
secret_path = f'secret/{self.env}/app'
secrets = self.vault_client.secrets.kv.v2.read_secret_version(
path=secret_path
)
return secrets.get('data', {}).get('data', {})
except Exception as e:
logger.error(f"Failed to load Vault secrets: {e}")
return {}
def _validate_config(self):
"""ตรวจสอบความถูกต้องของ configuration"""
required_keys = ['database.host', 'database.port', 'api.key']
for key in required_keys:
if not self.get(key):
raise ValueError(f"Missing required configuration: {key}")
def get(self, key: str, default: Any = None) -> Any:
"""ดึงค่า configuration โดยใช้ dot notation"""
keys = key.split('.')
value = self.config
for k in keys:
if isinstance(value, dict):
value = value.get(k)
if value is None:
return default
else:
return default
return value
def set(self, key: str, value: Any):
"""ตั้งค่าชั่วคราว (runtime only)"""
keys = key.split('.')
config = self.config
for k in keys[:-1]:
if k not in config:
config[k] = {}
config = config[k]
config[keys[-1]] = value
# ตัวอย่างการใช้งาน
config = ConfigurationManager(env='production')
# ดึงค่า
db_host = config.get('database.host', 'localhost')
db_port = config.get('database.port', 5432)
api_key = config.get('api.key')
print(f"Database: {db_host}:{db_port}")
print(f"API Key: {'***' if api_key else 'Not set'}")
6. การจัดการ Performance และ Cost Optimization
6.1 การวิเคราะห์ Performance ด้วย Python
การวิเคราะห์ performance เป็นสิ่งสำคัญสำหรับ SRE เพื่อระบุ bottleneck และวางแผนการปรับขนาด:
- Profiling – ใช้ cProfile หรือ py-spy เพื่อวิเคราะห์การใช้ CPU
- Memory Profiling – ใช้ memory_profiler หรือ tracemalloc
- Latency Analysis – ใช้ OpenTelemetry สำหรับ distributed tracing
- Database Query Analysis – วิเคราะห์ slow queries
6.2 การเปรียบเทียบกลยุทธ์การปรับขนาด
| กลยุทธ์ | ข้อดี | ข้อเสีย | ค่าใช้จ่าย |
|---|---|---|---|
| Horizontal Scaling | ยืดหยุ่นสูง, ทนทานต่อความเสียหาย | ซับซ้อนในการจัดการ state | ปานกลาง |
| Vertical Scaling | ง่ายต่อการจัดการ, ไม่ต้องเปลี่ยน code | มีขีดจำกัด, downtime ระหว่างการปรับ | สูง (เครื่องใหญ่มีราคาแพง) |
| Auto-scaling | ปรับตามความต้องการจริง, ประหยัดค่าใช้จ่าย | ต้องตั้งค่าให้ดี, อาจเกิด thrashing | ประหยัดที่สุดในระยะยาว |
| Spot/Preemptible Instances | ประหยัดสูงสุด (60-90%) | อาจถูกยกเลิกได้ทุกเมื่อ | ต่ำมาก |
7. การทดสอบความน่าเชื่อถือ (Reliability Testing)
7.1 ประเภทของการทดสอบที่ SRE ควรทำ
การทดสอบความน่าเชื่อถือเป็นส่วนสำคัญของ SRE ที่ช่วยให้มั่นใจว่าระบบจะทำงานได้ดีภายใต้สภาวะต่างๆ:
- Load Testing – ทดสอบว่าระบบรองรับปริมาณงานที่คาดหวังได้หรือไม่
- Stress Testing – ทดสอบขีดจำกัดของระบบ
- Chaos Engineering – ทดสอบความทนทานโดยการสร้างความเสียหายแบบสุ่ม
- Resilience Testing – ทดสอบการกู้คืนจากความล้มเหลว
- Disaster Recovery Testing – ทดสอบแผนการกู้คืนระบบ
7.2 ตัวอย่าง Chaos Engineering Experiment
ต่อไปนี้คือตัวอย่างการทดสอบ Chaos Engineering ด้วย Python ที่จำลองการหยุดทำงานของ service:
import asyncio
import aiohttp
import random
import time
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ChaosExperiment:
def __init__(self, target_services):
self.target_services = target_services
self.experiment_running = False
self.metrics = {
'success_count': 0,
'failure_count': 0,
'total_latency': 0.0
}
async def inject_failure(self, service_name, failure_type):
"""จำลองความเสียหายใน service"""
if failure_type == 'kill':
logger.warning(f"Killing service: {service_name}")
# จำลองการหยุด service
await asyncio.sleep(random.uniform(1, 5))
elif failure_type == 'latency':
latency = random.uniform(1, 10)
logger.warning(f"Adding {latency:.2f}s latency to: {service_name}")
await asyncio.sleep(latency)
elif failure_type == 'error':
logger.warning(f"Returning errors from: {service_name}")
# จำลองการส่ง error response
return {'status': 500, 'error': 'Internal Server Error'}
return {'status': 200, 'message': 'OK'}
async def monitor_health(self, service_url):
"""ตรวจสอบสุขภาพของ service"""
try:
start_time = time.time()
async with aiohttp.ClientSession() as session:
async with session.get(service_url, timeout=5) as response:
latency = time.time() - start_time
if response.status == 200:
self.metrics['success_count'] += 1
else:
self.metrics['failure_count'] += 1
self.metrics['total_latency'] += latency
return response.status == 200
except Exception as e:
self.metrics['failure_count'] += 1
logger.error(f"Health check failed for {service_url}: {e}")
return False
async def run_experiment(self, duration=300):
"""รัน Chaos experiment"""
logger.info(f"Starting Chaos experiment for {duration} seconds")
self.experiment_running = True
end_time = time.time() + duration
while time.time() < end_time:
# เลือก service และ failure type แบบสุ่ม
service = random.choice(self.target_services)
failure_type = random.choice(['kill', 'latency', 'error'])
logger.info(f"Injecting {failure_type} into {service['name']}")
# สร้างความเสียหาย
await self.inject_failure(service['name'], failure_type)
# ตรวจสอบผลกระทบ
for _ in range(5): # ตรวจสอบ 5 ครั้งหลังการ inject
is_healthy = await self.monitor_health(service['url'])
if not is_healthy:
logger.warning(f"Service {service['name']} is unhealthy after {failure_type}")
await asyncio.sleep(2)
# รอสักครู่ก่อนการทดสอบครั้งถัดไป
await asyncio.sleep(random.uniform(10, 30))
self.experiment_running = False
self.generate_report()
def generate_report(self):
"""สร้างรายงานผลการทดสอบ"""
total_requests = self.metrics['success_count'] + self.metrics['failure_count']
success_rate = (self.metrics['success_count'] / total_requests * 100) if total_requests > 0 else 0
avg_latency = self.metrics['total_latency'] / total_requests if total_requests > 0 else 0
report = f"""
Chaos Experiment Report
=======================
Date: {datetime.now().isoformat()}
Duration: 300 seconds
Results:
- Total Requests: {total_requests}
- Success Rate: {success_rate:.2f}%
- Average Latency: {avg_latency:.3f}s
- Failures: {self.metrics['failure_count']}
Recommendations:
- {'Improve service resilience' if success_rate < 99.9 else 'System is resilient enough'}
- {'Consider adding retry logic' if avg_latency > 2 else 'Latency is acceptable'}
"""
logger.info(report)
return report
# ตัวอย่างการใช้งาน
async def main():
target_services = [
{'name': 'api-gateway', 'url': 'http://api-gateway:8080/health'},
{'name': 'user-service', 'url': 'http://user-service:8081/health'},
{'name': 'payment-service', 'url': 'http://payment-service:8082/health'}
]
experiment = ChaosExperiment(target_services)
await experiment.run_experiment(duration=180) # รัน 3 นาที
if __name__ == '__main__':
asyncio.run(main())
8. แนวทางปฏิบัติที่ดีที่สุด (Best Practices) สำหรับ Python SRE
8.1 การออกแบบระบบให้มีความยืดหยุ่น
- ใช้ Design Patterns – เช่น Circuit Breaker, Retry with Backoff, Bulkhead
- ทำ Idempotency – การดำเนินการเดียวกันควรให้ผลลัพธ์เดียวกันเสมอ
- ใช้ Asynchronous Processing – สำหรับงานที่ไม่ต้องการผลลัพธ์ทันที
- Caching ที่เหมาะสม – ใช้ Redis หรือ Memcached สำหรับข้อมูลที่อ่านบ่อย
8.2 การตรวจสอบและการแจ้งเตือน
- ตั้งค่า Alert Thresholds ที่เหมาะสม – ไม่ไวเกินไปและไม่ช้าเกินไป
- ใช้ Multiple Alert Channels – Slack, Email, PagerDuty, SMS
- สร้าง Runbooks – สำหรับการแก้ไขปัญหาที่พบบ่อย
- ทดสอบ Alert System – อย่างสม่ำเสมอ
8.3 การจัดการ Logging
- ใช้ Structured Logging – JSON format เพื่อให้ง่ายต่อการวิเคราะห์
- รวม Correlation ID – เพื่อติดตาม request ข้าม services
- จัดการ Log Levels – DEBUG, INFO, WARNING, ERROR, CRITICAL