diff --git a/backend/api/main.py b/backend/api/main.py index d1b104c..6c2c10b 100644 --- a/backend/api/main.py +++ b/backend/api/main.py @@ -90,6 +90,32 @@ def setup_logging(): console_handler.setFormatter(formatter) root_logger.addHandler(console_handler) + # 追加:将 ERROR 日志写入 Redis(不影响现有文件/控制台日志) + try: + from api.redis_log_handler import RedisErrorLogHandler, RedisLogConfig + + redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") + redis_use_tls = os.getenv("REDIS_USE_TLS", "False").lower() == "true" + redis_username = os.getenv("REDIS_USERNAME", None) + redis_password = os.getenv("REDIS_PASSWORD", None) + ssl_cert_reqs = os.getenv("REDIS_SSL_CERT_REQS", "required") + ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None) + + redis_cfg = RedisLogConfig( + redis_url=redis_url, + use_tls=redis_use_tls, + username=redis_username, + password=redis_password, + ssl_cert_reqs=ssl_cert_reqs, + ssl_ca_certs=ssl_ca_certs, + service="backend", + ) + redis_handler = RedisErrorLogHandler(redis_cfg) + redis_handler.setLevel(logging.ERROR) + root_logger.addHandler(redis_handler) + except Exception: + pass + # 设置第三方库的日志级别 logging.getLogger('uvicorn').setLevel(logging.WARNING) logging.getLogger('uvicorn.access').setLevel(logging.WARNING) diff --git a/backend/api/redis_log_handler.py b/backend/api/redis_log_handler.py new file mode 100644 index 0000000..f739675 --- /dev/null +++ b/backend/api/redis_log_handler.py @@ -0,0 +1,186 @@ +""" +FastAPI backend:将 ERROR 日志写入 Redis List(仅保留最近 N 条)。 + +实现与 trading_system/redis_log_handler.py 保持一致(避免跨目录导入带来的 PYTHONPATH 问题)。 +""" + +from __future__ import annotations + +import json +import logging +import os +import socket +import time +import traceback +from dataclasses import dataclass +from datetime import datetime, timezone, timedelta +from typing import Any, Dict, Optional + + +def _beijing_time_str(ts: float) -> str: + beijing_tz = timezone(timedelta(hours=8)) + return datetime.fromtimestamp(ts, tz=beijing_tz).strftime("%Y-%m-%d %H:%M:%S") + + +def _safe_json_loads(s: str) -> Optional[Dict[str, Any]]: + try: + obj = json.loads(s) + if isinstance(obj, dict): + return obj + except Exception: + return None + return None + + +@dataclass(frozen=True) +class RedisLogConfig: + redis_url: str + list_key: str = "ats:logs:error" + max_len: int = 2000 + dedupe_consecutive: bool = True + service: str = "backend" + hostname: str = socket.gethostname() + connect_timeout_sec: float = 1.0 + socket_timeout_sec: float = 1.0 + username: Optional[str] = None + password: Optional[str] = None + use_tls: bool = False + ssl_cert_reqs: str = "required" + ssl_ca_certs: Optional[str] = None + + +class RedisErrorLogHandler(logging.Handler): + def __init__(self, cfg: RedisLogConfig): + super().__init__() + self.cfg = cfg + self._redis = None + self._redis_ok = False + self._last_connect_attempt_ts = 0.0 + + def _connection_kwargs(self) -> Dict[str, Any]: + kwargs: Dict[str, Any] = { + "decode_responses": True, + "socket_connect_timeout": self.cfg.connect_timeout_sec, + "socket_timeout": self.cfg.socket_timeout_sec, + } + if self.cfg.username: + kwargs["username"] = self.cfg.username + if self.cfg.password: + kwargs["password"] = self.cfg.password + + if self.cfg.redis_url.startswith("rediss://") or self.cfg.use_tls: + kwargs["ssl_cert_reqs"] = self.cfg.ssl_cert_reqs + if self.cfg.ssl_ca_certs: + kwargs["ssl_ca_certs"] = self.cfg.ssl_ca_certs + if self.cfg.ssl_cert_reqs == "none": + kwargs["ssl_check_hostname"] = False + elif self.cfg.ssl_cert_reqs == "required": + kwargs["ssl_check_hostname"] = True + else: + kwargs["ssl_check_hostname"] = False + return kwargs + + def _get_redis(self): + now = time.time() + if self._redis_ok and self._redis is not None: + return self._redis + if now - self._last_connect_attempt_ts < 5: + return None + self._last_connect_attempt_ts = now + + try: + import redis # type: ignore + except Exception: + self._redis = None + self._redis_ok = False + return None + + try: + client = redis.from_url(self.cfg.redis_url, **self._connection_kwargs()) + client.ping() + self._redis = client + self._redis_ok = True + return self._redis + except Exception: + self._redis = None + self._redis_ok = False + return None + + def _build_entry(self, record: logging.LogRecord) -> Dict[str, Any]: + msg = record.getMessage() + exc_text = None + exc_type = None + if record.exc_info: + exc_type = getattr(record.exc_info[0], "__name__", None) + exc_text = "".join(traceback.format_exception(*record.exc_info)) + + signature = f"{self.cfg.service}|{record.levelname}|{record.name}|{record.pathname}:{record.lineno}|{msg}|{exc_type or ''}" + + return { + "ts": int(record.created * 1000), + "time": _beijing_time_str(record.created), + "service": self.cfg.service, + "level": record.levelname, + "logger": record.name, + "message": msg, + "pathname": record.pathname, + "lineno": record.lineno, + "funcName": record.funcName, + "process": record.process, + "thread": record.thread, + "hostname": self.cfg.hostname, + "exc_type": exc_type, + "exc_text": exc_text, + "signature": signature, + "count": 1, + } + + def emit(self, record: logging.LogRecord) -> None: + try: + client = self._get_redis() + if client is None: + return + + entry = self._build_entry(record) + list_key = os.getenv("REDIS_LOG_LIST_KEY", self.cfg.list_key).strip() or self.cfg.list_key + max_len = int(os.getenv("REDIS_LOG_LIST_MAX_LEN", str(self.cfg.max_len)) or self.cfg.max_len) + if max_len <= 0: + max_len = self.cfg.max_len + + if self.cfg.dedupe_consecutive: + try: + head_raw = client.lindex(list_key, 0) + except Exception: + head_raw = None + + if isinstance(head_raw, str): + head = _safe_json_loads(head_raw) + else: + head = None + + if head and head.get("signature") == entry["signature"]: + head["count"] = int(head.get("count", 1)) + 1 + head["ts"] = entry["ts"] + head["time"] = entry["time"] + if entry.get("exc_text"): + head["exc_text"] = entry.get("exc_text") + head["exc_type"] = entry.get("exc_type") + try: + pipe = client.pipeline() + pipe.lset(list_key, 0, json.dumps(head, ensure_ascii=False)) + pipe.ltrim(list_key, 0, max_len - 1) + pipe.execute() + return + except Exception: + pass + + try: + pipe = client.pipeline() + pipe.lpush(list_key, json.dumps(entry, ensure_ascii=False)) + pipe.ltrim(list_key, 0, max_len - 1) + pipe.execute() + except Exception: + return + except Exception: + return + diff --git a/backend/api/routes/system.py b/backend/api/routes/system.py index 045dbd2..d579631 100644 --- a/backend/api/routes/system.py +++ b/backend/api/routes/system.py @@ -12,6 +12,123 @@ logger = logging.getLogger(__name__) # 路由统一挂在 /api/system 下,前端直接调用 /api/system/... router = APIRouter(prefix="/api/system") +def _get_redis_client_for_logs(): + """ + 获取 Redis 客户端(优先复用 config_manager 的连接;失败则自行创建)。 + 返回:redis.Redis 或 None + """ + # 1) 复用 config_manager(避免重复连接) + try: + import config_manager # backend/config_manager.py(已负责加载 .env) + + cm = getattr(config_manager, "config_manager", None) + if cm is not None: + redis_client = getattr(cm, "_redis_client", None) + redis_connected = getattr(cm, "_redis_connected", False) + if redis_client is not None and redis_connected: + try: + redis_client.ping() + return redis_client + except Exception: + pass + except Exception: + pass + + # 2) 自行创建 + try: + import redis # type: ignore + + redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") + redis_use_tls = os.getenv("REDIS_USE_TLS", "False").lower() == "true" + redis_username = os.getenv("REDIS_USERNAME", None) + redis_password = os.getenv("REDIS_PASSWORD", None) + ssl_cert_reqs = os.getenv("REDIS_SSL_CERT_REQS", "required") + ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None) + + kwargs: Dict[str, Any] = { + "decode_responses": True, + "username": redis_username, + "password": redis_password, + "socket_connect_timeout": 1, + "socket_timeout": 1, + } + if redis_url.startswith("rediss://") or redis_use_tls: + kwargs["ssl_cert_reqs"] = ssl_cert_reqs + if ssl_ca_certs: + kwargs["ssl_ca_certs"] = ssl_ca_certs + if ssl_cert_reqs == "none": + kwargs["ssl_check_hostname"] = False + elif ssl_cert_reqs == "required": + kwargs["ssl_check_hostname"] = True + else: + kwargs["ssl_check_hostname"] = False + + client = redis.from_url(redis_url, **kwargs) + client.ping() + return client + except Exception: + return None + + +@router.get("/logs") +async def get_logs( + limit: int = 200, + service: Optional[str] = None, + level: Optional[str] = None, + x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token"), +) -> Dict[str, Any]: + """ + 从 Redis List 读取最新日志(默认 ats:logs:error)。 + + 参数: + - limit: 返回条数(最大 2000) + - service: 过滤(backend / trading_system) + - level: 过滤(ERROR / CRITICAL ...) + """ + _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) + + if limit <= 0: + limit = 200 + if limit > 2000: + limit = 2000 + + list_key = os.getenv("REDIS_LOG_LIST_KEY", "ats:logs:error").strip() or "ats:logs:error" + + client = _get_redis_client_for_logs() + if client is None: + raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志") + + try: + raw_items = client.lrange(list_key, 0, limit - 1) + except Exception as e: + raise HTTPException(status_code=500, detail=f"读取 Redis 日志失败: {e}") + + items: list[Dict[str, Any]] = [] + for raw in raw_items or []: + try: + obj = raw + if isinstance(raw, bytes): + obj = raw.decode("utf-8", errors="ignore") + if isinstance(obj, str): + parsed = __import__("json").loads(obj) + else: + continue + if not isinstance(parsed, dict): + continue + if service and str(parsed.get("service")) != service: + continue + if level and str(parsed.get("level")) != level: + continue + items.append(parsed) + except Exception: + continue + + return { + "key": list_key, + "count": len(items), + "items": items, + } + def _require_admin(token: Optional[str], provided: Optional[str]) -> None: """ diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index 3afeb88..cf7249c 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -5,6 +5,7 @@ import ConfigGuide from './components/ConfigGuide' import TradeList from './components/TradeList' import StatsDashboard from './components/StatsDashboard' import Recommendations from './components/Recommendations' +import LogMonitor from './components/LogMonitor' import './App.css' function App() { @@ -19,6 +20,7 @@ function App() { 交易推荐 配置 交易记录 + 日志监控 @@ -30,6 +32,7 @@ function App() { } /> } /> } /> + } /> diff --git a/frontend/src/components/LogMonitor.css b/frontend/src/components/LogMonitor.css new file mode 100644 index 0000000..9307343 --- /dev/null +++ b/frontend/src/components/LogMonitor.css @@ -0,0 +1,219 @@ +.log-monitor { + display: flex; + flex-direction: column; + gap: 16px; +} + +.log-header { + display: flex; + align-items: flex-end; + justify-content: space-between; + gap: 16px; +} + +.log-header h2 { + margin: 0; +} + +.log-subtitle { + margin-top: 6px; + color: #666; + font-size: 12px; +} + +.log-actions .btn { + border: 1px solid #ddd; + padding: 8px 12px; + border-radius: 8px; + background: #fff; + cursor: pointer; +} + +.log-actions .btn:disabled { + opacity: 0.6; + cursor: not-allowed; +} + +.log-controls { + display: flex; + flex-wrap: wrap; + gap: 12px; + align-items: flex-end; + padding: 12px; + border: 1px solid #eee; + border-radius: 10px; + background: #fff; +} + +.control { + display: flex; + flex-direction: column; + gap: 6px; + min-width: 140px; +} + +.control.inline { + min-width: auto; +} + +.control label { + font-size: 12px; + color: #666; +} + +.control input, +.control select { + border: 1px solid #ddd; + border-radius: 8px; + padding: 8px 10px; + outline: none; +} + +.log-error { + padding: 10px 12px; + border-radius: 10px; + border: 1px solid #f3c4c4; + background: #fff5f5; + color: #b42318; +} + +.log-table { + border: 1px solid #eee; + border-radius: 10px; + overflow: hidden; + background: #fff; +} + +.log-row { + display: grid; + grid-template-columns: 170px 140px 110px 1fr; + gap: 12px; + padding: 10px 12px; + border-top: 1px solid #f3f3f3; + align-items: start; +} + +.log-head { + border-top: none; + background: #fafafa; + font-weight: 600; + color: #333; +} + +.log-empty { + padding: 16px 12px; + color: #666; +} + +.c-time, +.c-svc, +.c-level { + font-size: 12px; + color: #333; +} + +.c-msg { + display: flex; + flex-direction: column; + gap: 6px; +} + +.msg-line { + display: flex; + gap: 8px; + align-items: baseline; +} + +.msg-text { + font-size: 13px; + color: #111; + word-break: break-word; +} + +.msg-count { + font-size: 12px; + color: #666; +} + +.msg-meta { + font-size: 12px; + color: #666; +} + +.msg-details summary { + cursor: pointer; + font-size: 12px; + color: #333; +} + +.stack { + margin: 8px 0 0; + padding: 10px; + border-radius: 8px; + background: #0b1020; + color: #dbeafe; + overflow: auto; + font-size: 12px; + line-height: 1.35; +} + +.pill { + display: inline-block; + padding: 3px 8px; + border-radius: 999px; + font-size: 12px; + border: 1px solid #ddd; + background: #fff; +} + +.pill-error { + border-color: #f3c4c4; + background: #fff5f5; + color: #b42318; +} + +.pill-critical { + border-color: #f1aeb5; + background: #ffe4e6; + color: #9f1239; +} + +.pill-warning { + border-color: #f6d48b; + background: #fffbeb; + color: #92400e; +} + +.pill-info { + border-color: #bfdbfe; + background: #eff6ff; + color: #1d4ed8; +} + +.pill-debug { + border-color: #e5e7eb; + background: #f9fafb; + color: #374151; +} + +@media (max-width: 900px) { + .log-row { + grid-template-columns: 1fr; + } + .log-head { + display: none; + } + .c-time::before { + content: "时间:"; + color: #666; + } + .c-svc::before { + content: "服务:"; + color: #666; + } + .c-level::before { + content: "级别:"; + color: #666; + } +} + diff --git a/frontend/src/components/LogMonitor.jsx b/frontend/src/components/LogMonitor.jsx new file mode 100644 index 0000000..8d0a3c8 --- /dev/null +++ b/frontend/src/components/LogMonitor.jsx @@ -0,0 +1,170 @@ +import React, { useEffect, useMemo, useState } from 'react' +import { api } from '../services/api' +import './LogMonitor.css' + +const LEVELS = ['', 'ERROR', 'CRITICAL', 'WARNING', 'INFO', 'DEBUG'] +const SERVICES = ['', 'backend', 'trading_system'] + +function formatCount(item) { + const c = Number(item?.count || 1) + return c > 1 ? `×${c}` : '' +} + +export default function LogMonitor() { + const [items, setItems] = useState([]) + const [loading, setLoading] = useState(false) + const [error, setError] = useState('') + + const [limit, setLimit] = useState(200) + const [service, setService] = useState('') + const [level, setLevel] = useState('') + const [autoRefresh, setAutoRefresh] = useState(true) + const [refreshSec, setRefreshSec] = useState(5) + + const params = useMemo(() => { + const p = { limit: String(limit) } + if (service) p.service = service + if (level) p.level = level + return p + }, [limit, service, level]) + + const load = async () => { + setLoading(true) + setError('') + try { + const res = await api.getSystemLogs(params) + setItems(res?.items || []) + } catch (e) { + setError(e?.message || '获取日志失败') + } finally { + setLoading(false) + } + } + + useEffect(() => { + load() + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [params]) + + useEffect(() => { + if (!autoRefresh) return + const sec = Number(refreshSec) + if (!sec || sec <= 0) return + const t = setInterval(() => load(), sec * 1000) + return () => clearInterval(t) + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [autoRefresh, refreshSec, params]) + + return ( +
+
+
+

