Advertisement

金 融 量 化 分 析 • JoinQuant • 第 二 篇

阅读量:

第 二 篇

  • 一、自定义数据处理模块

    • (一)、自定义获取所有A股列表模块
    • (二)、自定义获取单个股票行情行情数据模块
    • (三)、自定义导出股票相关数据模块
    • (四)、自定义初始化股票行情数据库
    • (五)、自定义以增量补全的方式获取行情数据模块
    • (六)、自定义读取【 C S V 】数据模块
    • (七)、自定义按时间读取 【 C S V 】 数据模块
    • (八)、自定义读取【 E x c e l 】数据模块
    • (九)、自定义按时间读取【 E x c e l 】数据模块
    • (十)、自定义将数据转换股票行情指定周期模块
    • (十 一)、自定义获取单个股票财务指标模块
    • (十 二)、自定义获取单个股票估值指标模块
    • (十 三)、自定义获取单个股票资产负债指标数据模块
    • (十 四)、自定义获取单个利润数据指标数据模块
    • (十 五)、自定义获取单个股票券商业专项指标数据模块
    • (十 六)、自定义获取单个股票保险业专项指标数据模块
    • (十 七)、自定义获取单个股票现金流指标数据模块
    • (十 八)、自定义获取单个股票银行业专项指标模块
    • (十 九)、自定义涨跌幅计算模块
    • (二十)、自定义获取股池模块
  • 总结

    • 代码汇总

一、自定义数据处理模块

(一)、自定义获取所有A股列表模块

代码如下:

复制代码
    # 获取所有A股列表模块
    def get_stock_list():
    """
    这是获取所有股票代码的函数 
    """
    # 使用获取股票基本信息函数——获取所有股票代码
    All_Stock_ID_List = list(get_all_securities(['stock']).index)
    # 返回所有股票代码
    return All_Stock_ID_List

(二)、自定义获取单个股票行情行情数据模块

代码如下:

复制代码
    # 获得单个股票行情数据模块
    def get_single_price(stock_code,start_date = None, end_date =None, timefrequency='1d'):
    """ 
    stock_code :'股票代码'
    timefrequency : '1d'(每日)、'1m'(每分钟)
    start_date : '年-月-日'(开始时间)
    end_date : '年-月-日'(结束时间)
    """ 
    # 如果 start_date 为 None 时,则 start_date 取 上市时间
    if start_date is None:
        start_date = get_security_info(stock_code).start_date
    if end_date is None:
        end_date = datetime.datetime.today()
    # 使用获取行情函数
    data = get_price(security = stock_code, start_date = start_date, end_date = end_date, frequency = timefrequency, fill_paused = True, panel = True)
    # 返回目标行情数据
    return data

(三)、自定义导出股票相关数据模块

代码如下:

复制代码
    # 导出股票相关数据模块
    def export_data(data,filename,type,mode = None):
    """ 
    type : 存储的文件夹的名称[Finace/Price]
    filename : 文件名称
    data : 数据    
    """
    finaPath = absolutePath + type + '\ ' + filename + '.csv'
    data.index.names=['date']
    if mode == 'a':
        data.to_csv(finaPath, mode = mode ,header = False )
        # # 删除重复值
        # 读取已存在数据
        data = pd.read_csv(finaPath)
        # 删除重复值
        data = data.drop_duplicates(subset = ['date'])
        # 存储非重复数据
        data.to_csv(finaPath,index=False)
    else:
        data.to_csv(finaPath)
    print(f'股票 :【 {filename} 】 数据封装成功,封装路径:',finaPath)

(四)、自定义初始化股票行情数据库

代码如下:

复制代码
    def init_db():
    """ 
    注意 : 什么都不用填
    """
    # 获取所有股票代码
    stocks_codes = get_stock_list()
    # 存储为CSV文件
    for stock_code in stocks_codes:
            data = get_single_price(stock_code = stock_code)
            export_data(data = data,filename= stock_code, type = 'Price')

(五)、自定义以增量补全的方式获取行情数据模块

代码如下:

