1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
| import subprocess import socket import concurrent.futures from pathlib import Path
# Address the client/server helpers connect to when talking to the CA.
# NOTE(review): "x.x.x.x" looks like a placeholder — set the real CA address before use.
CA_host = "x.x.x.x"
# TCP port on which Sign_Cert accepts certificate-signing requests.
CA_port = 54321
# TCP port on which Download_rootCA serves rootCA.crt.
CA_download_port = 12345
# Running counter incremented by Sign_Cert for every accepted connection;
# it is part of the per-request file names.
cert_num = 0
def Gen_rootCA():
    """Create the root CA private key and its self-signed certificate.

    Runs two ``openssl`` commands in the current working directory:
    ``genrsa`` writes the passphrase-protected 2048-bit key ``rootCA.key``,
    then ``req -new -x509`` writes ``rootCA.crt`` with ``-days 365``.

    Raises:
        FileNotFoundError: If the ``openssl`` executable cannot be started.
        subprocess.CalledProcessError: If either command exits non-zero.
    """
    # NOTE(review): the key passphrase is hard-coded in the source.
    passphrase = 'pass:3xsh0re'

    # check=True makes a failed step raise instead of falling through to the
    # success message below with no usable key/certificate on disk.
    subprocess.run(
        ['openssl', 'genrsa', '-des3', '-passout', passphrase,
         '-out', 'rootCA.key', '2048'],
        check=True)

    subject_info = "/C=CN/ST=Beijing/L=Haidian/O=USTB/OU=USTB_CA/CN=USTB.CA"
    subprocess.run(
        ['openssl', 'req', '-new', '-x509', '-passin', passphrase,
         '-key', 'rootCA.key', '-days', '365', '-out', 'rootCA.crt',
         '-subj', subject_info],
        capture_output=True, check=True)

    print("\033[32m[+]\033[0m自签名证书生成完成")
def Sign_Cert():
    """Accept certificate-signing requests on ``CA_port`` and answer them forever.

    For every accepted connection the function:

    1. builds a tag from the peer IP (dots removed), the peer port and the
       module-level ``cert_num`` counter, then increments the counter;
    2. reads the CSR until the peer closes the connection or stays silent
       for 2 seconds, and stores it as ``req_<tag>.csr``;
    3. signs it with ``rootCA.crt`` / ``rootCA.key`` (``-days 365``) into
       ``req_<tag>.crt``;
    4. sends the certificate back in 1024-byte chunks and closes the
       connection.

    If ``openssl`` is missing or the signing command fails, an error is
    printed, the connection is closed without sending a certificate, and the
    server keeps accepting new requests.
    """
    global cert_num
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as CA_socket:
        CA_socket.bind(("0.0.0.0", CA_port))
        CA_socket.listen(1)
        print('\033[32m[+]\033[0m等待客户端连接...')
        while True:
            conn, addr = CA_socket.accept()
            print(f'\033[32m[+]\033[0m自{addr[0]}的申请者已连接')

            sig = f'{addr[0].replace(".", "")}_{addr[1]}_{cert_num}'
            cert_num += 1

            # `with conn` closes the connection on every exit path,
            # including the `continue` statements below.
            with conn:
                conn.settimeout(2)
                chunks = []
                while True:
                    try:
                        data = conn.recv(1024)
                    except socket.timeout:
                        # The 2 s of silence is treated as the end of the CSR.
                        break
                    if not data:
                        break
                    chunks.append(data)

                with open(f'req_{sig}.csr', 'wb') as csr_file:
                    csr_file.write(b''.join(chunks))
                print('\033[32m[+]\033[0m申请文件CSR接收成功')

                command = [
                    'openssl', 'x509', '-req',
                    '-CA', './rootCA.crt',
                    '-CAkey', 'rootCA.key',
                    '-CAcreateserial',
                    '-in', f'./req_{sig}.csr',
                    '-passin', 'pass:3xsh0re',
                    '-out', f'./req_{sig}.crt',
                    '-days', '365'
                ]
                print("\033[32m[+]\033[0m正在查验申请者资质......")
                print("\033[32m[+]\033[0m打印签发信息:")
                try:
                    # check=True is required for CalledProcessError to be raised at all.
                    subprocess.run(command, stdout=subprocess.DEVNULL, check=True)
                except FileNotFoundError:
                    # The openssl executable could not be started.
                    print("\033[31m[-]当前系统没有安装OpenSSL库\033[0m")
                    continue
                except subprocess.CalledProcessError:
                    # No certificate was produced (e.g. empty or malformed CSR);
                    # skip the send step instead of failing on a missing file.
                    print(f"\033[31m[-]来自{addr[0]}的申请者的证书签署失败\033[0m")
                    continue
                print(f"\033[32m[+]\033[0m来自{addr[0]}的申请者的证书签署完成")

                with open(f'./req_{sig}.crt', 'rb') as file:
                    for data in iter(lambda: file.read(1024), b''):
                        # A chunk is only sent while the socket reports no pending error.
                        if conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) == 0:
                            conn.sendall(data)
                print('\033[32m[+]\033[0m证书发送完成!')
            print('\033[32m[+]\033[0m本次签发结束!\n-------------------------------------------------------')
