

แนะนำแนวคิด Shift Left Security ในโลกของ Python SQLAlchemy
ในยุคที่การพัฒนาแอปพลิเคชันต้องคำนึงถึงความปลอดภัยตั้งแต่ต้นน้ำ แนวคิด Shift Left Security ได้กลายเป็นหัวใจสำคัญของการพัฒนาแบบ DevSecOps โดยเฉพาะอย่างยิ่งเมื่อเราทำงานกับฐานข้อมูลผ่าน ORM อย่าง SQLAlchemy การย้ายการตรวจสอบความปลอดภัยไปยังช่วงแรกของวงจรการพัฒนา (Development Phase) จะช่วยลดความเสี่ยงและค่าใช้จ่ายในการแก้ไขช่องโหว่ได้อย่างมหาศาล
บทความนี้จะพาคุณดำดิ่งสู่การประยุกต์ใช้ Shift Left Security กับ Python SQLAlchemy อย่างละเอียด ครอบคลุมตั้งแต่การออกแบบโมเดล การทำ Parameterized Queries การป้องกัน SQL Injection ไปจนถึงการทำ Security Testing แบบอัตโนมัติใน CI/CD Pipeline โดยเฉพาะในปี 2026 ที่เทรนด์ความปลอดภัยทางไซเบอร์ทวีความซับซ้อนมากขึ้น
SiamCafe Blog ขอนำเสนอคู่มือฉบับสมบูรณ์ที่นักพัฒนา Python ทุกคนควรอ่าน เพื่อยกระดับความปลอดภัยของแอปพลิเคชันของคุณตั้งแต่บรรทัดแรกของโค้ด
ทำไมต้อง Shift Left Security กับ SQLAlchemy?
SQLAlchemy เป็น ORM (Object Relational Mapper) ที่ได้รับความนิยมสูงสุดในโลก Python แต่การใช้งานที่ไม่ถูกต้องอาจนำมาซึ่งช่องโหว่ร้ายแรง โดยเฉพาะ SQL Injection, Data Exposure และ Authentication Bypass
ข้อดีของการทำ Shift Left Security กับ SQLAlchemy
- ลดต้นทุนการแก้ไขบั๊ก – การค้นหาช่องโหว่ในช่วง Development มีค่าใช้จ่ายน้อยกว่าช่วง Production ถึง 10-15 เท่า
- ป้องกัน SQL Injection ตั้งแต่ต้นทาง – SQLAlchemy มีกลไกป้องกันในตัว แต่ต้องใช้อย่างถูกต้อง
- เพิ่มความเร็วในการตรวจสอบ – การใช้ Static Analysis Tools ช่วยตรวจจับปัญหาได้ทันทีที่เขียนโค้ด
- สอดคล้องกับมาตรฐาน OWASP – โดยเฉพาะ OWASP Top 10 สำหรับ API Security
ในปี 2026 การโจมตีแบบ SQL Injection ยังคงเป็นภัยคุกคามอันดับต้นๆ แม้จะมี ORM ที่ปลอดภัยแล้วก็ตาม เพราะนักพัฒนาหลายคนยังคงใช้ Raw SQL หรือ Raw Text Query โดยไม่ระวัง
พื้นฐานความปลอดภัยของ SQLAlchemy ที่ต้องรู้
ก่อนจะเจาะลึก Shift Left Security เราต้องเข้าใจกลไกความปลอดภัยพื้นฐานของ SQLAlchemy เสียก่อน
การทำงานของ SQLAlchemy Engine และ Session
SQLAlchemy ทำงานผ่าน Engine ซึ่งทำหน้าที่เชื่อมต่อฐานข้อมูล และ Session ซึ่งเป็นหน่วยจัดการ Transaction การตั้งค่าความปลอดภัยเริ่มต้นที่ Engine และ Session มีผลต่อความปลอดภัยโดยรวม
# ตัวอย่างการตั้งค่า Engine อย่างปลอดภัย
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# ไม่ควร hardcode credentials ในโค้ด
# ใช้ environment variables แทน
import os
DATABASE_URL = os.getenv('DATABASE_URL', 'postgresql://user:password@localhost/dbname')
engine = create_engine(
DATABASE_URL,
pool_pre_ping=True, # ตรวจสอบ connection ก่อนใช้งาน
pool_recycle=3600, # recycle connection ทุก 1 ชั่วโมง
connect_args={
'sslmode': 'require', # บังคับใช้ SSL/TLS
'connect_timeout': 10
}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
การป้องกัน SQL Injection ด้วย SQLAlchemy ORM
SQLAlchemy ORM จะทำการแปลง Python Objects เป็น SQL Queries โดยอัตโนมัติ ซึ่งช่วยป้องกัน SQL Injection ได้ในระดับหนึ่ง แต่ถ้าคุณใช้ text() หรือ Raw SQL โดยไม่ระวัง ก็ยังเสี่ยงอยู่ดี
# ตัวอย่างที่ปลอดภัย - ใช้ ORM Query
from sqlalchemy.orm import Session
from models import User
def get_user_by_id(db: Session, user_id: int):
# ORM จะจัดการ parameter binding ให้อัตโนมัติ
return db.query(User).filter(User.id == user_id).first()
# ตัวอย่างที่ไม่ปลอดภัย - ใช้ Raw SQL แบบผิดวิธี
def unsafe_get_user(db: Session, user_id: str):
# อันตราย! ถ้า user_id มาจาก user input
query = f"SELECT * FROM users WHERE id = {user_id}"
return db.execute(query).fetchone()
# ตัวอย่างที่ปลอดภัย - ใช้ text() พร้อม parameter binding
from sqlalchemy import text
def safe_get_user(db: Session, user_id: int):
query = text("SELECT * FROM users WHERE id = :uid")
result = db.execute(query, {"uid": user_id})
return result.fetchone()
การออกแบบโมเดลที่ปลอดภัย (Secure Model Design)
การออกแบบโมเดลข้อมูลที่ปลอดภัยเป็นรากฐานสำคัญของ Shift Left Security ควรเริ่มต้นตั้งแต่ขั้นตอนการออกแบบ Database Schema
หลักการออกแบบโมเดลที่ปลอดภัย
- ใช้ Data Types ที่เหมาะสม – เช่น ใช้ Integer แทน String สำหรับ ID เพื่อป้องกัน Injection
- กำหนด Constraints – เช่น NOT NULL, UNIQUE, CHECK Constraints
- เข้ารหัสข้อมูลสำคัญ – โดยเฉพาะ Password, Credit Card Numbers, Personal Data
- ใช้ Soft Delete – แทนการลบข้อมูลจริง เพื่อการ Audit Trail
# ตัวอย่างโมเดลที่ปลอดภัย
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
import bcrypt
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, nullable=False, index=True)
email = Column(String(255), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False)
is_active = Column(Boolean, default=True)
is_deleted = Column(Boolean, default=False) # Soft delete
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
last_login_at = Column(DateTime(timezone=True), nullable=True)
# Role-based access control
role = Column(String(20), default='user') # 'user', 'admin', 'moderator'
def set_password(self, password: str):
# เข้ารหัส password ด้วย bcrypt
self.password_hash = bcrypt.hashpw(
password.encode('utf-8'),
bcrypt.gensalt()
).decode('utf-8')
def check_password(self, password: str) -> bool:
return bcrypt.checkpw(
password.encode('utf-8'),
self.password_hash.encode('utf-8')
)
class AuditLog(Base):
__tablename__ = 'audit_logs'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, nullable=False)
action = Column(String(100), nullable=False)
table_name = Column(String(100), nullable=False)
record_id = Column(Integer, nullable=True)
old_values = Column(Text, nullable=True) # JSON string
new_values = Column(Text, nullable=True) # JSON string
ip_address = Column(String(45), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
การทำ Data Masking และ Encryption
สำหรับข้อมูลที่อ่อนไหว เช่น อีเมล เบอร์โทรศัพท์ ควรทำ Data Masking หรือ Encryption ก่อนจัดเก็บ
| ประเภทข้อมูล | วิธีการป้องกัน | ระดับความปลอดภัย | ข้อควรระวัง |
|---|---|---|---|
| Password | Hashing (bcrypt, argon2) | สูงมาก | ไม่สามารถ decrypt ย้อนกลับได้ |
| Credit Card | Encryption (AES-256) + Tokenization | สูง | ต้องจัดการ key อย่างปลอดภัย |
| Email/Phone | Encryption หรือ Hashing สำหรับ search | ปานกลาง-สูง | ต้องสมดุลระหว่าง security และ usability |
| PII (ชื่อ, ที่อยู่) | Encryption + Access Control | ปานกลาง | อาจต้องทำ partial decryption |
การทำ Security Testing อัตโนมัติใน CI/CD Pipeline
หัวใจของ Shift Left Security คือการนำ Security Testing เข้ามาใน Pipeline ตั้งแต่ขั้นตอนการพัฒนา ซึ่งสามารถทำได้หลายระดับ
1. Static Application Security Testing (SAST) สำหรับ SQLAlchemy
เครื่องมือ SAST จะวิเคราะห์ source code เพื่อหาช่องโหว่โดยไม่ต้องรันโปรแกรม สำหรับ SQLAlchemy มีเครื่องมือที่น่าสนใจดังนี้:
- Bandit – SAST tool สำหรับ Python ที่ตรวจจับการใช้ SQLAlchemy ที่ไม่ปลอดภัย
- Semgrep – รองรับ custom rules สำหรับ SQLAlchemy โดยเฉพาะ
- SonarQube – มีกฎสำหรับ Python Security Best Practices
# ตัวอย่าง Semgrep rule สำหรับตรวจจับ SQL Injection ใน SQLAlchemy
# สร้างไฟล์ sqlalchemy-injection.yaml
rules:
- id: sqlalchemy-raw-sql-injection
patterns:
- pattern: |
db.execute("...$VAR..." % $INPUT)
- pattern-not: |
db.execute(text("...:param..."), {"param": $INPUT})
message: "พบการใช้ Raw SQL ที่เสี่ยงต่อ SQL Injection ควรใช้ parameterized query แทน"
languages: [python]
severity: ERROR
- id: sqlalchemy-text-without-params
patterns:
- pattern: |
db.execute(text("$QUERY"))
- pattern-not: |
db.execute(text("...:..."), {...})
message: "การใช้ text() โดยไม่มี parameter binding อาจเสี่ยงต่อ SQL Injection"
languages: [python]
severity: WARNING
2. Dynamic Application Security Testing (DAST)
DAST จะทดสอบแอปพลิเคชันขณะรันจริง โดยจำลองการโจมตี เช่น SQL Injection, XSS
- OWASP ZAP – เครื่องมือ Open Source ที่สามารถ integrate กับ CI/CD ได้
- Burp Suite – เครื่องมือระดับ enterprise ที่มีฟีเจอร์ครบถ้วน
- SQLMap – เครื่องมือเฉพาะทางสำหรับทดสอบ SQL Injection
3. Dependency Scanning
ตรวจสอบไลบรารีที่ใช้ (รวมถึง SQLAlchemy และ dependencies) ว่ามีช่องโหว่ที่รู้จักหรือไม่
- Safety – ตรวจสอบ Python packages กับฐานข้อมูลช่องโหว่
- Snyk – มีทั้ง CLI และ integration กับ GitHub/GitLab
- Dependabot – GitHub’s built-in dependency scanner
การจัดการ Authentication และ Authorization อย่างปลอดภัย
SQLAlchemy มักถูกใช้ร่วมกับ Web Frameworks เช่น FastAPI, Flask, Django ซึ่งการจัดการ Authentication และ Authorization เป็นสิ่งสำคัญ
หลักการ Secure Authentication
- ใช้ Password Hashing ที่แข็งแรง – bcrypt, argon2, scrypt
- จำกัดจำนวนครั้งในการ Login – ป้องกัน Brute Force Attack
- ใช้ Session Management ที่ปลอดภัย – JWT หรือ Session Cookies ที่มี HttpOnly และ Secure flags
- Implement Multi-Factor Authentication (MFA) – สำหรับบัญชีที่มีสิทธิ์สูง
# ตัวอย่างการทำ Authentication ด้วย FastAPI + SQLAlchemy
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from jose import JWTError, jwt
from datetime import datetime, timedelta
import bcrypt
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# การตั้งค่า JWT
SECRET_KEY = os.getenv('JWT_SECRET_KEY') # ต้องตั้งค่าใน environment
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def authenticate_user(db: Session, username: str, password: str):
user = db.query(User).filter(User.username == username).first()
if not user:
return False
if not user.check_password(password):
return False
return user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": user.username, "role": user.role})
return {"access_token": access_token, "token_type": "bearer"}
# การทำ Authorization ด้วย Role-Based Access Control
def require_admin(current_user: User = Depends(get_current_user)):
if current_user.role != 'admin':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You do not have permission to access this resource"
)
return current_user
@app.get("/admin/users")
async def get_all_users(
db: Session = Depends(get_db),
current_user: User = Depends(require_admin)
):
users = db.query(User).filter(User.is_deleted == False).all()
# ไม่ควรส่ง password_hash กลับไป
return [{"id": u.id, "username": u.username, "email": u.email, "role": u.role} for u in users]
การทำ Logging และ Audit Trail
การบันทึก Log อย่างถูกต้องเป็นส่วนสำคัญของ Shift Left Security เพราะช่วยในการตรวจสอบและตรวจจับการโจมตี
สิ่งที่ควร Log ใน SQLAlchemy Application
- Database Queries – เฉพาะใน development environment เท่านั้น
- Authentication Events – login success, login failed, logout
- Authorization Failures – การพยายามเข้าถึงทรัพยากรที่ไม่ได้รับอนุญาต
- Data Modification Events – CREATE, UPDATE, DELETE operations
- Error และ Exception – โดยเฉพาะ database errors
# ตัวอย่างการตั้งค่า SQLAlchemy Logging อย่างปลอดภัย
import logging
from sqlalchemy import event
from sqlalchemy.engine import Engine
# ตั้งค่า logging เฉพาะใน development
if os.getenv('ENVIRONMENT') == 'development':
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
else:
# ใน production ให้ log เฉพาะ warning และ error
logging.getLogger('sqlalchemy').setLevel(logging.WARNING)
# การใช้ Event Listener เพื่อ Audit
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
# บันทึก query ที่ถูก execute (เฉพาะใน development)
if os.getenv('ENVIRONMENT') == 'development':
conn.info.setdefault('query_start_time', time.time())
logging.debug(f"Query: {statement}")
logging.debug(f"Parameters: {parameters}")
@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
if os.getenv('ENVIRONMENT') == 'development':
total_time = time.time() - conn.info['query_start_time']
logging.debug(f"Query completed in {total_time:.2f}s")
# การทำ Audit Trail ด้วย SQLAlchemy Events
from sqlalchemy.orm import Session
def log_audit(db: Session, user_id: int, action: str, table_name: str,
record_id: int, old_values: dict = None, new_values: dict = None):
audit = AuditLog(
user_id=user_id,
action=action,
table_name=table_name,
record_id=record_id,
old_values=json.dumps(old_values) if old_values else None,
new_values=json.dumps(new_values) if new_values else None,
ip_address=request.client.host if hasattr(request, 'client') else None
)
db.add(audit)
db.commit()
# ตัวอย่างการใช้งานใน API
@app.put("/users/{user_id}")
async def update_user(
user_id: int,
user_update: UserUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# เก็บค่าเก่าก่อน update
old_values = {"username": user.username, "email": user.email}
# ทำการ update
for key, value in user_update.dict(exclude_unset=True).items():
setattr(user, key, value)
db.commit()
db.refresh(user)
# บันทึก audit trail
log_audit(
db,
user_id=current_user.id,
action="UPDATE",
table_name="users",
record_id=user_id,
old_values=old_values,
new_values={"username": user.username, "email": user.email}
)
return user
การจัดการ Environment และ Secrets อย่างปลอดภัย
หนึ่งในจุดอ่อนที่พบบ่อยที่สุดคือการจัดเก็บ Database Credentials ไว้ในโค้ดหรือ configuration files
แนวปฏิบัติที่ดีในการจัดการ Secrets
- ใช้ Environment Variables – ผ่าน .env file (ที่ถูก ignore ใน git) หรือ CI/CD Secrets
- ใช้ Secret Management Tools – เช่น HashiCorp Vault, AWS Secrets Manager, Azure Key Vault
- ไม่ commit credentials ลง Git – ใช้ .gitignore และ pre-commit hooks
- หมุนเวียน credentials เป็นประจำ – โดยเฉพาะ database passwords
# ตัวอย่างการใช้ Python-dotenv ในการจัดการ Environment Variables
# .env file (ไม่ควร commit ขึ้น Git)
DATABASE_URL=postgresql://prod_user:[email protected]:5432/proddb
JWT_SECRET_KEY=my-very-secure-secret-key-that-is-at-least-32-characters
ENVIRONMENT=production
# การโหลด Environment Variables
from dotenv import load_dotenv
import os
# โหลด .env เฉพาะใน local development
if os.getenv('ENVIRONMENT') != 'production':
load_dotenv()
# ใช้ os.getenv() เพื่อดึงค่า
DATABASE_URL = os.getenv('DATABASE_URL')
if not DATABASE_URL:
raise ValueError("DATABASE_URL environment variable is not set")
# การใช้ Secret Manager ใน Production (ตัวอย่าง AWS Secrets Manager)
import boto3
import json
from botocore.exceptions import ClientError
def get_secret():
secret_name = "prod/db/credentials"
region_name = "ap-southeast-1"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
try:
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
except ClientError as e:
raise e
secret = json.loads(get_secret_value_response['SecretString'])
return secret
# ใช้ใน production
if os.getenv('ENVIRONMENT') == 'production':
db_secret = get_secret()
DATABASE_URL = f"postgresql://{db_secret['username']}:{db_secret['password']}@{db_secret['host']}:{db_secret['port']}/{db_secret['dbname']}"
การทำ Rate Limiting และ Input Validation
แม้ SQLAlchemy จะช่วยป้องกัน SQL Injection ได้ในระดับหนึ่ง แต่ Input Validation และ Rate Limiting ก็ยังจำเป็นเพื่อป้องกันการโจมตีรูปแบบอื่น
Input Validation สำหรับ SQLAlchemy
- ตรวจสอบชนิดข้อมูล – เช่น ต้องเป็น Integer, String ตามที่กำหนด
- จำกัดความยาวของ String – ป้องกัน Buffer Overflow และ DoS
- ใช้ Allowlist แทน Blocklist – กำหนดว่าอะไรที่อนุญาต ดีกว่าห้ามสิ่งที่อันตราย
- Sanitize Input – สำหรับข้อมูลที่ต้องนำไปแสดงผล (ป้องกัน XSS)
# ตัวอย่างการใช้ Pydantic สำหรับ Input Validation
from pydantic import BaseModel, EmailStr, constr, Field
from typing import Optional
from datetime import datetime
class UserCreate(BaseModel):
username: constr(min_length=3, max_length=50, regex=r'^[a-zA-Z0-9_]+$')
email: EmailStr
password: constr(min_length=8, max_length=128)
role: str = Field(default='user', regex=r'^(user|admin|moderator)$')
class UserUpdate(BaseModel):
username: Optional[constr(min_length=3, max_length=50, regex=r'^[a-zA-Z0-9_]+$')]
email: Optional[EmailStr]
is_active: Optional[bool]
# การทำ Rate Limiting ด้วย SlowAPI (สำหรับ FastAPI)
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(429, _rate_limit_exceeded_handler)
@app.post("/login")
@limiter.limit("5/minute") # จำกัด 5 ครั้งต่อนาที
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
# login logic
pass
@app.post("/register")
@limiter.limit("3/hour") # จำกัด 3 ครั้งต่อชั่วโมง
async def register(user: UserCreate, db: Session = Depends(get_db)):
# registration logic
pass
การทำ Database Encryption at Rest และ in Transit
ความปลอดภัยของข้อมูลไม่ควรหยุดแค่ที่แอปพลิเคชัน แต่ต้องครอบคลุมถึงฐานข้อมูลด้วย
Encryption at Rest
- Transparent Data Encryption (TDE) – สำหรับ PostgreSQL, MySQL, SQL Server
- Column-Level Encryption – เข้ารหัสเฉพาะคอลัมน์ที่อ่อนไหว
- Application-Level Encryption – เข้ารหัสก่อนส่งไปยังฐานข้อมูล
Encryption in Transit
- SSL/TLS – ต้องกำหนด
sslmode='require'ใน connection string - Certificate Validation – ตรวจสอบ server certificate
- SSH Tunneling – สำหรับการเชื่อมต่อที่ต้องการความปลอดภัยสูง
| วิธีการ | ข้อดี | ข้อเสีย | การ implement ใน SQLAlchemy |
|---|---|---|---|
| TDE (Database Level) | ไม่ต้องแก้โค้ด, transparent | ต้องใช้ license เพิ่ม, performance ลดลง | ตั้งค่าที่ database โดยตรง |
| Column-Level Encryption | เลือก encrypt เฉพาะข้อมูลสำคัญ | ซับซ้อน, search/lookup ยาก | ใช้ SQLAlchemy Type Decorators |
| Application-Level Encryption | ควบคุมได้เต็มที่, ทำงาน offline | ต้องจัดการ key เอง, performance overhead | เข้ารหัสก่อน set attribute |
# ตัวอย่างการทำ Application-Level Encryption ด้วย SQLAlchemy Type Decorators
from sqlalchemy import TypeDecorator, String
import base64
from cryptography.fernet import Fernet
import os
# สร้าง key (ควรเก็บไว้ใน secret manager)
ENCRYPTION_KEY = os.getenv('ENCRYPTION_KEY')
if not ENCRYPTION_KEY:
ENCRYPTION_KEY = Fernet.generate_key()
print(f"Generated new encryption key: {ENCRYPTION_KEY.decode()}")
# ในการใช้งานจริง ต้องเก็บ key นี้ไว้ในที่ปลอดภัย
cipher = Fernet(ENCRYPTION_KEY)
class EncryptedString(TypeDecorator):
"""Custom type for encrypted string fields"""
impl = String
def process_bind_param(self, value, dialect):
if value is not None:
# เข้ารหัสก่อนบันทึกลง database
encrypted = cipher.encrypt(value.encode('utf-8'))
return base64.b64encode(encrypted).decode('utf-8')
return None
def process_result_value(self, value, dialect):
if value is not None:
# ถอดรหัสเมื่ออ่านจาก database
encrypted = base64.b64decode(value.encode('utf-8'))
return cipher.decrypt(encrypted).decode('utf-8')
return None
# การใช้งานในโมเดล
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True)
email = Column(EncryptedString(255)) # เข้ารหัสอัตโนมัติ
phone = Column(EncryptedString(20)) # เข้ารหัสอัตโนมัติ
# password ควรใช้ hashing ไม่ใช่ encryption
การทำ Database Backup และ Disaster Recovery อย่างปลอดภัย
การ Backup ฐานข้อมูลเป็นสิ่งจำเป็น แต่ต้องทำอย่างปลอดภัยเพื่อไม่ให้ข้อมูลรั่วไหล
แนวปฏิบัติที่ปลอดภัยสำหรับ Backup
- เข้ารหัส Backup Files – ก่อนจัดเก็บใน cloud storage
- ใช้ Access Control – เฉพาะผู้ที่เกี่ยวข้องเท่านั้นที่เข้าถึง backup ได้
- ทดสอบการ Restore เป็นประจำ – อย่างน้อยเดือนละครั้ง
- จัดเก็บ Backup ในหลาย Location – ป้องกัน disaster
- ลบ Backup เก่าตามนโยบาย – เช่น เก็บ 30 วันสำหรับ daily backup
# ตัวอย่างสคริปต์ Backup ฐานข้อมูลอย่างปลอดภัย
#!/usr/bin/env python3
"""
Secure Database Backup Script
"""
import os
import subprocess
import datetime
import boto3
from cryptography.fernet import Fernet
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SecureBackup:
def __init__(self):
self.db_url = os.getenv('DATABASE_URL')
self.backup_key = os.getenv('BACKUP_ENCRYPTION_KEY')
self.s3_bucket = os.getenv('BACKUP_S3_BUCKET')
self.s3_prefix = os.getenv('BACKUP_S3_PREFIX', 'db-backups')
def create_backup(self):
"""สร้าง backup file และเข้ารหัส"""
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = f'/tmp/backup_{timestamp}.sql'
encrypted_file = f'{backup_file}.encrypted'
try:
# สร้าง database dump
logger.info(f"Creating database backup: {backup_file}")
subprocess.run([
'pg_dump',
self.db_url,
'--no-owner',
'--no-acl',
'--format=custom',
'--file', backup_file
], check=True)
# เข้ารหัส backup file
logger.info("Encrypting backup file")
cipher = Fernet(self.backup_key.encode())
with open(backup_file, 'rb') as f:
data = f.read()
encrypted = cipher.encrypt(data)
with open(encrypted_file, 'wb') as f:
f.write(encrypted)
# อัปโหลดไปยัง S3
self.upload_to_s3(encrypted_file, timestamp)
# ลบไฟล์ชั่วคราว
os.remove(backup_file)
os.remove(encrypted_file)
logger.info("Backup completed successfully")
except Exception as e:
logger.error(f"Backup failed: {e}")
raise
def upload_to_s3(self, file_path: str, timestamp: str):
"""อัปโหลด backup ไปยัง S3 ด้วย server-side encryption"""
s3 = boto3.client('s3')
key = f"{self.s3_prefix}/{timestamp}_backup.encrypted"
# ใช้ AWS KMS สำหรับ server-side encryption
s3.upload_file(
file_path,
self.s3_bucket,
key,
ExtraArgs={
'ServerSideEncryption': 'aws:kms',
'SSEKMSKeyId': os.getenv('KMS_KEY_ID')
}
)
logger.info(f"Uploaded backup to s3://{self.s3_bucket}/{key}")
if __name__ == "__main__":
backup = SecureBackup()
backup.create_backup()
การทำ Security Headers และ CORS Configuration
เมื่อใช้ SQLAlchemy กับ Web Framework การตั้งค่า Security Headers และ CORS อย่างถูกต้องช่วยป้องกันการโจมตีหลายรูปแบบ
Security Headers ที่สำคัญ
- Content-Security-Policy – ป้องกัน XSS และ Data Injection
- X-Content-Type-Options – ป้องกัน MIME sniffing
- Strict-Transport-Security – บังคับใช้ HTTPS
- X-Frame-Options – ป้องกัน Clickjacking
# ตัวอย่างการตั้งค่า Security Headers ใน FastAPI
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
response = await call_next(request)
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
response.headers['Content-Security-Policy'] = "default-src 'self'"
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
response.headers['Permissions-Policy'] = 'geolocation=(), microphone=(), camera=()'
return response
app = FastAPI()
app.add_middleware(SecurityHeadersMiddleware)
# การตั้งค่า CORS อย่างปลอดภัย
from fastapi.middleware.cors import CORSMiddleware
origins = [
"https://siamcafe