复制代码
    # 以增量补全的方式获取行情数据模块
    def add_price_data(stock_Code, type):
    """  
    (1)finaPath : 文件路径名
    (2)判断文件是否存在 : 情况1,文件存在时以增量方式获取数据
                         情况2,文件不存在时全下载
    """
    # 查询文件存在情况
    finaPath = absolutePath + type + '\ ' + stock_Code + '.csv'
    if os.path.exists(finaPath):
        # 获取已有行情数据的最新日期
        start_date = pd.read_csv(finaPath, usecols = ['date'])['date'].iloc[-1]
        # 判断最新日期
        print(start_date)
        if start_date == '2022-05-13':
            print(f'当前{stock_Code}行情数据为最新数据,不需要重复下载')
        else:
            # 获取已有行情数据的最新日期到当日时间的数据
            data = get_single_price(stock_code = stock_Code, start_date = start_date, end_date = datetime.datetime.today())
            # 以增量方式添加已有文件当中
            export_data(data = data, filename = stock_Code, type = type, mode = 'a')
            print(f'当前{stock_Code}行情数据已为最新数据')
    else:
        data = get_single_price(stock_code = stock_Code)
        export_data(data = data, filename = stock_Code, type = 'Price')
        print(f'当前{stock_Code}行情数据已为最新数据')

(六)、自定义读取【 C S V 】数据模块

代码如下:

复制代码
    # 读取【 C S V 】数据模块
    def get_csv_data(filename,type = 'Price'):
    """ 
    name : 文件名称
    type : 存储的文件夹的名称[Finace/Price]   
    """
    finaPath = absolutePath + type + '\ ' + filename + '.csv'
    return pd.read_csv(finaPath)

(七)、自定义按时间读取 【 C S V 】 数据模块

代码如下:

复制代码
    # 按时间读取 【 C S V 】 数据模块
    def get_time_csv_data(filename,stat_date,end_date,type = 'Price'):
    """ 
    filename : 文件名称
    type : 存储的文件夹的名称[Finace/Price] 
    stat_date : 查询的起始时间
    end_date : 查询的终止时间
    """
    # 读取对应股票CSV文件
    finaPath=absolutePath + type + '\ ' + filename + '.csv'
    # # 筛选特定时间数据
    # 把日期设为行索引
    data = pd.read_csv(finaPath,index_col='date')
    # 根据起始时间筛选数据
    return data[( data.index >= stat_date ) & ( data.index <= end_date )]

(八)、自定义读取【 E x c e l 】数据模块

代码如下:

复制代码
    # 读取【 E x c e l 】数据模块
    def get_excel_data(filename,type = 'Price'):
    """ 
    name : 文件名称
    type : 存储的文件夹的名称[Finace/Price]   
    """
    finaPath = absolutePath + type + '\ ' + filename + '.xlsx'
    return pd.read_excel(finaPath)

(九)、自定义按时间读取【 E x c e l 】数据模块

代码如下:

复制代码
    # 按时间读取【 E x c e l 】数据模块
    def get_time_excel_data(filename,stat_date,end_date,type = 'Price'):
    """ 
    filename : 文件名称
    type : 存储的文件夹的名称[Finace/Price] 
    stat_date : 查询的起始时间
    end_date : 查询的终止时间
    """
    # 读取对应股票CSV文件
    finaPath = absolutePath + type + '\ ' + filename + '.xlsx'
    # # 筛选特定时间数据
    # 把日期设为行索引
    data = pd.read_excel(finaPath,index_col='date')
    # 根据起始时间筛选数据
    return data[( data.index >= stat_date ) & ( data.index <= end_date )]

(十)、自定义将数据转换股票行情指定周期模块

代码如下:

复制代码
    # 将数据转换股票行情指定周期模块
    def transfer_price_freq(data,timefrequency):
    """ 
    data : 数据
    timefrequency : 制定周期
    """ 
    #汇总周数据表
    TranskData=pd.DataFrame()
    #转换开盘价
    TranskData['Open']=data['open'].resample(timefrequency).first()
    #转换收盘价
    TranskData['Close']=data['close'].resample(timefrequency).last()
    #转换最高价
    TranskData['High']=data['high'].resample(timefrequency).max()
    #转换最低价
    TranskData['Low']=data['low'].resample(timefrequency).min()
    #转换总的成交量
    TranskData['Volume(Sum)']=data['volume'].resample(timefrequency).sum()
    #转换总的成交额
    TranskData['Money(Sum)']=data['money'].resample(timefrequency).sum()
    return TranskData

