- Timestamp:
- 02/02/09 17:25:51 (3 years ago)
- Location:
- elixir/trunk
- Files:
-
- 2 modified
-
elixir/ext/encrypted.py (modified) (3 diffs)
-
tests/test_encryption.py (modified) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
-
elixir/trunk/elixir/ext/encrypted.py
r440 r446 25 25 ssn columns on save, update, and load. Different secrets can be specified on 26 26 an entity by entity basis, for added security. 27 28 **Important note**: instance attributes are encrypted in-place. This means that 29 if one of the encrypted attributes of an instance is accessed after the 30 instance has been flushed to the database (and thus encrypted), the value for 31 that attribute will be crypted in the in-memory object in addition to the 32 database row. 27 33 ''' 28 34 29 from Crypto.Cipher import Blowfish30 from elixir.statements import Statement31 from sqlalchemy.orm import MapperExtension, EXT_CONTINUE35 from Crypto.Cipher import Blowfish 36 from elixir.statements import Statement 37 from sqlalchemy.orm import MapperExtension, EXT_CONTINUE, EXT_STOP 32 38 33 39 try: … … 62 68 def __init__(self, entity, for_fields=[], with_secret='abcdef'): 63 69 64 def perform_encryption(instance, decrypt=False): 70 def perform_encryption(instance, encrypt=True): 71 encrypted = getattr(instance, '_elixir_encrypted', None) 72 if encrypted is encrypt: 73 # skipping encryption or decryption, as it is already done 74 return 75 else: 76 # marking instance as already encrypted/decrypted 77 instance._elixir_encrypted = encrypt 78 79 if encrypt: 80 func = encrypt_value 81 else: 82 func = decrypt_value 83 65 84 for column_name in for_fields: 66 85 current_value = getattr(instance, column_name) 67 86 if current_value: 68 if decrypt: 69 new_value = decrypt_value(current_value, with_secret) 70 else: 71 new_value = encrypt_value(current_value, with_secret) 72 setattr(instance, column_name, new_value) 87 setattr(instance, column_name, 88 func(current_value, with_secret)) 73 89 74 90 def perform_decryption(instance): 75 perform_encryption(instance, decrypt=True)91 perform_encryption(instance, encrypt=False) 76 92 77 93 class EncryptedMapperExtension(MapperExtension): … … 85 101 return EXT_CONTINUE 86 102 87 if SA05orlater: 88 def reconstruct_instance(self, mapper, instance): 89 perform_decryption(instance) 90 return True 91 EncryptedMapperExtension.reconstruct_instance = reconstruct_instance 92 else: 93 def populate_instance(self, mapper, selectcontext, row, instance, 94 *args, **kwargs): 95 mapper.populate_instance(selectcontext, instance, row, 96 *args, **kwargs) 97 perform_decryption(instance) 98 return True 99 EncryptedMapperExtension.populate_instance = populate_instance 103 if SA05orlater: 104 def reconstruct_instance(self, mapper, instance): 105 perform_decryption(instance) 106 # no special return value is required for 107 # reconstruct_instance, but you never know... 108 return EXT_CONTINUE 109 else: 110 def populate_instance(self, mapper, selectcontext, row, 111 instance, *args, **kwargs): 112 mapper.populate_instance(selectcontext, instance, row, 113 *args, **kwargs) 114 perform_decryption(instance) 115 # EXT_STOP because we already did populate the instance and 116 # the normal processing should not happen 117 return EXT_STOP 100 118 101 119 # make sure that the entity's mapper has our mapper extension -
elixir/trunk/tests/test_encryption.py
r349 r446 21 21 22 22 metadata.bind = 'sqlite:///' 23 setup_all() 23 24 24 25 … … 29 30 class TestEncryption(object): 30 31 def setup(self): 31 setup_all(True)32 create_all() 32 33 33 34 def teardown(self): 34 35 drop_all() 35 session.cl ear()36 session.close() 36 37 37 38 def test_encryption(self): 38 39 jonathan = Person( 39 name= u'Jonathan LaCour',40 password= u's3cr3tw0RD',41 ssn= u'123-45-6789'40 name='Jonathan LaCour', 41 password='s3cr3tw0RD', 42 ssn='123-45-6789' 42 43 ) 43 44 winston = Pet( … … 51 52 jonathan.pets = [winston, nelson] 52 53 53 session.commit(); session.clear() 54 session.commit() 55 session.clear() 54 56 55 57 p = Person.get_by(name='Jonathan LaCour') … … 63 65 p.password = 'N3wpAzzw0rd' 64 66 65 session.commit(); session.clear() 67 session.commit() 68 session.clear() 66 69 67 70 p = Person.get_by(name='Jonathan LaCour') 68 71 assert p.password == 'N3wpAzzw0rd' 69 p.name = 'Jon LaCour'70 72 71 session.commit(); session.clear() 73 def test_two_consecutive_updates(self): 74 jonathan = Person( 75 name='Jonathan LaCour', 76 password='s3cr3tw0RD', 77 ssn='123-45-6789' 78 ) 79 session.commit() 80 session.clear() 81 82 p = Person.get_by(name='Jonathan LaCour') 83 assert p.password == 's3cr3tw0RD' 84 p.name = 'JONATHAN LACOUR' 85 session.flush() 86 87 assert p.password == 'r\\x9d\\xa8\\xb4\\x8d|\\xffp\\xf5\\x0e' 88 89 p.name = 'Jonathan LaCour' 90 session.flush() 91 92 # check that it is not further encrypted 93 assert p.password == 'r\\x9d\\xa8\\xb4\\x8d|\\xffp\\xf5\\x0e' 94 95 session.commit() 96 session.clear() 97 98 p = Person.get_by(name='Jonathan LaCour') 99 assert p.password == 's3cr3tw0RD' 100
