| 1 | """ |
|---|
| 2 | Test special properties (e.g. column_property, ...). |
|---|
| 3 | """ |
|---|
| 4 | |
|---|
| 5 | from sqlalchemy import select, func |
|---|
| 6 | from sqlalchemy.orm import column_property |
|---|
| 7 | from elixir import * |
|---|
| 8 | |
|---|
def setup():
    """Bind the shared ``metadata`` object to a SQLite engine URL.

    NOTE(review): presumably picked up by the test runner as a module-level
    fixture because of its name -- confirm against the runner in use.
    """
    # A SQLite URL without a path component; per the SQLAlchemy docs this
    # selects an in-memory database.
    engine_url = 'sqlite://'
    metadata.bind = engine_url
|---|
| 11 | |
|---|
| 12 | |
|---|
class TestSpecialProperties(object):
    """Tests for special entity properties.

    Exercises ``GenericProperty``, ``ColumnProperty``, ``has_property``,
    deferred fields, synonyms (both the ``synonym=`` field argument and the
    ``Synonym`` class) and fields attached to an entity after its class
    statement has run.
    """

    def teardown(self):
        """Clean up all entities defined by a test.

        NOTE(review): presumably the ``True`` flag also drops the tables that
        were created -- confirm against ``cleanup_all``.
        """
        cleanup_all(True)

    def test_lifecycle(self):
        """A ``Field`` class attribute is replaced once ``setup_all`` runs."""
        class A(Entity):
            name = Field(String(20))

        # Before setup the attribute is still the declarative Field object.
        assert isinstance(A.name, Field)

        setup_all()

        # After setup the attribute is no longer a Field instance.
        assert not isinstance(A.name, Field)

    def test_generic_property(self):
        """``GenericProperty`` accepts a callable building a column_property."""
        class Tag(Entity):
            score1 = Field(Float)
            score2 = Field(Float)

            score = GenericProperty(
                lambda c: column_property(
                    (c.score1 * c.score2).label('score')))

        setup_all(True)

        t1 = Tag(score1=5.0, score2=3.0)
        t2 = Tag(score1=10.0, score2=2.0)

        session.commit()
        # Detach everything so the query below builds fresh instances
        # instead of handing back the objects created above.
        session.expunge_all()

        for tag in Tag.query.all():
            assert tag.score == tag.score1 * tag.score2

    def test_column_property(self):
        """``ColumnProperty`` exposes a computed SQL expression."""
        class Tag(Entity):
            score1 = Field(Float)
            score2 = Field(Float)

            score = ColumnProperty(lambda c: c.score1 * c.score2)

        setup_all(True)

        t1 = Tag(score1=5.0, score2=3.0)
        t2 = Tag(score1=10.0, score2=2.0)

        session.commit()
        session.expunge_all()

        for tag in Tag.query.all():
            assert tag.score == tag.score1 * tag.score2

    def test_column_property_eagerload_and_reuse(self):
        """Column properties can reference each other across entities.

        ``User.score`` sums ``Tag.score`` and ``Category.score`` averages
        ``User.score``; both one-to-many relationships are declared with
        ``lazy=False``.
        """
        class Tag(Entity):
            score1 = Field(Float)
            score2 = Field(Float)

            user = ManyToOne('User')

            score = ColumnProperty(lambda c: c.score1 * c.score2)

        class User(Entity):
            name = Field(String(16))
            category = ManyToOne('Category')
            tags = OneToMany('Tag', lazy=False)
            # Scalar subquery: total score of the tags owned by this user.
            score = ColumnProperty(
                lambda c: select([func.sum(Tag.score)],
                                 Tag.user_id == c.id).as_scalar())

        class Category(Entity):
            name = Field(String(16))
            users = OneToMany('User', lazy=False)

            # Scalar subquery: average score of the users in this category.
            score = ColumnProperty(
                lambda c: select([func.avg(User.score)],
                                 User.category_id == c.id).as_scalar())

        setup_all(True)

        # joe: 5*3 + 55*1 = 70
        u1 = User(name='joe', tags=[Tag(score1=5.0, score2=3.0),
                                    Tag(score1=55.0, score2=1.0)])

        # bar: 5*4 + 50*1 + 15*2 = 100
        u2 = User(name='bar', tags=[Tag(score1=5.0, score2=4.0),
                                    Tag(score1=50.0, score2=1.0),
                                    Tag(score1=15.0, score2=2.0)])

        c1 = Category(name='dummy', users=[u1, u2])

        session.commit()
        session.expunge_all()

        category = Category.query.one()
        # Average of the two user scores: (70 + 100) / 2 = 85
        assert category.score == 85
        for user in category.users:
            assert user.score == sum(tag.score for tag in user.tags)
            for tag in user.tags:
                assert tag.score == tag.score1 * tag.score2

    def test_has_property(self):
        """The ``has_property`` statement declares a computed property."""
        class Tag(Entity):
            has_field('score1', Float)
            has_field('score2', Float)
            has_property('score',
                         lambda c: column_property(
                             (c.score1 * c.score2).label('score')))

        setup_all(True)

        t1 = Tag(score1=5.0, score2=3.0)
        # Distinct name for the second instance, matching the sibling tests
        # (the name ``t1`` was previously rebound here).
        t2 = Tag(score1=10.0, score2=2.0)

        session.commit()
        session.expunge_all()

        for tag in Tag.query.all():
            assert tag.score == tag.score1 * tag.score2

    def test_deferred(self):
        """An entity with a ``deferred=True`` field can be set up and saved.

        Smoke test only: it makes no assertion beyond the calls not raising.
        """
        class A(Entity):
            name = Field(String(20))
            stuff = Field(Text, deferred=True)

        setup_all(True)

        A(name='foo')
        session.commit()

    def test_synonym(self):
        """A field declared with ``synonym=`` works with a Python property.

        The ``email`` property records every set and counts every get so the
        test can check that attribute access goes through the property.
        """
        class Person(Entity):
            name = Field(String(50), required=True)
            _email = Field(String(20), colname='email', synonym='email')

            def _set_email(self, email):
                Person.email_values.append(email)
                self._email = email

            def _get_email(self):
                Person.email_gets += 1
                return self._email

            email = property(_get_email, _set_email)
            # Class-level bookkeeping inspected by the assertions below.
            email_values = []
            email_gets = 0

        setup_all(True)

        mrx = Person(name='Mr. X', email='x@y.com')

        assert mrx.email == 'x@y.com'
        assert Person.email_gets == 1

        mrx.email = "x@z.com"

        # Both the constructor keyword and the explicit assignment went
        # through the property setter.
        assert Person.email_values == ['x@y.com', 'x@z.com']

        session.commit()
        session.expunge_all()

        # test the synonym itself (ie querying)
        p = Person.get_by(email='x@z.com')

        assert p.name == 'Mr. X'

    def test_synonym_class(self):
        """``Synonym`` attributes alias fields, including in a subclass."""
        class Person(Entity):
            name = Field(String(30))
            primary_email = Field(String(100))
            email_address = Synonym('primary_email')

        class User(Person):
            user_name = Synonym('name')
            password = Field(String(20))

        setup_all(True)

        alexandre = Person(
            name=u'Alexandre da Silva',
            email_address=u'x@y.com'
        )
        johann = User(
            name='Johann Felipe Voigt',
            email_address='y@z.com',
            password='unencrypted'
        )

        session.commit()
        session.expunge_all()

        # A value written through the synonym is readable on the real field.
        p = Person.get_by(name='Alexandre da Silva')
        assert p.primary_email == 'x@y.com'

        # The subclass synonym can be used as a query criterion.
        u = User.get_by(user_name='Johann Felipe Voigt')
        assert u.email_address == 'y@z.com'

        u.email_address = 'new@z.com'
        session.commit()
        session.expunge_all()

        # The update made through the synonym was persisted on the field.
        p = Person.get_by(name='Johann Felipe Voigt')
        assert p.primary_email == 'new@z.com'

    def test_setattr(self):
        """A ``Field`` assigned after the class statement is still mapped."""
        class A(Entity):
            pass

        A.name = Field(String(30))

        setup_all(True)

        a1 = A(name='a1')

        session.commit()
        session.expunge_all()

        a = A.query.one()

        assert a.name == 'a1'
|---|