专注于分布式系统架构AI辅助开发工具(Claude
Code中文周刊)

密钥体系与加密通信 - 第5章:设备导入与通信

智谱 GLM,支持多语言、多任务推理。从写作到代码生成,从搜索到知识问答,AI 生产力的中国解法。

第5章:设备导入与密钥通信

🎯 本章目标

学完这一章,你将理解:
– 客户如何导入自己的设备
– 设备注册时的密钥交换流程
– 设备与证书的绑定关系
– 设备间的加密通信

预计学习时间: 40分钟


5.1 设备导入场景

业务场景

客户A已经激活了平台,现在要导入设备:

客户A的环境:
├── 平台已安装并激活
├── 客户证书已导入
├── 北京地域证书已创建
└── 准备导入100台设备

核心问题

问题1:客户如何导入设备?
– 手动输入设备信息?
– 设备自动注册?
– 批量导入?

问题2:设备如何证明自己的身份?
– 设备有出厂密钥吗?
– 如何防止假冒设备?

问题3:设备与平台如何建立加密通信?
– 使用什么加密算法?
– 密钥如何交换?


5.2 设备导入流程设计

三种导入方式

方式1:手动导入(适合少量设备)
├── 管理员在平台输入设备信息
├── 平台生成设备证书
└── 管理员配置设备

方式2:设备自注册(适合大量设备)
├── 设备连接平台
├── 设备提供出厂凭证
├── 平台验证并签发证书
└── 设备自动配置

方式3:批量导入(适合预配置)
├── 管理员上传设备清单
├── 平台批量生成证书
└── 设备首次连接时激活

5.3 方式1:手动导入(推荐)

完整流程

class ManualDeviceImport:
    """
    手动设备导入系统
    """

    def __init__(self, customer_id, region_name, key_system):
        self.customer_id = customer_id
        self.region_name = region_name
        self.key_system = key_system

    def import_device(self, device_info):
        """
        手动导入设备

        device_info = {
            "serial_number": "SN-2024-001",
            "model": "CLEAN-X1",
            "mac_address": "AA:BB:CC:DD:EE:FF"
        }
        """
        print(f"\n开始导入设备:{device_info['serial_number']}")

        # 第1步:生成设备ID
        device_id = self._generate_device_id(device_info)
        print(f"  [1/5] 生成设备ID:{device_id}")

        # 第2步:生成设备密钥对
        device_key = RSA.generate(2048)
        print(f"  [2/5] 生成设备密钥对")

        # 第3步:创建设备证书
        device_cert = self._create_device_cert(device_id, device_info, device_key)
        print(f"  [3/5] 创建设备证书")

        # 第4步:生成设备配置包
        device_package = self._create_device_package(
            device_id,
            device_key,
            device_cert
        )
        print(f"  [4/5] 生成设备配置包")

        # 第5步:保存到数据库
        self._save_device(device_id, device_package)
        print(f"  [5/5] 保存到数据库")

        print(f"\n✅ 设备导入成功!")
        print(f"   设备ID:{device_id}")
        print(f"   序列号:{device_info['serial_number']}")
        print(f"   状态:待激活")

        return device_package

    def _generate_device_id(self, device_info):
        """
        生成设备ID(基于序列号和MAC地址)
        """
        unique_str = f"{device_info['serial_number']}:{device_info['mac_address']}"
        hash_value = hashlib.sha256(unique_str.encode()).hexdigest()
        return f"DEV-{hash_value[:16].upper()}"

    def _create_device_cert(self, device_id, device_info, device_key):
        """
        创建设备证书(用地域私钥签名)
        """
        # 获取地域密钥
        region_id = f"{self.customer_id}:{self.region_name}"
        region_key = self.key_system.region_keys[region_id]

        # 创建证书
        cert = {
            "type": "device",
            "device_id": device_id,
            "customer_id": self.customer_id,
            "region_name": self.region_name,
            "serial_number": device_info["serial_number"],
            "model": device_info["model"],
            "mac_address": device_info["mac_address"],
            "public_key": device_key.publickey().export_key().decode(),
            "issued_at": datetime.now().isoformat(),
            "status": "pending"
        }

        # 签名
        cert_data = json.dumps({k: v for k, v in cert.items() if k != "signature"}, sort_keys=True)
        hash_obj = SHA256.new(cert_data.encode())
        signature = pkcs1_15.new(region_key["private_key"]).sign(hash_obj)
        cert["signature"] = base64.b64encode(signature).decode()

        return cert

    def _create_device_package(self, device_id, device_key, device_cert):
        """
        创建设备配置包
        """
        # 获取完整证书链
        region_id = f"{self.customer_id}:{self.region_name}"
        region_cert = self.key_system.region_keys[region_id]["certificate"]
        customer_cert = self.key_system.customer_master_keys[self.customer_id]["certificate"]

        package = {
            "device_id": device_id,
            "private_key": device_key.export_key().decode(),
            "certificate": device_cert,
            "certificate_chain": [
                device_cert,
                region_cert,
                customer_cert
            ],
            "platform_config": {
                "api_endpoint": "https://platform.local",
                "heartbeat_interval": 60,
                "data_upload_interval": 300
            }
        }

        return package

    def _save_device(self, device_id, device_package):
        """
        保存设备信息到数据库
        """
        # 实际应该保存到数据库
        # 这里简化为内存存储
        pass

