Network Camera Scanner — 完整使用与原理教程


Network Camera Scanner — 完整使用与原理教程

本教程基于已在画布中提供的脚本 network_camera_scanner.py。该脚本旨在尽可能完整地检测当前网络内的摄像头设备(包括本地 USB / 内置 摄像头与局域网 IP 摄像头)。

目录

  1. 目标与概览
  2. 环境与依赖安装(可选依赖)
  3. 脚本功能总结(模块级说明)
  4. 详细使用步骤(运行与参数)
  5. 核心检测方法与实现细节
    • 本地摄像头检测
    • 网段识别
    • 主机发现(ARP / Ping)
    • SSDP(UPnP)发现
    • 端口与服务探测(RTSP / HTTP)
    • 启发式判断与打分
  6. 如何阅读扫描报告(JSON 字段说明)
  7. 性能优化与常见场景调整
  8. 常见问题与排查技巧
  9. 进阶扩展(GUI、ONVIF、凭证尝试、报表)
  10. 合规与安全注意事项
  11. 结语与下一步建议

1) 目标与概览

本教程目标:

  • 帮助你理解、运行并定制 network_camera_scanner.py,以便在本地网络中发现摄像头设备。
  • 解释每个检测步骤的原理与优缺点,帮助你减少漏检与误报。

本脚本并不保证 100% 检出率(网络/设备配置和安全设置会影响检测),但覆盖了家庭/办公场景中最常见的发现方法:本地摄像头、ARP、ping、SSDP、RTSP、HTTP。

2) 环境与依赖安装(可选依赖)

推荐 Python 版本

  • Python 3.8+。

可选但推荐安装的包

  • opencv-python —— 本地摄像头检测(读取帧以确认)
  • netifaces —— 精确获取网卡 IP 和子网掩码
  • scapy —— 高速 ARP 扫描(需要管理员权限或 root)
  • requests —— 更稳定的 HTTP 探测

安装命令:

pip install opencv-python requests netifaces scapy

说明:脚本会检测这些库的可用性;若缺失会自动降级为系统命令或纯 socket 实现(可用但慢一些)。

3) 脚本功能总结(模块级)

  • check_local_cameras:使用 OpenCV 逐索引打开摄像头并尝试读取一帧,确认可用性。
  • detect_network_from_interfaces / default_network_guess:确定本机所在网段(优先 netifaces,后退化为 x.x.x.0/24)。
  • arp_scan(scapy 优先):通过 ARP 广播获取局域网内存活主机。
  • discover_hosts_by_ping:并发 ping 网段中主机(无 scapy 时使用)。
  • ssdp_discover:向组播地址发送 SSDP M-SEARCH,发现支持 UPnP/SSDP 的设备(摄像头及 NVR 常支持)。
  • probe_host_for_camera:对存活主机的常见摄像头端口并发检测(端口连通、RTSP OPTIONS、HTTP GET),并进行关键词启发式判断。
  • 并发执行使大网段(/24)在合理时间内扫描完成,并将结果输出为 JSON 报告。

4) 详细使用步骤(运行与参数)

运行脚本

在脚本所在目录运行:

python network_camera_scanner.py

指定网段(推荐)

如果你知道本地网段,建议指定以加速扫描并避免遗漏:

python network_camera_scanner.py --network 192.168.0.0/24

其他常用参数

  • --max-local:检测本地摄像头时尝试的最大设备索引(默认6)。例如:--max-local 8

输出

脚本会生成 JSON 报告,文件名示例:camera_scan_report_192.168.0.0_1700000000.json。报告包含扫描时间、网段、发现的在线主机、SSDP 响应与每个主机的探测详情。

5) 核心检测方法与实现细节

5.1 本地摄像头检测(OpenCV)

  • 方法:依次尝试 cv2.VideoCapture(i)(i 从 0 开始)并调用 read() 尝试拉一帧。
  • 优点:能直接确认摄像头是否被系统可用。读到帧则为可用。
  • 缺点:某些系统或驱动下索引不连续或被独占(被其他程序占用)会导致误判。

5.2 网段识别

  • 优先使用 netifaces 读取系统网卡 IP 与子网掩码,准确推断网段。
  • 若不可用,则通过 UDP 连接 8.8.8.8:80 获取本机对外 IP,然后退化到 /24 网段。

5.3 主机发现:ARP 与 Ping

  • **ARP (scapy)**:向广播发送 ARP 请求并收集响应,速度快且可靠,能发现处于开机且连到同一二层网络的设备(但通常需要 root/管理员权限)。
  • Ping 扫描:在无 scapy 权限或库时使用。对整个网段并发发送 ping,较慢且对某些设备会被 ICMP 阻止。

