跳转至

核心模块 API

以下 API 文档由源码自动生成,内容始终与代码保持同步。


qka.Data

数据管理类

负责股票数据的获取、缓存和管理,支持多数据源、并发下载和自定义因子计算。

属性:

名称 类型 描述
symbols List[str]

股票代码列表

period str

数据周期,如 '1d'、'1m' 等

adjust str

复权方式,如 'qfq'、'hfq'、'bfq'

factor Callable

因子计算函数

source str

数据源,如 'akshare'、'qmt'

pool_size int

并发下载线程数

datadir Path

数据缓存目录

target_dir Path

目标存储目录

源代码位于: qka/core/data.py
class Data():
    """
    数据管理类

    负责股票数据的获取、缓存和管理,支持多数据源、并发下载和自定义因子计算。

    Attributes:
        symbols (List[str]): 股票代码列表
        period (str): 数据周期,如 '1d'、'1m' 等
        adjust (str): 复权方式,如 'qfq'、'hfq'、'bfq'
        factor (Callable): 因子计算函数
        source (str): 数据源,如 'akshare'、'qmt'
        pool_size (int): 并发下载线程数
        datadir (Path): 数据缓存目录
        target_dir (Path): 目标存储目录
    """

    def __init__(
        self, 
        symbols: Optional[List[str]] = None,
        period: str = '1d',
        adjust: str = 'qfq',
        factor: Callable[[pd.DataFrame], pd.DataFrame] = lambda df: df,
        source: str = 'akshare',
        pool_size: int = 10,
        datadir: Optional[Path] = None
    ):
        """
        初始化数据对象

        Args:
            symbols: 股票代码列表,如 ['000001.SZ', '600000.SH']
            period: 数据周期,如 '1d'(日线)、'1m'(分钟)
            adjust: 复权方式,'qfq'(前复权)、'hfq'(后复权)、'bfq'(不复权)
            factor: 因子计算函数,接收 DataFrame 返回 DataFrame,用于扩展自定义因子
            source: 数据来源,'akshare'(默认)或 'qmt'
            pool_size: 并发下载线程数
            datadir: 缓存根目录,默认为当前工作目录下的 datadir/
        """
        self.symbols = symbols or []
        self.period = period
        self.adjust = adjust
        self.factor = factor
        self.source = source
        self.pool_size = pool_size

        # 初始化缓存目录
        if datadir is None:
            # 默认使用当前工作目录下的 datadir
            self.datadir = Path.cwd() / "datadir"
        else:
            self.datadir = Path(datadir)

        self.datadir.mkdir(parents=True, exist_ok=True)

        self.target_dir = self.datadir / self.source / self.period / (self.adjust or "bfq")
        self.target_dir.mkdir(parents=True, exist_ok=True)

    def _download(self, symbol: str) -> Path:
        """
        下载单个股票的数据

        Args:
            symbol (str): 股票代码

        Returns:
            Path: 数据文件路径
        """
        path = self.target_dir / f"{symbol}.parquet"

        if path.exists():
             return path

        if self.source == 'akshare':
            df = self._get_from_akshare(symbol)
        else:
            df = pd.DataFrame()

        if len(df) == 0:
            return path

        table = pa.Table.from_pandas(df)
        pq.write_table(table, path)

        return path

    def get(self) -> dd.DataFrame:
        """
        获取历史数据

        并发下载所有股票数据,应用因子计算,并返回合并后的Dask DataFrame。

        Returns:
            dd.DataFrame: 合并后的股票数据,每只股票的列名格式为 {symbol}_{column}
                          如果没有股票,返回空的 Dask DataFrame
        """
        if not self.symbols:
            return dd.from_pandas(pd.DataFrame(), npartitions=1)

        # 准备缓存目录

        with ThreadPoolExecutor(max_workers=self.pool_size) as executor:
            # 提交下载任务
            futures = {
                executor.submit(self._download, symbol): symbol
                for symbol in self.symbols
            }

            # 添加tqdm进度条
            errors = []
            with tqdm(total=len(self.symbols), desc="下载数据") as pbar:
                for future in as_completed(futures):
                    symbol = futures[future]
                    try:
                        future.result()  # 触发异常(如果有)
                    except Exception as e:
                        errors.append(f"{symbol}: {e}")
                        logger.warning(f"下载 {symbol} 失败: {e}")
                    pbar.update(1)
                    pbar.set_postfix_str(f"当前: {symbol}")

            if errors:
                logger.warning(f"共 {len(errors)} 只股票下载失败: {errors[:3]}...")

        dfs = []
        for symbol in self.symbols:
            parquet_path = self.target_dir / f"{symbol}.parquet"
            if not parquet_path.exists():
                logger.warning(f"数据文件不存在,跳过: {parquet_path}")
                continue
            df = dd.read_parquet(str(parquet_path))
            df = self.factor(df)
            column_mapping = {col: f'{symbol}_{col}' for col in df.columns}
            dfs.append(df.rename(columns=column_mapping))

        if not dfs:
            return dd.from_pandas(pd.DataFrame(), npartitions=1)

        df = dd.concat(dfs, axis=1, join='outer')

        return df

    def _get_from_akshare(self, symbol: str) -> pd.DataFrame:
        """
        从 akshare 获取单个股票的数据。

        Args:
            symbol (str): 股票代码,支持带后缀如 000001.SZ 或不带后缀的 000001

        Returns:
            pd.DataFrame: 股票数据,以 date 为索引,包含 open, high, low, close, volume, amount 列
        """
        column_mapping = {
            "日期": "date",
            "开盘": "open",
            "收盘": "close",
            "最高": "high",
            "最低": "low",
            "成交量": "volume",
            "成交额": "amount",
        }

        # 下载数据
        # akshare 不支持带 .SZ/.SH 后缀,需去除
        clean_symbol = symbol.replace('.SZ', '').replace('.SH', '').replace('.BJ', '')
        df = ak.stock_zh_a_hist(symbol=clean_symbol, period='daily', adjust=self.adjust)

        # 数据标准化处理
        # 1. 标准化列名
        df = df.rename(columns=column_mapping)
        if "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])

        # 2. 确保数值列为数值类型
        numeric_cols = [c for c in ("open", "high", "low", "close", "volume", "amount") if c in df.columns]
        for col in numeric_cols:
            df[col] = pd.to_numeric(df[col], errors="coerce")

        # 3. 只保留需要的列
        mapped_columns = list(column_mapping.values())
        available_columns = [col for col in mapped_columns if col in df.columns]
        df = df[available_columns]

        df = df.set_index('date')
        # 设置索引
        return df

