SQLite JSON 处理

SQLiteSQLiteBeginner
立即练习

💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版

介绍

在这个实验中,你将学习如何在 SQLite 中处理 JSON 数据。你将探索如何在 SQLite 数据库中存储、提取、过滤和更新 JSON 数据。这个实验提供了一个关于在 SQLite 中使用 JSON 数据的实践性介绍,这项技能在现代数据管理中变得越来越有价值。


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL sqlite(("SQLite")) -.-> sqlite/SQLiteGroup(["SQLite"]) sqlite/SQLiteGroup -.-> sqlite/make_table("Create New Table") sqlite/SQLiteGroup -.-> sqlite/add_rows("Insert Multiple Rows") sqlite/SQLiteGroup -.-> sqlite/get_all("Select All Rows") sqlite/SQLiteGroup -.-> sqlite/query_where("Filter With WHERE") sqlite/SQLiteGroup -.-> sqlite/edit_row("Update Single Row") sqlite/SQLiteGroup -.-> sqlite/check_version("Get SQLite Version") subgraph Lab Skills sqlite/make_table -.-> lab-552553{{"SQLite JSON 处理"}} sqlite/add_rows -.-> lab-552553{{"SQLite JSON 处理"}} sqlite/get_all -.-> lab-552553{{"SQLite JSON 处理"}} sqlite/query_where -.-> lab-552553{{"SQLite JSON 处理"}} sqlite/edit_row -.-> lab-552553{{"SQLite JSON 处理"}} sqlite/check_version -.-> lab-552553{{"SQLite JSON 处理"}} end

创建数据库和表

在这个步骤中,你将创建一个 SQLite 数据库和一个用于存储 JSON 数据的表。SQLite 是一个轻量级数据库,它将数据存储在单个文件中,从而易于管理。

首先,打开你的终端。默认路径是 /home/labex/project

现在,让我们创建一个目录来存储我们的数据库。

mkdir sqlite_json
cd sqlite_json

这些命令创建了一个名为 sqlite_json 的目录,然后将当前目录更改为它。这将使你的项目文件保持井井有条。

接下来,创建一个名为 mydatabase.db 的 SQLite 数据库。

sqlite3 mydatabase.db

这个命令打开 SQLite shell,连接到 mydatabase.db 数据库。如果数据库文件不存在,SQLite 将创建它。

现在,创建一个名为 products 的表,其中包含 idnamedetails 列。details 列将以文本形式存储 JSON 数据。

CREATE TABLE products (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT,
    details TEXT
);

这个 SQL 命令创建了 products 表:

  • id:一个唯一的整数,对于每个新产品自动递增。
  • name:产品的名称(例如,Laptop,Smartphone)。
  • details:一个文本字段,用于存储产品的 JSON 数据。

插入 JSON 数据

在这个步骤中,你将把 JSON 数据插入到 products 表中。

让我们将两个示例记录插入到 products 表中。

INSERT INTO products (name, details) VALUES (
    'Laptop',
    '{"brand": "Dell", "model": "XPS 13", "specs": {"cpu": "Intel i7", "memory": "16GB", "storage": "512GB SSD"}}'
);

INSERT INTO products (name, details) VALUES (
    'Smartphone',
    '{"brand": "Samsung", "model": "Galaxy S21", "specs": {"display": "6.2 inch", "camera": "12MP", "storage": "128GB"}}'
);

这些 INSERT 语句向 products 表添加了两行数据。details 列包含作为文本字符串的 JSON 数据。

为了验证数据是否已正确插入,请运行以下查询:

SELECT * FROM products;

预期输出:

1|Laptop|{"brand": "Dell", "model": "XPS 13", "specs": {"cpu": "Intel i7", "memory": "16GB", "storage": "512GB SSD"}}
2|Smartphone|{"brand": "Samsung", "model": "Galaxy S21", "specs": {"display": "6.2 inch", "camera": "12MP", "storage": "128GB"}}

此输出确认 JSON 数据已成功存储在 products 表中。

使用自定义函数提取 JSON 字段

由于 SQLite 没有内置的 JSON 函数,你将创建一个自定义 Python 函数来从 JSON 字符串中提取数据。

首先,退出 SQLite shell。

.exit

现在,创建一个名为 json_extractor.py 的 Python 文件。

nano json_extractor.py

将以下 Python 代码粘贴到 json_extractor.py 文件中:

import sqlite3
import json

def json_extract(json_str, path):
    try:
        json_data = json.loads(json_str)
        path_components = path.split('.')
        value = json_data
        for component in path_components:
            value = value.get(component)
        return value
    except (json.JSONDecodeError, AttributeError, TypeError):
        return None

def connect_db(db_path):
    conn = sqlite3.connect(db_path)
    conn.create_function("json_extract", 2, json_extract)
    return conn

if __name__ == '__main__':
    conn = connect_db('mydatabase.db')
    cursor = conn.cursor()

    cursor.execute("SELECT json_extract(details, 'brand') FROM products WHERE name = 'Laptop'")
    print(cursor.fetchone()[0])

    cursor.execute("SELECT json_extract(details, 'specs.cpu') FROM products WHERE name = 'Laptop'")
    print(cursor.fetchone()[0])

    conn.close()

这段 Python 代码定义了一个函数 json_extract,它接受一个 JSON 字符串和一个路径作为输入,并返回该路径上的值。它还包括一个 connect_db 函数,用于连接到 SQLite 数据库并注册 json_extract 函数。

  • json.loads(json_str):这行代码将 JSON 字符串解析为 Python 字典。
  • path.split('.'):这行代码将路径拆分为组件列表。例如,'specs.cpu' 变为 ['specs', 'cpu']
  • 循环遍历路径组件,访问 JSON 数据中的嵌套值。

保存文件并退出 nano

现在,运行 Python 脚本。

python3 json_extractor.py

预期输出:

Dell
Intel i7

这个脚本连接到数据库,注册 json_extract 函数,然后使用它来提取 Laptop 的品牌和 CPU。

使用 JSON 查询过滤数据

在这个步骤中,你将使用自定义的 json_extract 函数来根据 JSON 字段中的值过滤数据。

再次打开 json_extractor.py 文件。

nano json_extractor.py

修改 json_extractor.py 文件,使其包含一个用于查询数据库的函数:

import sqlite3
import json

def json_extract(json_str, path):
    try:
        json_data = json.loads(json_str)
        path_components = path.split('.')
        value = json_data
        for component in path_components:
            value = value.get(component)
        return value
    except (json.JSONDecodeError, AttributeError, TypeError):
        return None

def connect_db(db_path):
    conn = sqlite3.connect(db_path)
    conn.create_function("json_extract", 2, json_extract)
    return conn

def filter_products(db_path, json_path, value):
    conn = connect_db(db_path)
    cursor = conn.cursor()
    query = f"SELECT * FROM products WHERE json_extract(details, '{json_path}') = '{value}'"
    cursor.execute(query)
    results = cursor.fetchall()
    conn.close()
    return results

if __name__ == '__main__':
    ## Example usage:
    dell_products = filter_products('mydatabase.db', 'brand', 'Dell')
    print("Products with brand 'Dell':", dell_products)

    intel_products = filter_products('mydatabase.db', 'specs.cpu', 'Intel i7')
    print("Products with CPU 'Intel i7':", intel_products)

这段代码添加了一个 filter_products 函数,它接受数据库路径、JSON 路径和值作为输入。然后,它连接到数据库,注册 json_extract 函数,并执行查询以查找 JSON 路径上的值与给定值匹配的所有产品。

保存文件并退出 nano

现在,运行 Python 脚本。

python3 json_extractor.py

预期输出:

Products with brand 'Dell': [(1, 'Laptop', '{"brand": "Dell", "model": "XPS 13", "specs": {"cpu": "Intel i7", "memory": "16GB", "storage": "512GB SSD"}}')]
Products with CPU 'Intel i7': [(1, 'Laptop', '{"brand": "Dell", "model": "XPS 13", "specs": {"cpu": "Intel i7", "memory": "16GB", "storage": "512GB SSD"}}')]

此输出显示与指定条件匹配的产品。

更新 JSON 值

在这个步骤中,你将学习如何更新 JSON 字段中的值。

打开 json_extractor.py 文件。

nano json_extractor.py

修改 json_extractor.py 文件,使其包含用于更新 JSON 和数据库的函数:

import sqlite3
import json

def json_extract(json_str, path):
    try:
        json_data = json.loads(json_str)
        path_components = path.split('.')
        value = json_data
        for component in path_components:
            value = value.get(component)
        return value
    except (json.JSONDecodeError, AttributeError, TypeError):
        return None

def connect_db(db_path):
    conn = sqlite3.connect(db_path)
    conn.create_function("json_extract", 2, json_extract)
    return conn

def filter_products(db_path, json_path, value):
    conn = connect_db(db_path)
    cursor = conn.cursor()
    query = f"SELECT * FROM products WHERE json_extract(details, '{json_path}') = '{value}'"
    cursor.execute(query)
    results = cursor.fetchall()
    conn.close()
    return results

def update_json_value(json_str, path, new_value):
    try:
        json_data = json.loads(json_str)
        path_components = path.split('.')
        target = json_data
        for i in range(len(path_components) - 1):
            target = target.get(path_components[i])

        target[path_components[-1]] = new_value
        return json.dumps(json_data)
    except (json.JSONDecodeError, AttributeError, TypeError):
        return json_str  ## Return original if update fails

def update_product_details(db_path, product_name, json_path, new_value):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    ## Get the current details
    cursor.execute("SELECT details FROM products WHERE name = ?", (product_name,))
    result = cursor.fetchone()
    if not result:
        conn.close()
        return False

    current_details = result[0]

    ## Update the JSON
    updated_details = update_json_value(current_details, json_path, new_value)

    ## Update the database
    cursor.execute("UPDATE products SET details = ? WHERE name = ?", (updated_details, product_name))
    conn.commit()
    conn.close()
    return True

if __name__ == '__main__':
    ## Example usage:
    update_product_details('mydatabase.db', 'Laptop', 'specs.memory', '32GB')
    print("Laptop memory updated to 32GB")

    ## Verify the update
    conn = sqlite3.connect('mydatabase.db')
    cursor = conn.cursor()
    cursor.execute("SELECT details FROM products WHERE name = 'Laptop'")
    updated_details = cursor.fetchone()[0]
    print("Updated Laptop details:", updated_details)
    conn.close()

这段代码添加了两个函数:

  • update_json_value:这个函数接受一个 JSON 字符串、一个路径和一个新值作为输入。它解析 JSON 字符串,更新指定路径上的值,并返回更新后的 JSON 字符串。
  • update_product_details:这个函数接受一个数据库路径、一个产品名称、一个 JSON 路径和一个新值作为输入。它连接到数据库,检索产品的当前 JSON 数据,使用 update_json_value 更新指定路径上的值,然后使用修改后的 JSON 数据更新数据库。

保存文件并退出 nano

现在,运行 Python 脚本。

python3 json_extractor.py

预期输出:

Laptop memory updated to 32GB
Updated Laptop details: {"brand": "Dell", "model": "XPS 13", "specs": {"cpu": "Intel i7", "memory": "32GB", "storage": "512GB SSD"}}

此输出确认 Laptop 的内存已在数据库中更新为 32GB。

总结

在这个实验中,你已经学习了如何在 SQLite 中处理 JSON 数据。你首先创建了一个数据库和一个表来存储 JSON 数据。然后,你学习了如何将 JSON 数据插入到表中。你创建了一个自定义的 Python 函数来从 JSON 数据中提取特定字段,并使用此函数根据 JSON 字段中的值过滤数据。最后,你学习了如何使用自定义的 Python 函数更新 JSON 字段中的值。这些技能为在 SQLite 数据库中有效地管理 JSON 数据奠定了基础。