# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import *
from future import standard_library
standard_library.install_aliases()
from datetime import date, datetime, MAXYEAR, MINYEAR
from urllib.parse import quote
from copy import deepcopy
import itertools
import json
import math
import re
import shutil
import tempfile
import requests
import requests.exceptions
from bs4 import BeautifulSoup
from ferenda import util, errors
import logging
class FulltextIndex(object):
    """This is the abstract base class for a fulltext index. You use it by
    calling the class method :py:meth:`FulltextIndex.connect`, passing a
    string representing the underlying fulltext engine you wish to
    use. It returns a subclass instance on which you then call further
    methods.

    The engine-specific methods (:py:meth:`exists`, :py:meth:`create`,
    :py:meth:`open`, :py:meth:`update`, :py:meth:`query` etc) all raise
    :py:exc:`NotImplementedError` here and are provided by subclasses.
    """

    # Maps an index type name to the subclass implementing it. This is
    # repopulated at the very end of this module, when the classes we
    # need to specify are defined.
    indextypes = {}

    @classmethod
    def connect(cls, indextype, location, repos):
        """Open a fulltext index (creating it if it doesn't already exist).

        :param indextype: Type of fulltext index ("WHOOSH" or "ELASTICSEARCH")
        :type indextype: str
        :param location: The location of the fulltext index (a file path,
                         or a URL for HTTP-based indexes).
        :type location: str
        :param repos: The document repositories whose facets define the
                      index schema. Required if the index must be created.
        :type repos: list
        :returns: an instance of the subclass registered for *indextype*
        :raises KeyError: if *indextype* is not present in
                          :py:attr:`indextypes`
        """
        # create correct subclass and return it
        return cls.indextypes[indextype](location, repos)

    def __init__(self, location, repos):
        """Open the index at *location*, or create it from *repos* if
        :py:meth:`exists` reports that it does not exist yet."""
        # Set up the logger first, so that subclass implementations of
        # exists()/open()/create() are able to log.
        self.log = logging.getLogger("ferenda.fulltextindex")
        self.location = location
        if self.exists():
            self.index = self.open()
        else:
            assert repos, "Attempt to create a fulltext index, but no repos were provided, index schema would be empty"
            self.index = self.create(repos)

    def __del__(self):
        # If __init__ failed before self.index was assigned, nothing was
        # opened, and close() could fail on the missing attribute.
        if hasattr(self, "index"):
            self.close()

    def make_schema(self, repos):
        """Build the abstract schema used when creating the index.

        Starts from :py:meth:`get_default_schema` and adds one field per
        facet of every repo. A field is named after the facet's
        ``dimension_label`` if set, otherwise after the qname of its
        ``rdftype`` with ":" replaced by "_".

        :param repos: the document repositories to collect facets from
        :returns: dict mapping field names to IndexedType instances
        :raises ferenda.errors.SchemaConflictError: if two repos define the
                same field name with different IndexedTypes
        """
        s = self.get_default_schema()
        for repo in repos:
            g = repo.make_graph()  # for qname lookup
            for facet in repo.facets():
                if facet.dimension_label:
                    fld = facet.dimension_label
                else:
                    fld = g.qname(facet.rdftype).replace(":", "_")
                idxtype = facet.indexingtype
                if fld in s:
                    # multiple repos can provide the same indexed
                    # properties ONLY if the indextype match
                    if s[fld] != idxtype:
                        raise errors.SchemaConflictError(
                            "Repo %s wanted to add a field named %s, but it was already present with a different IndexType" %
                            (repo, fld))
                else:
                    s[fld] = idxtype
        return s

    def get_default_schema(self):
        """Return the fields that every index has, regardless of repos:
        ``uri``, ``repo``, ``basefile`` and ``text``."""
        return {'uri': Identifier(),
                'repo': Label(),
                # 'rdftype': Label(),
                'basefile': Label(),
                # 'title': Text(boost=4),
                # 'identifier': Label(boost=16),
                'text': Text()
                }

    def exists(self):
        """Whether the fulltext index exists."""
        raise NotImplementedError  # pragma: no cover

    def create(self, repos):
        """Creates a fulltext index using the provided schema."""
        raise NotImplementedError  # pragma: no cover

    def destroy(self):
        """Destroys the index, if created."""
        raise NotImplementedError  # pragma: no cover

    def open(self):
        """Opens the index so that it can be queried."""
        raise NotImplementedError  # pragma: no cover

    def schema(self):
        """Returns the schema that actually is in use. A schema is a dict
        where the keys are field names and the values are any
        subclass of
        :py:class:`ferenda.fulltextindex.IndexedType`
        """
        raise NotImplementedError  # pragma: no cover

    def update(self, uri, repo, basefile, text, **kwargs):
        """Insert (or update) a resource in the fulltext index. A resource may
        be an entire document, but it can also be any part of a
        document that is referenceable (i.e. a document node that has
        ``@typeof`` and ``@about`` attributes). A document with 100
        sections can be stored as 100 independent resources, as long
        as each section has a unique key in the form of a URI.

        :param uri: URI for the resource
        :type uri: str
        :param repo: The alias for the document repository that the resource is part of
        :type repo: str
        :param basefile: The basefile which contains resource
        :type basefile: str
        :param title: User-displayable title of resource (if applicable).
                      Should not contain the same information as
                      ``identifier``. Passed via ``**kwargs``.
        :type title: str
        :param identifier: User-displayable short identifier for resource
                           (if applicable). Passed via ``**kwargs``.
        :type identifier: str
        :param text: The full textual content of the resource, as a plain string.
        :type text: str

        .. note::

           Calling this method may not directly update the fulltext
           index -- you need to call
           :meth:`~ferenda.FulltextIndex.commit` or
           :meth:`~ferenda.FulltextIndex.close` for that.
        """
        raise NotImplementedError  # pragma: no cover

    def commit(self):
        """Commit all pending updates to the fulltext index."""
        raise NotImplementedError  # pragma: no cover

    def close(self):
        """Commits all pending updates and closes the index."""
        raise NotImplementedError  # pragma: no cover

    def doccount(self):
        """Returns the number of currently indexed (non-deleted) documents."""
        raise NotImplementedError  # pragma: no cover

    def query(self, q=None, pagenum=1, pagelen=10, ac_query=False, exclude_types=None, **kwargs):
        r"""Perform a free text query against the full text index, optionally
        restricted with parameters for individual fields.

        :param q: Free text query, using the selected full text index's
                  prefered query syntax
        :type q: str
        :param \*\*kwargs: any parameter will be used to match a
                           similarly-named field
        :type \*\*kwargs: dict
        :returns: matching documents, each document as a dict of fields
        :rtype: list

        .. note::

           The *kwargs* parameters do not yet do anything -- only
           simple full text queries are possible.
        """
        raise NotImplementedError  # pragma: no cover

    # subclasses can override fieldmapping, and have
    # to_native_field/from_native_field work on their overridden
    # fieldmapping
    fieldmapping = ()
    """A tuple of ``(abstractfield, nativefield)`` tuples. Each
    ``abstractfield`` should be a instance of a IndexedType-derived
    class. Each ``nativefield`` should be whatever kind of object that
    is used with the native fullltextindex API.

    The methods :py:meth:`to_native_field` and
    :py:meth:`from_native_field` uses this tuple of tuples to convert
    fields.
    """

    def to_native_field(self, fieldobject):
        """Given a abstract field (an instance of a IndexedType-derived
        class), convert to the corresponding native type for the
        fulltextindex in use.

        :raises ferenda.errors.SchemaMappingError: if no mapping exists
        """
        for abstractfield, nativefield in self.fieldmapping:
            if fieldobject == abstractfield:
                return nativefield
        raise errors.SchemaMappingError(
            "Field %s cannot be mapped to a native field" %
            fieldobject)

    def from_native_field(self, fieldobject):
        """Given a fulltextindex native type, convert to the corresponding
        IndexedType object.

        :raises ferenda.errors.SchemaMappingError: if no mapping exists
        """
        for abstractfield, nativefield in self.fieldmapping:
            # whoosh field objects do not implement __eq__ sanely --
            # whoosh.fields.ID() == whoosh.fields.DATETIME() is true
            # -- so we do an extra check on the type as well.
            if (isinstance(fieldobject, type(nativefield)) and
                    fieldobject == nativefield):
                return abstractfield
        raise errors.SchemaMappingError("Native field %s cannot be mapped" % fieldobject)
