使用codecs自定義編/解碼方案

BorisGoldie 7年前發布 | 16K 次閱讀 加密解密 Python Python開發

相信很多使用Python的同學熟悉編解碼,比如:

In : print u'\U0001F3F4'.encode('utf-8')
?
In : '哈哈'.decode('utf8')
Out: u'\u54c8\u54c8'

這樣可以在字符串和unicode之前轉換,不過細心的同學可能發現了,我使用了「utf-8」和「utf8」,這2個詞長得很像。事實上都能正常使用是由于他們都是「utf_8」的別名,這些別名的對應關系可以用如下方法找到:

In : import encodings

In : encodings.aliases.aliases['utf8']
Out: 'utf_8'

In : '哈哈'.decode('u8')
Out: u'\u54c8\u54c8'

In : '哈哈'.decode('utf')
Out: u'\u54c8\u54c8'

In : '哈哈'.decode('utf8_ucs2')
Out: u'\u54c8\u54c8'

In : '哈哈'.decode('utf8_ucs4')
Out: u'\u54c8\u54c8'

encodings是標準庫中自帶的編碼庫,其中包含了上百個標準的編碼轉換方案。但是通常并不需要使用encodings,而是使用codecs。

codecs包含了編碼解碼器的注冊和其他基本的類,開發者還可以通過codecs提供的接口自定義編/解碼方案,也就是可以創造一個新的編解碼轉換方案,使用encode(‘XX’)和decode(‘XX’)的方式使用。今天我給大家演示直接進行Fernet對稱加密的例子。

互聯網安全的重要性不必在復述了,大家都應該接觸過一些加密技術,可能聽過M2Crypto、PyCrypto、Cryptography之類的庫。在這里歪個樓,現在的主流是使用Cryptography,它的出現就是為了替代之前的那些庫,具體的可以看官方文檔。我們使用Cryptography提供的Fernet類來實現,首先實現一個Codec類:

import codecs

from cryptography.fernet import Fernet
from cryptography.fernet import InvalidToken

KEY = Fernet.generate_key()
f = Fernet(KEY)


classFernetCodec(codecs.Codec):
    defencode(self, input, errors='fernet.strict'):
        return f.encrypt(input), len(input)

    defdecode(self, input, errors='fernet.strict'):
        try:
            return f.decrypt(input), len(input)
        except InvalidToken:
            error = codecs.lookup_error(errors)
            return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,
                                            'Invalid Token'))

當然也不必在類中實現encode和decode方法,單獨的2個函數也可以。我這里是為了給之后的演示到的類復用。

如果你看過內置的字符串encode的方法,它的文檔說還接收第二個參數,讓你告訴它當出現出錯的時候如何去處理,默認是strict,直接就會拋出來錯誤。

其余可選的還有ignore、replace、xmlcharrefreplace:

In [35]: '\x80abc'.decode('utf-8', 'strict')
---------------------------------------------------------------------------
UnicodeDecodeError                        Traceback (most recent call last)
<ipython-input-35-974ebc908d50> in <module>()
----> 1 '\x80abc'.decode('utf-8', 'strict')

/Users/dongweiming/dae/venv/lib/python2.7/encodings/utf_8.pyc in decode(input, errors)
     14
     15 def decode(input, errors='strict'):
---> 16     return codecs.utf_8_decode(input, errors, True)
     17
     18 class IncrementalEncoder(codecs.IncrementalEncoder):

UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte

In [36]: '\x80abc'.decode('utf-8', 'replace')
Out[36]: u'\uffabc'

In [37]: '\x80abc'.decode('utf-8', 'ignore')
Out[37]: u'abc'

事實上Python還內置了其他的選項,如backslashreplace、namereplace、surrogatepass、surrogateescape等,有興趣的可以看 源碼實現 .

我也會定義2種錯誤函數,因為在解密(執行decrypt)的時候可能會報InvalidToken錯誤,但是InvalidToken不包含任何參數,而對錯誤處理的時候需要知道起始和結束的位置,所以我就直接拋一個UnicodeDecodeError錯誤了。

接著我們定義遞增式和流式的編碼類:

class IncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):
    pass


class IncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):
    pass


class StreamReader(FernetCodec, codecs.StreamReader):
    pass


class StreamWriter(FernetCodec, codecs.StreamWriter):
    pass

由于這個要加密的字符串不會很長,沒有必要實現這些類,我就簡單的繼承然后pass了。接著開開放入口:

defgetregentry():
    return codecs.CodecInfo(
        name='fernet',
        encode=FernetCodec().encode,
        decode=FernetCodec().decode,
        incrementalencoder=IncrementalEncoder,
        incrementaldecoder=IncrementalDecoder,
        streamwriter=StreamWriter,
        streamreader=StreamReader,
    )

其實incrementalencoder、streamwriter這些參數不傳遞也是可以的,默認是None,今天只是為了讓大家知道是有這部分接口的。然后是注冊這個入口,為了提供更好的性能,我創建一個函數加上緩存功能:

