兼容openssl命令的AES-cbc对称加密模块封装

This commit is contained in:
chenzuoqing 2021-12-21 10:48:03 +08:00
parent bd20291004
commit 06c1d4f373
2 changed files with 110 additions and 0 deletions

View File

@ -6,4 +6,5 @@ flask-redis==0.4.0
Flask-RESTful==0.3.9
jsonschema==4.2.1
PyYAML==6.0
pycryptodome==3.12.0
requests==2.26.0

109
src/common/crypto.py Normal file
View File

@ -0,0 +1,109 @@
import io
import base64
from os import urandom
from hashlib import md5
from Crypto.Cipher import AES
"""pip3 install pycryptodome"""
class AESCipher:
"""
NOTE 这种方法现在已经不是很安全只是为了与 `openssl` 命令相兼容方便脚本解密
使用内存 BytesIO 写入加密解密内容模拟文件进行加密解密主要用到 AES-CBC 算法 salt
参考: https://stackoverflow.com/questions/16761458/how-to-decrypt-openssl-aes-encrypted-files-in-python
"""
block_size = AES.block_size
def __init__(self, content, password, salt_header='Salted__', key_length=32):
"""
注意: 若需要支持 `openssl` 命令必须指定 salt_header='Salted__'
如果只是 python 内部使用 salt_header 可以为空
"""
self.content = content
self.password = password
self.salt_header = salt_header
self.key_length = key_length
def derive_key_and_iv(self, salt):
"""计算得到 `key` 和 `iv` """
d = d_i = b''
while len(d) < self.key_length + self.block_size:
d_i = md5(d_i + str.encode(self.password) + salt).digest()
d += d_i
return d[:self.key_length], d[self.key_length:self.key_length + self.block_size]
def encrypt(self) -> bytes:
"""
返回的是加密后的 `base64` 二进制字符串bytes
对应 `openssl` 命令解法
从文件openssl aes-256-cbc -salt -in secret.txt -d -a -k 'password'
从输入echo "U2FsdGVkX1/5sFe6z6+H4CfQvnTZgCEV4yget0PI8XM=" | openssl aes-256-cbc -salt -d -a -k 'password'
"""
content = self.content.encode() if not isinstance(self.content, bytes) else self.content
# 字节IO模拟文件
in_file = io.BytesIO(content)
out_file = io.BytesIO()
salt = urandom(self.block_size - len(self.salt_header))
key, iv = self.derive_key_and_iv(salt)
cipher = AES.new(key, AES.MODE_CBC, iv)
out_file.write(str.encode(self.salt_header) + salt)
finished = False
while not finished:
chunk = in_file.read(1024 * self.block_size)
if len(chunk) == 0 or len(chunk) % self.block_size != 0:
padding_length = (self.block_size - len(chunk) % self.block_size) or self.block_size
chunk += str.encode(
padding_length * chr(padding_length))
finished = True
out_file.write(cipher.encrypt(chunk))
return base64.b64encode(out_file.getvalue())
def decrypt(self, content=None) -> bytes:
"""
密文 content 可以传入使用实例对象的密码返回解密后的明文 bytes
错误的密码salt_headerkey_size解密将报错
openssl 加密命令
从文件openssl aes-256-cbc -salt -in text.txt -a -k 'password'
从输入echo "abc" | openssl aes-256-cbc -salt -a -k 'password'
"""
content = content if content else self.content
content = content.encode() if not isinstance(content, bytes) else content
text = base64.b64decode(content)
# 字节IO模拟文件
in_file = io.BytesIO(text)
out_file = io.BytesIO()
salt = in_file.read(self.block_size)[len(self.salt_header):]
key, iv = self.derive_key_and_iv(salt)
cipher = AES.new(key, AES.MODE_CBC, iv)
next_chunk = b''
finished = False
while not finished:
chunk, next_chunk = next_chunk, cipher.decrypt(in_file.read(1024 * self.block_size))
if len(next_chunk) == 0:
padding_length = chunk[-1]
if padding_length < 1 or padding_length > self.block_size:
# 触发此错误原因可能是密码错误
raise ValueError("bad decrypt pad (%d)" % padding_length)
chunk = chunk[:-padding_length]
finished = True
out_file.write(chunk)
# 若为空可能是密码错误了,或者触发上面的异常
return out_file.getvalue()
if __name__ == '__main__':
print("++++++++++++++++++++++++++++++++++++++++++")
aes = AESCipher("hello world2223", '111111')
secret = aes.encrypt()
raw = aes.decrypt(secret)
print("密文:", secret)
print("明文:", raw)
print("++++++++++++++++++++++++++++++++++++++++++")
try:
# 错误的密码解密将报错
aes2 = AESCipher("", '222', salt_header='')
res = aes2.decrypt(secret)
print(res)
except ValueError:
print("密码错了!")