__init__(symbols=None, period='1d', adjust='qfq', factor=lambda df: df, source='akshare', pool_size=10, datadir=None)

初始化数据对象

参数:

名称 类型 描述 默认
symbols Optional[List[str]]

股票代码列表,如 ['000001.SZ', '600000.SH']

None
period str

数据周期,如 '1d'(日线)、'1m'(分钟)

'1d'
adjust str

复权方式,'qfq'(前复权)、'hfq'(后复权)、'bfq'(不复权)

'qfq'
factor Callable[[DataFrame], DataFrame]

因子计算函数,接收 DataFrame 返回 DataFrame,用于扩展自定义因子

lambda df: df
source str

数据来源,'akshare'(默认)或 'qmt'

'akshare'
pool_size int

并发下载线程数

10
datadir Optional[Path]

缓存根目录,默认为当前工作目录下的 datadir/

None
源代码位于: qka/core/data.py
def __init__(
    self, 
    symbols: Optional[List[str]] = None,
    period: str = '1d',
    adjust: str = 'qfq',
    factor: Callable[[pd.DataFrame], pd.DataFrame] = lambda df: df,
    source: str = 'akshare',
    pool_size: int = 10,
    datadir: Optional[Path] = None
):
    """
    初始化数据对象

    Args:
        symbols: 股票代码列表,如 ['000001.SZ', '600000.SH']
        period: 数据周期,如 '1d'(日线)、'1m'(分钟)
        adjust: 复权方式,'qfq'(前复权)、'hfq'(后复权)、'bfq'(不复权)
        factor: 因子计算函数,接收 DataFrame 返回 DataFrame,用于扩展自定义因子
        source: 数据来源,'akshare'(默认)或 'qmt'
        pool_size: 并发下载线程数
        datadir: 缓存根目录,默认为当前工作目录下的 datadir/
    """
    self.symbols = symbols or []
    self.period = period
    self.adjust = adjust
    self.factor = factor
    self.source = source
    self.pool_size = pool_size

    # 初始化缓存目录
    if datadir is None:
        # 默认使用当前工作目录下的 datadir
        self.datadir = Path.cwd() / "datadir"
    else:
        self.datadir = Path(datadir)

    self.datadir.mkdir(parents=True, exist_ok=True)

    self.target_dir = self.datadir / self.source / self.period / (self.adjust or "bfq")
    self.target_dir.mkdir(parents=True, exist_ok=True)

get()

获取历史数据

并发下载所有股票数据,应用因子计算,并返回合并后的Dask DataFrame。

返回:

类型 描述
DataFrame

dd.DataFrame: 合并后的股票数据,每只股票的列名格式为 {symbol}_{column} 如果没有股票,返回空的 Dask DataFrame

源代码位于: qka/core/data.py
def get(self) -> dd.DataFrame:
    """
    获取历史数据

    并发下载所有股票数据,应用因子计算,并返回合并后的Dask DataFrame。

    Returns:
        dd.DataFrame: 合并后的股票数据,每只股票的列名格式为 {symbol}_{column}
                      如果没有股票,返回空的 Dask DataFrame
    """
    if not self.symbols:
        return dd.from_pandas(pd.DataFrame(), npartitions=1)

    # 准备缓存目录

    with ThreadPoolExecutor(max_workers=self.pool_size) as executor:
        # 提交下载任务
        futures = {
            executor.submit(self._download, symbol): symbol
            for symbol in self.symbols
        }

        # 添加tqdm进度条
        errors = []
        with tqdm(total=len(self.symbols), desc="下载数据") as pbar:
            for future in as_completed(futures):
                symbol = futures[future]
                try:
                    future.result()  # 触发异常(如果有)
                except Exception as e:
                    errors.append(f"{symbol}: {e}")
                    logger.warning(f"下载 {symbol} 失败: {e}")
                pbar.update(1)
                pbar.set_postfix_str(f"当前: {symbol}")

        if errors:
            logger.warning(f"共 {len(errors)} 只股票下载失败: {errors[:3]}...")

    dfs = []
    for symbol in self.symbols:
        parquet_path = self.target_dir / f"{symbol}.parquet"
        if not parquet_path.exists():
            logger.warning(f"数据文件不存在,跳过: {parquet_path}")
            continue
        df = dd.read_parquet(str(parquet_path))
        df = self.factor(df)
        column_mapping = {col: f'{symbol}_{col}' for col in df.columns}
        dfs.append(df.rename(columns=column_mapping))

    if not dfs:
        return dd.from_pandas(pd.DataFrame(), npartitions=1)

    df = dd.concat(dfs, axis=1, join='outer')

    return df

qka.Backtest

QKA回测引擎类

提供基于时间序列的回测功能,支持多股票横截面数据处理, 以及绩效指标计算和可视化。

属性:

名称 类型 描述
data Data

数据对象实例

strategy Strategy

策略对象实例

results DataFrame

回测结果数据

initial_cash float

初始资金

