# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function
import collections
import inspect
import six
from six import text_type as str
from ferenda.errors import FSMStateError
[docs]class FSMParser():
"""A configurable finite state machine (FSM) for parsing documents
with nested structure. You provide a set of *recognizers*, a set
of *constructors*, a *transition table* and a *stream* of document
text chunks, and it returns a hierarchical document object
structure.
See :doc:`../fsmparser`.
"""
def __init__(self):
self.debug = False
self.transitions = None # set by set_transitions
self.recognizers = None # set by set_recognizers() or set_transitions()
self.reader = None # set by parse()
# somewhat magic
self.initial_state = None
self.initial_constructor = None
# pseudo-internal
self._state_stack = []
def _debug(self, msg):
"""Prints a debug message, indented to show how far down in the nested structure we are"""
if self.debug:
stack = inspect.stack()
calling_frame = [x[3] for x in stack][1]
relative_depth = len(self._state_stack)
# print("%s[%s(%r)] %s" % (". " * relative_depth, calling_frame, self._state_stack, msg))
state = "/".join(self._state_stack)
print("%s/%s(): %s" % (state, calling_frame, msg))
[docs] def set_recognizers(self, *args):
"""Set the list of functions (or other callables) used in
order to recognize symbols from the stream of text
chunks. Recognizers are tried in the order specified here."""
self.recognizers = args
[docs] def set_transitions(self, transitions):
"""Set the transition table for the state matchine.
:param transitions: The transition table, in the form of a mapping between two tuples. The first tuple should be the current state (or a list of possible current states) and a callable function that determines if a particular symbol is recognized ``(currentstate, recognizer)``. The second tuple should be a constructor function (or `False```) and the new state to transition into.
"""
self.transitions = {}
for (before, after) in transitions.items():
(before_states, recognizer) = before
if not callable(after):
(constructor, after_state) = after
assert (constructor == False) or callable(
constructor), "Specified constructor %r not callable" % constructor
assert callable(recognizer), "Specified recognizer %r not callable" % recognizer
if (not isinstance(before_states, (list, tuple))):
before_states = [before_states]
for before_state in before_states:
if callable(after):
self._debug("%r,%s() -> %s()" %
(before_state, recognizer.__name__, after.__name__))
elif callable(after[0]):
self._debug("%r,%s() -> %s(), %r" %
(before_state, recognizer.__name__, after[0].__name__, after[1]))
else:
self._debug("%r,%s() -> %r, %r" %
(before_state, recognizer.__name__, after[0], after[1]))
self.transitions[(before_state, recognizer)] = after
[docs] def parse(self, chunks):
"""Parse a document in the form of an iterable of suitable
chunks -- often lines or elements. each chunk should be a
string or a string-like obje ct. Some examples::
p = FSMParser()
reader = TextReader("foo.txt")
body = p.parse(reader.getiterator(reader.readparagraph),"body", make_body)
body = p.parse(BeautifulSoup("foo.html").find_all("#main p"), "body", make_body)
body = p.parse(ElementTree.parse("foo.xml").find(".//paragraph"), "body", make_body)
:param chunks: The document to be parsed, as a list or any other
iterable of text-like objects.
:param initialstate: The initial state for the machine. The
state must be present in the transition
table. This could be any object, but strings are
preferrable as they make error messages
easier to understand.
:param initialconstructor: A function that creates a document
root object, and then fills it with
child objects using
.make_children()
:type initialconstructor: callable
:returns: A document object tree.
"""
self._debug("Starting parse")
self.reader = Peekable(chunks)
self._state_stack = [self.initial_state]
return self.initial_constructor(self)
[docs] def analyze_symbol(self):
"""Internal function used by make_children()"""
try:
rawchunk = self.reader.peek()
chunk = str(rawchunk)
# chunk = repr(rawchunk)
if len(chunk) > 90:
# chunk = chunk[:25] + "[...]" + chunk[-10:]
seg = (chunk[:25], chunk[-10:])
try:
chunk = "%s [...] %s" % seg
except UnicodeDecodeError:
chunk = "%r [...] %r" % seg
else:
chunk = chunk
except StopIteration:
self._debug("We're done!")
return None
applicable_tmp = [x[1] for x in self.transitions.keys() if x[0] == self._state_stack[-1]]
# Create correct sorting of applicable_recognizers
applicable_recognizers = []
for recognizer in self.recognizers:
if recognizer in applicable_tmp:
applicable_recognizers.append(recognizer)
applicable_display = ", ".join([x.__name__ for x in applicable_recognizers])
for recognizer in applicable_recognizers:
if recognizer(self):
self._debug("Tested '%s' against %s -> %s " %
(chunk, applicable_display,
recognizer.__name__))
# self._debug("%r -> %s" % (chunk, recognizer.__name__))
return recognizer
raise FSMStateError("No recognizer match for %s (tried %s)" % (chunk, applicable_display))
[docs] def transition(self, currentstate, symbol):
"""Internal function used by make_children()"""
assert (currentstate, symbol) in self.transitions, "(%r, %r) should be in self.transitions" % (currentstate, symbol)
t = self.transitions[(currentstate, symbol)]
if callable(t):
return t(symbol, self._state_stack)
else:
return t
[docs] def make_child(self, constructor, childstate):
"""Internal function used by make_children(), which calls one
of the constructors defined in the transition table."""
if not childstate:
childstate = self._state_stack[-1]
# self._debug("calling child constructor %s" % constructor.__name__)
else:
# self._debug("calling child constructor %s in state %r" %
# (constructor.__name__, childstate))
pass
self._state_stack.append(childstate)
ret = constructor(self)
self._state_stack.pop() # do something with this?
return ret
[docs] def make_children(self, parent):
"""Creates child nodes for the current (parent) document node.
:param parent: The parent document node, as any list-like object
(preferrably a subclass of
:py:class:`ferenda.elements.CompoundElement`)
:returns: The same ``parent`` object.
"""
self._debug("Making children for %s" % parent.__class__.__name__)
while True: # we'll break out of this when transition()
# returns a constructor that is False
symbol = self.analyze_symbol()
if symbol is None: # no more symbols
self._debug("We're done!")
return parent
(constructor, newstate) = self.transition(self._state_stack[-1],
symbol)
if constructor is False:
self._debug("transition(%r,%s()) -> (False,%r)" %
(self._state_stack[-1], symbol.__name__, newstate))
else:
self._debug("transition(%r,%s()) -> (%s(),%r)" %
(self._state_stack[-1], symbol.__name__,
constructor.__name__, newstate))
# if transition() indicated that we should change state,
# first find out whether the constructor will call
# make_child, creating a new stack frame. This is
# indicated by the callable having the 'newstate'
# attribute (set by the @ferenda.decorators.newstate
# decorator)
if newstate and not hasattr(constructor, 'newstate'):
self._debug("Changing top of state stack (%r->%r)" %
(self._state_stack[-1], newstate))
self._state_stack[-1] = newstate
if constructor:
element = self.make_child(constructor, newstate)
if element is not None:
parent.append(element)
else:
# special weird hack - set the state we'll be
# returning to by manipulating self._state_stack
# FIXME: we have no regular test case for this path,
# but integrationRFC excercises it
if newstate:
self._debug("Changing the state we'll return to (self._state_stack[-2])")
self._debug(" (from %r to %r)" % (self._state_stack[-2], newstate))
self._state_stack[-2] = newstate
return parent
# inspired by recipe 19.18 in the python cookbook. A implementation detail helper for FSMParser.
class Peekable(six.Iterator):
def __init__(self, iterable):
self._iterable = iter(iterable)
self._cache = collections.deque()
def __iter__(self):
return self
def _fillcache(self):
while len(self._cache) < 1:
self._cache.append(six.advance_iterator(self._iterable))
def __next__(self):
self._fillcache()
result = self._cache.popleft()
return result
# useful alias
next = __next__
def peek(self):
self._fillcache()
result = self._cache[0]
return result