国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

腳本之家,腳本語言編程技術及教程分享平臺!
分類導航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務器之家 - 腳本之家 - Python - python實現多線程行情抓取工具的方法

python實現多線程行情抓取工具的方法

2021-01-18 00:26duduniao85 Python

當我們實現了單線程,接下來就是實現多線程了,下面這篇文章主要給大家介紹了關于python實現多線程行情抓取工具的方法,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考借鑒,下面隨著小編來一起學習學習吧。

思路

借助python當中threading模塊與Queue模塊組合可以方便的實現基于生產者-消費者模型的多線程模型。Jimmy大神的tushare一直是廣大python數據分析以及業余量化愛好者喜愛的免費、開源的python財經數據接口包。

平時一直有在用阿里云服務器通過tushare的接口自動落地相關財經數據,但日復權行情數據以往在串行下載的過程當中,速度比較慢,有時遇到網絡原因還需要重下。每只股票的行情下載過程中都需要完成下載、落地2個步驟,一個可能需要網絡開銷、一個需要數據庫mysql的存取開銷。2者原本就可以獨立并行執行,是個典型的“生產者-消費者”模型。

基于queue與threading模塊的線程使用一般采用以下的套路:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
producerQueue=Queue()
consumerQueue=Queue()
lock = threading.Lock()
class producerThead(threading.Thread):
 def __init__(self, producerQueue,consumerQueue):
 self.producerQueue=producerQueue
 self.consumerQueue=consumerQueue
 
 
 
 def run(self):
 while not self.thread_stop:
  try:
  #接收任務,如果連續20秒沒有新的任務,線程退出,否則會一直執行
  item=self.producerQueue.get(block=True, timeout=20)
  #阻塞調用進程直到有數據可用。如果timeout是個正整數,
  #阻塞調用進程最多timeout秒,
  #如果一直無數據可用,拋出Empty異常(帶超時的阻塞調用)
  except Queue.Empty:
  print("Nothing to do!thread exit!")
  self.thread_stop=True
  break
  #實現生產者邏輯,生成消費者需要處理的內容 consumerQueue.put(someItem)
  #還可以邊處理,邊生成新的生產任務
  doSomethingAboutProducing()
  self.producerQueue.task_done()
 def stop(self):
 self.thread_stop = True
 
class consumerThead(threading.Thread):
 def __init__(self,lock, consumerQueue):
 self.consumerQueue=consumerQueue
 def run(self):
 while true:
  try:
  #接收任務,如果連續20秒沒有新的任務,線程退出,否則會一直執行
  item=self.consumerQueue.get(block=True, timeout=20)
  #阻塞調用進程直到有數據可用。如果timeout是個正整數,
  #阻塞調用進程最多timeout秒,
  #如果一直無數據可用,拋出Empty異常(帶超時的阻塞調用)
  except Queue.Empty:
  print("Nothing to do!thread exit!")
  self.thread_stop=True
  break
  doSomethingAboutConsuming(lock)# 處理消費者邏輯,必要時使用線程鎖 ,如文件操作等
  self.consumerQueue.task_done()
#定義主線程
def main():
 for i in range(n):#定義n個i消費者線程
 t = ThreadRead(producerQueue, consumerQueue)
 t.setDaemon(True)
 t.start()
 producerTasks=[] #定義初始化生產者任務隊列
 producerQueue.put(producerTasks)
 for i in range(n):#定義n個生產者錢程
 t = ThreadWrite(consumerQueue, lock)
 t.setDaemon(True)
 t.start()
 stock_queue.join()
 data_queue.join()

相關接口

1,股票列表信息接口

作用

獲取滬深上市公司基本情況。屬性包括:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
code,代碼
name,名稱
industry,所屬行業
area,地區
pe,市盈率
outstanding,流通股本(億)
totals,總股本(億)
totalAssets,總資產(萬)
liquidAssets,流動資產
fixedAssets,固定資產
reserved,公積金
reservedPerShare,每股公積金
esp,每股收益
bvps,每股凈資
pb,市凈率
timeToMarket,上市日期
undp,未分利潤
perundp, 每股未分配
rev,收入同比(%)
profit,利潤同比(%)
gpr,毛利率(%)
npr,凈利潤率(%)
holders,股東人數

調用方法

?
1
2
import tushare as ts
ts.get_stock_basics()

返回效果

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
  name industry area  pe outstanding  totals totalAssets
