Python Snap7 终极教程:从入门到精通的完整指南
python-snap7 是一个功能强大的库,它允许你的 Python 程序作为客户端,直接与西门子 S7 系列 PLC(S7-300, 400, 1200, 1500)进行以太网通信。本教程将重点介绍最常见的数据块 (DB) 读写操作。
第 1 步:环境准备
在开始编码之前,必须确保 Python 和 PLC 两端的设置都正确。
1.1 Python 端:安装库
这很简单,使用 pip:
pip install python-snap7
1.2 PLC 端:TIA Portal (博图) 关键设置
如果 PLC 设置不正确,snap7 永远无法连接或读取数据。
- 启用 PUT/GET 通信 (最重要!)
- 在 TIA Portal 中,双击你的 CPU,打开“设备配置”。
- 在“属性” -> “常规” -> “保护与安全” -> “连接机制”中。
- 必须勾选 “允许来自远程伙伴(PLC, HMI, OPC, …)的 PUT/GET 通信”。
- 取消“优化的块访问” (读写 DB 块必须)
snap7依赖“绝对地址”(例如DB1.DBD4对应偏移量4)。“优化”的 DB 块没有这种固定地址。- 在你想要通过
snap7访问的 每一个 DB 块 上,右键 -> “属性”。 - 在“常规” -> “属性”中,取消勾选 “优化的块访问”。
- 取消后,你需要重新编译并下载硬件和软件配置到 PLC。
- 网络设置
- 确保你的 Python 计算机和 PLC 在同一个 IP 子网内,并且彼此可以
ping通。 - 记下 PLC 的 IP 地址。
- 确保你的 Python 计算机和 PLC 在同一个 IP 子网内,并且彼此可以
- Rack (机架) 和 Slot (插槽)
- 这是 S7 协议的寻址方式。
- 对于 S7-1200 / S7-1500,CPU 通常是 Rack = 0, Slot = 1。
- 对于 S7-300,CPU 通常是 Rack = 0, Slot = 2。
- 你可以在 TIA Portal 的设备视图中确认。
第 2 步:连接与断开 PLC
一个健壮的程序总是使用 try...except...finally 来确保连接被正确关闭。
import snap7
import snap7.client as c
from snap7.util import *
import time
# --- PLC 连接参数 ---
PLC_IP = '192.168.0.1' # 替换成你的 PLC IP
PLC_RACK = 0 # 默认 0
PLC_SLOT = 1 # S7-1200/1500 默认 1 (S7-300 默认 2)
# --- 实例化客户端 ---
plc = c.Client()
try:
# --- 建立连接 ---
print(f"正在连接 PLC ({PLC_IP})...")
plc.connect(PLC_IP, PLC_RACK, PLC_SLOT)
# --- 检查连接状态 ---
if plc.get_connected():
print("连接成功!")
# --- 获取 PLC CPU 状态 (可选) ---
# 0x04 = STOP, 0x08 = RUN
cpu_state_raw = plc.get_cpu_state()
print(f"PLC 状态: {cpu_state_raw}") # 'S7CpuStatusRun', 'S7CpuStatusStop'
# ...
# ... 接下来所有的读写操作都在这个 'if' 块中进行 ...
# ...
else:
print("连接失败。请检查:")
print("1. IP 地址是否正确?")
print("2. Rack/Slot 是否正确? (S7-1200/1500 通常是 0 和 1)")
print("3. TIA Portal 中是否勾选了 'PUT/GET' 权限?")
print("4. 网络是否可达 (ping)?")
except snap7.snap7exceptions.Snap7Exception as e:
# 捕获 Snap7 可能的连接异常
print(f"连接时发生 Snap7 错误: {e}")
except Exception as e:
print(f"发生未知错误: {e}")
finally:
# --- 断开连接 ---
# 确保无论程序成功还是失败,都断开连接释放 PLC 资源
if plc.get_connected():
plc.disconnect()
print("已断开 PLC 连接。")
第 3 步:核心 - 读取不同类型的数据
所有读取操作都返回一个 bytearray。我们必须使用 snap7.util 库中的辅助函数来将其转换为 Python 数据类型。
我们将创建一系列函数,假设这些函数都在 if plc.get_connected(): 块内被调用。
3.1 读取 BOOL (位)
- PLC 地址:
DB1.DBX0.0(DB 块 1, 字节 0, 位 0) - 方法: 你不能只读 1 个位,你必须读取它所在的整个字节 (1 byte),然后再解析出那个位。
def read_single_bool(client: c.Client, db_number: int, byte_index: int, bit_index: int):
"""
从PLC的一个特定DB地址读取单个BOOL值。
"""
try:
# 1. 读取包含该位的 1 个字节
byte_data = client.db_read(db_number, byte_index, 1)
# 2. 从读取到的字节中解析出指定的位
# get_bool(bytearray, 字节索引, 位索引)
value = get_bool(byte_data, 0, bit_index)
return value
except Exception as e:
print(f"读取 DB{db_number}.DBX{byte_index}.{bit_index} (BOOL) 失败: {e}")
return None
# --- 调用示例 ---
# is_running = read_single_bool(plc, 1, 0, 0) # 读取 DB1.DBX0.0
# if is_running is not None:
# print(f"DB1.DBX0.0 的值: {is_running}")
3.2 读取 REAL (32位 浮点数)
- PLC 地址:
DB1.DBD4(DB 块 1, 从第 4 字节开始的双字) - 方法:
REAL占用 4 个字节。
def read_single_real(client: c.Client, db_number: int, start_byte_index: int):
"""
从PLC的一个特定DB地址读取单个REAL (浮点数)。
"""
try:
# 1. 读取 4 个字节
byte_data = client.db_read(db_number, start_byte_index, 4)
# 2. 使用 get_real 解析
# get_real(bytearray, 字节索引)
value = get_real(byte_data, 0)
return value
except Exception as e:
print(f"读取 DB{db_number}.DBD{start_byte_index} (REAL) 失败: {e}")
return None
# --- 调用示例 ---
# temperature = read_single_real(plc, 1, 4) # 读取 DB1.DBD4
# if temperature is not None:
# print(f"DB1.DBD4 (温度) 的值: {temperature:.2f}") # 格式化为2位小数
3.3 读取 INT (16位 整数)
- PLC 地址:
DB1.DBW2(DB 块 1, 从第 2 字节开始的字) - 方法:
INT占用 2 个字节。
def read_single_int(client: c.Client, db_number: int, start_byte_index: int):
"""
从PLC的一个特定DB地址读取单个INT (16位整数)。
"""
try:
# 1. 读取 2 个字节
byte_data = client.db_read(db_number, start_byte_index, 2)
# 2. 使用 get_int 解析
value = get_int(byte_data, 0)
return value
except Exception as e:
print(f"读取 DB{db_number}.DBW{start_byte_index} (INT) 失败: {e}")
return None
# --- 调用示例 ---
# item_count = read_single_int(plc, 1, 2) # 读取 DB1.DBW2
# if item_count is not None:
# print(f"DB1.DBW2 (计数值) 的值: {item_count}")
3.4 读取 DINT (32位 整数)
- PLC 地址:
DB1.DBD8(DB 块 1, 从第 8 字节开始的双字) - 方法:
DINT占用 4 个字节。
def read_single_dint(client: c.Client, db_number: int, start_byte_index: int):
"""
从PLC的一个特定DB地址读取单个DINT (32位整数)。
"""
try:
# 1. 读取 4 个字节
byte_data = client.db_read(db_number, start_byte_index, 4)
# 2. 使用 get_dint 解析
value = get_dint(byte_data, 0)
return value
except Exception as e:
print(f"读取 DB{db_number}.DBD{start_byte_index} (DINT) 失败: {e}")
return None
# --- 调用示例 ---
# large_value = read_single_dint(plc, 1, 8) # 读取 DB1.DBD8
# if large_value is not None:
# print(f"DB1.DBD8 (DINT值) 的值: {large_value}")
3.5 读取 M 区 / I 区 / Q 区 (以 M0.0 为例)
如果你想读取 M、I (输入) 或 Q (输出) 区域,使用 read_area。
snap7.types.Area.MK-> M 区snap7.types.Area.PE-> I 区 (输入)snap7.types.Area.PA-> Q 区 (输出)
try:
# 读取 M0.0 (第 0 字节, 第 0 位)
# read_area(area, db_number, start, size)
# M区没有db_number,所以填 0
m_byte = plc.read_area(snap7.types.Area.MK, 0, 0, 1)
m0_0_value = get_bool(m_byte, 0, 0)
print(f"M0.0 的值: {m0_0_value}")
# 读取 IW0 (输入字 0,占用2字节)
i_word = plc.read_area(snap7.types.Area.PE, 0, 0, 2)
iw0_value = get_int(i_word, 0)
print(f"IW0 的值: {iw0_value}")
except Exception as e:
print(f"读取 M/I/Q 区失败: {e}")
3.6 读取 STRING (S7 字符串)
PLC 地址: DB1.DBB10 (STRING[50]) (DB 块 1, 从第 10 字节开始, 假设最大长度 50)
方法: STRING 在 PLC 中存储很特殊。一个 STRING[50] 占用 52 个字节:
- 第 1 字节: 最大长度 (值总是 50)
- 第 2 字节: 实际长度 (例如 ‘ABC’ 长度为 3)
- 后面 50 字节: 实际的 ASCII 字符
snap7 库简化了这个过程。我们必须读取所有 52 个字节 (max_len + 2),get_string 函数会智能地解析它。
def read_string(client: c.Client, db_number: int, start_byte_index: int, max_string_len: int):
"""
从PLC的一个特定DB地址读取S7 STRING。
:param max_string_len: 字符串在 TIA Portal 中定义的*最大*长度,例如 STRING[50] -> 50
"""
try:
# 1. 读取总共 (最大长度 + 2) 个字节
total_bytes = max_string_len + 2
byte_data = client.db_read(db_number, start_byte_index, total_bytes)
# 2. 使用 get_string 解析
# get_string(bytearray, 字节索引)
# 它会自动处理头部的 2 个字节,并返回实际的字符串
value = get_string(byte_data, 0)
return value
except Exception as e:
print(f"读取 DB{db_number}.DBB{start_byte_index} (STRING[{max_string_len}]) 失败: {e}")
return None
# --- 调用示例 ---
# model_name = read_string(plc, 1, 10, 50) # 假设 DB1.DBB10 是 STRING[50]
# if model_name is not None:
# print(f"DB1.DBB10 (型号) 的值: {model_name}")
第 4 步:核心 - 写入不同类型的数据
写入是读取的逆过程:
- 创建一个正确大小的
bytearray。 - 使用
snap7.util.set_...函数将 Python 值“填充”到bytearray中。 - 调用
db_write()将bytearray发送到 PLC。
4.1 写入 BOOL (位) - 【最特殊】
警告: 你不能只写入 1 个位。这样做会覆盖同一字节中的其他 7 个位。 你必须使用 “读-改-写” (Read-Modify-Write) 模式。
def write_single_bool(client: c.Client, db_number: int, byte_index: int, bit_index: int, value: bool):
"""
使用 "读-改-写" 模式安全地写入单个BOOL值。
"""
try:
# 1. 读: 读取当前的整个字节
byte_data = client.db_read(db_number, byte_index, 1)
# 2. 改: 在 Python 内存中修改那个位
# set_bool(bytearray, 字节索引, 位索引, 值)
set_bool(byte_data, 0, bit_index, value)
# 3. 写: 把修改后的整个字节写回去
client.db_write(db_number, byte_index, byte_data)
return True
except Exception as e:
print(f"写入 DB{db_number}.DBX{byte_index}.{bit_index} (BOOL) 失败: {e}")
return False
# --- 调用示例 ---
# print("正在写入 True 到 DB1.DBX0.1...")
# success = write_single_bool(plc, 1, 0, 1, True) # 写入 DB1.DBX0.1
# if success:
# print("写入成功。")
4.2 写入 REAL (32位 浮点数)
- PLC 地址:
DB1.DBD4
def write_single_real(client: c.Client, db_number: int, start_byte_index: int, value: float):
"""
向PLC的一个特定DB地址写入单个REAL (浮点数)。
"""
try:
# 1. 创建 4 字节的 bytearray
data_to_write = bytearray(4)
# 2. 将浮点数值填充到 bytearray
# set_real(bytearray, 字节索引, 值)
set_real(data_to_write, 0, value)
# 3. 写入 PLC
client.db_write(db_number, start_byte_index, data_to_write)
return True
except Exception as e:
print(f"写入 DB{db_number}.DBD{start_byte_index} (REAL) 失败: {e}")
return False
# --- 调用示例 ---
# new_temp = 98.6
# print(f"正在写入 {new_temp} 到 DB1.DBD4...")
# success = write_single_real(plc, 1, 4, new_temp)
# if success:
# print("写入成功。")
4.3 写入 INT (16位 整数)
- PLC 地址:
DB1.DBW2
def write_single_int(client: c.Client, db_number: int, start_byte_index: int, value: int):
"""
向PLC的一个特定DB地址写入单个INT (16位整数)。
"""
try:
# 1. 创建 2 字节的 bytearray
data_to_write = bytearray(2)
# 2. 填充
set_int(data_to_write, 0, value)
# 3. 写入
client.db_write(db_number, start_byte_index, data_to_write)
return True
except Exception as e:
print(f"写入 DB{db_number}.DBW{start_byte_index} (INT) 失败: {e}")
return False
# --- 调用示例 ---
# new_count = 500
# print(f"正在写入 {new_count} 到 DB1.DBW2...")
# success = write_single_int(plc, 1, 2, new_count)
# if success:
# print("写入成功。")
4.4写入 DINT (32位 整数)
PLC 地址: DB1.DBD8
def write_single_dint(client: c.Client, db_number: int, start_byte_index: int, value: int):
"""
向PLC的一个特定DB地址写入单个DINT (32位整数)。
"""
try:
# 1. 创建 4 字节的 bytearray
data_to_write = bytearray(4)
# 2. 填充
set_dint(data_to_write, 0, value)
# 3. 写入
client.db_write(db_number, start_byte_index, data_to_write)
return True
except Exception as e:
print(f"写入 DB{db_number}.DBD{start_byte_index} (DINT) 失败: {e}")
return False
# --- 调用示例 ---
# large_count = 1234567
# print(f"正在写入 {large_count} 到 DB1.DBD8...")
# success = write_single_dint(plc, 1, 8, large_count)
# if success:
# print("写入成功。")
4.5 写入 STRING (S7 字符串)
PLC 地址: DB1.DBB10 (STRING[50])
方法: set_string 辅助函数非常智能。它会为我们自动创建 2 字节的头部(最大长度和实际长度)。
def write_string(client: c.Client, db_number: int, start_byte_index: int, value: str, max_string_len: int):
"""
向PLC的一个特定DB地址写入S7 STRING。
如果 value 超过 max_string_len,它将被自动截断。
"""
try:
# 1. 创建 (最大长度 + 2) 字节的 bytearray
total_bytes = max_string_len + 2
data_to_write = bytearray(total_bytes)
# 2. 使用 set_string 填充
# set_string(bytearray, 字节索引, 字符串值, 最大字符串长度)
set_string(data_to_write, 0, value, max_string_len)
# 3. 写入
client.db_write(db_number, start_byte_index, data_to_write)
return True
except Exception as e:
print(f"写入 DB{db_number}.DBB{start_byte_index} (STRING) 失败: {e}")
return False
# --- 调用示例 ---
# new_model_name = "Model-XYZ-2025"
# print(f"正在写入 '{new_model_name}' 到 DB1.DBB10...")
# success = write_string(plc, 1, 10, new_model_name, 50) # 假设是 STRING[50]
# if success:
# print("写入成功。")
第 5 步:性能优化与最佳实践
1. 批量读写 (最重要的性能技巧)
不要 像这样在循环中单独读取数据:
# 错误的方式 (非常慢!):
start_time = time.time()
value_list = []
for i in range(10):
# 每次循环都是一次完整的网络请求
value_list.append(read_single_real(plc, 1, 4 + i*4))
print(f"读取 10 个 REAL (循环) 耗时: {time.time() - start_time:.4f} 秒")
要 一次性读取一个大数据块,然后在 Python 中解析:
# 正确的方式 (非常快!):
DB_NUM = 1
START_OFFSET = 4
NUM_REALS = 10
READ_SIZE_BYTES = NUM_REALS * 4 # 10 个 REAL * 4 字节/REAL
start_time = time.time()
try:
# 1. 一次性读取 40 个字节
all_data = plc.db_read(DB_NUM, START_OFFSET, READ_SIZE_BYTES)
value_list = []
# 2. 在 Python 内存中循环解析 (这几乎不花时间)
for i in range(NUM_REALS):
byte_offset = i * 4
value = get_real(all_data, byte_offset)
value_list.append(value)
print(f"读取 10 个 REAL (批量) 耗时: {time.time() - start_time:.4f} 秒")
# print(value_list)
except Exception as e:
print(f"批量读取失败: {e}")
网络延迟是瓶颈,而不是 Python 的解析速度。批量操作可以将性能提升 100 倍以上。写入也是同理。
5.2 批量写入 (Batch Write)
原理: 与批量读取相同。不要在循环中单独写入,而是在 Python 中**构建一个大的 bytearray**,然后一次性将其写入 PLC。
示例: 假设我们要一次性写入 DB1.DBD4 (REAL), DB1.DBD8 (DINT) 和 DB1.DBW12 (INT)。这 3 个变量在 DB 块中是连续的。
DBD4(REAL) @ 偏移 4, 长度 4DBD8(DINT) @ 偏移 8, 长度 4DBW12(INT) @ 偏移 12, 长度 2
总数据块从 DBB4 开始,到 DBB13 结束,总长度 (12 + 2) - 4 = 10 字节。
# --- 批量写入示例 ---
DB_NUM = 1
START_OFFSET = 4
TOTAL_SIZE = 10 # 4 (REAL) + 4 (DINT) + 2 (INT)
print("正在准备批量写入...")
try:
# 1. 创建一个 10 字节的空 bytearray
all_data_to_write = bytearray(TOTAL_SIZE)
# 2. 在 Python 内存中填充数据
# 注意偏移量是相对于这个 bytearray 的
set_real(all_data_to_write, 0, 99.9) # 写入 DBD4 (偏移 0)
set_dint(all_data_to_write, 4, 123456) # 写入 DBD8 (偏移 4)
set_int(all_data_to_write, 8, -500) # 写入 DBW12 (偏移 8)
# 3. 一次性写入 10 个字节
plc.db_write(DB_NUM, START_OFFSET, all_data_to_write)
print("批量写入成功!")
except Exception as e:
print(f"批量写入失败: {e}")
5.3 终极性能:Multi-Read / Multi-Write
原理: 当你需要读取的数据不连续,甚至不在同一个 DB 块或内存区时(例如,DB1.DBD4, DB10.DBW0, M0.0),使用 read_multi()。它将所有请求打包成一个,发给 PLC,PLC 再返回所有结果。
方法: read_multi() 接受一个 S7Map 对象列表。
from snap7.types import S7Map, S7AreaDB, S7AreaMK, S7WLReal, S7WLWord, S7WLBit
print("正在准备多区域读取...")
# 1. 定义你要读取的所有变量
# S7Map(Area, DBNumber, Start, Size)
items_to_read = (
# 项 1: DB1.DBD4 (REAL, 4 字节)
S7Map(S7AreaDB, 1, 4, 4),
# 项 2: DB10.DBW0 (INT, 2 字节)
S7Map(S7AreaDB, 10, 0, 2),
# 项 3: M0.0 (BOOL, 1 字节中的 1 位)
# 注意:读取 BOOL 仍然需要读取 1 整个字节
S7Map(S7AreaMK, 0, 0, 1)
)
try:
# 2. 一次性读取所有项
# plc.read_multi() 返回一个 S7Map 结果的列表
results = plc.read_multi(items_to_read)
# 3. 单独解析每个结果
# 结果的顺序与你请求的顺序一致
# 结果 0: DB1.DBD4
# (结果是一个 bytearray)
val_real = get_real(results[0], 0)
print(f"Multi-Read [DB1.DBD4]: {val_real}")
# 结果 1: DB10.DBW0
val_int = get_int(results[1], 0)
print(f"Multi-Read [DB10.DBW0]: {val_int}")
# 结果 2: M0.0
val_bool = get_bool(results[2], 0, 0)
print(f"Multi-Read [M0.0]: {val_bool}")
except Exception as e:
print(f"多区域读取 (read_multi) 失败: {e}")
# write_multi() 的用法完全相同,只是你需要先准备 S7Map 列表和
# 对应的 data 列表,然后调用 plc.write_multi(items, data)。
4. 完整性检查
- 再次检查 TIA Portal 设置:
PUT/GET和优化的块访问是所有问题的根源。 - 使用
try...finally: 始终确保plc.disconnect()被调用,否则你可能会耗尽 PLC 的连接资源(它可能只允许 8-16 个并发连接)。
第 6 步:完整可运行示例
这个脚本将所有内容(连接、辅助函数、读/写调用、断开)组合在一起。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Python-Snap7 终极教程 - 完整示例脚本
"""
import snap7
import snap7.client as c
from snap7.util import *
import time
# --- PLC 连接参数 ---
PLC_IP = '192.168.0.1' # 替换成你的 PLC IP
PLC_RACK = 0 # 默认 0
PLC_SLOT = 1 # S7-1200/1500 默认 1 (S7-300 默认 2)
# --- DB 和地址定义 (仅为示例) ---
DB_NUM = 1
# 地址
ADDR_BOOL_R = 'DBX0.0' # (Byte 0, Bit 0)
ADDR_BOOL_W = 'DBX0.1' # (Byte 0, Bit 1)
ADDR_INT_RW = 'DBW2' # (Byte 2)
ADDR_REAL_RW = 'DBD4' # (Byte 4)
ADDR_STRING_RW = 'DBB10' # (Byte 10)
STRING_MAX_LEN = 50 # 假设是 STRING[50]
# ----------------------------------------------------
# 步骤 3 & 4:定义所有读写辅助函数
# ----------------------------------------------------
# (为了简洁,这里省略了函数的 DocString,请参考上面教程中的定义)
def read_single_bool(client: c.Client, db_number: int, byte_index: int, bit_index: int):
try:
byte_data = client.db_read(db_number, byte_index, 1)
value = get_bool(byte_data, 0, bit_index)
return value
except Exception as e:
print(f"读取 DB{db_number}.DBX{byte_index}.{bit_index} 失败: {e}")
return None
def write_single_bool(client: c.Client, db_number: int, byte_index: int, bit_index: int, value: bool):
try:
byte_data = client.db_read(db_number, byte_index, 1) # 1. 读
set_bool(byte_data, 0, bit_index, value) # 2. 改
client.db_write(db_number, byte_index, byte_data) # 3. 写
return True
except Exception as e:
print(f"写入 DB{db_number}.DBX{byte_index}.{bit_index} 失败: {e}")
return False
def read_single_real(client: c.Client, db_number: int, start_byte_index: int):
try:
byte_data = client.db_read(db_number, start_byte_index, 4)
value = get_real(byte_data, 0)
return value
except Exception as e:
print(f"读取 DB{db_number}.DBD{start_byte_index} 失败: {e}")
return None
def write_single_real(client: c.Client, db_number: int, start_byte_index: int, value: float):
try:
data_to_write = bytearray(4)
set_real(data_to_write, 0, value)
client.db_write(db_number, start_byte_index, data_to_write)
return True
except Exception as e:
print(f"写入 DB{db_number}.DBD{start_byte_index} 失败: {e}")
return False
def read_single_int(client: c.Client, db_number: int, start_byte_index: int):
try:
byte_data = client.db_read(db_number, start_byte_index, 2)
value = get_int(byte_data, 0)
return value
except Exception as e:
print(f"读取 DB{db_number}.DBW{start_byte_index} 失败: {e}")
return None
def write_single_int(client: c.Client, db_number: int, start_byte_index: int, value: int):
try:
data_to_write = bytearray(2)
set_int(data_to_write, 0, value)
client.db_write(db_number, start_byte_index, data_to_write)
return True
except Exception as e:
print(f"写入 DB{db_number}.DBW{start_byte_index} 失败: {e}")
return False
def read_string(client: c.Client, db_number: int, start_byte_index: int, max_string_len: int):
try:
total_bytes = max_string_len + 2
byte_data = client.db_read(db_number, start_byte_index, total_bytes)
value = get_string(byte_data, 0)
return value
except Exception as e:
print(f"读取 DB{db_number}.DBB{start_byte_index} 失败: {e}")
return None
def write_string(client: c.Client, db_number: int, start_byte_index: int, value: str, max_string_len: int):
try:
total_bytes = max_string_len + 2
data_to_write = bytearray(total_bytes)
set_string(data_to_write, 0, value, max_string_len)
client.db_write(db_number, start_byte_index, data_to_write)
return True
except Exception as e:
print(f"写入 DB{db_number}.DBB{start_byte_index} 失败: {e}")
return False
# ----------------------------------------------------
# 步骤 2:主程序 - 连接、执行、断开
# ----------------------------------------------------
def main():
plc = c.Client()
try:
# --- 建立连接 ---
print(f"正在连接 PLC ({PLC_IP})...")
plc.connect(PLC_IP, PLC_RACK, PLC_SLOT)
if plc.get_connected():
print("连接成功!")
# --- 1. 读取初始值 ---
print("\n--- 1. 正在读取初始值 ---")
init_temp = read_single_real(plc, DB_NUM, 4)
init_count = read_single_int(plc, DB_NUM, 2)
init_flag = read_single_bool(plc, DB_NUM, 0, 0)
init_model = read_string(plc, DB_NUM, 10, STRING_MAX_LEN)
print(f" 初始温度 (DBD4): {init_temp}")
print(f" 初始计数 (DBW2): {init_count}")
print(f" 初始标志 (DBX0.0): {init_flag}")
print(f" 初始型号 (DBB10): {init_model}")
# --- 2. 写入新值 ---
print("\n--- 2. 正在写入新值 ---")
new_temp = round(time.time() % 100, 2) # 用当前时间戳做新值
new_count = (init_count or 0) + 1
new_flag = not init_flag
new_model = f"Test_{int(time.time())}"
write_single_real(plc, DB_NUM, 4, new_temp)
write_single_int(plc, DB_NUM, 2, new_count)
write_single_bool(plc, DB_NUM, 0, 1, new_flag) # 写入 DBX0.1
write_string(plc, DB_NUM, 10, new_model, STRING_MAX_LEN)
print(f" 写入温度 -> {new_temp}")
print(f" 写入计数 -> {new_count}")
print(f" 写入标志 (DBX0.1) -> {new_flag}")
print(f" 写入型号 -> {new_model}")
# --- 3. 读回写入的值进行验证 ---
print("\n--- 3. 正在读回验证 ---")
val_temp = read_single_real(plc, DB_NUM, 4)
val_count = read_single_int(plc, DB_NUM, 2)
val_flag = read_single_bool(plc, DB_NUM, 0, 1)
val_model = read_string(plc, DB_NUM, 10, STRING_MAX_LEN)
print(f" 读回温度 (DBD4): {val_temp} (应为: {new_temp})")
print(f" 读回计数 (DBW2): {val_count} (应为: {new_count})")
print(f" 读回标志 (DBX0.1): {val_flag} (应为: {new_flag})")
print(f" 读回型号 (DBB10): {val_model} (应为: {new_model})")
# --- 4. 演示批量读取 (来自步骤 5.1) ---
print("\n--- 4. 演示批量读取 (DBB0-DBB9) ---")
try:
# 假设我们想一次性读取 DBX0.0 到 DBB9
all_data = plc.db_read(DB_NUM, 0, 10) # 读 10 个字节
val_b_0_0 = get_bool(all_data, 0, 0)
val_b_0_1 = get_bool(all_data, 0, 1)
val_i_2 = get_int(all_data, 2)
val_r_4 = get_real(all_data, 4)
print(f" 批量读 DBX0.0: {val_b_0_0}")
print(f" 批量读 DBX0.1: {val_b_0_1}")
print(f" 批量读 DBW2: {val_i_2}")
print(f" 批量读 DBD4: {val_r_4}")
except Exception as e:
print(f" 批量读取失败: {e}")
else:
print("连接失败。请检查 TIA Portal 设置 (PUT/GET) 和网络。")
except snap7.snap7exceptions.Snap7Exception as e:
print(f"连接时发生 Snap7 错误: {e}")
except Exception as e:
print(f"发生未知错误: {e}")
finally:
# --- 断开连接 ---
if plc.get_connected():
plc.disconnect()
print("\n已断开 PLC 连接。")
if __name__ == "__main__":
main()
第 7 步:常见问题 (FAQ) 与总结
- 我收到了
Snap7Exception: CPU: Item not available(项目不可用) 错误!- 90% 的可能:你访问的 DB 块 仍然是“优化的块访问”。去 TIA Portal,右键该 DB -> 属性 -> 取消勾选“优化的块访问”,然后重新编译并下载。
- 10% 的可能:你的地址(DB号、起始字节、长度)写错了。例如,你试图读取
DB1.DBB100,但DB1只有 50 字节长。
- 我收到了
Snap7Exception: ... function refused(功能被拒绝) 错误!- 99% 的可能:你没有在 TIA Portal 的 CPU 属性中勾选“允许来自远程伙伴…的 PUT/GET 通信”。
- 我写入一个 BOOL (DBX0.0),为什么 DBX0.1 到 DBX0.7 都变成 0 了?
- 你没有使用“读-改-写”模式(见 4.1 节)。你直接用
db_write写入了 1 个位,snap7必须写入整个字节。你必须先db_read整个字节,在内存中修改那 1 个位,然后再db_write整个字节回去。
- 你没有使用“读-改-写”模式(见 4.1 节)。你直接用
- 我读取的 STRING 为什么是乱码或者不完整?
- 你
db_read的长度错误。读取STRING[50],你必须db_read52 个字节 (max_len + 2)。snap7.util.get_string需要这 2 个头部字节来正确解析。 - 你
write_string的最大长度参数错误。set_string需要知道max_string_len才能正确生成头部。
- 你