summaryrefslogtreecommitdiff
path: root/archivist/peewee_ext.py
blob: 011f80d4c7bbbaad40dc15d5afee030e7594cb4d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from peewee import Field, SQL
from playhouse.fields import CompressedField as _CompressedField, CharField as _CharField

from itertools import starmap
from functools import reduce
import operator as op

def sqlite_tuple_in(fields, values):
    """SQLite does not support (foo, bar) IN ((1,2),(3,4)).
    So we construct a '(foo = 1 AND bar = 2) OR (foo = 3 AND bar = 4)' monstrum."""
    subqueries = (reduce(op.and_, starmap(op.eq, zip(fields, value_tuple))) for value_tuple in values)
    return reduce(op.or_, subqueries)

class CharField(_CharField):
    def __init__(self, *args, **kwargs):
        constraints = kwargs.pop('constraints', [])
        nocase = kwargs.pop('nocase', False)
        if nocase:
            constraints.append(SQL('COLLATE NOCASE'))
        super().__init__(*args, constraints=constraints, **kwargs)

class CompressedField(_CompressedField):
    def db_value(self, value):
        return value if value is None else self.compress(value)

    def python_value(self, value):
        return value if value is None else self.decompress(value)

class EnumField(Field):
    db_field = 'enum'

    def __init__(self, enum_class, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.enum_class = enum_class

    def _enum_value(self, value):
        if isinstance(value, str):
            try:
                return self.enum_class[value.upper()]
            except KeyError:
                pass

        try:
            return self.enum_class(int(value))
        except ValueError:
            raise ValueError("%r is not a valid %s" % (value, self.enum_class.__name__))

    def db_value(self, value):
        if value is None:
            return value

        if isinstance(value, self.enum_class):
            return value.value

        # force check of enum value
        return self._enum_value(value).value

    def python_value(self, value):
        return value if value is None else self._enum_value(value)

class SQLIndex(SQL):
    """Ugly hack to allow arbitrary index creation."""
    def __init__(self, field, sql):
        self.field = field
        super().__init__(sql)

    def clone_base(self):
        return SQLWithField(field, self.value)

    @property
    def db_column(self):
        return self.field

    def as_entity(self):
        return self