code
600606 金豐投資  房產服務 上海  0.00  51832.01 51832.01 744930.44
002285 世聯行  房產服務 深圳 71.04  76352.17 76377.60 411595.28
000861 海印股份  房產服務 廣東 126.20  83775.50 118413.84 730716.56
000526 銀潤投資  房產服務 福建 2421.16  9619.50 9619.50  20065.32
000056 深國商  房產服務 深圳  0.00  14305.55 26508.14 787195.94
600895 張江高科  園區開發 上海 171.60 154868.95 154868.95 1771040.38
600736 蘇州高新  園區開發 江蘇 48.68 105788.15 105788.15 2125485.75
600663 陸家嘴  園區開發 上海 47.63 135808.41 186768.41 4562074.50
600658 電子城  園區開發 北京 19.39  58009.73 58009.73 431300.19
600648 外高橋  園區開發 上海 65.36  81022.34 113534.90 2508100.75
600639 浦東金橋  園區開發 上海 57.28  65664.88 92882.50 1241577.00
600604 市北高新  園區開發 上海 692.87  33352.42 56644.92 329289.50

2,日復權行情接口

作用

提供股票上市以來所有歷史數據,默認為前復權,讀取后存到本地,作為后續分析的基礎

調用方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ts.get_h_data('002337', start='2015-01-01', end='2015-03-16') #兩個日期之間的前復權數據
 
parameter:
code:string,股票代碼 e.g. 600848
start:string,開始日期 format:YYYY-MM-DD 為空時取當前日期
end:string,結束日期 format:YYYY-MM-DD 為空時取去年今日
autype:string,復權類型,qfq-前復權 hfq-后復權 None-不復權,默認為qfq
index:Boolean,是否是大盤指數,默認為False
retry_count : int, 默認3,如遇網絡等問題重復執行的次數
pause : int, 默認 0,重復請求數據過程中暫停的秒數,防止請求間隔時間太短出現的問題
 
return
date : 交易日期 (index)
open : 開盤價
high : 最高價
close : 收盤價
low : 最低價
volume : 成交量
amount : 成交金額

返回結果

?
1
2
3
4
5
6
7
8
9
10
11
12
   open high close low  volume  amount
date
2015-03-16 13.27 13.45 13.39 13.00 81212976 1073862784
2015-03-13 13.04 13.38 13.37 13.00 40548836 532739744
2015-03-12 13.29 13.95 13.28 12.96 71505720 962979904
2015-03-11 13.35 13.48 13.15 13.00 59110248 780300736
2015-03-10 13.16 13.67 13.59 12.72 105753088 1393819776
2015-03-09 13.77 14.73 14.13 13.70 139091552 1994454656
2015-03-06 12.17 13.39 13.39 12.17 89486704 1167752960
2015-03-05 12.79 12.80 12.17 12.08 26040832 966927360
2015-03-04 13.96 13.96 13.30 12.58 26636174 1060270720
2015-03-03 12.17 13.10 13.10 12.05 19290366 733336768

實現

廢話不多說,直接上代碼,

生產者線程,讀取行情

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class ThreadRead(threading.Thread):
 def __init__(self, queue, out_queue):
  '''
  用于根據股票代碼、需要讀取的日期,讀取增量的日行情數據,
  :param queue:用于保存需要讀取的股票代碼、起始日期的列表
  :param out_queue:用于保存需要寫入到數據庫表的結果集列表
  :return:
  '''
  threading.Thread.__init__(self)
  self.queue = queue
  self.out_queue = out_queue
 def run(self):
  while true:
   item = self.queue.get()
   time.sleep(0.5)
   try:
    df_h_data = ts.get_h_data(item['code'], start=item['startdate'], retry_count=10, pause=0.01)
    if df_h_data is not None and len(df_h_data)>0:
     df_h_data['secucode'] = item['code']
     df_h_data.index.name = 'date'
     print df_h_data.index,item['code'],item['startdate']
     df_h_data['tradeday'] = df_h_data.index.strftime('%Y-%m-%d')
     self.out_queue.put(df_h_data)
   except Exception, e:
    print str(e)
    self.queue.put(item) # 將沒有爬取成功的數據放回隊列里面去,以便下次重試。
    time.sleep(10)
    continue
   self.queue.task_done()

消費者線程,本地存儲

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class ThreadWrite(threading.Thread):
 def __init__(self, queue, lock, db_engine):
  '''
  :param queue: 某種形式的任務隊列,此處為tushare為每個股票返回的最新日復權行情數據
  :param lock: 暫時用連接互斥操作,防止mysql高并發,后續可嘗試去掉
  :param db_engine: mysql數據庫的連接對象
  :return:no
  '''
  threading.Thread.__init__(self)
  self.queue = queue
  self.lock = lock
  self.db_engine = db_engine
 
 def run(self):
  while True:
   item = self.queue.get()
   self._save_data(item)
   self.queue.task_done()
 
 def _save_data(self, item):
   with self.lock:
    try:
     item.to_sql('cron_dailyquote', self.db_engine, if_exists='append', index=False)
    except Exception, e: # 如果是新股,則有可能df_h_data是空對象,因此需要跳過此類情況不處理
     print str(e)

