乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Sqlite—Python接口

       路人甲Java 2022-04-23
      #!/usr/bin/env python
      # -*- coding:utf-8 -*-
      
      import sqlite3,os,time
      import traceback
      
      class Sqlite():
          db_file = None     # 數(shù)據(jù)庫文件
          connection = None  # 數(shù)據(jù)庫連接對象
          
          def __init__(self):
              self.db_file = "/www/wwwroot/ding-server/db/test.db"
      
      	# 獲取數(shù)據(jù)庫對象
          def GetConn(self):
              try:
                  if self.connection == None:
                      self.connection = sqlite3.connect(self.db_file)
                      self.connection.text_factory = str
              except Exception as ex:
                  traceback.print_exc()
                  return "error: " + str(ex)
      
          # 增加任務
          def insert(self, sql):
          	self.write_lock()
          	self.GetConn()
              self.connection.text_factory = str
              try:
                  result = self.connection.execute(sql)
                  id = result.lastrowid
                  self.connection.commit()
                  self.rm_lock()
                  return id
              except Exception as ex:
                  return "error: " + str(ex)
                
          # 刪除任務  
          def delete(self, sql):
          	self.write_lock()
          	self.GetConn()
          	try:
                  result = self.connection.execute(sql)
                  self.connection.commit()
                  self.rm_lock()
                  return result.rowcount
              except Exception as ex:
                  return "error: " + str(ex)
                  
          # 修改任務狀態(tài)(完成/未完成)        
          def update(self, sql):
          	self.GetConn()
          	try:
                  result = self.connection.execute(sql)
                  self.connection.commit()
                  return result.rowcount
              except Exception as ex:
                  return "error: " + str(ex)
                  
          # 查詢?nèi)蝿?    def select(self, sql):
          	self.GetConn()
          	result = self.connection.execute(sql)
              data = result.fetchall()
              tmp = list(map(list,data))   # 元組轉成列表
              data = tmp
              return data
              
          # 方式1:創(chuàng)建數(shù)據(jù)表
          def create(self):
          	self.GetConn()
          	sql = '''create table if not exists tb_user(
          	          ID INT PRIMARY KEY     NOT NULL,
                        NAME           TEXT    NOT NULL,
                        AGE            INT     NOT NULL,
                        ADDRESS        CHAR(50),
                        SALARY         REAL);'''
              try:
          		self.connection.execute(sql)
          		self.connection.commit()
          		self.connection.close()
              except Exception as ex:
                  traceback.print_exc()
                  return "error: " + str(ex)
          
          # 方式2:創(chuàng)建數(shù)據(jù)表
          def create_back(self):
              self.GetConn()
              try:
                  fr = open('/www/wwwroot/ding-server/db/task.sql', 'rb')
                  result = self.connection.executescript(fr.read())
                  fr.close()
                  self.connection.commit()
                  return True
              except Exception as ex:
              	traceback.print_exc()
              	return False
              	
          def writeFile(self,filename,s_body,mode='w+'):
      		try:
      			fp = open(filename, mode);
      			fp.write(s_body)
      			fp.close()
      			return True
      		except:
      			try:
      				fp = open(filename, mode,encoding="utf-8");
      				fp.write(s_body)
      				fp.close()
      				return True
      			except:
      				return False
          	
          # 是否有鎖
          def is_lock(self):
              n = 0
              while os.path.exists(self.__LOCK):
                  n+=1
                  if n > 50:
                      self.rm_lock()
                      break
                  time.sleep(0.01)
                  
          # 寫鎖
          def write_lock(self):
              self.is_lock()
              self.writeFile(self.__LOCK,"True")
              
          # 解鎖
          def rm_lock(self):
              if os.path.exists(self.__LOCK):
                  os.remove(self.__LOCK)
              	
              	
      if __name__ == '__main__':
          obj = Sqlite()
          # obj.GetConn()    # 獲取數(shù)據(jù)庫連接,沒有的話,就創(chuàng)建數(shù)據(jù)庫
          obj.create()
      

      參考:https://www.runoob.com/sqlite/sqlite-python.html

      參考:https://www.jianshu.com/p/5ff30df3ba6b

        本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權內(nèi)容,請點擊一鍵舉報。
        轉藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多