源代码位于: qka/core/backtest.py
class Backtest:
    """
    QKA回测引擎类

    提供基于时间序列的回测功能,支持多股票横截面数据处理,
    以及绩效指标计算和可视化。

    Attributes:
        data (Data): 数据对象实例
        strategy (Strategy): 策略对象实例
        results (pd.DataFrame): 回测结果数据
        initial_cash (float): 初始资金
    """

    def __init__(self, data, strategy):
        """
        初始化回测引擎

        Args:
            data (Data): Data类的实例,包含股票数据
            strategy (Strategy): 策略对象,必须包含on_bar方法
        """
        self.data = data
        self.strategy = strategy
        self.results = None
        self.initial_cash = strategy.broker.cash
        self._benchmark_data = None

    def run(self, benchmark: Optional[str] = None):
        """
        执行回测

        遍历所有时间点,在每个时间点调用策略的on_bar方法进行交易决策,
        并记录交易后的状态。

        Args:
            benchmark (str, optional): 基准代码,如 '000300.SH'(沪深300)。
                                       如果提供,会下载基准数据用于对比。

        Returns:
            None。回测结果保存在 self.results 中,可通过
            self.summary() 查看绩效指标,self.report() 生成报告。
        """
        # 获取所有股票数据(dask DataFrame)
        df = self.data.get()

        # 加载基准数据
        if benchmark:
            self._load_benchmark(benchmark)

        for date, row in df.iterrows():
            def get(factor):
                """
                获取指定因子的数据

                Args:
                    factor (str): 因子名称,如 'close', 'volume' 等

                Returns:
                    pd.Series: 该因子的所有股票数据,index为股票代码
                """
                s = row[row.index.str.endswith(factor)]
                s.index = s.index.str.replace(f'_{factor}$', '', regex=True)
                return s

            # 调用策略的on_bar方法
            self.strategy.on_bar(date, get)

            # 记录Bar结束后的状态
            self.strategy.broker.on_bar(date, get)

        # 保存回测结果
        self.results = self.strategy.broker.trades

    def _load_benchmark(self, benchmark_code: str):
        """
        加载基准指数数据

        Args:
            benchmark_code: 基准代码,如 '000300.SH'
        """
        try:
            import akshare as ak
            clean_code = benchmark_code.replace('.SH', '').replace('.SZ', '').replace('.BJ', '')
            bm_df = ak.stock_zh_index_daily(symbol=f"sh{clean_code}")
            if bm_df is not None and not bm_df.empty:
                bm_df['date'] = pd.to_datetime(bm_df['date'])
                bm_df = bm_df.set_index('date')
                bm_df = bm_df.sort_index()
                self._benchmark_data = bm_df['close']
                print(f"基准数据加载成功: {benchmark_code}{len(bm_df)} 个交易日")
        except Exception as e:
            print(f"基准数据加载失败: {e}")

    def summary(self) -> dict:
        """
        计算并打印回测绩效指标

        返回包含以下指标的字典:
        - 总收益率、年化收益率、年化波动率
        - 夏普比率、最大回撤、Calmar比率
        - 胜率、盈亏比、交易次数
        - 最终资产、总手续费

        Returns:
            dict: 绩效指标字典
        """
        if self.results is None or self.results.empty:
            print("请先运行回测 (backtest.run())")
            return {}

        totals = self.results['total']
        if len(totals) < 2:
            print("回测数据不足(至少需要2个交易周期)")
            return {}

        # 基本数据
        initial = self.initial_cash
        final = totals.iloc[-1]
        total_return = (final / initial - 1) * 100

        # 交易天数 / 年化
        n_days = len(totals)
        years = n_days / 252  # A股年均约252个交易日

        # 日收益率序列
        daily_returns = totals.pct_change().dropna()
        if len(daily_returns) == 0:
            print("没有足够的收益率数据")
            return {}

        # 年化收益率
        annual_return = (final / initial) ** (1 / years) - 1 if years > 0 else 0

        # 年化波动率
        annual_vol = daily_returns.std() * np.sqrt(252)

        # 夏普比率(无风险利率假设 3%)
        risk_free_rate = 0.03
        sharpe = (annual_return - risk_free_rate) / annual_vol if annual_vol > 0 else 0

        # 最大回撤
        cumulative = (1 + daily_returns).cumprod()
        running_max = cumulative.cummax()
        drawdown = (cumulative - running_max) / running_max
        max_drawdown = drawdown.min() * 100

        # Calmar 比率
        calmar = annual_return / abs(max_drawdown / 100) if max_drawdown != 0 else 0

        # 交易分析
        trades = self.strategy.broker.trade_history
        n_trades = len(trades)

        if n_trades > 0:
            # 统计每笔交易的盈亏
            trade_pnl = []
            buy_prices = {}
            for t in trades:
                if t['action'] == 'buy':
                    if t['symbol'] not in buy_prices:
                        buy_prices[t['symbol']] = []
                    buy_prices[t['symbol']].append((t['size'], t['exec_price'], t['total_cost']))
                elif t['action'] == 'sell':
                    symbol = t['symbol']
                    size = t['size']
                    net_proceeds = t['net_proceeds']
                    # 按先进先出匹配买入
                    if symbol in buy_prices and buy_prices[symbol]:
                        total_buy_cost = 0
                        remaining = size
                        while remaining > 0 and buy_prices[symbol]:
                            b_size, b_price, b_cost = buy_prices[symbol][0]
                            if b_size <= remaining:
                                total_buy_cost += b_cost
                                remaining -= b_size
                                buy_prices[symbol].pop(0)
                            else:
                                ratio = remaining / b_size
                                total_buy_cost += b_cost * ratio
                                buy_prices[symbol][0] = (b_size - remaining, b_price, b_cost * (1 - ratio))
                                remaining = 0
                        pnl = net_proceeds - total_buy_cost
                        trade_pnl.append(pnl)

            win_trades = sum(1 for p in trade_pnl if p > 0)
            win_rate = (win_trades / len(trade_pnl) * 100) if trade_pnl else 0
            avg_win = np.mean([p for p in trade_pnl if p > 0]) if any(p > 0 for p in trade_pnl) else 0
            avg_loss = abs(np.mean([p for p in trade_pnl if p <= 0])) if any(p <= 0 for p in trade_pnl) else 0
            profit_loss_ratio = avg_win / avg_loss if avg_loss > 0 else 0
        else:
            win_rate = 0
            profit_loss_ratio = 0
            trade_pnl = []

        # 总手续费
        total_commission = self.strategy.broker.total_commission

        # 打印报告
        print("=" * 55)
        print("           回测绩效报告")
        print("=" * 55)
        print(f"  初始资金:        RMB {initial:>10,.2f}")
        print(f"  最终资产:        RMB {final:>10,.2f}")
        print(f"  总收益率:         {total_return:>+8.2f}%")
        print(f"  年化收益率:       {annual_return * 100:>+8.2f}%")
        print(f"  年化波动率:       {annual_vol * 100:>8.2f}%")
        print(f"  夏普比率:         {sharpe:>8.2f}")
        print(f"  最大回撤:         {max_drawdown:>8.2f}%")
        print(f"  Calmar比率:       {calmar:>8.2f}")
        print(f"  交易次数:         {n_trades:>8}")
        print(f"  胜率:             {win_rate:>8.2f}%")
        print(f"  盈亏比:           {profit_loss_ratio:>8.2f}")
        print(f"  总手续费:         RMB {total_commission:>10,.2f}")
        print(f"  回测天数:         {n_days:>8} 天")
        print("=" * 55)

        return {
            'initial_cash': initial,
            'final_equity': final,
            'total_return_pct': total_return,
            'annual_return_pct': annual_return * 100,
            'annual_volatility_pct': annual_vol * 100,
            'sharpe_ratio': sharpe,
            'max_drawdown_pct': max_drawdown,
            'calmar_ratio': calmar,
            'total_trades': n_trades,
            'win_rate_pct': win_rate,
            'profit_loss_ratio': profit_loss_ratio,
            'total_commission': total_commission,
            'n_days': n_days,
        }

    def report(self, title: str = "未命名策略", output_path: Optional[str] = None) -> str:
        """
        生成自包含的 HTML 回测报告

        包含绩效指标卡片、净值曲线、回撤曲线、月度收益率热力图、
        交易明细表和回撤分析。可直接在浏览器中打开。

        Args:
            title: 策略名称(显示在报告标题中)
            output_path: 输出 HTML 文件路径。
                         None 则自动保存在 examples/charts/ 下

        Returns:
            str: HTML 文件路径
        """
        from qka.core.report import generate_report

        if self.results is None or self.results.empty:
            print("错误:请先运行回测 (bt.run())")
            return ""

        bm = getattr(self, '_benchmark_data', None)
        return generate_report(
            results=self.results,
            broker=self.strategy.broker,
            initial_cash=self.initial_cash,
            benchmark_data=bm,
            strategy_name=title,
            output_path=output_path,
        )

