使用codecs自定義編/解碼方案
相信很多使用Python的同學熟悉編解碼,比如:
In : print u'\U0001F3F4'.encode('utf-8')
?
In : '哈哈'.decode('utf8')
Out: u'\u54c8\u54c8'
這樣可以在字符串和unicode之前轉換,不過細心的同學可能發現了,我使用了「utf-8」和「utf8」,這2個詞長得很像。事實上都能正常使用是由于他們都是「utf_8」的別名,這些別名的對應關系可以用如下方法找到:
In : import encodings
In : encodings.aliases.aliases['utf8']
Out: 'utf_8'
In : '哈哈'.decode('u8')
Out: u'\u54c8\u54c8'
In : '哈哈'.decode('utf')
Out: u'\u54c8\u54c8'
In : '哈哈'.decode('utf8_ucs2')
Out: u'\u54c8\u54c8'
In : '哈哈'.decode('utf8_ucs4')
Out: u'\u54c8\u54c8'
encodings是標準庫中自帶的編碼庫,其中包含了上百個標準的編碼轉換方案。但是通常并不需要使用encodings,而是使用codecs。
codecs包含了編碼解碼器的注冊和其他基本的類,開發者還可以通過codecs提供的接口自定義編/解碼方案,也就是可以創造一個新的編解碼轉換方案,使用encode(‘XX’)和decode(‘XX’)的方式使用。今天我給大家演示直接進行Fernet對稱加密的例子。
互聯網安全的重要性不必在復述了,大家都應該接觸過一些加密技術,可能聽過M2Crypto、PyCrypto、Cryptography之類的庫。在這里歪個樓,現在的主流是使用Cryptography,它的出現就是為了替代之前的那些庫,具體的可以看官方文檔。我們使用Cryptography提供的Fernet類來實現,首先實現一個Codec類:
import codecs
from cryptography.fernet import Fernet
from cryptography.fernet import InvalidToken
KEY = Fernet.generate_key()
f = Fernet(KEY)
classFernetCodec(codecs.Codec):
defencode(self, input, errors='fernet.strict'):
return f.encrypt(input), len(input)
defdecode(self, input, errors='fernet.strict'):
try:
return f.decrypt(input), len(input)
except InvalidToken:
error = codecs.lookup_error(errors)
return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,
'Invalid Token'))
當然也不必在類中實現encode和decode方法,單獨的2個函數也可以。我這里是為了給之后的演示到的類復用。
如果你看過內置的字符串encode的方法,它的文檔說還接收第二個參數,讓你告訴它當出現出錯的時候如何去處理,默認是strict,直接就會拋出來錯誤。
其余可選的還有ignore、replace、xmlcharrefreplace:
In [35]: '\x80abc'.decode('utf-8', 'strict')
---------------------------------------------------------------------------
UnicodeDecodeError Traceback (most recent call last)
<ipython-input-35-974ebc908d50> in <module>()
----> 1 '\x80abc'.decode('utf-8', 'strict')
/Users/dongweiming/dae/venv/lib/python2.7/encodings/utf_8.pyc in decode(input, errors)
14
15 def decode(input, errors='strict'):
---> 16 return codecs.utf_8_decode(input, errors, True)
17
18 class IncrementalEncoder(codecs.IncrementalEncoder):
UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte
In [36]: '\x80abc'.decode('utf-8', 'replace')
Out[36]: u'\uffabc'
In [37]: '\x80abc'.decode('utf-8', 'ignore')
Out[37]: u'abc'
事實上Python還內置了其他的選項,如backslashreplace、namereplace、surrogatepass、surrogateescape等,有興趣的可以看 源碼實現 .
我也會定義2種錯誤函數,因為在解密(執行decrypt)的時候可能會報InvalidToken錯誤,但是InvalidToken不包含任何參數,而對錯誤處理的時候需要知道起始和結束的位置,所以我就直接拋一個UnicodeDecodeError錯誤了。
接著我們定義遞增式和流式的編碼類:
class IncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):
pass
class IncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):
pass
class StreamReader(FernetCodec, codecs.StreamReader):
pass
class StreamWriter(FernetCodec, codecs.StreamWriter):
pass
由于這個要加密的字符串不會很長,沒有必要實現這些類,我就簡單的繼承然后pass了。接著開開放入口:
defgetregentry():
return codecs.CodecInfo(
name='fernet',
encode=FernetCodec().encode,
decode=FernetCodec().decode,
incrementalencoder=IncrementalEncoder,
incrementaldecoder=IncrementalDecoder,
streamwriter=StreamWriter,
streamreader=StreamReader,
)
其實incrementalencoder、streamwriter這些參數不傳遞也是可以的,默認是None,今天只是為了讓大家知道是有這部分接口的。然后是注冊這個入口,為了提供更好的性能,我創建一個函數加上緩存功能:
_cache = {}
_unknown = '--unknown--'
defsearch_function(encoding):
import encodings
encoding = encodings.normalize_encoding(encoding)
entry = _cache.get(encoding, _unknown)
if entry is not _unknown:
return entry
if encoding == 'fernet':
entry = getregentry()
_cache[encoding] = entry
return entry
codecs.register(search_function)
這里簡單介紹下normalize_encoding的意義,之前我說到了別名,我們再感受下:
In : u'哈哈'.encode('utf-8')
Out: '\xe5\x93\x88\xe5\x93\x88'
In : u'哈哈'.encode('utf_8')
Out: '\xe5\x93\x88\xe5\x93\x88'
這種中劃線和下劃線最后無差別對待,就是通過這個函數標準化的。
codecs模塊底層維護了一個搜索函數的列表,通過調用codecs.register方法就把上述函數append進去了。
最后我們注冊2個錯誤處理的函數:
defstrict_errors(exc):
if isinstance(exc, UnicodeDecodeError):
raise TypeError('Invalid Token')
deffallback_errors(exc):
s = []
for c in exc.object[exc.start:exc.end]: # 只是為了演示提供的接口,實施上就是返回原來的input
s.append(c)
return ''.join(s), exc.end
codecs.register_error('fernet.strict', strict_errors)
codecs.register_error('fernet.fallback', fallback_errors)
默認使用的是fernet.strict這種處理方案,也可以使用fallback模式。ok,我們現在感受一下:
In [1]: import fernet
In [2]: input = 'hah'
In [3]: output = input.encode('fernet')
In [4]: output # 已經是加密后的結果了
Out[4]: 'gAAAAABZCFa6Znp2a_e9O0VqP6qToO6T3xRbF7O-adtpFC4QYO7jvVc6Yrcwbo6YGQfL8g5HCXcsaan_THWNhjZAorPTwlQQTA=='
In [5]: output.decode('fernet') # 解密后還原成原來的字符串
Out[5]: 'hah'
In [6]: input.decode('fernet')
'fernet' codec can't decode bytes in position 0-3: Invalid Token
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-6-1c20dc246c52> in <module>()
----> 1 input.decode('fernet')
/Users/dongweiming/test/tmp/fernet.pyc in decode(self, input, errors)
20 error = codecs.lookup_error(errors)
21 return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,
---> 22 'Invalid Token'))
23
24
/Users/dongweiming/test/tmp/fernet.pyc in strict_errors(exc)
68 print exc
69 if isinstance(exc, UnicodeDecodeError):
---> 70 raise TypeError('Invalid Token')
71
72
TypeError: Invalid Token # 拋了個自定義錯誤
In [7]: input.decode('fernet', 'fernet.fallback')
Out[7]: 'hah' # 解密不成功,返回原來的字符串
好了,自定義的加解密方案完成了,整理全部的代碼如下:
import codecs
from cryptography.fernet import Fernet
from cryptography.fernet import InvalidToken
KEY = Fernet.generate_key()
f = Fernet(KEY)
_cache = {}
_unknown = '--unknown--'
classFernetCodec(codecs.Codec):
defencode(self, input, errors='fernet.strict'):
return f.encrypt(input), len(input)
defdecode(self, input, errors='fernet.strict'):
try:
return f.decrypt(input), len(input)
except InvalidToken:
error = codecs.lookup_error(errors)
return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,
'Invalid Token'))
classIncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):
pass
classIncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):
pass
classStreamReader(FernetCodec, codecs.StreamReader):
pass
classStreamWriter(FernetCodec, codecs.StreamWriter):
pass
defgetregentry():
return codecs.CodecInfo(
name='fernet',
encode=FernetCodec().encode,
decode=FernetCodec().decode,
incrementalencoder=IncrementalEncoder,
incrementaldecoder=IncrementalDecoder,
streamwriter=StreamWriter,
streamreader=StreamReader,
)
defsearch_function(encoding):
import encodings
encoding = encodings.normalize_encoding(encoding)
entry = _cache.get(encoding, _unknown)
if entry is not _unknown:
return entry
if encoding == 'fernet':
entry = getregentry()
_cache[encoding] = entry
return entry
defstrict_errors(exc):
if isinstance(exc, UnicodeDecodeError):
raise TypeError('Invalid Token')
deffallback_errors(exc):
s = []
for c in exc.object[exc.start:exc.end]:
s.append(c)
return ''.join(s), exc.end
codecs.register(search_function)
codecs.register_error('fernet.strict', strict_errors)
codecs.register_error('fernet.fallback', fallback_errors)
來自:http://www.dongwm.com/archives/使用codecs自定義編-解碼方案/