Python 如何与 Windows API 交互

PythonBeginner
立即练习

简介

本综合教程将指导你使用 Python 的 ctypes 库与 Windows API 交互。你将学习如何访问 Windows 系统功能、管理进程,以及直接从 Python 中利用 Windows 特有的特性。通过完成本实验,你将了解如何构建能够与 Windows 操作系统无缝集成并执行强大系统级操作的应用程序。

理解 Python 的 ctypes 库

在这一步中,我们将探讨 Python 的 ctypes 库的基础知识,它作为与 Windows API 交互的基础。ctypes 库充当 Python 和原生 C 库之间的桥梁,允许我们直接从 DLL(动态链接库,Dynamic Link Libraries)中调用函数。

什么是 ctypes?

ctypes 是一个用于 Python 的外部函数库,它提供 C 兼容的数据类型,并允许调用 DLL 或共享库中的函数。它特别适用于:

  • 访问底层系统函数
  • 与硬件交互
  • 从特定于平台的库中调用函数

安装所需的软件包

在开始编写代码之前,让我们安装必要的软件包。在 WebIDE 中打开一个终端并运行:

pip install pywin32-ctypes

这将安装一个与我们的环境兼容的库。

ctypes 的基本用法

让我们创建一个简单的 Python 脚本来理解 ctypes 的工作原理。在 WebIDE 中,在 /home/labex/project 目录下创建一个名为 ctypes_basics.py 的新文件,内容如下:

import ctypes

## Load a standard C library
libc = ctypes.CDLL('libc.so.6')

## Call a simple C function
print("Random number from C library:", libc.rand())

## Find out the size of int type on this machine
print("Size of int:", ctypes.sizeof(ctypes.c_int), "bytes")

## Create and use a C-compatible string
message = ctypes.create_string_buffer(b"Hello from ctypes!")
print("C-compatible string:", message.value.decode())
print("String buffer size:", ctypes.sizeof(message), "bytes")

使用以下命令运行脚本:

python3 ctypes_basics.py

你应该看到类似这样的输出:

Random number from C library: 1804289383
Size of int: 4 bytes
C-compatible string: Hello from ctypes!
String buffer size: 19 bytes

理解 Python 中的 C 数据类型

ctypes 为 C 数据类型提供了 Python 兼容的包装器。以下是常见 C 数据类型及其 ctypes 等效项的参考表:

C 类型 ctypes 类型 Python 类型
char c_char 1-character bytes object
int c_int int
unsigned int c_uint int
long c_long int
void * c_void_p int or None
char * c_char_p bytes or None
wchar_t * c_wchar_p str or None

让我们通过另一个例子来探索这些数据类型。创建一个名为 ctypes_types.py 的文件:

import ctypes

## Integer types
i = ctypes.c_int(42)
ui = ctypes.c_uint(123)
print(f"Integer: {i.value}, Unsigned Integer: {ui.value}")

## Floating point types
f = ctypes.c_float(3.14)
d = ctypes.c_double(2.71828)
print(f"Float: {f.value}, Double: {d.value}")

## Character and string types
c = ctypes.c_char(b'A')
s = ctypes.c_char_p(b"Hello, World!")
print(f"Character: {c.value.decode()}, String: {s.value.decode()}")

## Create an array of integers
int_array = (ctypes.c_int * 5)(1, 2, 3, 4, 5)
print("Array elements:", [int_array[i] for i in range(5)])

运行脚本:

python3 ctypes_types.py

预期输出:

Integer: 42, Unsigned Integer: 123
Float: 3.140000104904175, Double: 2.71828
Character: A, String: Hello, World!
Array elements: [1, 2, 3, 4, 5]

ctypes 的这种基本理解将帮助我们继续与更复杂的系统库,特别是 Windows API 交互。

使用 ctypes 访问系统库

在这一步中,我们将学习如何使用 ctypes 访问系统库并调用其函数。由于我们在 Linux 环境中工作,我们将专注于 Linux 系统库,同时解释也适用于 Windows API 访问的原则。

加载动态库

