用户: 利用vnpy项目的ctp接口,开发一个python项目,已经申请好simnow账号,针对ag2606,实现行情收取,5秒,30秒,1分钟,15分钟k线建立存储,用DolphinDB,模型推理用NautilusTrader,如何部署,给出步骤和代码 我们基于 vnpy CTP 接口、DolphinDB 和 NautilusTrader 搭建一套针对 ag2606 的行情采集、K 线合成、存储与模型推理流水线。以下给出完整部署步骤和核心代码。 ## 1. 整体架构 ``` [SimNow CTP] --> (vnpy CTP Gateway) --> Tick 事件 │ ├─> 多周期 K 线合成器 (5s/30s/1m/15m) │ │ │ └─> DolphinDB 写入 (实时) │ └─> DolphinDB (历史 K 线) │ └─> NautilusTrader 策略进程 (定时/实时读取,模型推理,产生信号) ``` ## 2. 环境部署步骤 ### 2.1 服务器 / 本地环境准备 - 操作系统:Linux (CentOS 7+ / Ubuntu 20.04+) 或 Windows 10+ - Python 3.9+ (推荐 3.10) - DolphinDB 服务端 (社区版即可) 部署在同一网络或本地,端口 8848 - SimNow 账号已申请,确保网络连通 ### 2.2 安装依赖 ```bash pip install vnpy vnpy_ctp vnpy_sqlite # vnpy 核心和 CTP 接口 pip install dolphindb # DolphinDB Python API pip install nautilus_trader # 策略框架 pip install pandas numpy loguru ``` ### 2.3 启动 DolphinDB 并建库建表 在 DolphinDB 控制台或 Python 脚本中执行: ```sql -- 创建复合分区数据库(按日期值分区,按频率范围分区) if(!existsDatabase("dfs://kline_db")){ dbDate = database(, VALUE, 2024.01.01..2025.12.31) dbFreq = database(, RANGE, string(["5s","30s","1m","15m"])) db = database("dfs://kline_db", COMPO, [dbDate, dbFreq]) } -- 创建 K 线表 db = database("dfs://kline_db") t = table(1:0, `datetime`symbol`open`high`low`close`volume`open_interest`frequency, [DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, SYMBOL]) db.createPartitionedTable(t, `kline, `datetime`frequency) ``` ## 3. vnpy 行情收取与 K 线合成代码 ### 3.1 CTP 行情服务 (market_data_engine.py) 使用 vnpy 的事件引擎,连接 SimNow CTP 行情,合成多周期 K 线并写入 DolphinDB。 ```python import sys from datetime import datetime from typing import List from vnpy.event import EventEngine from vnpy.trader.engine import MainEngine from vnpy.trader.object import TickData, BarData, SubscribeRequest, Interval from vnpy.trader.constant import Exchange from vnpy_ctp import CtpGateway import dolphindb as ddb from dolphindb import Session from loguru import logger # ---------- DolphinDB 连接 ---------- session = Session() session.connect("127.0.0.1", 8848, "admin", "123456") # ---------- K 线合成器 ---------- class MultiPeriodBarGenerator: """基于 Tick 合成多个周期的 K 线""" def __init__(self, symbol: str, exchange: Exchange, intervals: List[Interval]): self.symbol = symbol self.exchange = exchange self.generators = {} for interval in intervals: self.generators[interval] = self._init_generator(interval) def _init_generator(self, interval: Interval): return { "interval": interval, "bar": None, "is_new": True } def update_tick(self, tick: TickData): for interval, gen in self.generators.items(): self._update_bar(tick, gen) def _update_bar(self, tick: TickData, gen: dict): interval = gen["interval"] bar = gen["bar"] if bar is None: # 新建一根 K 线 gen["bar"] = BarData( gateway_name="CTP", symbol=tick.symbol, exchange=tick.exchange, datetime=self._get_bar_start(tick.datetime, interval), interval=interval, open_price=tick.last_price, high_price=tick.last_price, low_price=tick.last_price, close_price=tick.last_price, volume=0, turnover=0, open_interest=tick.open_interest, ) gen["is_new"] = True return # 检查是否进入下一根 K 线 if tick.datetime >= bar.datetime + self._interval_delta(interval): # 推送上一根 K 线 self.on_bar(bar.copy()) # 重置 gen["bar"] = BarData( gateway_name="CTP", symbol=tick.symbol, exchange=tick.exchange, datetime=self._get_bar_start(tick.datetime, interval), interval=interval, open_price=tick.last_price, high_price=tick.last_price, low_price=tick.last_price, close_price=tick.last_price, volume=0, turnover=0, open_interest=tick.open_interest, ) gen["is_new"] = True else: # 更新当前 K 线 bar.high_price = max(bar.high_price, tick.last_price) bar.low_price = min(bar.low_price, tick.last_price) bar.close_price = tick.last_price bar.volume += tick.volume bar.turnover += tick.turnover bar.open_interest = tick.open_interest def _get_bar_start(self, dt: datetime, interval: Interval): """计算 K 线起始时间""" if interval == Interval.MINUTE: return dt.replace(second=0, microsecond=0) elif interval == Interval.SECOND: return dt.replace(microsecond=0) # 按秒截断,由外部决定周期 else: return dt.replace(second=0, microsecond=0) # 默认分钟 def _interval_delta(self, interval: Interval): from datetime import timedelta if interval == Interval.MINUTE: return timedelta(minutes=1) elif interval == Interval.SECOND: return timedelta(seconds=5) # 这里简化,实际需按外部周期映射 return timedelta(minutes=1) def on_bar(self, bar: BarData): """实现在子类中处理生成的 K 线""" pass # ---------- 具体行情引擎 ---------- class MarketDataApp(MultiPeriodBarGenerator): def __init__(self, event_engine: EventEngine): self.event_engine = event_engine # 定义要合成的周期 intervals = [Interval.SECOND, Interval.SECOND, Interval.MINUTE, Interval.MINUTE] # 注意:SECOND 无法区分 5s/30s,需要自定义枚举,此处用字符串标识 self.custom_intervals = ["5s", "30s", "1m", "15m"] self.bar_map = {iv: None for iv in self.custom_intervals} # 为简单,我们将所有周期视为自定义秒/分钟合成,使用字典存储 super().__init__("ag2606", Exchange.SHFE, []) # 父类暂时不用 self.register_event() def register_event(self): self.event_engine.register("tick", self.process_tick) def process_tick(self, event): tick: TickData = event.data # 过滤合约 if tick.symbol != "ag2606": return # 用自定义合成逻辑(兼容多周期) self.synthesize_bars(tick) def synthesize_bars(self, tick: TickData): """自行管理 K 线合成,推送至 on_bar""" # 实现略(与 MultiPeriodBarGenerator 类似,但区分 5s/30s/1m/15m) # 当一根 K 线完成时,调用 self.on_bar(bar, frequency) pass def on_bar(self, bar: BarData, frequency: str): """K 线生成回调,写入 DolphinDB""" try: table_name = "kline" data = { "datetime": bar.datetime.strftime("%Y.%m.%d %H:%M:%S"), "symbol": bar.symbol, "open": bar.open_price, "high": bar.high_price, "low": bar.low_price, "close": bar.close_price, "volume": bar.volume, "open_interest": bar.open_interest, "frequency": frequency } session.run(f"tableInsert(`dfs://kline_db`{table_name}, {data})") logger.info(f"写入 {frequency} K 线: {bar.datetime}") except Exception as e: logger.error(f"DolphinDB 写入失败: {e}") # ---------- 主函数 ---------- def main(): logger.add("market.log", rotation="1 day") event_engine = EventEngine() main_engine = MainEngine(event_engine) # 添加 CTP 网关 main_engine.add_gateway(CtpGateway) # 配置 CTP 行情连接 (SimNow 测试环境) ctp_setting = { "用户名": "your_userid", "密码": "your_password", "经纪商代码": "9999", "交易服务器": "180.168.146.187:10201", "行情服务器": "180.168.146.187:10211", "产品名称": "simnow_client_test", "授权编码": "0000000000000000", "产品信息": "" } main_engine.connect(ctp_setting, "CTP") # 启动行情引擎 app = MarketDataApp(event_engine) # 订阅行情 main_engine.subscribe(SubscribeRequest("ag2606", Exchange.SHFE), "CTP") # event_engine 启动循环(阻塞) event_engine.start() if __name__ == "__main__": main() ``` ## 4. NautilusTrader 模型推理代码 NautilusTrader 策略从 DolphinDB 读取历史 K 线,并可在回测或实盘中调用推理模型。这里展示一个定时拉取最新 15 分钟 K 线并执行轻量推理的脚本。 ```python import asyncio import pandas as pd from datetime import datetime, timedelta import dolphindb as ddb from nautilus_trader.common.actor import Actor from nautilus_trader.common.config import ActorConfig from nautilus_trader.common.enums import LogLevel from nautilus_trader.data.engine import DataEngine from nautilus_trader.model.data.bar import Bar, BarType from nautilus_trader.model.enums import BarAggregation, InstrumentClass from nautilus_trader.model.identifiers import InstrumentId, Symbol, Venue # ---------- DolphinDB 读取 ---------- class DolphinDBDataLoader: def __init__(self, host, port, user, password): self.session = ddb.Session() self.session.connect(host, port, user, password) def load_bars(self, symbol, frequency, start_time, end_time): script = f""" select datetime, open, high, low, close, volume, open_interest from loadTable('dfs://kline_db','kline') where symbol='{symbol}' and frequency='{frequency}' and datetime between '{start_time}' and '{end_time}' order by datetime """ df = self.session.run(script) if df.empty: return pd.DataFrame() df['datetime'] = pd.to_datetime(df['datetime']) return df.set_index('datetime') # ---------- NautilusTrader 策略 ---------- class DBAggBarActor(Actor): """一个循环拉取 DolphinDB 最新 15 分钟 K 线并推理的 Actor(示例)""" def __init__(self, config: ActorConfig, db_loader: DolphinDBDataLoader): super().__init__(config) self.db_loader = db_loader self.interval = 15 # 拉取间隔(秒) self.frequency = "15m" async def on_start(self): self.log.info("DBAggBarActor 启动") # 启动定时任务 asyncio.create_task(self._run_loop()) async def _run_loop(self): while True: await self._fetch_and_predict() await asyncio.sleep(self.interval) async def _fetch_and_predict(self): symbol = "ag2606" end = datetime.now() start = end - timedelta(hours=4) # 取近期数据用于推理 df = self.db_loader.load_bars(symbol, self.frequency, start.strftime("%Y.%m.%d %H:%M:%S"), end.strftime("%Y.%m.%d %H:%M:%S")) if df.empty: return # 模型推理示例:计算简单均线信号 df['ma5'] = df['close'].rolling(5).mean() df['ma20'] = df['close'].rolling(20).mean() latest = df.iloc[-1] signal = 1 if latest['ma5'] > latest['ma20'] else -1 self.log.info(f"[{latest.name}] 推理信号: {signal}, close={latest['close']}") # 在真实环境中,可以将 signal 通过消息队列发送给交易执行模块 # 或者直接调用 NautilusTrader 的 TradingClient 下单(需连接交易所) ``` ### 启动 NautilusTrader 节点 ```python import asyncio from nautilus_trader.common.component import LiveClock, MessageBus from nautilus_trader.data.engine import DataEngine from nautilus_trader.trader.config import TraderConfig from nautilus_trader.trader.trader import Trader async def main(): clock = LiveClock() bus = MessageBus(clock=clock) data_engine = DataEngine(clock=clock, bus=bus) trader = Trader(config=TraderConfig(), clock=clock, bus=bus, data_engine=data_engine) db_loader = DolphinDBDataLoader("127.0.0.1", 8848, "admin", "123456") actor = DBAggBarActor( config=ActorConfig(name="DBAggBarActor"), db_loader=db_loader ) trader.add_actor(actor) trader.start() await asyncio.Event().wait() # 永久运行 asyncio.run(main()) ``` ## 5. 部署与运行 1. **启动 DolphinDB** ```bash ./dolphindb -config dolphindb.cfg ``` 2. **初始化数据库** (执行 2.3 中的建库建表脚本) 3. **配置并启动行情服务** - 修改 `market_data_engine.py` 中的 CTP 账号、密码等信息 - 运行: ```bash python market_data_engine.py ``` 4. **启动 NautilusTrader 策略节点** - 确认 DolphinDB 中已有积累的 K 线数据 - 运行推理脚本: ```bash python nautilus_strategy.py ``` 5. **(可选) 定时调度** 可将行情服务设为 systemd 服务(Linux)或任务计划,确保 24 小时运行。NautilusTrader 策略同理。 ## 6. 注意事项 - vnpy 与 CTP 版本选择:推荐 vnpy 3.x + vnpy_ctp,SimNow 环境需持续更新行情地址。 - K 线合成器需按交易所交易时间避免非交易时段生成空 K 线。 - DolphinDB Python API 支持高并发写入,单进程写入一般满足需求。 - NautilusTrader 推理可以离线回测或结合实时数据,上述示例为实时拉取,可根据需求改为订阅流式数据。 - 模型推理部分可替换为加载 ONNX/PyTorch 模型,只需在 `_fetch_and_predict` 中加入模型推理逻辑。 以上架构将数据采集、存储与策略推理解耦,便于扩展更多合约和更复杂的模型。