5.4 设备激活流程

设备首次连接

class DeviceActivation:
    """
    设备激活系统
    """

    def __init__(self, platform_public_key):
        self.platform_public_key = platform_public_key

    def activate_device(self, device_package):
        """
        设备激活流程
        """
        print("\n=== 设备激活流程 ===\n")

        device_id = device_package["device_id"]
        print(f"设备ID:{device_id}")

        # 第1步:加载设备私钥
        print("\n[步骤1] 加载设备私钥...")
        device_private_key = RSA.import_key(device_package["private_key"])
        print("✅ 设备私钥加载成功")

        # 第2步:验证证书链
        print("\n[步骤2] 验证证书链...")
        if not self._verify_cert_chain(device_package["certificate_chain"]):
            raise Exception("证书链验证失败")
        print("✅ 证书链验证成功")

        # 第3步:连接平台
        print("\n[步骤3] 连接平台...")
        connection = self._connect_to_platform(device_package)
        print(f"✅ 连接成功:{connection['endpoint']}")

        # 第4步:设备认证
        print("\n[步骤4] 设备认证...")
        auth_result = self._authenticate_device(device_private_key, device_package)
        print(f"✅ 认证成功:{auth_result['session_id']}")

        # 第5步:建立加密通道
        print("\n[步骤5] 建立加密通道...")
        secure_channel = self._establish_secure_channel(auth_result)
        print(f"✅ 加密通道建立成功:{secure_channel['algorithm']}")

        print("\n=== 设备激活完成 ===\n")

        return {
            "device_id": device_id,
            "status": "active",
            "session_id": auth_result["session_id"],
            "secure_channel": secure_channel
        }

    def _verify_cert_chain(self, cert_chain):
        """
        验证证书链
        """
        # 设备证书 ← 地域证书 ← 客户证书
        device_cert = cert_chain[0]
        region_cert = cert_chain[1]
        customer_cert = cert_chain[2]

        # 验证设备证书
        region_public_key = RSA.import_key(region_cert["public_key"])
        cert_data = json.dumps({k: v for k, v in device_cert.items() if k != "signature"}, sort_keys=True)
        hash_obj = SHA256.new(cert_data.encode())
        signature = base64.b64decode(device_cert["signature"])

        try:
            pkcs1_15.new(region_public_key).verify(hash_obj, signature)
        except:
            return False

        # 验证地域证书
        customer_public_key = RSA.import_key(customer_cert["public_key"])
        cert_data = json.dumps({k: v for k, v in region_cert.items() if k != "signature"}, sort_keys=True)
        hash_obj = SHA256.new(cert_data.encode())
        signature = base64.b64decode(region_cert["signature"])

        try:
            pkcs1_15.new(customer_public_key).verify(hash_obj, signature)
        except:
            return False

        # 验证客户证书
        cert_data = json.dumps({k: v for k, v in customer_cert.items() if k != "signature"}, sort_keys=True)
        hash_obj = SHA256.new(cert_data.encode())
        signature = base64.b64decode(customer_cert["signature"])

        try:
            pkcs1_15.new(self.platform_public_key).verify(hash_obj, signature)
        except:
            return False

        return True

    def _connect_to_platform(self, device_package):
        """
        连接到平台
        """
        endpoint = device_package["platform_config"]["api_endpoint"]
        return {
            "endpoint": endpoint,
            "connected_at": datetime.now().isoformat()
        }

    def _authenticate_device(self, device_private_key, device_package):
        """
        设备认证(挑战-响应)
        """
        # 平台发送挑战
        challenge = secrets.token_bytes(32)

        # 设备用私钥签名
        hash_obj = SHA256.new(challenge)
        signature = pkcs1_15.new(device_private_key).sign(hash_obj)

        # 平台验证签名(这里简化)
        session_id = secrets.token_hex(16)

        return {
            "session_id": session_id,
            "authenticated_at": datetime.now().isoformat()
        }

    def _establish_secure_channel(self, auth_result):
        """
        建立加密通道(混合加密)
        """
        # 生成对称密钥(用于数据传输)
        session_key = get_random_bytes(32)  # AES-256

        return {
            "algorithm": "AES-256-GCM",
            "session_key": base64.b64encode(session_key).decode(),
            "established_at": datetime.now().isoformat()
        }