在 Python 中,我们可以使用 ctypes 提供的几种方法来加载动态库:

  • CDLL - 用于加载标准 C 库
  • WinDLL - 用于加载 Windows DLL(在 Windows 上)
  • OleDLL - 用于加载 COM 库(在 Windows 上)

让我们创建一个名为 system_info.py 的文件,使用标准库来探索系统信息:

import ctypes
import os
import platform

print(f"Python Platform: {platform.platform()}")
print(f"System: {platform.system()}")
print(f"Machine: {platform.machine()}")
print(f"Processor: {platform.processor()}")

## Load the C standard library
libc = ctypes.CDLL('libc.so.6')

## Get system information
print("\n--- System Information ---")

## Get hostname
hostname_buffer = ctypes.create_string_buffer(1024)
if libc.gethostname(hostname_buffer, ctypes.sizeof(hostname_buffer)) == 0:
    print(f"Hostname: {hostname_buffer.value.decode()}")
else:
    print("Failed to get hostname")

## Get user information
uid = os.getuid()
pwd = libc.getpwuid(uid)
if pwd:
    print(f"Current user ID: {uid}")
else:
    print(f"Current user ID: {uid} (failed to get user info)")

## Memory page size
page_size = libc.getpagesize()
print(f"Memory page size: {page_size} bytes")

## Get system uptime (if available)
try:
    class timespec(ctypes.Structure):
        _fields_ = [
            ('tv_sec', ctypes.c_long),
            ('tv_nsec', ctypes.c_long)
        ]

    time_buf = timespec()
    CLOCK_BOOTTIME = 7  ## Linux specific
    if hasattr(libc, 'clock_gettime') and libc.clock_gettime(CLOCK_BOOTTIME, ctypes.byref(time_buf)) == 0:
        uptime_seconds = time_buf.tv_sec
        days, remainder = divmod(uptime_seconds, 86400)
        hours, remainder = divmod(remainder, 3600)
        minutes, seconds = divmod(remainder, 60)
        print(f"System uptime: {days} days, {hours} hours, {minutes} minutes, {seconds} seconds")
    else:
        print("Could not get system uptime")
except Exception as e:
    print(f"Error getting uptime: {e}")

运行脚本:

python3 system_info.py

你应该看到关于你系统的详细信息,类似于这样:

Python Platform: Linux-5.15.0-1031-aws-x86_64-with-glibc2.35
System: Linux
Machine: x86_64
Processor: x86_64

--- System Information ---
Hostname: labex-container
Current user ID: 1000
Memory page size: 4096 bytes
System uptime: 0 days, 1 hours, 23 minutes, 45 seconds

创建一个进程监视器

现在,让我们创建一个更实用的应用程序:一个简单的进程监视器,它将列出正在运行的进程。创建一个名为 process_monitor.py 的文件:

import ctypes
import os
import time
from datetime import datetime

def list_processes():
    """List all running processes using /proc filesystem"""
    processes = []

    ## On Linux, process information is available in the /proc filesystem
    for pid in os.listdir('/proc'):
        if pid.isdigit():
            try:
                ## Read process name from /proc/[pid]/comm
                with open(f'/proc/{pid}/comm', 'r') as f:
                    name = f.read().strip()

                ## Get process status
                with open(f'/proc/{pid}/status', 'r') as f:
                    status_lines = f.readlines()
                    status = {}
                    for line in status_lines:
                        if ':' in line:
                            key, value = line.split(':', 1)
                            status[key.strip()] = value.strip()

                ## Get memory usage (VmRSS is physical memory used)
                memory_kb = int(status.get('VmRSS', '0 kB').split()[0]) if 'VmRSS' in status else 0

                processes.append({
                    'pid': int(pid),
                    'name': name,
                    'state': status.get('State', 'Unknown'),
                    'memory_kb': memory_kb
                })
            except (IOError, FileNotFoundError, ProcessLookupError):
                ## Process might have terminated while we were reading
                continue

    return processes

