psycopg2 详细使用教程:从入门到精通


psycopg2 详细使用教程:从入门到精通

1. 什么是 psycopg2?

  • PostgreSQL: 一款功能极其强大的开源对象-关系型数据库系统。
  • psycopg2: 一个 Python 库,充当 Python 应用程序和 PostgreSQL 数据库之间的“驱动”或“适配器”。它实现了 Python DB-API 2.0 规范,允许你使用标准的 Python 代码来执行 SQL 命令。

2. 安装

安装 psycopg2 有两种常见方式:

  1. 推荐 (用于开发和学习):

    pip install psycopg2-binary

    这个包预编译了所有必需的库,安装最简单,开箱即用。

  2. 用于生产环境 (需要编译):

    pip install psycopg2

    这需要你在系统上安装了 libpq(PostgreSQL 的 C 库)和相关的开发工具。它性能更好,但安装更复杂。

对于本教程,我们假设你已使用 psycopg2-binary


3. 核心工作流:连接与游标 (Cursor)

与数据库的所有交互都遵循以下模式:

  1. 连接 (Connect): 建立到 PostgreSQL 服务器的连接。
  2. 游标 (Cursor): 创建一个游标对象。游标是执行 SQL 命令的“手柄”。
  3. 执行 (Execute): 使用游标执行 SQL 语句。
  4. 获取 (Fetch): (如果是 SELECT 语句) 从游标中取出结果。
  5. 提交 (Commit): (如果是 INSERT, UPDATE, DELETE) 提交事务,使更改永久生效。
  6. 关闭 (Close): 关闭游标和连接,释放资源。

4. 【重要】最佳实践:使用 with 语句

手动管理 connect()close() 非常容易出错(比如程序崩溃时连接未关闭)。psycopg2 的连接和游标对象都是上下文管理器,必须使用 with 语句来自动管理资源。

这是本教程推荐的标准模式:

import psycopg2
import sys

# 数据库连接参数
# 强烈建议使用环境变量或配置文件,不要硬编码
DB_PARAMS = {
    'host': 'localhost',      # 数据库主机
    'port': 5432,             # 端口 (默认 5432)
    'user': 'postgres',       # 你的用户名
    'password': 'your_password', # 你的密码
    'dbname': 'test_db'       # 你要连接的数据库
}

try:
    # 1. 建立连接
    # with 语句确保连接在使用后自动关闭
    with psycopg2.connect(**DB_PARAMS) as conn:
        
        # 2. 创建游标
        # with 语句确保游标在使用后自动关闭
        with conn.cursor() as cur:
            
            # 3. 执行 SQL
            cur.execute("SELECT version();")
            
            # 4. 获取结果
            db_version = cur.fetchone() # fetchone() 获取一条结果
            print(f"数据库版本: {db_version[0]}")

            # 5. 提交 (对于 SELECT 不是必需的, 但对于 DML 是必需的)
            # conn.commit() # 在这里,with conn 会自动处理
            
except (Exception, psycopg2.DatabaseError) as error:
    print(f"连接数据库时出错: {error}")
    sys.exit(1)
    
# 6. 关闭
# 在 'with' 块结束时,cur.close() 和 conn.close() 已自动调用
print("连接已自动关闭。")

5. 【极其重要】防止 SQL 注入:参数化查询

永远不要使用 Python 的字符串格式化(如 f-string%)来构建 SQL 查询。这会导致严重的安全漏洞(SQL 注入)。

错误的方式 (危险!):

# 千万不要这样做!
user_id = "105 OR 1=1"
cur.execute(f"SELECT * FROM users WHERE id = {user_id}") 

正确的方式 (安全): psycopg2 会自动为你处理所有特殊字符的转义。你需要在 SQL 语句中使用 %s 作为占位符,然后将一个元组 (tuple) 作为第二个参数传递给 execute()

# 正确的方式:使用占位符
user_id = "105"
query = "SELECT * FROM users WHERE id = %s"

# 注意:即使只有一个参数,也必须将其放在元组中 (user_id,)
cur.execute(query, (user_id,))
  • 注意psycopg2 统一使用 %s 作为占位符,无论数据类型是字符串、数字还是日期

6. CRUD 操作详解

让我们创建一个 employees 表并对其进行操作。

A. 创建表 (CREATE)

create_table_query = """
CREATE TABLE IF NOT EXISTS employees (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    department VARCHAR(50),
    salary NUMERIC(10, 2),
    hire_date DATE
);
"""
try:
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            cur.execute(create_table_query)
            # DDL (如 CREATE TABLE) 语句也需要提交
            conn.commit() 
            print("表 'employees' 创建成功 (如果它不存在的话)。")
except (Exception, psycopg2.DatabaseError) as error:
    print(error)
  • **conn.commit()*:这是关键。任何修改数据库结构 (DDL) 或数据 (DML) 的操作,都必须调用 conn.commit() 才能生效。with conn: 块在成功退出时*会自动提交。

B. 插入数据 (INSERT)

