from peewee import Field, SQL
from playhouse.fields import (
    CompressedField as _CompressedField,
    CharField as _CharField,
)
from itertools import starmap
from functools import reduce
import operator as op


def sqlite_tuple_in(fields, values):
    """Emulate a tuple ``IN`` clause for SQLite.

    SQLite does not support ``(foo, bar) IN ((1, 2), (3, 4))``, so this builds
    the equivalent ``(foo = 1 AND bar = 2) OR (foo = 3 AND bar = 4)``
    expression instead.

    Args:
        fields: Iterable of objects whose ``==`` yields an expression that can
            be combined with ``&`` and ``|``.
        values: Iterable of value tuples, each paired positionally with
            ``fields``.

    Returns:
        The per-tuple AND-expressions joined together with OR.

    Raises:
        TypeError: If ``values`` is empty (or a tuple pairs with no field),
            because ``reduce`` has nothing to fold.
    """
    # Materialise once: ``fields`` is re-iterated for every value tuple, so a
    # one-shot iterator would be exhausted after the first tuple.
    fields = tuple(fields)
    subqueries = (
        reduce(op.and_, starmap(op.eq, zip(fields, value_tuple)))
        for value_tuple in values
    )
    return reduce(op.or_, subqueries)


class CharField(_CharField):
    """Char field accepting an extra ``nocase`` keyword argument.

    When ``nocase`` is true, a ``COLLATE NOCASE`` SQL constraint is appended
    to the field's constraints.
    """

    def __init__(self, *args, **kwargs):
        # Copy the caller's constraints so appending below never mutates a
        # list that may be shared between several field definitions.
        constraints = list(kwargs.pop('constraints', None) or [])
        if kwargs.pop('nocase', False):
            constraints.append(SQL('COLLATE NOCASE'))
        super().__init__(*args, constraints=constraints, **kwargs)


class CompressedField(_CompressedField):
    """Compressed field that passes ``None`` through untouched."""

    def db_value(self, value):
        """Return ``value`` compressed for storage, or ``None`` unchanged."""
        return value if value is None else self.compress(value)

    def python_value(self, value):
        """Return the stored ``value`` decompressed, or ``None`` unchanged."""
        return value if value is None else self.decompress(value)


class EnumField(Field):
    """Field storing the ``.value`` of a member of ``enum_class``.

    Values may be given as enum members, as member names (matched after
    upper-casing) or as anything ``int()`` accepts that maps to a member
    value.
    """

    db_field = 'enum'

    def __init__(self, enum_class, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.enum_class = enum_class

    def _enum_value(self, value):
        """Convert ``value`` to a member of ``self.enum_class``.

        Strings are first tried as (upper-cased) member names; anything else,
        or a string that is not a member name, is converted with ``int()``
        and looked up by value.

        Raises:
            ValueError: If ``value`` cannot be mapped to a member.
        """
        if isinstance(value, str):
            try:
                return self.enum_class[value.upper()]
            except KeyError:
                # Not a member name; fall through to the numeric lookup.
                pass
        try:
            return self.enum_class(int(value))
        except ValueError as exc:
            raise ValueError(
                "%r is not a valid %s" % (value, self.enum_class.__name__)
            ) from exc

    def db_value(self, value):
        """Return the raw enum value to store, validating non-member input."""
        if value is None:
            return value
        if isinstance(value, self.enum_class):
            return value.value
        # Round-trip through the enum so invalid values are rejected here.
        return self._enum_value(value).value

    def python_value(self, value):
        """Return the enum member for a stored value, or ``None`` unchanged."""
        return value if value is None else self._enum_value(value)


class SQLIndex(SQL):
    """Ugly hack to allow arbitrary index creation.

    Wraps a raw SQL fragment together with the field it belongs to.
    NOTE(review): ``db_column`` and ``as_entity`` presumably let peewee's
    index-creation code treat this object like a field -- confirm against the
    peewee version in use.
    """

    def __init__(self, field, sql):
        self.field = field
        super().__init__(sql)

    def clone_base(self):
        """Return a fresh ``SQLIndex`` with the same field and SQL text."""
        # The original referenced the undefined names ``SQLWithField`` and
        # ``field`` here, which raised NameError whenever a clone was made.
        return SQLIndex(self.field, self.value)

    @property
    def db_column(self):
        return self.field

    def as_entity(self):
        return self