def display_processes(processes, top_n=10):
    """Display processes sorted by memory usage"""
    ## Sort processes by memory usage (highest first)
    sorted_processes = sorted(processes, key=lambda p: p['memory_kb'], reverse=True)

    ## Display only top N processes
    print(f"\nTop {top_n} processes by memory usage at {datetime.now().strftime('%H:%M:%S')}:")
    print(f"{'PID':<7} {'NAME':<20} {'STATE':<10} {'MEMORY (KB)':<12}")
    print("-" * 50)

    for proc in sorted_processes[:top_n]:
        print(f"{proc['pid']:<7} {proc['name'][:19]:<20} {proc['state']:<10} {proc['memory_kb']:<12}")

## Monitor processes continuously
print("Simple Process Monitor")
print("Press Ctrl+C to exit")

try:
    while True:
        processes = list_processes()
        display_processes(processes)
        time.sleep(3)  ## Update every 3 seconds
except KeyboardInterrupt:
    print("\nProcess monitoring stopped")

运行进程监视器:

python3 process_monitor.py

你应该看到类似这样的输出,它将每 3 秒刷新一次:

Simple Process Monitor
Press Ctrl+C to exit

Top 10 processes by memory usage at 14:30:25:
PID     NAME                 STATE      MEMORY (KB)
-------------------------------------------------------
1234    python3              S (sleeping) 56789
2345    node                 S (sleeping) 34567
3456    code                 S (sleeping) 23456
...

让进程监视器运行大约 10 秒钟,然后按 Ctrl+C 停止它。

在这些例子中,我们已经学会了如何:

  1. 使用 ctypes 加载和使用系统库
  2. 访问系统信息
  3. 创建一个实用的进程监视工具

这些原则与你通过 ctypes 与 Windows API 交互时使用的原则相同,只是使用了不同的库名称和函数调用。

使用 ctypes 创建和使用 C 结构体

在这一步中,我们将学习如何在 Python 中定义和使用 C 结构体。结构体在与系统 API 交互时至关重要,因为它们通常用于在 Python 和系统库之间交换数据。

理解 Python 中的 C 结构体

C 结构体可以使用 ctypes.Structure 类在 Python 中表示。这允许我们创建与 C 函数期望的内存布局匹配的复杂数据结构。

让我们创建一个名为 struct_example.py 的文件来探索如何使用结构体:

import ctypes

## Define a simple C structure
class Point(ctypes.Structure):
    _fields_ = [
        ("x", ctypes.c_int),
        ("y", ctypes.c_int)
    ]

## Create an instance of the Point structure
p = Point(10, 20)
print(f"Point coordinates: ({p.x}, {p.y})")

## Modify structure fields
p.x = 100
p.y = 200
print(f"Updated coordinates: ({p.x}, {p.y})")

## Create a Point from a dictionary
values = {"x": 30, "y": 40}
p2 = Point(**values)
print(f"Point from dictionary: ({p2.x}, {p2.y})")

## Get the raw memory view (pointer) of the structure
p_ptr = ctypes.pointer(p)
print(f"Memory address of p: {ctypes.addressof(p)}")
print(f"Access via pointer: ({p_ptr.contents.x}, {p_ptr.contents.y})")

## Define a nested structure
class Rectangle(ctypes.Structure):
    _fields_ = [
        ("top_left", Point),
        ("bottom_right", Point),
        ("color", ctypes.c_int)
    ]

## Create a rectangle with two points
rect = Rectangle(Point(0, 0), Point(100, 100), 0xFF0000)
print(f"Rectangle: Top-Left: ({rect.top_left.x}, {rect.top_left.y}), "
      f"Bottom-Right: ({rect.bottom_right.x}, {rect.bottom_right.y}), "
      f"Color: {hex(rect.color)}")

## Calculate the area (demonstrating structure manipulation)
width = rect.bottom_right.x - rect.top_left.x
height = rect.bottom_right.y - rect.top_left.y
print(f"Rectangle area: {width * height}")

运行脚本:

python3 struct_example.py

你应该看到类似这样的输出:

