HTTPS中的数字认证原理

本文最后更新于:2024年7月20日 晚上

HTTPS中的数字认证原理

笔者在近期学习了数字认证技术相关的东西,对其中公钥基础设施PKI进行较为深入的挖掘,于是写下此篇博客。🙈

https与http

在开始CA相关的东西时,我们首先应该了解https和http。

HTTP(Hypertext Transfer Protocol)和HTTPS(HTTP Secure)是两种常见的用于在客户端和服务器之间传输数据的协议。从网络协议的底层来说,两者并无区别,但是https实现了更加安全的传输过程。

具体有下面的区别:

  1. 默认端口:http是80,https是443
  2. 传输内容:http是明文传输,https是加密传输
  3. 建立连接方式:http是直接连接,https需要SSL/TLS握手过程

数字认证原理

公钥基础设施PKI

举一个简单的例子,一个用户在浏览器里访问网站时,如何才能判别访问的网站是安全可信的呢?

此时只有引入绝对公正的第三方(一般为政府)作为证明者,证明所访问的网站是可信的,这就是公钥基础设施的由来。

我们已经知道,https在为客户端和服务端建立连接之前需要进行SSL/TLS握手过程,那么这个过程的实现基础就是PKI。

那么PKI中,用什么作为信任根或者说信任凭证呢,就是我们常说的证书啦😀

数字证书是最常见的证书类型,它包含了公钥、实体的身份信息以及证书颁发机构的签名。理所当然的,证书使用的是非对称密码体制。正常情况下,系统会安装CA的根证书用于验证其签发的证书

目前的浏览器会对用户进行提醒,如果网站的证书过期,会提示有风险。

证书颁发机构CA&&注册机构RA

RA是注册机构(Registration Authority)的缩写。

在公钥基础设施(PKI)中,注册机构是与证书颁发机构(CA)合作的实体,负责收集、审核和验证实体的身份信息,并向CA提供这些信息以用于数字证书的签发。注册机构在PKI中发挥重要的角色,它们与CA一起确保数字证书的可信性和准确性。注册机构负责收集申请者的身份信息,可能包括个人身份证明、组织文件、域名所有权验证等。注册机构会对这些信息进行审核和验证,确保其真实性和合法性。

CA是证书颁发机构(Certificate Authority)的缩写。

证书颁发机构是一个受信任的实体,负责签发和管理数字证书,以验证实体的身份和信息的真实性。在公钥基础设施(PKI)中,CA是PKI体系结构的核心组成部分之一。它们使用自己的私钥对数字证书进行签名,并为实体(如网站、个人或设备)颁发包含公钥和身份信息的数字证书。

CA的主要职责包括:

  1. 身份验证:CA会对申请数字证书的实体进行身份验证,以确保其合法性和真实性。这可以包括验证个人的身份证明、组织的注册文件、域名所有权等,当然也可以通过RA进行代理。
  2. 签发证书:一旦CA验证了申请者的身份信息,它会使用自己的私钥和根证书对数字证书进行签名,并将证书发送给申请者。该证书包含了实体的公钥、身份信息以及CA的签名。
  3. 证书管理:CA负责管理已签发的数字证书。这包括证书的更新、吊销和过期处理等操作。CA还维护一个公共的证书撤销列表(CRL),其中包含已吊销的证书的信息。
  4. 可信性建立:由于CA是受信任的实体,其签名可以用于验证数字证书的真实性和完整性。其他实体可以通过验证证书的签名和查看CA的信任列表来确认证书的有效性。

证书的颁发&验证

原理

从上文已经知道,证书的颁发和验证是通过CA来完成的,那么下面具体讲一讲实现过程。

一般来说,在单向认证过程中只需要用户验证服务端证书,只有服务端需要请求CA颁发证书。那么服务端,具体来讲就是一个网站,需要向CA申请一个证书,首先发送证书签发请求,然后通过CA的身份验证合法性后,继续向CA发送一个证书请求文件CSR,CA在接收到证书请求文件后,会使用根证书和私钥对其进行签名,然后就会生成一个证书文件CRT,返回给服务端。

大概过程如下:

证书的验证是通过系统安装的CA机构的根证书来验证的。

那么双向认证是基于单向认证的,很好理解,笔者将双向认证的过程也画了一张图😋:

代码实现

笔者知道,上面的文字叙述过于繁琐,那么下面笔者上代码,这是笔者用Python和OpenSSL库开发的一个的基于双向认证的CA签发模拟程序。

大概有如下效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
import subprocess
import socket
import concurrent.futures
from pathlib import Path

CA_host = "x.x.x.x"
CA_port = 54321
CA_download_port = 12345
cert_num = 0

# 根证书生成
def Gen_rootCA():
# 生成私钥
subprocess.run(['openssl', 'genrsa', '-des3', '-passout', 'pass:3xsh0re', '-out', 'rootCA.key', '2048'])
# 指定证书主题字段信息
subject_info = "/C=CN/ST=Beijing/L=Haidian/O=USTB/OU=USTB_CA/CN=USTB.CA"
# 生成自签名证书
subprocess.run(
['openssl', 'req', '-new', '-x509', '-passin', 'pass:3xsh0re', '-key', 'rootCA.key', '-days', '365', '-out',
'rootCA.crt', '-subj', subject_info], capture_output=True)

