Python快速大批量写数据到PostgreSQL数据表
Python和PostgreSQL数据库协同工作,需要用到psycopg2工具库,这也是Python编程中最常用的接口。 但是,为了适应不同的写入速度,psycopg2工具库提供了多种执行写入的方法,常用的有以下四种: execute() executemany() execute_batch() 构建个性化的字符串
下面对这四种方法的执行写数据到PostgreSQL数据表的速度做一个比较,看看插入10000条、100000条、1000000条记录,以上各种方法各自耗时多少。
创建数据表,定义计时函数 CREATE TABLE IF NOT EXISTS public.test_table ( source TEXT, datetime TIMESTAMP, mean_temp NUMERIC );
自定义的计时函数:def measure_time(func): def time_it(*args, **kwargs): time_started = time.time() func(*args, **kwargs) time_elapsed = time.time() print("{execute} 耗时 {sec} 秒,插入 {rows} 行记录".format(execute=func.__name__, sec=round( time_elapsed - time_started, 4), rows=len( kwargs.get("values")))) return time_it
方法1:execute()
此方法是执行数据库操作的标准函数,为了插入大批量的数据行,就必须要针对数据使用循环,并且调用该方法,一行一行地把数据插入到数据库的表中。执行execute()的方法代码如下:def method_execute(self, values): for value in values: self.cursor.execute("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), value) self.connection.commit()
众所周知,执行循环是很慢的,插入1万条、10万条、100万条数据的执行结果如下:
可以看出,数据越多,花费的时间越长,两者基本上是线性关系。
方法2:executemany()
Psycopg2提供的 executemany()方法,并不是对execute()方法的简单改进,执行executemany()的方法代码如下:@measure_time def method_execute_many(self, values): self.cursor.executemany("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), values) self.connection.commit()
下图是执行不同的数据量所耗费的时间,结果如下图:
可以看出,方法1与方法2耗时并没有明显的差别。
方法3:execute_batch()
这是Psycopg2提供的又一种方法,它减少了访问数据库服务器的次数,与 executemany()相比,性能有了很大的改进。这种方法把许多语句拼接在一起,只受到page_size的限制(PostgreSQL中一般是8kB),执行execute_batch()的方法代码如下:@measure_time def method_execute_batch(self, values): psycopg2.extras.execute_batch(self.cursor, "INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), values) self.connection.commit()
插入不同长度的数据,耗时结果如下图:
可以看出,这种方法的执行速度,比execute()方法和executemany()方法提高了将近4倍!
方法4:构建个性化的字符串
这种方法是构建一个个性化的字符串,然后用一条execute()语句来执行,代码如下: @measure_time def method_string_building(self, values): argument_string = ",".join("("%s", "%s")" % (x, y) for (x, y) in values) self.cursor.execute("INSERT INTO {table} VALUES".format(table=TABLE_NAME) + argument_string) self.connection.commit()
看看这种方法的运行时间,结果如下图:
从图中的结果可以看出,这种方法是最快的,比方法1、方法2提高了20多倍,比方法3提高了3倍!
本文完整代码如下:import time import psycopg2 import psycopg2.extras TABLE_NAME = "TestTable" def measure_time(func): def time_it(*args, **kwargs): time_started = time.time() func(*args, **kwargs) time_elapsed = time.time() print("{execute} 耗时 {sec} 秒,插入 {rows} 行记录".format(execute=func.__name__, sec=round(time_elapsed - time_started,4), rows=len(kwargs.get("values")))) return time_it class PsycopgTest(): def __init__(self, num_rows): self.num_rows = num_rows def create_dummy_data(self): values = [] for i in range(self.num_rows): values.append((i + 1, "test")) return values def connect(self): conn_string = "host={0} user={1} dbname={2} password={3}".format("localhost", "postgres", "tutorial", "caspar") self.connection = psycopg2.connect(conn_string) self.cursor = self.connection.cursor() def create_table(self): self.cursor.execute( "CREATE TABLE IF NOT EXISTS {table} (id INT PRIMARY KEY, NAME text)".format(table=TABLE_NAME)) self.connection.commit() def truncate_table(self): self.cursor.execute("TRUNCATE TABLE {table} RESTART IDENTITY".format(table=TABLE_NAME)) self.connection.commit() @measure_time def method_execute(self, values): """Loop over the dataset and insert every row separately""" for value in values: self.cursor.execute("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), value) self.connection.commit() @measure_time def method_execute_many(self, values): self.cursor.executemany("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), values) self.connection.commit() @measure_time def method_execute_batch(self, values): psycopg2.extras.execute_batch(self.cursor, "INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), values) self.connection.commit() @measure_time def method_string_building(self, values): argument_string = ",".join("("%s", "%s")" % (x, y) for (x, y) in values) self.cursor.execute("INSERT INTO {table} VALUES".format(table=TABLE_NAME) + argument_string) self.connection.commit() def main(): psyco = PsycopgTest(10000) psyco.connect() values = psyco.create_dummy_data() psyco.create_table() psyco.truncate_table() psyco.method_execute(values=values) # psyco.method_execute_many(values=values) # psyco.method_execute_batch(values=values) # psyco.method_string_building(values=values) if __name__ == "__main__": main()