Point coordinates: (10, 20)
Updated coordinates: (100, 200)
Point from dictionary: (30, 40)
Memory address of p: 140737345462208
Access via pointer: (100, 200)
Rectangle: Top-Left: (0, 0), Bottom-Right: (100, 100), Color: 0xff0000
Rectangle area: 10000

创建一个系统时间实用程序

现在,让我们创建一个实用的应用程序,它使用 C 结构体与系统时间函数交互。创建一个名为 time_utility.py 的文件:

import ctypes
import time
from datetime import datetime

## Define the timespec structure (used in Linux for high-resolution time)
class timespec(ctypes.Structure):
    _fields_ = [
        ("tv_sec", ctypes.c_long),
        ("tv_nsec", ctypes.c_long)
    ]

## Define the tm structure (used for calendar time representation)
class tm(ctypes.Structure):
    _fields_ = [
        ("tm_sec", ctypes.c_int),     ## seconds (0 - 60)
        ("tm_min", ctypes.c_int),     ## minutes (0 - 59)
        ("tm_hour", ctypes.c_int),    ## hours (0 - 23)
        ("tm_mday", ctypes.c_int),    ## day of month (1 - 31)
        ("tm_mon", ctypes.c_int),     ## month of year (0 - 11)
        ("tm_year", ctypes.c_int),    ## year - 1900
        ("tm_wday", ctypes.c_int),    ## day of week (0 - 6, Sunday = 0)
        ("tm_yday", ctypes.c_int),    ## day of year (0 - 365)
        ("tm_isdst", ctypes.c_int)    ## is daylight saving time in effect
    ]

## Load the C library
libc = ctypes.CDLL("libc.so.6")

def get_system_time():
    """Get the current system time using C functions"""
    ## Get current time as seconds since epoch
    time_t_ptr = ctypes.pointer(ctypes.c_long())
    libc.time(time_t_ptr)  ## time() function gets current time

    ## Convert to printable time string
    time_val = time_t_ptr.contents.value
    time_str = ctypes.string_at(libc.ctime(time_t_ptr))

    return {
        "timestamp": time_val,
        "formatted_time": time_str.decode().strip()
    }

def get_high_resolution_time():
    """Get high resolution time using clock_gettime"""
    ts = timespec()

    ## CLOCK_REALTIME is usually 0
    CLOCK_REALTIME = 0

    ## Call clock_gettime to fill the timespec structure
    if libc.clock_gettime(CLOCK_REALTIME, ctypes.byref(ts)) != 0:
        raise OSError("Failed to get time")

    return {
        "seconds": ts.tv_sec,
        "nanoseconds": ts.tv_nsec,
        "precise_time": ts.tv_sec + (ts.tv_nsec / 1_000_000_000)
    }

def time_breakdown():
    """Break down the current time into its components using localtime"""
    ## Get current time
    time_t_ptr = ctypes.pointer(ctypes.c_long())
    libc.time(time_t_ptr)

    ## Get local time
    tm_ptr = libc.localtime(time_t_ptr)
    tm_struct = ctypes.cast(tm_ptr, ctypes.POINTER(tm)).contents

    ## Return time components
    return {
        "year": 1900 + tm_struct.tm_year,
        "month": 1 + tm_struct.tm_mon,  ## tm_mon is 0-11, we adjust to 1-12
        "day": tm_struct.tm_mday,
        "hour": tm_struct.tm_hour,
        "minute": tm_struct.tm_min,
        "second": tm_struct.tm_sec,
        "weekday": tm_struct.tm_wday,  ## 0 is Sunday
        "yearday": tm_struct.tm_yday + 1  ## tm_yday is 0-365, we adjust to 1-366
    }

## Main program
print("Time Utility using C Structures")
print("-" * 40)

## Get and display system time
sys_time = get_system_time()
print(f"System time: {sys_time['formatted_time']}")
print(f"Timestamp (seconds since epoch): {sys_time['timestamp']}")

## Get and display high-resolution time
hi_res = get_high_resolution_time()
print(f"\nHigh resolution time:")
print(f"Seconds: {hi_res['seconds']}")
print(f"Nanoseconds: {hi_res['nanoseconds']}")
print(f"Precise time: {hi_res['precise_time']}")