class IndexedType(object):
    """Base class for a search engine independent description of how a
    piece of data is indexed. Expressing the schema with
    IndexedType-derived classes makes it possible to switch out search
    engines without affecting the rest of the code.

    Every keyword argument given to the constructor (eg. ``boost=16``)
    becomes an instance attribute, and takes part in equality, hashing
    and the repr.
    """

    def __init__(self, **kwargs):
        self.__dict__ = dict(kwargs)

    def __eq__(self, other):
        # Equal only to instances of this class (or subclasses) carrying
        # exactly the same attributes.
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        # Deliberately calls __eq__ directly (not via ==), so that the
        # result does not depend on reflected-operand dispatch.
        return not self.__eq__(other)

    def __hash__(self):
        # Hash the attribute values in key order (consistent with __eq__).
        ordered = sorted(self.__dict__.items())
        return hash(tuple(value for _key, value in ordered))

    def __repr__(self):
        # eg '<Label boost=16>' or '<Identifier>'
        parts = [self.__class__.__name__]
        parts.extend("%s=%s" % pair for pair in sorted(self.__dict__.items()))
        return "<%s>" % " ".join(parts)
class Identifier(IndexedType):
    """An identifier is a string, normally in the form of a URI, which uniquely identifies an indexed document."""


class Datetime(IndexedType):
    """A date or date-and-time value."""


class Text(IndexedType):
    """Free text, analyzed (tokenized) so that it can be searched with
    full text queries."""


class Label(IndexedType):
    """A short string that is indexed as a whole value rather than as
    analyzed free text."""


class Keyword(IndexedType):
    """A keyword is a single string from a controlled vocabulary."""


class Boolean(IndexedType):
    """A true/false value."""


class Integer(IndexedType):
    """An integer value."""


class URI(IndexedType):
    """Any URI (except the URI that identifies a indexed document -- use Identifier for that)."""


class Resource(IndexedType):
    """A fulltextindex.Resource is a URI that also has a human-readable
    label.
    """
    # eg. a particular object/subject with it's own rdfs:label,
    # foaf:name, skos:prefLabel etc
class SearchModifier(object):
    """Base class for query values that match a range of field values
    instead of an exact value. The constructor arguments are available
    as the tuple ``values``."""

    def __init__(self, *values):
        self.values = values


class Less(SearchModifier):
    """Match field values below the upper bound *max* (whether the bound
    itself matches depends on the fulltext backend)."""

    def __init__(self, max):
        super(Less, self).__init__(max)
        self.max = max


class More(SearchModifier):
    """Match field values above the lower bound *min* (whether the bound
    itself matches depends on the fulltext backend)."""

    def __init__(self, min):
        super(More, self).__init__(min)
        self.min = min


class Between(SearchModifier):
    """Match field values between the bounds *min* and *max*."""

    def __init__(self, min, max):
        super(Between, self).__init__(min, max)
        self.min, self.max = min, max
class RegexString(str):
    """A query parameter value with regexp semantics instead of
    globbing semantics, i.e. "foo.*" instead of "foo*"."""


class Results(list):
    """A plain list of results that, unlike a builtin list, can carry
    arbitrary extra attributes (eg. paging information)."""
import whoosh.index
import whoosh.fields
import whoosh.analysis
import whoosh.query
import whoosh.qparser
import whoosh.writing
import whoosh.highlight
from ferenda.elements import html
class ElementsFormatter(whoosh.highlight.Formatter):
    """Returns a tree of ferenda.elements representing the formatted hit.

    All fragments end up in a single *wrapelement* instance, separated
    by the string *between*. Within a fragment, runs of whitespace are
    collapsed to a single space, and every matched term is wrapped in a
    *hitelement* with the HTML class *classname*.
    """

    # Raw string: "\s" in a plain string literal is an invalid escape
    # sequence (warned about by recent Python versions).
    re_collapse = re.compile(r"\s+").sub

    def __init__(self, wrapelement=html.P, hitelement=html.Strong,
                 classname="match", between=" ... "):
        self.wrapelement = wrapelement
        self.hitelement = hitelement
        self.classname = classname
        self.between = between

    def format(self, fragments, replace=False):
        """Format all *fragments* into one *wrapelement* object."""
        res = self.wrapelement()
        for i, fragment in enumerate(fragments):
            if i:
                res.append(self.between)
            res.extend(self.format_fragment(fragment, replace=replace))
        return res

    def format_fragment(self, fragment, replace=False):
        """Return a list of strings and hit elements for one fragment.

        :param replace: passed on to ``whoosh.highlight.get_text`` when
                        extracting the text of each hit
        """
        output = []
        index = fragment.startchar
        text = fragment.text
        for t in fragment.matches:
            # Skip tokens without character positions, and tokens that
            # overlap a match already emitted (which would otherwise
            # duplicate text), as the stock whoosh Formatter does.
            if t.startchar is None or t.startchar < index:
                continue
            if t.startchar > index:
                output.append(self.re_collapse(" ", text[index:t.startchar]))
            hittext = whoosh.highlight.get_text(text, t, replace)
            output.append(self.hitelement([hittext], **{'class': self.classname}))
            index = t.endchar
        # Only the part of the text that belongs to this fragment.
        if index < fragment.endchar:
            output.append(self.re_collapse(" ", text[index:fragment.endchar]))
        return output
