核心内容摘要
leetcode 900. RLE Iterator RLE 迭代器-耗时100
声明本文为个人学习笔记仅供技术交流不构成任何投资建议。
前言作为一个从业二十年的期货老兵我深知实盘运维的重要性。
策略写得再好如果运维出问题一样会亏钱。
今天这篇文章我来分享一下量化交易日志与监控系统的搭建经验。
为什么需要日志和监控作用说明问题排查出问题时能快速定位原因策略复盘记录交易决策过程异常告警及时发现和处理异常合规审计保留交易记录
日志系统设计
1 日志分类日志类型内容级别系统日志启动、连接、异常INFO/ERROR交易日志下单、成交、持仓INFO策略日志信号、计算过程DEBUG风控日志风控触发、仓位调整WARNING
2 日志配置importloggingfromlogging.handlersimportRotatingFileHandler,TimedRotatingFileHandlerfromdatetimeimportdatetimeimportosdefsetup_logging(log_dirlogs):配置日志系统# 创建日志目录ifnotos.path.exists(log_dir):os.makedirs(log_dir)# 日志格式formatterlogging.Formatter(%(asctime)s | %(levelname)-8s | %(name)s | %(message)s,datefmt%Y-%m-%d %H:%M:%S)# 根日志器root_loggerlogging.getLogger()root_logger.setLevel(logging.DEBUG)#
控制台输出console_handlerlogging.StreamHandler()console_handler.setLevel(logging.INFO)console_handler.setFormatter(formatter)root_logger.addHandler(console_handler)#
交易日志按天切割trade_handlerTimedRotatingFileHandler(os.path.join(log_dir,trade.log),whenmidnight,interval1,backupCount30,encodingutf-
trade_handler.setLevel(logging.INFO)trade_handler.setFormatter(formatter)trade_loggerlogging.getLogger(trade)trade_logger.addHandler(trade_handler)#
错误日志error_handlerRotatingFileHandler(os.path.join(log_dir,error.log),maxBytes10*1024*1024,# 10MBbackupCount5,encodingutf-
error_handler.setLevel(logging.ERROR)error_handler.setFormatter(formatter)root_logger.addHandler(error_handler)returnroot_logger# 使用loggersetup_logging()trade_loggerlogging.getLogger(trade)logger.info(系统启动)trade_logger.info(下单 | SHFE.rb2505 | BUY | OPEN | 1手 |
3850)
3 结构化日志importjsonfromdatetimeimportdatetimeclassStructuredLogger:结构化日志记录器def__init__(self,log_file):self.log_filelog_filedeflog(self,event_type,**kwargs):记录结构化日志record{timestamp:datetime.now().isoformat(),event_type:event_type,**kwargs}withopen(self.log_file,a,encodingutf-
asf:f.write(json.dumps(record,ensure_asciiFalse)\n)deflog_order(self,symbol,direction,offset,volume,price):记录订单self.log(ORDER,symbolsymbol,directiondirection,offsetoffset,volumevolume,priceprice)deflog_trade(self,symbol,direction,price,volume,pnl):记录成交self.log(TRADE,symbolsymbol,directiondirection,priceprice,volumevolume,pnlpnl)deflog_signal(self,symbol,signal,reason):记录信号self.log(SIGNAL,symbolsymbol,signalsignal,reasonreason)# 使用示例slogStructuredLogger(logs/structured.jsonl)slog.log_order(SHFE.rb2505,BUY,OPEN,1,
slog.log_signal(SHFE.rb2505,BUY,MA5上穿MA
20)
监控系统设计
1 监控指标指标说明阈值账户权益实时权益-日盈亏当日盈亏亏损3%告警持仓当前持仓超限告警系统状态连接状态断线告警延迟行情延迟1秒告警
2 监控类实现fromdatetimeimportdatetime,timeimportloggingclassTradingMonitor:交易监控器def__init__(self,config):self.configconfig self.daily_start_balanceNoneself.last_heartbeatdatetime.now()self.loggerlogging.getLogger(monitor)defon_day_start(self,balance):每日开始self.daily_start_balancebalance self.logger.info(f交易日开始 | 初始权益:{balance:.2f})defcheck_daily_pnl(self,current_balance):检查日盈亏ifself.daily_start_balanceisNone:returnTruepnlcurrent_balance-self.daily_start_balance pnl_pctpnl/self.daily_start_balanceifpnl_pct-self.config[daily_loss_limit]:self.logger.warning(f日亏损告警 | 亏损:{pnl:.2f}({pnl_pct:.2%}))self.send_alert(f日亏损告警:{pnl:.2f}({pnl_pct:.2%}))returnFalsereturnTruedefcheck_position(self,position,max_position):检查持仓total_posposition.pos_longposition.pos_shortiftotal_posmax_position:self.logger.warning(f持仓超限 | 当前:{total_pos}上限:{max_position})self.send_alert(f持仓超限:{total_pos}/{max_position})returnFalsereturnTruedefheartbeat(self):心跳检测self.last_heartbeatdatetime.now()defcheck_heartbeat(self,timeout_seconds
:检查心跳elapsed(datetime.now()-self.last_heartbeat).total_seconds()ifelapsedtimeout_seconds:self.logger.error(f心跳超时 | 距上次心跳:{elapsed:.0f}秒)self.send_alert(f策略心跳超时:{elapsed:.0f}秒)returnFalsereturnTruedefsend_alert(self,message):发送告警# 钉钉告警self.send_dingtalk(message)# 或微信告警# self.send_wechat(message)defsend_dingtalk(self,message):发送钉钉消息importrequests webhookself.config.get(dingtalk_webhook)ifnotwebhook:returndata{msgtype:text,text:{content:f[量化告警]{message}}}try:requests.post(webhook,jsondata,timeout
exceptExceptionase:self.logger.error(f钉钉发送失败:{e})
3 定时报告fromdatetimeimportdatetime,timeimportscheduleclassReportGenerator:报告生成器def__init__(self,api):self.apiapidefgenerate_daily_report(self):生成每日报告accountself.api.get_account()reportf 每日报告 日期:{datetime.now().strftime(%Y-%m-%d)}账户权益:{account.balance:.2f}可用资金:{account.available:.2f}持仓盈亏:{account.position_profit:.2f}当日盈亏:{account.close_profit:.2f} logging.info(report)returnreportdefschedule_reports(self):设置定时报告# 每日收盘后发送报告schedule.every().day.at(15:
.do(self.generate_daily_report)# 每小时发送状态schedule.every().hour.do(self.send_status)# 在主循环中运行# schedule.run_pending()
完整监控系统示例fromtqsdkimportTqApi,TqAuth,TqAccountimportloggingfromdatetimeimportdatetimeimportthreadingimporttime# 配置CONFIG{broker:期货公司,account:资金账号,password:密码,tq_user:TQ账户,tq_pass:TQ密码,symbol:SHFE.rb2505,daily_loss_limit:
03,max_position:5,dingtalk_webhook:https://oapi.dingtalk.com/robot/send?access_tokenxxx,}# 设置日志setup_logging()loggerlogging.getLogger(main)# 初始化监控monitorTradingMonitor(CONFIG)# 连接APIapiTqApi(TqAccount(CONFIG[broker],CONFIG[account],CONFIG[password]),authTqAuth(CONFIG[tq_user],CONFIG[tq_pass]))accountapi.get_account()positionapi.get_position(CONFIG[symbol])# 初始化监控monitor.on_day_start(account.balance)logger.info(策略启动)# 主循环try:whileTrue:api.wait_update()# 心跳monitor.heartbeat()# 监控检查ifnotmonitor.check_daily_pnl(account.balance):logger.warning(触发日亏损限制)# 可以选择停止交易ifnotmonitor.check_position(position,CONFIG[max_position]):logger.warning(持仓超限)# 策略逻辑...exceptKeyboardInterrupt:logger.info(用户中断)exceptExceptionase:logger.error(f异常:{e})monitor.send_alert(f策略异常:{e})finally:api.close()logger.info(策略停止)
运维最佳实践
1 日志管理日志分级DEBUG/INFO/WARNING/ERROR日志切割按天或按大小切割日志保留保留30天以上定期清理避免磁盘满
2 监控告警分级告警不同级别不同处理告警去重避免重复告警告警升级持续告警升级通知告警测试定期测试告警通道
3 故障恢复自动重连网络断开自动重连状态恢复重启后恢复状态数据备份定期备份配置和日志
我的运维配置组件选择量化框架TqSdk日志库Python logging告警钉钉机器人监控自建脚本服务器阿里云ECS选择TqSdk的原因是API简洁容易集成日志和监控逻辑。
八、