__init__(data, strategy)

初始化回测引擎

参数:

名称 类型 描述 默认
data Data

Data类的实例,包含股票数据

必需
strategy Strategy

策略对象,必须包含on_bar方法

必需
源代码位于: qka/core/backtest.py
def __init__(self, data, strategy):
    """
    初始化回测引擎

    Args:
        data (Data): Data类的实例,包含股票数据
        strategy (Strategy): 策略对象,必须包含on_bar方法
    """
    self.data = data
    self.strategy = strategy
    self.results = None
    self.initial_cash = strategy.broker.cash
    self._benchmark_data = None

run(benchmark=None)

执行回测

遍历所有时间点,在每个时间点调用策略的on_bar方法进行交易决策, 并记录交易后的状态。

参数:

名称 类型 描述 默认
benchmark str

基准代码,如 '000300.SH'(沪深300)。 如果提供,会下载基准数据用于对比。

None

返回:

类型 描述

None。回测结果保存在 self.results 中,可通过

self.summary() 查看绩效指标,self.report() 生成报告。

源代码位于: qka/core/backtest.py
def run(self, benchmark: Optional[str] = None):
    """
    执行回测

    遍历所有时间点,在每个时间点调用策略的on_bar方法进行交易决策,
    并记录交易后的状态。

    Args:
        benchmark (str, optional): 基准代码,如 '000300.SH'(沪深300)。
                                   如果提供,会下载基准数据用于对比。

    Returns:
        None。回测结果保存在 self.results 中,可通过
        self.summary() 查看绩效指标,self.report() 生成报告。
    """
    # 获取所有股票数据(dask DataFrame)
    df = self.data.get()

    # 加载基准数据
    if benchmark:
        self._load_benchmark(benchmark)

    for date, row in df.iterrows():
        def get(factor):
            """
            获取指定因子的数据

            Args:
                factor (str): 因子名称,如 'close', 'volume' 等

            Returns:
                pd.Series: 该因子的所有股票数据,index为股票代码
            """
            s = row[row.index.str.endswith(factor)]
            s.index = s.index.str.replace(f'_{factor}$', '', regex=True)
            return s

        # 调用策略的on_bar方法
        self.strategy.on_bar(date, get)

        # 记录Bar结束后的状态
        self.strategy.broker.on_bar(date, get)

    # 保存回测结果
    self.results = self.strategy.broker.trades

summary()

计算并打印回测绩效指标

返回包含以下指标的字典: - 总收益率、年化收益率、年化波动率 - 夏普比率、最大回撤、Calmar比率 - 胜率、盈亏比、交易次数 - 最终资产、总手续费

返回:

名称 类型 描述
dict dict

绩效指标字典