class WhooshIndex(FulltextIndex):
    """Fulltext index stored as a local Whoosh index in the directory
    ``location``."""

    fieldmapping = ((Identifier(), whoosh.fields.ID(unique=True, stored=True)),
                    (Label(), whoosh.fields.ID(stored=True)),
                    (Label(boost=16), whoosh.fields.ID(field_boost=16, stored=True)),
                    (Text(boost=4), whoosh.fields.TEXT(field_boost=4, stored=True,
                                                       analyzer=whoosh.analysis.StemmingAnalyzer(
                                                       ))),
                    (Text(boost=2), whoosh.fields.TEXT(field_boost=2, stored=True,
                                                       analyzer=whoosh.analysis.StemmingAnalyzer(
                                                       ))),
                    (Text(), whoosh.fields.TEXT(stored=True,
                                                analyzer=whoosh.analysis.StemmingAnalyzer())),
                    (Datetime(), whoosh.fields.DATETIME(stored=True)),
                    (Boolean(), whoosh.fields.BOOLEAN(stored=True)),
                    (URI(), whoosh.fields.ID(stored=True, field_boost=1.1)),
                    (Keyword(), whoosh.fields.KEYWORD(stored=True)),
                    (Resource(), whoosh.fields.IDLIST(stored=True)),
                    )

    def __init__(self, location, repos):
        # _writer must exist before the base constructor runs, since
        # commit()/close() inspect it.
        self._writer = None
        super(WhooshIndex, self).__init__(location, repos)
        # Record which fields may contain multiple values, so that
        # _convert_result knows whether to return a list of dicts or a
        # single dict for Resource fields. FIXME: v. similar to the code
        # in make_schema. repos may be empty/None when opening an
        # existing index.
        self._multiple = {}
        for repo in repos or ():
            g = repo.make_graph()  # for qname lookup
            for facet in repo.facets():
                if facet.dimension_label:
                    fld = facet.dimension_label
                else:
                    fld = g.qname(facet.rdftype).replace(":", "_")
                self._multiple[fld] = facet.multiple_values

    def exists(self):
        return whoosh.index.exists_in(self.location)

    def open(self):
        return whoosh.index.open_dir(self.location)

    def create(self, repos):
        """Create the index directory and a whoosh index whose schema is
        derived from :py:meth:`make_schema`."""
        schema = self.make_schema(repos)
        whoosh_fields = dict((key, self.to_native_field(fieldtype))
                             for key, fieldtype in schema.items())
        util.mkdir(self.location)
        return whoosh.index.create_in(self.location,
                                      whoosh.fields.Schema(**whoosh_fields))

    def destroy(self):
        shutil.rmtree(self.location)

    def schema(self):
        used_schema = {}
        for fieldname, field_object in self.index.schema.items():
            used_schema[fieldname] = self.from_native_field(field_object)
        return used_schema

    def update(self, uri, repo, basefile, text, **kwargs):
        """Add or replace the resource *uri* (pending until commit()).

        Resource values (dicts with 'iri' and 'label' keys, or lists of
        such dicts) are flattened to lists of alternating iri/label
        strings for storage in an IDLIST field. Plain dates given for
        Datetime fields are converted to datetimes at midnight.
        """
        if not self._writer:
            self._writer = self.index.writer()
        s = self.schema()
        for key, value in list(kwargs.items()):
            fieldtype = s[key]
            if isinstance(fieldtype, Resource):
                # might be multiple values, in which case we create a
                # n-element list, still stored as IDLIST
                if isinstance(value, list):
                    kwargs[key] = list(itertools.chain.from_iterable(
                        (x['iri'], x['label']) for x in value))
                else:
                    kwargs[key] = [value['iri'], value['label']]
            elif isinstance(fieldtype, Datetime):
                if isinstance(value, date) and not isinstance(value, datetime):
                    kwargs[key] = datetime(value.year, value.month, value.day)
        self._writer.update_document(uri=uri,
                                     repo=repo,
                                     basefile=basefile,
                                     text=text,
                                     **kwargs)

    def commit(self):
        if self._writer:
            self._writer.commit()
            if not isinstance(self._writer, whoosh.writing.BufferedWriter):
                # A BufferedWriter can be used again after commit(), a
                # regular writer cannot
                self._writer = None

    def close(self):
        self.commit()
        self.index.close()

    def doccount(self):
        return self.index.doc_count()

    def _range_query(self, fieldname, modifier):
        """Convert a SearchModifier into a whoosh range query on *fieldname*.

        :raises ValueError: for a SearchModifier that is not a Less, More
                            or Between instance
        """
        if isinstance(modifier.values[0], datetime):
            rangecls = whoosh.query.DateRange
            lowest = datetime(MINYEAR, 1, 1)
            highest = datetime(MAXYEAR, 12, 31)
        else:
            rangecls = whoosh.query.NumericRange
            # None means an open-ended bound for NumericRange (datetime
            # values make no sense for a numeric field, and datetime(0)
            # cannot even be constructed).
            lowest = highest = None
        if isinstance(modifier, Less):
            start, end = lowest, modifier.max
        elif isinstance(modifier, More):
            start, end = modifier.min, highest
        elif isinstance(modifier, Between):
            start, end = modifier.min, modifier.max
        else:
            raise ValueError("Unsupported search modifier %r for field %s" %
                             (modifier, fieldname))
        return rangecls(fieldname, start, end)

    def query(self, q=None, pagenum=1, pagelen=10, ac_query=False, exclude_types=None, **kwargs):
        """Query the index. See :py:meth:`FulltextIndex.query`.

        *ac_query* and *exclude_types* are accepted for API compatibility
        but not used by this implementation.

        :returns: a ``(results, pager)`` tuple
        """
        # 1: Filter on all specified fields (exact or by using ranges)
        filters = []
        for k, v in kwargs.items():
            if isinstance(v, SearchModifier):
                filters.append(self._range_query(k, v))
            elif isinstance(v, str) and "*" in v:
                filters.append(whoosh.query.Wildcard(k, v))
            else:
                # exact field match
                #
                # Things to handle: Keyword, Boolean, Resource (must
                # be able to match on iri only)
                filters.append(whoosh.query.Term(k, v))
        # 2: If freetext param given (or nothing at all), query on that
        freetext = None
        if q or not kwargs:
            if not q:
                q = "*"
            searchfields = [fldname for fldname, fldtype in self.index.schema.items()
                            if isinstance(fldtype, whoosh.fields.TEXT)]
            mparser = whoosh.qparser.MultifieldParser(searchfields,
                                                      self.index.schema)
            freetext = mparser.parse(q)
        if filters:
            if freetext:
                filters.append(freetext)
            query = whoosh.query.And(filters)
        elif freetext:
            query = freetext
        else:
            raise ValueError("Neither q or kwargs specified")
        with self.index.searcher() as searcher:
            page = searcher.search_page(query, pagenum, pagelen)
            res = self._convert_result(page)
            pager = {'pagenum': pagenum,
                     'pagecount': page.pagecount,
                     'firstresult': page.offset + 1,
                     'lastresult': page.offset + page.pagelen,
                     'totalresults': page.total}
        return res, pager

    def _convert_result(self, res):
        """Convert a whoosh.searching.ResultsPage object to a plain
        Results list of dicts, with highlighted text and Resource fields
        turned back into dicts."""
        results = Results()
        hl = whoosh.highlight.Highlighter(formatter=ElementsFormatter())
        resourcefields = [key for key, fldobj in self.schema().items()
                          if isinstance(fldobj, Resource)]
        for hit in res:
            fields = hit.fields()
            highlighted = hl.highlight_hit(hit, "text", fields['text'])
            if highlighted:
                fields['text'] = highlighted
            else:
                del fields['text']
            # de-marschal Resource objects from list to dict
            for key in resourcefields:
                if key not in fields:
                    continue
                values = fields[key]
                # need to return a list of dicts if multiple_values was
                # specified, and a simple dict otherwise... (note that
                # just examining if len(values) == 2 isn't enough)
                if self._multiple[key]:
                    fields[key] = [{'iri': iri, 'label': label}
                                   for iri, label in zip(values[0::2], values[1::2])]
                else:
                    fields[key] = {'iri': values[0],
                                   'label': values[1]}
            results.append(fields)
        return results
