python如何加载数据,

  python如何加载数据,

  本文主要结合singleton模式的实际应用案例,分享实现实时增量数据加载工具的解决方案。最重要的是实现了一个可以添加、修改、删除的增量ID记录表。需要什么可以参考。

  00-1010创建增量ID记录表数据库连接类增量数据服务客户端结果测试本次主要结合singleton模式的实际应用案例分享实现实时增量数据加载工具的解决方案。最重要的是实现了一个可以添加、修改、删除的增量ID记录表。

  单例模式:提供一个全局访问点,以确保一个类只有一个特定类型的对象。它通常用于以下场景:日志记录或数据库操作等。以避免使用资源的请求冲突。

  

目录

  importsqlite3

  导入日期时间

  importpymssql

  importpandasaspd

  进口时间

  PD . set _ option( expand _ frame _ repr ,False)

  导入所需模块

  #创建数据表

   database_path=r 。\Database\ID_Record.db

  fromsqlite3importconnect

  with connect(database _ path)as con :

  连接执行(

  CREATETABLEIFNOTEXISTSIncremental _ data _ max _ id _ record(idintegerprimarykeyautocincrement,F_SDaqID_MAXTEXT,record _ datedatetime))

  最新增量记录ID-F_SDaqID_MAX数据库存储

  #将数据保存到本地txt

  Text _ save (filename,record) : # filename是写入txt文件的路径,record是写入F_SDaqID_MAX和record_date的数据列表。

  file=open(filename, a )追加方法

  #file=open(文件名, w)# overwrite方法

  foriinrange(len(record)):

  s=str(记录[i])。替换([, )。替换(],)

  S=S. replace( , )。replace(,, ) \ n #删除单引号、逗号,并在每行末尾追加换行符。

  文件.写入

  file.close()

  最新增量记录ID-F_SDaqID_MAX临时文件存储

  增量ID记录提供了两种实现方案,一种是数据持久存储模式,另一种是临时文件存储模式。顾名思义,数据持久化模式是指在创建对象时,可以记录增量ID-F_SDaqID_MAX等关键操作信息。这种标志记录映射是一种常见的设计模式。

  

创建增量ID记录表

  要实现实时增量数据采集,需要实现两个数据库连接类:增量数据ID存储类和增量目标数据源类。这里,数据库操作类采用singleton模式实现,增量服务记录信息存储在数据库或特定的日志文件中,以保持数据的一致性。

  1.增量数据ID存储sqlite连接类代码。

  class database _ SQLite(meta class=MetaSingleton):

   database_path=r 。\ Database \ energy _ RC _ configure . db

  连接=无

  defconnect(self):

  ifself.connectionisNone:

  self . connection=sqlite3 . connect(self . database _ path,check_same_thread=False,isolation_level=None)

  self . cursor obj=self . connection . cursor()

  returnself.cursorobj,self.connection

  #插入最大记录

  @静态方法

  定义插入最大标识记录(f1,f2):

  cursor=Database_sqlit

  e().connect()

          print(cursor)

          sql = f"""insert into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values("{f1}","{f2}")"""

          cursor[0].execute(sql)

          # sql = "insert  into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values(?,?)"

          # cursor[0].execute(sql,(f"{f1}",f"{f2}"))

          cursor[1].commit()

          print("插入成功!")

          # cursor[0].close()

          return 

      # 取出增量数据库中最新一次ID记录

      @staticmethod

      def View_Max_ID_Records():

          cursor = Database_sqlite().connect()

          sql = "select max(F_SDaqID_MAX) from Incremental_data_max_id_record"

          cursor[0].execute(sql)

          results = cursor[0].fetchone()[0]

          # #单例模式不用关闭数据库连接

          # cursor[0].close()

          print("最新记录ID", results)

          return results

      #删除数据记录ID

      @staticmethod

      def Del_Max_ID_Records():

          cursor = Database_sqlite().connect()

          sql = "delete from Incremental_data_max_id_record where record_date = (select MAX(record_date) from Incremental_data_max_id_record)"

          cursor[0].execute(sql)

          # results = cursor[0].fetchone()[0]

          # # cursor[0].close()

          cursor[1].commit()

          print("删除成功")

          return

  2、增量数据源sqlserver连接类代码

  