源代码位于: qka/core/backtest.py
def summary(self) -> dict:
    """
    计算并打印回测绩效指标

    返回包含以下指标的字典:
    - 总收益率、年化收益率、年化波动率
    - 夏普比率、最大回撤、Calmar比率
    - 胜率、盈亏比、交易次数
    - 最终资产、总手续费

    Returns:
        dict: 绩效指标字典
    """
    if self.results is None or self.results.empty:
        print("请先运行回测 (backtest.run())")
        return {}

    totals = self.results['total']
    if len(totals) < 2:
        print("回测数据不足(至少需要2个交易周期)")
        return {}

    # 基本数据
    initial = self.initial_cash
    final = totals.iloc[-1]
    total_return = (final / initial - 1) * 100

    # 交易天数 / 年化
    n_days = len(totals)
    years = n_days / 252  # A股年均约252个交易日

    # 日收益率序列
    daily_returns = totals.pct_change().dropna()
    if len(daily_returns) == 0:
        print("没有足够的收益率数据")
        return {}

    # 年化收益率
    annual_return = (final / initial) ** (1 / years) - 1 if years > 0 else 0

    # 年化波动率
    annual_vol = daily_returns.std() * np.sqrt(252)

    # 夏普比率(无风险利率假设 3%)
    risk_free_rate = 0.03
    sharpe = (annual_return - risk_free_rate) / annual_vol if annual_vol > 0 else 0

    # 最大回撤
    cumulative = (1 + daily_returns).cumprod()
    running_max = cumulative.cummax()
    drawdown = (cumulative - running_max) / running_max
    max_drawdown = drawdown.min() * 100

    # Calmar 比率
    calmar = annual_return / abs(max_drawdown / 100) if max_drawdown != 0 else 0

    # 交易分析
    trades = self.strategy.broker.trade_history
    n_trades = len(trades)

    if n_trades > 0:
        # 统计每笔交易的盈亏
        trade_pnl = []
        buy_prices = {}
        for t in trades:
            if t['action'] == 'buy':
                if t['symbol'] not in buy_prices:
                    buy_prices[t['symbol']] = []
                buy_prices[t['symbol']].append((t['size'], t['exec_price'], t['total_cost']))
            elif t['action'] == 'sell':
                symbol = t['symbol']
                size = t['size']
                net_proceeds = t['net_proceeds']
                # 按先进先出匹配买入
                if symbol in buy_prices and buy_prices[symbol]:
                    total_buy_cost = 0
                    remaining = size
                    while remaining > 0 and buy_prices[symbol]:
                        b_size, b_price, b_cost = buy_prices[symbol][0]
                        if b_size <= remaining:
                            total_buy_cost += b_cost
                            remaining -= b_size
                            buy_prices[symbol].pop(0)
                        else:
                            ratio = remaining / b_size
                            total_buy_cost += b_cost * ratio
                            buy_prices[symbol][0] = (b_size - remaining, b_price, b_cost * (1 - ratio))
                            remaining = 0
                    pnl = net_proceeds - total_buy_cost
                    trade_pnl.append(pnl)

        win_trades = sum(1 for p in trade_pnl if p > 0)
        win_rate = (win_trades / len(trade_pnl) * 100) if trade_pnl else 0
        avg_win = np.mean([p for p in trade_pnl if p > 0]) if any(p > 0 for p in trade_pnl) else 0
        avg_loss = abs(np.mean([p for p in trade_pnl if p <= 0])) if any(p <= 0 for p in trade_pnl) else 0
        profit_loss_ratio = avg_win / avg_loss if avg_loss > 0 else 0
    else:
        win_rate = 0
        profit_loss_ratio = 0
        trade_pnl = []

    # 总手续费
    total_commission = self.strategy.broker.total_commission

    # 打印报告
    print("=" * 55)
    print("           回测绩效报告")
    print("=" * 55)
    print(f"  初始资金:        RMB {initial:>10,.2f}")
    print(f"  最终资产:        RMB {final:>10,.2f}")
    print(f"  总收益率:         {total_return:>+8.2f}%")
    print(f"  年化收益率:       {annual_return * 100:>+8.2f}%")
    print(f"  年化波动率:       {annual_vol * 100:>8.2f}%")
    print(f"  夏普比率:         {sharpe:>8.2f}")
    print(f"  最大回撤:         {max_drawdown:>8.2f}%")
    print(f"  Calmar比率:       {calmar:>8.2f}")
    print(f"  交易次数:         {n_trades:>8}")
    print(f"  胜率:             {win_rate:>8.2f}%")
    print(f"  盈亏比:           {profit_loss_ratio:>8.2f}")
    print(f"  总手续费:         RMB {total_commission:>10,.2f}")
    print(f"  回测天数:         {n_days:>8} 天")
    print("=" * 55)

    return {
        'initial_cash': initial,
        'final_equity': final,
        'total_return_pct': total_return,
        'annual_return_pct': annual_return * 100,
        'annual_volatility_pct': annual_vol * 100,
        'sharpe_ratio': sharpe,
        'max_drawdown_pct': max_drawdown,
        'calmar_ratio': calmar,
        'total_trades': n_trades,
        'win_rate_pct': win_rate,
        'profit_loss_ratio': profit_loss_ratio,
        'total_commission': total_commission,
        'n_days': n_days,
    }

report(title='未命名策略', output_path=None)

生成自包含的 HTML 回测报告

包含绩效指标卡片、净值曲线、回撤曲线、月度收益率热力图、 交易明细表和回撤分析。可直接在浏览器中打开。

参数:

名称 类型 描述 默认
title str

策略名称(显示在报告标题中)

'未命名策略'
output_path Optional[str]

输出 HTML 文件路径。 None 则自动保存在 examples/charts/ 下

None

返回:

名称 类型 描述
str str

HTML 文件路径