## Get and display time breakdown
components = time_breakdown()
print(f"\nTime breakdown:")
print(f"Date: {components['year']}-{components['month']:02d}-{components['day']:02d}")
print(f"Time: {components['hour']:02d}:{components['minute']:02d}:{components['second']:02d}")
print(f"Day of week: {components['weekday']} (0=Sunday)")
print(f"Day of year: {components['yearday']}")

## Compare with Python's datetime
now = datetime.now()
print(f"\nPython datetime: {now}")
print(f"Python timestamp: {time.time()}")

运行时间实用程序:

python3 time_utility.py

你应该看到类似这样的输出:

Time Utility using C Structures
----------------------------------------
System time: Wed Jun 14 15:22:36 2023
Timestamp (seconds since epoch): 1686756156

High resolution time:
Seconds: 1686756156
Nanoseconds: 923456789
Precise time: 1686756156.923457

Time breakdown:
Date: 2023-06-14
Time: 15:22:36
Day of week: 3 (0=Sunday)
Day of year: 165

Python datetime: 2023-06-14 15:22:36.923499
Python timestamp: 1686756156.9234989

使用 ctypes 理解内存管理

在使用 ctypes 和 C 结构体时,理解内存管理非常重要。让我们创建另一个例子 memory_management.py 来演示这一点:

import ctypes
import gc  ## Garbage collector module

## Define a simple structure
class MyStruct(ctypes.Structure):
    _fields_ = [
        ("id", ctypes.c_int),
        ("value", ctypes.c_double),
        ("name", ctypes.c_char * 32)  ## Fixed-size character array
    ]

def demonstrate_memory_management():
    print("Memory Management with ctypes")
    print("-" * 40)

    ## Create a structure instance
    my_data = MyStruct(
        id=1,
        value=3.14159,
        name=b"Example"
    )

    print(f"Structure size: {ctypes.sizeof(my_data)} bytes")
    print(f"Memory address: {hex(ctypes.addressof(my_data))}")

    ## Create a pointer to the structure
    data_ptr = ctypes.pointer(my_data)
    print(f"Pointer value: {hex(ctypes.cast(data_ptr, ctypes.c_void_p).value)}")

    ## Access through pointer
    print(f"Access via pointer: id={data_ptr.contents.id}, value={data_ptr.contents.value}")

    ## Allocate memory for a new structure
    new_struct_ptr = ctypes.POINTER(MyStruct)()
    new_struct_ptr = ctypes.cast(
        ctypes.create_string_buffer(ctypes.sizeof(MyStruct)),
        ctypes.POINTER(MyStruct)
    )

    ## Initialize the allocated memory
    new_struct = new_struct_ptr.contents
    new_struct.id = 2
    new_struct.value = 2.71828
    new_struct.name = b"Allocated"

    print(f"\nAllocated structure memory address: {hex(ctypes.addressof(new_struct))}")
    print(f"Allocated structure content: id={new_struct.id}, value={new_struct.value}, name={new_struct.name.decode()}")

    ## Create an array of structures
    StructArray = MyStruct * 3
    struct_array = StructArray(
        MyStruct(10, 1.1, b"First"),
        MyStruct(20, 2.2, b"Second"),
        MyStruct(30, 3.3, b"Third")
    )

    print("\nArray of structures:")
    for i, item in enumerate(struct_array):
        print(f"  [{i}] id={item.id}, value={item.value}, name={item.name.decode()}")

    ## Force garbage collection
    print("\nForcing garbage collection...")
    gc.collect()

    ## Memory is automatically managed by Python

## Run the demonstration
demonstrate_memory_management()

运行内存管理脚本:

python3 memory_management.py

你应该看到类似这样的输出:

Memory Management with ctypes
----------------------------------------
Structure size: 48 bytes
Memory address: 0x7f3c2e32b040
Pointer value: 0x7f3c2e32b040
Access via pointer: id=1, value=3.14159

Allocated structure memory address: 0x7f3c2e32b0a0
Allocated structure content: id=2, value=2.71828, name=Allocated

