# auto_trade_sys/trading_system/redis_log_handler.py
# 薇薇安 46062e442b a
# 2026-01-18 20:28:23 +08:00
#
# 205 lines
# 6.9 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
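Write Python logging ERROR records to a Redis list (keeping only the most recent N entries).

Design goals:
- Do not interfere with existing logging (file/console); degrade silently when Redis is unavailable.
- Only write ERROR/CRITICAL records (controlled by the handler level).
- Support de-duplication of identical content: only consecutive records with the same
  signature are merged (count++ and the timestamp is refreshed).

Redis data structure:
- List key (default: ats:logs:error); the head of the list is the newest entry.
- Each entry is a JSON string.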
"""
from __future__ import annotations

import json
import logging
import os
import socket
import time
import traceback
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, Optional
def _beijing_time_str(ts: float) -> str:
    """Format a POSIX timestamp as a Beijing-time (UTC+8) string.

    Args:
        ts: Seconds since the Unix epoch.

    Returns:
        The instant rendered as ``YYYY-MM-DD HH:MM:SS`` using a fixed
        UTC+8 offset, independent of the host's local time zone.
    """
    beijing_tz = timezone(timedelta(hours=8))
    return datetime.fromtimestamp(ts, tz=beijing_tz).strftime("%Y-%m-%d %H:%M:%S")
def _safe_json_loads(s: str) -> Optional[Dict[str, Any]]:
    """Parse *s* as JSON, returning the result only if it is a JSON object.

    Args:
        s: Text expected to contain a JSON document.

    Returns:
        The decoded ``dict`` when *s* is valid JSON whose top-level value is
        an object; ``None`` for anything else (invalid JSON, a non-object
        top-level value, or an input ``json.loads`` cannot handle).
    """
    try:
        obj = json.loads(s)
    except Exception:
        # Deliberately broad: this helper must never raise into the caller.
        return None
    return obj if isinstance(obj, dict) else None
@dataclass(frozen=True)
class RedisLogConfig:
    """Immutable settings for the Redis error-log handler."""

    # Connection URL handed to the Redis client.
    redis_url: str
    # Key of the Redis list that receives the log entries.
    list_key: str = "ats:logs:error"
    # Maximum number of entries kept in the list.
    max_len: int = 2000
    # Merge a record into the list head when both share the same signature.
    dedupe_consecutive: bool = True
    # Service name stored in every entry and included in its signature.
    service: str = "trading_system"
    # Host name stored in every entry. Resolved when the config object is
    # created rather than once at import time.
    hostname: str = field(default_factory=socket.gethostname)
    # Socket connect / read timeouts, in seconds.
    connect_timeout_sec: float = 1.0
    socket_timeout_sec: float = 1.0
    # Optional credentials; omitted from the connection when empty.
    username: Optional[str] = None
    password: Optional[str] = None
    # Request TLS even if the URL does not use the rediss:// scheme.
    use_tls: bool = False
    # Certificate verification mode; the handler compares it against
    # "required" and "none" to decide on host-name checking.
    ssl_cert_reqs: str = "required"
    # Optional path to a CA bundle used for TLS connections.
    ssl_ca_certs: Optional[str] = None
class RedisErrorLogHandler(logging.Handler):
    """Logging handler that stores records in a capped Redis list.

    Each record is serialised to a JSON string and pushed to the head of the
    list, which is then trimmed to the configured maximum length. When
    ``cfg.dedupe_consecutive`` is set, a record whose signature equals that
    of the current list head is merged into the head (``count`` incremented,
    timestamp refreshed) instead of being pushed.

    Exceptions raised while handling a record are swallowed so that logging
    never breaks the calling code. The handler is synchronous and uses the
    synchronous redis-py client.
    """

    # Minimum delay, in seconds, between two connection attempts.
    _RECONNECT_BACKOFF_SEC = 5.0

    def __init__(self, cfg: "RedisLogConfig"):
        """Create the handler; no connection is opened until a record arrives.

        Args:
            cfg: Connection and list settings for this handler.
        """
        super().__init__()
        self.cfg = cfg
        self._redis = None
        self._redis_ok = False
        self._last_connect_attempt_ts = 0.0

    def _effective_url(self) -> str:
        """Return the URL to connect with, honouring ``cfg.use_tls``.

        redis-py chooses the TLS connection class from the ``rediss://``
        scheme, so a plain ``redis://`` URL is upgraded when TLS was
        requested explicitly. Other schemes are returned unchanged.
        """
        url = self.cfg.redis_url
        plain_scheme = "redis://"
        if self.cfg.use_tls and url.startswith(plain_scheme):
            return "rediss://" + url[len(plain_scheme):]
        return url

    def _connection_kwargs(self) -> Dict[str, Any]:
        """Build the keyword arguments passed to ``redis.from_url``."""
        kwargs: Dict[str, Any] = {
            "decode_responses": True,
            "socket_connect_timeout": self.cfg.connect_timeout_sec,
            "socket_timeout": self.cfg.socket_timeout_sec,
        }
        if self.cfg.username:
            kwargs["username"] = self.cfg.username
        if self.cfg.password:
            kwargs["password"] = self.cfg.password

        # TLS (rediss:// URL or explicitly enabled).
        if self.cfg.redis_url.startswith("rediss://") or self.cfg.use_tls:
            kwargs["ssl_cert_reqs"] = self.cfg.ssl_cert_reqs
            if self.cfg.ssl_ca_certs:
                kwargs["ssl_ca_certs"] = self.cfg.ssl_ca_certs
            # Host names are only checked when certificates are required.
            kwargs["ssl_check_hostname"] = self.cfg.ssl_cert_reqs == "required"
        return kwargs

    def _mark_unavailable(self) -> None:
        """Drop the cached client and start the reconnect back-off window."""
        self._redis = None
        self._redis_ok = False
        self._last_connect_attempt_ts = time.time()

    def _get_redis(self) -> Optional[Any]:
        """Return a connected client, or ``None`` when Redis is unavailable.

        After a failure, new connection attempts are skipped until the
        back-off window has elapsed, so that not every ERROR record tries to
        reach Redis.
        """
        if self._redis_ok and self._redis is not None:
            return self._redis

        now = time.time()
        if now - self._last_connect_attempt_ts < self._RECONNECT_BACKOFF_SEC:
            return None
        self._last_connect_attempt_ts = now

        try:
            import redis  # type: ignore

            client = redis.from_url(self._effective_url(), **self._connection_kwargs())
            client.ping()
        except Exception:
            # Missing package, bad configuration or unreachable server.
            self._redis = None
            self._redis_ok = False
            return None

        self._redis = client
        self._redis_ok = True
        return client

    def _build_entry(self, record: logging.LogRecord) -> Dict[str, Any]:
        """Convert a log record into the dict stored (as JSON) in Redis.

        The ``signature`` field combines service, level, logger, source
        location, message and exception type; it is what consecutive
        de-duplication compares.
        """
        msg = record.getMessage()
        exc_text = None
        exc_type = None
        if record.exc_info:
            exc_type = getattr(record.exc_info[0], "__name__", None)
            exc_text = "".join(traceback.format_exception(*record.exc_info))
        signature = f"{self.cfg.service}|{record.levelname}|{record.name}|{record.pathname}:{record.lineno}|{msg}|{exc_type or ''}"
        return {
            "ts": int(record.created * 1000),
            "time": _beijing_time_str(record.created),
            "service": self.cfg.service,
            "level": record.levelname,
            "logger": record.name,
            "message": msg,
            "pathname": record.pathname,
            "lineno": record.lineno,
            "funcName": record.funcName,
            "process": record.process,
            "thread": record.thread,
            "hostname": self.cfg.hostname,
            "exc_type": exc_type,
            "exc_text": exc_text,
            "signature": signature,
            "count": 1,
        }

    def _resolve_max_len(self) -> int:
        """Return the list cap, preferring ``REDIS_LOG_LIST_MAX_LEN``.

        The environment value is used only when it parses as a positive
        integer; otherwise the configured ``cfg.max_len`` applies. A
        malformed value therefore no longer causes records to be dropped.
        """
        raw = (os.getenv("REDIS_LOG_LIST_MAX_LEN") or "").strip()
        if raw:
            try:
                value = int(raw)
            except ValueError:
                value = 0
            if value > 0:
                return value
        return int(self.cfg.max_len)

    def _merge_into_head(
        self, client: Any, list_key: str, max_len: int, entry: Dict[str, Any]
    ) -> bool:
        """Merge *entry* into the list head if both share a signature.

        Returns:
            ``True`` when the head was updated in place; ``False`` when the
            caller should push *entry* as a new element (no head, different
            signature, unreadable head, or the update failed).
        """
        try:
            head_raw = client.lindex(list_key, 0)
        except Exception:
            return False

        head = _safe_json_loads(head_raw) if isinstance(head_raw, str) else None
        if not head or head.get("signature") != entry["signature"]:
            return False

        try:
            head["count"] = int(head.get("count", 1)) + 1
            head["ts"] = entry["ts"]
            head["time"] = entry["time"]
            # Keep the latest traceback (the first occurrence may have none).
            if entry.get("exc_text"):
                head["exc_text"] = entry.get("exc_text")
                head["exc_type"] = entry.get("exc_type")
            pipe = client.pipeline()
            pipe.lset(list_key, 0, json.dumps(head, ensure_ascii=False))
            pipe.ltrim(list_key, 0, max_len - 1)
            pipe.execute()
        except Exception:
            # Fall back to a normal push.
            return False
        return True

    def _push_entry(
        self, client: Any, list_key: str, max_len: int, entry: Dict[str, Any]
    ) -> None:
        """Push *entry* to the list head and trim the list to *max_len*.

        A failed write marks Redis as unavailable so that following records
        honour the reconnect back-off instead of each waiting on a socket
        timeout.
        """
        try:
            pipe = client.pipeline()
            pipe.lpush(list_key, json.dumps(entry, ensure_ascii=False))
            pipe.ltrim(list_key, 0, max_len - 1)
            pipe.execute()
        except Exception:
            # A failed Redis write must not affect the business code.
            self._mark_unavailable()

    def emit(self, record: logging.LogRecord) -> None:
        """Store *record* in Redis; silently do nothing on any failure."""
        try:
            client = self._get_redis()
            if client is None:
                return

            entry = self._build_entry(record)
            list_key = (
                os.getenv("REDIS_LOG_LIST_KEY", self.cfg.list_key).strip()
                or self.cfg.list_key
            )
            max_len = self._resolve_max_len()

            if self.cfg.dedupe_consecutive and self._merge_into_head(
                client, list_key, max_len, entry
            ):
                return
            self._push_entry(client, list_key, max_len, entry)
        except Exception:
            return