class Database_sqlserver(metaclass=MetaSingleton):

      """

      #实时数据库

      """

      connection = None

      # def connect(self):

      def __init__(self):

          if self.connection is None:

              self.connection = pymssql.connect(host="xxxxx",user="xxxxx",password="xxxxx",database="xxxxx",charset="utf8")

              if self.connection:

                  print("连接成功!")

              # 打开数据库连接

              self.cursorobj = self.connection.cursor()

          # return self.cursorobj, self.connection

      # 获取数据源中最大ID

      @staticmethod

      def get_F_SDaqID_MAX():

          # cursor_insert = Database_sqlserver().connect()

          cursor_insert = Database_sqlserver().cursorobj

          sql_MAXID = """select MAX(F_SDaqID) from T_DaqDataForEnergy"""

          cursor_insert.execute(sql_MAXID)  # 执行查询语句,选择表中所有数据

          F_SDaqID_MAX = cursor_insert.fetchone()[0]  # 获取记录

          print("最大ID值:{0}".format(F_SDaqID_MAX))

          return F_SDaqID_MAX

      # 提取增量数据

      @staticmethod

      def get_incremental_data(incremental_Max_ID):

          # 开始获取增量数据

          sql_incremental_data = """select F_ID,F_Datetime,F_Data from T_DaqDataForEnergy  where F_ID > {0}""".format(

              incremental_Max_ID)

          # cursor_find = Database_sqlserver().connect()

          cursor_find = Database_sqlserver().cursorobj

          cursor_find.execute(sql_incremental_data)  # 执行查询语句,选择表中所有数据

          Target_data_source = cursor_find.fetchall()  # 获取所有数据记录

          # cursor_find.close()

          cursor_find.close()

          df = pd.DataFrame(

              Target_data_source,

              columns=[

                  "F_ID",

                  "F_Datetime",

                  "F_Data"])

          print("提取数据", df)

          return df

  数据资源应用服务设计主要考虑数据库操作的一致性和优化数据库的各种操作,提高内存或CPU利用率。

  实现多种读取和写入操作,客户端操作调用API,执行相应的DB操作。

  注:

  1、使用metaclass实现创建具有单例特征的类

  Database_sqlserver(metaclass=MetaSingleton)

  Database_sqlite(metaclass=MetaSingleton)

  使用class定义新类时,数据库类Database_sqlserver由MetaSingleton装饰后即指定了metaclass,那么MetaSingleton的特殊方法__call__方法将自动执行。

  

class MetaSingleton(type):

      _instances={}

      def __call__(cls, *args, **kwargs):

          if cls not in cls._instances:

              cls._instances[cls] = super(MetaSingleton,cls).__call__(*args,**kwargs)

          return cls._instances[cls]

  

  以上代码基于元类的单例实现,当客户端对数据库执行某些操作时,会多次实例化数据库类,但是只创建一个对象,所以对数据库的调用是同步的。

  2、多线程使用同一数据库连接资源需采取一定同步机制

  如果没采用同步机制,可能出现一些意料之外的情况

  1)withcls.lock加锁

  

class MetaSingleton(type):

      _instances={}

      lock = threading.Lock()

      def __call__(cls, *args, **kwargs):

          with cls.lock:

              if cls not in cls._instances:

                  time.sleep(0.05)  #模拟耗时

                  cls._instances[cls] = super(MetaSingleton,cls).__call__(*args,**kwargs)

              return cls._instances[cls]

  

  锁的创建和释放需要消耗资源,上面代码每次创建都必须获得锁。

  3、如果我们开发的程序非单个应用,而是集群化的,即多个客户端共享单个数据库,导致数据库操作无法同步,而数据库连接池是更好的选择。大大节省了内存,提高了服务器地服务效率,能够支持更多的客户服务。

  数据库连接池的解决方案是在应用程序启动时建立足够的数据库连接,并讲这些连接组成一个连接池,由应用程序动态地对池中的连接进行申请、使用和释放。对于多于连接池中连接数的并发请求,应该在请求队列中排队等待。

  

  

增量数据服务客户端

  增量处理策略:第一次加载先判断增量数据表中是否存在最新记录,若有直接加载;否则,记录一下最大/最新的数据记录ID或时间点,保存到一个增量数据库或记录文件中。

  从第二次加载开始只加载最大/最新的ID或时间点以后的数据。当加载过程全部成功完成之后并同步更新增量数据库或记录文件,更新这次数据记录的最后记录ID或时间点。

  一般这类数据记录表有自增长列,那么也可以使用自增长列来实现这个标识特征。比如本次我用到数据表增长列F_ID。

  