Array of structures:
  [0] id=10, value=1.1, name=First
  [1] id=20, value=2.2, name=Second
  [2] id=30, value=3.3, name=Third

Forcing garbage collection...

在这些例子中,我们已经学会了如何:

  1. 使用 ctypes 在 Python 中定义和使用 C 结构体
  2. 创建指向结构体的指针并访问其内容
  3. 创建嵌套结构体和结构体数组
  4. 使用系统时间函数构建实用的应用程序
  5. 在使用 ctypes 时管理内存

这些技能在处理 Windows API 函数时至关重要,因为这些函数经常需要复杂的数据结构来传递信息。

构建一个完整的系统监视器应用程序

在最后一步中,我们将把我们所学的所有知识结合起来,构建一个全面的系统监视器应用程序。此应用程序将使用 ctypes 收集系统信息,显示实时指标,并演示如何构建一个与系统库交互的更大的 Python 应用程序。

创建系统监视器

创建一个名为 system_monitor.py 的新文件,内容如下:

import ctypes
import os
import time
import platform
from datetime import datetime

class SystemMonitor:
    """使用 ctypes 的系统监视应用程序"""

    def __init__(self):
        """初始化系统监视器"""
        self.libc = ctypes.CDLL("libc.so.6")
        self.running = False

        ## Define a timespec structure for time-related functions
        class timespec(ctypes.Structure):
            _fields_ = [
                ("tv_sec", ctypes.c_long),
                ("tv_nsec", ctypes.c_long)
            ]
        self.timespec = timespec

        ## Print basic system information
        print(f"System Monitor for {platform.system()} {platform.release()}")
        print(f"Python Version: {platform.python_version()}")
        print(f"Machine: {platform.machine()}")
        print("-" * 50)

    def get_uptime(self):
        """获取系统运行时间信息"""
        try:
            CLOCK_BOOTTIME = 7  ## Linux specific
            time_buf = self.timespec()

            if hasattr(self.libc, 'clock_gettime') and self.libc.clock_gettime(CLOCK_BOOTTIME, ctypes.byref(time_buf)) == 0:
                uptime_seconds = time_buf.tv_sec
                days, remainder = divmod(uptime_seconds, 86400)
                hours, remainder = divmod(remainder, 3600)
                minutes, seconds = divmod(remainder, 60)
                return {
                    "total_seconds": uptime_seconds,
                    "days": days,
                    "hours": hours,
                    "minutes": minutes,
                    "seconds": seconds
                }
            return None
        except Exception as e:
            print(f"Error getting uptime: {e}")
            return None

    def get_memory_info(self):
        """使用 /proc/meminfo 获取内存使用信息"""
        memory_info = {}
        try:
            with open('/proc/meminfo', 'r') as f:
                for line in f:
                    if ':' in line:
                        key, value = line.split(':', 1)
                        ## Remove 'kB' and convert to integer
                        value = value.strip()
                        if 'kB' in value:
                            value = int(value.split()[0]) * 1024  ## Convert to bytes
                        memory_info[key.strip()] = value

            ## Calculate memory usage percentage
            if 'MemTotal' in memory_info and 'MemAvailable' in memory_info:
                total = int(memory_info['MemTotal'])
                available = int(memory_info['MemAvailable'])
                used = total - available
                memory_info['UsedPercentage'] = (used / total) * 100

            return memory_info
        except Exception as e:
            print(f"Error getting memory info: {e}")
            return {}

    def get_cpu_info(self):
        """使用 /proc/stat 获取 CPU 信息"""
        cpu_info = {'cpu_percent': 0}
        try:
            ## We need two readings to calculate CPU usage
            def get_cpu_sample():
                with open('/proc/stat', 'r') as f:
                    line = f.readline()
                cpu_values = [int(x) for x in line.split()[1:8]]
                return sum(cpu_values), cpu_values[3]  ## Return total and idle

            ## First sample
            total1, idle1 = get_cpu_sample()
            time.sleep(0.5)  ## Wait for 0.5 second

            ## Second sample
            total2, idle2 = get_cpu_sample()

            ## Calculate CPU usage
            total_delta = total2 - total1
            idle_delta = idle2 - idle1

            if total_delta > 0:
                cpu_info['cpu_percent'] = 100 * (1 - idle_delta / total_delta)

            return cpu_info
        except Exception as e:
            print(f"Error getting CPU info: {e}")
            return cpu_info

    def get_disk_info(self):
        """获取磁盘使用信息"""
        try:
            ## Get disk usage for the root filesystem
            stat = os.statvfs('/')

            ## Calculate total, free, and used space
            total = stat.f_blocks * stat.f_frsize
            free = stat.f_bfree * stat.f_frsize
            used = total - free

            return {
                'total_bytes': total,
                'free_bytes': free,
                'used_bytes': used,
                'used_percent': (used / total) * 100 if total > 0 else 0
            }
        except Exception as e:
            print(f"Error getting disk info: {e}")
            return {}

    def get_process_count(self):
        """计算正在运行的进程数"""
        try:
            return len([p for p in os.listdir('/proc') if p.isdigit()])
        except Exception as e:
            print(f"Error counting processes: {e}")
            return 0

    def format_bytes(self, bytes_value):
        """将字节格式化为人类可读的格式"""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_value < 1024 or unit == 'TB':
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024

    def display_dashboard(self):
        """显示系统监视器仪表板"""
        ## Clear the screen (works in most terminals)
        print("\033c", end="")

        ## Display header
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"System Monitor - {current_time}")
        print("-" * 50)

        ## Display uptime
        uptime = self.get_uptime()
        if uptime:
            print(f"Uptime: {uptime['days']} days, {uptime['hours']} hours, "
                  f"{uptime['minutes']} minutes, {uptime['seconds']} seconds")

        ## Display CPU usage
        cpu_info = self.get_cpu_info()
        print(f"CPU Usage: {cpu_info['cpu_percent']:.1f}%")

        ## Display memory information
        memory_info = self.get_memory_info()
        if memory_info and 'MemTotal' in memory_info:
            print("\nMemory Information:")
            total = int(memory_info['MemTotal'])
            available = int(memory_info.get('MemAvailable', 0))
            used = total - available
            print(f"  Total: {self.format_bytes(total)}")
            print(f"  Used: {self.format_bytes(used)} ({memory_info.get('UsedPercentage', 0):.1f}%)")
            print(f"  Available: {self.format_bytes(available)}")
            if 'SwapTotal' in memory_info:
                swap_total = int(memory_info['SwapTotal'])
                swap_free = int(memory_info.get('SwapFree', 0))
                swap_used = swap_total - swap_free
                if swap_total > 0:
                    print(f"  Swap Used: {self.format_bytes(swap_used)} "
                          f"({(swap_used / swap_total) * 100:.1f}%)")

        ## Display disk information
        disk_info = self.get_disk_info()
        if disk_info:
            print("\nDisk Information (/):")
            print(f"  Total: {self.format_bytes(disk_info['total_bytes'])}")
            print(f"  Used: {self.format_bytes(disk_info['used_bytes'])} "
                  f"({disk_info['used_percent']:.1f}%)")
            print(f"  Free: {self.format_bytes(disk_info['free_bytes'])}")

        ## Display process count
        process_count = self.get_process_count()
        print(f"\nRunning Processes: {process_count}")

        print("\nPress Ctrl+C to exit")

    def start_monitoring(self, interval=2):
        """使用指定的刷新间隔启动系统监视"""
        self.running = True
        try:
            while self.running:
                self.display_dashboard()
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\nMonitoring stopped.")
            self.running = False

