Advertisement

经济数据预测 | Python实现机器学习(MLP、XGBoost)金融市场预测

阅读量:

本文介绍了一种基于Python的机器学习方法用于金融市场数据预测,具体涉及两种模型:MLP(多层感知机)和XGBoost(梯度提升树)。在“程序设计”部分提到训练数据包含匿名特征和响应变量(如交易动作),并使用自定义的自动编码器-MLP模型进行特征提取与分类任务;同时结合交叉验证策略以避免时间序列数据的时间延迟问题,并通过多次随机种子训练以减少预测方差。此外,在“学习总结”中指出了制定有效交易策略的挑战性及市场波动性带来的不确定性因素。

经济数据预测 | Python实现机器学习(MLP、XGBoost)金融市场预测

目录

经济数据预测 | 使用Python在金融市场上应用机器学习算法(MLP、XGBoost)进行行为分析

复制代码
  * 学习总结
  * 参考资料

基本介绍

掌握现代股市投资的有效方法既需要敏锐的洞察力(技巧),又需要稳健的心理素质(理性)。只有实现技术分析与心理博弈的完美融合(协调),才能具备现代投资所需的所有核心能力(装备),从而在金融市场中立于不败之地(天下无敌)。
投资者的行为模式深深影响着市场的发展进程(极重要的一环)。深入理解投资者心理演变过程(来龙去脉),便能准确把握市场运行节奏(脉动)。
在一个完全有效信息传递机制下,在一个完全有效的市场中,在一个完全有效信息传递机制下,在一个完全有效信息传递机制下,在一个完全有效信息传递机制下,在一个完全有效信息传递机制下,在一个完全有效信息传递机制下,在一个完全有效信息传递机制下,在一个完全有效信息传递机制下,在一个完全有效信息传递机制下,在一个完全有效信息传递机制下,在一个完全有效信息传递机制下,
在一个完全有效的市场中,
在一个完全有效的市场中,
在一个完全有效的市场中,
在一个完全有效的市场中,
在一个 completely efficient market 中,
买家与卖家将都能获得充分的信息支持(买卖双方都能获得足够的必要条件)。
因此
因此
因此
因此
因此
在这种完美的条件下,
在这种完美的条件下,
在这种完美的条件下,
在这种完美的条件下,
在这种完美的条件下,
资产的价格将始终与其实际价值相符(不会偏离其真实价值水平)。
然而现实世界的金融市场往往并非如此完美无缺。
制定交易策略以发现并利用市场的不合理现象是一项极具挑战性的任务。

程序设计

训练集 包含一组脱敏特征 数据库模拟真实的股票市场动态 每个记录都对应一个潜在的交易机会 该系统会生成一个action标记值 1表示建议进行交易 0则表示应避免此操作 每笔交易都有其相应的权重值和响应指标 它们共同反映了交易的结果 日期字段是一个整数值 时间排序字段则标识了事件发生顺序 在原始数据集中 不包含任何隐性信息的数据特征 这些特征可以从features.csv文件中获取 在训练集train.csv中 除了基础响应值外 还提供了resp_{1,2,3,4}等指标 这些指标分别对应不同时间段内的回报情况 它们不会被包含在测试集中以避免评估偏差 尽管如此 数据集中特意加入了weight=0的情况 这种情况不会影响评分结果

测试阶段 比赛将使用约100万行历史记录构成的数据样本 这些数据将被用于模型训练 随着系统运行 预测阶段将采用实时更新的数据流 来支持在线决策

MLP
1

AE MLP采用了自动编码器以提取分类特征。
自编码器是一种神经网络模型,
它由 encoder 和 decoder 子系统构成。
经过训练后提取出的 encoder 模型得以保存以便后续使用。
随后可被用于训练各种机器学习模型。
整个网络架构划分为两个主要部分:左侧是自编码机部分,
右侧是多层感知机预测模块。
左侧 Autoencoder 部分对传统架构进行了优化,
在输入 encoder 之前加入了高斯噪声处理以缓解过拟合问题。
同时在 decoder 输出之后不仅采用了传统的 MSE Loss 损失函数,
还增加了分类模块来确保生成特征可用于分类任务。
右侧 MLP 部分则接受每个样本原始输入与压缩表示拼接后的数据作为输入层 fed into 多层感知机进行特征提取与分类任务处理。

