import baostock as bs
import pandas as pd
import os
import mplfinance as mpf
import sys
import time
import json
from scipy.stats import pearsonr
import multiprocessing

# Strategy parameters. NOTE(review): these module globals are rebound by the
# grid search in __main__; with the "spawn" start method (Windows/macOS default)
# worker processes re-import this module and see the ORIGINAL defaults, not the
# rebound values — confirm the pool is fork-started before trusting the sweep.
BULLISH_K_INCREASE_LEN = 5.0   # minimum pctChg (%) for the trigger bullish candle
NOT_BREAK_K_COUNT = 6          # days after the candle that must hold above its open
BUY_WINDOW_LEN = 5             # width (days) of the buy window after the hold period
LEAST_PROFIT_RATE = 0.1        # minimum profit rate that counts as a winning sell
MAX_KEEP_DAYS = 10             # maximum holding period when searching for sells
DRAW_BUY_POINT_K_LINE = True
DRAW_TO_FILE = True
LOCAL_CACHE_PATH = "stocks-2025-04-14"
RESULT_OUTPUT_PATH = "results_tmp"
# Index (non-tradable) codes to skip when scanning the local cache.
INDECATOR_CODE_LIST = (["sh.{:06}".format(i) for i in range(0, 999)]
                       + ["sz.{:06}".format(i) for i in range(399001, 399999)])

if os.name == 'nt':
    # Windows
    import msvcrt

    def getch():
        """Read one key press without waiting for Enter (Windows)."""
        return msvcrt.getch()
else:
    # Linux/macOS
    import tty
    import termios

    def getch():
        """Read one key press without waiting for Enter (POSIX raw mode)."""
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setraw(sys.stdin.fileno())
            ch = sys.stdin.read(1)
        finally:
            # Always restore the terminal, even if read() raises.
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
        return ch


def all_stock_count(date, outfile="stock_list.csv"):
    """Print how many securities exist on `date` and dump the list to CSV."""
    bs.login()
    stock_rs = bs.query_all_stock(date)
    df = stock_rs.get_data()
    bs.logout()
    print(f"股票总数:{len(df)}")
    df.to_csv(outfile, encoding="utf8", index=False)


def download_data(date, outfile="test_result.csv", codes=None, freq="d"):
    """Download single-day OHLC bars for `codes` (or every security on `date`).

    Results are concatenated into one DataFrame and written to `outfile`
    (gbk-encoded, matching the original tooling).
    """
    bs.login()
    data_df = pd.DataFrame()
    # When no explicit code list is given, fetch every security listed on `date`.
    if codes is None:
        stock_rs = bs.query_all_stock(date)
        stock_df = stock_rs.get_data()
        codes = stock_df["code"]
    for code in codes:
        print("Downloading :" + code)
        k_rs = bs.query_history_k_data_plus(code, "date,code,open,high,low,close",
                                            date, date, frequency=freq)
        k_df = pd.DataFrame(k_rs.data, columns=k_rs.fields)
        data_df = pd.concat([data_df, k_df], ignore_index=True)
    bs.logout()
    data_df.to_csv(outfile, encoding="gbk", index=False)
    print(data_df)


def get_trade_day():
    """Demo: query trade dates for 2017H1 and dump them to D:\\trade_datas.csv."""
    lg = bs.login()
    print('login respond error_code:' + lg.error_code)
    print('login respond error_msg:' + lg.error_msg)
    # Query trading-day information.
    rs = bs.query_trade_dates(start_date="2017-01-01", end_date="2017-06-30")
    print('query_trade_dates respond error_code:' + rs.error_code)
    print('query_trade_dates respond error_msg:' + rs.error_msg)
    # Collect the result set row by row.
    data_list = []
    while (rs.error_code == '0') & rs.next():
        data_list.append(rs.get_row_data())
    result = pd.DataFrame(data_list, columns=rs.fields)
    # Write the result set to CSV.
    result.to_csv("D:\\trade_datas.csv", encoding="gbk", index=False)
    print(result)
    bs.logout()