源代码位于: qka/core/backtest.py
def report(self, title: str = "未命名策略", output_path: Optional[str] = None) -> str:
    """
    生成自包含的 HTML 回测报告

    包含绩效指标卡片、净值曲线、回撤曲线、月度收益率热力图、
    交易明细表和回撤分析。可直接在浏览器中打开。

    Args:
        title: 策略名称(显示在报告标题中)
        output_path: 输出 HTML 文件路径。
                     None 则自动保存在 examples/charts/ 下

    Returns:
        str: HTML 文件路径
    """
    from qka.core.report import generate_report

    if self.results is None or self.results.empty:
        print("错误:请先运行回测 (bt.run())")
        return ""

    bm = getattr(self, '_benchmark_data', None)
    return generate_report(
        results=self.results,
        broker=self.strategy.broker,
        initial_cash=self.initial_cash,
        benchmark_data=bm,
        strategy_name=title,
        output_path=output_path,
    )

qka.Strategy

Bases: ABC

策略抽象基类

所有自定义策略都应该继承此类,并实现on_bar方法。

属性:

名称 类型 描述
broker Broker

交易经纪商实例,用于执行交易操作

源代码位于: qka/core/strategy.py
class Strategy(ABC):
    """
    策略抽象基类

    所有自定义策略都应该继承此类,并实现on_bar方法。

    Attributes:
        broker (Broker): 交易经纪商实例,用于执行交易操作
    """

    def __init__(self, cash: float = 100000.0):
        """
        初始化策略

        Args:
            cash: 初始资金,默认 10 万元
        """
        self.broker = Broker(initial_cash=cash)

    @abstractmethod
    def on_bar(self, date, get):
        """
        每个bar的处理逻辑,必须由子类实现

        Args:
            date: 当前时间戳
            get: 获取因子数据的函数,格式为 get(factor_name) -> pd.Series
        """
        pass

__init__(cash=100000.0)

初始化策略

参数:

名称 类型 描述 默认
cash float

初始资金,默认 10 万元

100000.0
源代码位于: qka/core/strategy.py
def __init__(self, cash: float = 100000.0):
    """
    初始化策略

    Args:
        cash: 初始资金,默认 10 万元
    """
    self.broker = Broker(initial_cash=cash)

on_bar(date, get) abstractmethod

每个bar的处理逻辑,必须由子类实现

参数:

名称 类型 描述 默认
date

当前时间戳

必需
get

获取因子数据的函数,格式为 get(factor_name) -> pd.Series

必需
源代码位于: qka/core/strategy.py
@abstractmethod
def on_bar(self, date, get):
    """
    每个bar的处理逻辑,必须由子类实现

    Args:
        date: 当前时间戳
        get: 获取因子数据的函数,格式为 get(factor_name) -> pd.Series
    """
    pass

qka.Broker

虚拟交易经纪商类

管理资金、持仓和交易记录,提供买入卖出操作接口。 支持佣金、印花税、滑点等真实交易成本模拟。

属性:

名称 类型 描述
cash float

可用现金

positions Dict

持仓记录

trade_history List

交易历史记录

commission_rate float

佣金费率

stamp_duty_rate float

印花税费率(仅卖出)

slippage float

滑点比率

total_commission float

累计佣金

total_stamp_duty float

累计印花税

total_slippage_cost float

累计滑点成本

trades DataFrame

逐日状态记录

