乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Python+MongoDB導(dǎo)入股票日線及分鐘線數(shù)據(jù)

       imelee 2017-09-11

      本文演示了采用Python腳本,配合MongoDB數(shù)據(jù)庫,讀取股票數(shù)據(jù)文件,并寫入數(shù)據(jù)庫的過程。 導(dǎo)入數(shù)據(jù)庫的方法支持單線程及多線程兩種方式。經(jīng)過本地開發(fā)機測試,開啟2-3個線程時,導(dǎo)入速度快于單線程導(dǎo)入;當(dāng)開啟線程超過4個之后,導(dǎo)入速度反而小于單線程;猜測可能是跟機器CPU核心數(shù)有關(guān),建議開始CPU核心數(shù)N-1的線程。例如,若CPU核心數(shù)為8時,可以開啟7個線程進(jìn)行導(dǎo)入,可大幅提高導(dǎo)入速度。 Python操作MongoDB,需要引入pymongo模塊,可自行搜索該模塊的安裝方法,不再贅述。以下是Python代碼:

      util_file.py 讀取數(shù)據(jù)文本,導(dǎo)入數(shù)據(jù)庫的公共方法類

      #!/usr/bin/env python
      #coding:utf-8
      
      import threading
      import pymongo
      import time
      
      class UtilFile:
          def IsSubString(SubStrList,Str):
              flag=True  
              for substr in SubStrList:  
                  if not(substr in Str):  
                      flag=False  
              return flag
      
          def GetFileList(FindPath,FlagStr=[]):
              import os  
              FileList=[]  
              FileNames=os.listdir(FindPath)  
              if (len(FileNames)>0):  
                 for fn in FileNames:  
                     if (len(FlagStr)>0):  
                         #返回指定類型的文件名  
                         if (UtilFile.IsSubString(FlagStr,fn)):  
                             #fullfilename=os.path.join(FindPath,fn)  
                             #FileList.append(fullfilename)  
                             FileList.append(fn)
                     else:  
                         #默認(rèn)直接返回所有文件名  
                         #fullfilename=os.path.join(FindPath,fn)  
                         #FileList.append(fullfilename)  
                         FileList.append(fn)
      
              #對文件名排序  
              if (len(FileList)>0):  
                  FileList.sort()       
              return FileList
      
          '''
          函數(shù)功能:讀取通達(dá)信數(shù)據(jù)導(dǎo)出文件內(nèi)容,導(dǎo)入到數(shù)據(jù)庫中 
          入?yún)ⅲ?    conn_db: 導(dǎo)入所使用的數(shù)據(jù)庫
          file_dir: 數(shù)據(jù)文件目錄
          file_list: 數(shù)據(jù)目錄下文件名List
          file_type: 數(shù)據(jù)文件類型。枚舉值。 M1:1分鐘線數(shù)據(jù); D: 日線數(shù)據(jù)
          tb_pre_name: 數(shù)據(jù)表名稱前綴
          print_level: 打印級別。枚舉值。0:不打印; 1:按文件打印; 2:按記錄打印
          thread_name: 線程名稱(打印輸出用)
          '''
          def ImpDataFromFile(conn_db, file_dir, file_list, file_type, tb_pre_name, print_level, thread_name):
              # file_list = UtilFile.GetFileList(file_dir)    #獲取1分鐘線目錄下文件名清單
              file_num = len(file_list)       #文件總數(shù)量
              file_prct = round(1/file_num,4)     #每文件占總數(shù)比
      
      
              # 讀取文件清單中每個文件的內(nèi)容,并解析每行記錄 {
              file_cnt = 0
              for file in file_list:
                  file_cnt += 1   #當(dāng)前處理文件序號
                  tm_start = time.clock()
                  #計算文件中的總行數(shù) {
                  op = open(file_dir+'/'+file)
                  record_num = 0
                  for line in op:
                      record_num += 1
                  #計算文件中的總行數(shù) }
      
                  #將文件逐行讀入表中 {
                  op = open(file_dir+'/'+file)
                  record_cnt = 0
                  for line in op:
                      if not("數(shù)據(jù)來源" in line):
                          content_temp = line.split('\n')
                          content = content_temp[0].split('\t')
      
                          if file_type == "M1":
                              record = {
                                          "date":content[0], #日期
                                          "time":content[1], #時間
                                          "begin":float(content[2]), #開盤
                                          "high":float(content[3]), #最高
                                          "low":float(content[4]), #最低    
                                          "end":float(content[5]), #收盤
                                          "vol":int(content[6]), #成交量
                                          "amt":float(content[7])#成交額
                                       }
                          elif file_type == "D":
                              record = {
                                          "date":content[0], #日期
                                          "begin":float(content[1]), #開盤
                                          "high":float(content[2]), #最高
                                          "low":float(content[3]), #最低    
                                          "end":float(content[4]), #收盤
                                          "vol":int(content[5]), #成交量
                                          "amt":float(content[6])#成交額
                                       }
      
                          clct = conn_db[tb_pre_name+file]    #打開/創(chuàng)建 數(shù)據(jù)表
      
                          #計算處理百分比 {
                          record_cnt += 1
                          if print_level == 2:
                              file_prcs = file_prct*(file_cnt-1)
                              record_prct = round(record_cnt/record_num,4)
                              ttl_prcs = round((file_prcs+file_prct*record_prct)*100,2)
                              print(thread_name+"\t"+str(ttl_prcs)+"%"+"\t"+str(file_cnt)+"/"+str(file_num)+"\t"+file+"\t"+str(record_cnt)+"/"+str(record_num))
                          #計算處理百分比 }
      
                          #確保索引存在 {
                          if file_type == "M1":
                              clct.ensure_index([("date", pymongo.DESCENDING), ("time", pymongo.DESCENDING)])
                              if clct.find({"date":content[0],"time":content[1]}).count() > 0:
                                  pass        #防止重復(fù)導(dǎo)入
                              else:
                                  clct.insert(record)
                          elif file_type == "D":
                              clct.ensure_index([("date", pymongo.DESCENDING)])
                              if clct.find({"date":content[0]}).count() > 0:
                                  pass        #防止重復(fù)導(dǎo)入
                              else:
                                  clct.insert(record)
                          #確保索引存在 }
      
                  #將文件逐行讀入表中 }
      
                  tm_finish = time.clock()
                  tm_spend = round(tm_finish - tm_start, 2)
                  if print_level == 1:
                      file_prcs = round(file_prct*file_cnt*100,2)
                      print(thread_name+"\t"+str(file_prcs)+"%"+"\t"+str(file_cnt)+"/"+str(file_num)+"\t"+file+"\t"+str(tm_spend)+"s")
              # 讀取文件清單中每個文件的內(nèi)容,并解析每行記錄 }
              print(thread_name+"\t"+"Import Done!")
      
          '''
          函數(shù)功能:使用多線程方式導(dǎo)入數(shù)據(jù)入口方法 
          入?yún)ⅲ?    imp_thread_num: 開啟線程數(shù)
          conn_db: 導(dǎo)入所使用的數(shù)據(jù)庫
          file_dir: 數(shù)據(jù)文件目錄
          file_type: 數(shù)據(jù)文件類型。枚舉值。 M1:1分鐘線數(shù)據(jù); D: 日線數(shù)據(jù)
          tb_pre_name: 數(shù)據(jù)表名稱前綴
          print_level: 打印級別。枚舉值。0:不打印; 1:按文件打印; 2:按記錄打印
          '''
          def ImpDataFromFileMultThrd(imp_thread_num, conn_db, file_dir, file_type, tb_pre_name, print_level):
              file_list = UtilFile.GetFileList(file_dir, ".txt")
              thrd_num = imp_thread_num
              thrd_task = []
              for i in range(thrd_num):
                  thrd_task.append([])
      
              for i in range(len(file_list)):
                  idx = divmod(i,thrd_num)[1]
                  thrd_task[idx].append(file_list[i])
      
              print(thrd_task)
      
              for i in range(thrd_num):
                  thrd = MyThread(i,"Thread-"+str(i+1),[conn_db, file_dir, thrd_task[i-1], file_type, tb_pre_name, print_level])
                  thrd.start()
      
      class MyThread (threading.Thread):   #繼承父類threading.Thread
          def __init__(self, threadID, name, func_args):
              threading.Thread.__init__(self)
              self.threadID = threadID
              self.name = name
      
              self.conn_db = func_args[0]
              self.file_dir = func_args[1]
              self.file_list = func_args[2]
              self.file_type = func_args[3]
              self.tb_pre_name = func_args[4]
              self.print_level = func_args[5]
      
          def run(self):                   #把要執(zhí)行的代碼寫到run函數(shù)里面 線程在創(chuàng)建后會直接運行run函數(shù) 
              print("Starting " + self.name)
              UtilFile.ImpDataFromFile(self.conn_db, self.file_dir, self.file_list, self.file_type, self.tb_pre_name, self.print_level, self.name)
              print("Exiting " + self.name)

      util_otr.py 一些其它常用的公共函數(shù)方法

      #!/usr/bin/env python
      #coding:utf-8
      
      '''
      函數(shù)功能:根據(jù)傳入的字符串,刪除名稱匹配的聚集 
      入?yún)ⅲ?db_conn: 數(shù)據(jù)庫連接
      clct_substr: 需匹配名稱的字符串
      '''
      def DelClcts(db_conn, clct_substr):
          a = db_conn.collection_names()
          b = []
          for i in a:
              if clct_substr in i:
                  b.append(i)
                  db_conn[i].drop()
                  print("Drop:"+i)
      
      '''
      函數(shù)功能:對Dict按Key排序
      入?yún)ⅲ?dic: 數(shù)據(jù)字典
      '''
      def SortDictKeys(dic): 
          return sorted(dic.items(), key=lambda d: d[0])

      main.py 導(dǎo)入的主程序,入口方法

      #!/usr/bin/env python
      #coding:utf-8
      
      
      from util_file import UtilFile
      # from util_thread import MyThread
      import pymongo
      
      #************************************************************************
      #常量定義清單
      #************************************************************************
      #文件
      stck_m1_file_dir = "D:/new_tdx/T0002/export/min1"   #1分鐘線數(shù)據(jù)文件目錄
      stck_day_file_dir = "D:/new_tdx/T0002/export/day"   #1分鐘線數(shù)據(jù)文件目錄
      
      #數(shù)據(jù)庫
      ip = '127.0.0.1'
      port = 27017
      db_name = 'robin'
      
      #表名稱
      bs_m1_pre = 'BS_M1_'#基礎(chǔ)表_1分鐘線數(shù)據(jù)表前綴
      bs_day_pre = 'BS_DAY_'#基礎(chǔ)表_日線數(shù)據(jù)表前綴
      
      #************************************************************************
      #主程序
      #************************************************************************
      conn = pymongo.MongoClient(host=ip, port=port)      #連接Mongodb數(shù)據(jù)庫服務(wù)器
      db = conn[db_name]      #選擇存儲數(shù)據(jù)庫
      
      
      #************************************************************************
      ##從文件中導(dǎo)入數(shù)據(jù)到基礎(chǔ)表
      #************************************************************************
      # UtilFile.ImpDataFromFileMultThrd(2, db, stck_m1_file_dir, "M1", bs_m1_pre, 1)     #導(dǎo)入1分鐘線數(shù)據(jù)
      UtilFile.ImpDataFromFileMultThrd(2, db, stck_day_file_dir, "D", bs_day_pre, 1)      #導(dǎo)入日線數(shù)據(jù)

        本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多