print("\033[32m[+]\033[0m自签名证书生成完成")


# 为申请者生成证书
def Sign_Cert():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as CA_socket:
CA_socket.bind(("0.0.0.0", CA_port))
CA_socket.listen(1)
print('\033[32m[+]\033[0m等待客户端连接...')
while True:
conn, addr = CA_socket.accept()
print(f'\033[32m[+]\033[0m自{addr[0]}的申请者已连接')

# 生成证书号
global cert_num
sig = f'{addr[0].replace(".", "")}_{addr[1]}_{cert_num}'
cert_num += 1
with conn:
conn.settimeout(2)
csr_data = b''
while True:
try:
data = conn.recv(1024)
if len(data) == 0:
break
csr_data += data
pass
except socket.timeout:
break
# 生成 req.csr 文件
with open(f'req_{sig}.csr', 'wb') as csr_file:
csr_file.write(csr_data)
print('\033[32m[+]\033[0m申请文件CSR接收成功')
# 在这里处理证书请求文件的数据
command = [
'openssl', 'x509',
'-req', '-CA', './rootCA.crt',
'-CAkey', 'rootCA.key',
'-CAcreateserial',
'-in', f'./req_{sig}.csr',
'-passin', 'pass:3xsh0re',
'-out', f'./req_{sig}.crt',
'-days', '365'
]
print("\033[32m[+]\033[0m正在查验申请者资质......")
print("\033[32m[+]\033[0m打印签发信息:")
try:
subprocess.run(command, stdout=subprocess.DEVNULL)
print(f"\033[32m[+]\033[0m来自{addr[0]}的申请者的证书签署完成")
except subprocess.CalledProcessError as e:
print("\033[31m[-]当前系统没有安装OpenSSL库\033[0m")

# 传送CRT
with open(f'./req_{sig}.crt', 'rb') as file:
while True:
data = file.read(1024)
if not data:
break
if conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) == 0:
conn.sendall(data)
print('\033[32m[+]\033[0m证书发送完成!')
conn.close()
print('\033[32m[+]\033[0m本次签发结束!\n-------------------------------------------------------')


# CA提供根证书下载
def Download_rootCA():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as CA_Download_socket:
CA_Download_socket.bind(("0.0.0.0", CA_download_port))
CA_Download_socket.listen(1)
print('\033[32m[+]rootCA下载端口开放中...\033[0m')
while True:
conn, addr = CA_Download_socket.accept()
print(f'\033[32m[+]\033[0m自{addr[0]}的下载者已连接')
# 传送rootCA.crt
with open(f'./rootCA.crt', 'rb') as file:
while True:
conn.settimeout(2)
data = file.read(1024)
if not data:
break
if conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) == 0:
conn.sendall(data)
print('\033[32m[+]\033[0mrootCA.crt发送完成!\n---------------------------------------------------')
conn.close()
pass


# Client请求签发证书
def Client_Request_Cert(username, passwd):
# 生成私钥
command = ['openssl', 'genrsa', '-des3', '-passout', f'pass:{passwd}', '-out', f'{username}_req.key', '2048']
subprocess.run(command)
# 生成证书请求文件CSR
subject_info = f"/C=CN/ST=Beijing/L=Haidian/O=USTB_{username}/OU=USTBer/CN=Client_{username}"
command2 = ['openssl', 'req', '-new', '-key', f'{username}_req.key', '-passin', f'pass:{passwd}', '-out',
f'{username}_req.csr', '-days', '365', '-subj', subject_info]
subprocess.run(command2, capture_output=True)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
client_socket.connect((CA_host, CA_port))
print('\033[32m[+]\033[0m已连接至CA服务器')
print('\033[32m[+]\033[0m正在向CA发送签发请求.....')

with open(f'./{username}_req.csr', 'rb') as file:
while True:
data = file.read(1024)
if not data:
break
client_socket.sendall(data)
print('\033[32m[+]\033[0mCSR文件发送完成!\n'
'\033[32m[+]\033[0m正在等待CA签发......')

crt_data = b''
while True:
data = client_socket.recv(1024)
if len(data) == 0:
break
crt_data += data
# 生成 req.crt 文件
with open(f'{username}_req.crt', 'wb') as csr_file:
csr_file.write(crt_data)
print(f'\033[32m[+]\033[0m证书{username}_req.crt制作完成,可在当前文件夹下查看')