源代码位于: qka/core/broker.py
class Broker:
    """
    虚拟交易经纪商类

    管理资金、持仓和交易记录,提供买入卖出操作接口。
    支持佣金、印花税、滑点等真实交易成本模拟。

    Attributes:
        cash (float): 可用现金
        positions (Dict): 持仓记录
        trade_history (List): 交易历史记录
        commission_rate (float): 佣金费率
        stamp_duty_rate (float): 印花税费率(仅卖出)
        slippage (float): 滑点比率
        total_commission (float): 累计佣金
        total_stamp_duty (float): 累计印花税
        total_slippage_cost (float): 累计滑点成本
        trades (pd.DataFrame): 逐日状态记录
    """

    def __init__(self, initial_cash=100000.0,
                 commission_rate=DEFAULT_COMMISSION_RATE,
                 stamp_duty_rate=DEFAULT_STAMP_DUTY_RATE,
                 slippage=DEFAULT_SLIPPAGE):
        """
        初始化Broker

        Args:
            initial_cash (float): 初始资金,默认10万元
            commission_rate (float): 佣金费率,默认万2.5
            stamp_duty_rate (float): 印花税费率(仅卖出),默认万5
            slippage (float): 滑点比率,默认0.1%
        """
        self.cash = initial_cash
        self.positions = {}
        self.trade_history = []
        self.timestamp = None

        self.commission_rate = commission_rate
        self.stamp_duty_rate = stamp_duty_rate
        self.slippage = slippage

        self.total_commission = 0.0
        self.total_stamp_duty = 0.0
        self.total_slippage_cost = 0.0

        self.trades = pd.DataFrame(columns=[
            'cash', 'value', 'total', 'positions', 'trades'
        ])

    def on_bar(self, date, get):
        """
        Bar结束时记录当前状态。

        Args:
            date: 当前时间戳
            get: 获取因子数据的函数
        """
        self.timestamp = date
        total_value = self.cash
        position_summary = {}
        for symbol, pos in self.positions.items():
            price = get('close')
            if symbol in price.index:
                current_price = price[symbol]
                market_value = pos['size'] * current_price
                total_value += market_value
                position_summary[symbol] = {
                    'size': pos['size'],
                    'avg_price': pos['avg_price'],
                    'current_price': current_price,
                    'market_value': market_value,
                    'profit_pct': (current_price / pos['avg_price'] - 1) * 100 if pos['avg_price'] > 0 else 0,
                }

        self.trades.loc[self.timestamp] = {
            'cash': self.cash,
            'value': total_value - self.cash,
            'total': total_value,
            'positions': position_summary,
            'trades': list(self.trade_history),
        }

    def buy(self, symbol: str, price: float, size: int) -> bool:
        """
        买入操作

        考虑滑点(买入价上移)和佣金(最低 5 元)。

        Args:
            symbol (str): 交易标的代码
            price (float): 市价
            size (int): 买入数量

        Returns:
            bool: 交易是否成功
        """
        if size <= 0:
            print(f"买入数量必须大于 0!当前: {size}")
            return False

        exec_price = price * (1 + self.slippage)
        amount = exec_price * size
        if self.commission_rate > 0:
            commission = max(amount * self.commission_rate, MIN_COMMISSION)
        else:
            commission = 0.0
        total_cost = amount + commission

        if self.cash < total_cost:
            print(f"资金不足!需要 {total_cost:.2f}(佣金 {commission:.2f}),当前可用 {self.cash:.2f}")
            return False

        # 执行买入
        self.cash -= total_cost
        self.total_commission += commission
        self.total_slippage_cost += amount - price * size

        # 更新持仓(按实际成交价记录成本)
        if symbol in self.positions:
            old = self.positions[symbol]
            new_total = old['size'] * old['avg_price'] + amount
            new_size = old['size'] + size
            self.positions[symbol] = {'size': new_size, 'avg_price': new_total / new_size}
        else:
            self.positions[symbol] = {'size': size, 'avg_price': exec_price}

        self.trade_history.append({
            'action': 'buy', 'symbol': symbol,
            'price': price, 'exec_price': exec_price,
            'size': size, 'amount': amount,
            'commission': commission, 'total_cost': total_cost,
            'timestamp': self.timestamp,
        })

        print(f"买入成功: {symbol} {size}股 @ {exec_price:.2f},花费 {total_cost:.2f}(佣金 {commission:.2f})")
        return True

    def sell(self, symbol: str, price: float, size: int) -> bool:
        """
        卖出操作

        考虑滑点(卖出价下移)、佣金(最低 5 元)和印花税。

        Args:
            symbol (str): 交易标的代码
            price (float): 市价
            size (int): 卖出数量

        Returns:
            bool: 交易是否成功
        """
        if size <= 0:
            print(f"卖出数量必须大于 0!当前: {size}")
            return False

        if symbol not in self.positions:
            print(f"没有 {symbol} 的持仓!")
            return False

        position = self.positions[symbol]
        if position['size'] < size:
            print(f"持仓不足!当前持有 {position['size']},尝试卖出 {size}")
            return False

        exec_price = price * (1 - self.slippage)
        amount = exec_price * size
        if self.commission_rate > 0:
            commission = max(amount * self.commission_rate, MIN_COMMISSION)
        else:
            commission = 0.0
        stamp_duty = amount * self.stamp_duty_rate
        net_proceeds = amount - commission - stamp_duty

        # 执行卖出
        self.cash += net_proceeds
        self.total_commission += commission
        self.total_stamp_duty += stamp_duty
        self.total_slippage_cost += price * size - amount

        # 更新持仓
        if position['size'] == size:
            del self.positions[symbol]
        else:
            self.positions[symbol]['size'] -= size

        self.trade_history.append({
            'action': 'sell', 'symbol': symbol,
            'price': price, 'exec_price': exec_price,
            'size': size, 'amount': amount,
            'commission': commission, 'stamp_duty': stamp_duty,
            'net_proceeds': net_proceeds,
            'timestamp': self.timestamp,
        })

        print(f"卖出成功: {symbol} {size}股 @ {exec_price:.2f},获得 {net_proceeds:.2f}(佣金 {commission:.2f} + 印花税 {stamp_duty:.2f})")
        return True

    def get(self, factor: str, timestamp=None) -> Any:
        """
        从trades DataFrame中获取数据

        Args:
            factor (str): 列名,可选 'cash', 'value', 'total', 'positions', 'trades'
            timestamp: 时间戳,为None则使用当前时间戳

        Returns:
            Any: 对应列的数据,不存在则返回None
        """
        ts = timestamp if timestamp is not None else self.timestamp
        if ts is None or ts not in self.trades.index:
            return None
        if factor not in self.trades.columns:
            return None
        return self.trades.at[ts, factor]

__init__(initial_cash=100000.0, commission_rate=DEFAULT_COMMISSION_RATE, stamp_duty_rate=DEFAULT_STAMP_DUTY_RATE, slippage=DEFAULT_SLIPPAGE)

初始化Broker

参数:

名称 类型 描述 默认
initial_cash float

初始资金,默认10万元

100000.0
commission_rate float

佣金费率,默认万2.5

DEFAULT_COMMISSION_RATE
stamp_duty_rate float

印花税费率(仅卖出),默认万5

DEFAULT_STAMP_DUTY_RATE
slippage float

滑点比率,默认0.1%

DEFAULT_SLIPPAGE
源代码位于: qka/core/broker.py
def __init__(self, initial_cash=100000.0,
             commission_rate=DEFAULT_COMMISSION_RATE,
             stamp_duty_rate=DEFAULT_STAMP_DUTY_RATE,
             slippage=DEFAULT_SLIPPAGE):
    """
    初始化Broker

    Args:
        initial_cash (float): 初始资金,默认10万元
        commission_rate (float): 佣金费率,默认万2.5
        stamp_duty_rate (float): 印花税费率(仅卖出),默认万5
        slippage (float): 滑点比率,默认0.1%
    """
    self.cash = initial_cash
    self.positions = {}
    self.trade_history = []
    self.timestamp = None

    self.commission_rate = commission_rate
    self.stamp_duty_rate = stamp_duty_rate
    self.slippage = slippage

    self.total_commission = 0.0
    self.total_stamp_duty = 0.0
    self.total_slippage_cost = 0.0

    self.trades = pd.DataFrame(columns=[
        'cash', 'value', 'total', 'positions', 'trades'
    ])

on_bar(date, get)

Bar结束时记录当前状态。

参数:

名称 类型 描述 默认
date

当前时间戳

必需
get

获取因子数据的函数