def Download_rootCA():
    """Serve ``rootCA.crt`` to every client that connects on ``CA_download_port``.

    Listens on all interfaces and loops forever. Each accepted connection
    receives the contents of ``./rootCA.crt`` in 1024-byte chunks and is then
    closed.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as CA_Download_socket:
        CA_Download_socket.bind(("0.0.0.0", CA_download_port))
        CA_Download_socket.listen(1)
        print('\033[32m[+]rootCA下载端口开放中...\033[0m')
        while True:
            conn, addr = CA_Download_socket.accept()
            print(f'\033[32m[+]\033[0m自{addr[0]}的下载者已连接')
            # `with conn` guarantees the connection is closed even when
            # reading the file or sending to the peer raises.
            with conn:
                # The timeout only needs to be set once per connection.
                conn.settimeout(2)
                with open('./rootCA.crt', 'rb') as file:
                    for data in iter(lambda: file.read(1024), b''):
                        # A chunk is only sent while the socket reports no pending error.
                        if conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) == 0:
                            conn.sendall(data)
                print('\033[32m[+]\033[0mrootCA.crt发送完成!\n---------------------------------------------------')
def Client_Request_Cert(username, passwd):
    """Generate a key and CSR for *username* and have the CA sign it.

    Writes ``<username>_req.key`` (protected with *passwd*) and
    ``<username>_req.csr`` through ``openssl``, sends the CSR to
    ``CA_host:CA_port``, then stores whatever the CA sends back until it
    closes the connection as ``<username>_req.crt``.

    Args:
        username: Used in the file names and in the certificate subject.
        passwd: Passphrase for the generated private key.
    """
    key_name = f'{username}_req.key'
    pass_arg = f'pass:{passwd}'

    # Step 1: private key (output is not captured, so openssl writes to the console).
    subprocess.run(['openssl', 'genrsa', '-des3', '-passout', pass_arg,
                    '-out', key_name, '2048'])

    # Step 2: certificate signing request.
    subject_info = f"/C=CN/ST=Beijing/L=Haidian/O=USTB_{username}/OU=USTBer/CN=Client_{username}"
    subprocess.run(['openssl', 'req', '-new', '-key', key_name,
                    '-passin', pass_arg, '-out', f'{username}_req.csr',
                    '-days', '365', '-subj', subject_info],
                   capture_output=True)

    # Step 3: exchange with the CA.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        client_socket.connect((CA_host, CA_port))
        print('\033[32m[+]\033[0m已连接至CA服务器')
        print('\033[32m[+]\033[0m正在向CA发送签发请求.....')

        with open(f'./{username}_req.csr', 'rb') as request_file:
            for block in iter(lambda: request_file.read(1024), b''):
                client_socket.sendall(block)
        print('\033[32m[+]\033[0mCSR文件发送完成!\n'
              '\033[32m[+]\033[0m正在等待CA签发......')

        # Read until the CA closes the connection.
        received = []
        block = client_socket.recv(1024)
        while block:
            received.append(block)
            block = client_socket.recv(1024)

        with open(f'{username}_req.crt', 'wb') as cert_file:
            cert_file.write(b''.join(received))
        print(f'\033[32m[+]\033[0m证书{username}_req.crt制作完成,可在当前文件夹下查看')
def Client_Verify():
    """Download the root certificate and verify ``Server_req.crt`` against it.

    The data received from ``CA_host:CA_download_port`` is written to
    ``rootCA.crt`` in the current directory. ``openssl verify`` then checks
    ``Server_req.crt``; on success the server's public key is exported to
    ``server_pk.pem``.

    Returns:
        int: 1 if the certificate verified, 0 otherwise. A failed public-key
        export is reported on the console but does not change the result.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        client_socket.connect((CA_host, CA_download_port))
        chunks = []
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
            chunks.append(data)
    with open('rootCA.crt', 'wb') as root_ca_file:
        root_ca_file.write(b''.join(chunks))
    print(f'\033[32m[+]\033[0mrootCA.crt下载完成,请在当前目录下查看')

    verify_command = ['openssl', 'verify', '-CAfile', './rootCA.crt', 'Server_req.crt']
    result = subprocess.run(verify_command, capture_output=True, text=True)
    # The exit status is checked in addition to the "OK" marker so that the
    # substring appearing in other stdout text cannot count as success.
    # NOTE(review): whether failure text reaches stdout depends on the
    # OpenSSL build — confirm.
    if result.returncode != 0 or "OK" not in result.stdout:
        print("\033[31m[-]验证失败!!!\033[0m")
        return 0
    print("\033[32m[+]服务器证书验证成功!\033[0m")

    # Actually run the export; the public key is printed on stdout and is
    # written to the file here rather than through a shell redirect.
    export = subprocess.run(
        ['openssl', 'x509', '-in', 'Server_req.crt', '-pubkey', '-noout'],
        capture_output=True)
    if export.returncode == 0:
        with open('server_pk.pem', 'wb') as pk_file:
            pk_file.write(export.stdout)
        print("\033[32m[+]已经在当前文件夹下生成服务端公钥server_pk.pem!\033[0m")
    else:
        print("\033[31m[-]服务端公钥server_pk.pem生成失败!\033[0m")
    return 1