(十 一)、自定义获取单个股票财务指标模块

代码如下:

复制代码
    # 获取单个股票财务指标模块
    def get_single_indicator(code,statDate,date):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """    
    # 使用获取财务数据函数
    data = get_fundamentals(query(indicator).filter(indicator.code == code),date = date, statDate = statDate)
    return data

(十 二)、自定义获取单个股票估值指标模块

代码如下:

复制代码
    # 获取单个股票估值指标模块
    def get_single_valuation(code,   date,   statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取股票估值数据函数
    TargetData=get_fundamentals(query(valuation).filter(valuation.code == code),date=date, statDate=statDate)
    return TargetData

(十 三)、自定义获取单个股票资产负债指标数据模块

代码如下:

复制代码
    # 获取单个股票资产负债指标模块
    def get_single_balance(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取资产负债数据函数
    TargetData=get_fundamentals(query(balance).filter(balance.code == code),date=date, statDate=statDate)
    return TargetData

(十 四)、自定义获取单个利润数据指标数据模块

代码如下:

复制代码
    # 获取单个利润数据指标模块
    def get_single_income(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取利润数据函数
    TargetData=get_fundamentals(query(income).filter(income.code == code),date=date, statDate=statDate)
    return TargetData

(十 五)、自定义获取单个股票券商业专项指标数据模块

代码如下:

复制代码
    # 获取单个股票券商业专项指标模块
    def get_single_security_indicator(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取券商业专项指标数据函数
    TargetData=get_fundamentals(query(security_indicator).filter(security_indicator.code == code),date=date, statDate=statDate)
    return TargetData

(十 六)、自定义获取单个股票保险业专项指标数据模块

代码如下:

复制代码
    # 获取单个股票保险业专项指标模块
    def get_single_insurance_indicator(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取保险业专项指标数据函数
    TargetData=get_fundamentals(query(insurance_indicator).filter(insurance_indicator.code == code),date=date, statDate=statDate)
    return TargetData

(十 七)、自定义获取单个股票现金流指标数据模块

代码如下:

复制代码
    # 获取单个股票现金流指标模块
    def get_single_cash_flow(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取现金流数据函数
    TargetData=get_fundamentals(query(cash_flow).filter(cash_flow.code == code),date=date, statDate=statDate)
    return TargetData

(十 八)、自定义获取单个股票银行业专项指标模块

代码如下:

复制代码
    # 获取单个股票银行业专项指标模块
    def get_single_bank_indicator(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取银行业专项指标数据函数
    TargetData=get_fundamentals(query(bank_indicator).filter(bank_indicator.code == code),date=date, statDate=statDate)
    return TargetData

(十 九)、自定义涨跌幅计算模块

代码如下:

复制代码
    # 涨跌幅计算模块
    def calculate_change_pct(data):
    """ 
    公式:(当期开盘价-上期开盘价)/上期开盘价
    : param data : dataframe 带有收盘价数据
    : return data : dataframe 带有涨跌幅数据
    """
    data['close_pct']=(data['close']-data['close'].shift(1))/data['close'].shift(1)
    return data

(二十)、自定义获取股池模块

代码如下:

复制代码
    # 获取股票成分股票数据https://www.joinquant.com/indexData
    def get_index_list(index_symbol = '000300.XSHG'):
    # 获取所有沪深300的股票, 设为股票池
    stocks = get_index_stocks(index_symbol)
    return stocks

总结

代码汇总

复制代码
    from jqdatasdk import *
    import pandas as pd
    import os
    import datetime
    auth('账号','密码')
    absolutePath = r'F:\ XuhssQUANT\ Data\ 数据\ '
    print('========================================================================================================================================================================================================')
    
    # 获取所有A股列表模块
    def get_stock_list():
    """
    这是获取所有股票代码的函数 
    """
    # 使用获取股票基本信息函数——获取所有股票代码
    All_Stock_ID_List = list(get_all_securities(['stock']).index)
    # 返回所有股票代码
    return All_Stock_ID_List
    
    # 获得单个股票行情数据模块
    def get_single_price(stock_code,start_date = None, end_date =None, timefrequency='1d'):
    """ 
    stock_code :'股票代码'
    timefrequency : '1d'(每日)、'1m'(每分钟)
    start_date : '年-月-日'(开始时间)
    end_date : '年-月-日'(结束时间)
    """ 
    # 如果 start_date 为 None 时,则 start_date 取 上市时间
    if start_date is None:
        start_date = get_security_info(stock_code).start_date
    if end_date is None:
        end_date = datetime.datetime.today()
    # 使用获取行情函数
    data = get_price(security = stock_code, start_date = start_date, end_date = end_date, frequency = timefrequency, fill_paused = True, panel = True)
    # 返回目标行情数据
    return data
    
    # 导出股票相关数据模块
    def export_data(data,filename,type,mode = None):
    """ 
    type : 存储的文件夹的名称[Finace/Price]
    filename : 文件名称
    data : 数据    
    """
    finaPath = absolutePath + type + '\ ' + filename + '.csv'
    data.index.names=['date']
    if mode == 'a':
        data.to_csv(finaPath, mode = mode ,header = False )
        # # 删除重复值
        # 读取已存在数据
        data = pd.read_csv(finaPath)
        # 删除重复值
        data = data.drop_duplicates(subset = ['date'])
        # 存储非重复数据
        data.to_csv(finaPath,index=False)
    else:
        data.to_csv(finaPath)
    print(f'股票 :【 {filename} 】 数据封装成功,封装路径:',finaPath)
    
    # 初始化股票行情数据库
    def init_db():
    """ 
    注意 : 什么都不用填
    """
    # 获取所有股票代码
    stocks_codes = get_stock_list()
    # 存储为CSV文件
    for stock_code in stocks_codes:
            data = get_single_price(stock_code = stock_code)
            export_data(data = data,filename= stock_code, type = 'Price')
        
    # 以增量补全的方式获取行情数据模块
    def add_price_data(stock_Code, type):
    """  
    (1)finaPath : 文件路径名
    (2)判断文件是否存在 : 情况1,文件存在时以增量方式获取数据
                         情况2,文件不存在时全下载
    """
    # 查询文件存在情况
    finaPath = absolutePath + type + '\ ' + stock_Code + '.csv'
    if os.path.exists(finaPath):
        # 获取已有行情数据的最新日期
        start_date = pd.read_csv(finaPath, usecols = ['date'])['date'].iloc[-1]
        # 判断最新日期
        print(start_date)
        if start_date == '2022-05-13':
            print(f'当前{stock_Code}行情数据为最新数据,不需要重复下载')
        else:
            # 获取已有行情数据的最新日期到当日时间的数据
            data = get_single_price(stock_code = stock_Code, start_date = start_date, end_date = datetime.datetime.today())
            # 以增量方式添加已有文件当中
            export_data(data = data, filename = stock_Code, type = type, mode = 'a')
            print(f'当前{stock_Code}行情数据已为最新数据')
    else:
        data = get_single_price(stock_code = stock_Code)
        export_data(data = data, filename = stock_Code, type = 'Price')
        print(f'当前{stock_Code}行情数据已为最新数据')
    
    # 读取【 C S V 】数据模块
    def get_csv_data(filename,type = 'Price'):
    """ 
    name : 文件名称
    type : 存储的文件夹的名称[Finace/Price]   
    """
    finaPath = absolutePath + type + '\ ' + filename + '.csv'
    return pd.read_csv(finaPath)
    
    # 按时间读取 【 C S V 】 数据模块
    def get_time_csv_data(filename,stat_date,end_date,type = 'Price'):
    """ 
    filename : 文件名称
    type : 存储的文件夹的名称[Finace/Price] 
    stat_date : 查询的起始时间
    end_date : 查询的终止时间
    """
    # 读取对应股票CSV文件
    finaPath=absolutePath + type + '\ ' + filename + '.csv'
    # # 筛选特定时间数据
    # 把日期设为行索引
    data = pd.read_csv(finaPath,index_col='date')
    # 根据起始时间筛选数据
    return data[( data.index >= stat_date ) & ( data.index <= end_date )]
    
    # 读取【 E x c e l 】数据模块
    def get_excel_data(filename,type = 'Price'):
    """ 
    name : 文件名称
    type : 存储的文件夹的名称[Finace/Price]   
    """
    finaPath = absolutePath + type + '\ ' + filename + '.xlsx'
    return pd.read_excel(finaPath)
    
    # 按时间读取【 E x c e l 】数据模块
    def get_time_excel_data(filename,stat_date,end_date,type = 'Price'):
    """ 
    filename : 文件名称
    type : 存储的文件夹的名称[Finace/Price] 
    stat_date : 查询的起始时间
    end_date : 查询的终止时间
    """
    # 读取对应股票CSV文件
    finaPath = absolutePath + type + '\ ' + filename + '.xlsx'
    # # 筛选特定时间数据
    # 把日期设为行索引
    data = pd.read_excel(finaPath,index_col='date')
    # 根据起始时间筛选数据
    return data[( data.index >= stat_date ) & ( data.index <= end_date )]
    
    # 将数据转换股票行情指定周期模块
    def transfer_price_freq(data,timefrequency):
    """ 
    data : 数据
    timefrequency : 制定周期
    """ 
    #汇总周数据表
    TranskData=pd.DataFrame()
    #转换开盘价
    TranskData['Open']=data['open'].resample(timefrequency).first()
    #转换收盘价
    TranskData['Close']=data['close'].resample(timefrequency).last()
    #转换最高价
    TranskData['High']=data['high'].resample(timefrequency).max()
    #转换最低价
    TranskData['Low']=data['low'].resample(timefrequency).min()
    #转换总的成交量
    TranskData['Volume(Sum)']=data['volume'].resample(timefrequency).sum()
    #转换总的成交额
    TranskData['Money(Sum)']=data['money'].resample(timefrequency).sum()
    return TranskData
        
    # 获取单个股票财务指标模块
    def get_single_indicator(code,statDate,date):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """    
    # 使用获取财务数据函数
    data = get_fundamentals(query(indicator).filter(indicator.code == code),date = date, statDate = statDate)
    return data
    
    # 获取单个股票估值指标模块
    def get_single_valuation(code,   date,   statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取股票估值数据函数
    TargetData=get_fundamentals(query(valuation).filter(valuation.code == code),date=date, statDate=statDate)
    return TargetData
    
    # 获取单个股票资产负债指标模块
    def get_single_balance(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取资产负债数据函数
    TargetData=get_fundamentals(query(balance).filter(balance.code == code),date=date, statDate=statDate)
    return TargetData
    
    # 获取单个利润数据指标模块
    def get_single_income(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取利润数据函数
    TargetData=get_fundamentals(query(income).filter(income.code == code),date=date, statDate=statDate)
    return TargetData
    
    # 获取单个股票券商业专项指标模块
    def get_single_security_indicator(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取券商业专项指标数据函数
    TargetData=get_fundamentals(query(security_indicator).filter(security_indicator.code == code),date=date, statDate=statDate)
    return TargetData
    
    # 获取单个股票保险业专项指标模块
    def get_single_insurance_indicator(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取保险业专项指标数据函数
    TargetData=get_fundamentals(query(insurance_indicator).filter(insurance_indicator.code == code),date=date, statDate=statDate)
    return TargetData
    
    # 获取单个股票现金流指标模块
    def get_single_cash_flow(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取现金流数据函数
    TargetData=get_fundamentals(query(cash_flow).filter(cash_flow.code == code),date=date, statDate=statDate)
    return TargetData
    
    # 获取单个股票银行业专项指标模块
    def get_single_bank_indicator(code,date,statDate):
    """  
    code : '股票代码'
    date : '年-月-日'
    statDate : '年季度'
    注意 : data 和 statData不能同时存在
    """
    # 使用获取银行业专项指标数据函数
    TargetData=get_fundamentals(query(bank_indicator).filter(bank_indicator.code == code),date=date, statDate=statDate)
    return TargetData
    
    # 涨跌幅计算模块
    def calculate_change_pct(data):
    """ 
    公式:(当期开盘价-上期开盘价)/上期开盘价
    : param data : dataframe 带有收盘价数据
    : return data : dataframe 带有涨跌幅数据
    """
    data['close_pct']=(data['close']-data['close'].shift(1))/data['close'].shift(1)
    return data

全部评论 (0)

还没有任何评论哟~