Source code for ferenda.fulltextindex

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import json
import math
import re
import shutil
import sys

import six
from six.moves.urllib_parse import quote
import requests
import requests.exceptions
from bs4 import BeautifulSoup

from ferenda import util, errors


class FulltextIndex(object):

    @staticmethod
    def connect(indextype, location, repos=None):
        """Open a fulltext index (creating it if it
        doesn't already exist).

        :param indextype: The type of fulltext index ("WHOOSH" or "ELASTICSEARCH")
        :type  indextype: str
        :param location: The file path of the fulltext index (for WHOOSH),
                         or the base URL of the search service (for
                         ELASTICSEARCH).
        :type  location: str
        """
        if repos is None:
            repos = []
        # create correct subclass and return it
        return {'WHOOSH': WhooshIndex,
                'ELASTICSEARCH': ElasticSearchIndex}[indextype](location, repos)
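
    # A usage sketch (index type and path are hypothetical; a Whoosh
    # index is created on first connect):
    #
    #   from ferenda.fulltextindex import FulltextIndex
    #   index = FulltextIndex.connect("WHOOSH", "data/whooshindex")
    #   print(index.doccount())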

    def __init__(self, location, repos):
        self.location = location
        if self.exists():
            self.index = self.open()
        else:
            self.index = self.create(self.get_default_schema(), repos)

    def __del__(self):
        self.close()

    def get_default_schema(self):
        return {'uri': Identifier(),
                'repo': Label(),
                'basefile': Label(),
                'title': Text(boost=4),
                'identifier': Label(boost=16),
                'text': Text()}

    def exists(self):
        """Whether the fulltext index exists."""
        raise NotImplementedError  # pragma: no cover

    def create(self, schema, repos):
        """Creates a fulltext index using the provided default schema."""
        raise NotImplementedError  # pragma: no cover

    def destroy(self):
        """Destroys the index, if created."""
        raise NotImplementedError  # pragma: no cover

    def open(self):
        """Opens the index so that it can be queried."""
        raise NotImplementedError  # pragma: no cover

    def schema(self):
        """Returns the schema that is actually in use. A schema is a dict
        where the keys are field names and the values are any subclass
        of :py:class:`ferenda.fulltextindex.IndexedType`.
        """
        raise NotImplementedError  # pragma: no cover

    def update(self, uri, repo, basefile, title, identifier, text, **kwargs):
        """Insert (or update) a resource in the fulltext index. A resource may
        be an entire document, but it can also be any part of a
        document that is referenceable (i.e. a document node that has
        ``@typeof`` and ``@about`` attributes). A document with 100
        sections can be stored as 100 independent resources, as long
        as each section has a unique key in the form of a URI.

        :param uri: URI for the resource
        :type  uri: str
        :param repo: The alias for the document repository that the resource is part of
        :type  repo: str
        :param basefile: The basefile that contains the resource
        :type  basefile: str
        :param title: User-displayable title of resource (if applicable).
                      Should not contain the same information as
                      ``identifier``. 
        :type  title: str
        :param identifier: User-displayable short identifier for resource (if applicable)
        :type  identifier: str
        :param text: The full textual content of the resource, as a plain string.
        :type  text: str

        .. note::

           Calling this method may not directly update the fulltext
           index -- you need to call
           :meth:`~ferenda.FulltextIndex.commit` or
           :meth:`~ferenda.FulltextIndex.close` for that.

        """
        raise NotImplementedError  # pragma: no cover
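
    # A usage sketch (all values hypothetical): a document section with
    # its own URI is indexed as an independent resource, then made
    # searchable with commit():
    #
    #   index.update(uri="http://example.org/doc/123#S2",
    #                repo="base",
    #                basefile="123",
    #                title="Second section",
    #                identifier="Doc 123, section 2",
    #                text="The full text of the section ...")
    #   index.commit()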

    def commit(self):
        """Commit all pending updates to the fulltext index."""
        raise NotImplementedError  # pragma: no cover

    def close(self):
        """Commits all pending updates and closes the index."""
        raise NotImplementedError  # pragma: no cover

    def doccount(self):
        """Returns the number of currently indexed (non-deleted) documents."""
        raise NotImplementedError  # pragma: no cover

    def query(self, q, **kwargs):
        """Perform a free text query against the full text index, optionally
           restricted with parameters for individual fields.

        :param q: Free text query, using the selected full text index's
                  preferred query syntax
        :type  q: str
        :param \*\*kwargs: any parameter will be used to match a
                         similarly-named field
        :type \*\*kwargs: dict
        :returns: matching documents, each document as a dict of fields
        :rtype: list

        .. note::

           The *kwargs* parameters do not yet do anything -- only
           simple full text queries are possible.

        """
        raise NotImplementedError  # pragma: no cover
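
    # A usage sketch (query string hypothetical). Note that the concrete
    # implementations below return a (results, pager) tuple:
    #
    #   res, pager = index.query("copyright")
    #   for hit in res:
    #       print(hit['identifier'], hit['title'])
    #   print("%(firstresult)s-%(lastresult)s of %(totalresults)s" % pager)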


class IndexedType(object):

    """Base class for a fulltext search-engine-independent representation
    of indexed data. By using IndexedType-derived classes to represent
    the schema, it becomes possible to switch out search engines without
    affecting the rest of the code.
    """

    def __eq__(self, other):
        return (isinstance(other, self.__class__) and
                self.__dict__ == other.__dict__)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(tuple(v for k, v in sorted(self.__dict__.items())))

    def __init__(self, **kwargs):
        self.__dict__ = dict(kwargs)

    def __repr__(self):
        # eg '<Label boost=16>' or '<Identifier>'
        dictrepr = "".join((" %s=%s" % (k, v)
                            for k, v in sorted(self.__dict__.items())))
        return ("<%s%s>" % (self.__class__.__name__, dictrepr))
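
# Because IndexedType instances compare and hash by their keyword
# arguments (see __eq__/__hash__ above), they can be used as dict keys.
# The concrete index classes below rely on this to map schema fields to
# engine-specific field definitions, e.g. (illustrative values only):
#
#   mapped = {Label(boost=16): "engine-specific field config"}
#   assert mapped[Label(boost=16)] == "engine-specific field config"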


class Identifier(IndexedType): pass
class Datetime(IndexedType): pass
class Text(IndexedType): pass
class Label(IndexedType): pass
class Keywords(IndexedType): pass
class Boolean(IndexedType): pass
class URI(IndexedType): pass


class SearchModifier(object): pass
class Less(SearchModifier): pass
class More(SearchModifier): pass
class Between(SearchModifier): pass
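
# Note: the SearchModifier subclasses above are placeholders. They are
# presumably meant to express range restrictions in field-specific
# queries (e.g. Less/More/Between over a Datetime field), but as the
# query() docstring notes, field-specific parameters are not yet
# supported by the implementations below.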


import whoosh.index
import whoosh.fields
import whoosh.analysis
import whoosh.query
import whoosh.qparser
import whoosh.writing
import whoosh.highlight

from ferenda.elements import html


class ElementsFormatter(whoosh.highlight.Formatter):

    """Returns a tree of ferenda.elements representing the formatted hit."""

    def __init__(self, wrapelement=html.P, hitelement=html.Strong,
                 classname="match", between=" ... "):
        self.wrapelement = wrapelement
        self.hitelement = hitelement
        self.classname = classname
        self.between = between

    def format(self, fragments, replace=False):
        res = self.wrapelement()
        first = True
        for fragment in fragments:
            if not first:
                res.append(self.between)
            res.extend(self.format_fragment(fragment, replace=replace))
            first = False
        return res

    re_collapse = re.compile(r"\s+").sub

    def format_fragment(self, fragment, replace):
        output = []
        index = fragment.startchar
        text = fragment.text
        for t in fragment.matches:
            if t.startchar > index:
                output.append(self.re_collapse(" ", text[index:t.startchar]))
            hittext = whoosh.highlight.get_text(text, t, False)
            output.append(self.hitelement([hittext],
                                          **{'class': self.classname}))
            index = t.endchar
        if index < len(text):
            output.append(self.re_collapse(" ", text[index:fragment.endchar]))
        return output


class WhooshIndex(FulltextIndex):

    def __init__(self, location, repos):
        super(WhooshIndex, self).__init__(location, repos)
        self._schema = self.get_default_schema()
        self._writer = None

    def exists(self):
        return whoosh.index.exists_in(self.location)

    def open(self):
        return whoosh.index.open_dir(self.location)

    def create(self, schema, repos):
        # maps our field classes to concrete whoosh field instances
        mapped_field = {
            Identifier(): whoosh.fields.ID(unique=True, stored=True),
            Label(): whoosh.fields.ID(stored=True),
            Label(boost=16): whoosh.fields.ID(field_boost=16, stored=True),
            Text(boost=4): whoosh.fields.TEXT(
                field_boost=4, stored=True,
                analyzer=whoosh.analysis.StemmingAnalyzer()),
            Text(): whoosh.fields.TEXT(
                stored=True,
                analyzer=whoosh.analysis.StemmingAnalyzer())}
        whoosh_fields = {}
        for key, fieldtype in self.get_default_schema().items():
            whoosh_fields[key] = mapped_field[fieldtype]
        schema = whoosh.fields.Schema(**whoosh_fields)
        util.mkdir(self.location)
        return whoosh.index.create_in(self.location, schema)

    def destroy(self):
        shutil.rmtree(self.location)

    def schema(self):
        # FIXME: This should iterate through self.index (the underlying
        # whoosh index) and convert each field to the corresponding
        # IndexedType object.
        return self._schema

    def update(self, uri, repo, basefile, title, identifier, text, **kwargs):
        if not self._writer:
            self._writer = self.index.writer()
        # A whoosh document is not the same as a ferenda document. A
        # ferenda document may be indexed as several (tens, hundreds
        # or more) whoosh documents.
        self._writer.update_document(uri=uri,
                                     repo=repo,
                                     basefile=basefile,
                                     title=title,
                                     identifier=identifier,
                                     text=text,
                                     **kwargs)

    def commit(self):
        if self._writer:
            self._writer.commit()
            if not isinstance(self._writer, whoosh.writing.BufferedWriter):
                # A BufferedWriter can be used again after commit(), a
                # regular writer cannot
                self._writer = None

    def close(self):
        self.commit()
        self.index.close()

    def doccount(self):
        return self.index.doc_count()

    def query(self, q, pagenum=1, pagelen=10, **kwargs):
        searchfields = ['identifier', 'title', 'text']
        mparser = whoosh.qparser.MultifieldParser(searchfields,
                                                  self.index.schema)
        query = mparser.parse(q)
        with self.index.searcher() as searcher:
            page = searcher.search_page(query, pagenum, pagelen)
            res = self._convert_result(page)
            pager = {'pagenum': pagenum,
                     'pagecount': page.pagecount,
                     'firstresult': page.offset + 1,
                     'lastresult': page.offset + page.pagelen,
                     'totalresults': page.total}
        return res, pager

    def _convert_result(self, res):
        # converts a whoosh.searching.ResultsPage object to a plain
        # list of dicts
        l = []
        hl = whoosh.highlight.Highlighter(formatter=ElementsFormatter())
        for hit in res:
            fields = hit.fields()
            fields['text'] = hl.highlight_hit(hit, "text", fields['text'])
            l.append(fields)
        return l


# Base class for an HTTP-based API (eg. ElasticSearch). The base class
# delegates the formulation of queries, updates etc. to concrete
# subclasses, which are expected to return a formatted query/payload
# and to decode responses to queries; the base class handles the
# actual HTTP calls, including error handling.
class RemoteIndex(FulltextIndex):

    # The only real implementation of RemoteIndex has its own exists
    # implementation, no need for a general fallback impl.
    # def exists(self):
    #     pass

    def create(self, schema, repos):
        relurl, payload = self._create_schema_payload(
            self.get_default_schema(), repos)
        res = requests.put(self.location + relurl, payload)
        try:
            res.raise_for_status()
        except Exception as e:
            raise Exception("%s: %s" % (res.status_code, res.text))

    def schema(self):
        relurl, payload = self._get_schema_payload()
        # payload is probably never used
        res = requests.get(self.location + relurl)
        return self._decode_schema(res)

    def update(self, uri, repo, basefile, title, identifier, text, **kwargs):
        relurl, payload = self._update_payload(
            uri, repo, basefile, title, identifier, text, **kwargs)
        res = requests.put(self.location + relurl, payload)
        try:
            res.raise_for_status()
            # print(json.dumps(res.json(), indent=4))
        except requests.exceptions.HTTPError as e:
            raise errors.IndexingError(str(e) + ": '%s'" % res.text)

    def doccount(self):
        relurl, payload = self._count_payload()
        if payload:
            res = requests.post(self.location + relurl, payload)
        else:
            res = requests.get(self.location + relurl)
        return self._decode_count_result(res)

    def query(self, q, pagenum=1, pagelen=10, **kwargs):
        relurl, payload = self._query_payload(q, pagenum, pagelen, **kwargs)
        if payload:
            # print("POSTing to %s:\n%s" % (relurl, payload))
            res = requests.post(self.location + relurl, payload)
            # print("Received:\n%s" % (json.dumps(res.json(), indent=4)))
        else:
            res = requests.get(self.location + relurl)
        try:
            res.raise_for_status()
        except Exception as e:
            raise errors.SearchingError("%s: %s" % (res.status_code, res.text))
        return self._decode_query_result(res, pagenum, pagelen)

    def destroy(self):
        reluri, payload = self._destroy_payload()
        res = requests.delete(self.location + reluri)

    # these don't make sense for a remote index accessed via HTTP/REST
    def open(self):
        pass

    def commit(self):
        pass

    def close(self):
        pass


class ElasticSearchIndex(RemoteIndex):

    def commit(self):
        r = requests.post(self.location + "_refresh")
        r.raise_for_status()

    def exists(self):
        r = requests.get(self.location + "_mapping/")
        if r.status_code == 404:
            return False
        else:
            return True

    def _update_payload(self, uri, repo, basefile, title, identifier,
                        text, **kwargs):
        safe = ''
        if six.PY2:
            # urllib.quote in python 2.6 cannot handle unicode values
            # for the safe parameter (not even empty). urllib.quote in
            # python 2.7 handles it, but may fail later on. FIXME: We
            # should create a shim as ferenda.compat.quote and use that.
            safe = safe.encode('ascii')  # pragma: no cover
        # quote (in python 2) only handles characters from 0x0 to 0xFF,
        # and basefile might contain characters outside of that (eg
        # u'MO\u0308D/P11463-12', which is MÖD/P11463-12 on a system
        # which uses unicode normalization form NFD). To be safe, encode
        # the string to utf-8 beforehand (which is what quote on python
        # 3 does anyway).
        relurl = "%s/%s" % (repo,
                            quote(basefile.encode("utf-8"), safe=safe))  # eg type, id
        if "#" in uri:
            relurl += uri.split("#", 1)[1]
        payload = {"uri": uri,
                   "basefile": basefile,
                   "title": title,
                   "identifier": identifier,
                   "text": text}
        payload.update(kwargs)
        return relurl, json.dumps(payload)

    def _query_payload(self, q, pagenum=1, pagelen=10, **kwargs):
        # relurl = "_search?q=%s&size=%s&from=%s" % (quote(q), pagelen, (pagenum * pagelen) - pagelen)
        relurl = "_search?from=%s&size=%s" % ((pagenum - 1) * pagelen, pagelen)
        # FIXME: Only searches in text, not title or identifier. But we
        # can't search on the _all field, because apparently that field
        # isn't set up to use the my_analyzer we've defined...
        payload = {'query': {'match': {'text': q}},
                   'highlight': {'fields': {'text': {}},
                                 'pre_tags': ["<strong class='match'>"],
                                 'post_tags': ["</strong>"],
                                 'fragment_size': '40'}}
        return relurl, json.dumps(payload, indent=4)

    def _decode_query_result(self, response, pagenum, pagelen):
        json = response.json()
        res = []
        for hit in json['hits']['hits']:
            h = hit['_source']
            # wrap highlighted field in P, convert to elements
            hltext = " ... ".join([x.strip() for x in hit['highlight']['text']])
            soup = BeautifulSoup("<p>%s</p>" % re.sub(r"\s+", " ", hltext))
            h['text'] = html.elements_from_soup(soup.html.body.p)
            res.append(h)
        pager = {'pagenum': pagenum,
                 'pagecount': int(math.ceil(json['hits']['total'] / float(pagelen))),
                 'firstresult': (pagenum - 1) * pagelen + 1,
                 'lastresult': (pagenum - 1) * pagelen + len(json['hits']['hits']),
                 'totalresults': json['hits']['total']}
        return res, pager

    def _count_payload(self):
        return "_count", None

    def _decode_count_result(self, response):
        if response.status_code == 404:
            return 0
        else:
            return response.json()['count']

    # FIXME: This is cheating!
    def schema(self):
        return self.get_default_schema()

    def _get_schema_payload(self):
        return "", None

    def _decode_schema_payload(self, response):
        raise NotImplementedError  # pragma: no cover

    # FIXME: For some reason, creating a schema/mapping makes PUTting
    # new documents to the index hang with the following error:
    #
    #   UnavailableShardsException[[ferenda][1] [3] shardIt, [0] active : Timeout waiting for [1m]
    #
    # So we skip creating the schema, as it isn't necessary.
    #
    # def create(self, schema, repos):
    #     pass

    def _create_schema_payload(self, schema, repos):
        schema = {
            # cargo cult configuration
            "settings": {"number_of_shards": 1,
                         "analysis": {
                             "analyzer": {
                                 "my_analyzer": {
                                     "filter": ["lowercase", "snowball"],
                                     "tokenizer": "standard",
                                     "type": "custom"
                                 }
                             },
                             "filter": {
                                 "snowball": {
                                     "type": "snowball",
                                     "language": "English"
                                 }
                             }
                         }},
            # "mappings": {"_all": {"properties": {"analyzer": "my_analyzer"}}}
            "mappings": {}
        }
        # maps our field classes to concrete ES field properties
        mapped_field = {
            Identifier(): {"type": "string", "index": "not_analyzed"},  # uri
            # repo, basefile (note: see below)
            Label(): {"type": "string", "index": "not_analyzed"},
            # identifier
            Label(boost=16): {"type": "string", "boost": 16.0,
                              "analyzer": "my_analyzer"},
            # title
            Text(boost=4): {"type": "string", "boost": 4.0,
                            "analyzer": "my_analyzer"},
            Text(): {"type": "string", "analyzer": "my_analyzer"}}  # text
        es_fields = {}
        for key, fieldtype in self.get_default_schema().items():
            if key == "repo":
                continue  # not really needed for ES, as type == repo.alias
            es_fields[key] = mapped_field[fieldtype]
        for repo in repos:
            schema["mappings"][repo.alias] = {
                # so we can get the text back
                "_source": {"enabled": True},
                "properties": es_fields}
        return "", json.dumps(schema)

    def _destroy_payload(self):
        return "", None
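
# A usage sketch for the ElasticSearch backend (URL hypothetical; the
# trailing slash matters, since relative paths like "_refresh" and
# "_search" are appended directly to it):
#
#   index = FulltextIndex.connect("ELASTICSEARCH",
#                                 "http://localhost:9200/ferenda/")
#   index.update(uri="http://example.org/doc/123", repo="base",
#                basefile="123", title="Example doc",
#                identifier="Doc 123", text="Some searchable text")
#   index.commit()  # POSTs to the _refresh endpoint
#   res, pager = index.query("searchable")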