_cache = {}
_unknown = '--unknown--'

defsearch_function(encoding):
    import encodings
    encoding = encodings.normalize_encoding(encoding)
    entry = _cache.get(encoding, _unknown)
    if entry is not _unknown:
        return entry

    if encoding == 'fernet':
        entry = getregentry()
        _cache[encoding] = entry
        return entry


codecs.register(search_function)

這里簡單介紹下normalize_encoding的意義,之前我說到了別名,我們再感受下:

In : u'哈哈'.encode('utf-8')
Out: '\xe5\x93\x88\xe5\x93\x88'

In : u'哈哈'.encode('utf_8')
Out: '\xe5\x93\x88\xe5\x93\x88'

這種中劃線和下劃線最后無差別對待,就是通過這個函數標準化的。

codecs模塊底層維護了一個搜索函數的列表,通過調用codecs.register方法就把上述函數append進去了。

最后我們注冊2個錯誤處理的函數:

defstrict_errors(exc):
    if isinstance(exc, UnicodeDecodeError):
        raise TypeError('Invalid Token')


deffallback_errors(exc):
    s = []
    for c in exc.object[exc.start:exc.end]:  # 只是為了演示提供的接口,實施上就是返回原來的input
        s.append(c)
    return ''.join(s), exc.end

codecs.register_error('fernet.strict', strict_errors)
codecs.register_error('fernet.fallback', fallback_errors)

默認使用的是fernet.strict這種處理方案,也可以使用fallback模式。ok,我們現在感受一下:

In [1]: import fernet

In [2]: input = 'hah'

In [3]: output = input.encode('fernet')

In [4]: output  # 已經是加密后的結果了
Out[4]: 'gAAAAABZCFa6Znp2a_e9O0VqP6qToO6T3xRbF7O-adtpFC4QYO7jvVc6Yrcwbo6YGQfL8g5HCXcsaan_THWNhjZAorPTwlQQTA=='

In [5]: output.decode('fernet')  # 解密后還原成原來的字符串
Out[5]: 'hah'

In [6]: input.decode('fernet')
'fernet' codec can't decode bytes in position 0-3: Invalid Token
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-6-1c20dc246c52> in <module>()
----> 1 input.decode('fernet')

/Users/dongweiming/test/tmp/fernet.pyc in decode(self, input, errors)
     20             error = codecs.lookup_error(errors)
     21             return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,
---> 22                                             'Invalid Token'))
     23
     24

/Users/dongweiming/test/tmp/fernet.pyc in strict_errors(exc)
     68     print exc
     69     if isinstance(exc, UnicodeDecodeError):
---> 70         raise TypeError('Invalid Token')
     71
     72

TypeError: Invalid Token  # 拋了個自定義錯誤

In [7]: input.decode('fernet', 'fernet.fallback')
Out[7]: 'hah'  # 解密不成功,返回原來的字符串

好了,自定義的加解密方案完成了,整理全部的代碼如下:

import codecs

from cryptography.fernet import Fernet
from cryptography.fernet import InvalidToken

KEY = Fernet.generate_key()
f = Fernet(KEY)
_cache = {}
_unknown = '--unknown--'


classFernetCodec(codecs.Codec):
    defencode(self, input, errors='fernet.strict'):
        return f.encrypt(input), len(input)

    defdecode(self, input, errors='fernet.strict'):
        try:
            return f.decrypt(input), len(input)
        except InvalidToken:
            error = codecs.lookup_error(errors)
            return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,
                                            'Invalid Token'))


classIncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):
    pass


classIncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):
    pass


classStreamReader(FernetCodec, codecs.StreamReader):
    pass


classStreamWriter(FernetCodec, codecs.StreamWriter):
    pass



defgetregentry():
    return codecs.CodecInfo(
        name='fernet',
        encode=FernetCodec().encode,
        decode=FernetCodec().decode,
        incrementalencoder=IncrementalEncoder,
        incrementaldecoder=IncrementalDecoder,
        streamwriter=StreamWriter,
        streamreader=StreamReader,
    )


defsearch_function(encoding):
    import encodings
    encoding = encodings.normalize_encoding(encoding)
    entry = _cache.get(encoding, _unknown)
    if entry is not _unknown:
        return entry

    if encoding == 'fernet':
        entry = getregentry()
        _cache[encoding] = entry
        return entry


defstrict_errors(exc):
    if isinstance(exc, UnicodeDecodeError):
        raise TypeError('Invalid Token')


deffallback_errors(exc):
    s = []
    for c in exc.object[exc.start:exc.end]:
        s.append(c)
    return ''.join(s), exc.end


codecs.register(search_function)
codecs.register_error('fernet.strict', strict_errors)
codecs.register_error('fernet.fallback', fallback_errors)

 

來自:http://www.dongwm.com/archives/使用codecs自定義編-解碼方案/

 

 本文由用戶 BorisGoldie 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!