def select_bullish_not_break(k_data, bullish_increase=6.0, period_lens=6):
    """Find bullish trigger candles whose level holds for `period_lens` days.

    A candidate is a trading day whose pctChg >= `bullish_increase` such that,
    over the next `period_lens` days, price pulls back to (below) the candle's
    close at least once but never breaks below the candle's open, and no later
    day rallies harder than the trigger candle itself.

    Returns a list of (row_index, date_string) tuples.
    """
    ret = []
    r, c = k_data.shape
    for idx, stock in k_data.iterrows():
        # Need a full look-ahead window inside the data.
        if idx + period_lens >= r:
            continue
        # 1. Locate the trigger bullish candle (must be a trading day).
        is_trade = stock["tradestatus"]
        if is_trade == 0:
            continue
        bullish_open = float(stock["open"])
        bullish_close = float(stock["close"])
        increase = float(stock["pctChg"])
        if increase < bullish_increase:
            continue
        # 2. Over the next n days the price must revisit the candle top
        #    (pullback) without ever breaking below the candle's open.
        fall_down = False
        fall_break = False
        for i in range(idx + 1, idx + period_lens + 1):
            day_low = float(k_data.at[i, "low"])
            day_open = float(k_data.at[i, "open"])
            day_close = float(k_data.at[i, "close"])
            if day_low < bullish_close or day_open < bullish_close or day_close < bullish_close:
                fall_down = True
            if day_low < bullish_open:
                fall_break = True
        if not fall_down or fall_break:
            continue
        # 3. No follow-up day may out-gain the trigger candle.
        #    BUGFIX: pctChg is converted with float() like every other field
        #    (previously compared raw, possibly a string, against a float).
        increase_too_large = False
        for i in range(idx + 1, idx + period_lens + 1):
            rate = float(k_data.at[i, "pctChg"])
            if rate > increase:
                increase_too_large = True
                break
        if increase_too_large:
            continue
        ret.append((idx, stock["date"]))
    return ret


def check_profit(k_data: pd.DataFrame, buy_k_start: int, buy_k_end: int,
                 least_profit_rate=0.2, max_keep_days=20, bullish_k: int = -1):
    """Simulate buying on each day of [buy_k_start, buy_k_end) and look for sells.

    For each buy day, scan up to `max_keep_days` forward for closes whose
    return vs. the buy close reaches `least_profit_rate`.  If `bullish_k` is
    given and any buy-day close drops below that candle's open, the whole
    setup is considered invalid and {} is returned.

    Returns {buy_index: [(sell_index, profit_rate, holding_days), ...]}.
    """
    ret = {}
    bullish_open = 0
    k_end = k_data.shape[0]
    if bullish_k > 0:
        bullish_open = float(k_data.at[bullish_k, "open"])
    if buy_k_start >= k_end:
        return {}
    if buy_k_end > k_end:
        buy_k_end = k_end
    for i in range(buy_k_start, buy_k_end):
        buy_open = float(k_data.at[i, "open"])
        buy_close = float(k_data.at[i, "close"])
        # Stop-out: price closed below the trigger candle's open.
        if buy_close < bullish_open:
            return {}
        sell_points = []
        range_end = min(i + max_keep_days, k_end)
        for j in range(i + 1, range_end):
            current_close = float(k_data.at[j, "close"])
            profit = float(current_close - buy_close) / buy_close
            if profit < least_profit_rate:
                continue
            sell_points.append((j, profit, j - i))
        if len(sell_points) > 0:
            ret[i] = sell_points
    return ret


def pull_stock_data(start_day="2022-03-31", end_day=None):
    """Download daily K-lines for every stock alive on both dates into the cache.

    Only stocks present on both `start_day` and `end_day` are kept (inner merge
    on code), so every cached series covers the full window.
    """
    if end_day is None:
        end_day = time.strftime("%Y-%m-%d", time.localtime())
    bs.login()
    start_stocks = bs.query_all_stock(start_day)
    end_stocks = bs.query_all_stock(end_day)
    start_data = start_stocks.get_data()
    end_data = end_stocks.get_data()
    exist_data = pd.merge(start_data, end_data, how="inner", on=["code"])
    exist_data.to_csv("exist_stock.csv", encoding="utf8", index=False)
    if not os.path.exists(LOCAL_CACHE_PATH):
        os.mkdir(LOCAL_CACHE_PATH)
    for _, stock in exist_data.iterrows():
        code = stock["code"]
        name = stock["code_name_y"]
        # '*' (ST markers) is illegal in file names on Windows.
        name = name.replace("*", "x")
        file_name = str(code) + "_" + str(name) + ".csv"
        print(file_name)
        file_path = os.path.join(LOCAL_CACHE_PATH, file_name)
        k_rs = bs.query_history_k_data_plus(
            code,
            "date,open,high,low,close,volume,turn,tradestatus,peTTM,isST,preclose,pctChg",
            start_day, end_day, frequency="d")
        k_df = pd.DataFrame(k_rs.data, columns=k_rs.fields)
        k_df.to_csv(file_path, encoding="utf8", index=False)
    bs.logout()


