介绍
在本项目中,我们将使用 Python 开发一个服务器端口扫描器,用于检测目标服务器上的开放端口。该工具对于系统管理员(用于验证安全策略)和潜在攻击者(用于识别主机上运行的网络服务)都至关重要。我们将涵盖端口扫描的核心环节,包括其工作原理和实际应用。我们将深入探讨如何利用 Python 的强大功能创建一个简单且高效的端口扫描器,并重点通过多线程方法来提升扫描效率和性能。
端口扫描是指向一系列服务器端口发送请求,以确定哪些端口处于开放状态的过程。这一步对于维护安全以及攻击者寻找易受攻击的服务都非常关键。我们将从探索端口扫描的基础知识及其影响开始。
核心概念:
- 端口扫描器有助于检测服务器或主机上的开放端口。
- 它们被用于安全评估,也被攻击者用来识别易受攻击的服务。
- 端口扫描最简单的形式是尝试对一系列端口进行 TCP 连接。
👀 预览
这是我们将要构建的端口扫描器工具的预览:
python port_scanner.py 127.0.0.1 5000-9000
输出结果:
Opened Port: 8081
Scanning completed.
🎯 任务
在本项目中,你将学习:
- 如何利用 Python 的套接字(socket)编程能力与网络端口进行交互。
- 如何在 Python 中实现多线程方法,以提高网络扫描任务的效率。
- 如何开发一个 Python 命令行工具,通过接收用户输入实现灵活的端口扫描。
🏆 成就
完成本项目后,你将能够:
- 使用 Python 的
socket库创建网络连接、测试端口可用性并处理网络异常。 - 理解并应用 Python 中的多线程技术来执行并发任务,显著提升网络密集型操作的性能。
- 构建一个实用的命令行端口扫描工具,增强你的 Python 脚本编写能力以及对命令行参数解析的理解。
建立 TCP 连接测试
我们的第一步是编写一个函数,用于测试目标 IP 上的某个 TCP 端口是否开放。该函数将尝试连接到指定的端口并确定其状态。
打开 /home/labex/project 目录下名为 port_scanner.py 的文件,并添加以下 Python 函数:
import argparse
from queue import Queue
from socket import AF_INET, gethostbyname, socket, SOCK_STREAM
import threading
def tcp_test(port: int, target_ip: str) -> None:
with socket(AF_INET, SOCK_STREAM) as sock:
sock.settimeout(1)
result = sock.connect_ex((target_ip, port))
if result == 0:
print(f"Opened Port: {port}")
这段代码定义了一个名为 tcp_test 的函数,它尝试与目标 IP 地址上的指定端口建立 TCP 连接。它设置了超时时间以避免在每个端口上等待过久,并使用 connect_ex 来尝试连接。
如果 connect_ex 返回 0,则表示连接成功,意味着该端口是开放的,随后函数会打印一条消息显示该开放端口。
设置并发端口检查
接下来,我们将创建一个 worker 函数。每个线程都会使用该函数从队列中获取端口,并利用 tcp_test 函数检查每个端口的状态。
将以下代码添加到你的 port_scanner.py 中:
def worker(target_ip: str, queue: Queue) -> None:
while not queue.empty():
port = queue.get()
tcp_test(port, target_ip)
queue.task_done()
worker 函数旨在由多个线程运行。它会不断从队列中检索端口号,并使用之前定义的 tcp_test 函数检查目标 IP 上的这些端口是否开放。
测试完一个端口后,它会在队列中将该任务标记为已完成。此函数允许并发扫描端口,从而显著加快扫描进程。
编排扫描流程
main 函数通过设置队列、初始化工作线程以及管理指定端口范围内的扫描操作来编排整个扫描流程。
继续将 main 函数添加到你的 port_scanner.py 中:
def main(host: str, start_port: int, end_port: int) -> None:
target_ip = gethostbyname(host)
queue = Queue()
for port in range(start_port, end_port + 1):
queue.put(port)
for _ in range(100): ## Adjust the number of threads if necessary
t = threading.Thread(target=worker, args=(target_ip, queue,))
t.daemon = True
t.start()
queue.join()
print("Scanning completed.")
main 函数负责协调整个端口扫描过程。
它首先将目标主机名解析为 IP 地址,然后为待扫描的端口号创建一个队列。它将指定范围内的所有端口填充到队列中,并生成多个工作线程(本例中为 100 个),这些线程通过检查每个端口来并发处理队列。函数会等待所有端口处理完毕(queue.join()),然后打印一条表示扫描完成的消息。
运行端口扫描器
最后,是时候运行我们的端口扫描器了。我们将在本地主机(localhost)上扫描一系列端口来测试我们的工具。
在 port_scanner.py 的末尾添加以下几行代码,使脚本可以执行:
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='TCP Port Scanner')
parser.add_argument('host', help='Host to scan')
parser.add_argument('ports', help='Port range to scan, formatted as start-end')
args = parser.parse_args()
start_port, end_port = map(int, args.ports.split('-'))
main(args.host, start_port, end_port)
这段代码为端口扫描脚本添加了命令行参数解析功能,使其更加通用且易于使用。
通过使用 argparse 库,它定义了两个必需参数:host 和 ports。
host参数用于指定你想要扫描的目标主机地址或主机名。ports参数需要一个定义扫描端口范围的字符串,格式为“起始 - 结束”(例如 "5000-8000")。脚本使用-分隔符将其拆分为start_port和end_port,然后转换为整数。这使得用户在从命令行运行脚本时可以轻松指定目标和端口范围。
现在,在终端中运行以下命令来执行扫描器:
python port_scanner.py 127.0.0.1 5000-9000
输出结果:
Opened Port: 8081
Scanning completed.
既然你已经使用端口扫描器识别出了一个开放端口(8081),说明那里运行着一个 Web 服务器,你可以探索一下那里托管的内容。
为此,首先点击添加按钮并选择「Web Service」选项。

然后输入端口 8081 并点击「Access」按钮。

你会发现 Web 服务器正在你的系统中运行,并监听 8081 端口。

总结
在本项目中,我们使用 Python 构建了一个基础的端口扫描器,从最基本的 tcp_test 函数开始,逐步演进到多线程扫描方法。我们将整个过程分解为易于管理的步骤,最终完成了一个能够识别目标服务器开放端口的功能性工具。通过这次实践,我们不仅加深了对网络安全的理解,还磨练了 Python 编程和并发管理的技能。