5.4 SSDP(UPnP)发现

  • 发送 SSDP M-SEARCH239.255.255.250:1900,设备若支持 UPnP,会返回含 LOCATIONSERVER 的响应,响应体中可能包含设备描述 XML 的 URL,进一步可抓取设备型号信息。
  • 优点:不需知道 IP 范围,UPnP 设备会主动响应。
  • 缺点:很多设备关闭 UPnP,或路由器阻止组播。

5.5 端口与服务探测(RTSP / HTTP)

  • 常见端口:554, 8554 (RTSP), 80, 8080, 8000, 37777, 8888, 9000(HTTP 或 web interface)。
  • RTSP 探测:向 RTSP 端口发送 OPTIONS rtsp://ip/ 请求,若回应包含 RTSP 则很可能是摄像头或流媒体服务器。
  • HTTP 探测:请求 /,查看响应头与 body,若包含诸如 camerahikvisiondahuawebcam 等关键词,则标记为疑似摄像头。

5.6 启发式判断

脚本将综合:

  • 是否打开常见摄像头端口
  • RTSP 响应
  • HTTP header 或 body 中是否含关键字
  • SSDP 是否指向设备

若满足任一强指标(例如 RTSP 返回)或多项弱指标叠加,则标为 is_camera_guess = True

6) 如何阅读扫描报告(JSON 字段说明)

报告结构(简化示例):

{
  "scanned_at": "2025-11-12T15:00:00Z",
  "network": "192.168.0.0/24",
  "live_hosts": ["192.168.0.2", "192.168.0.10"],
  "ssdp": [{"ip": "192.168.0.10", "raw": "..."}],
  "probes": [
    {
      "ip": "192.168.0.10",
      "open_ports": [80, 554],
      "rtsp": "RTSP/1.0 200 OK...",
      "http": {"status_code": 200, "headers": {...}, "text_snippet": "..."},
      "is_camera_guess": true
    }
  ],
  "local_cameras": [0]
}

字段说明:

  • scanned_at:扫描的 UTC 时间(ISO 格式)。
  • network:被扫描的网段。
  • live_hosts:主机发现阶段认为在线的主机 IP 列表。
  • ssdp:SSDP 响应集合(包含响应原文)。
  • probes:每个在线主机的探测结果(端口、RTSP 响应、HTTP 响应、是否疑似摄像头)。
  • local_cameras:本地可用摄像头索引。

7) 性能优化与常见场景调整

  • 缩小网段:尽量扫描 /24 或更小网段;扫描 /16 会耗费大量时间与网络流量。
  • 并发调优:脚本使用 ThreadPoolExecutor(max_workers=200),可根据主机性能与网络环境调整(过高可能导致网络丢包或被防火墙限速)。
  • 优先 ARP:在同一二层网络中使用 ARP 扫描比 ping 更快、可靠(但需 root)。
  • 跳过已知安全设备:某些路由器或网关会阻止回应,考虑在配置中排除路由器 IP。

