import click import os import sys from functools import reduce import operator as op import pathlib from .virtual_prefixes import register_prefixes from .utils import ProxyCommand CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) def enable_debug(): import logging logger = logging.getLogger('peewee') logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler()) @click.group(context_settings = CONTEXT_SETTINGS) @click.option('--debug', '-d', is_flag=True, default=False) def cli(debug): if debug or 'DEBUG' in os.environ: enable_debug() class ServerCommand(ProxyCommand): def _get_proxy(self): from .server import server_group return server_group @cli.group(cls=ServerCommand) def server(): """Flask server handling""" @cli.group() def db(): """Database Management""" pass @db.command() def init(): """Initialize the database, if not done already.""" from .model import create_tables, db create_tables() with db.atomic(): register_prefixes() @db.command() @click.confirmation_option(prompt="Are you sure you want to drop the database?") def drop(): """Completely drop all tables.""" from .model import drop_tables drop_tables() @cli.group() def tag(): """Handling of tags""" pass @cli.group() def prefix(): """Handling of prefixes of tags""" pass @tag.command('list') @click.argument('pattern', required = False) @click.option('--full', is_flag=True, help="Print implications") def list_tags(pattern, full): from .model import Tag print("Tags") print("====") print() query = Tag.select().where(~Tag.default) if pattern: query = query.where(Tag.name.contains(pattern) | Tag.prefix.contains(pattern)) for t in query: descr = t.description or '' if descr: descr = '-- ' + descr if full: impls = ', '.join(str(i.implies_tag) for i in t.implications if not i.implies_tag.default) if impls: descr = descr + ' --> ' + impls print(' *', t, descr) @prefix.command('list') def list_prefixes(): from .model import Prefix print("Prefixes") print("========") print() for p in Prefix.select(): print(" * %s" % p) @prefix.command('add') @click.argument('name') @click.argument('description', required = False) def add_prefix(name, description): from .model import Prefix prefix = Prefix.try_create(name = name, description = description) if prefix is None: print("Prefix already existed:", name) class PrefixTagType(click.ParamType): name = 'prefixed tag' def convert(self, value, param, ctx): from .bl import PrefixTag try: prefix, tag = value.split(':', 1) except ValueError: return PrefixTag(value) else: return PrefixTag(tag, prefix) TAG = PrefixTagType() @prefix.command('edit') @click.argument('name') @click.argument('implies', type = TAG, nargs = -1) @click.option('--description') def edit_prefix(name, implies, description): from .model import Prefix, db try: prefix = Prefix.get(name = name) except Prefix.DoesNotExist: raise click.UsageError(f"Prefix '{name}' does not exist.") with db.atomic(): add_implications(prefix.default_tag, implies) if description: prefix.description = description prefix.save() def fetch_tags(tag_list, ignore_missing=False): if not tag_list: return [] from .model import Tag from .peewee_ext import sqlite_tuple_in fetched_tags = Tag.select().where(sqlite_tuple_in((Tag.prefix, Tag.name), tag_list)).execute() if len(fetched_tags) < len(tag_list): print("Some tags were not present:", ', '.join(set(map(str, tag_list)).difference(map(str, fetched_tags)))) if not ignore_missing: raise click.ClickException("Not all tags present") return fetched_tags @tag.command('add') @click.argument('name', type = TAG) @click.argument('description', required = False) def add_tag(name, description): tag, created = name.create() if not created: print("Tag already existed:", tag) def add_implications(tag, implies): from .model import TagImplications from peewee import IntegrityError for i in fetch_tags(implies): try: TagImplications.create(tag = tag, implies_tag = i) except IntegrityError: print("Implication onto '%s' already existing. Skipping." % i) @tag.command('edit') @click.argument('name', type = TAG) @click.argument('implies', type = TAG, nargs = -1) @click.option('--description') def edit_tag(name, implies, description): from .model import Tag, db try: tag = name.load() except Tag.DoesNotExist: raise click.UsageError("Tag '%s' does not exist." % name) with db.atomic(): add_implications(tag, implies) if description: tag.description = description tag.save() @cli.group() def doc(): """Document handling""" pass @doc.command('add') @click.argument('file', type=click.File(mode = 'rb')) @click.argument('tags', type=TAG, nargs=-1) @click.option('--create-tags', '-c', is_flag = True) @click.option('--ignore-missing-tags', '-i', is_flag = True) def add_doc(file, tags, create_tags, ignore_missing_tags): """Add a new document together with the given tags.""" from .model import db, Document import magic mimetype = magic.from_file(file.name, mime=True) with db.atomic(): if tags: if create_tags: tags = [tag.create()[0] for tag in tags] else: tags = fetch_tags(tags, ignore_missing_tags) Document.create_from_file(file, tags, direction = Document.Direction.IN, mimetype = mimetype) @doc.command('store') @click.argument('id', type=int) @click.argument('path', type=click.Path(file_okay=True, dir_okay=True, writable=True, allow_dash=True)) @click.option('--parents', '-p', is_flag=True, help = "Create intermediate directories, if needed.") def store_doc(id, path, parents): from .model import Document try: d = Document.get(id = id) except Document.DoesNotExist: print("Document with ID #{id} does not exist") return if path in (b'-', '-'): click.get_binary_stream('stdout').write(d.content.blob) else: p = pathlib.Path(path) if p.is_dir(): p = p / d.path.name if parents: p.parent.mkdir(parents=True) elif not p.parent.exists(): raise click.UsageError(f"Directory {p.parent} does not exist (use option '-p'?).") with p.open(mode='wb') as f: f.write(d.content.blob) @doc.command('open') @click.argument('id', type=int) def open_doc(id): from .model import Document import tempfile try: d = Document.get(id = id) except Document.DoesNotExist: print(f"Document with ID #{id} does not exist") return with tempfile.TemporaryDirectory() as tmpdir: filename = pathlib.Path(tmpdir, d.path.name) with filename.open('xb') as outfile: outfile.write(d.content.blob) click.launch(str(filename), wait=True) @doc.command('find') @click.argument('tags', type=TAG, nargs=-1) @click.option('--full', is_flag=True, help="Print tags") def find_doc(tags, full): from .model import Document, DocumentTag, TagClosure, Tag virtual_tags = [] normal_tags = [] for t in tags: if t.is_virtual(): virtual_tags.append(t) else: normal_tags.append(t) query = Document.select() if normal_tags: tag_query = None for t in fetch_tags(normal_tags): desc = TagClosure.descendants(t, include_node=True).select(Tag.id) subq = DocumentTag.select(DocumentTag.document_id).where(DocumentTag.tag << desc) if tag_query is None: tag_query = subq else: tag_query = tag_query & subq query = query.where(Document.id << tag_query) if virtual_tags: query = query.where(reduce(op.and_, (p.virtual_query for p in virtual_tags))) for doc in query.iterator(): if full: tags = ' --> ' + ', '.join(str(t.tag) for t in doc.tags) print(f"* ID {doc.id} -- {doc.original_path}{tags}")