## Create and start the system monitor
if __name__ == "__main__":
    monitor = SystemMonitor()
    monitor.start_monitoring()

运行系统监视器:

python3 system_monitor.py

系统监视器将显示关于你系统的实时信息,每 2 秒更新一次。你应该看到一个仪表板,其中包含:

  • 系统运行时间
  • CPU 使用率
  • 内存信息(总计、已用、可用)
  • 磁盘使用情况
  • 进程计数

让监视器运行几分钟,观察指标如何变化,然后按 Ctrl+C 退出。

理解系统监视器结构

让我们分解一下系统监视器的关键组件:

  1. 类结构:使用面向对象编程来组织我们的代码
  2. 方法组织:将功能分离到不同的方法中
  3. 错误处理:使用 try-except 块来处理潜在的错误
  4. 数据格式化:将原始数据转换为人类可读的格式
  5. 实时更新:使用带有 time.sleep() 的循环进行定期更新

添加文档

现在,让我们为我们的系统监视器添加一个文档文件。创建一个名为 README.md 的文件:

## Python 系统监视器

一个使用 Python 构建的综合系统监视应用程序,使用 `ctypes` 进行系统交互。

### 功能

- 实时 CPU 使用率监视
- 内存使用情况跟踪
- 磁盘空间分析
- 进程计数
- 系统运行时间显示

