金 融 量 化 分 析 • JoinQuant • 第 二 篇
发布时间
阅读量:
阅读量
第 二 篇
-
一、自定义数据处理模块
-
- (一)、自定义获取所有A股列表模块
- (二)、自定义获取单个股票行情行情数据模块
- (三)、自定义导出股票相关数据模块
- (四)、自定义初始化股票行情数据库
- (五)、自定义以增量补全的方式获取行情数据模块
- (六)、自定义读取【 C S V 】数据模块
- (七)、自定义按时间读取 【 C S V 】 数据模块
- (八)、自定义读取【 E x c e l 】数据模块
- (九)、自定义按时间读取【 E x c e l 】数据模块
- (十)、自定义将数据转换股票行情指定周期模块
- (十 一)、自定义获取单个股票财务指标模块
- (十 二)、自定义获取单个股票估值指标模块
- (十 三)、自定义获取单个股票资产负债指标数据模块
- (十 四)、自定义获取单个利润数据指标数据模块
- (十 五)、自定义获取单个股票券商业专项指标数据模块
- (十 六)、自定义获取单个股票保险业专项指标数据模块
- (十 七)、自定义获取单个股票现金流指标数据模块
- (十 八)、自定义获取单个股票银行业专项指标模块
- (十 九)、自定义涨跌幅计算模块
- (二十)、自定义获取股池模块
-
总结
-
- 代码汇总
一、自定义数据处理模块
(一)、自定义获取所有A股列表模块
代码如下:
# 获取所有A股列表模块
def get_stock_list():
"""
这是获取所有股票代码的函数
"""
# 使用获取股票基本信息函数——获取所有股票代码
All_Stock_ID_List = list(get_all_securities(['stock']).index)
# 返回所有股票代码
return All_Stock_ID_List
(二)、自定义获取单个股票行情行情数据模块
代码如下:
# 获得单个股票行情数据模块
def get_single_price(stock_code,start_date = None, end_date =None, timefrequency='1d'):
"""
stock_code :'股票代码'
timefrequency : '1d'(每日)、'1m'(每分钟)
start_date : '年-月-日'(开始时间)
end_date : '年-月-日'(结束时间)
"""
# 如果 start_date 为 None 时,则 start_date 取 上市时间
if start_date is None:
start_date = get_security_info(stock_code).start_date
if end_date is None:
end_date = datetime.datetime.today()
# 使用获取行情函数
data = get_price(security = stock_code, start_date = start_date, end_date = end_date, frequency = timefrequency, fill_paused = True, panel = True)
# 返回目标行情数据
return data
(三)、自定义导出股票相关数据模块
代码如下:
# 导出股票相关数据模块
def export_data(data,filename,type,mode = None):
"""
type : 存储的文件夹的名称[Finace/Price]
filename : 文件名称
data : 数据
"""
finaPath = absolutePath + type + '\ ' + filename + '.csv'
data.index.names=['date']
if mode == 'a':
data.to_csv(finaPath, mode = mode ,header = False )
# # 删除重复值
# 读取已存在数据
data = pd.read_csv(finaPath)
# 删除重复值
data = data.drop_duplicates(subset = ['date'])
# 存储非重复数据
data.to_csv(finaPath,index=False)
else:
data.to_csv(finaPath)
print(f'股票 :【 {filename} 】 数据封装成功,封装路径:',finaPath)
(四)、自定义初始化股票行情数据库
代码如下:
def init_db():
"""
注意 : 什么都不用填
"""
# 获取所有股票代码
stocks_codes = get_stock_list()
# 存储为CSV文件
for stock_code in stocks_codes:
data = get_single_price(stock_code = stock_code)
export_data(data = data,filename= stock_code, type = 'Price')
(五)、自定义以增量补全的方式获取行情数据模块
代码如下:
# 以增量补全的方式获取行情数据模块
def add_price_data(stock_Code, type):
"""
(1)finaPath : 文件路径名
(2)判断文件是否存在 : 情况1,文件存在时以增量方式获取数据
情况2,文件不存在时全下载
"""
# 查询文件存在情况
finaPath = absolutePath + type + '\ ' + stock_Code + '.csv'
if os.path.exists(finaPath):
# 获取已有行情数据的最新日期
start_date = pd.read_csv(finaPath, usecols = ['date'])['date'].iloc[-1]
# 判断最新日期
print(start_date)
if start_date == '2022-05-13':
print(f'当前{stock_Code}行情数据为最新数据,不需要重复下载')
else:
# 获取已有行情数据的最新日期到当日时间的数据
data = get_single_price(stock_code = stock_Code, start_date = start_date, end_date = datetime.datetime.today())
# 以增量方式添加已有文件当中
export_data(data = data, filename = stock_Code, type = type, mode = 'a')
print(f'当前{stock_Code}行情数据已为最新数据')
else:
data = get_single_price(stock_code = stock_Code)
export_data(data = data, filename = stock_Code, type = 'Price')
print(f'当前{stock_Code}行情数据已为最新数据')
(六)、自定义读取【 C S V 】数据模块
代码如下:
# 读取【 C S V 】数据模块
def get_csv_data(filename,type = 'Price'):
"""
name : 文件名称
type : 存储的文件夹的名称[Finace/Price]
"""
finaPath = absolutePath + type + '\ ' + filename + '.csv'
return pd.read_csv(finaPath)
(七)、自定义按时间读取 【 C S V 】 数据模块
代码如下:
# 按时间读取 【 C S V 】 数据模块
def get_time_csv_data(filename,stat_date,end_date,type = 'Price'):
"""
filename : 文件名称
type : 存储的文件夹的名称[Finace/Price]
stat_date : 查询的起始时间
end_date : 查询的终止时间
"""
# 读取对应股票CSV文件
finaPath=absolutePath + type + '\ ' + filename + '.csv'
# # 筛选特定时间数据
# 把日期设为行索引
data = pd.read_csv(finaPath,index_col='date')
# 根据起始时间筛选数据
return data[( data.index >= stat_date ) & ( data.index <= end_date )]
(八)、自定义读取【 E x c e l 】数据模块
代码如下:
# 读取【 E x c e l 】数据模块
def get_excel_data(filename,type = 'Price'):
"""
name : 文件名称
type : 存储的文件夹的名称[Finace/Price]
"""
finaPath = absolutePath + type + '\ ' + filename + '.xlsx'
return pd.read_excel(finaPath)
(九)、自定义按时间读取【 E x c e l 】数据模块
代码如下:
# 按时间读取【 E x c e l 】数据模块
def get_time_excel_data(filename,stat_date,end_date,type = 'Price'):
"""
filename : 文件名称
type : 存储的文件夹的名称[Finace/Price]
stat_date : 查询的起始时间
end_date : 查询的终止时间
"""
# 读取对应股票CSV文件
finaPath = absolutePath + type + '\ ' + filename + '.xlsx'
# # 筛选特定时间数据
# 把日期设为行索引
data = pd.read_excel(finaPath,index_col='date')
# 根据起始时间筛选数据
return data[( data.index >= stat_date ) & ( data.index <= end_date )]
(十)、自定义将数据转换股票行情指定周期模块
代码如下:
# 将数据转换股票行情指定周期模块
def transfer_price_freq(data,timefrequency):
"""
data : 数据
timefrequency : 制定周期
"""
#汇总周数据表
TranskData=pd.DataFrame()
#转换开盘价
TranskData['Open']=data['open'].resample(timefrequency).first()
#转换收盘价
TranskData['Close']=data['close'].resample(timefrequency).last()
#转换最高价
TranskData['High']=data['high'].resample(timefrequency).max()
#转换最低价
TranskData['Low']=data['low'].resample(timefrequency).min()
#转换总的成交量
TranskData['Volume(Sum)']=data['volume'].resample(timefrequency).sum()
#转换总的成交额
TranskData['Money(Sum)']=data['money'].resample(timefrequency).sum()
return TranskData
(十 一)、自定义获取单个股票财务指标模块
代码如下:
# 获取单个股票财务指标模块
def get_single_indicator(code,statDate,date):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取财务数据函数
data = get_fundamentals(query(indicator).filter(indicator.code == code),date = date, statDate = statDate)
return data
(十 二)、自定义获取单个股票估值指标模块
代码如下:
# 获取单个股票估值指标模块
def get_single_valuation(code, date, statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取股票估值数据函数
TargetData=get_fundamentals(query(valuation).filter(valuation.code == code),date=date, statDate=statDate)
return TargetData
(十 三)、自定义获取单个股票资产负债指标数据模块
代码如下:
# 获取单个股票资产负债指标模块
def get_single_balance(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取资产负债数据函数
TargetData=get_fundamentals(query(balance).filter(balance.code == code),date=date, statDate=statDate)
return TargetData
(十 四)、自定义获取单个利润数据指标数据模块
代码如下:
# 获取单个利润数据指标模块
def get_single_income(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取利润数据函数
TargetData=get_fundamentals(query(income).filter(income.code == code),date=date, statDate=statDate)
return TargetData
(十 五)、自定义获取单个股票券商业专项指标数据模块
代码如下:
# 获取单个股票券商业专项指标模块
def get_single_security_indicator(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取券商业专项指标数据函数
TargetData=get_fundamentals(query(security_indicator).filter(security_indicator.code == code),date=date, statDate=statDate)
return TargetData
(十 六)、自定义获取单个股票保险业专项指标数据模块
代码如下:
# 获取单个股票保险业专项指标模块
def get_single_insurance_indicator(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取保险业专项指标数据函数
TargetData=get_fundamentals(query(insurance_indicator).filter(insurance_indicator.code == code),date=date, statDate=statDate)
return TargetData
(十 七)、自定义获取单个股票现金流指标数据模块
代码如下:
# 获取单个股票现金流指标模块
def get_single_cash_flow(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取现金流数据函数
TargetData=get_fundamentals(query(cash_flow).filter(cash_flow.code == code),date=date, statDate=statDate)
return TargetData
(十 八)、自定义获取单个股票银行业专项指标模块
代码如下:
# 获取单个股票银行业专项指标模块
def get_single_bank_indicator(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取银行业专项指标数据函数
TargetData=get_fundamentals(query(bank_indicator).filter(bank_indicator.code == code),date=date, statDate=statDate)
return TargetData
(十 九)、自定义涨跌幅计算模块
代码如下:
# 涨跌幅计算模块
def calculate_change_pct(data):
"""
公式:(当期开盘价-上期开盘价)/上期开盘价
: param data : dataframe 带有收盘价数据
: return data : dataframe 带有涨跌幅数据
"""
data['close_pct']=(data['close']-data['close'].shift(1))/data['close'].shift(1)
return data
(二十)、自定义获取股池模块
代码如下:
# 获取股票成分股票数据https://www.joinquant.com/indexData
def get_index_list(index_symbol = '000300.XSHG'):
# 获取所有沪深300的股票, 设为股票池
stocks = get_index_stocks(index_symbol)
return stocks
总结
代码汇总
from jqdatasdk import *
import pandas as pd
import os
import datetime
auth('账号','密码')
absolutePath = r'F:\ XuhssQUANT\ Data\ 数据\ '
print('========================================================================================================================================================================================================')
# 获取所有A股列表模块
def get_stock_list():
"""
这是获取所有股票代码的函数
"""
# 使用获取股票基本信息函数——获取所有股票代码
All_Stock_ID_List = list(get_all_securities(['stock']).index)
# 返回所有股票代码
return All_Stock_ID_List
# 获得单个股票行情数据模块
def get_single_price(stock_code,start_date = None, end_date =None, timefrequency='1d'):
"""
stock_code :'股票代码'
timefrequency : '1d'(每日)、'1m'(每分钟)
start_date : '年-月-日'(开始时间)
end_date : '年-月-日'(结束时间)
"""
# 如果 start_date 为 None 时,则 start_date 取 上市时间
if start_date is None:
start_date = get_security_info(stock_code).start_date
if end_date is None:
end_date = datetime.datetime.today()
# 使用获取行情函数
data = get_price(security = stock_code, start_date = start_date, end_date = end_date, frequency = timefrequency, fill_paused = True, panel = True)
# 返回目标行情数据
return data
# 导出股票相关数据模块
def export_data(data,filename,type,mode = None):
"""
type : 存储的文件夹的名称[Finace/Price]
filename : 文件名称
data : 数据
"""
finaPath = absolutePath + type + '\ ' + filename + '.csv'
data.index.names=['date']
if mode == 'a':
data.to_csv(finaPath, mode = mode ,header = False )
# # 删除重复值
# 读取已存在数据
data = pd.read_csv(finaPath)
# 删除重复值
data = data.drop_duplicates(subset = ['date'])
# 存储非重复数据
data.to_csv(finaPath,index=False)
else:
data.to_csv(finaPath)
print(f'股票 :【 {filename} 】 数据封装成功,封装路径:',finaPath)
# 初始化股票行情数据库
def init_db():
"""
注意 : 什么都不用填
"""
# 获取所有股票代码
stocks_codes = get_stock_list()
# 存储为CSV文件
for stock_code in stocks_codes:
data = get_single_price(stock_code = stock_code)
export_data(data = data,filename= stock_code, type = 'Price')
# 以增量补全的方式获取行情数据模块
def add_price_data(stock_Code, type):
"""
(1)finaPath : 文件路径名
(2)判断文件是否存在 : 情况1,文件存在时以增量方式获取数据
情况2,文件不存在时全下载
"""
# 查询文件存在情况
finaPath = absolutePath + type + '\ ' + stock_Code + '.csv'
if os.path.exists(finaPath):
# 获取已有行情数据的最新日期
start_date = pd.read_csv(finaPath, usecols = ['date'])['date'].iloc[-1]
# 判断最新日期
print(start_date)
if start_date == '2022-05-13':
print(f'当前{stock_Code}行情数据为最新数据,不需要重复下载')
else:
# 获取已有行情数据的最新日期到当日时间的数据
data = get_single_price(stock_code = stock_Code, start_date = start_date, end_date = datetime.datetime.today())
# 以增量方式添加已有文件当中
export_data(data = data, filename = stock_Code, type = type, mode = 'a')
print(f'当前{stock_Code}行情数据已为最新数据')
else:
data = get_single_price(stock_code = stock_Code)
export_data(data = data, filename = stock_Code, type = 'Price')
print(f'当前{stock_Code}行情数据已为最新数据')
# 读取【 C S V 】数据模块
def get_csv_data(filename,type = 'Price'):
"""
name : 文件名称
type : 存储的文件夹的名称[Finace/Price]
"""
finaPath = absolutePath + type + '\ ' + filename + '.csv'
return pd.read_csv(finaPath)
# 按时间读取 【 C S V 】 数据模块
def get_time_csv_data(filename,stat_date,end_date,type = 'Price'):
"""
filename : 文件名称
type : 存储的文件夹的名称[Finace/Price]
stat_date : 查询的起始时间
end_date : 查询的终止时间
"""
# 读取对应股票CSV文件
finaPath=absolutePath + type + '\ ' + filename + '.csv'
# # 筛选特定时间数据
# 把日期设为行索引
data = pd.read_csv(finaPath,index_col='date')
# 根据起始时间筛选数据
return data[( data.index >= stat_date ) & ( data.index <= end_date )]
# 读取【 E x c e l 】数据模块
def get_excel_data(filename,type = 'Price'):
"""
name : 文件名称
type : 存储的文件夹的名称[Finace/Price]
"""
finaPath = absolutePath + type + '\ ' + filename + '.xlsx'
return pd.read_excel(finaPath)
# 按时间读取【 E x c e l 】数据模块
def get_time_excel_data(filename,stat_date,end_date,type = 'Price'):
"""
filename : 文件名称
type : 存储的文件夹的名称[Finace/Price]
stat_date : 查询的起始时间
end_date : 查询的终止时间
"""
# 读取对应股票CSV文件
finaPath = absolutePath + type + '\ ' + filename + '.xlsx'
# # 筛选特定时间数据
# 把日期设为行索引
data = pd.read_excel(finaPath,index_col='date')
# 根据起始时间筛选数据
return data[( data.index >= stat_date ) & ( data.index <= end_date )]
# 将数据转换股票行情指定周期模块
def transfer_price_freq(data,timefrequency):
"""
data : 数据
timefrequency : 制定周期
"""
#汇总周数据表
TranskData=pd.DataFrame()
#转换开盘价
TranskData['Open']=data['open'].resample(timefrequency).first()
#转换收盘价
TranskData['Close']=data['close'].resample(timefrequency).last()
#转换最高价
TranskData['High']=data['high'].resample(timefrequency).max()
#转换最低价
TranskData['Low']=data['low'].resample(timefrequency).min()
#转换总的成交量
TranskData['Volume(Sum)']=data['volume'].resample(timefrequency).sum()
#转换总的成交额
TranskData['Money(Sum)']=data['money'].resample(timefrequency).sum()
return TranskData
# 获取单个股票财务指标模块
def get_single_indicator(code,statDate,date):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取财务数据函数
data = get_fundamentals(query(indicator).filter(indicator.code == code),date = date, statDate = statDate)
return data
# 获取单个股票估值指标模块
def get_single_valuation(code, date, statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取股票估值数据函数
TargetData=get_fundamentals(query(valuation).filter(valuation.code == code),date=date, statDate=statDate)
return TargetData
# 获取单个股票资产负债指标模块
def get_single_balance(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取资产负债数据函数
TargetData=get_fundamentals(query(balance).filter(balance.code == code),date=date, statDate=statDate)
return TargetData
# 获取单个利润数据指标模块
def get_single_income(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取利润数据函数
TargetData=get_fundamentals(query(income).filter(income.code == code),date=date, statDate=statDate)
return TargetData
# 获取单个股票券商业专项指标模块
def get_single_security_indicator(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取券商业专项指标数据函数
TargetData=get_fundamentals(query(security_indicator).filter(security_indicator.code == code),date=date, statDate=statDate)
return TargetData
# 获取单个股票保险业专项指标模块
def get_single_insurance_indicator(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取保险业专项指标数据函数
TargetData=get_fundamentals(query(insurance_indicator).filter(insurance_indicator.code == code),date=date, statDate=statDate)
return TargetData
# 获取单个股票现金流指标模块
def get_single_cash_flow(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取现金流数据函数
TargetData=get_fundamentals(query(cash_flow).filter(cash_flow.code == code),date=date, statDate=statDate)
return TargetData
# 获取单个股票银行业专项指标模块
def get_single_bank_indicator(code,date,statDate):
"""
code : '股票代码'
date : '年-月-日'
statDate : '年季度'
注意 : data 和 statData不能同时存在
"""
# 使用获取银行业专项指标数据函数
TargetData=get_fundamentals(query(bank_indicator).filter(bank_indicator.code == code),date=date, statDate=statDate)
return TargetData
# 涨跌幅计算模块
def calculate_change_pct(data):
"""
公式:(当期开盘价-上期开盘价)/上期开盘价
: param data : dataframe 带有收盘价数据
: return data : dataframe 带有涨跌幅数据
"""
data['close_pct']=(data['close']-data['close'].shift(1))/data['close'].shift(1)
return data
全部评论 (0)
还没有任何评论哟~
