核心的逻辑:选出流通值小的超跌次新股,然后买入博反抽。
测了一下,2015年9月---2019年3月的收益是这样的:
![]()
看起来非常不错,3年半的时间总共有12倍的收益,平均年化111%,胜率为62.5%,最大回撤22.8%。
时隔2年,这个策略还行不行呢?
翻出以前写的代码,又测了一下2015年9月---2021年3月的收益:
![]()
真是让我感到惊讶,竟然还是非常有效,收益达到了2522%,年化84%。
这个策略的思路其实特别简单:
**选股方式: **就是在上市天数小于180天的次新股里挑选出跌幅大于60%的股票,然后按照流通市值的大小,由小到大排序
**买入方式: **买入流通市值最小的前几个
**卖出方式: **5天内涨幅超过20%就止盈,或者4天内跌幅超过10%就止损
**仓位管理: **全仓买卖
源代码如下
from kuanke.wizard import *
from jqdata import *
import numpy as np
import pandas as pd
import talib
import datetime
import time
import math
## 初始化函数,设定要操作的股票、基准等等
def initialize(context):
# 设定基准
set_benchmark('000300.XSHG')
# 设定滑点
set_slippage(FixedSlippage(0.01))
# True为开启动态复权模式,使用真实价格交易
set_option('use_real_price', True)
# 设定成交量比例
set_option('order_volume_ratio', 1)
# 股票类交易手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
set_order_cost(OrderCost(open_tax=0, close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
#容器初始化
check_container_initialize()
#动态仓位、频率、计数初始化函数
check_dynamic_initialize()
# 股票筛选初始化函数
check_stocks_initialize()
# 出场初始化函数
sell_initialize()
# 入场初始化函数
buy_initialize()
# 关闭提示
log.set_level('order', 'info')
# 运行函数
run_daily(check_stocks, '9:15') #选股
run_daily(main_stock_pick, '9:16') #买入卖出列表
run_daily(sell_every_day,'open') #卖出未卖出成功的股票
run_daily(trade, 'open') #交易
run_daily(selled_security_list_count, 'after_close') #卖出股票日期计数
#######################!!!新手需要使用的地方!!!###################################################
##动态仓位、频率、计数初始化函数(持仓比例,选股频率,买入频率,卖出频率在这里设置)
def check_dynamic_initialize():
# 个股最大持仓比重
g.security_max_proportion = 1
# 选股和买卖频率
g.check_stocks_refresh_rate = 1
# 最大建仓数量
g.max_hold_stocknum = 1
#下面这几项不用管
# 买入频率
g.buy_refresh_rate = 1
# 卖出频率
g.sell_refresh_rate = 1
# 选股频率计数器
g.check_stocks_days = 0
#机器学习选股频率计数器
g.days = 0
# 买卖交易频率计数器
g.buy_trade_days=0
g.sell_trade_days=0
## 股票池初筛设置函数(股票初筛在这里设置)
def check_stocks_initialize():
# 是否过滤停盘
g.filter_paused = True
# 是否过滤退市
g.filter_delisted = True
# 是否只有ST
g.only_st = False
# 是否过滤ST
g.filter_st = True
# 股票池(填指数)
g.security_universe_index = ["all_a_securities"]#这里填写指数,全部股票就填['all_a_securities'],沪深300股票就填['000300.XSHG'],中证500就填['000905.XSHG'],沪深300+中证500就填['000300.XSHG','000905.XSHG']
# 填成分股(填成分股)
g.security_universe_user_securities = []
# 行业列表
g.industry_list = ["801010","801020","801030","801040","801050","801080","801110","801120","801130","801140","801150","801160","801170","801180","801200","801210","801230","801710","801720","801730","801740","801750","801760","801770","801780","801790","801880","801890"]
# 概念列表
g.concept_list = []
# 黑名单
g.blacklist=[]
## 买入股票,卖出股票筛选函数
def main_stock_pick(context):
if g.days % g.check_stocks_refresh_rate != 0:
g.days +=1
return
g.sell_stock_list=[]
g.buy_stock_list = []
####自定义编辑范围#####
#超跌次新策略
g.check_out_lists = [stock for stock in g.check_out_lists if ((context.current_dt.date() - get_security_info(stock).start_date).days)<180]
g.check_out_lists = [stock for stock in g.check_out_lists if stock[:3]!= &#34;688&#34;]
uuzhang2=[]
for stock in g.check_out_lists:
df2=attribute_history(stock,count=180,unit=&#39;1d&#39;,fields=[&#39;close&#39;],fq=&#39;pre&#39;)
Maxjiage = df2[&#39;close&#39;].max()
current_pricedata = attribute_history(stock, 1, &#39;1d&#39;, [&#39;close&#39;])
current_price = current_pricedata[&#39;close&#39;].mean()
if current_price/Maxjiage <0.4:
uuzhang2.append(stock)
stockset = uuzhang2
g.u=[]
#security_list22 = [stock for stock in g.sell_stock_list1 if current_data[stock].paused]
#print security_list22
g.sell_stock_list=[]
g.buy_stock_list = []
g.sell_stock_list1 = list(context.portfolio.positions.keys())
stockset = [stock for stock in stockset if stock not in g.sell_stock_list1]
for stock in g.sell_stock_list1:
df2=attribute_history(stock,count=1,unit=&#39;1d&#39;,\
fields=[&#39;open&#39;,&#39;close&#39;,&#39;volume&#39;,&#39;high&#39;,&#39;low&#39;,&#39;money&#39;,&#39;high_limit&#39;,&#39;low_limit&#39;,&#39;paused&#39;],\
fq=&#39;pre&#39;)
if df2[&#39;close&#39;][-1]==df2[&#39;high_limit&#39;][-1] and df2[&#39;volume&#39;][-1]>0 and df2[&#39;money&#39;][-1]>0:
g.u.append(stock)
else:
Minjiage=attribute_history(stock,count=5,unit=&#39;1d&#39;,fields=[&#39;close&#39;],fq=&#39;pre&#39;)
Minjiage = df2[&#39;close&#39;].min()
current_pricedata = attribute_history(stock, 1, &#39;1d&#39;, [&#39;close&#39;])
current_price = current_pricedata[&#39;close&#39;].mean()
loss_data3= attribute_history(stock, 4, &#39;1d&#39;, [&#39;close&#39;])
loss3 = loss_data3[&#39;close&#39;].max()
current_pricedata = attribute_history(stock, 1, &#39;1d&#39;, [&#39;close&#39;])
current_price = current_pricedata[&#39;close&#39;].mean()
loss_rate = current_price/loss3
if current_price/Minjiage >1.2 or loss_rate < 0.90:
g.sell_stock_list.append(stock)
for stock in stockset:
if stock in g.sell_stock_list1:
pass
else:
g.buy_stock_list.append(stock)
####自定义编辑范围#####
http://log.info(&#39;卖出列表:&#39;,g.sell_stock_list)
http://log.info(&#39;购买列表:&#39;,g.buy_stock_list)
g.days =1
return g.sell_stock_list,g.buy_stock_list
#######################!!!新手需要使用的地方!!!###################################################
##容器初始化(有新的全局容器可以加到这里)(新手忽略这里)
def check_container_initialize():
#卖出股票列表
g.sell_stock_list=[]
#买入股票列表
g.buy_stock_list = []
# 获取未卖出的股票
g.open_sell_securities = []
# 卖出股票的dict
g.selled_security_list={}
#涨停股票列表
g.ZT=[]
## 出场初始化函数(新手忽略这里)
def sell_initialize():
# 设定是否卖出buy_lists中的股票
g.sell_will_buy = True
# 固定出仓的数量或者百分比
g.sell_by_amount = None
g.sell_by_percent = None
## 入场初始化函数(新手忽略这里)
def buy_initialize():
# 是否可重复买入
g.filter_holded = False
# 委托类型
g.order_style_str = &#39;by_cap_mean&#39;
g.order_style_value = 100
## 股票初筛(新手忽略这里)
def check_stocks(context):
if g.check_stocks_days%g.check_stocks_refresh_rate != 0:
# 计数器加一
g.check_stocks_days += 1
return
# 股票池赋值
g.check_out_lists = get_security_universe(context, g.security_universe_index, g.security_universe_user_securities)
# 行业过滤
#g.check_out_lists = industry_filter(context, g.check_out_lists, g.industry_list)
# 概念过滤
#g.check_out_lists = concept_filter(context, g.check_out_lists, g.concept_list)
# 过滤ST股票
g.check_out_lists = st_filter(context, g.check_out_lists)
# 过滤停牌股票
g.check_out_lists = paused_filter(context, g.check_out_lists)
# 过滤退市股票
g.check_out_lists = delisted_filter(context, g.check_out_lists)
# 过滤黑名单股票
g.check_out_lists = [s for s in g.check_out_lists if s not in g.blacklist]
# 计数器归一
g.check_stocks_days = 1
return
## 卖出未卖出成功的股票(新手忽略这里)
def sell_every_day(context):
g.open_sell_securities = list(set(g.open_sell_securities))
open_sell_securities = [s for s in context.portfolio.positions.keys() if s in g.open_sell_securities]
if len(open_sell_securities)>0:
for stock in open_sell_securities:
order_target_value(stock, 0)
g.open_sell_securities = [s for s in g.open_sell_securities if s in context.portfolio.positions.keys()]
return
## 交易函数(新手忽略这里)
def trade(context):
# 初始化买入列表
buy_lists = []
# 买入股票筛选
if g.buy_trade_days%g.buy_refresh_rate == 0:
# 获取 buy_lists 列表
buy_lists = g.buy_stock_list
# 过滤涨停股票
buy_lists = high_limit_filter(context, buy_lists)
http://log.info(&#39;购买列表最终&#39;,buy_lists)
# 卖出操作
if g.sell_trade_days%g.sell_refresh_rate != 0:
# 计数器加一
g.sell_trade_days += 1
else:
# 卖出股票
sell(context, buy_lists)
# 计数器归一
g.sell_trade_days = 1
# 买入操作
if g.buy_trade_days%g.buy_refresh_rate != 0:
# 计数器加一
g.buy_trade_days += 1
else:
# 卖出股票
buy(context, buy_lists)
# 计数器归一
g.buy_trade_days = 1
################################## 交易函数群 ##################################(新手忽略)
# 交易函数 - 出场
def sell(context, buy_lists):
# 获取 sell_lists 列表
init_sl = context.portfolio.positions.keys()
sell_lists = context.portfolio.positions.keys()
# 判断是否卖出buy_lists中的股票
if not g.sell_will_buy:
sell_lists = [security for security in sell_lists if security not in buy_lists]
### _出场函数筛选-开始 ###
sell_lists = g.sell_stock_list
### _出场函数筛选-结束 ###
# 卖出股票
if len(sell_lists)>0:
for stock in sell_lists:
sell_by_amount_or_percent_or_none(context,stock, g.sell_by_amount, g.sell_by_percent, g.open_sell_securities)
# 获取卖出的股票, 并加入到 g.selled_security_list中
selled_security_list_dict(context,init_sl)
return
# 交易函数 - 入场
def buy(context, buy_lists):
# 判断是否可重复买入
buy_lists = holded_filter(context,buy_lists)
# 获取最终的 buy_lists 列表
Num = g.max_hold_stocknum - len(context.portfolio.positions)
buy_lists = buy_lists[:Num]
# 买入股票
if len(buy_lists)>0:
# 分配资金
result = order_style(context,buy_lists,g.max_hold_stocknum, g.order_style_str, g.order_style_value)
for stock in buy_lists:
if len(context.portfolio.positions) < g.max_hold_stocknum:
# 获取资金
Cash = result[stock]
# 判断个股最大持仓比重
value = judge_security_max_proportion(context,stock,Cash,g.security_max_proportion)
# 判断单只最大买入股数或金额
amount = max_buy_value_or_amount(stock,value,None,None)
# 下单
order(stock, amount, MarketOrderStyle())
return
################################### 公用函数群 ##################################(新手忽略)
## 过滤同一标的继上次卖出N天不再买入
def filter_n_tradeday_not_buy(security, n=0):
try:
if (security in g.selled_security_list.keys()) and (g.selled_security_list[security]<n):
return False
return True
except:
return True
## 是否可重复买入
def holded_filter(context,security_list):
if not g.filter_holded:
security_list = [stock for stock in security_list if stock not in context.portfolio.positions.keys()]
# 返回结果
return security_list
## 卖出股票加入dict
def selled_security_list_dict(context,security_list):
selled_sl = [s for s in security_list if s not in context.portfolio.positions.keys()]
if len(selled_sl)>0:
for stock in selled_sl:
g.selled_security_list[stock] = 0
## 过滤停牌股票
def paused_filter(context, security_list):
if g.filter_paused:
current_data = get_current_data()
security_list = [stock for stock in security_list if not current_data[stock].paused]
# 返回结果
return security_list
## 过滤退市股票
def delisted_filter(context, security_list):
if g.filter_delisted:
current_data = get_current_data()
security_list = [stock for stock in security_list if not ((&#39;退&#39; in current_data[stock].name) or (&#39;*&#39; in current_data[stock].name))]
# 返回结果
return security_list
## 过滤ST股票
def st_filter(context, security_list):
if g.only_st:
current_data = get_current_data()
security_list = [stock for stock in security_list if current_data[stock].is_st]
else:
if g.filter_st:
current_data = get_current_data()
security_list = [stock for stock in security_list if not current_data[stock].is_st]
# 返回结果
return security_list
# 过滤涨停股票
def high_limit_filter(context, security_list):
current_data = get_current_data()
security_list = [stock for stock in security_list if not (current_data[stock].day_open >= current_data[stock].high_limit)]
# 返回结果
return security_list
# 获取股票股票池
def get_security_universe(context, security_universe_index, security_universe_user_securities):
temp_index = []
for s in security_universe_index:
if s == &#39;all_a_securities&#39;:
temp_index += list(get_all_securities([&#39;stock&#39;], context.current_dt.date()).index)
else:
temp_index += get_index_stocks(s)
for x in security_universe_user_securities:
temp_index += x
return sorted(list(set(temp_index)))
# 行业过滤
def industry_filter(context, security_list, industry_list):
if len(industry_list) == 0:
# 返回股票列表
return security_list
else:
securities = []
for s in industry_list:
temp_securities = get_industry_stocks(s)
securities += temp_securities
security_list = [stock for stock in security_list if stock in securities]
# 返回股票列表
return security_list
# 概念过滤
def concept_filter(context, security_list, concept_list):
if len(concept_list) == 0:
return security_list
else:
securities = []
for s in concept_list:
temp_securities = get_concept_stocks(s)
securities += temp_securities
security_list = [stock for stock in security_list if stock in securities]
# 返回股票列表
return security_list
## 卖出股票日期计数
def selled_security_list_count(context):
#g.daily_risk_management = True
if len(g.selled_security_list)>0:
for stock in g.selled_security_list.keys():
g.selled_security_list[stock] += 1
#获取交易日
def shifttradingday(date,shift):
#获取N天前的交易日日期
# 获取所有的交易日,返回一个包含所有交易日的 list,元素值为 datetime.date 类型.
tradingday = get_all_trade_days()
# 得到date之后shift天那一天在列表中的行标号 返回一个数
shiftday_index = list(tradingday).index(date)+shift
# 根据行号返回该日日期 为datetime.date类型
return tradingday[shiftday_index] |
|