# Base class for a HTTP-based API (eg. ElasticSearch). The base class
# delegates the formulation of queries, updates etc to concrete
# subclasses, which are expected to return a formatted query/payload
# etc, and to be able to decode responses to queries, while the base
# class handles the actual HTTP call, including error handling.
class RemoteIndex(FulltextIndex):
    """Base class for a fulltext index accessed over HTTP.

    Subclasses implement the ``_*_payload`` methods, each returning a
    ``(relative url, payload)`` tuple, and the ``_decode_*`` methods that
    interpret responses. This class performs the HTTP requests against
    ``self.location + relative url`` and handles HTTP errors.
    """

    # Extra HTTP headers sent with the requests that carry a payload
    # (the PUTs in create/update and the POSTs in doccount/query).
    defaultheaders = {}

    # The only real implementation of RemoteIndex has its own exists
    # implementation, no need for a general fallback impl.

    def create(self, repos):
        """Create the remote index with a schema derived from *repos*.

        Returns None, since there is no local index object.

        :raises Exception: if the server responds with an HTTP error status
        """
        relurl, payload = self._create_schema_payload(repos)
        res = requests.put(self.location + relurl, payload, headers=self.defaultheaders)
        try:
            res.raise_for_status()
        except requests.exceptions.HTTPError:
            raise Exception("%s: %s" % (res.status_code, res.text))

    def schema(self):
        # the payload is probably never used for schema requests
        relurl, payload = self._get_schema_payload()
        res = requests.get(self.location + relurl)
        return self._decode_schema(res)

    def update(self, uri, repo, basefile, text, **kwargs):
        """Send a single resource to the remote index.

        :raises ferenda.errors.IndexingError: on an HTTP error status
        """
        relurl, payload = self._update_payload(
            uri, repo, basefile, text, **kwargs)
        res = requests.put(self.location + relurl, payload, headers=self.defaultheaders)
        try:
            res.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise errors.IndexingError(str(e) + ": '%s'" % res.text)

    def doccount(self):
        relurl, payload = self._count_payload()
        if payload:
            res = requests.post(self.location + relurl, payload, headers=self.defaultheaders)
        else:
            res = requests.get(self.location + relurl)
        return self._decode_count_result(res)

    def query(self, q=None, pagenum=1, pagelen=10, ac_query=False, exclude_types=None, boost_types=None, **kwargs):
        """Run a query against the remote index.

        :returns: whatever the subclass' ``_decode_query_result`` returns
        :raises ferenda.errors.SearchingError: on an HTTP error status
        """
        relurl, payload = self._query_payload(q, pagenum, pagelen, ac_query, exclude_types, boost_types, **kwargs)
        if payload:
            res = requests.post(self.location + relurl, payload, headers=self.defaultheaders)
        else:
            res = requests.get(self.location + relurl)
        try:
            res.raise_for_status()
        except requests.exceptions.HTTPError:
            raise errors.SearchingError("%s: %s" % (res.status_code, res.text))
        return self._decode_query_result(res, pagenum, pagelen)

    def destroy(self):
        # The response is ignored, so destroying a non-existing index
        # is not an error.
        reluri, payload = self._destroy_payload()
        requests.delete(self.location + reluri)

    # these don't make no sense for a remote index accessed via HTTP/REST
    def open(self):
        pass

    def commit(self):
        pass

    def close(self):
        pass
