From 06c1d4f373e395081a2acefc4b4e0cfb4450aa97 Mon Sep 17 00:00:00 2001 From: chenzuoqing Date: Tue, 21 Dec 2021 10:48:03 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=BC=E5=AE=B9openssl=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=E7=9A=84AES-cbc=E5=AF=B9=E7=A7=B0=E5=8A=A0=E5=AF=86=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=E5=B0=81=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/requirements.txt | 1 + src/common/crypto.py | 109 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 src/common/crypto.py diff --git a/docs/requirements.txt b/docs/requirements.txt index 4fd040b..dd2b464 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -6,4 +6,5 @@ flask-redis==0.4.0 Flask-RESTful==0.3.9 jsonschema==4.2.1 PyYAML==6.0 +pycryptodome==3.12.0 requests==2.26.0 diff --git a/src/common/crypto.py b/src/common/crypto.py new file mode 100644 index 0000000..23f416b --- /dev/null +++ b/src/common/crypto.py @@ -0,0 +1,109 @@ +import io +import base64 +from os import urandom +from hashlib import md5 +from Crypto.Cipher import AES + +"""pip3 install pycryptodome""" + + +class AESCipher: + """ + NOTE 这种方法现在已经不是很安全,只是为了与 `openssl` 命令相兼容,方便脚本解密 + 使用内存 BytesIO 写入加密、解密内容,模拟文件进行加密、解密,主要用到 AES-CBC 算法,加 salt + 参考: https://stackoverflow.com/questions/16761458/how-to-decrypt-openssl-aes-encrypted-files-in-python + """ + block_size = AES.block_size + + def __init__(self, content, password, salt_header='Salted__', key_length=32): + """ + 注意: 若需要支持 `openssl` 命令,必须指定 salt_header='Salted__' + 如果只是 python 内部使用 salt_header 可以为空 + """ + self.content = content + self.password = password + self.salt_header = salt_header + self.key_length = key_length + + def derive_key_and_iv(self, salt): + """计算得到 `key` 和 `iv` """ + d = d_i = b'' + while len(d) < self.key_length + self.block_size: + d_i = md5(d_i + str.encode(self.password) + salt).digest() + d += d_i + return d[:self.key_length], d[self.key_length:self.key_length + self.block_size] + + def encrypt(self) -> bytes: + """ + 返回的是加密后的 `base64` 二进制字符串,bytes + 对应 `openssl` 命令解法: + 从文件:openssl aes-256-cbc -salt -in secret.txt -d -a -k 'password' + 从输入:echo "U2FsdGVkX1/5sFe6z6+H4CfQvnTZgCEV4yget0PI8XM=" | openssl aes-256-cbc -salt -d -a -k 'password' + """ + content = self.content.encode() if not isinstance(self.content, bytes) else self.content + # 字节IO模拟文件 + in_file = io.BytesIO(content) + out_file = io.BytesIO() + salt = urandom(self.block_size - len(self.salt_header)) + key, iv = self.derive_key_and_iv(salt) + cipher = AES.new(key, AES.MODE_CBC, iv) + out_file.write(str.encode(self.salt_header) + salt) + finished = False + while not finished: + chunk = in_file.read(1024 * self.block_size) + if len(chunk) == 0 or len(chunk) % self.block_size != 0: + padding_length = (self.block_size - len(chunk) % self.block_size) or self.block_size + chunk += str.encode( + padding_length * chr(padding_length)) + finished = True + out_file.write(cipher.encrypt(chunk)) + return base64.b64encode(out_file.getvalue()) + + def decrypt(self, content=None) -> bytes: + """ + 密文 content 可以传入,使用实例对象的密码,返回解密后的明文 bytes + 错误的密码、salt_header、key_size解密将报错 + openssl 加密命令 + 从文件:openssl aes-256-cbc -salt -in text.txt -a -k 'password' + 从输入:echo "abc" | openssl aes-256-cbc -salt -a -k 'password' + """ + content = content if content else self.content + content = content.encode() if not isinstance(content, bytes) else content + text = base64.b64decode(content) + # 字节IO模拟文件 + in_file = io.BytesIO(text) + out_file = io.BytesIO() + salt = in_file.read(self.block_size)[len(self.salt_header):] + key, iv = self.derive_key_and_iv(salt) + cipher = AES.new(key, AES.MODE_CBC, iv) + next_chunk = b'' + finished = False + while not finished: + chunk, next_chunk = next_chunk, cipher.decrypt(in_file.read(1024 * self.block_size)) + if len(next_chunk) == 0: + padding_length = chunk[-1] + if padding_length < 1 or padding_length > self.block_size: + # 触发此错误原因可能是密码错误 + raise ValueError("bad decrypt pad (%d)" % padding_length) + chunk = chunk[:-padding_length] + finished = True + out_file.write(chunk) + # 若为空可能是密码错误了,或者触发上面的异常 + return out_file.getvalue() + + +if __name__ == '__main__': + print("++++++++++++++++++++++++++++++++++++++++++") + aes = AESCipher("hello world2223", '111111') + secret = aes.encrypt() + raw = aes.decrypt(secret) + print("密文:", secret) + print("明文:", raw) + print("++++++++++++++++++++++++++++++++++++++++++") + try: + # 错误的密码解密将报错 + aes2 = AESCipher("", '222', salt_header='') + res = aes2.decrypt(secret) + print(res) + except ValueError: + print("密码错了!")