def Server_Request_Cert():
    """Generate the server key and CSR and have the CA sign the request.

    Writes ``Server_req.key`` and ``Server_req.csr`` through ``openssl``,
    sends the CSR to ``CA_host:CA_port`` and stores the reply as
    ``Server_req.crt``.

    If ``openssl`` cannot be started or one of the two commands exits
    non-zero, the error messages are printed and the function returns
    without contacting the CA.
    """
    command = ['openssl', 'genrsa', '-des3', '-passout', 'pass:USTBServer',
               '-out', 'Server_req.key', '2048']
    subject_info = "/C=CN/ST=Beijing/L=Haidian/O=USTB_Server/OU=Server/CN=USTB_Server"
    command2 = ['openssl', 'req', '-new', '-key', 'Server_req.key',
                '-passin', 'pass:USTBServer', '-out', 'Server_req.csr',
                '-days', '365', '-subj', subject_info]
    try:
        # check=True is what makes a failing command raise CalledProcessError;
        # FileNotFoundError covers a missing openssl executable.
        subprocess.run(command, capture_output=True, check=True)
        subprocess.run(command2, capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("\033[31m[-]私钥生成失败!!!\033[0m")
        print("\033[31m[-]查看当前系统是否安装OpenSSL库!!!\033[0m")
        # There is no valid CSR to send, so stop here.
        return
    print('\033[32m[+]\033[0mCSR文件生成成功!')

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        client_socket.connect((CA_host, CA_port))
        print('\033[32m[+]\033[0m已连接至CA服务器')
        print('\033[32m[+]\033[0m正在向CA发送签发请求.....')

        with open('./Server_req.csr', 'rb') as file:
            for data in iter(lambda: file.read(1024), b''):
                client_socket.sendall(data)
        print('\033[32m[+]\033[0mCSR文件发送完成!\n'
              '\033[32m[+]\033[0m正在等待CA签发......')

        # Read until the CA closes the connection.
        chunks = []
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
            chunks.append(data)
        with open('Server_req.crt', 'wb') as crt_file:
            crt_file.write(b''.join(chunks))
        print(f'\033[32m[+]\033[0m证书Server_req.crt制作完成,可在当前文件夹下查看')
def Server_Verify(username):
    """Download the root certificate and verify ``<username>_req.crt`` against it.

    The data received from ``CA_host:CA_download_port`` is written to
    ``rootCA.crt`` in the current directory before ``openssl verify`` runs.

    Args:
        username: Prefix of the client certificate file to verify.

    Returns:
        int: 1 if the certificate verified, 0 otherwise.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        client_socket.connect((CA_host, CA_download_port))
        chunks = []
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
            chunks.append(data)
    with open('rootCA.crt', 'wb') as root_ca_file:
        root_ca_file.write(b''.join(chunks))
    print(f'\033[32m[+]\033[0mrootCA.crt下载完成,请在当前目录下查看')

    verify_command = ['openssl', 'verify', '-CAfile', './rootCA.crt', f'{username}_req.crt']
    result = subprocess.run(verify_command, capture_output=True, text=True)
    # The exit status is checked in addition to the "OK" marker: the file
    # name contains the caller-supplied username, so the substring alone
    # could match text that is not the success marker.
    # NOTE(review): whether failure text reaches stdout depends on the
    # OpenSSL build — confirm.
    if result.returncode == 0 and "OK" in result.stdout:
        print("\033[32m[+]目标客户端证书验证成功!\033[0m")
        return 1
    print("\033[31m[-]验证失败!!!\033[0m")
    return 0
def CA():
    """Run the certificate authority.

    Prints the banner, creates the root key/certificate through
    ``Gen_rootCA`` when ``rootCA.crt`` is not present in the current
    directory, then runs ``Download_rootCA`` and ``Sign_Cert`` in a thread
    pool and waits for both of them.
    """
    # Literal backslashes in the banner are written as "\\": sequences such
    # as "\ " and "\_" are invalid escapes that newer Python versions warn
    # about. The printed text is unchanged.
    print('\033[34m _ _ ____ _____ ____ ____ _ \033[0m\n'
          '\033[34m| | | / ___|_ _| __ ) / ___| / \\ \033[0m\n'
          '\033[34m| | | \\___ \\ | | | _ \\ | | / _ \\ \033[0m\n'
          '\033[34m| |_| |___) || | | |_) | | |___ / ___ \\ \033[0m\n'
          '\033[34m\\___/ |____/ |_| |____/___\\____/_/ \\_\\ \033[0m\n')
    print("\t\t\t\t\033[34m-------created by 3xsh0re\033[0m")

    root_ca_file = Path("rootCA.crt")
    if root_ca_file.is_file():
        print("\033[32m[+]rootCA.crt已经生成\033[0m")
    else:
        Gen_rootCA()

    # Both services loop forever, so each one gets its own worker thread.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        thread1 = executor.submit(Download_rootCA)
        thread2 = executor.submit(Sign_Cert)
        concurrent.futures.wait([thread1, thread2])
# Script driver. These calls run at module level, so they execute on import
# as well as on direct execution.
# First request a certificate for user "3xsh0re" from the CA, then download
# the root certificate and verify the resulting "3xsh0re_req.crt" against it.
# NOTE(review): the user name and key passphrase are hard-coded here.
Client_Request_Cert("3xsh0re", "123456")
Server_Verify("3xsh0re")
|