def list_cached_stocks(base_path=LOCAL_CACHE_PATH):
    """Return {code: name} for every cached stock file, skipping index codes."""
    ret = {}
    names = os.listdir(base_path)
    for name in names:
        path = os.path.join(base_path, name)
        if os.path.isfile(path):
            # Strip the ".csv" suffix; files are named "<code>_<name>.csv".
            name = name[:len(name) - 4]
            stock = name.split("_")
            if stock[0] in INDECATOR_CODE_LIST:
                continue
            ret[stock[0]] = stock[1]
    return ret


def get_cached_stock(stock, base_path=LOCAL_CACHE_PATH):
    """Load the cached CSV whose file name contains `stock`; empty frame if absent."""
    names = os.listdir(base_path)
    file_name = None
    for name in names:
        if stock in name:
            file_name = name
            break
    if file_name is None:
        print("can't find cached stock:" + stock)
        return pd.DataFrame()
    return pd.read_csv(os.path.join(base_path, file_name))


def draw_k_lines(k_data: pd.DataFrame, indecator_k: pd.Series, specified={}, save_to=None):
    """Render a candle chart with an index overlay and buy/sell markers.

    `specified` maps row offsets within `k_data` to marker colors; those rows
    get a '^' scatter marker.  If `save_to` is given the figure is written to
    that path, otherwise it is shown interactively.

    NOTE(review): this mutates `k_data` in place (date column -> index), so
    callers should pass a throwaway slice.
    """
    k_data.loc[:, 'date'] = pd.to_datetime(k_data['date'])
    k_data.set_index('date', inplace=True)
    mc = mpf.make_marketcolors(
        up='r',           # rising candle color
        down='g',         # falling candle color
        edge='inherit',   # candle edge inherits the up/down color
        wick='inherit',   # wick inherits the up/down color
        volume='inherit'  # volume bars inherit the up/down color
    )
    if save_to is not None:
        out_dir, _ = os.path.split(save_to)
        # BUGFIX: out_dir may be "" when save_to has no directory component;
        # os.makedirs("") would raise.
        if out_dir and not os.path.exists(out_dir):
            os.makedirs(out_dir)
    # Index curve on a secondary axis.
    curv_ap = mpf.make_addplot(indecator_k, color='b', linestyle='-', secondary_y=True)
    # Custom style.
    s = mpf.make_mpf_style(marketcolors=mc)
    markers = ['' if i not in specified else '^' for i in range(len(k_data))]
    marker_colors = ['black' if i not in specified else specified[i] for i in range(len(k_data))]
    ap = mpf.make_addplot(k_data['close'], type='scatter', marker=markers, color=marker_colors)
    if save_to is None:
        mpf.plot(k_data, type='candle', volume=True, style=s, figsize=(16, 8),
                 xrotation=45, addplot=[ap, curv_ap], tight_layout=True)
    else:
        mpf.plot(k_data, type='candle', volume=True, style=s, figsize=(16, 8),
                 xrotation=45, addplot=[ap, curv_ap], tight_layout=True, savefig=save_to)