8) 常见问题与排查技巧

  • 问题:扫描后显示没有摄像头,但我确定有设备。
    • 检查摄像头是否在相同子网;摄像头可能在另一个 VLAN。
    • 摄像头可能禁用 RTSP/HTTP 或更改为非标准端口。
    • 尝试在摄像头所在的设备上运行 ARP 表或检查管理界面。`
  • 问题:Ping 扫描非常慢。
    • 使用 scapy 的 ARP 扫描(需要管理员),或缩小扫描范围并提高并发数。
  • 问题:大量误报(错误识别为摄像头)。
    • 检查 http.text_snippethttp.headers 中的关键字;调整关键词或增加更严格规则(如检测 WWW-Authenticate 或设备 XML)。

9) 进阶扩展(建议与实现方向)

  • GUI(Tkinter / PyQt)
    • 显示本地摄像头、网络扫描进度、发现结果列表;支持开始/停止、导出 JSON。可把脚本中 scan_network 封装并在后台线程运行,主线程更新 UI。
  • ONVIF 支持:安装 onvif-zeepzeep 等库,使用 ONVIF 服务获取设备信息(型号、序列号、网络配置)。需摄像头支持 ONVIF。
  • 凭证尝试(非常谨慎):自动尝试默认用户名/密码会带来伦理与法律风险,不建议在未授权网络中使用。仅在你有权限并在安全环境下测试。
  • HTML 报表 / 仪表盘:将 JSON 报告渲染为漂亮的 HTML 表格并加上设备截图(若可匿名拉取快照)。

10) 合规与安全注意事项

  • 必须在你有权限的网络中运行扫描。公司/校园/公共网络可能禁止或记录端口扫描,可能违反使用条款或法律。
  • 避免暴力破解/凭证猜测,未经授权访问设备是非法行为。
  • 保持脚本和依赖在受信任环境中运行,扫描会产生网络流量,注意不要触发 IDS/IPS 报警。

完整代码

	#!/usr/bin/env python3
"""
network_camera_scanner.py

功能:在本机运行,尽可能完整地检测当前网络下的摄像头设备(本地 USB / 内置 摄像头 + 局域网 IP 摄像头)。
设计原则:优先使用已安装的高权限库(scapy、netifaces、requests);如缺失会自动降级为纯 Python 实现(ping / socket / SSDP)。

主要功能:
 1) 自动侦测本地摄像头(OpenCV)——尽量打开并读取一帧以确认可用。
 2) 自动判断本地网段(尝试 netifaces -> UDP探测 ->  /24 退化)
 3) 主机存活探测:优先 ARP (scapy),否则并发 ping(子进程)。
 4) 端口探测:常见摄像头端口(554 rtsp, 80/8080 http, 8000, 8554, 37777 等)。
 5) RTSP探测:在 554/8554 上发送 RTSP OPTIONS 并解析响应。
 6) HTTP探测:GET / 并检查 Server/WWW-Authenticate/设备关键字("camera","dvr","ipcam","webcam","motion" 等)。
 7) SSDP(UPnP)广播发现:接收来自摄像设备的响应。
 8) 并发(ThreadPoolExecutor)以加速扫描。
 9) 输出 JSON 报告并打印友好表格。

注意:无法保证 100% 检出率 —— 某些摄像头可能在受限网络、启用了强认证或使用非标准端口并关闭了发现服务。但本脚本覆盖了常见和高概率的发现手段。

依赖(可选,脚本会检测并降级):
  - opencv-python  (cv2)       : 本地摄像头检测
  - netifaces                  : 精确获取网卡网段
  - scapy                      : 更快速、可靠的 ARP 扫描
  - requests                   : 更友好的 HTTP 探测

用法:
  python network_camera_scanner.py            # 使用自动网段检测(默认)
  python network_camera_scanner.py --network 192.168.1.0/24  # 指定网段

"""

import argparse
import concurrent.futures
import ipaddress
import json
import platform
import socket
import subprocess
import sys
import threading
import time
from datetime import datetime

# optional imports
try:
    import cv2
except Exception:
    cv2 = None

try:
    import netifaces
except Exception:
    netifaces = None

try:
    from scapy.all import ARP, Ether, srp, conf
except Exception:
    srp = None

try:
    import requests
except Exception:
    requests = None

# --------------------------- Configuration ---------------------------
COMMON_CAMERA_PORTS = [554, 8554, 80, 8080, 8000, 37777, 5000, 9000, 8888]
SSDP_MCAST = ("239.255.255.250", 1900)
RTSP_OPTIONS_STR = "OPTIONS rtsp://{ip}/ RTSP/1.0\r\nCSeq: 1\r\n\r\n"
HTTP_DEVICE_KEYWORDS = ["camera", "ipcam", "dvr", "webcam", "hikvision", "dahua", "axis", "gwell", "motion", "nvr"]

# --------------------------- Utilities ---------------------------

def get_local_ip():
    """通过 UDP 方式获得本机用于上行的 IP(不发起真实连接)"""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        return ip
    except Exception:
        return None


def detect_network_from_interfaces():
    """尝试使用 netifaces 获取本地网段(优先);返回IPv4Network对象或 None"""
    if netifaces is None:
        return None
    try:
        for iface in netifaces.interfaces():
            addrs = netifaces.ifaddresses(iface)
            if netifaces.AF_INET in addrs:
                for addr in addrs[netifaces.AF_INET]:
                    ip = addr.get('addr')
                    netmask = addr.get('netmask')
                    if ip and netmask and not ip.startswith('127.'):
                        network = ipaddress.IPv4Network(f"{ip}/{netmask}", strict=False)
                        return network
    except Exception:
        return None
    return None


def default_network_guess():
    ip = get_local_ip()
    if not ip:
        return None
    # fallback: assume /24
    network = ipaddress.IPv4Network(ip + '/24', strict=False)
    return network


# -------------------- Local camera detection --------------------

def check_local_cameras(max_devices=10, read_frame=True):
    """检测本机摄像头,返回可用设备索引列表。尝试读取一帧以确认真正可用。"""
    results = []
    if cv2 is None:
        print("[WARN] 未安装 opencv-python,跳过本地摄像头检测。安装: pip install opencv-python")
        return results

    print("[LOCAL] 正在检测本地摄像头...")
    for i in range(max_devices):
        cap = cv2.VideoCapture(i)
        if not cap or not cap.isOpened():
            if cap:
                cap.release()
            continue
        usable = True
        if read_frame:
            ret, frame = cap.read()
            if not ret or frame is None:
                usable = False
        cap.release()
        if usable:
            print(f"[LOCAL] 发现可用摄像头 索引={i}")
            results.append(i)
    if not results:
        print("[LOCAL] 未发现本地摄像头")
    return results


# -------------------- Network host discovery --------------------

def arp_scan(network, timeout=2):
    """优先使用 scapy ARP 扫描以获得在线主机列表。返回 IP 列表。"""
    if srp is None:
        return None
    try:
        conf.verb = 0
        # scapy expects a string like '192.168.0.0/24'
        ans, _ = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=str(network)), timeout=timeout, retry=1)
        ips = [rcv.psrc for snd, rcv in ans]
        return list(sorted(set(ips), key=lambda x: tuple(map(int, x.split('.')))))
    except Exception:
        return None


def ping_host(ip, timeout=1):
    """使用系统 ping 判断主机是否存活(小平台差异处理)。返回 True/False"""
    # platform-specific params
    plat = platform.system().lower()
    if plat == 'windows':
        cmd = ['ping', '-n', '1', '-w', str(int(timeout*1000)), str(ip)]
    else:
        # macOS and linux
        cmd = ['ping', '-c', '1', '-W', str(int(timeout)), str(ip)]
    try:
        res = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return res.returncode == 0
    except Exception:
        return False


def discover_hosts_by_ping(network, max_workers=200):
    ips = [str(ip) for ip in network.hosts()]
    live = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = {ex.submit(ping_host, ip): ip for ip in ips}
        for fut in concurrent.futures.as_completed(futures):
            ip = futures[fut]
            try:
                if fut.result():
                    live.append(ip)
            except Exception:
                pass
    return live


# -------------------- SSDP discovery --------------------

def ssdp_discover(timeout=2):
    """发送 SSDP M-SEARCH 广播,收集响应。返回 (ST, LOCATION, USN, SERVER, raw) 列表。"""
    msg = "M-SEARCH * HTTP/1.1\r\nHOST: 239.255.255.250:1900\r\nMAN: \"ssdp:discover\"\r\nMX: 1\r\nST: ssdp:all\r\n\r\n"
    responses = []
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        sock.settimeout(timeout)
        sock.sendto(msg.encode('utf-8'), SSDP_MCAST)
        start = time.time()
        while time.time() - start < timeout:
            try:
                data, addr = sock.recvfrom(65507)
                text = data.decode('utf-8', errors='ignore')
                responses.append((addr[0], text))
            except socket.timeout:
                break
    except Exception:
        pass
    return responses


# -------------------- Port & service probes --------------------

def is_port_open(ip, port, timeout=0.6):
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(timeout)
        s.connect((ip, port))
        s.close()
        return True
    except Exception:
        return False


def probe_rtsp(ip, port, timeout=1.0):
    """发送 RTSP OPTIONS 请求并检查返回是否像 RTSP。"""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(timeout)
        s.connect((ip, port))
        s.sendall(RTSP_OPTIONS_STR.format(ip=ip).encode('utf-8'))
        data = s.recv(1024)
        s.close()
        if b'RTSP' in data:
            return data.decode('utf-8', errors='ignore')
    except Exception:
        pass
    return None


def probe_http(ip, port, timeout=1.0):
    """尝试使用 requests(如可用)或 socket 发起简单的 HTTP GET 检测并返回响应头和部分body。"""
    if requests is not None:
        try:
            url = f'http://{ip}:{port}/'
            r = requests.get(url, timeout=timeout)
            info = {
                'status_code': r.status_code,
                'headers': dict(r.headers),
                'text_snippet': r.text[:1024]
            }
            return info
        except Exception:
            return None
    else:
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(timeout)
            s.connect((ip, port))
            s.sendall(b'GET / HTTP/1.0\r\nHost: %b\r\n\r\n' % ip.encode())
            data = s.recv(2048)
            s.close()
            txt = data.decode('utf-8', errors='ignore')
            # crude parse
            lines = txt.split('\r\n')
            headers = {}
            for ln in lines[1:20]:
                if ':' in ln:
                    k, v = ln.split(':', 1)
                    headers[k.strip()] = v.strip()
            return {'status_code': None, 'headers': headers, 'text_snippet': txt[:1024]}
        except Exception:
            return None


# -------------------- Host-level camera probe --------------------

def probe_host_for_camera(ip):
    """对目标主机进行端口 + 服务探测,返回字典结果(可能是摄像头)。"""
    result = {'ip': ip, 'open_ports': [], 'rtsp': None, 'http': None, 'ssdp_headers': [], 'is_camera_guess': False}

    # 1) SSDP responses matching this IP
    # (ssdp discovery performed globally; here we only do per-host checks in main flow)

    # 2) test ports
    for p in COMMON_CAMERA_PORTS:
        if is_port_open(ip, p):
            result['open_ports'].append(p)
            # probe specific
            if p in (554, 8554):
                r = probe_rtsp(ip, p)
                if r:
                    result['rtsp'] = r
            if p in (80, 8080, 8000, 8888, 9000, 5000):
                r = probe_http(ip, p)
                if r:
                    result['http'] = r

    # 3) quick heuristic: if any response headers or RTSP mention camera-related keywords
    rc = False
    if result['rtsp']:
        rc = True
    if result['http'] and 'headers' in result['http']:
        hs = json.dumps(result['http']['headers']).lower()
        for kw in HTTP_DEVICE_KEYWORDS:
            if kw in hs or kw in (result['http'].get('text_snippet') or '').lower():
                rc = True
                break
    # If opened common camera ports -> raise suspicion
    if any(p in result['open_ports'] for p in [554, 8554, 37777, 8000, 9000]):
        rc = True

    result['is_camera_guess'] = rc
    return result


# -------------------- Main scanner --------------------

def scan_network(network, use_scapy=True):
    report = {
        'scanned_at': datetime.utcnow().isoformat() + 'Z',
        'network': str(network),
        'live_hosts': [],
        'ssdp': [],
        'probes': []
    }

    # 1) SSDP discovery
    ssdp_resp = ssdp_discover(timeout=2)
    report['ssdp'] = [{'ip': ip, 'raw': raw} for ip, raw in ssdp_resp]

    # 2) host discovery
    live_hosts = None
    if use_scapy and srp is not None:
        print('[NET] 使用 scapy ARP 扫描 ...')
        live_hosts = arp_scan(network)
    if not live_hosts:
        print('[NET] 使用 ping 扫描主机 ... (慢但无库依赖)')
        live_hosts = discover_hosts_by_ping(network)

    report['live_hosts'] = live_hosts

    # 3) probe each host
    print(f'[NET] 对 {len(live_hosts)} 个存活主机进行服务探测 (并发) ...')
    with concurrent.futures.ThreadPoolExecutor(max_workers=200) as ex:
        futures = {ex.submit(probe_host_for_camera, ip): ip for ip in live_hosts}
        for fut in concurrent.futures.as_completed(futures):
            try:
                res = fut.result()
                report['probes'].append(res)
                if res['is_camera_guess']:
                    print(f"[FOUND] 疑似摄像头: {res['ip']} open_ports={res['open_ports']}")
            except Exception as e:
                pass

    return report


# -------------------- CLI and Runner --------------------

def main():
    parser = argparse.ArgumentParser(description='全面的本地 + 局域网摄像头检测器')
    parser.add_argument('--network', '-n', help='指定网段,例如 192.168.1.0/24', default=None)
    parser.add_argument('--max-local', type=int, default=6, help='本地摄像头最大索引检测数量')
    args = parser.parse_args()

    # local cameras
    local = check_local_cameras(max_devices=args.max_local)

    # network detection
    network = None
    if args.network:
        network = ipaddress.IPv4Network(args.network, strict=False)
    else:
        network = detect_network_from_interfaces()
        if network is None:
            network = default_network_guess()

    if network is None:
        print('[ERROR] 无法自动检测网段,请使用 --network 指定(例如 192.168.1.0/24)')
        sys.exit(1)

    print(f"[INFO] 扫描网段: {network}")

    report = scan_network(network)
    report['local_cameras'] = local

    # 输出 JSON 报告
    out_file = f'camera_scan_report_{network.network_address}_{int(time.time())}.json'
    try:
        with open(out_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        print(f"[DONE] 报告已写入: {out_file}")
    except Exception as e:
        print('[WARN] 无法写入报告文件:', e)

    # 简短汇总
    cams = [p['ip'] for p in report['probes'] if p.get('is_camera_guess')]
    print('\n=== 扫描汇总 ===')
    print('本地摄像头数量:', len(local))
    print('发现可疑摄像头主机数:', len(cams))
    for ip in cams:
        print(' -', ip)


if __name__ == '__main__':
    main()

文章作者: 0xdadream
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 0xdadream !
评论
  目录