日志监控

+
来源:Redis List(只保留最近 N 条,连续同类会合并计数)
+
+
+ +
+
+ +
+
+ + setLimit(Number(e.target.value || 200))} + /> +
+ +
+ + +
+ +
+ + +
+ +
+ +
+ +
+ + setRefreshSec(Number(e.target.value || 5))} + disabled={!autoRefresh} + /> +
+
+ + {error ?
{error}
: null} + +
+
+
时间
+
服务
+
级别
+
内容
+
+ + {items.length === 0 ? ( +
{loading ? '加载中...' : '暂无日志'}
+ ) : ( + items.map((it, idx) => ( +
+
{it.time || ''}
+
{it.service || ''}
+
+ {it.level} +
+
+
+ {it.message || ''} + {formatCount(it)} +
+ {it.logger ?
{it.logger}
: null} + {it.exc_text ? ( +
+ 堆栈 +
{it.exc_text}
+
+ ) : null} +
+
+ )) + )} +
+
+ ) +} + diff --git a/frontend/src/services/api.js b/frontend/src/services/api.js index 17c63fa..0ec2902 100644 --- a/frontend/src/services/api.js +++ b/frontend/src/services/api.js @@ -297,4 +297,16 @@ export const api = { } return response.json(); }, + + // 日志监控(Redis List) + getSystemLogs: async (params = {}) => { + const query = new URLSearchParams(params).toString(); + const url = query ? `${buildUrl('/api/system/logs')}?${query}` : buildUrl('/api/system/logs'); + const response = await fetch(url); + if (!response.ok) { + const error = await response.json().catch(() => ({ detail: '获取日志失败' })); + throw new Error(error.detail || '获取日志失败'); + } + return response.json(); + }, }; diff --git a/trading_system/main.py b/trading_system/main.py index e351393..fd384e7 100644 --- a/trading_system/main.py +++ b/trading_system/main.py @@ -63,6 +63,29 @@ logging.basicConfig( handlers=[file_handler, console_handler] ) +# 追加:将 ERROR 日志写入 Redis(不影响现有文件/控制台日志) +try: + if __name__ == '__main__': + from redis_log_handler import RedisErrorLogHandler, RedisLogConfig + else: + from .redis_log_handler import RedisErrorLogHandler, RedisLogConfig + + redis_cfg = RedisLogConfig( + redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"), + use_tls=bool(getattr(config, "REDIS_USE_TLS", False)), + ssl_cert_reqs=str(getattr(config, "REDIS_SSL_CERT_REQS", "required") or "required"), + ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None), + username=getattr(config, "REDIS_USERNAME", None), + password=getattr(config, "REDIS_PASSWORD", None), + service="trading_system", + ) + redis_handler = RedisErrorLogHandler(redis_cfg) + redis_handler.setLevel(logging.ERROR) + logging.getLogger().addHandler(redis_handler) +except Exception: + # Redis handler 仅用于增强监控,失败不影响交易系统启动 + pass + logger = logging.getLogger(__name__) diff --git a/trading_system/redis_log_handler.py b/trading_system/redis_log_handler.py new file mode 100644 index 0000000..2df309f --- /dev/null +++ b/trading_system/redis_log_handler.py @@ -0,0 +1,204 @@ +""" +将 Python logging 的 ERROR 日志写入 Redis List(仅保留最近 N 条)。 + +设计目标: +- 不影响现有日志(文件/控制台):Redis 不可用时静默降级 +- 只写入 ERROR/CRITICAL(由 handler level 控制) +- 支持“同类内容去重”:仅对“连续相同 signature”的日志做合并计数(count++ + 更新时间) + +Redis 数据结构: +- List key(默认 ats:logs:error),list 头部是最新日志 +- 每条日志是 JSON 字符串 +""" + +from __future__ import annotations + +import json +import logging +import os +import socket +import time +import traceback +from dataclasses import dataclass +from datetime import datetime, timezone, timedelta +from typing import Any, Dict, Optional + + +def _beijing_time_str(ts: float) -> str: + beijing_tz = timezone(timedelta(hours=8)) + return datetime.fromtimestamp(ts, tz=beijing_tz).strftime("%Y-%m-%d %H:%M:%S") + + +def _safe_json_loads(s: str) -> Optional[Dict[str, Any]]: + try: + obj = json.loads(s) + if isinstance(obj, dict): + return obj + except Exception: + return None + return None + + +@dataclass(frozen=True) +class RedisLogConfig: + redis_url: str + list_key: str = "ats:logs:error" + max_len: int = 2000 + dedupe_consecutive: bool = True + service: str = "trading_system" + hostname: str = socket.gethostname() + connect_timeout_sec: float = 1.0 + socket_timeout_sec: float = 1.0 + username: Optional[str] = None + password: Optional[str] = None + use_tls: bool = False + ssl_cert_reqs: str = "required" + ssl_ca_certs: Optional[str] = None + + +class RedisErrorLogHandler(logging.Handler): + """ + 将日志写入 Redis List。 + + 注意:logging handler 是同步的;这里使用 redis-py 的同步客户端。 + """ + + def __init__(self, cfg: RedisLogConfig): + super().__init__() + self.cfg = cfg + self._redis = None + self._redis_ok = False + self._last_connect_attempt_ts = 0.0 + + def _connection_kwargs(self) -> Dict[str, Any]: + kwargs: Dict[str, Any] = { + "decode_responses": True, + "socket_connect_timeout": self.cfg.connect_timeout_sec, + "socket_timeout": self.cfg.socket_timeout_sec, + } + if self.cfg.username: + kwargs["username"] = self.cfg.username + if self.cfg.password: + kwargs["password"] = self.cfg.password + + # TLS(rediss:// 或显式开启) + if self.cfg.redis_url.startswith("rediss://") or self.cfg.use_tls: + kwargs["ssl_cert_reqs"] = self.cfg.ssl_cert_reqs + if self.cfg.ssl_ca_certs: + kwargs["ssl_ca_certs"] = self.cfg.ssl_ca_certs + if self.cfg.ssl_cert_reqs == "none": + kwargs["ssl_check_hostname"] = False + elif self.cfg.ssl_cert_reqs == "required": + kwargs["ssl_check_hostname"] = True + else: + kwargs["ssl_check_hostname"] = False + return kwargs + + def _get_redis(self): + # 失败后做个简单退避,避免每条 ERROR 都去连 Redis + now = time.time() + if self._redis_ok and self._redis is not None: + return self._redis + if now - self._last_connect_attempt_ts < 5: + return None + self._last_connect_attempt_ts = now + + try: + import redis # type: ignore + except Exception: + self._redis = None + self._redis_ok = False + return None + + try: + client = redis.from_url(self.cfg.redis_url, **self._connection_kwargs()) + client.ping() + self._redis = client + self._redis_ok = True + return self._redis + except Exception: + self._redis = None + self._redis_ok = False + return None + + def _build_entry(self, record: logging.LogRecord) -> Dict[str, Any]: + msg = record.getMessage() + exc_text = None + exc_type = None + if record.exc_info: + exc_type = getattr(record.exc_info[0], "__name__", None) + exc_text = "".join(traceback.format_exception(*record.exc_info)) + + signature = f"{self.cfg.service}|{record.levelname}|{record.name}|{record.pathname}:{record.lineno}|{msg}|{exc_type or ''}" + + return { + "ts": int(record.created * 1000), + "time": _beijing_time_str(record.created), + "service": self.cfg.service, + "level": record.levelname, + "logger": record.name, + "message": msg, + "pathname": record.pathname, + "lineno": record.lineno, + "funcName": record.funcName, + "process": record.process, + "thread": record.thread, + "hostname": self.cfg.hostname, + "exc_type": exc_type, + "exc_text": exc_text, + "signature": signature, + "count": 1, + } + + def emit(self, record: logging.LogRecord) -> None: + try: + client = self._get_redis() + if client is None: + return + + entry = self._build_entry(record) + list_key = os.getenv("REDIS_LOG_LIST_KEY", self.cfg.list_key).strip() or self.cfg.list_key + max_len = int(os.getenv("REDIS_LOG_LIST_MAX_LEN", str(self.cfg.max_len)) or self.cfg.max_len) + if max_len <= 0: + max_len = self.cfg.max_len + + if self.cfg.dedupe_consecutive: + try: + head_raw = client.lindex(list_key, 0) + except Exception: + head_raw = None + + if isinstance(head_raw, str): + head = _safe_json_loads(head_raw) + else: + head = None + + if head and head.get("signature") == entry["signature"]: + head["count"] = int(head.get("count", 1)) + 1 + head["ts"] = entry["ts"] + head["time"] = entry["time"] + # 保留最新堆栈(有时第一次没有 exc_text) + if entry.get("exc_text"): + head["exc_text"] = entry.get("exc_text") + head["exc_type"] = entry.get("exc_type") + try: + pipe = client.pipeline() + pipe.lset(list_key, 0, json.dumps(head, ensure_ascii=False)) + pipe.ltrim(list_key, 0, max_len - 1) + pipe.execute() + return + except Exception: + # 失败则尝试正常 push + pass + + try: + pipe = client.pipeline() + pipe.lpush(list_key, json.dumps(entry, ensure_ascii=False)) + pipe.ltrim(list_key, 0, max_len - 1) + pipe.execute() + except Exception: + # Redis 写失败不应影响业务 + return + except Exception: + return +