复制代码
    def create_ae_mlp(num_columns, 
                  num_labels, 
                  hidden_units, 
                  dropout_rates, 
                  ls = 1e-2, 
                  lr = 1e-3):
    # 定义输入
    inp = tf.keras.layers.Input(shape = (num_columns, ))
    x0 = tf.keras.layers.BatchNormalization()(inp)
    # 高斯加噪
    noise_x0 = tf.keras.layers.GaussianNoise(dropout_rates[0])(x0)
    # 编码器
    encoder = tf.keras.layers.Dense(hidden_units[0])(encoder)
    encoder = tf.keras.layers.BatchNormalization()(encoder)
    encoder = tf.keras.layers.Activation('swish')(encoder)
    # 解码器
    decoder = tf.keras.layers.Dropout(dropout_rates[1])(encoder)
    decoder = tf.keras.layers.Dense(num_columns, name = 'decoder')(decoder)
    # Classification Layer
    x_ae = tf.keras.layers.Dense(hidden_units[1])(decoder)
    x_ae = tf.keras.layers.BatchNormalization()(x_ae)
    x_ae = tf.keras.layers.Activation('swish')(x_ae)
    x_ae = tf.keras.layers.Dropout(dropout_rates[2])(x_ae)
    # Autoencoder的分类器
    out_ae = tf.keras.layers.Dense(num_labels,
                                   activation='sigmoid', 
                                   name = 'ae_action')(x_ae)
    # 拼接原始输入和编码器输入
    x = tf.keras.layers.Concatenate()([x0, encoder])
    # MLP的网络层
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Dropout(dropout_rates[3])(x)
    for i in range(2, len(hidden_units)):
        x = tf.keras.layers.Dense(hidden_units[i])(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Activation('swish')(x)
        x = tf.keras.layers.Dropout(dropout_rates[i + 2])(x)
    # MLP的分类器
    out = tf.keras.layers.Dense(num_labels, activation = 'sigmoid', name = 'action')(x)
    # 定义输入输出
    model = tf.keras.models.Model(inputs = inp, outputs = [decoder, out_ae, out])
    # 定义优化器,损失函数和Metrics
    model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = lr),
                  loss = {'decoder': tf.keras.losses.MeanSquaredError(), 
                          'ae_action': tf.keras.losses.BinaryCrossentropy(label_smoothing = ls),
                          'action': tf.keras.losses.BinaryCrossentropy(label_smoothing = ls), 
                         },
                  metrics = {'decoder': tf.keras.metrics.MeanAbsoluteError(name = 'MAE'), 
                             'ae_action': tf.keras.metrics.AUC(name = 'AUC'), 
                             'action': tf.keras.metrics.AUC(name = 'AUC'), 
                            }, 
                 )
    
    return model

在金融任务中(段落1),由于任务数据为时间序列性质(句子1),模型的训练与应用不可避免地存在时间延迟(句子2)。作者采用PurgedGroupTimeSeriesSplit方法(句子3)进行交叉验证划分(句子4)。如图所示(段落2),该技术方法支持对时序数据进行科学划分(句子1),从而有效避免了验证集包含训练阶段之前数据的问题(句子2)。此外,在实际应用中(段落2),该研究还定义了训练集与验证集之间的时间间隔(句子3)。具体而言(段落2),本文采用了5折、31期的交叉验证划分策略(句子4)。