class ElasticSearchIndex(RemoteIndex):
    """Fulltext index backed by an ElasticSearch (5.x-style) server.

    ``location`` is the URL of the index. The relative endpoints are
    appended to it without a separator, and ``_decode_schema`` takes
    the index name from the second-to-last "/"-separated part, so it is
    presumably expected to end with "/" -- eg.
    "http://localhost:9200/ferenda/".

    Each repo gets one ES type named after its alias, plus a
    ``<alias>_child`` type for resources whose URI contains a fragment.
    Updates are buffered in a temporary file and sent through the bulk
    API on :py:meth:`commit`.
    """

    defaultheaders = {"Content-Type": "application/json"}

    # maps our field classes to concrete ES field properties
    fieldmapping = ((Identifier(),
                     # uri -- using type=text with analyzer=keyword
                     # (instead of type=keyword) enables us to use regex
                     # queries on this field, which is nice for
                     # autocomplete
                     {"type": "text", "store": True, "analyzer": "lowercase_keyword"}),
                    (Label(),
                     {"type": "keyword"}),  # repo, basefile
                    (Label(boost=16),
                     {"type": "text", "boost": 16.0, "analyzer": "my_analyzer", "fields": {
                         "keyword": {"type": "text", "analyzer": "lowercase_keyword"}
                     }}),  # identifier
                    (Text(boost=4),
                     {"type": "text", "boost": 4.0}),  # title
                    (Text(boost=2),
                     {"type": "text", "boost": 2.0}),  # abstract
                    (Text(),
                     {"type": "text", "analyzer": "my_analyzer", "store": True}),  # text
                    (Datetime(),
                     {"type": "date", "format": "dateOptionalTime"}),
                    (Boolean(),
                     {"type": "boolean"}),
                    (Resource(),
                     {"properties": {"iri": {"type": "keyword"},
                                     "label": {"type": "keyword"}}}),
                    (Keyword(),
                     {"type": "keyword", "copy_to": ["keyword"]}),
                    (URI(),
                     {"type": "keyword", "boost": 1.1, "norms": True}),
                    (Integer(),
                     {"type": "long"}),
                    )

    # this key changed name "exclude"->"excludes" from 2.* to 5.*
    term_excludes = "excludes"

    fragment_size = 150

    # a list of fieldnames (possibly with boost factors)
    default_fields = ("label^3", "text")

    def __init__(self, location, repos):
        # both must exist before the base constructor runs, since it may
        # call create() (which uses the repos) and close()/commit().
        self._writer = None
        self._repos = repos
        super(ElasticSearchIndex, self).__init__(location, repos)

    @staticmethod
    def _facet_fieldname(g, facet):
        """Name of the index field for *facet*: its dimension_label if
        set, otherwise the qname (looked up in graph *g*) of its rdftype
        with ":" replaced by "_"."""
        if facet.dimension_label:
            return facet.dimension_label
        return g.qname(facet.rdftype).replace(":", "_")

    def close(self):
        return self.commit()

    def commit(self):
        """Send all buffered updates to the bulk API, then refresh the
        index so that they are available for searching.

        :raises ferenda.errors.IndexingError: on an HTTP error status, or
                if ES reports that any of the bulk operations failed
        """
        if not self._writer:
            return  # no pending changes to commit
        # Detach the buffer first, so that a failing request doesn't
        # leave a closed file behind as the pending writer.
        writer, self._writer = self._writer, None
        try:
            writer.seek(0)
            # No "/" separator, consistent with the other endpoints
            # ("_refresh", "_mapping/", "_count").
            res = requests.put(self.location + "_bulk", data=writer, headers=self.defaultheaders)
        finally:
            writer.close()
        try:
            res.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise errors.IndexingError(str(e) + ": '%s'" % res.text)
        # if the errors field is set to True, errors might have
        # occurred even though the status code was 200. Report the
        # number of actually failed operations, and the first of them.
        result = res.json()
        if result.get("errors"):
            items = result.get("items", [])
            failed = [item for item in items
                      if any(isinstance(action, dict) and "error" in action
                             for action in item.values())]
            first = failed[0] if failed else (items[0] if items else None)
            raise errors.IndexingError("%s errors when committing, first was %r" %
                                       (len(failed), first))
        # make sure everything is really comitted (available for
        # search) before continuing? TODO: Check if this slows
        # multi-basefile (and multi-threaded) indexing down noticably,
        # we could just do it at the end of relate_all_teardown.
        r = requests.post(self.location + "_refresh")
        r.raise_for_status()

    def exists(self):
        """The index exists unless its mapping endpoint returns 404."""
        r = requests.get(self.location + "_mapping/")
        return r.status_code != 404

    def _update_payload(self, uri, repo, basefile, text, **kwargs):
        safe = ''
        # quote (in python 2) only handles characters from 0x0 - 0xFF,
        # and basefile might contain characters outside of that (eg
        # u'MO\u0308D/P11463-12', which is MÖD/P11463-12 on a system
        # which uses unicode normalization form NFD). To be safe,
        # encode the string to utf-8 beforehand (which is what quote on
        # python 3 does anyways)
        if "#" in uri:
            repo = repo + "_child"
        relurl = "%s/%s" % (repo, quote(basefile.encode("utf-8"), safe=safe))  # eg type, id
        if "#" in uri:
            relurl += uri.split("#", 1)[1]
        payload = {"uri": uri,
                   "basefile": basefile,
                   "text": text}
        payload.update(kwargs)
        return relurl, json.dumps(payload, default=util.json_default_date)

    def update(self, uri, repo, basefile, text, **kwargs):
        """Buffer one resource for the bulk API (sent on commit()).

        Resources whose *uri* contains a fragment are stored in the
        ``<repo>_child`` type, with the fragment appended to the id and
        *basefile* as parent.
        """
        if not self._writer:
            self._writer = tempfile.TemporaryFile()
        _relurl, payload = self._update_payload(
            uri, repo, basefile, text, **kwargs)
        metadata = {"index": {"_type": repo, "_id": basefile}}
        if "#" in uri:
            metadata["index"]['_type'] = repo + "_child"
            metadata["index"]['_id'] += uri.split("#", 1)[1]
            metadata["index"]['parent'] = basefile
        metadata = json.dumps(metadata) + "\n"
        assert "\n" not in payload, "payload contains newlines, must be encoded for bulk API"
        self._writer.write(metadata.encode("utf-8"))
        self._writer.write(payload.encode("utf-8"))
        self._writer.write(b"\n")

    def _query_payload(self, q, pagenum=1, pagelen=10, ac_query=False,
                       exclude_types=None, boost_types=None, **kwargs):
        """Build the relative URL and JSON payload for a search request."""
        if kwargs.get("type"):
            types = [kwargs.get("type")]
        else:
            types = [repo.alias for repo in self._repos if repo.config.relate]
        offset = (pagenum - 1) * pagelen
        if ac_query:
            relurl = "_search?from=%s&size=%s" % (offset, pagelen)
        else:
            # use a multitype search to specify the types we want so that
            # we don't go searching in the foo_child types, only parent
            # types.
            relurl = "%s/_search?from=%s&size=%s" % (",".join(types),
                                                     offset,
                                                     pagelen)

        # 1: Filter on all specified fields
        filterterms = {}
        filterregexps = {}
        schema = self.schema()
        for k, v in kwargs.items():
            if isinstance(v, SearchModifier):
                continue
            if k in ("type", "repo"):
                k = "_type"
            elif k.endswith(".keyword"):
                pass  # leave as-is, don't try to look this up in schema
            elif isinstance(schema[k], Resource):
                # also map k to "%s.iri" % k if k is Resource
                k += ".iri"
            if isinstance(v, RegexString):
                filterregexps[k] = str(v)
            elif isinstance(v, str) and "*" in v:
                # if v contains "*", make it a {'regexp': '.*/foo'} instead of a {'term'}
                # also transform * to .* and escape '#' and '.'
                filterregexps[k] = v.replace(".", "\\.").replace("#", "\\#").replace("*", ".*")
            else:
                filterterms[k] = v

        # 2: Create filterranges if SearchModifier objects are used
        filterranges = {}
        for k, v in kwargs.items():
            if isinstance(v, Less):
                filterranges[k] = {"lt": v.max}
            elif isinstance(v, More):
                filterranges[k] = {"gt": v.min}
            elif isinstance(v, Between):
                filterranges[k] = {"lt": v.max,
                                   "gt": v.min}

        # 3: If freetext param given, search on that
        inner_hits = {"_source": {self.term_excludes: "text"}}
        highlight = None
        if q and not ac_query:
            # NOTE: we need to specify highlight parameters for each
            # subquery when using has_child, see
            # https://github.com/elastic/elasticsearch/issues/14999 --
            # still seems to be an issue with ES5.*
            match = {'fields': self.default_fields,
                     'query': q,
                     'default_operator': "and",
                     'analyzer': 'my_analyzer'}
            highlight = {'fields': {'text': {},
                                    'label': {}},
                         'fragment_size': self.fragment_size,
                         'number_of_fragments': 2
                         }
            inner_hits["highlight"] = highlight
            # now, explode the match query into a big OR query for
            # matching each possible _child type (until someone solves
            # http://stackoverflow.com/questions/38946547 for me)
            submatches = [{"simple_query_string": deepcopy(match)}]
            for t in types:
                submatches.append(
                    {"has_child": {"type": t + "_child",
                                   "inner_hits": inner_hits,
                                   "query": {
                                       "bool": {
                                           "must": {"simple_query_string": deepcopy(match)},
                                           # some documents are put into the index
                                           # purely to support ac_query
                                           # (autocomplete). We don't need them in
                                           # our main search results.
                                           "must_not": {"term": {"role": "autocomplete"}}
                                       }}}})
            match = {"bool": {"should": submatches}}
        else:
            # No freetext query, or an autocomplete query. TODO: ac
            # queries don't use q yet (it would need to be worked into
            # inner_hits somehow, and sorted by order if present). Start
            # from an empty bool query so that filters can be added below.
            match = {"bool": {}}

        if boost_types:
            boost_functions = [{"filter": {"term": {"_type": _type}},
                                "weight": boost}
                               for _type, boost in boost_types]

        filters = []
        for key, val in (("term", filterterms),
                         ("regexp", filterregexps),
                         ("range", filterranges)):
            filters.extend({key: {k: v}} for (k, v) in val.items())
        if len(filters) > 1:
            match["bool"]["must"] = {"bool": {"must": filters}}
        elif filters:
            match["bool"]["must"] = filters[0]

        if exclude_types:
            match["bool"]["must_not"] = [{"type": {"value": exclude_type}}
                                         for exclude_type in exclude_types]

        if boost_types:
            payload = {'query': {'function_score': {'functions': boost_functions,
                                                    'query': match}}}
        else:
            payload = {'query': match}

        if not ac_query:
            payload['aggs'] = self._aggregation_payload()
            if q and "must" in match["bool"]:
                # fixes staffanm/lagen.nu#69 by making sure that documents
                # that matches the filter query (as a must clause) but
                # does not score anything in the should query aren't
                # counted. This shouldn't be used in AC queries since they
                # only use filters, not freetext query parameters
                #
                # since we express our filter as a must clause (not a
                # filter clause) it will add 1 to the score. We therefore
                # require something more than just 1 in score.
                payload["min_score"] = 1.01
            # Don't include the full text of every document in every hit
            payload['_source'] = {self.term_excludes: ['text']}

        # extra workaround, solution adapted from comments in
        # https://github.com/elastic/elasticsearch/issues/14999 --
        # revisit once Elasticsearch 2.4 is released.
        if highlight:
            payload['highlight'] = deepcopy(highlight)

        # for autocomplete queries when not using any "natural
        # language" queries (ie. only query based on a identifer like
        # "TF 2:" -- in these cases we'd like to use natural order of
        # the results if available
        if ac_query and q is None:
            payload['sort'] = [{"order": "asc"},
                               "_score"]
            # temporary workaround -- the feature/sfs-history branch
            # causes a lot of extra info in the index, inc expired
            # versions, so we filter those. must_not only exists if
            # exclude_types was given, so create it if needed.
            match['bool'].setdefault('must_not', []).append({"term": {"role": "expired"}})
        return relurl, json.dumps(payload, indent=4, default=util.json_default_date)

    def _aggregation_payload(self):
        aggs = {'type': {'terms': {'field': '_type', 'size': 100}}}
        for repo in self._repos:
            if not repo.config.relate:
                continue
            for facet in repo.facets():
                if (facet.dimension_label in ('creator', 'issued') and
                        facet.dimension_label not in aggs and
                        facet.dimension_type in ('year', 'ref', 'type')):
                    if facet.dimension_type == "year":
                        agg = {'date_histogram': {'field': facet.dimension_label,
                                                  'interval': 'year',
                                                  'format': 'yyyy',
                                                  'min_doc_count': 1}}
                    else:
                        agg = {'terms': {'field': facet.dimension_label, 'size': 100}}
                    aggs[facet.dimension_label] = agg
        return aggs

    def _decode_query_result(self, response, pagenum, pagelen):
        # attempt to decode iso-formatted datetimes
        # ("2013-02-14T14:06:00"). Note that this will incorrectly
        # decode anything that looks like a ISO date, even though it
        # might be typed as a string. We have no typing information
        # (at this stage -- we could look at self.schema() though)
        jsonresp = json.loads(response.text,
                              object_hook=util.make_json_date_object_hook())
        hits = jsonresp['hits']['hits']
        total = jsonresp['hits']['total']
        res = Results()
        for hit in hits:
            h = self._decode_query_result_hit(hit)
            for inner in hit.get("inner_hits", {}).values():
                for inner_hit in inner["hits"]["hits"]:
                    h.setdefault("innerhits", []).append(
                        self._decode_query_result_hit(inner_hit))
            res.append(h)
        pager = {'pagenum': pagenum,
                 'pagecount': int(math.ceil(total / float(pagelen))),
                 'firstresult': (pagenum - 1) * pagelen + 1,
                 'lastresult': (pagenum - 1) * pagelen + len(hits),
                 'totalresults': total}
        res.pagenum = pager['pagenum']
        res.pagecount = pager['pagecount']
        res.lastresult = pager['lastresult']
        res.totalresults = pager['totalresults']
        if 'aggregations' in jsonresp:
            res.aggregations = jsonresp['aggregations']
        return res, pager

    def _decode_query_result_hit(self, hit):
        """Return the hit's _source dict, with 'repo' set from the ES type
        and highlighted 'text'/'label' fields converted to elements."""
        h = hit['_source']
        h['repo'] = hit['_type']
        highlights = hit.get('highlight', {})
        for hlfield in ('text', 'label'):
            if hlfield not in highlights:
                continue
            # wrap highlighted field in P, convert to elements.
            hltext = re.sub(r"\s+", " ", " ... ".join([x.strip() for x in highlights[hlfield]]))
            # NOTE(review): the space inserted before </strong> is
            # unexplained -- confirm it is intentional.
            hltext = hltext.replace("<em>", "<strong class='match'>").replace("</em>", " </strong>")
            # FIXME: BeautifulSoup/lxml returns empty soup if
            # first char is '§' or some other non-ascii char (like
            # a smart quote). Padding with a space makes problem
            # disappear, but need to find root cause.
            soup = BeautifulSoup("<p> %s</p>" % hltext, "lxml")
            h[hlfield] = html.elements_from_soup(soup.html.body.p)
        return h

    def _count_payload(self):
        return "_count", None

    def _decode_count_result(self, response):
        if response.status_code == 404:
            return 0
        return response.json()['count']

    def _get_schema_payload(self):
        return "_mapping", None

    def _repo_for_type(self, typename):
        """Return the repo whose alias is *typename* -- or, for a child
        type, *typename* without its "_child" suffix -- or None."""
        candidates = [typename]
        if typename.endswith("_child"):
            candidates.append(typename[:-len("_child")])
        for alias in candidates:
            for repo in self._repos:
                if repo.alias == alias:
                    return repo
        return None

    def _fallback_indexingtype(self, typename, fieldname, e):
        """Find the IndexedType for a native field that could not be
        mapped, using the definition from the repo's own facets.

        :raises ferenda.errors.SchemaMappingError: if no repo or facet fits
        """
        repo = self._repo_for_type(typename)
        if repo is None:
            raise errors.SchemaMappingError("%s/%s: %s" % (typename, fieldname, str(e)))
        g = repo.make_graph()  # for qname lookup
        for facet in repo.facets():
            if self._facet_fieldname(g, facet) == fieldname:
                self.log.error("%s/%s: native field %s couldn't be mapped, fell back on repo.facet.indexingtype",
                               typename, fieldname, str(e))
                return facet.indexingtype
        raise errors.SchemaMappingError("%s/%s: %s (no suitable fallback facet)" % (typename, fieldname, str(e)))

    def _decode_schema(self, response):
        """Flatten the field mappings of all types in the index into one
        abstract schema (pay no mind to duplicate fields)."""
        indexname = self.location.split("/")[-2]
        mappings = response.json()[indexname]["mappings"]
        schema = {}
        for typename, mapping in mappings.items():
            for fieldname, fieldobject in mapping["properties"].items():
                if fieldname == 'keyword':
                    # our copy_to: keyword definition for the Keyword
                    # indexed type dynamically creates a new
                    # field. Skip that.
                    continue
                try:
                    schema[fieldname] = self.from_native_field(fieldobject)
                except errors.SchemaMappingError as e:
                    # try to recover by using the repo's own definition instead
                    schema[fieldname] = self._fallback_indexingtype(typename, fieldname, e)
        schema["repo"] = self.get_default_schema()['repo']
        return schema

    def _create_schema_payload(self, repos):
        language = {'en': 'English',
                    'sv': 'Swedish'}.get(repos[0].lang, "English")
        payload = {
            # cargo cult configuration
            "settings": {"number_of_shards": 1,
                         "analysis": {
                             "analyzer": {
                                 "my_analyzer": {
                                     "filter": ["lowercase", "snowball"],
                                     "tokenizer": "standard",
                                     "type": "custom"
                                 },
                                 "lowercase_keyword": {
                                     "tokenizer": "keyword",
                                     "filter": ["lowercase"]
                                 }
                             },
                             "filter": {
                                 "snowball": {
                                     "type": "snowball",
                                     "language": language
                                 }
                             }
                         }
                         },
            "mappings": {}
        }
        for repo in repos:
            if not repo.config.relate:
                continue
            g = repo.make_graph()  # for qname lookup
            schema = self.get_default_schema()
            childschema = self.get_default_schema()
            for facet in repo.facets():
                fld = self._facet_fieldname(g, facet)
                idxtype = facet.indexingtype
                schema[fld] = idxtype
                if not facet.toplevel_only:
                    childschema[fld] = idxtype
            # "repo" is not really needed for ES, as type == repo.alias
            es_fields = {}
            for key, fieldtype in schema.items():
                if key != "repo":
                    es_fields[key] = self.to_native_field(fieldtype)
            es_child_fields = {}
            for key, fieldtype in childschema.items():
                if key != "repo":
                    es_child_fields[key] = self.to_native_field(fieldtype)
            # _source enabled so we can get the text back
            payload["mappings"][repo.alias] = {"_source": {"enabled": True},
                                               "_all": {"analyzer": "my_analyzer",
                                                        "store": True},
                                               "properties": es_fields}
            childmapping = {"_source": {"enabled": True},
                            "_all": {"analyzer": "my_analyzer",
                                     "store": True},
                            "_parent": {"type": repo.alias},
                            "properties": es_child_fields
                            }
            payload["mappings"][repo.alias + "_child"] = childmapping
        return "", json.dumps(payload, indent=4)

    def _destroy_payload(self):
        return "", None
