한줄 요약:
수집→요약→포스팅 파이프라인이 “어제 잘 됐다”로 끝나면 곤란!
👉 구조화 로그 + 지표(메트릭) + 알림 + 비용 트래킹으로 매일 안심 운영 💪
1. 목표
- 구조화 로그(JSON) + 로그 로테이션 세팅
- Prometheus 스타일 메트릭(성공/실패/소요시간/건수) 노출
- 슬랙/이메일 알림(성공/실패/임계치 경보)
- LLM/API 비용 추정(토큰/호출 수 기준)과 리밋·백오프
2. 프로젝트에 공통 운영 레이어 추가
1) 폴더 구조(요약)
project/
├─ orchestrator.py # E2E 오케스트레이터 (응용#9)
├─ common_ops.py # ★ 운영 공통 모듈 (이번에 추가)
├─ logs/
└─ ...
3. common_ops.py (복붙)
# common_ops.py
import os, sys, json, time, logging, functools, traceback
from logging.handlers import RotatingFileHandler
from datetime import datetime
from typing import Callable, Any, Dict, Optional
# ── 1) 구조화(JSON) 로깅 + 로테이션 ───────────────────────────
def setup_logger(log_dir="logs", name="app", level=logging.INFO):
os.makedirs(log_dir, exist_ok=True)
path = os.path.join(log_dir, f"{name}_{datetime.now():%Y%m%d}.log")
logger = logging.getLogger(name)
logger.setLevel(level)
# 파일: 회전(5MB x 5개 보관)
fh = RotatingFileHandler(path, maxBytes=5*1024*1024, backupCount=5, encoding="utf-8")
fh.setLevel(level)
fh.setFormatter(logging.Formatter('%(message)s')) # JSON 문자열
# 콘솔: 가독성용 텍스트
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(level)
ch.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
# 중복 핸들러 방지
if not logger.handlers:
logger.addHandler(fh)
logger.addHandler(ch)
return logger
def log_json(logger, event: str, **fields):
payload = {"ts": time.time(), "event": event, **fields}
logger.info(json.dumps(payload, ensure_ascii=False))
# ── 2) 슬랙/웹훅 알림(선택) ────────────────────────────────────
import requests
def notify_webhook(text: str, url: Optional[str] = None, timeout=8):
url = url or os.getenv("SLACK_WEBHOOK", "")
if not url:
return False
try:
requests.post(url, json={"text": text}, timeout=timeout)
return True
except Exception:
return False
# ── 3) 메트릭 내장(간단 카운터/타이머) ─────────────────────────
class Metrics:
def __init__(self):
self.counters = {}
self.timings = {}
def inc(self, key, n=1):
self.counters[key] = self.counters.get(key, 0) + n
def timeit(self, key):
start = time.time()
def stop():
elapsed = time.time() - start
self.timings.setdefault(key, []).append(elapsed)
return elapsed
return stop
def snapshot(self):
return {"counters": self.counters, "timings": {k: {"count": len(v), "avg": sum(v)/len(v)} for k,v in self.timings.items() if v}}
METRICS = Metrics()
# ── 4) 리트라이 + 백오프 데코레이터 ────────────────────────────
def retry(max_retries=3, base_delay=1.0, exceptions=(Exception,)):
def outer(fn: Callable[..., Any]):
@functools.wraps(fn)
def wrapped(*args, **kwargs):
for attempt in range(1, max_retries+1):
try:
return fn(*args, **kwargs)
except exceptions as e:
if attempt == max_retries:
raise
time.sleep(base_delay * attempt)
return wrapped
return outer
# ── 5) LLM/API 비용 추정(간단 토큰/호출 기반) ──────────────────
class CostTracker:
def __init__(self, cost_per_1k_tokens_input=0.0, cost_per_1k_tokens_output=0.0):
self.calls = 0
self.tokens_in = 0
self.tokens_out = 0
self.cpi = cost_per_1k_tokens_input
self.cpo = cost_per_1k_tokens_output
def add(self, tokens_in: int, tokens_out: int):
self.calls += 1
self.tokens_in += tokens_in
self.tokens_out += tokens_out
def estimate_usd(self):
return (self.tokens_in/1000)*self.cpi + (self.tokens_out/1000)*self.cpo
def as_dict(self):
return {
"calls": self.calls,
"tokens_in": self.tokens_in,
"tokens_out": self.tokens_out,
"est_usd": round(self.estimate_usd(), 6)
}
COST = CostTracker(
cost_per_1k_tokens_input=float(os.getenv("COST_IN", "0.0")),
cost_per_1k_tokens_output=float(os.getenv("COST_OUT", "0.0")),
)
# ── 6) 안전 실행 래퍼(로그/알림/메트릭/비용 묶음) ────────────────
def safe_step(step_name: str):
"""
with safe_step("llm_report") as ctx:
... ctx['add_cost'](tin, tout)
"""
logger = setup_logger()
stop_timer = METRICS.timeit(f"{step_name}.sec")
log_json(logger, f"{step_name}.start")
ctx: Dict[str, Any] = {"add_cost": lambda tin, tout: COST.add(tin, tout)}
try:
yield ctx
METRICS.inc(f"{step_name}.ok")
elapsed = stop_timer()
log_json(logger, f"{step_name}.ok", elapsed=round(elapsed,2))
except Exception as e:
METRICS.inc(f"{step_name}.fail")
elapsed = stop_timer()
err = "".join(traceback.format_exception_only(type(e), e)).strip()
log_json(logger, f"{step_name}.fail", elapsed=round(elapsed,2), error=err)
notify_webhook(f"❌ Step 실패: {step_name}\n{err}")
raise
# Python 3.8 이하 호환용: contextmanager
from contextlib import contextmanager
safe_step = contextmanager(safe_step)
포인트
- JSON 로그: 나중에 엘라스틱/로그수집기로 보내 분석하기 좋음
- RotatingFileHandler: 로그 폭주 방지
- Metrics: 초간단 카운터/타이머(나중에 Prometheus로 대체 가능)
- CostTracker: 토큰/호출 수 바탕 비용 대략 추정 (
.env로 단가 주입)
4. 오케스트레이터에 운영 레이어 연결
orchestrator.py 일부를 다음처럼 감쌉니다.
# orchestrator.py (일부)
from common_ops import safe_step, METRICS, COST, setup_logger, log_json, notify_webhook
import subprocess, os, time, json
logger = setup_logger(name="orchestrator")
def run_py(script):
subprocess.check_call(["python", script])
def main():
with safe_step("auto_report"):
run_py("auto_report.py")
with safe_step("llm_report") as ctx:
# (선택) LLM 호출 토큰 집계가 가능하면 여기서 비용 입력
# ctx['add_cost'](tokens_in, tokens_out)
run_py("llm_report.py")
# 예: 리포트 길이/문자 수 등 추가 로그
size = os.path.getsize("final_report.md")
log_json(logger, "artifact", name="final_report.md", size=size)
with safe_step("wp_post"):
run_py("auto_post_wp.py")
# 전체 스냅샷 보고
snapshot = {"metrics": METRICS.snapshot(), "cost": COST.as_dict()}
log_json(logger, "run.summary", **snapshot)
notify_webhook(f"✅ 파이프라인 OK\n{json.dumps(snapshot, ensure_ascii=False)}")
if __name__ == "__main__":
main()
5. (선택) Prometheus 메트릭 서버 노출
운영 환경에서 Grafana까지 쓰고 싶다면 초간단 메트릭 서버를 띄워 지표를 긁게 하자.
# metrics_server.py
from prometheus_client import Counter, Summary, Gauge, start_http_server
import time
import random
c_ok = Counter('pipeline_ok_total', '성공 횟수', ['step'])
c_fail = Counter('pipeline_fail_total', '실패 횟수', ['step'])
s_dur = Summary('pipeline_duration_seconds', '단계 소요시간', ['step'])
g_cost = Gauge('llm_cost_estimate_usd', '현재 러닝 추정 비용(USD)')
if __name__ == "__main__":
start_http_server(9108) # :9108/metrics 노출
print("Prometheus metrics at :9108/metrics")
while True:
# 실제 환경에선 orchestrator에서 push 하거나, 파일 읽어 반영
g_cost.set(random.random()/10) # 데모용
time.sleep(5)
실제 반영은
orchestrator가 주기적으로 파일/소켓로 메트릭을 갱신하는 방식으로 연결하면 된다.
6. LLM/API 비용 관리 팁
- 샘플링/집계: 개별 기사마다 LLM 호출 말고, 하루 묶음 요약으로 비용 절약
- 소형 모델로 초안 → 상위 모델로 최종본 2단계 전략
- 프롬프트 압축: 키포인트/목차만 전달 후 디테일은 후속 호출
- 리밋 & 백오프:
retry()데코레이터로 429·5xx 대비 - **단가를
.env**로 주입해 월간 비용 추정 (대시보드로 시각화 추천)
7. 임계치 알림 시나리오(예시)
| 임계치 | 조건 | 조치 |
|---|---|---|
| 실패율 | 최근 10회 중 3회 이상 실패 | 슬랙 경보 + 즉시 휴면 후 재시작 |
| 실행시간 | 단일 스텝 > 5분 | 경고 알림(느린 사이트/네트워크 점검) |
| 산출물 크기 | final_report.md < 10KB | “금일 데이터 희박” 경고 |
| 비용 | 일일 추정비용 > 예산 X% | 모델 다운그레이드/배치 크기 축소 |
위 조건은
orchestrator마지막에서METRICS와 산출물 크기로 간단히 계산해도 충분해.
8. 체크리스트
- JSON 구조화 로그 + 로테이션 적용
- 성공/실패/시간 메트릭 수집
- 슬랙/이메일 최소 1개 알림 채널 연결
- LLM 비용(호출/토큰/추정액) 집계
- 임계치 규칙 정의 및 동작 확인
- (선택) Prometheus/Grafana 연동
9.자주 묻는 질문(FAQ)
Q. 왜 JSON 로그를 쓰나요?
A. 검색·집계·대시보드(예: 엘라스틱, Loki)와 찰떡. 키-값으로 필터링 가능.
Q. 토큰 수는 어떻게 알죠?
A. 모델/SDK에 따라 응답에 토큰 사용량이 포함되는 경우가 많아. 없으면 문자수≈토큰 근사(한국어 1토큰≈1~2자)로 추정.
Q. 메트릭은 꼭 Prometheus여야 하나요?
A. 아니야. 처음엔 METRICS.snapshot()만 파일로 남겨도 충분. 필요해지면 Prometheus로 확장.
10. 요약 한 줄
로그(무슨 일이 있었나) + 메트릭(얼마나 잘 됐나) + 알림(지금 알려줘) + 비용(얼마 들었나)
이 네 박자만 지키면 매일 안심 운영 가능! ✅
이전 강좌 👈 [응용#9] 전체 자동화 워크플로우 통합
다음 강좌 👉 [운영#2] 보안·비밀관리(.env/키 회전/권한/로그 민감정보 마스킹)