2
复制代码
    import numpy as np
    from sklearn.model_selection import KFold
    from sklearn.model_selection._split import _BaseKFold, indexable, _num_samples
    from sklearn.utils.validation import _deprecate_positional_args
    
    
    # modified code for group gaps; source
    # https://github.com/getgaurav2/scikit-learn/blob/d4a3af5cc9da3a76f0266932644b884c99724c57/sklearn/model_selection/_split.py#L2243
    class PurgedGroupTimeSeriesSplit(_BaseKFold):
    """
    Parameters
    ----------
    n_splits : int, default=5
        Number of splits. Must be at least 2.
    max_train_group_size : int, default=Inf
        Maximum group size for a single training set.
    group_gap : int, default=None
        Gap between train and test
    max_test_group_size : int, default=Inf
        We discard this number of groups from the end of each train split
    """
    
    @_deprecate_positional_args
    def __init__(self,
                 n_splits=5,
                 *,
                 max_train_group_size=np.inf,
                 max_test_group_size=np.inf,
                 group_gap=None,
                 verbose=False
                 ):
        super().__init__(n_splits, shuffle=False, random_state=None)
        self.max_train_group_size = max_train_group_size
        self.group_gap = group_gap
        self.max_test_group_size = max_test_group_size
        self.verbose = verbose
    
    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where n_samples is the number of samples
            and n_features is the number of features.
        y : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        groups : array-like of shape (n_samples,)
            Group labels for the samples used while splitting the dataset into
            train/test set.
        Yields
        ------
        train : ndarray
            The training set indices for that split.
        test : ndarray
            The testing set indices for that split.
        """
        if groups is None:
            raise ValueError(
                "The 'groups' parameter should not be None")
        X, y, groups = indexable(X, y, groups)
        n_samples = _num_samples(X)
        n_splits = self.n_splits
        group_gap = self.group_gap
        max_test_group_size = self.max_test_group_size
        max_train_group_size = self.max_train_group_size
        n_folds = n_splits + 1
        group_dict = {}
        u, ind = np.unique(groups, return_index=True)
        unique_groups = u[np.argsort(ind)]
        n_samples = _num_samples(X)
        n_groups = _num_samples(unique_groups)
        for idx in np.arange(n_samples):
            if (groups[idx] in group_dict):
                group_dict[groups[idx]].append(idx)
            else:
                group_dict[groups[idx]] = [idx]
        if n_folds > n_groups:
            raise ValueError(
                ("Cannot have number of folds={0} greater than"
                 " the number of groups={1}").format(n_folds,
                                                     n_groups))
    
        group_test_size = min(n_groups // n_folds, max_test_group_size)
        group_test_starts = range(n_groups - n_splits * group_test_size,
                                  n_groups, group_test_size)
        for group_test_start in group_test_starts:
            train_array = []
            test_array = []
    
            group_st = max(0, group_test_start - group_gap - max_train_group_size)
            for train_group_idx in unique_groups[group_st:(group_test_start - group_gap)]:
                train_array_tmp = group_dict[train_group_idx]
    
                train_array = np.sort(np.unique(
                    np.concatenate((train_array,
                                    train_array_tmp)),
                    axis=None), axis=None)
    
            train_end = train_array.size
    
            for test_group_idx in unique_groups[group_test_start:
            group_test_start +
            group_test_size]:
                test_array_tmp = group_dict[test_group_idx]
                test_array = np.sort(np.unique(
                    np.concatenate((test_array,
                                    test_array_tmp)),
                    axis=None), axis=None)
    
            test_array = test_array[group_gap:]
    
            if self.verbose > 0:
                pass
    
            yield [int(i) for i in train_array], [int(i) for i in test_array]
  • 采用 swish 激活函数而非 ReLU 以防止"死亡神经元"并实现梯度平滑;
  • 通过 3 种不同随机种子训练模型后取平均值来降低预测方差;
  • 主要采用在最后两次交叉验证拆分中使用的不同随机种子下的模型训练;
  • 利用 MLP 的 BCE 损失实施早停策略;
  • 借助 Hyperopt 实现最佳超参数配置。
Xgboost

相对于其他方法而言,Xgboost相对简单,并且允许用户设置关键参数以优化性能。具体来说,在实际应用中可以通过设定不同的随机种子训练出三个模型,并根据这些模型的表现来选择最优配置。代码如下:

复制代码
    X = train.loc[:, train.columns.str.contains('feature')].values
    y = train.loc[:, 'action'].astype('int').values
    
    X_ = X
    y_ = train.loc[:, 'action3'].astype('int').values
    
    clf1 = xgb.XGBClassifier(
    n_estimators=100,
    max_depth=11,
    learning_rate=0.05,
    subsample=0.90,
    colsample_bytree=0.7,
    missing=-999,
    random_state=21,
    tree_method='gpu_hist',  # THE MAGICAL PARAMETER
    reg_alpha=10,
    reg_lambda=10,
    )
    
    clf1.fit(X_, y_)
    
    clf2 = xgb.XGBClassifier(
    n_estimators=100,
    max_depth=11,
    learning_rate=0.05,
    subsample=0.90,
    colsample_bytree=0.7,
    missing=-999,
    random_state=210,
    tree_method='gpu_hist',  # THE MAGICAL PARAMETER
    reg_alpha=10,
    reg_lambda=10,
    )
    
    
    clf2.fit(X_, y_)
    
    clf3 = xgb.XGBClassifier(
    n_estimators=100,
    max_depth=11,
    learning_rate=0.05,
    subsample=0.90,
    colsample_bytree=0.7,
    missing=-999,
    random_state=2010,
    tree_method='gpu_hist',  # THE MAGICAL PARAMETER
    reg_alpha=10,
    reg_lambda=10,
    )
    
    clf3.fit(X_, y_)

学习总结

开发一套能够识别市场无效性的交易策略是一项高难度的任务。无论当前 profitable 策略如何,在未来其持续盈利能力并不能保证。由于市场存在波动性特征,在每次交易时准确预测其盈利能力便变得不可靠。这种不确定性导致我们难以将投资者获得的成功回报归因于运气因素还是系统本身的优化。

参考资料

[1]
[2]
[3]

全部评论 (0)

还没有任何评论哟~