# Client验证证书,若验证通过则在当前文件夹下生成服务端公钥
def Client_Verify():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
client_socket.connect((CA_host, CA_download_port))
crt_data = b''
while True:
data = client_socket.recv(1024)
if len(data) == 0:
break
crt_data += data
# 生成 req.crt 文件
with open('rootCA.crt', 'wb') as csr_file:
csr_file.write(crt_data)
print(f'\033[32m[+]\033[0mrootCA.crt下载完成,请在当前目录下查看')
verify_command = ['openssl', 'verify', '-CAfile', './rootCA.crt', f'Server_req.crt']
result = subprocess.run(verify_command, capture_output=True, text=True)
if "OK" in result.stdout.strip():
print("\033[32m[+]服务器证书验证成功!\033[0m")
gen_server_pk_command = ['openssl x509 -in Server_req.crt -pubkey -noout > server_pk.pem']
print("\033[32m[+]已经在当前文件夹下生成服务端公钥server_pk.pem!\033[0m")
return 1
else:
print("\033[31m[-]验证失败!!!\033[0m")
return 0


# Server请求签发证书
def Server_Request_Cert():
try:
# 生成私钥
command = ['openssl', 'genrsa', '-des3', '-passout', f'pass:USTBServer', '-out', f'Server_req.key', '2048']
subprocess.run(command, capture_output=True)
# 生成证书请求文件CSR
subject_info = f"/C=CN/ST=Beijing/L=Haidian/O=USTB_Server/OU=Server/CN=USTB_Server"
command2 = ['openssl', 'req', '-new', '-key', f'Server_req.key', '-passin', f'pass:USTBServer', '-out',
f'Server_req.csr', '-days', '365', '-subj', subject_info]
subprocess.run(command2, capture_output=True)
print('\033[32m[+]\033[0mCSR文件生成成功!')
except subprocess.CalledProcessError as e:
print("\033[31m[-]私钥生成失败!!!\033[0m")
print("\033[31m[-]查看当前系统是否安装OpenSSL库!!!\033[0m")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
client_socket.connect((CA_host, CA_port))
print('\033[32m[+]\033[0m已连接至CA服务器')
print('\033[32m[+]\033[0m正在向CA发送签发请求.....')

with open(f'./Server_req.csr', 'rb') as file:
while True:
data = file.read(1024)
if not data:
break
client_socket.sendall(data)
print('\033[32m[+]\033[0mCSR文件发送完成!\n'
'\033[32m[+]\033[0m正在等待CA签发......')

crt_data = b''
while True:
data = client_socket.recv(1024)
if len(data) == 0:
break
crt_data += data
# 生成 req.crt 文件
with open(f'Server_req.crt', 'wb') as csr_file:
csr_file.write(crt_data)
print(f'\033[32m[+]\033[0m证书Server_req.crt制作完成,可在当前文件夹下查看')

# Server 请求验证客户端证书
def Server_Verify(username):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
client_socket.connect((CA_host, CA_download_port))
crt_data = b''
while True:
data = client_socket.recv(1024)
if len(data) == 0:
break
crt_data += data
# 生成 req.crt 文件
with open('rootCA.crt', 'wb') as csr_file:
csr_file.write(crt_data)
print(f'\033[32m[+]\033[0mrootCA.crt下载完成,请在当前目录下查看')
verify_command = ['openssl', 'verify', '-CAfile', './rootCA.crt', f'{username}_req.crt']
result = subprocess.run(verify_command, capture_output=True, text=True)
if "OK" in result.stdout.strip():
print("\033[32m[+]目标客户端证书验证成功!\033[0m")
return 1
else:
print("\033[31m[-]验证失败!!!\033[0m")
return 0

# CA端
def CA():
print('\033[34m _ _ ____ _____ ____ ____ _ \033[0m\n'
'\033[34m| | | / ___|_ _| __ ) / ___| / \ \033[0m\n'
'\033[34m| | | \___ \ | | | _ \ | | / _ \ \033[0m\n'
'\033[34m| |_| |___) || | | |_) | | |___ / ___ \ \033[0m\n'
'\033[34m\___/ |____/ |_| |____/___\____/_/ \_\ \033[0m\n')
print("\t\t\t\t\033[34m-------created by 3xsh0re\033[0m")
root_ca_file = Path("rootCA.crt")
if root_ca_file.is_file():
print("\033[32m[+]rootCA.crt已经生成\033[0m")
else:
# 生成根证书
Gen_rootCA()
with concurrent.futures.ThreadPoolExecutor() as executor:
# 提交线程1的执行
thread1 = executor.submit(Download_rootCA)
# 提交线程2的执行
thread2 = executor.submit(Sign_Cert)
# 等待两个线程执行完成
concurrent.futures.wait([thread1, thread2])


# Client端请求CA颁发证书
Client_Request_Cert("3xsh0re", "123456")
# 用户验证Server
# Client_Verify()

# Server端请求CA颁发证书
# Server_Request_Cert()
# Server验证用户ZZR
Server_Verify("3xsh0re")

# CA端
# CA()

下面是笔者和几个小伙伴用数字认证原理改造的一个Python聊天室项目

基于CA和SSL改造的ChatRoom

关于SSL/TLS我后来写了更详细的分析,点击跳转


HTTPS中的数字认证原理
https://3xsh0re.github.io/2023/12/09/HTTPS中的数字认证原理/
作者
3xsh0re
发布于
2023年12月9日
许可协议