5.5 设备间加密通信

通信架构

设备 ←→ 平台:混合加密
├── 密钥交换:RSA-2048
└── 数据传输:AES-256-GCM

设备 ←→ 设备:端到端加密
├── 通过平台中转
└── 平台无法解密

完整实现

class SecureDeviceCommunication:
    """
    设备安全通信系统
    """

    def __init__(self, device_id, device_private_key, session_key):
        self.device_id = device_id
        self.device_private_key = device_private_key
        self.session_key = base64.b64decode(session_key)

    def send_data(self, data):
        """
        发送加密数据到平台
        """
        # 使用AES-256-GCM加密
        cipher = AES.new(self.session_key, AES.MODE_GCM)
        ciphertext, tag = cipher.encrypt_and_digest(json.dumps(data).encode())

        encrypted_package = {
            "device_id": self.device_id,
            "timestamp": datetime.now().isoformat(),
            "nonce": base64.b64encode(cipher.nonce).decode(),
            "ciphertext": base64.b64encode(ciphertext).decode(),
            "tag": base64.b64encode(tag).decode()
        }

        return encrypted_package

    def receive_data(self, encrypted_package):
        """
        接收并解密平台数据
        """
        # 解密
        nonce = base64.b64decode(encrypted_package["nonce"])
        ciphertext = base64.b64decode(encrypted_package["ciphertext"])
        tag = base64.b64decode(encrypted_package["tag"])

        cipher = AES.new(self.session_key, AES.MODE_GCM, nonce=nonce)
        plaintext = cipher.decrypt_and_verify(ciphertext, tag)

        return json.loads(plaintext.decode())

    def send_to_device(self, target_device_id, data, target_public_key):
        """
        发送端到端加密消息给其他设备
        """
        # 生成临时对称密钥
        temp_key = get_random_bytes(32)

        # 用对称密钥加密数据
        cipher = AES.new(temp_key, AES.MODE_GCM)
        ciphertext, tag = cipher.encrypt_and_digest(json.dumps(data).encode())

        # 用目标设备公钥加密对称密钥
        target_public_key_obj = RSA.import_key(target_public_key)
        cipher_rsa = PKCS1_OAEP.new(target_public_key_obj)
        encrypted_key = cipher_rsa.encrypt(temp_key)

        # 用自己的私钥签名
        hash_obj = SHA256.new(ciphertext)
        signature = pkcs1_15.new(self.device_private_key).sign(hash_obj)

        e2e_package = {
            "from_device": self.device_id,
            "to_device": target_device_id,
            "encrypted_key": base64.b64encode(encrypted_key).decode(),
            "nonce": base64.b64encode(cipher.nonce).decode(),
            "ciphertext": base64.b64encode(ciphertext).decode(),
            "tag": base64.b64encode(tag).decode(),
            "signature": base64.b64encode(signature).decode()
        }

        return e2e_package

