第5章:设备导入与密钥通信
🎯 本章目标
学完这一章,你将理解:
– 客户如何导入自己的设备
– 设备注册时的密钥交换流程
– 设备与证书的绑定关系
– 设备间的加密通信
预计学习时间: 40分钟
5.1 设备导入场景
业务场景
客户A已经激活了平台,现在要导入设备:
客户A的环境:
├── 平台已安装并激活
├── 客户证书已导入
├── 北京地域证书已创建
└── 准备导入100台设备
核心问题
问题1:客户如何导入设备?
– 手动输入设备信息?
– 设备自动注册?
– 批量导入?
问题2:设备如何证明自己的身份?
– 设备有出厂密钥吗?
– 如何防止假冒设备?
问题3:设备与平台如何建立加密通信?
– 使用什么加密算法?
– 密钥如何交换?
5.2 设备导入流程设计
三种导入方式
方式1:手动导入(适合少量设备)
├── 管理员在平台输入设备信息
├── 平台生成设备证书
└── 管理员配置设备
方式2:设备自注册(适合大量设备)
├── 设备连接平台
├── 设备提供出厂凭证
├── 平台验证并签发证书
└── 设备自动配置
方式3:批量导入(适合预配置)
├── 管理员上传设备清单
├── 平台批量生成证书
└── 设备首次连接时激活
5.3 方式1:手动导入(推荐)
完整流程
class ManualDeviceImport:
"""
手动设备导入系统
"""
def __init__(self, customer_id, region_name, key_system):
self.customer_id = customer_id
self.region_name = region_name
self.key_system = key_system
def import_device(self, device_info):
"""
手动导入设备
device_info = {
"serial_number": "SN-2024-001",
"model": "CLEAN-X1",
"mac_address": "AA:BB:CC:DD:EE:FF"
}
"""
print(f"\n开始导入设备:{device_info['serial_number']}")
# 第1步:生成设备ID
device_id = self._generate_device_id(device_info)
print(f" [1/5] 生成设备ID:{device_id}")
# 第2步:生成设备密钥对
device_key = RSA.generate(2048)
print(f" [2/5] 生成设备密钥对")
# 第3步:创建设备证书
device_cert = self._create_device_cert(device_id, device_info, device_key)
print(f" [3/5] 创建设备证书")
# 第4步:生成设备配置包
device_package = self._create_device_package(
device_id,
device_key,
device_cert
)
print(f" [4/5] 生成设备配置包")
# 第5步:保存到数据库
self._save_device(device_id, device_package)
print(f" [5/5] 保存到数据库")
print(f"\n✅ 设备导入成功!")
print(f" 设备ID:{device_id}")
print(f" 序列号:{device_info['serial_number']}")
print(f" 状态:待激活")
return device_package
def _generate_device_id(self, device_info):
"""
生成设备ID(基于序列号和MAC地址)
"""
unique_str = f"{device_info['serial_number']}:{device_info['mac_address']}"
hash_value = hashlib.sha256(unique_str.encode()).hexdigest()
return f"DEV-{hash_value[:16].upper()}"
def _create_device_cert(self, device_id, device_info, device_key):
"""
创建设备证书(用地域私钥签名)
"""
# 获取地域密钥
region_id = f"{self.customer_id}:{self.region_name}"
region_key = self.key_system.region_keys[region_id]
# 创建证书
cert = {
"type": "device",
"device_id": device_id,
"customer_id": self.customer_id,
"region_name": self.region_name,
"serial_number": device_info["serial_number"],
"model": device_info["model"],
"mac_address": device_info["mac_address"],
"public_key": device_key.publickey().export_key().decode(),
"issued_at": datetime.now().isoformat(),
"status": "pending"
}
# 签名
cert_data = json.dumps({k: v for k, v in cert.items() if k != "signature"}, sort_keys=True)
hash_obj = SHA256.new(cert_data.encode())
signature = pkcs1_15.new(region_key["private_key"]).sign(hash_obj)
cert["signature"] = base64.b64encode(signature).decode()
return cert
def _create_device_package(self, device_id, device_key, device_cert):
"""
创建设备配置包
"""
# 获取完整证书链
region_id = f"{self.customer_id}:{self.region_name}"
region_cert = self.key_system.region_keys[region_id]["certificate"]
customer_cert = self.key_system.customer_master_keys[self.customer_id]["certificate"]
package = {
"device_id": device_id,
"private_key": device_key.export_key().decode(),
"certificate": device_cert,
"certificate_chain": [
device_cert,
region_cert,
customer_cert
],
"platform_config": {
"api_endpoint": "https://platform.local",
"heartbeat_interval": 60,
"data_upload_interval": 300
}
}
return package
def _save_device(self, device_id, device_package):
"""
保存设备信息到数据库
"""
# 实际应该保存到数据库
# 这里简化为内存存储
pass
5.4 设备激活流程
设备首次连接
class DeviceActivation:
"""
设备激活系统
"""
def __init__(self, platform_public_key):
self.platform_public_key = platform_public_key
def activate_device(self, device_package):
"""
设备激活流程
"""
print("\n=== 设备激活流程 ===\n")
device_id = device_package["device_id"]
print(f"设备ID:{device_id}")
# 第1步:加载设备私钥
print("\n[步骤1] 加载设备私钥...")
device_private_key = RSA.import_key(device_package["private_key"])
print("✅ 设备私钥加载成功")
# 第2步:验证证书链
print("\n[步骤2] 验证证书链...")
if not self._verify_cert_chain(device_package["certificate_chain"]):
raise Exception("证书链验证失败")
print("✅ 证书链验证成功")
# 第3步:连接平台
print("\n[步骤3] 连接平台...")
connection = self._connect_to_platform(device_package)
print(f"✅ 连接成功:{connection['endpoint']}")
# 第4步:设备认证
print("\n[步骤4] 设备认证...")
auth_result = self._authenticate_device(device_private_key, device_package)
print(f"✅ 认证成功:{auth_result['session_id']}")
# 第5步:建立加密通道
print("\n[步骤5] 建立加密通道...")
secure_channel = self._establish_secure_channel(auth_result)
print(f"✅ 加密通道建立成功:{secure_channel['algorithm']}")
print("\n=== 设备激活完成 ===\n")
return {
"device_id": device_id,
"status": "active",
"session_id": auth_result["session_id"],
"secure_channel": secure_channel
}
def _verify_cert_chain(self, cert_chain):
"""
验证证书链
"""
# 设备证书 ← 地域证书 ← 客户证书
device_cert = cert_chain[0]
region_cert = cert_chain[1]
customer_cert = cert_chain[2]
# 验证设备证书
region_public_key = RSA.import_key(region_cert["public_key"])
cert_data = json.dumps({k: v for k, v in device_cert.items() if k != "signature"}, sort_keys=True)
hash_obj = SHA256.new(cert_data.encode())
signature = base64.b64decode(device_cert["signature"])
try:
pkcs1_15.new(region_public_key).verify(hash_obj, signature)
except:
return False
# 验证地域证书
customer_public_key = RSA.import_key(customer_cert["public_key"])
cert_data = json.dumps({k: v for k, v in region_cert.items() if k != "signature"}, sort_keys=True)
hash_obj = SHA256.new(cert_data.encode())
signature = base64.b64decode(region_cert["signature"])
try:
pkcs1_15.new(customer_public_key).verify(hash_obj, signature)
except:
return False
# 验证客户证书
cert_data = json.dumps({k: v for k, v in customer_cert.items() if k != "signature"}, sort_keys=True)
hash_obj = SHA256.new(cert_data.encode())
signature = base64.b64decode(customer_cert["signature"])
try:
pkcs1_15.new(self.platform_public_key).verify(hash_obj, signature)
except:
return False
return True
def _connect_to_platform(self, device_package):
"""
连接到平台
"""
endpoint = device_package["platform_config"]["api_endpoint"]
return {
"endpoint": endpoint,
"connected_at": datetime.now().isoformat()
}
def _authenticate_device(self, device_private_key, device_package):
"""
设备认证(挑战-响应)
"""
# 平台发送挑战
challenge = secrets.token_bytes(32)
# 设备用私钥签名
hash_obj = SHA256.new(challenge)
signature = pkcs1_15.new(device_private_key).sign(hash_obj)
# 平台验证签名(这里简化)
session_id = secrets.token_hex(16)
return {
"session_id": session_id,
"authenticated_at": datetime.now().isoformat()
}
def _establish_secure_channel(self, auth_result):
"""
建立加密通道(混合加密)
"""
# 生成对称密钥(用于数据传输)
session_key = get_random_bytes(32) # AES-256
return {
"algorithm": "AES-256-GCM",
"session_key": base64.b64encode(session_key).decode(),
"established_at": datetime.now().isoformat()
}
5.5 设备间加密通信
通信架构
设备 ←→ 平台:混合加密
├── 密钥交换:RSA-2048
└── 数据传输:AES-256-GCM
设备 ←→ 设备:端到端加密
├── 通过平台中转
└── 平台无法解密
完整实现
class SecureDeviceCommunication:
"""
设备安全通信系统
"""
def __init__(self, device_id, device_private_key, session_key):
self.device_id = device_id
self.device_private_key = device_private_key
self.session_key = base64.b64decode(session_key)
def send_data(self, data):
"""
发送加密数据到平台
"""
# 使用AES-256-GCM加密
cipher = AES.new(self.session_key, AES.MODE_GCM)
ciphertext, tag = cipher.encrypt_and_digest(json.dumps(data).encode())
encrypted_package = {
"device_id": self.device_id,
"timestamp": datetime.now().isoformat(),
"nonce": base64.b64encode(cipher.nonce).decode(),
"ciphertext": base64.b64encode(ciphertext).decode(),
"tag": base64.b64encode(tag).decode()
}
return encrypted_package
def receive_data(self, encrypted_package):
"""
接收并解密平台数据
"""
# 解密
nonce = base64.b64decode(encrypted_package["nonce"])
ciphertext = base64.b64decode(encrypted_package["ciphertext"])
tag = base64.b64decode(encrypted_package["tag"])
cipher = AES.new(self.session_key, AES.MODE_GCM, nonce=nonce)
plaintext = cipher.decrypt_and_verify(ciphertext, tag)
return json.loads(plaintext.decode())
def send_to_device(self, target_device_id, data, target_public_key):
"""
发送端到端加密消息给其他设备
"""
# 生成临时对称密钥
temp_key = get_random_bytes(32)
# 用对称密钥加密数据
cipher = AES.new(temp_key, AES.MODE_GCM)
ciphertext, tag = cipher.encrypt_and_digest(json.dumps(data).encode())
# 用目标设备公钥加密对称密钥
target_public_key_obj = RSA.import_key(target_public_key)
cipher_rsa = PKCS1_OAEP.new(target_public_key_obj)
encrypted_key = cipher_rsa.encrypt(temp_key)
# 用自己的私钥签名
hash_obj = SHA256.new(ciphertext)
signature = pkcs1_15.new(self.device_private_key).sign(hash_obj)
e2e_package = {
"from_device": self.device_id,
"to_device": target_device_id,
"encrypted_key": base64.b64encode(encrypted_key).decode(),
"nonce": base64.b64encode(cipher.nonce).decode(),
"ciphertext": base64.b64encode(ciphertext).decode(),
"tag": base64.b64encode(tag).decode(),
"signature": base64.b64encode(signature).decode()
}
return e2e_package
5.6 完整示例
端到端流程
# ========== 平台方:准备环境 ==========
platform_root_key = RSA.generate(4096)
key_system = HierarchicalKeySystem()
# 创建客户和地域
customer_cert = key_system.create_customer("CUST-A", "某医院")
region_cert = key_system.create_region_key("CUST-A", "北京")
# ========== 客户方:导入设备 ==========
importer = ManualDeviceImport("CUST-A", "北京", key_system)
device_info = {
"serial_number": "SN-2024-001",
"model": "CLEAN-X1",
"mac_address": "AA:BB:CC:DD:EE:FF"
}
device_package = importer.import_device(device_info)
# ========== 设备方:激活设备 ==========
activator = DeviceActivation(platform_root_key.publickey())
activation_result = activator.activate_device(device_package)
# ========== 设备方:发送数据 ==========
comm = SecureDeviceCommunication(
device_package["device_id"],
RSA.import_key(device_package["private_key"]),
activation_result["secure_channel"]["session_key"]
)
# 发送清扫数据
cleaning_data = {
"type": "cleaning_report",
"area": 150.5,
"duration": 3600,
"battery": 85
}
encrypted_data = comm.send_data(cleaning_data)
print(f"\n✅ 数据已加密发送")
print(f" 密文长度:{len(encrypted_data['ciphertext'])} 字节")
5.7 本章小结
核心要点
- 设备导入:手动导入、自注册、批量导入三种方式
- 设备激活:证书链验证 + 挑战响应认证
- 加密通信:混合加密(RSA + AES)
- 端到端加密:设备间直接加密,平台无法解密
通信安全
┌─────────────────────────────────┐
│ 设备通信安全体系 │
├─────────────────────────────────┤
│ 1. 身份认证:证书链 + 挑战响应 │
│ 2. 密钥交换:RSA-2048 │
│ 3. 数据加密:AES-256-GCM │
│ 4. 完整性:GCM认证标签 │
│ 5. 防重放:时间戳 + Nonce │
└─────────────────────────────────┘
🤔 思考题
-
安全题:为什么要用混合加密,而不是只用RSA或只用AES?
-
场景题:如果设备的私钥泄露了,应该如何处理?
-
设计题:如何实现设备的”离线激活”(设备无法联网)?
📚 下一章预告
第6章我们将学习实战案例:
– 医院多院区部署
– 工厂跨国部署
– 混合云部署
每个案例都有完整的代码实现!
本章关键词
– 设备导入
– 设备激活
– 证书链验证
– 挑战响应
– 混合加密
– 端到端加密
– AES-GCM








最新评论
照片令人惊艳。万分感谢 温暖。
氛围绝佳。由衷感谢 感受。 你的博客让人一口气读完。敬意 真诚。
实用的 杂志! 越来越好!
又到年底了,真快!
研究你的文章, 我体会到美好的心情。
感谢激励。由衷感谢
好久没见过, 如此温暖又有信息量的博客。敬意。
很稀有, 这么鲜明的文字。谢谢。