def select_opportunity(code, save_to_dir=None, draw_opportunity=False,
                       draw_to_file=False, indicator_k=None, draw_no_buy_point=False):
    """Scan one cached stock for bullish setups and evaluate their buy points.

    If `save_to_dir` is given, results are cached as <code>.json there and a
    pre-existing cache short-circuits the scan.  Returns
    (number_of_bullish_candidates, number_with_at_least_one_buy_point).
    """
    ok_cnt = 0
    if save_to_dir is not None:
        serial_path = os.path.join(save_to_dir, code + ".json")
        if os.path.exists(serial_path):
            # Cache hit: recount winners from the serialized results.
            with open(serial_path, "r") as file:
                print("already processed stock:{0}".format(code))
                exist_data = json.load(file)
            ok_cnt = 0
            for d in exist_data:
                if len(d["trade"]) > 0:
                    ok_cnt = ok_cnt + 1
            return len(exist_data), ok_cnt
        if not os.path.exists(save_to_dir):
            os.makedirs(save_to_dir)
    day_k_data = get_cached_stock(code)
    out_json = []
    candidates = select_bullish_not_break(day_k_data, BULLISH_K_INCREASE_LEN, NOT_BREAK_K_COUNT)
    print("stock:{0} total found {1} results".format(code, len(candidates)))
    for idx, date in candidates:
        serial_data = {}
        profits = check_profit(day_k_data, idx + NOT_BREAK_K_COUNT,
                               idx + NOT_BREAK_K_COUNT + BUY_WINDOW_LEN,
                               LEAST_PROFIT_RATE, MAX_KEEP_DAYS, idx)
        correlation = 0
        p_value = 0
        if indicator_k is not None:
            # Pearson correlation between the stock and its index over the
            # setup + holding window.
            start_k_idx = idx
            end_k_idx = idx + NOT_BREAK_K_COUNT + BUY_WINDOW_LEN + MAX_KEEP_DAYS
            if end_k_idx >= day_k_data.shape[0]:
                end_k_idx = day_k_data.shape[0]
            correlation, p_value = pearsonr(day_k_data["close"][start_k_idx:end_k_idx],
                                            indicator_k["close"][start_k_idx:end_k_idx])
            print("皮尔逊相关系数: {0}, p 值: {1}".format(correlation, p_value))
        print(" bullish {0} has {1} buy points:".format(date, len(profits)))
        serial_data["bullish"] = date
        serial_data["pearson_correlation"] = correlation
        serial_data["pearson_p"] = p_value
        serial_data["trade"] = []
        if len(profits) > 0:
            ok_cnt = ok_cnt + 1
        for k, v in profits.items():
            buyday = str(day_k_data.at[k, "date"])
            buy_data = {"buyday": buyday, "sells": []}
            print(" buy date:{0} has {1} sell points:".format(day_k_data.at[k, "date"], len(v)))
            for sell in v:
                day = sell[0]
                profit_rate = sell[1]
                sell_data = {"sellday": str(day_k_data.at[day, "date"]),
                             "keep": sell[2], "profit": profit_rate}
                buy_data["sells"].append(sell_data)
                print(" sell point:{0} get profit:{1}".format(day_k_data.at[day, "date"], profit_rate))
            serial_data["trade"].append(buy_data)
        print("-------------------------------------------------------------------------------")
        if len(serial_data) > 0:
            out_json.append(serial_data)
        if draw_opportunity and (len(profits) > 0 or draw_no_buy_point):
            window_end = idx + NOT_BREAK_K_COUNT + BUY_WINDOW_LEN + MAX_KEEP_DAYS
            kdata = day_k_data.loc[idx:window_end]
            indicator = indicator_k.loc[idx:window_end]
            colors = ["purple", "yellow", "pink", "black", "white",
                      "green", "orange", "blue", "gray"]
            group = 0
            color_group = {}
            for b in profits.keys():
                # BUGFIX: wrap the palette so >9 buy points no longer IndexError.
                c = colors[group % len(colors)]
                color_group[b - idx] = c
                for v in profits[b]:
                    color_group[v[0] - idx] = c
                group = group + 1
            out_pic = None
            if draw_to_file:
                out_pic = os.path.join(save_to_dir, code)
                if len(profits) == 0:
                    out_pic = os.path.join(out_pic, "no_buy_point")
                else:
                    out_pic = os.path.join(out_pic, "buy_point")
                out_pic = os.path.join(out_pic, str(date) + ".jpg")
            draw_k_lines(kdata, indicator["close"], color_group, out_pic)
    if save_to_dir and len(out_json) > 0:
        with open(serial_path, "w") as file:
            json.dump(out_json, file, indent=2)
    return len(candidates), ok_cnt


def worker(stock_code, indecator):
    """Pool worker: scan one stock; never propagate exceptions to the pool."""
    try:
        return select_opportunity(stock_code, None, False, DRAW_TO_FILE, indecator, True)
    except Exception as e:
        print(f"Error processing {stock_code}: {e}")
        return (0, 0)


def check_result(code, date):
    """Re-run check_profit for one (code, bullish-date) pair and print it."""
    stock = get_cached_stock(code)
    matches = stock[stock["date"] == date].index.tolist()
    # BUGFIX: guard against a date that is not in the cached series.
    if not matches:
        print("date {0} not found for stock {1}".format(date, code))
        return
    idx = matches[0]
    result = check_profit(stock, idx + NOT_BREAK_K_COUNT,
                          idx + NOT_BREAK_K_COUNT + BUY_WINDOW_LEN,
                          LEAST_PROFIT_RATE, MAX_KEEP_DAYS, idx)
    print("result:{0}".format(result))