5.6 完整示例

端到端流程

# ========== 平台方:准备环境 ==========
platform_root_key = RSA.generate(4096)
key_system = HierarchicalKeySystem()

# 创建客户和地域
customer_cert = key_system.create_customer("CUST-A", "某医院")
region_cert = key_system.create_region_key("CUST-A", "北京")

# ========== 客户方:导入设备 ==========
importer = ManualDeviceImport("CUST-A", "北京", key_system)

device_info = {
    "serial_number": "SN-2024-001",
    "model": "CLEAN-X1",
    "mac_address": "AA:BB:CC:DD:EE:FF"
}

device_package = importer.import_device(device_info)

# ========== 设备方:激活设备 ==========
activator = DeviceActivation(platform_root_key.publickey())
activation_result = activator.activate_device(device_package)

# ========== 设备方:发送数据 ==========
comm = SecureDeviceCommunication(
    device_package["device_id"],
    RSA.import_key(device_package["private_key"]),
    activation_result["secure_channel"]["session_key"]
)

# 发送清扫数据
cleaning_data = {
    "type": "cleaning_report",
    "area": 150.5,
    "duration": 3600,
    "battery": 85
}

encrypted_data = comm.send_data(cleaning_data)
print(f"\n✅ 数据已加密发送")
print(f"   密文长度:{len(encrypted_data['ciphertext'])} 字节")

5.7 本章小结

核心要点

  1. 设备导入:手动导入、自注册、批量导入三种方式
  2. 设备激活:证书链验证 + 挑战响应认证
  3. 加密通信:混合加密(RSA + AES)
  4. 端到端加密:设备间直接加密,平台无法解密

通信安全

┌─────────────────────────────────┐
│      设备通信安全体系            │
├─────────────────────────────────┤
│ 1. 身份认证:证书链 + 挑战响应   │
│ 2. 密钥交换:RSA-2048           │
│ 3. 数据加密:AES-256-GCM        │
│ 4. 完整性:GCM认证标签          │
│ 5. 防重放:时间戳 + Nonce       │
└─────────────────────────────────┘

🤔 思考题

  1. 安全题:为什么要用混合加密,而不是只用RSA或只用AES?

  2. 场景题:如果设备的私钥泄露了,应该如何处理?

  3. 设计题:如何实现设备的”离线激活”(设备无法联网)?


📚 下一章预告

第6章我们将学习实战案例
– 医院多院区部署
– 工厂跨国部署
– 混合云部署

每个案例都有完整的代码实现!


本章关键词
– 设备导入
– 设备激活
– 证书链验证
– 挑战响应
– 混合加密
– 端到端加密
– AES-GCM

赞(0)
未经允许不得转载:Toy Tech Blog » 密钥体系与加密通信 - 第5章:设备导入与通信
免费、开放、可编程的智能路由方案,让你的服务随时随地在线。

评论 抢沙发

十年稳如初 — LocVPS,用时间证明实力

10+ 年老牌云主机服务商,全球机房覆盖,性能稳定、价格厚道。

老品牌,更懂稳定的价值你的第一台云服务器,从 LocVPS 开始