插入单行
query = "INSERT INTO employees (name, department, salary, hire_date) VALUES (%s, %s, %s, %s)"
data = ("Alice Green", "Engineering", 75000.00, "2023-01-15")

try:
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            cur.execute(query, data)
            # conn.commit() 在 with 块结束时自动调用
    print("单行插入成功。")
except (Exception, psycopg2.DatabaseError) as error:
    print(f"插入失败: {error}")
    # 如果出错,with conn 块会引发异常,自动执行 conn.rollback()
插入多行 (高效)

使用 cur.executemany() 来批量插入,这比在 for 循环中调用 execute() 高效得多。

query = "INSERT INTO employees (name, department, salary, hire_date) VALUES (%s, %s, %s, %s)"
data_list = [
    ("Bob White", "Sales", 60000.00, "2023-03-01"),
    ("Charlie Black", "Engineering", 80000.00, "2022-11-20"),
    ("David Brown", "Marketing", 55000.00, "2023-05-10")
]

try:
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            cur.executemany(query, data_list)
            # 自动提交
    print("批量插入成功。")
except (Exception, psycopg2.DatabaseError) as error:
    print(f"批量插入失败: {error}")
获取插入的 ID (RETURNING)

插入后经常需要获取新生成的 PRIMARY KEY

query = "INSERT INTO employees (name, department) VALUES (%s, %s) RETURNING id"
data = ("Eva Blue", "HR")

try:
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            cur.execute(query, data)
            new_id = cur.fetchone()[0] # 获取 RETURNING 的值
            print(f"成功插入新员工,ID: {new_id}")
            # 自动提交
except (Exception, psycopg2.DatabaseError) as error:
    print(f"插入失败: {error}")

C. 查询数据 (SELECT)

获取单条记录 fetchone()
try:
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM employees WHERE department = %s", ("Engineering",))
            
            print("正在获取一名工程师:")
            record = cur.fetchone() # 获取第一条
            if record:
                print(record) # (1, 'Alice Green', 'Engineering', 75000.00, datetime.date(2023, 1, 15))

            # 再次调用 fetchone() 会获取下一条
            record2 = cur.fetchone()
            if record2:
                print(record2)

            # 如果没有更多数据,fetchone() 返回 None
            record3 = cur.fetchone()
            print(f"第三条记录: {record3}") # None

except (Exception, psycopg2.DatabaseError) as error:
    print(error)
  • 结果格式: 默认情况下,psycopg2 返回的数据是**元组 (tuple)**。
获取所有记录 fetchall()
try:
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT name, salary FROM employees WHERE salary > %s", (58000,))
            
            print("\n薪水 > 58000 的所有员工:")
            records = cur.fetchall() # 获取所有剩余结果
            
            if records:
                for row in records:
                    print(f"姓名: {row[0]}, 薪水: {row[1]}")
            else:
                print("未找到记录。")
                
except (Exception, psycopg2.DatabaseError) as error:
    print(error)
  • 注意: 如果结果集非常大,fetchall() 可能会消耗大量内存。
迭代游标 (内存高效)

对于大数据集,最好的方式是直接迭代游标:

try:
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            # 游标充当了迭代器
            cur.execute("SELECT * FROM employees")
            print("\n--- 迭代所有员工 ---")
            for row in cur:
                print(f"ID: {row[0]}, 姓名: {row[1]}")
                
except (Exception, psycopg2.DatabaseError) as error:
    print(error)

D. 更新数据 (UPDATE)

try:
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            query = "UPDATE employees SET salary = salary * 1.10 WHERE department = %s"
            cur.execute(query, ("Engineering",))
            
            # cur.rowcount 包含受影响的行数
            updated_rows = cur.rowcount
            print(f"Engineering 部门 {updated_rows} 名员工已加薪。")
            # 自动提交
            
except (Exception, psycopg2.DatabaseError) as error:
    print(f"更新失败: {error}")

E. 删除数据 (DELETE)

try:
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            query = "DELETE FROM employees WHERE name = %s"
            cur.execute(query, ("David Brown",))
            
            deleted_rows = cur.rowcount
            print(f"成功删除 {deleted_rows} 名员工。")
            # 自动提交
            
except (Exception, psycopg2.DatabaseError) as error:
    print(f"删除失败: {error}")

7. 事务管理 (Transaction)

数据库事务是一组“要么全做,要么全不做”的操作。

with psycopg2.connect(...) as conn: 块本身就构成了一个事务:

  • 如果 with 块中的代码成功执行完毕(没有抛出异常),conn.commit() 会在块退出时自动调用。
  • 如果 with 块中的代码抛出了异常conn.rollback() 会自动调用,撤销块中已执行的所有 SQL 操作。

示例:转账(原子操作) 假设 Alice 给 Bob 转账 1000。