必需
源代码位于: qka/core/broker.py
def on_bar(self, date, get):
    """
    Bar结束时记录当前状态。

    Args:
        date: 当前时间戳
        get: 获取因子数据的函数
    """
    self.timestamp = date
    total_value = self.cash
    position_summary = {}
    for symbol, pos in self.positions.items():
        price = get('close')
        if symbol in price.index:
            current_price = price[symbol]
            market_value = pos['size'] * current_price
            total_value += market_value
            position_summary[symbol] = {
                'size': pos['size'],
                'avg_price': pos['avg_price'],
                'current_price': current_price,
                'market_value': market_value,
                'profit_pct': (current_price / pos['avg_price'] - 1) * 100 if pos['avg_price'] > 0 else 0,
            }

    self.trades.loc[self.timestamp] = {
        'cash': self.cash,
        'value': total_value - self.cash,
        'total': total_value,
        'positions': position_summary,
        'trades': list(self.trade_history),
    }

buy(symbol, price, size)

买入操作

考虑滑点(买入价上移)和佣金(最低 5 元)。

参数:

名称 类型 描述 默认
symbol str

交易标的代码

必需
price float

市价

必需
size int

买入数量

必需

返回:

名称 类型 描述
bool bool

交易是否成功

源代码位于: qka/core/broker.py
def buy(self, symbol: str, price: float, size: int) -> bool:
    """
    买入操作

    考虑滑点(买入价上移)和佣金(最低 5 元)。

    Args:
        symbol (str): 交易标的代码
        price (float): 市价
        size (int): 买入数量

    Returns:
        bool: 交易是否成功
    """
    if size <= 0:
        print(f"买入数量必须大于 0!当前: {size}")
        return False

    exec_price = price * (1 + self.slippage)
    amount = exec_price * size
    if self.commission_rate > 0:
        commission = max(amount * self.commission_rate, MIN_COMMISSION)
    else:
        commission = 0.0
    total_cost = amount + commission

    if self.cash < total_cost:
        print(f"资金不足!需要 {total_cost:.2f}(佣金 {commission:.2f}),当前可用 {self.cash:.2f}")
        return False

    # 执行买入
    self.cash -= total_cost
    self.total_commission += commission
    self.total_slippage_cost += amount - price * size

    # 更新持仓(按实际成交价记录成本)
    if symbol in self.positions:
        old = self.positions[symbol]
        new_total = old['size'] * old['avg_price'] + amount
        new_size = old['size'] + size
        self.positions[symbol] = {'size': new_size, 'avg_price': new_total / new_size}
    else:
        self.positions[symbol] = {'size': size, 'avg_price': exec_price}

    self.trade_history.append({
        'action': 'buy', 'symbol': symbol,
        'price': price, 'exec_price': exec_price,
        'size': size, 'amount': amount,
        'commission': commission, 'total_cost': total_cost,
        'timestamp': self.timestamp,
    })

    print(f"买入成功: {symbol} {size}股 @ {exec_price:.2f},花费 {total_cost:.2f}(佣金 {commission:.2f})")
    return True

sell(symbol, price, size)

卖出操作

考虑滑点(卖出价下移)、佣金(最低 5 元)和印花税。

参数:

名称 类型 描述 默认
symbol str

交易标的代码

必需
price float

市价

必需
size int

卖出数量

必需

返回:

名称 类型 描述
bool bool

交易是否成功

源代码位于: qka/core/broker.py
def sell(self, symbol: str, price: float, size: int) -> bool:
    """
    卖出操作

    考虑滑点(卖出价下移)、佣金(最低 5 元)和印花税。

    Args:
        symbol (str): 交易标的代码
        price (float): 市价
        size (int): 卖出数量

    Returns:
        bool: 交易是否成功
    """
    if size <= 0:
        print(f"卖出数量必须大于 0!当前: {size}")
        return False

    if symbol not in self.positions:
        print(f"没有 {symbol} 的持仓!")
        return False

    position = self.positions[symbol]
    if position['size'] < size:
        print(f"持仓不足!当前持有 {position['size']},尝试卖出 {size}")
        return False

    exec_price = price * (1 - self.slippage)
    amount = exec_price * size
    if self.commission_rate > 0:
        commission = max(amount * self.commission_rate, MIN_COMMISSION)
    else:
        commission = 0.0
    stamp_duty = amount * self.stamp_duty_rate
    net_proceeds = amount - commission - stamp_duty

    # 执行卖出
    self.cash += net_proceeds
    self.total_commission += commission
    self.total_stamp_duty += stamp_duty
    self.total_slippage_cost += price * size - amount

    # 更新持仓
    if position['size'] == size:
        del self.positions[symbol]
    else:
        self.positions[symbol]['size'] -= size

    self.trade_history.append({
        'action': 'sell', 'symbol': symbol,
        'price': price, 'exec_price': exec_price,
        'size': size, 'amount': amount,
        'commission': commission, 'stamp_duty': stamp_duty,
        'net_proceeds': net_proceeds,
        'timestamp': self.timestamp,
    })

    print(f"卖出成功: {symbol} {size}股 @ {exec_price:.2f},获得 {net_proceeds:.2f}(佣金 {commission:.2f} + 印花税 {stamp_duty:.2f})")
    return True

get(factor, timestamp=None)

从trades DataFrame中获取数据

参数:

名称 类型 描述 默认
factor str

列名,可选 'cash', 'value', 'total', 'positions', 'trades'

必需
timestamp

时间戳,为None则使用当前时间戳

None

返回:

名称 类型 描述
Any Any

对应列的数据,不存在则返回None

源代码位于: qka/core/broker.py
def get(self, factor: str, timestamp=None) -> Any:
    """
    从trades DataFrame中获取数据

    Args:
        factor (str): 列名,可选 'cash', 'value', 'total', 'positions', 'trades'
        timestamp: 时间戳,为None则使用当前时间戳

    Returns:
        Any: 对应列的数据,不存在则返回None
    """
    ts = timestamp if timestamp is not None else self.timestamp
    if ts is None or ts not in self.trades.index:
        return None
    if factor not in self.trades.columns:
        return None
    return self.trades.at[ts, factor]