From 50026fb048e1ee395f1a5383a049787605584969 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?=
Date: Sun, 18 Jan 2026 20:49:47 +0800
Subject: [PATCH] a

---
 backend/api/main.py                    |   3 +-
 backend/api/redis_log_handler.py       | 151 +++++++++++++++--
 backend/api/routes/system.py           | 216 ++++++++++++++++++++++++-
 frontend/src/components/LogMonitor.css |  57 +++++++
 frontend/src/components/LogMonitor.jsx | 121 +++++++++++++-
 frontend/src/services/api.js           |  22 +++
 trading_system/main.py                 |   3 +-
 trading_system/redis_log_handler.py    | 170 +++++++++++++++++--
 8 files changed, 712 insertions(+), 31 deletions(-)

diff --git a/backend/api/main.py b/backend/api/main.py
index 6c2c10b..21b6b05 100644
--- a/backend/api/main.py
+++ b/backend/api/main.py
@@ -111,7 +111,8 @@ def setup_logging():
             service="backend",
         )
         redis_handler = RedisErrorLogHandler(redis_cfg)
-        redis_handler.setLevel(logging.ERROR)
+        # Let the handler do its own per-group filtering (error/warning/info); here we only relax the level to INFO
+        redis_handler.setLevel(logging.INFO)
         root_logger.addHandler(redis_handler)
     except Exception:
         pass
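With the handler now attached at INFO, the actual routing decision lives in `_group_for_record`, added below (identically in both services). As a quick reference, a standalone sketch of that mapping, assuming the default `include_debug_in_info=False`:

    import logging
    from typing import Optional

    def group_for_level(levelno: int, include_debug_in_info: bool = False) -> Optional[str]:
        # Mirrors _group_for_record below: ERROR/CRITICAL -> "error",
        # WARNING -> "warning", INFO -> "info"; DEBUG is dropped unless opted in.
        if levelno >= logging.ERROR:
            return "error"
        if levelno >= logging.WARNING:
            return "warning"
        if levelno == logging.INFO:
            return "info"
        if levelno == logging.DEBUG and include_debug_in_info:
            return "info"
        return None

    assert group_for_level(logging.CRITICAL) == "error"
    assert group_for_level(logging.WARNING) == "warning"
    assert group_for_level(logging.DEBUG) is None  # not persisted by default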
diff --git a/backend/api/redis_log_handler.py b/backend/api/redis_log_handler.py
index f739675..e1c3c8a 100644
--- a/backend/api/redis_log_handler.py
+++ b/backend/api/redis_log_handler.py
@@ -1,5 +1,5 @@
 """
-FastAPI backend: write ERROR logs to a Redis List (keeping only the most recent N entries).
+FastAPI backend: write logs to Redis Lists (grouped by error / warning / info, keeping only the most recent N entries per group).
 
 The implementation mirrors trading_system/redis_log_handler.py (avoiding the PYTHONPATH issues that cross-directory imports bring).
 """
@@ -14,13 +14,18 @@ import time
 import traceback
 from dataclasses import dataclass
 from datetime import datetime, timezone, timedelta
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Literal
 
 
 def _beijing_time_str(ts: float) -> str:
     beijing_tz = timezone(timedelta(hours=8))
     return datetime.fromtimestamp(ts, tz=beijing_tz).strftime("%Y-%m-%d %H:%M:%S")
 
 
+def _beijing_yyyymmdd(ts: Optional[float] = None) -> str:
+    beijing_tz = timezone(timedelta(hours=8))
+    dt = datetime.fromtimestamp(ts or time.time(), tz=beijing_tz)
+    return dt.strftime("%Y%m%d")
+
 
 def _safe_json_loads(s: str) -> Optional[Dict[str, Any]]:
     try:
@@ -32,12 +37,45 @@ def _safe_json_loads(s: str) -> Optional[Dict[str, Any]]:
         return None
 
 
+LogGroup = Literal["error", "warning", "info"]
+
+
+def _parse_bool(v: Any, default: bool) -> bool:
+    if v is None:
+        return default
+    if isinstance(v, bool):
+        return v
+    s = str(v).strip().lower()
+    if s in ("1", "true", "yes", "y", "on"):
+        return True
+    if s in ("0", "false", "no", "n", "off"):
+        return False
+    return default
+
+
+def _parse_int(v: Any, default: int) -> int:
+    try:
+        n = int(str(v).strip())
+        return n
+    except Exception:
+        return default
+
+
 @dataclass(frozen=True)
 class RedisLogConfig:
     redis_url: str
-    list_key: str = "ats:logs:error"
-    max_len: int = 2000
+    list_key_prefix: str = "ats:logs"
+    config_key: str = "ats:logs:config"
+    stats_key_prefix: str = "ats:logs:stats:added"
+    max_len_error: int = 2000
+    max_len_warning: int = 2000
+    max_len_info: int = 2000
     dedupe_consecutive: bool = True
+    enable_error: bool = True
+    enable_warning: bool = True
+    enable_info: bool = True
+    include_debug_in_info: bool = False
+    config_refresh_sec: float = 5.0
     service: str = "backend"
     hostname: str = socket.gethostname()
     connect_timeout_sec: float = 1.0
@@ -56,6 +94,8 @@ class RedisErrorLogHandler(logging.Handler):
         self._redis = None
         self._redis_ok = False
         self._last_connect_attempt_ts = 0.0
+        self._last_cfg_refresh_ts = 0.0
+        self._remote_cfg: Dict[str, Any] = {}
 
     def _connection_kwargs(self) -> Dict[str, Any]:
         kwargs: Dict[str, Any] = {
@@ -135,19 +175,106 @@ class RedisErrorLogHandler(logging.Handler):
             "count": 1,
         }
 
+    def _effective_cfg_bool(self, key: str, default: bool) -> bool:
+        if key in self._remote_cfg:
+            return _parse_bool(self._remote_cfg.get(key), default)
+        return default
+
+    def _refresh_remote_config_if_needed(self, client) -> None:
+        now = time.time()
+        if now - self._last_cfg_refresh_ts < self.cfg.config_refresh_sec:
+            return
+        self._last_cfg_refresh_ts = now
+        try:
+            cfg_key = os.getenv("REDIS_LOG_CONFIG_KEY", self.cfg.config_key).strip() or self.cfg.config_key
+            data = client.hgetall(cfg_key) or {}
+            normalized: Dict[str, Any] = {}
+            for k, v in data.items():
+                if not k:
+                    continue
+                normalized[str(k).strip()] = v
+            self._remote_cfg = normalized
+        except Exception:
+            return
+
+    def _group_for_record(self, record: logging.LogRecord) -> Optional[LogGroup]:
+        if record.levelno >= logging.ERROR:
+            return "error"
+        if record.levelno >= logging.WARNING:
+            return "warning"
+        if record.levelno == logging.INFO:
+            return "info"
+        if record.levelno == logging.DEBUG and self._effective_cfg_bool("include_debug_in_info", self.cfg.include_debug_in_info):
+            return "info"
+        return None
+
+    def _list_key_for_group(self, group: LogGroup) -> str:
+        if group == "error":
+            legacy = os.getenv("REDIS_LOG_LIST_KEY", "").strip()
+            if legacy:
+                return legacy
+        env_key = os.getenv(f"REDIS_LOG_LIST_KEY_{group.upper()}", "").strip()
+        if env_key:
+            return env_key
+        prefix = os.getenv("REDIS_LOG_LIST_PREFIX", self.cfg.list_key_prefix).strip() or self.cfg.list_key_prefix
+        return f"{prefix}:{group}"
+
+    def _max_len_for_group(self, group: LogGroup) -> int:
+        env_specific = os.getenv(f"REDIS_LOG_LIST_MAX_LEN_{group.upper()}", "").strip()
+        if env_specific:
+            n = _parse_int(env_specific, 0)
+            return n if n > 0 else (self.cfg.max_len_error if group == "error" else self.cfg.max_len_warning if group == "warning" else self.cfg.max_len_info)
+
+        env_global = os.getenv("REDIS_LOG_LIST_MAX_LEN", "").strip()
+        if env_global:
+            n = _parse_int(env_global, 0)
+            if n > 0:
+                return n
+
+        field = f"max_len:{group}"
+        if field in self._remote_cfg:
+            n = _parse_int(self._remote_cfg.get(field), 0)
+            if n > 0:
+                return n
+
+        return self.cfg.max_len_error if group == "error" else self.cfg.max_len_warning if group == "warning" else self.cfg.max_len_info
+
+    def _enabled_for_group(self, group: LogGroup) -> bool:
+        field = f"enabled:{group}"
+        if field in self._remote_cfg:
+            return _parse_bool(self._remote_cfg.get(field), True)
+        return self.cfg.enable_error if group == "error" else self.cfg.enable_warning if group == "warning" else self.cfg.enable_info
+
+    def _dedupe_consecutive_enabled(self) -> bool:
+        if "dedupe_consecutive" in self._remote_cfg:
+            return _parse_bool(self._remote_cfg.get("dedupe_consecutive"), self.cfg.dedupe_consecutive)
+        return self.cfg.dedupe_consecutive
+
+    def _stats_key(self, group: LogGroup) -> str:
+        prefix = os.getenv("REDIS_LOG_STATS_PREFIX", self.cfg.stats_key_prefix).strip() or self.cfg.stats_key_prefix
+        day = _beijing_yyyymmdd()
+        return f"{prefix}:{day}:{group}"
+
     def emit(self, record: logging.LogRecord) -> None:
         try:
             client = self._get_redis()
             if client is None:
                 return
-            entry = self._build_entry(record)
-            list_key = os.getenv("REDIS_LOG_LIST_KEY", self.cfg.list_key).strip() or self.cfg.list_key
-            max_len = int(os.getenv("REDIS_LOG_LIST_MAX_LEN", str(self.cfg.max_len)) or self.cfg.max_len)
-            if max_len <= 0:
-                max_len = self.cfg.max_len
+            self._refresh_remote_config_if_needed(client)
 
-            if self.cfg.dedupe_consecutive:
+            group = self._group_for_record(record)
+            if group is None:
+                return
+            if not self._enabled_for_group(group):
+                return
+
+            entry = self._build_entry(record)
+            list_key = self._list_key_for_group(group)
+            max_len = self._max_len_for_group(group)
+            stats_key = self._stats_key(group)
+
+            if self._dedupe_consecutive_enabled():
                 try:
                     head_raw = client.lindex(list_key, 0)
                 except Exception:
@@ -169,6 +296,8 @@ class RedisErrorLogHandler(logging.Handler):
                         pipe = client.pipeline()
                         pipe.lset(list_key, 0, json.dumps(head, ensure_ascii=False))
                         pipe.ltrim(list_key, 0, max_len - 1)
+                        pipe.incr(stats_key, 1)
+                        pipe.expire(stats_key, 14 * 24 * 3600)
                         pipe.execute()
                         return
                     except Exception:
@@ -178,6 +307,8 @@ class RedisErrorLogHandler(logging.Handler):
             pipe = client.pipeline()
             pipe.lpush(list_key, json.dumps(entry, ensure_ascii=False))
             pipe.ltrim(list_key, 0, max_len - 1)
+            pipe.incr(stats_key, 1)
+            pipe.expire(stats_key, 14 * 24 * 3600)
             pipe.execute()
         except Exception:
             return
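The handler polls a Redis hash for runtime overrides; field names follow the `max_len:<group>` / `enabled:<group>` convention read above. A hedged sketch of flipping settings live with redis-py; the URL is a placeholder, and the key assumes no `REDIS_LOG_CONFIG_KEY` override:

    import redis

    r = redis.Redis.from_url("redis://localhost:6379/0", decode_responses=True)
    r.hset("ats:logs:config", mapping={
        "enabled:info": "0",        # stop persisting INFO records
        "max_len:error": "5000",    # allow more entries in the error list
        "dedupe_consecutive": "1",  # merge consecutive identical entries
    })
    # Each handler re-reads this hash at most once per config_refresh_sec
    # (default 5 s), so changes apply without restarting either service;
    # existing lists are only trimmed on the next write (or via PUT /logs/config).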
diff --git a/backend/api/routes/system.py b/backend/api/routes/system.py
index d579631..2f22849 100644
--- a/backend/api/routes/system.py
+++ b/backend/api/routes/system.py
@@ -1,10 +1,12 @@
 import os
 import re
 import subprocess
+import json
 from pathlib import Path
 from typing import Any, Dict, Optional, Tuple
 
 from fastapi import APIRouter, HTTPException, Header
+from pydantic import BaseModel
 
 import logging
 logger = logging.getLogger(__name__)
@@ -12,6 +14,117 @@ logger = logging.getLogger(__name__)
 # All routes are mounted under /api/system; the frontend calls /api/system/... directly
 router = APIRouter(prefix="/api/system")
 
+LOG_GROUPS = ("error", "warning", "info")
+
+
+def _logs_prefix() -> str:
+    return (os.getenv("REDIS_LOG_LIST_PREFIX", "ats:logs").strip() or "ats:logs")
+
+
+def _logs_key_for_group(group: str) -> str:
+    group = (group or "error").strip().lower()
+    # Backward compat with the old setup: REDIS_LOG_LIST_KEY applies to the error group only
+    if group == "error":
+        legacy = os.getenv("REDIS_LOG_LIST_KEY", "").strip()
+        if legacy:
+            return legacy
+
+    env_key = os.getenv(f"REDIS_LOG_LIST_KEY_{group.upper()}", "").strip()
+    if env_key:
+        return env_key
+
+    return f"{_logs_prefix()}:{group}"
+
+
+def _logs_config_key() -> str:
+    return (os.getenv("REDIS_LOG_CONFIG_KEY", "ats:logs:config").strip() or "ats:logs:config")
+
+
+def _logs_stats_prefix() -> str:
+    return (os.getenv("REDIS_LOG_STATS_PREFIX", "ats:logs:stats:added").strip() or "ats:logs:stats:added")
+
+
+def _beijing_yyyymmdd() -> str:
+    from datetime import datetime, timezone, timedelta
+
+    beijing_tz = timezone(timedelta(hours=8))
+    return datetime.now(tz=beijing_tz).strftime("%Y%m%d")
+
+
+def _default_logs_config() -> Dict[str, Any]:
+    return {
+        "max_len": {"error": 2000, "warning": 2000, "info": 2000},
+        "enabled": {"error": True, "warning": True, "info": True},
+        "dedupe_consecutive": True,
+        "include_debug_in_info": False,
+        "keys": {g: _logs_key_for_group(g) for g in LOG_GROUPS},
+        "config_key": _logs_config_key(),
+        "stats_prefix": _logs_stats_prefix(),
+    }
+
+
+def _merge_logs_config(defaults: Dict[str, Any], redis_hash: Dict[str, str]) -> Dict[str, Any]:
+    cfg = defaults
+
+    for g in LOG_GROUPS:
+        v = redis_hash.get(f"max_len:{g}")
+        if v is not None:
+            try:
+                n = int(str(v).strip())
+                if n > 0:
+                    cfg["max_len"][g] = n
+            except Exception:
+                pass
+
+        ev = redis_hash.get(f"enabled:{g}")
+        if ev is not None:
+            s = str(ev).strip().lower()
+            cfg["enabled"][g] = s in ("1", "true", "yes", "y", "on")
+
+    for k in ("dedupe_consecutive", "include_debug_in_info"):
+        vv = redis_hash.get(k)
+        if vv is not None:
+            s = str(vv).strip().lower()
+            cfg[k] = s in ("1", "true", "yes", "y", "on")
+
+    return cfg
+
+
+def _read_logs_config(client) -> Dict[str, Any]:
+    defaults = _default_logs_config()
+    try:
+        raw = client.hgetall(_logs_config_key()) or {}
+        return _merge_logs_config(defaults, raw)
+    except Exception:
+        return defaults
+
+
+def _write_logs_config_and_trim(client, cfg: Dict[str, Any]) -> Dict[str, Any]:
+    mapping: Dict[str, str] = {}
+    for g in LOG_GROUPS:
+        mapping[f"max_len:{g}"] = str(int(cfg["max_len"][g]))
+        mapping[f"enabled:{g}"] = "1" if cfg["enabled"][g] else "0"
+    mapping["dedupe_consecutive"] = "1" if cfg.get("dedupe_consecutive") else "0"
+    mapping["include_debug_in_info"] = "1" if cfg.get("include_debug_in_info") else "0"
+
+    pipe = client.pipeline()
+    pipe.hset(_logs_config_key(), mapping=mapping)
+    for g in LOG_GROUPS:
+        key = _logs_key_for_group(g)
+        max_len = int(cfg["max_len"][g])
+        if max_len > 0:
+            pipe.ltrim(key, 0, max_len - 1)
+    pipe.execute()
+    return cfg
+
+
+class LogsConfigUpdate(BaseModel):
+    max_len: Optional[Dict[str, int]] = None
+    enabled: Optional[Dict[str, bool]] = None
+    dedupe_consecutive: Optional[bool] = None
+    include_debug_in_info: Optional[bool] = None
+
 
 def _get_redis_client_for_logs():
     """
     Get a Redis client (prefer reusing config_manager's connection; fall back to creating one).
@@ -73,15 +186,17 @@ def _get_redis_client_for_logs():
 @router.get("/logs")
 async def get_logs(
     limit: int = 200,
+    group: str = "error",
     service: Optional[str] = None,
     level: Optional[str] = None,
     x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token"),
 ) -> Dict[str, Any]:
     """
-    Read the latest logs from the Redis List (default ats:logs:error).
+    Read the latest logs from a Redis List (default group=error -> ats:logs:error).
 
     Parameters:
     - limit: number of entries to return (max 2000)
+    - group: log group (error / warning / info)
     - service: filter by service (backend / trading_system)
     - level: filter by level (ERROR / CRITICAL ...)
""" @@ -92,7 +207,11 @@ async def get_logs( if limit > 2000: limit = 2000 - list_key = os.getenv("REDIS_LOG_LIST_KEY", "ats:logs:error").strip() or "ats:logs:error" + group = (group or "error").strip().lower() + if group not in LOG_GROUPS: + raise HTTPException(status_code=400, detail=f"非法 group:{group}(可选:{', '.join(LOG_GROUPS)})") + + list_key = _logs_key_for_group(group) client = _get_redis_client_for_logs() if client is None: @@ -110,7 +229,7 @@ async def get_logs( if isinstance(raw, bytes): obj = raw.decode("utf-8", errors="ignore") if isinstance(obj, str): - parsed = __import__("json").loads(obj) + parsed = json.loads(obj) else: continue if not isinstance(parsed, dict): @@ -124,12 +243,103 @@ async def get_logs( continue return { + "group": group, "key": list_key, "count": len(items), "items": items, } +@router.get("/logs/overview") +async def logs_overview(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]: + _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) + + client = _get_redis_client_for_logs() + if client is None: + raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志概览") + + cfg = _read_logs_config(client) + + day = _beijing_yyyymmdd() + stats_prefix = _logs_stats_prefix() + + pipe = client.pipeline() + for g in LOG_GROUPS: + pipe.llen(_logs_key_for_group(g)) + for g in LOG_GROUPS: + pipe.get(f"{stats_prefix}:{day}:{g}") + res = pipe.execute() + + llen_vals = res[: len(LOG_GROUPS)] + added_vals = res[len(LOG_GROUPS) :] + + llen: Dict[str, int] = {} + added_today: Dict[str, int] = {} + for i, g in enumerate(LOG_GROUPS): + try: + llen[g] = int(llen_vals[i] or 0) + except Exception: + llen[g] = 0 + try: + added_today[g] = int(added_vals[i] or 0) + except Exception: + added_today[g] = 0 + + return { + "config": cfg, + "stats": { + "day": day, + "llen": llen, + "added_today": added_today, + }, + } + + +@router.put("/logs/config") +async def update_logs_config( + payload: LogsConfigUpdate, + x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token"), +) -> Dict[str, Any]: + _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) + + client = _get_redis_client_for_logs() + if client is None: + raise HTTPException(status_code=503, detail="Redis 不可用,无法更新日志配置") + + cfg = _read_logs_config(client) + + if payload.max_len: + for g, v in payload.max_len.items(): + gg = (g or "").strip().lower() + if gg not in LOG_GROUPS: + continue + try: + n = int(v) + if n < 100: + n = 100 + if n > 20000: + n = 20000 + cfg["max_len"][gg] = n + except Exception: + continue + + if payload.enabled: + for g, v in payload.enabled.items(): + gg = (g or "").strip().lower() + if gg not in LOG_GROUPS: + continue + cfg["enabled"][gg] = bool(v) + + if payload.dedupe_consecutive is not None: + cfg["dedupe_consecutive"] = bool(payload.dedupe_consecutive) + + if payload.include_debug_in_info is not None: + cfg["include_debug_in_info"] = bool(payload.include_debug_in_info) + + cfg = _write_logs_config_and_trim(client, cfg) + return {"message": "ok", "config": cfg} + + def _require_admin(token: Optional[str], provided: Optional[str]) -> None: """ 可选的简单保护:如果环境变量配置了 SYSTEM_CONTROL_TOKEN,则要求请求携带 X-Admin-Token。 diff --git a/frontend/src/components/LogMonitor.css b/frontend/src/components/LogMonitor.css index 9307343..ad2ae57 100644 --- a/frontend/src/components/LogMonitor.css +++ b/frontend/src/components/LogMonitor.css @@ -45,6 +45,63 @@ background: #fff; } +.log-overview { + padding: 12px; + border: 1px 
diff --git a/frontend/src/components/LogMonitor.css b/frontend/src/components/LogMonitor.css
index 9307343..ad2ae57 100644
--- a/frontend/src/components/LogMonitor.css
+++ b/frontend/src/components/LogMonitor.css
@@ -45,6 +45,63 @@
   background: #fff;
 }
 
+.log-overview {
+  padding: 12px;
+  border: 1px solid #eee;
+  border-radius: 10px;
+  background: #fff;
+  display: flex;
+  flex-direction: column;
+  gap: 10px;
+}
+
+.overview-row {
+  display: flex;
+  align-items: center;
+  justify-content: space-between;
+  gap: 12px;
+  flex-wrap: wrap;
+}
+
+.overview-title {
+  font-weight: 600;
+  color: #333;
+}
+
+.overview-items {
+  display: flex;
+  gap: 12px;
+  flex-wrap: wrap;
+  color: #444;
+  font-size: 12px;
+}
+
+.overview-config {
+  display: flex;
+  gap: 10px;
+  flex-wrap: wrap;
+  align-items: flex-end;
+}
+
+.overview-config .mini {
+  display: flex;
+  flex-direction: column;
+  gap: 6px;
+  min-width: 120px;
+}
+
+.overview-config .mini label {
+  font-size: 12px;
+  color: #666;
+}
+
+.overview-config .mini input {
+  border: 1px solid #ddd;
+  border-radius: 8px;
+  padding: 8px 10px;
+  outline: none;
+}
+
 .control {
   display: flex;
   flex-direction: column;
diff --git a/frontend/src/components/LogMonitor.jsx b/frontend/src/components/LogMonitor.jsx
index 8d0a3c8..f9779d9 100644
--- a/frontend/src/components/LogMonitor.jsx
+++ b/frontend/src/components/LogMonitor.jsx
@@ -2,7 +2,13 @@ import React, { useEffect, useMemo, useState } from 'react'
 import { api } from '../services/api'
 import './LogMonitor.css'
 
-const LEVELS = ['', 'ERROR', 'CRITICAL', 'WARNING', 'INFO', 'DEBUG']
+const GROUPS = [
+  { key: 'error', label: 'Errors' },
+  { key: 'warning', label: 'Warnings' },
+  { key: 'info', label: 'Info' },
+]
+
+const LEVELS = ['', 'ERROR', 'CRITICAL', 'WARNING', 'INFO']
 const SERVICES = ['', 'backend', 'trading_system']
 
 function formatCount(item) {
@@ -15,6 +21,10 @@ export default function LogMonitor() {
   const [loading, setLoading] = useState(false)
   const [error, setError] = useState('')
 
+  const [group, setGroup] = useState('error')
+  const [overview, setOverview] = useState(null)
+  const [saving, setSaving] = useState(false)
+
   const [limit, setLimit] = useState(200)
   const [service, setService] = useState('')
   const [level, setLevel] = useState('')
@@ -22,11 +32,20 @@ export default function LogMonitor() {
   const [refreshSec, setRefreshSec] = useState(5)
 
   const params = useMemo(() => {
-    const p = { limit: String(limit) }
+    const p = { limit: String(limit), group }
     if (service) p.service = service
     if (level) p.level = level
     return p
-  }, [limit, service, level])
+  }, [limit, service, level, group])
+
+  const loadOverview = async () => {
+    try {
+      const res = await api.getLogsOverview()
+      setOverview(res)
+    } catch (e) {
+      // An overview failure should not block the log list
+    }
+  }
 
   const load = async () => {
     setLoading(true)
@@ -34,6 +53,7 @@ export default function LogMonitor() {
     try {
       const res = await api.getSystemLogs(params)
       setItems(res?.items || [])
+      await loadOverview()
     } catch (e) {
       setError(e?.message || 'Failed to fetch logs')
     } finally {
@@ -55,12 +75,44 @@ export default function LogMonitor() {
     // eslint-disable-next-line react-hooks/exhaustive-deps
   }, [autoRefresh, refreshSec, params])
 
+  const maxLen = overview?.config?.max_len || {}
+  const enabled = overview?.config?.enabled || {}
+  const llen = overview?.stats?.llen || {}
+  const addedToday = overview?.stats?.added_today || {}
+  const day = overview?.stats?.day || ''
+
+  const [maxLenDraft, setMaxLenDraft] = useState({ error: 2000, warning: 2000, info: 2000 })
+  useEffect(() => {
+    if (maxLen?.error || maxLen?.warning || maxLen?.info) {
+      setMaxLenDraft({
+        error: Number(maxLen.error || 2000),
+        warning: Number(maxLen.warning || 2000),
+        info: Number(maxLen.info || 2000),
+      })
+    }
+  }, [maxLen?.error, maxLen?.warning, maxLen?.info])
+
+  const saveConfig = async () => {
+    setSaving(true)
+    setError('')
+    try {
+      await api.updateLogsConfig({ max_len: maxLenDraft })
+      await loadOverview()
+    } catch (e) {
+      setError(e?.message || 'Failed to update the log config')
+    } finally {
+      setSaving(false)
+    }
+  }
+
   return (
     <div className="log-monitor">
       <div className="header">
         <div className="title">Log Monitor</div>
 
-        <div className="hint">Source: Redis List (only the most recent N entries are kept; consecutive duplicates merge into one entry with a count)</div>
+        <div className="hint">
+          Source: Redis List (grouped storage + only the most recent N entries + consecutive duplicates merged with a count)
+        </div>
+      </div>
+
+      <div className="log-overview">
+        <div className="overview-row">
+          <div className="overview-title">Today's stats {day ? `(${day})` : ''}</div>
+          <div className="overview-items">
+            <span>error: {addedToday.error || 0} / {llen.error || 0}{enabled.error === false ? ' (disabled)' : ''}</span>
+            <span>warning: {addedToday.warning || 0} / {llen.warning || 0}{enabled.warning === false ? ' (disabled)' : ''}</span>
+            <span>info: {addedToday.info || 0} / {llen.info || 0}{enabled.info === false ? ' (disabled)' : ''}</span>
+          </div>
+        </div>
+
+        <div className="overview-row">
+          <div className="overview-title">Max entries (per group)</div>
+          <div className="overview-config">
+            <div className="mini">
+              <label>error</label>
+              <input
+                type="number"
+                value={maxLenDraft.error}
+                onChange={(e) => setMaxLenDraft((s) => ({ ...s, error: Number(e.target.value || 2000) }))}
+              />
+            </div>
+            <div className="mini">
+              <label>warning</label>
+              <input
+                type="number"
+                value={maxLenDraft.warning}
+                onChange={(e) => setMaxLenDraft((s) => ({ ...s, warning: Number(e.target.value || 2000) }))}
+              />
+            </div>
+            <div className="mini">
+              <label>info</label>
+              <input
+                type="number"
+                value={maxLenDraft.info}
+                onChange={(e) => setMaxLenDraft((s) => ({ ...s, info: Number(e.target.value || 2000) }))}
+              />
+            </div>
+            <button onClick={saveConfig} disabled={saving}>
+              {saving ? 'Saving...' : 'Save'}
+            </button>
+          </div>
+        </div>
+
+        <div className="overview-row">
+          <div className="overview-items">
+            {GROUPS.map((g) => (
+              <button key={g.key} className={group === g.key ? 'active' : ''} onClick={() => setGroup(g.key)}>
+                {g.label}
+              </button>
+            ))}
+          </div>
+        </div>
+      </div>
diff --git a/frontend/src/services/api.js b/frontend/src/services/api.js
--- a/frontend/src/services/api.js
+++ b/frontend/src/services/api.js
@@ ... @@
+  getLogsOverview: async () => {
+    const response = await fetch(buildUrl('/api/system/logs/overview'));
+    if (!response.ok) {
+      const error = await response.json().catch(() => ({ detail: 'Failed to fetch the logs overview' }));
+      throw new Error(error.detail || 'Failed to fetch the logs overview');
+    }
+    return response.json();
+  },
+
+  updateLogsConfig: async (data) => {
+    const response = await fetch(buildUrl('/api/system/logs/config'), {
+      method: 'PUT',
+      headers: { 'Content-Type': 'application/json' },
+      body: JSON.stringify(data || {}),
+    });
+    if (!response.ok) {
+      const error = await response.json().catch(() => ({ detail: 'Failed to update the log config' }));
+      throw new Error(error.detail || 'Failed to update the log config');
+    }
+    return response.json();
+  },
 };
diff --git a/trading_system/main.py b/trading_system/main.py
index fd384e7..28b45fa 100644
--- a/trading_system/main.py
+++ b/trading_system/main.py
@@ -80,7 +80,8 @@ try:
         service="trading_system",
     )
     redis_handler = RedisErrorLogHandler(redis_cfg)
-    redis_handler.setLevel(logging.ERROR)
+    # Let the handler do its own per-group filtering (error/warning/info); here we only relax the level to INFO
+    redis_handler.setLevel(logging.INFO)
     logging.getLogger().addHandler(redis_handler)
 except Exception:
     # The Redis handler only adds monitoring; a failure must not block trading system startup
diff --git a/trading_system/redis_log_handler.py b/trading_system/redis_log_handler.py
index 2df309f..bfd59ee 100644
--- a/trading_system/redis_log_handler.py
+++ b/trading_system/redis_log_handler.py
@@ -1,14 +1,18 @@
 """
-Write Python logging ERROR records to a Redis List (keeping only the most recent N entries).
+Write Python logging records to Redis Lists (grouped by error / warning / info, keeping only the most recent N entries per group).
 
 Design goals:
 - Do not affect existing logging (file/console): degrade silently when Redis is unavailable
-- Only write ERROR/CRITICAL (controlled by the handler level)
+- Written by default: ERROR/CRITICAL, WARNING, INFO (DEBUG is not written by default)
 - "Same-content dedupe": only consecutive records with an identical signature are merged into one entry (count++ plus a timestamp update)
+- Dynamic tuning from the frontend: configuration lives in a Redis Hash (no process restart; the handler keeps a lightweight refreshed cache)
+- Write-volume stats: counters accumulated per Beijing-time day and per group
 
 Redis data structures:
-- List key (default ats:logs:error); the head of the list is the newest entry
+- List key (default ats:logs:{group}); the head of the list is the newest entry
 - Each entry is a JSON string
+- Config hash (default ats:logs:config)
+- Stats keys (default ats:logs:stats:added:{YYYYMMDD}:{group})
 """
 
 from __future__ import annotations
@@ -21,13 +25,18 @@ import time
 import traceback
 from dataclasses import dataclass
 from datetime import datetime, timezone, timedelta
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Literal
 
 
 def _beijing_time_str(ts: float) -> str:
     beijing_tz = timezone(timedelta(hours=8))
     return datetime.fromtimestamp(ts, tz=beijing_tz).strftime("%Y-%m-%d %H:%M:%S")
 
 
+def _beijing_yyyymmdd(ts: Optional[float] = None) -> str:
+    beijing_tz = timezone(timedelta(hours=8))
+    dt = datetime.fromtimestamp(ts or time.time(), tz=beijing_tz)
+    return dt.strftime("%Y%m%d")
+
 
 def _safe_json_loads(s: str) -> Optional[Dict[str, Any]]:
     try:
@@ -39,12 +48,45 @@ def _safe_json_loads(s: str) -> Optional[Dict[str, Any]]:
         return None
 
 
+LogGroup = Literal["error", "warning", "info"]
+
+
+def _parse_bool(v: Any, default: bool) -> bool:
+    if v is None:
+        return default
+    if isinstance(v, bool):
+        return v
+    s = str(v).strip().lower()
+    if s in ("1", "true", "yes", "y", "on"):
+        return True
+    if s in ("0", "false", "no", "n", "off"):
+        return False
+    return default
+
+
+def _parse_int(v: Any, default: int) -> int:
+    try:
+        n = int(str(v).strip())
+        return n
+    except Exception:
+        return default
+
+
 @dataclass(frozen=True)
 class RedisLogConfig:
     redis_url: str
-    list_key: str = "ats:logs:error"
-    max_len: int = 2000
+    list_key_prefix: str = "ats:logs"
+    config_key: str = "ats:logs:config"
+    stats_key_prefix: str = "ats:logs:stats:added"
+    max_len_error: int = 2000
+    max_len_warning: int = 2000
+    max_len_info: int = 2000
     dedupe_consecutive: bool = True
+    enable_error: bool = True
+    enable_warning: bool = True
+    enable_info: bool = True
+    include_debug_in_info: bool = False
+    config_refresh_sec: float = 5.0
     service: str = "trading_system"
     hostname: str = socket.gethostname()
     connect_timeout_sec: float = 1.0
@@ -69,6 +111,8 @@ class RedisErrorLogHandler(logging.Handler):
         self._redis = None
         self._redis_ok = False
         self._last_connect_attempt_ts = 0.0
+        self._last_cfg_refresh_ts = 0.0
+        self._remote_cfg: Dict[str, Any] = {}
 
     def _connection_kwargs(self) -> Dict[str, Any]:
         kwargs: Dict[str, Any] = {
@@ -150,19 +194,117 @@ class RedisErrorLogHandler(logging.Handler):
             "count": 1,
         }
 
+    def _group_for_record(self, record: logging.LogRecord) -> Optional[LogGroup]:
+        # ERROR/CRITICAL -> error
+        if record.levelno >= logging.ERROR:
+            return "error"
+        # WARNING -> warning
+        if record.levelno >= logging.WARNING:
+            return "warning"
+        # INFO -> info
+        if record.levelno == logging.INFO:
+            return "info"
+        # DEBUG: not written by default; can be enabled via config and folded into info
+        if record.levelno == logging.DEBUG and self._effective_cfg_bool("include_debug_in_info", self.cfg.include_debug_in_info):
+            return "info"
+        return None
+
+    def _effective_cfg_bool(self, key: str, default: bool) -> bool:
+        if key in self._remote_cfg:
+            return _parse_bool(self._remote_cfg.get(key), default)
+        return default
+
+    def _refresh_remote_config_if_needed(self, client) -> None:
+        now = time.time()
+        if now - self._last_cfg_refresh_ts < self.cfg.config_refresh_sec:
+            return
+        self._last_cfg_refresh_ts = now
+        try:
+            cfg_key = os.getenv("REDIS_LOG_CONFIG_KEY", self.cfg.config_key).strip() or self.cfg.config_key
+            data = client.hgetall(cfg_key) or {}
+            # Convention: hash fields look like max_len:error / enabled:info
+            normalized: Dict[str, Any] = {}
+            for k, v in data.items():
+                if not k:
+                    continue
+                normalized[str(k).strip()] = v
+            self._remote_cfg = normalized
+        except Exception:
+            # On a failed read, keep using the cached copy
+            return
+
+    def _list_key_for_group(self, group: LogGroup) -> str:
+        # Backward compat with the old env var: REDIS_LOG_LIST_KEY (error group only)
+        if group == "error":
+            legacy = os.getenv("REDIS_LOG_LIST_KEY", "").strip()
+            if legacy:
+                return legacy
+        env_key = os.getenv(f"REDIS_LOG_LIST_KEY_{group.upper()}", "").strip()
+        if env_key:
+            return env_key
+        prefix = os.getenv("REDIS_LOG_LIST_PREFIX", self.cfg.list_key_prefix).strip() or self.cfg.list_key_prefix
+        return f"{prefix}:{group}"
+
+    def _max_len_for_group(self, group: LogGroup) -> int:
+        # Group-specific env var has the highest priority (handy in emergencies)
+        env_specific = os.getenv(f"REDIS_LOG_LIST_MAX_LEN_{group.upper()}", "").strip()
+        if env_specific:
+            n = _parse_int(env_specific, 0)
+            return n if n > 0 else (self.cfg.max_len_error if group == "error" else self.cfg.max_len_warning if group == "warning" else self.cfg.max_len_info)
+
+        # Next: the global env var
+        env_global = os.getenv("REDIS_LOG_LIST_MAX_LEN", "").strip()
+        if env_global:
+            n = _parse_int(env_global, 0)
+            if n > 0:
+                return n
+
+        # Then: the Redis config
+        field = f"max_len:{group}"
+        if field in self._remote_cfg:
+            n = _parse_int(self._remote_cfg.get(field), 0)
+            if n > 0:
+                return n
+
+        # Finally: the local defaults
+        return self.cfg.max_len_error if group == "error" else self.cfg.max_len_warning if group == "warning" else self.cfg.max_len_info
+
+    def _enabled_for_group(self, group: LogGroup) -> bool:
+        field = f"enabled:{group}"
+        if field in self._remote_cfg:
+            return _parse_bool(self._remote_cfg.get(field), True)
+        return self.cfg.enable_error if group == "error" else self.cfg.enable_warning if group == "warning" else self.cfg.enable_info
+
"dedupe_consecutive" in self._remote_cfg: + return _parse_bool(self._remote_cfg.get("dedupe_consecutive"), self.cfg.dedupe_consecutive) + return self.cfg.dedupe_consecutive + + def _stats_key(self, group: LogGroup) -> str: + prefix = os.getenv("REDIS_LOG_STATS_PREFIX", self.cfg.stats_key_prefix).strip() or self.cfg.stats_key_prefix + day = _beijing_yyyymmdd() + return f"{prefix}:{day}:{group}" + def emit(self, record: logging.LogRecord) -> None: try: client = self._get_redis() if client is None: return - entry = self._build_entry(record) - list_key = os.getenv("REDIS_LOG_LIST_KEY", self.cfg.list_key).strip() or self.cfg.list_key - max_len = int(os.getenv("REDIS_LOG_LIST_MAX_LEN", str(self.cfg.max_len)) or self.cfg.max_len) - if max_len <= 0: - max_len = self.cfg.max_len + self._refresh_remote_config_if_needed(client) - if self.cfg.dedupe_consecutive: + group = self._group_for_record(record) + if group is None: + return + if not self._enabled_for_group(group): + return + + entry = self._build_entry(record) + list_key = self._list_key_for_group(group) + max_len = self._max_len_for_group(group) + stats_key = self._stats_key(group) + + if self._dedupe_consecutive_enabled(): try: head_raw = client.lindex(list_key, 0) except Exception: @@ -185,6 +327,8 @@ class RedisErrorLogHandler(logging.Handler): pipe = client.pipeline() pipe.lset(list_key, 0, json.dumps(head, ensure_ascii=False)) pipe.ltrim(list_key, 0, max_len - 1) + pipe.incr(stats_key, 1) + pipe.expire(stats_key, 14 * 24 * 3600) pipe.execute() return except Exception: @@ -195,6 +339,8 @@ class RedisErrorLogHandler(logging.Handler): pipe = client.pipeline() pipe.lpush(list_key, json.dumps(entry, ensure_ascii=False)) pipe.ltrim(list_key, 0, max_len - 1) + pipe.incr(stats_key, 1) + pipe.expire(stats_key, 14 * 24 * 3600) pipe.execute() except Exception: # Redis 写失败不应影响业务