def cal_profite(all_stocks, sh_indecator, sz_indecator, pool=None):
    """Scan all stocks in parallel, print statistics and append them to summary.json.

    `sh_indecator`/`sz_indecator` are the index K-lines matched to each stock
    by its exchange prefix.  A caller-supplied `pool` is reused and NOT closed
    (BUGFIX: previously a borrowed pool was closed/joined here).
    """
    no_buy_point_stock_count = 0
    has_buy_point_stock_count = 0
    no_bullish_stock_count = 0
    work_param = []
    for stock in all_stocks:
        work_param.append((stock, sh_indecator if str(stock).startswith("sh") else sz_indecator))
    own_pool = pool is None
    if own_pool:
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    results = pool.starmap(worker, work_param)
    if own_pool:
        pool.close()
        pool.join()
    total_bullish = 0
    total_buy_point = 0
    for bullish, ok in results:
        total_bullish += bullish
        total_buy_point += ok
        if bullish > 0:
            if ok > 0:
                has_buy_point_stock_count = has_buy_point_stock_count + 1
            else:
                no_buy_point_stock_count = no_buy_point_stock_count + 1
        else:
            no_bullish_stock_count = no_bullish_stock_count + 1
    # BUGFIX: avoid ZeroDivisionError when no bullish candidate was found.
    win_rate = float(total_buy_point) / float(total_bullish) if total_bullish else 0.0
    print("total {0} stocks, {1} without bullish, {2} has buy point, {3} no buy point".format(
        len(all_stocks), no_bullish_stock_count, has_buy_point_stock_count, no_buy_point_stock_count))
    print(" total bullish:{0}, total buy point:{1}, rate:{2}".format(
        total_bullish, total_buy_point, win_rate))
    summary = {
        "bullish_len": BULLISH_K_INCREASE_LEN,
        "not_break_len": NOT_BREAK_K_COUNT,
        "max_keep_days": MAX_KEEP_DAYS,
        "buy_window_len": BUY_WINDOW_LEN,
        "least_profit_rate": LEAST_PROFIT_RATE,
        "total_bullish": total_bullish,
        "total_buy_point": total_buy_point,
        "win_rate": win_rate
    }
    # BUGFIX: "r+" raised FileNotFoundError on first run and rewriting in place
    # without truncate() could leave stale bytes; read-if-exists then rewrite.
    old = []
    if os.path.exists("summary.json"):
        with open("summary.json", "r") as file:
            try:
                old = json.load(file)
            except json.JSONDecodeError:
                old = []
        if not isinstance(old, list):
            old = []
    old.append(summary)
    with open("summary.json", "w") as file:
        json.dump(old, file, indent=2)


if __name__ == '__main__':
    if not os.path.exists(LOCAL_CACHE_PATH):
        pull_stock_data("2022-03-31", "2025-04-14")
    all_stocks = list_cached_stocks()
    sh_indecator = get_cached_stock("sh.000001")
    sz_indecator = get_cached_stock("sz.399001")
    # Grid search over all strategy parameters; combinations already present in
    # summary.json are skipped so the sweep can be resumed.
    # NOTE(review): indicators are loaded but cal_profite is called with None,
    # so the Pearson correlation is never computed during the sweep — confirm
    # this is intentional before "fixing" it (it only affects printed output).
    for bullish_k_len in range(1, 10):
        for not_break_k_count in range(2, 6):
            for max_keep_days in range(1, 20):
                for buy_window_len in range(2, 10):
                    for least_profit_rate in [0.01, 0.05, 0.1, 0.15, 0.2]:
                        # BUGFIX: "r+" crashed when summary.json did not exist yet.
                        old = []
                        if os.path.exists("summary.json"):
                            with open("summary.json", "r") as file:
                                try:
                                    old = json.load(file)
                                except json.JSONDecodeError:
                                    pass
                        inold = False
                        for exist in old:
                            if (bullish_k_len == exist["bullish_len"]
                                    and not_break_k_count == exist["not_break_len"]
                                    and max_keep_days == exist["max_keep_days"]
                                    and buy_window_len == exist["buy_window_len"]
                                    and least_profit_rate == exist["least_profit_rate"]):
                                print("already processed this param")
                                inold = True
                                break
                        if inold:
                            continue
                        BULLISH_K_INCREASE_LEN = bullish_k_len
                        NOT_BREAK_K_COUNT = not_break_k_count
                        MAX_KEEP_DAYS = max_keep_days
                        BUY_WINDOW_LEN = buy_window_len
                        LEAST_PROFIT_RATE = least_profit_rate
                        print("BULLISH_K_INCREASE_LEN:{0}, NOT_BREAK_K_COUNT:{1}, MAX_KEEP_DAYS:{2}, BUY_WINDOW_LEN:{3}, LEAST_PROFIT_RATE:{4}".format(
                            BULLISH_K_INCREASE_LEN, NOT_BREAK_K_COUNT, MAX_KEEP_DAYS, BUY_WINDOW_LEN, LEAST_PROFIT_RATE))
                        cal_profite(all_stocks, None, None)