class ElasticSearch2x(ElasticSearchIndex):
    """ElasticSearch backend for "legacy" 2.x servers.

    "Legacy" versions of ElasticSearch have a simpler text type
    ("string") and no keyword type, so this subclass overrides the field
    mapping table.  It also overrides query construction to use the old
    top-level ``filtered`` query.
    """

    # (ferenda indexing type, native ES 2.x field definition) pairs.
    fieldmapping = (
        (Identifier(),
         {"type": "string", "index": "not_analyzed", "store": True}),  # uri
        (Label(),
         {"type": "string", "index": "not_analyzed"}),  # repo, basefile
        (Label(boost=16),
         {"type": "string", "boost": 16.0, "index": "not_analyzed",
          "norms": {"enabled": True}}),  # identifier
        (Text(boost=4),
         {"type": "string", "boost": 4.0, "index": "not_analyzed",
          "norms": {"enabled": True}}),  # title
        (Text(boost=2),
         {"type": "string", "boost": 2.0, "index": "not_analyzed",
          "norms": {"enabled": True}}),  # abstract
        (Text(),
         {"type": "string", "analyzer": "my_analyzer", "store": True}),  # text
        (Datetime(),
         {"type": "date", "format": "dateOptionalTime"}),
        (Boolean(),
         {"type": "boolean"}),
        (Resource(),
         {"properties": {"iri": {"type": "string", "index": "not_analyzed"},
                         "label": {"type": "string", "index": "not_analyzed"}}}),
        (Keyword(),
         {"type": "string", "copy_to": ["keyword"]}),
        (URI(),
         {"type": "string", "index": "not_analyzed", "boost": 1.1,
          "norms": {"enabled": True}}),
    )

    # Key name used for _source filtering in the query payloads below.
    term_excludes = "exclude"

    # This override uses the old style filtering, which uses a
    # filtered query as the top level query
    # (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/query-dsl-filtered-query.html),
    # which was deprecated and removed in ES5
    # http://stackoverflow.com/questions/40519806/no-query-registered-for-filtered
    #
    # NOTE: The "new" logic in the superclass ought to work on ES2
    # servers as well, so maybe we should just remove this
    # implementation.
    def _query_payload(self, q, pagenum=1, pagelen=10, **kwargs):
        """Return the (relative URL, JSON body) pair for a search request.

        :param q: free-text query; when falsy, no text query (and no
                  highlighting) is added.
        :param pagenum: 1-based page number.
        :param pagelen: number of hits per page.
        :param kwargs: field filters.  ``repo``/``type`` filter on
                       ``_type``; Resource fields filter on ``<field>.iri``;
                       string values containing ``*`` become regexp filters
                       (``*`` -> ``.*``), other plain values term filters;
                       Less/More/Between modifiers become range filters.
        """
        # Search only the requested parent types (never the *_child types)
        # by listing them in a multi-type URL.
        wanted_repo = kwargs.get("repo")
        if wanted_repo:
            aliases = [wanted_repo]
        else:
            aliases = [r.alias for r in self._repos if r.config.relate]
        relurl = "%s/_search?from=%s&size=%s" % (",".join(aliases),
                                                 (pagenum - 1) * pagelen,
                                                 pagelen)

        # 1 + 2: sort the keyword arguments into term, regexp and range filters
        terms = {}
        regexps = {}
        ranges = {}
        schema = self.schema()
        for field, value in kwargs.items():
            if isinstance(value, SearchModifier):
                if isinstance(value, Less):
                    ranges[field] = {"lt": value.max}
                elif isinstance(value, More):
                    ranges[field] = {"gt": value.min}
                elif isinstance(value, Between):
                    ranges[field] = {"lt": value.max,
                                     "gt": value.min}
                continue
            if field in ("type", "repo"):  # FIXME: maybe should only be "repo"
                field = "_type"
            elif isinstance(schema[field], Resource):
                field += ".iri"
            if isinstance(value, str) and "*" in value:
                regexps[field] = value.replace("*", ".*")
            else:
                terms[field] = value

        # 3: free-text search. Highlight parameters must be given for
        # each has_child subquery, see
        # https://github.com/elastic/elasticsearch/issues/14999
        match = {}
        inner_hits = {"_source": {self.term_excludes: "text"}}
        highlight = None
        if q:
            text_query = {'fields': ["label", "text"],
                          'query': q,
                          'default_operator': "and",
                          'analyzer': "my_analyzer"}
            highlight = {'fields': {'text': {},
                                    'label': {}},
                         'fragment_size': 150,
                         'number_of_fragments': 2}
            inner_hits["highlight"] = highlight
            # One OR branch for the parent types plus one has_child branch
            # per child type (until someone solves
            # http://stackoverflow.com/questions/38946547)
            should = [{"simple_query_string": deepcopy(text_query)}]
            should.extend(
                {"has_child": {"type": alias + "_child",
                               "inner_hits": inner_hits,
                               "query": {"simple_query_string": deepcopy(text_query)}}}
                for alias in aliases)
            match = {"bool": {"should": should}}

        clauses = ([{"term": {k: v}} for k, v in terms.items()] +
                   [{"regexp": {k: v}} for k, v in regexps.items()] +
                   [{"range": {k: v}} for k, v in ranges.items()])
        if not clauses:
            query = match
        else:
            if len(clauses) == 1:
                filter_clause = clauses[0]
            else:
                filter_clause = {"bool": {"must": clauses}}
            filtered = {"filter": filter_clause}
            if match:
                filtered["query"] = match
            query = {"filtered": filtered}

        payload = {'query': query,
                   'aggs': self._aggregation_payload(),
                   '_source': {self.term_excludes: ['text']},
                   'highlight': deepcopy(highlight)}
        return relurl, json.dumps(payload, indent=4, default=util.json_default_date)
# Register the concrete backends now that all of them are defined;
# FulltextIndex.connect() instantiates the class stored under the
# requested index type name.
FulltextIndex.indextypes = {
    'WHOOSH': WhooshIndex,
    'ELASTICSEARCH': ElasticSearchIndex,
    'ELASTICSEARCH2': ElasticSearch2x,
}