psycopg2 详细使用教程:从入门到精通
1. 什么是 psycopg2?
- PostgreSQL: 一款功能极其强大的开源对象-关系型数据库系统。
- psycopg2: 一个 Python 库,充当 Python 应用程序和 PostgreSQL 数据库之间的“驱动”或“适配器”。它实现了 Python DB-API 2.0 规范,允许你使用标准的 Python 代码来执行 SQL 命令。
2. 安装
安装 psycopg2 有两种常见方式:
推荐 (用于开发和学习):
pip install psycopg2-binary这个包预编译了所有必需的库,安装最简单,开箱即用。
用于生产环境 (需要编译):
pip install psycopg2这需要你在系统上安装了
libpq(PostgreSQL 的 C 库)和相关的开发工具。它性能更好,但安装更复杂。
对于本教程,我们假设你已使用 psycopg2-binary。
3. 核心工作流:连接与游标 (Cursor)
与数据库的所有交互都遵循以下模式:
- 连接 (Connect): 建立到 PostgreSQL 服务器的连接。
- 游标 (Cursor): 创建一个游标对象。游标是执行 SQL 命令的“手柄”。
- 执行 (Execute): 使用游标执行 SQL 语句。
- 获取 (Fetch): (如果是
SELECT语句) 从游标中取出结果。 - 提交 (Commit): (如果是
INSERT,UPDATE,DELETE) 提交事务,使更改永久生效。 - 关闭 (Close): 关闭游标和连接,释放资源。
4. 【重要】最佳实践:使用 with 语句
手动管理 connect() 和 close() 非常容易出错(比如程序崩溃时连接未关闭)。psycopg2 的连接和游标对象都是上下文管理器,必须使用 with 语句来自动管理资源。
这是本教程推荐的标准模式:
import psycopg2
import sys
# 数据库连接参数
# 强烈建议使用环境变量或配置文件,不要硬编码
DB_PARAMS = {
'host': 'localhost', # 数据库主机
'port': 5432, # 端口 (默认 5432)
'user': 'postgres', # 你的用户名
'password': 'your_password', # 你的密码
'dbname': 'test_db' # 你要连接的数据库
}
try:
# 1. 建立连接
# with 语句确保连接在使用后自动关闭
with psycopg2.connect(**DB_PARAMS) as conn:
# 2. 创建游标
# with 语句确保游标在使用后自动关闭
with conn.cursor() as cur:
# 3. 执行 SQL
cur.execute("SELECT version();")
# 4. 获取结果
db_version = cur.fetchone() # fetchone() 获取一条结果
print(f"数据库版本: {db_version[0]}")
# 5. 提交 (对于 SELECT 不是必需的, 但对于 DML 是必需的)
# conn.commit() # 在这里,with conn 会自动处理
except (Exception, psycopg2.DatabaseError) as error:
print(f"连接数据库时出错: {error}")
sys.exit(1)
# 6. 关闭
# 在 'with' 块结束时,cur.close() 和 conn.close() 已自动调用
print("连接已自动关闭。")
5. 【极其重要】防止 SQL 注入:参数化查询
永远不要使用 Python 的字符串格式化(如 f-string 或 %)来构建 SQL 查询。这会导致严重的安全漏洞(SQL 注入)。
错误的方式 (危险!):
# 千万不要这样做!
user_id = "105 OR 1=1"
cur.execute(f"SELECT * FROM users WHERE id = {user_id}")
正确的方式 (安全): psycopg2 会自动为你处理所有特殊字符的转义。你需要在 SQL 语句中使用 %s 作为占位符,然后将一个元组 (tuple) 作为第二个参数传递给 execute()。
# 正确的方式:使用占位符
user_id = "105"
query = "SELECT * FROM users WHERE id = %s"
# 注意:即使只有一个参数,也必须将其放在元组中 (user_id,)
cur.execute(query, (user_id,))
- 注意:
psycopg2统一使用%s作为占位符,无论数据类型是字符串、数字还是日期。
6. CRUD 操作详解
让我们创建一个 employees 表并对其进行操作。
A. 创建表 (CREATE)
create_table_query = """
CREATE TABLE IF NOT EXISTS employees (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
department VARCHAR(50),
salary NUMERIC(10, 2),
hire_date DATE
);
"""
try:
with psycopg2.connect(**DB_PARAMS) as conn:
with conn.cursor() as cur:
cur.execute(create_table_query)
# DDL (如 CREATE TABLE) 语句也需要提交
conn.commit()
print("表 'employees' 创建成功 (如果它不存在的话)。")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
- **
conn.commit()*:这是关键。任何修改数据库结构 (DDL) 或数据 (DML) 的操作,都必须调用conn.commit()才能生效。with conn:块在成功退出时*会自动提交。
B. 插入数据 (INSERT)
插入单行
query = "INSERT INTO employees (name, department, salary, hire_date) VALUES (%s, %s, %s, %s)"
data = ("Alice Green", "Engineering", 75000.00, "2023-01-15")
try:
with psycopg2.connect(**DB_PARAMS) as conn:
with conn.cursor() as cur:
cur.execute(query, data)
# conn.commit() 在 with 块结束时自动调用
print("单行插入成功。")
except (Exception, psycopg2.DatabaseError) as error:
print(f"插入失败: {error}")
# 如果出错,with conn 块会引发异常,自动执行 conn.rollback()
插入多行 (高效)
使用 cur.executemany() 来批量插入,这比在 for 循环中调用 execute() 高效得多。
query = "INSERT INTO employees (name, department, salary, hire_date) VALUES (%s, %s, %s, %s)"
data_list = [
("Bob White", "Sales", 60000.00, "2023-03-01"),
("Charlie Black", "Engineering", 80000.00, "2022-11-20"),
("David Brown", "Marketing", 55000.00, "2023-05-10")
]
try:
with psycopg2.connect(**DB_PARAMS) as conn:
with conn.cursor() as cur:
cur.executemany(query, data_list)
# 自动提交
print("批量插入成功。")
except (Exception, psycopg2.DatabaseError) as error:
print(f"批量插入失败: {error}")
获取插入的 ID (RETURNING)
插入后经常需要获取新生成的 PRIMARY KEY。
query = "INSERT INTO employees (name, department) VALUES (%s, %s) RETURNING id"
data = ("Eva Blue", "HR")
try:
with psycopg2.connect(**DB_PARAMS) as conn:
with conn.cursor() as cur:
cur.execute(query, data)
new_id = cur.fetchone()[0] # 获取 RETURNING 的值
print(f"成功插入新员工,ID: {new_id}")
# 自动提交
except (Exception, psycopg2.DatabaseError) as error:
print(f"插入失败: {error}")
C. 查询数据 (SELECT)
获取单条记录 fetchone()
try:
with psycopg2.connect(**DB_PARAMS) as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM employees WHERE department = %s", ("Engineering",))
print("正在获取一名工程师:")
record = cur.fetchone() # 获取第一条
if record:
print(record) # (1, 'Alice Green', 'Engineering', 75000.00, datetime.date(2023, 1, 15))
# 再次调用 fetchone() 会获取下一条
record2 = cur.fetchone()
if record2:
print(record2)
# 如果没有更多数据,fetchone() 返回 None
record3 = cur.fetchone()
print(f"第三条记录: {record3}") # None
except (Exception, psycopg2.DatabaseError) as error:
print(error)
- 结果格式: 默认情况下,
psycopg2返回的数据是**元组 (tuple)**。
获取所有记录 fetchall()
try:
with psycopg2.connect(**DB_PARAMS) as conn:
with conn.cursor() as cur:
cur.execute("SELECT name, salary FROM employees WHERE salary > %s", (58000,))
print("\n薪水 > 58000 的所有员工:")
records = cur.fetchall() # 获取所有剩余结果
if records:
for row in records:
print(f"姓名: {row[0]}, 薪水: {row[1]}")
else:
print("未找到记录。")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
- 注意: 如果结果集非常大,
fetchall()可能会消耗大量内存。
迭代游标 (内存高效)
对于大数据集,最好的方式是直接迭代游标:
try:
with psycopg2.connect(**DB_PARAMS) as conn:
with conn.cursor() as cur:
# 游标充当了迭代器
cur.execute("SELECT * FROM employees")
print("\n--- 迭代所有员工 ---")
for row in cur:
print(f"ID: {row[0]}, 姓名: {row[1]}")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
D. 更新数据 (UPDATE)
try:
with psycopg2.connect(**DB_PARAMS) as conn:
with conn.cursor() as cur:
query = "UPDATE employees SET salary = salary * 1.10 WHERE department = %s"
cur.execute(query, ("Engineering",))
# cur.rowcount 包含受影响的行数
updated_rows = cur.rowcount
print(f"Engineering 部门 {updated_rows} 名员工已加薪。")
# 自动提交
except (Exception, psycopg2.DatabaseError) as error:
print(f"更新失败: {error}")
E. 删除数据 (DELETE)
try:
with psycopg2.connect(**DB_PARAMS) as conn:
with conn.cursor() as cur:
query = "DELETE FROM employees WHERE name = %s"
cur.execute(query, ("David Brown",))
deleted_rows = cur.rowcount
print(f"成功删除 {deleted_rows} 名员工。")
# 自动提交
except (Exception, psycopg2.DatabaseError) as error:
print(f"删除失败: {error}")
7. 事务管理 (Transaction)
数据库事务是一组“要么全做,要么全不做”的操作。
with psycopg2.connect(...) as conn: 块本身就构成了一个事务:
- 如果
with块中的代码成功执行完毕(没有抛出异常),conn.commit()会在块退出时自动调用。 - 如果
with块中的代码抛出了异常,conn.rollback()会自动调用,撤销块中已执行的所有 SQL 操作。
示例:转账(原子操作) 假设 Alice 给 Bob 转账 1000。
def transfer_money(from_id, to_id, amount):
try:
with psycopg2.connect(**DB_PARAMS) as conn:
# conn 现在处于一个事务中
with conn.cursor() as cur:
# 1. 从 Alice 账户扣款
cur.execute(
"UPDATE employees SET salary = salary - %s WHERE id = %s",
(amount, from_id)
)
# (模拟一个错误)
# if from_id == 1:
# raise Exception("网络故障!")
# 2. 向 Bob 账户加款
cur.execute(
"UPDATE employees SET salary = salary + %s WHERE id = %s",
(amount, to_id)
)
# 3. 退出 with conn 块,自动 commit
print("转账成功!")
except (Exception, psycopg2.DatabaseError) as error:
# 4. 发生异常,自动 rollback
print(f"转账失败! 错误: {error}")
print("事务已回滚。")
# 假设 Alice (id=1) 给 Bob (id=2) 转账 1000
transfer_money(1, 2, 1000.00)
如果你取消上面“模拟一个错误”的注释,Alice 的扣款和 Bob 的加款 都不会 发生,数据库将保持原样。这就是事务的威力。
8. 高级技巧
A. 将结果作为字典获取 (RealDictCursor)
默认返回元组 (1, 'Alice') 不太方便。我们通常想要字典 {'id': 1, 'name': 'Alice'}。
你需要导入 psycopg2.extras 并指定 cursor_factory。
from psycopg2.extras import RealDictCursor
try:
# 1. 在 connect 时指定 cursor_factory
with psycopg2.connect(**DB_PARAMS, cursor_factory=RealDictCursor) as conn:
# 2. 也可以在创建 cursor 时指定
# with conn.cursor(cursor_factory=RealDictCursor) as cur:
with conn.cursor() as cur: # cur 现在是 RealDictCursor
cur.execute("SELECT id, name, salary FROM employees WHERE id = %s", (1,))
record = cur.fetchone()
if record:
print("\n--- 字典游标结果 ---")
print(record)
# 输出: {'id': 1, 'name': 'Alice Green', 'salary': 82500.00}
print(f"姓名: {record['name']}") # 可以通过键名访问
except (Exception, psycopg2.DatabaseError) as error:
print(error)
B. 处理 NULL 值
psycopg2 会自动处理 Python 的 None 和 SQL 的 NULL 之间的转换。
这是你之前(在聊天记录中)遇到 invalid input syntax for "None" 错误的原因: 如果你使用 f-string (错误方式),None 会被转成字符串 "None"。 INSERT ... VALUES ('None') -> 数据库试图将字符串 “None” 存入 double precision 字段,导致失败。
正确方式 (使用参数化): psycopg2 会将 Python 的 None 正确转换为 SQL 的 NULL。
# 插入一个 hire_date 为 NULL 的记录
query = "INSERT INTO employees (name, department, salary, hire_date) VALUES (%s, %s, %s, %s)"
data = ("Frank Nullman", "IT", 62000.00, None) # Python 的 None
try:
with psycopg2.connect(**DB_PARAMS) as conn:
with conn.cursor() as cur:
cur.execute(query, data)
print("成功插入带 NULL 值的记录。")
except (Exception, psycopg2.DatabaseError) as error:
print(f"插入 NULL 失败: {error}")
C. 处理 JSON/JSONB 数据
PostgreSQ 对 JSON 有很好的支持。psycopg2 可以自动序列化和反序列化 Python 的 dict 和 list。
from psycopg2.extras import Json
# 假设表有
# ALTER TABLE employees ADD COLUMN metadata JSONB;
# 写入 JSON
meta_data = {
'skills': ['Python', 'SQL', 'OPC-UA'],
'manager_id': 10
}
try:
with psycopg2.connect(**DB_PARAMS) as conn:
with conn.cursor() as cur:
# 使用 extras.Json 包装一下
query = "UPDATE employees SET metadata = %s WHERE name = %s"
cur.execute(query, (Json(meta_data), "Alice Green"))
# 读取 JSON
cur.execute("SELECT metadata FROM employees WHERE name = %s", ("Alice Green",))
result = cur.fetchone()[0]
print(f"\n读取到的 JSONB 数据: {result}")
print(f"数据类型: {type(result)}") # <class 'dict'>
print(f"技能: {result['skills'][0]}") # 'Python'
except (Exception, psycopg2.DatabaseError) as error:
print(f"JSON 操作失败: {error}")
9. 总结:最佳实践清单
- 始终使用
with语句 管理连接 (conn) 和游标 (cur)。 - 永远使用参数化查询 (
cur.execute(query, (params,))) 来防止 SQL 注入。%s是唯一的占位符。 - 使用
conn.commit()提交更改。依赖with conn:块的自动提交/回滚功能是最安全的方式。 - 使用
cur.executemany()进行批量插入。 - 使用
psycopg2.extras.RealDictCursor来获取字典形式的结果。 - **正确处理
None**:参数化查询会自动将None转为NULL。 - 捕获
psycopg2.DatabaseError来处理数据库相关的特定异常。