class IncrementalRecordServer:

      _servers = []

      _instance = None

      def __new__(cls, *args, **kwargs):

          if not IncrementalRecordServer._instance:

              # IncrementalRecordServer._instance = super().__new__(cls)

              IncrementalRecordServer._instance = super(IncrementalRecordServer,cls).__new__(cls)

          return IncrementalRecordServer._instance

      def __init__(self,changeServersID=None):

          """

          变量初始化过程

          """

          self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()

          self.record_date = datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S)

          self.changeServersID = changeServersID

      # 回调更新本地记录,清空记录替换,临时记录

      def record(func):

          def Server_record(self):

              v = func(self)

              text_save(filename=r"F:\AutoOps_platform\Database\Server_record.txt",record=IncrementalRecordServer._servers)

              print("保存成功")

              return v

          return Server_record

      #增加服务记录

      @record

      def addServer(self):

          self._servers.append([int(self.F_SDaqID_MAX),self.record_date])

          print("添加记录")

          Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX, f2=self.record_date)

      #修改服务记录

      @record

      def changeServers(self):

          # self._servers.pop()

          # 此处传入手动修改的记录ID

          self._servers.append([self.changeServersID,self.record_date])

          #先删除再插入实现修改

          Database_sqlite.Del_Max_ID_Records()

          Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID, f2=self.record_date)

          print("更新记录")

      #删除服务记录

      @record

      def popServers(self):

          # self._servers.pop()

          print("删除记录")

          Database_sqlite.Del_Max_ID_Records()

      # 最新服务记录

      def getServers(self):

          # print(self._servers[-1])

          Max_ID_Records = Database_sqlite.View_Max_ID_Records()

          print("查看记录",Max_ID_Records)

          return Max_ID_Records

      #提取数据

      def Incremental_data_client(self):

          """

          # 提取数据(增量数据MAXID获取,并提取增量数据)

          """

          # 实时数据库

          # 第一次加载先判断是否存在最新记录

          if self.getServers() == None:

              # 插入增量数据库ID

              self.addServer()

              # 提取增量数据

              data = Database_sqlserver.get_incremental_data(self.F_SDaqID_MAX)

              return data

          # 获取增量数据库中已有的最新最大ID记录

          incremental_Max_ID = self.getServers()

          #添加记录

          self.addServer()

          # 提取增量数据

          Target_data_source = Database_sqlserver.get_incremental_data(incremental_Max_ID)

          return Target_data_source

  

  优化策略:

  1、延迟加载方式

  以上增量记录服务类IncrementalRecordServer通过覆盖__new__方法来控制对象的创建,我们在创建对象的时候会先检查对象是否存在。也可以通过懒加载的方式实现,节约资源优化如下。

  

class IncrementalRecordServer:

      _servers = []

      _instance = None

      def __init__(self,changeServersID=None):

          """

          变量初始化过程

          """

          self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()

          self.record_date = datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S)

          self.changeServersID = changeServersID

          if not IncrementalRecordServer._instance:

              print("__init__对象创建")

          else:

              print("对象已经存在:",IncrementalRecordServer._instance)

              self.getInstance()

      @classmethod

      def getInstance(cls):

          if not cls._instance:

              cls._instance = IncrementalRecordServer()

          return cls._instance

  懒汉式实例化能够确保实际需要时才创建对象,实例化a= IncrementalRecordServer()时,调用初始化__init__方法,但是没有新的对象创建。懒汉式这种方式加载类对象,也称为延迟加载方式。

  2、单例模式能有效利用空间资源,每次利用同一空间资源。

  不同操作对象的内存地址相同,且不同对象初始化将上一个对象初始化变量覆盖,确保最新记录实时更新。表面上以上代码实现了单例模式没问题,但多线程并发情况下,存在线程安全问题,可能同时创建不同的对象空间。考虑到线程安全,也可以进一步加锁处理.

  3、适用范围及注意事项

  本次代码适用于部署生产指定时间点运行之后产出的增量数据,长时间未启用再启动需要清空历史记录即增量数据库或文件ID需清空,一般实时数据增量实现一次加载没有什么问题,所以这一点也不用很关注(文件方式代码可自行完善);当加载历史数据库或定时间隔产生数据量过大时,需要进一步修改代码,需要判断数据规模,指定起始节点及加载数据量,综合因素考虑,下次分享一下亿级数据量提取方案。

  4、进一步了解Python垃圾回收机制;并发情况下,通过优化线程池来管理资源。

  最后可以添加一个函数来释放资源

  

def __del__(self):

      class_name = self.__class__.__name__

      print(class_name,"销毁")

  del obj 调用__del__() 销毁对象,释放其空间;只有Python 对象在不再引用对象时被释放。当程序中有其它变量引用该实例对象时,即便手动调用 __del__() 方法,该方法也不会立即执行。这和 Python 的垃圾回收机制的实现有关。

  

  

结果测试

  

if __name__ == __main__:

      for i in range(6):

          hc1 = IncrementalRecordServer()

          hc1.addServer()

          print("Record_ID",hc1._servers[i])

          # del hc1

          time.sleep(60)

      #Server2-客户端client

      # 最新服务记录

      hc2 = IncrementalRecordServer()

      hc2.getServers()

      #查看增量数据

      hc2.Incremental_data_client()

  

  插入记录

  模拟每1分钟插入一条记录,向增量数据库插入7条

  

  

  

if __name__ == __main__:

      # Server3-客户端client

      # 手动添加增量起始ID记录

      hc3 = IncrementalRecordServer(changeServersID=346449980)

      hc3.changeServers()

  

  

  

  

if __name__ == __main__:

      #删除ID

      hc3 = IncrementalRecordServer(changeServersID=346449980)

      # hc3.changeServers()

      hc3.popServers()

  

  

  以上就是Python实现实时增量数据加载工具的解决方案的详细内容,更多关于Python增量数据加载的资料请关注盛行IT软件开发工作室其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: