JSON 处理

JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,Python 提供了内置的 json 模块来处理 JSON 数据。

0x01. 基本使用

Python 与 JSON 类型对应

"""
Python 类型       JSON 类型
-----------      ---------
dict             object
list, tuple      array
str              string
int, float       number
True             true
False            false
None             null
"""

import json

# Sample Python data covering str, int, list, bool and None values
data = {
    'name': 'Alice',
    'age': 25,
    'scores': [90, 85, 92],
    'active': True,
    'address': None
}

# Encode (serialize) the Python object into a JSON string
json_str = json.dumps(data)
print(json_str)
# {"name": "Alice", "age": 25, "scores": [90, 85, 92], "active": true, "address": null}

# Decode (parse) the JSON string back into a Python object
python_obj = json.loads(json_str)
print(python_obj)
# {'name': 'Alice', 'age': 25, 'scores': [90, 85, 92], 'active': True, 'address': None}

文件操作

import json

# Write JSON to a file; ensure_ascii=False keeps non-ASCII characters
# unescaped and indent=2 pretty-prints the output
data = {'name': 'Alice', 'age': 25}
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# Read the JSON file back into a Python object
with open('data.json', 'r', encoding='utf-8') as f:
    data = json.load(f)
    print(data)

0x02. 格式化输出

import json

data = {
    'name': 'Alice',
    'age': 25,
    'scores': [90, 85, 92],
    'address': {
        'city': '北京',
        'street': '中关村'
    }
}

# Pretty-print with a 2-space indent (non-ASCII text is still emitted as
# \uXXXX escapes here because ensure_ascii defaults to True)
print(json.dumps(data, indent=2))

# Custom indent and separators, given as (item_separator, key_separator).
# NOTE: (',', ': ') is already the default whenever indent is not None, so
# only the indent width changes here; pass (',', ':') to see a difference.
print(json.dumps(data, indent=4, separators=(',', ': ')))

# Sort keys alphabetically
print(json.dumps(data, indent=2, sort_keys=True))

# Non-ASCII (Chinese) text: ensure_ascii=False writes the characters as-is
print(json.dumps(data, ensure_ascii=False, indent=2))
# The output is multi-line and indented, e.g.:
# {
#   "name": "Alice",
#   "age": 25,
#   ...
#   "address": {
#     "city": "北京",
#     "street": "中关村"
#   }
# }

0x03. 编码选项

import json
from datetime import datetime
from decimal import Decimal

# 自定义编码器
class CustomEncoder(json.JSONEncoder):
    """JSON encoder that also understands datetime, Decimal and set values."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        datetime objects become ISO-8601 strings, Decimal values become
        floats and sets become lists.  Anything else is delegated to the
        base class, which raises TypeError.
        """
        if isinstance(obj, datetime):
            converted = obj.isoformat()
        elif isinstance(obj, Decimal):
            converted = float(obj)
        elif isinstance(obj, set):
            converted = list(obj)
        else:
            converted = super().default(obj)
        return converted

# Use the custom encoder: every value below has a type that the stock
# encoder rejects (datetime, Decimal, set)
data = {
    'time': datetime.now(),
    'price': Decimal('19.99'),
    'tags': {'python', 'json'}
}

json_str = json.dumps(data, cls=CustomEncoder, indent=2)
print(json_str)

# 使用 default 参数
def custom_encoder(obj):
    """Fallback serializer for ``json.dumps(..., default=custom_encoder)``.

    Handles the same extra types as ``CustomEncoder`` so that the sample
    ``data`` (which holds datetime, Decimal and set values) can actually be
    serialized through the ``default`` hook.

    Args:
        obj: A value the standard encoder could not serialize.

    Returns:
        An ISO-8601 string for datetime, a float for Decimal, or a list for
        set.

    Raises:
        TypeError: If ``obj`` is of any other type.
    """
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, set):
        return list(obj)
    raise TypeError(f'Object of type {type(obj)} is not JSON serializable')

# `default` is invoked for each object json cannot serialize natively, so the
# callback has to cover every such type present in `data` (datetime, Decimal
# and set here); otherwise json.dumps raises TypeError.
json_str = json.dumps(data, default=custom_encoder)

0x04. 解码选项

import json
from datetime import datetime

# Custom decoding: sample payload whose "time" field is an ISO-8601 string
json_str = '{"time": "2024-01-15T10:30:00", "name": "Alice"}'

def custom_decoder(obj):
    """object_hook that turns the 'time' entry of a decoded object into a datetime.

    The dict is modified in place and returned; objects without a 'time'
    key are returned untouched.
    """
    if 'time' in obj:
        obj['time'] = datetime.fromisoformat(obj['time'])
    return obj

# object_hook is called with every decoded JSON object (as a dict) and its
# return value replaces that dict in the result
data = json.loads(json_str, object_hook=custom_decoder)
print(data['time'])  # 2024-01-15 10:30:00

# 使用 object_pairs_hook
def handle_duplicates(pairs):
    """object_pairs_hook that keeps every value of a repeated key.

    A key that occurs once maps to its value unchanged.  A key that occurs
    several times maps to a list of all its values in document order.

    Promotion to a list is tracked explicitly rather than inferred from
    ``isinstance(value, list)``; otherwise a key whose first value is itself
    a JSON array (``{"a": [1], "a": 2}``) would have later values appended
    into that array instead of being collected next to it.

    Args:
        pairs: Iterable of ``(key, value)`` tuples for one JSON object.

    Returns:
        dict: Mapping of key to single value or list of collected values.
    """
    result = {}
    repeated = set()  # keys whose entry is already a list of collected values
    for key, value in pairs:
        if key not in result:
            result[key] = value
        elif key in repeated:
            result[key].append(value)
        else:
            # Second occurrence: start a fresh list so the first value is
            # never mutated, even if it is a list itself.
            result[key] = [result[key], value]
            repeated.add(key)
    return result

# The key "a" appears twice; object_pairs_hook receives the (key, value)
# pairs of each object as an ordered list, so the hook sees both values
json_str = '{"a": 1, "a": 2, "b": 3}'
data = json.loads(json_str, object_pairs_hook=handle_duplicates)
print(data)  # {'a': [1, 2], 'b': 3}

# 解析类
class User:
    """Minimal user record used to demonstrate decoding JSON into objects."""

    def __init__(self, name, age):
        """Store the user's name and age as plain attributes."""
        self.name, self.age = name, age

    def __repr__(self):
        """Return a readable ``User(name=..., age=...)`` description."""
        return f'User(name={self.name}, age={self.age})'

def decode_user(obj):
    """object_hook that builds a User from any object with 'name' and 'age'.

    Objects missing either key are returned unchanged.
    """
    has_user_fields = 'name' in obj and 'age' in obj
    if not has_user_fields:
        return obj
    return User(obj['name'], obj['age'])

# Decode a JSON object straight into a User instance via object_hook
json_str = '{"name": "Alice", "age": 25}'
user = json.loads(json_str, object_hook=decode_user)
print(user)  # User(name=Alice, age=25)

0x05. 处理大型 JSON

流式处理

import json

# 按行读取 JSON(JSON Lines 格式)
def read_json_lines(filename):
    """Lazily yield one decoded value per non-blank line of a JSON Lines file.

    The file is read as UTF-8 and consumed line by line, so it is never
    loaded into memory as a whole.  Whitespace-only lines are skipped.
    """
    with open(filename, 'r', encoding='utf-8') as handle:
        yield from (json.loads(row) for row in handle if row.strip())

# Usage (opens 'data.jsonl' relative to the current working directory)
for record in read_json_lines('data.jsonl'):
    print(record)

# 写入 JSON Lines
def write_json_lines(filename, records):
    """Write each record as one JSON document per line (JSON Lines), UTF-8 encoded.

    Non-ASCII characters are written unescaped.  An existing file is
    overwritten.
    """
    with open(filename, 'w', encoding='utf-8') as out:
        out.writelines(
            json.dumps(item, ensure_ascii=False) + '\n' for item in records
        )

# Usage: each dict below becomes one line of output.jsonl
records = [
    {'id': 1, 'name': 'Alice'},
    {'id': 2, 'name': 'Bob'},
]
write_json_lines('output.jsonl', records)

增量解析

import json
from json import JSONDecoder, JSONDecodeError

def decode_json_objects(json_str, *, strict=False):
    """Yield every JSON value found in a string of concatenated documents.

    JSON whitespace between documents is skipped explicitly instead of
    relying on a decode failure for every blank character.

    Args:
        json_str: Text holding zero or more JSON documents, optionally
            separated by whitespace, e.g. ``'{"a": 1} {"b": 2}'``.
        strict: When False (the default), a position that cannot be decoded
            is skipped one character at a time and scanning continues.  Note
            that malformed input may then yield fragments of a broken
            document (``'[1, 2'`` yields ``1`` and ``2``).  When True the
            JSONDecodeError is propagated instead.

    Yields:
        The decoded Python object for each JSON document, in order.

    Raises:
        JSONDecodeError: Only when ``strict`` is True and undecodable text
            is encountered.
    """
    decoder = JSONDecoder()
    pos = 0
    length = len(json_str)
    while pos < length:
        # raw_decode does not accept leading whitespace, so step over it here.
        if json_str[pos] in ' \t\n\r':
            pos += 1
            continue
        try:
            obj, end = decoder.raw_decode(json_str, pos)
        except JSONDecodeError:
            if strict:
                raise
            # Lenient mode: resynchronize at the next character.
            pos += 1
        else:
            yield obj
            pos = end

# Usage: three JSON documents concatenated in one string, separated by spaces
json_str = '{"a": 1} {"b": 2} {"c": 3}'
for obj in decode_json_objects(json_str):
    print(obj)

0x06. 实用工具

JSON 比较

import json

def compare_json(json1, json2):
    """Return True when two decoded JSON values are structurally equal.

    Values must have exactly the same type (so ``1`` differs from ``1.0``
    and ``True`` differs from ``1``).  Dicts are compared key by key
    regardless of key order, lists element by element in order, and
    everything else with ``==``.
    """
    if type(json1) is not type(json2):
        return False

    if isinstance(json1, dict):
        same_keys = set(json1.keys()) == set(json2.keys())
        return same_keys and all(
            compare_json(json1[name], json2[name]) for name in json1
        )

    if isinstance(json1, list):
        same_length = len(json1) == len(json2)
        return same_length and all(
            compare_json(left, right) for left, right in zip(json1, json2)
        )

    return json1 == json2

# Usage: data1 and data2 are identical; data3 differs in the last list element
data1 = {'a': 1, 'b': [2, 3]}
data2 = {'a': 1, 'b': [2, 3]}
data3 = {'a': 1, 'b': [2, 4]}

print(compare_json(data1, data2))  # True
print(compare_json(data1, data3))  # False

JSON 路径查询

import json
from typing import Any

def get_by_path(data: Any, path: str, default=None):
    """Look up a nested value by a dotted path with optional list indexes.

    Path format: ``'key1.key2[0].key3'``.  A path may also start with an
    index (``'[0].name'``) when the root value is a list.

    Args:
        data: Decoded JSON value (typically nested dicts and lists).
        path: Dot-separated keys; ``[n]`` after a key indexes into a list.
        default: Value returned when any step of the path cannot be resolved.

    Returns:
        The value found at ``path``, or ``default`` if a key is missing, an
        index is invalid or out of range, or the current value cannot be
        indexed.
    """
    keys = path.replace('[', '.[').split('.')
    if path.startswith('['):
        # The rewrite above leaves an empty first segment for a leading
        # index; drop it so the index applies to the root value.
        keys = keys[1:]
    current = data

    for key in keys:
        if key.startswith('[') and key.endswith(']'):
            # Array index.  LookupError covers both IndexError (list out of
            # range) and KeyError (the current value is a dict).
            try:
                current = current[int(key[1:-1])]
            except (ValueError, LookupError, TypeError):
                return default
        elif isinstance(current, dict) and key in current:
            # Dictionary key
            current = current[key]
        else:
            return default

    return current

# Usage: the last lookup falls back to the supplied default because
# index 2 is out of range
data = {
    'users': [
        {'name': 'Alice', 'scores': [90, 85]},
        {'name': 'Bob', 'scores': [88, 92]}
    ]
}

print(get_by_path(data, 'users[0].name'))       # Alice
print(get_by_path(data, 'users[1].scores[0]'))   # 88
print(get_by_path(data, 'users[2].name', 'N/A')) # N/A

JSON 转换

import json
from collections import OrderedDict

def flatten_json(data, parent_key='', sep='.'):
    """Flatten nested dicts and lists into a single-level dict.

    Nested keys are joined with ``sep``; list elements use their index as
    the key segment, e.g. ``{'a': [1, {'b': 2}]}`` becomes
    ``{'a.0': 1, 'a.1.b': 2}``.

    Nested empty containers are kept as leaf values (``{'a': {}}`` stays
    ``{'a': {}}``) so that their keys are not silently lost.

    Args:
        data: Decoded JSON value to flatten.
        parent_key: Key prefix accumulated so far (used by the recursion).
        sep: Separator placed between key segments.

    Returns:
        dict: Mapping of flattened key to leaf value.  A scalar ``data`` is
        returned as ``{parent_key: data}``.
    """
    if isinstance(data, dict):
        children = data.items()
    elif isinstance(data, list):
        # Top-level list indexes are stringified to match nested ones.
        children = ((str(i), v) for i, v in enumerate(data))
    else:
        return {parent_key: data}

    if not data and parent_key:
        # Nested empty container: keep it as a leaf, as a fresh object so
        # the result does not alias the input.
        return {parent_key: [] if isinstance(data, list) else {}}

    flat = {}
    for key, value in children:
        new_key = f'{parent_key}{sep}{key}' if parent_key else key
        flat.update(flatten_json(value, new_key, sep))
    return flat

def unflatten_json(data, sep='.'):
    """Rebuild a nested dict from a flattened one.

    Each key is split on ``sep`` and every segment but the last becomes a
    nested dict.  All containers are rebuilt as dicts: numeric segments that
    came from list indexes stay string keys and are not turned back into
    lists.
    """
    rebuilt = {}
    for flat_key, leaf_value in data.items():
        *branch, leaf = flat_key.split(sep)
        node = rebuilt
        # Walk down, creating intermediate dicts on demand
        for segment in branch:
            if segment not in node:
                node[segment] = {}
            node = node[segment]
        node[leaf] = leaf_value
    return rebuilt

# Usage: flatten a nested dict, then rebuild it from the flat form
nested = {
    'user': {
        'name': 'Alice',
        'address': {
            'city': 'Beijing'
        }
    }
}

flat = flatten_json(nested)
print(flat)
# {'user.name': 'Alice', 'user.address.city': 'Beijing'}

original = unflatten_json(flat)
print(original)
# {'user': {'name': 'Alice', 'address': {'city': 'Beijing'}}}

0x07. 配置管理

import json
from pathlib import Path
from typing import Any, Dict

class JSONConfig:
    """Manager for a JSON configuration file addressed by dotted keys.

    The whole file is held in memory in ``self.config``; every call to
    ``set`` writes the file back to disk immediately.
    """

    def __init__(self, config_path: str):
        """Remember ``config_path`` and load the file if it already exists."""
        self.config_path = Path(config_path)
        self.config: Dict[str, Any] = {}
        self.load()

    def load(self):
        """Load the configuration from disk.

        A missing file is not an error: the current in-memory configuration
        is left untouched.  Invalid JSON raises json.JSONDecodeError.
        """
        # Open first and handle absence (EAFP) rather than checking
        # exists() beforehand, which is racy if the file disappears.
        try:
            f = self.config_path.open('r', encoding='utf-8')
        except FileNotFoundError:
            return
        with f:
            self.config = json.load(f)

    def save(self):
        """Write the configuration to disk as indented UTF-8 JSON.

        Missing parent directories of the config file are created first.
        """
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        with self.config_path.open('w', encoding='utf-8') as f:
            json.dump(self.config, f, ensure_ascii=False, indent=2)

    def get(self, key: str, default=None):
        """Return the value at dotted ``key``, or ``default`` if any part is missing."""
        value = self.config
        for k in key.split('.'):
            if not (isinstance(value, dict) and k in value):
                return default
            value = value[k]
        return value

    def set(self, key: str, value: Any):
        """Set the value at dotted ``key`` and save the file.

        Intermediate objects are created as needed.

        Raises:
            TypeError: If a segment of ``key`` would have to descend into an
                existing value that is not a JSON object (dict).  The
                configuration is left unchanged in that case.
        """
        *parents, leaf = key.split('.')
        node = self.config
        for name in parents:
            self._require_object(node, key)
            node = node.setdefault(name, {})
        self._require_object(node, key)
        node[leaf] = value
        self.save()

    @staticmethod
    def _require_object(node: Any, key: str) -> None:
        """Raise TypeError unless ``node`` is a dict that can hold sub-keys."""
        if not isinstance(node, dict):
            raise TypeError(
                f'Cannot set {key!r}: a parent value is of type '
                f'{type(node).__name__}, not a JSON object'
            )

# Usage: each set() call also writes config.json to disk
config = JSONConfig('config.json')
config.set('database.host', 'localhost')
config.set('database.port', 5432)

print(config.get('database.host'))  # localhost
print(config.get('database.port'))  # 5432

参考

  1. Python 官方文档 - json:https://docs.python.org/3/library/json.html
  2. JSON 官方网站:https://www.json.org/
  3. JSON Lines 格式:https://jsonlines.org/