def transfer_money(from_id, to_id, amount):
    try:
        with psycopg2.connect(**DB_PARAMS) as conn:
            # conn 现在处于一个事务中
            with conn.cursor() as cur:
                
                # 1. 从 Alice 账户扣款
                cur.execute(
                    "UPDATE employees SET salary = salary - %s WHERE id = %s", 
                    (amount, from_id)
                )
                
                # (模拟一个错误)
                # if from_id == 1:
                #     raise Exception("网络故障!")

                # 2. 向 Bob 账户加款
                cur.execute(
                    "UPDATE employees SET salary = salary + %s WHERE id = %s",
                    (amount, to_id)
                )
            
            # 3. 退出 with conn 块,自动 commit
            print("转账成功!")
            
    except (Exception, psycopg2.DatabaseError) as error:
        # 4. 发生异常,自动 rollback
        print(f"转账失败! 错误: {error}")
        print("事务已回滚。")

# 假设 Alice (id=1) 给 Bob (id=2) 转账 1000
transfer_money(1, 2, 1000.00) 

如果你取消上面“模拟一个错误”的注释,Alice 的扣款和 Bob 的加款 都不会 发生,数据库将保持原样。这就是事务的威力。


8. 高级技巧

A. 将结果作为字典获取 (RealDictCursor)

默认返回元组 (1, 'Alice') 不太方便。我们通常想要字典 {'id': 1, 'name': 'Alice'}

你需要导入 psycopg2.extras 并指定 cursor_factory

from psycopg2.extras import RealDictCursor

try:
    # 1. 在 connect 时指定 cursor_factory
    with psycopg2.connect(**DB_PARAMS, cursor_factory=RealDictCursor) as conn:
        
        # 2. 也可以在创建 cursor 时指定
        # with conn.cursor(cursor_factory=RealDictCursor) as cur:
        
        with conn.cursor() as cur: # cur 现在是 RealDictCursor
            cur.execute("SELECT id, name, salary FROM employees WHERE id = %s", (1,))
            record = cur.fetchone()
            
            if record:
                print("\n--- 字典游标结果 ---")
                print(record)
                # 输出: {'id': 1, 'name': 'Alice Green', 'salary': 82500.00}
                print(f"姓名: {record['name']}") # 可以通过键名访问
            
except (Exception, psycopg2.DatabaseError) as error:
    print(error)

B. 处理 NULL

psycopg2 会自动处理 Python 的 None 和 SQL 的 NULL 之间的转换。

这是你之前(在聊天记录中)遇到 invalid input syntax for "None" 错误的原因: 如果你使用 f-string (错误方式),None 会被转成字符串 "None"INSERT ... VALUES ('None') -> 数据库试图将字符串 “None” 存入 double precision 字段,导致失败。

正确方式 (使用参数化): psycopg2 会将 Python 的 None 正确转换为 SQL 的 NULL

# 插入一个 hire_date 为 NULL 的记录
query = "INSERT INTO employees (name, department, salary, hire_date) VALUES (%s, %s, %s, %s)"
data = ("Frank Nullman", "IT", 62000.00, None) # Python 的 None

try:
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            cur.execute(query, data)
    print("成功插入带 NULL 值的记录。")
except (Exception, psycopg2.DatabaseError) as error:
    print(f"插入 NULL 失败: {error}")

C. 处理 JSON/JSONB 数据

PostgreSQ 对 JSON 有很好的支持。psycopg2 可以自动序列化和反序列化 Python 的 dictlist

from psycopg2.extras import Json

# 假设表有
# ALTER TABLE employees ADD COLUMN metadata JSONB;

# 写入 JSON
meta_data = {
    'skills': ['Python', 'SQL', 'OPC-UA'],
    'manager_id': 10
}

try:
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            # 使用 extras.Json 包装一下
            query = "UPDATE employees SET metadata = %s WHERE name = %s"
            cur.execute(query, (Json(meta_data), "Alice Green"))
            
            # 读取 JSON
            cur.execute("SELECT metadata FROM employees WHERE name = %s", ("Alice Green",))
            result = cur.fetchone()[0]
            
            print(f"\n读取到的 JSONB 数据: {result}")
            print(f"数据类型: {type(result)}") # <class 'dict'>
            print(f"技能: {result['skills'][0]}") # 'Python'
            
except (Exception, psycopg2.DatabaseError) as error:
    print(f"JSON 操作失败: {error}")

9. 总结:最佳实践清单

  1. 始终使用 with 语句 管理连接 (conn) 和游标 (cur)。
  2. 永远使用参数化查询 (cur.execute(query, (params,))) 来防止 SQL 注入。%s 是唯一的占位符。
  3. 使用 conn.commit() 提交更改。依赖 with conn: 块的自动提交/回滚功能是最安全的方式。
  4. 使用 cur.executemany() 进行批量插入。
  5. 使用 psycopg2.extras.RealDictCursor 来获取字典形式的结果。
  6. **正确处理 None**:参数化查询会自动将 None 转为 NULL
  7. 捕获 psycopg2.DatabaseError 来处理数据库相关的特定异常。

文章作者: 0xdadream
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 0xdadream !
评论
  目录