定義主線程

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from Queue import Queue
stock_queue = Queue()
data_queue = Queue()
lock = threading.Lock()
def main():
 '''
 用于測試多線程讀取數據
 :return:
 '''
 #獲取環境變量,取得相應的環境配置,上線時不需要再變更代碼
 global stock_queue
 global data_queue
 config=os.getenv('FLASK_CONFIG')
 if config == 'default':
  db_url='mysql+pymysql://root:******@localhost:3306/python?charset=utf8mb4'
 else:
  db_url='mysql+pymysql://root:******@localhost:3306/test?charset=utf8mb4'
 db_engine = create_engine(db_url, echo=True)
 conn = db_engine.connect()
 #TODO 增加ts.get_stock_basics()報錯的處理,如果取不到信息則直接用數據庫中的股票代碼信息,來獲取增量信息
 #TODO 增加一個標志,如果一個股票代碼的最新日期不是最新日期,則需標記該代碼不需要重新獲取數據,即記錄該股票更新日期到了最新工作日,
 df = ts.get_stock_basics()
 df.to_sql('stock_basics',db_engine,if_exists='replace',dtype={'code': CHAR(6)})
 # 計算距離當前日期最大的工作日,以便每日定時更新
 today=time.strftime('%Y-%m-%d',time.localtime(time.time()))
 s1=("select max(t.date) from cron_tradeday t where flag=1 and t.date <='"+ today+"'")
 selectsql=text(s1)
 maxTradeay = conn.execute(selectsql).first()
 # 計算每只股票當前加載的最大工作日期,支持重跑
 s = ("select secucode,max(t.tradeday) from cron_dailyquote t group by secucode ")
 selectsql = text(s)
 result = conn.execute(selectsql) # 執行查詢語句
 df_result = pd.DataFrame(result.fetchall())
 df_result.columns=['stockcode','max_tradeday']
 df_result.set_index(df_result['stockcode'],inplace=True)
 # 開始歸檔前復權歷史行情至數據庫當中,以便可以方便地計算后續選股模型
 
 for i in range(3):#使用3個線程
  t = ThreadRead(stock_queue, data_queue)
  t.setDaemon(True)
  t.start()
 for code in set(list(df.index)):
  try:
   #如果當前股票已經是最新的行情數據,則直接跳過,方便重跑。
   #print maxTradeay[0],df_result.loc[code].values[1]
   if df_result.loc[code].values[1] == maxTradeay[0]:
    continue
   startdate=getLastNdate(df_result.loc[code].values[1],1)
  except Exception, e:
   #如果某只股票沒有相關的行情,則默認開始日期為2015年1月1日
   startdate='2015-01-01'
  item={}
  item['code']=code
  item['startdate']=startdate
  stock_queue.put(item) # 生成生產者任務隊列
 for i in range(3):
  t = ThreadWrite(data_queue, lock, db_engine)
  t.setDaemon(True)
  t.start()
 stock_queue.join()
 data_queue.join()

執行效果

原本需要2,3個小時才能執行完成的每日復權行情增量落地,有效縮短至了1小時以內,這里線程數并不上越多越好,由于復權行情讀的是新浪接口,在高并發情況下會返回HTTP 503服務器過載的錯誤,另外高并發下可能需要使用IP代理池,下載的時段也需要嘗試多個時段進行。初次嘗試,如果有更好的方法或者哪里有考慮不周的地方歡迎留言建議或者指正。

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。

原文鏈接:https://segmentfault.com/a/1190000013332661

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 99视频免费| 日韩二区三区 | 一区二区三区四区精品 | 欧美不卡 | 色综合久久88色综合天天6 | 日韩中文字幕在线免费观看 | 国产欧美日韩综合精品一区二区 | 最好的2019中文大全在线观看 | 欧美成人免费在线 | 欧美中文字幕一区二区三区 | 91精品国产综合久久久久久丝袜 | 欧美精品一区二区三区在线 | 在线欧美亚洲 | 国产成人黄色片 | 婷婷久久综合 | 国产探花在线精品一区二区 | 日韩视频精品在线 | 国产成人精品一区二区三区 | jlzzjlzz国产精品久久 | 免费午夜电影 | 欧美一区久久 | 国产精品一区二区在线观看 | 亚洲国产激情 | 日韩一区电影 | 亚洲日韩中文字幕一区 | 久久国产精品一区二区 | 性欧美精品久久久久久久 | 国产欧美日韩一区二区三区 | 免费久久99精品国产婷婷六月 | 日韩免费一级 | 激情网页 | 久久91精品国产 | 色嫩紧中文字幕在线 | 91网站入口 | 天天操天天干天天插 | 久久精品一 | 成人国产在线 | 一区二区三区在线免费观看 | 黄视频在线播放 | 春色网站 | av电影免费在线看 |