Мониторинг трафика сетевого интерфейса
Теперь, когда мы понимаем, как получать информацию о сетевом интерфейсе, давайте сделаем еще один шаг и создадим скрипт для мониторинга сетевого трафика в реальном времени. Это позволит нам увидеть, сколько данных отправляется и принимается через конкретный интерфейс.
Создание скрипта мониторинга сетевого трафика
Создайте новый файл с именем monitor_traffic.py в редакторе VSCode:
- Нажмите кнопку "New File" в области Explorer
- Назовите файл
monitor_traffic.py
- Добавьте следующий код:
import psutil
import time
import sys
import netifaces
def bytes_to_readable(bytes_count):
"""Convert bytes to a human-readable format (KB, MB, GB)"""
units = ['B', 'KB', 'MB', 'GB', 'TB']
unit_index = 0
converted_value = float(bytes_count)
while converted_value >= 1024 and unit_index < len(units) - 1:
converted_value /= 1024
unit_index += 1
return f"{converted_value:.2f} {units[unit_index]}"
def monitor_network_traffic(interface_name, interval=1, iterations=10):
"""
Monitor network traffic on a specific interface for a given number of iterations.
Args:
interface_name (str): Name of the network interface to monitor
interval (int): Time interval between measurements in seconds
iterations (int): Number of measurements to take
"""
## Verify interface exists
if interface_name not in netifaces.interfaces():
print(f"Error: Interface '{interface_name}' does not exist.")
print(f"Available interfaces: {', '.join(netifaces.interfaces())}")
return
print(f"Monitoring network traffic on {interface_name} (Ctrl+C to stop)")
print("-" * 80)
print(f"{'Time':<12} {'Bytes Sent':<15} {'Bytes Received':<15} {'Packets Sent':<15} {'Packets Received':<15}")
print("-" * 80)
try:
## Get initial stats
net_io_counters = psutil.net_io_counters(pernic=True)
if interface_name not in net_io_counters:
print(f"Error: Cannot get stats for interface '{interface_name}'")
return
prev_stats = net_io_counters[interface_name]
prev_bytes_sent = prev_stats.bytes_sent
prev_bytes_recv = prev_stats.bytes_recv
prev_packets_sent = prev_stats.packets_sent
prev_packets_recv = prev_stats.packets_recv
## Monitor for the specified number of iterations
for i in range(iterations):
## Wait for the specified interval
time.sleep(interval)
## Get new stats
net_io_counters = psutil.net_io_counters(pernic=True)
new_stats = net_io_counters[interface_name]
## Calculate differences
bytes_sent_diff = new_stats.bytes_sent - prev_bytes_sent
bytes_recv_diff = new_stats.bytes_recv - prev_bytes_recv
packets_sent_diff = new_stats.packets_sent - prev_packets_sent
packets_recv_diff = new_stats.packets_recv - prev_packets_recv
## Update previous values
prev_bytes_sent = new_stats.bytes_sent
prev_bytes_recv = new_stats.bytes_recv
prev_packets_sent = new_stats.packets_sent
prev_packets_recv = new_stats.packets_recv
## Print the current stats
current_time = time.strftime("%H:%M:%S")
print(f"{current_time:<12} {bytes_to_readable(bytes_sent_diff):<15} "
f"{bytes_to_readable(bytes_recv_diff):<15} {packets_sent_diff:<15} "
f"{packets_recv_diff:<15}")
except KeyboardInterrupt:
print("\nMonitoring stopped by user")
except Exception as e:
print(f"Error during monitoring: {e}")
print("\nTraffic summary for this session:")
print(f"Total monitored time: {iterations * interval} seconds")
if __name__ == "__main__":
## Get the interface name from command line arguments or use a default
if len(sys.argv) > 1:
interface_name = sys.argv[1]
else:
## Try to find a non-loopback interface
interfaces = [i for i in netifaces.interfaces() if i != 'lo']
if interfaces:
interface_name = interfaces[0]
print(f"No interface specified, using {interface_name}")
else:
print("No network interfaces available")
sys.exit(1)
## Get number of iterations from command line (default: 10)
iterations = 10
if len(sys.argv) > 2:
try:
iterations = int(sys.argv[2])
except ValueError:
print(f"Invalid iterations value: {sys.argv[2]}. Using default: 10")
## Monitor the network traffic
monitor_network_traffic(interface_name, iterations=iterations)
Сохраните файл, нажав Ctrl+S.
Запуск скрипта мониторинга трафика
Теперь давайте запустим скрипт для мониторинга сетевого трафика на конкретном интерфейсе:
python3 monitor_traffic.py eth0
При необходимости замените eth0 на соответствующее имя интерфейса в вашей системе.
Скрипт отобразит таблицу, показывающую объем отправленных и полученных данных через сетевой интерфейс, обновляющуюся раз в секунду в течение 10 итераций:
Monitoring network traffic on eth0 (Ctrl+C to stop)
--------------------------------------------------------------------------------
Time Bytes Sent Bytes Received Packets Sent Packets Received
--------------------------------------------------------------------------------
14:25:30 0.00 B 156.00 B 0 2
14:25:31 0.00 B 0.00 B 0 0
14:25:32 0.00 B 0.00 B 0 0
14:25:33 456.00 B 1.24 KB 3 5
14:25:34 0.00 B 0.00 B 0 0
14:25:35 0.00 B 0.00 B 0 0
14:25:36 66.00 B 102.00 B 1 1
14:25:37 0.00 B 0.00 B 0 0
14:25:38 0.00 B 0.00 B 0 0
14:25:39 0.00 B 0.00 B 0 0
Traffic summary for this session:
Total monitored time: 10 seconds
Вы также можете указать количество итераций в качестве второго аргумента командной строки:
python3 monitor_traffic.py eth0 5
Это будет отслеживать трафик в течение 5 итераций вместо 10 по умолчанию.
Генерация сетевого трафика
Чтобы увидеть скрипт мониторинга в действии с фактическим трафиком, мы можем открыть второй терминал и сгенерировать некоторый сетевой трафик. Давайте создадим простой скрипт для этого:
Создайте новый файл с именем generate_traffic.py:
import requests
import time
import sys
def generate_traffic(iterations=5, delay=1):
"""Generate some network traffic by making HTTP requests"""
print(f"Generating network traffic over {iterations} iterations...")
for i in range(iterations):
try:
## Make a GET request to a public API
response = requests.get("https://httpbin.org/get")
print(f"Request {i+1}/{iterations}: Status {response.status_code}, "
f"Size {len(response.content)} bytes")
## Wait before the next request
if i < iterations - 1:
time.sleep(delay)
except Exception as e:
print(f"Error making request: {e}")
print("Traffic generation complete")
if __name__ == "__main__":
## Get number of iterations from command line (default: 5)
iterations = 5
if len(sys.argv) > 1:
try:
iterations = int(sys.argv[1])
except ValueError:
print(f"Invalid iterations value: {sys.argv[1]}. Using default: 5")
generate_traffic(iterations)
Сначала вам нужно установить библиотеку requests:
pip install requests
Теперь запустите скрипт мониторинга в одном терминале:
python3 monitor_traffic.py eth0 15
А в отдельном терминале запустите скрипт генерации трафика:
python3 generate_traffic.py
Теперь вы должны увидеть сетевую активность, зафиксированную скриптом мониторинга, когда скрипт генерации трафика выполняет HTTP-запросы.
Этот пример демонстрирует, как можно использовать Python для мониторинга активности сетевого интерфейса в реальном времени, что полезно для диагностики проблем с сетью, мониторинга использования полосы пропускания и понимания шаблонов сетевого трафика.