### 要求

- Python 3.6 或更高版本
- Linux 操作系统

### 如何运行

只需执行主脚本:

```bash
python system_monitor.py
### 工作原理

此应用程序使用 Python 的 `ctypes` 库与系统库交互并访问低级系统信息。它还使用 `/proc` 文件系统来收集有关系统状态的其他指标。

监视以实时方式进行,每 2 秒更新一次(可配置)。

### 架构

该应用程序遵循面向对象的方法,具有以下关键组件:

1.  **SystemMonitor 类**:协调监视的主类
2.  **数据收集方法**:用于收集不同系统指标的方法
3.  **显示方法**:用于格式化和显示收集的数据的方法

### 扩展监视器

要添加新的监视功能:

1.  在 `SystemMonitor` 类中创建一个新方法来收集所需的数据
2.  更新 `display_dashboard` 方法以显示新信息
3.  确保适当的错误处理以确保稳健性

### 学习资源

-   [Python ctypes 文档](https://docs.python.org/3/library/ctypes.html)
-   [Linux Proc 文件系统文档](https://man7.org/linux/man-pages/man5/proc.5.html)

### 审查和总结

让我们回顾一下我们在本实验中完成的工作:

1.  我们学习了如何使用 Python 的 `ctypes` 库与系统库交互
2.  我们探索了 Python 中的 C 数据类型和结构体
3.  我们创建了几个实用的应用程序:
    -   基本系统信息检索
    -   进程监视
    -   时间实用程序
    -   内存管理演示
    -   综合系统监视器

4.  我们学习了如何:
    -   加载动态库
    -   从 Python 调用 C 函数
    -   定义和操作 C 结构体
    -   使用指针和内存地址
    -   处理系统级错误

这些技能构成了在 Windows 系统上开发时使用 Windows API 的基础。原则保持不变——你将加载 Windows 专用库(如 kernel32.dll、user32.dll 等)而不是 libc,但定义结构体、调用函数和处理数据的方法保持一致。

总结

在这个实验中,你已经学会了如何使用 Python 的 ctypes 库与系统库交互并执行系统级操作。虽然该实验是在 Linux 环境中进行的,但你所学到的原理和技术也直接适用于 Windows API 编程。

本实验的主要收获:

  1. 理解 ctypes 基础:你已经学会了如何加载动态库,定义 C 数据类型,以及从 Python 调用系统函数。

  2. 使用 C 结构体:你已经掌握了在 Python 中创建和操作 C 结构体,这对于与系统库交换复杂数据至关重要。

  3. 内存管理:你已经深入了解了在使用系统库时,内存分配、指针和内存管理。

  4. 构建实用应用程序:你已经应用你的知识来创建有用的应用程序,最终完成了一个全面的系统监视器。

  5. 系统集成:你已经看到了 Python 如何通过 ctypes 桥接与底层系统功能集成。

这些技能为开发需要在底层与操作系统交互的应用程序提供了坚实的基础,无论是在 Linux 还是 Windows 上。在 Windows 系统上工作时,你将使用相同的方法,但使用 Windows 专用的库和 API 函数。

进一步的学习可以包括使用你在本实验中学习的技术,探索 Windows 专用的 API,用于窗口管理、图形界面、系统服务或网络功能。