Source code for ferenda.sources.legal.se.myndfskr

# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function

import os
import re
import logging
import codecs
from operator import attrgetter
from tempfile import mktemp
from xml.sax.saxutils import escape as xml_escape

from rdflib import Graph, URIRef, Literal
from bs4 import BeautifulSoup
import lxml.html
import requests
import six
from six.moves import urllib
from six.moves.urllib.parse import urljoin

from ferenda import TextReader
from ferenda.sources.legal.se.legalref import LegalRef
from ferenda import util
from . import SwedishLegalSource


class MyndFskr(SwedishLegalSource):

    """A abstract base class for fetching and parsing regulations from
various swedish government agencies. These PDF documents often have
a similar structure both graphically and linguistically, enabling us
to parse them in a generalized way. (Downloading them often requires
special-case code, though.)"""
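    # Concrete subclasses (see SJVFS, FFFS, SKVFS etc. below) typically
    # only need to set `alias` and `start_url`, and override download()
    # and friends when the agency site requires special-case scraping.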
    source_encoding = "utf-8"
    downloaded_suffix = ".pdf"
    alias = 'myndfskr'

    def download(self, basefile=None):
        """Simple default implementation that downloads all PDF files
        from self.start_url that look like regulation document
        numbers."""
        resp = requests.get(self.start_url)
        # regex to search the link url, text or title for something
        # looking like a FS number
        re_fsnr = re.compile('(\d{4})[:/_-](\d+)(|\.\w+)$')
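        # The pattern is meant to match e.g. link text such as
        # "NFS 2004:5" (groups "2004", "5") as well as file names such
        # as "nfs-2004_5.pdf" (groups "2004", "5", ".pdf"). These are
        # illustrative examples, not taken from any particular agency
        # site.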
        tree = lxml.html.document_fromstring(resp.text)
        tree.make_links_absolute(self.start_url, resolve_base_href=True)
        for element, attribute, link, pos in tree.iterlinks():
            if link[-4:].lower() != ".pdf":
                continue
            done = False
            attrs = dict(element.attrib)
            flds = [link, element.text_content()]
            if 'title' in attrs:
                flds.append(attrs['title'])
            for fld in flds:
                if fld and re_fsnr.search(fld) and not done:
                    m = re_fsnr.search(fld)
                    # Make sure we end up with "2011:4" rather than
                    # "2011:04"
                    basefile = "%s:%s" % (m.group(1), int(m.group(2)))
                    self.download_single(basefile, url=link)
                    done = True

    def canonical_uri(self, basefile):
        # The canonical URI for these documents cannot always be
        # computed from the basefile. Find the primary subject of the
        # distilled RDF graph instead.
        if not os.path.exists(self.store.distilled_path(basefile)):
            return None

        g = Graph()
        g.parse(self.store.distilled_path(basefile))
        subjects = list(g.subject_objects(self.ns['rdf']['type']))

        if subjects:
            return str(subjects[0][0])
        else:
            self.log.warning(
                "No canonical uri in %s" % (self.store.distilled_path(basefile)))
            # fall back
            return super(MyndFskr, self).canonical_uri(basefile)

    def textreader_from_basefile(self, basefile, encoding):
        infile = self.store.downloaded_path(basefile)
        tmpfile = self.store.path(basefile, 'intermediate', '.pdf')
        outfile = self.store.path(basefile, 'intermediate', '.txt')
        util.copy_if_different(infile, tmpfile)
        util.runcmd("pdftotext %s" % tmpfile, require_success=True)
        util.robust_remove(tmpfile)

        return TextReader(outfile, encoding=encoding, linesep=TextReader.UNIX)

    def rpubl_uri_transform(self, s):
        # Inspired by
        # http://code.activestate.com/recipes/81330-single-pass-multiple-replace/
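        # As a usage sketch (not part of the original code): the
        # transform lowercases its input and transliterates å/ä/ö, so
        # e.g. "ELSÄK-FS" becomes "elsaek-fs" (cf. ELSAKFS.uri_slug
        # further down in this module).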
        table = {'å': 'aa',
                 'ä': 'ae',
                 'ö': 'oe'}
        r = re.compile("|".join(list(table.keys())))
        # return r.sub(lambda f: table[f.string[f.start():f.end()]], s.lower())
        return r.sub(lambda m: table[m.group(0)], s.lower())

    def download_resource_lists(self, resource_url, graph_path):
        hdr = self._addheaders()
        hdr['Accept'] = 'application/rdf+xml'
        resp = requests.get(resource_url, headers=hdr)
        g = Graph()
        g.parse(data=resp.text, format="xml")
        for subj in g.subjects(self.ns['rdf'].type,
                               self.ns['rpubl'].Forfattningssamling):
            resp = requests.get(str(subj), headers=hdr)
            resp.encoding = "utf-8"
            g.parse(data=resp.text, format="xml")
        with open(graph_path, "wb") as fp:
            data = g.serialize(format="xml")
            fp.write(data)

    def parse_from_textreader(self, reader, basefile):
        tracelog = logging.getLogger("%s.tracelog" % self.alias)

        doc = self.make_document(basefile)
        g = doc.meta

        # 1.2: Load known entities and their URIs (we have to add some
        # that are not yet in the official resource lists)
        resource_list_file = self.store.path('resourcelist', 'intermediate', '.rdf')
        if not os.path.exists(resource_list_file):
            self.download_resource_lists("http://service.lagrummet.se/var/common",
                                         resource_list_file)
        resources = Graph()
        resources.parse(resource_list_file, format="xml")

        # 1.3: Define regexps for the data we search for.
        fwdtests = {'dcterms:issn': ['^ISSN (\d+\-\d+)$'],
                    'dcterms:title': ['((?:Föreskrifter|[\w ]+s (?:föreskrifter|allmänna råd)).*?)\n\n'],
                    'dcterms:identifier': ['^([A-ZÅÄÖ-]+FS\s\s?\d{4}:\d+)$'],
                    'rpubl:utkomFranTryck': ['Utkom från\strycket\s+den\s(\d+ \w+ \d{4})'],
                    'rpubl:omtryckAv': ['^(Omtryck)$'],
                    'rpubl:genomforDirektiv': ['Celex (3\d{2,4}\w\d{4})'],
                    'rpubl:beslutsdatum': ['(?:har beslutats|beslutade|beslutat) den (\d+ \w+ \d{4})'],
                    'rpubl:beslutadAv': ['\n([A-ZÅÄÖ][\w ]+?)\d? (?:meddelar|lämnar|föreskriver)',
                                         '\s(?:meddelar|föreskriver) ([A-ZÅÄÖ][\w ]+?)\d?\s'],
                    'rpubl:bemyndigande': [' ?(?:meddelar|föreskriver|Föreskrifterna meddelas|Föreskrifterna upphävs)\d?,? (?:följande |)med stöd av\s(.*?) ?(?:att|efter\ssamråd|dels|följande|i fråga om|och lämnar allmänna råd|och beslutar följande allmänna råd|\.\n)',
                                           '^Med stöd av (.*)\s(?:meddelar|föreskriver)']
                    }
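        # Illustrative (non-exhaustive) examples of what these patterns
        # are meant to capture: 'dcterms:identifier' matches a line
        # such as "NFS 2004:5", and 'rpubl:beslutsdatum' extracts the
        # date from phrases like "beslutade den 4 maj 2004".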

        # 2: Find metadata properties

        # 2.1 Find some of the properties on the first page (or the
        # 2nd, or 3rd... continue past TOC pages, cover pages etc
        # until the "real" first page is found) NB: FFFS 2007:1 has
        # ten (10) TOC pages!
        pagecount = 0
        for page in reader.getiterator(reader.readpage):
            # replace single newlines with spaces, but keep double
            # newlines
            # page = "\n\n".join([util.normalize_space(x) for x in page.split("\n\n")])
            pagecount += 1
            props = {}
            for (prop, tests) in list(fwdtests.items()):
                if prop in props:
                    continue
                for test in tests:
                    m = re.search(
                        test, page, re.MULTILINE | re.DOTALL | re.UNICODE)
                    if m:
                        props[prop] = util.normalize_space(m.group(1))
            # Single required property. If we find this, we're done
            if 'rpubl:beslutsdatum' in props:
                break
            self.log.warning("%s: Couldn't find required props on page %s" %
                             (basefile, pagecount))

        # 2.2 Find some of the properties on the last 'real' page (not
        # counting appendices)
        reader.seek(0)
        pagesrev = reversed(list(reader.getiterator(reader.readpage)))
        # The language used to express these two properties varies
        # quite a lot, more than what is reasonable to capture in a
        # single regex. We therefore define a set of possible
        # expressions and try them in turn.
        revtests = {'rpubl:ikrafttradandedatum':
                    ['(?:Denna författning|Dessa föreskrifter|Dessa allmänna råd|Dessa föreskrifter och allmänna råd)\d* träder i ?kraft den (\d+ \w+ \d{4})',
                     'Dessa föreskrifter träder i kraft, (?:.*), i övrigt den (\d+ \w+ \d{4})',
                     'ska(?:ll|)\supphöra att gälla (?:den |)(\d+ \w+ \d{4}|denna dag|vid utgången av \w+ \d{4})',
                     'träder i kraft den dag då författningen enligt uppgift på den (utkom från trycket)'],
                    'rpubl:upphaver':
                    ['träder i kraft den (?:\d+ \w+ \d{4}), då(.*)ska upphöra att gälla',
                     'ska(?:ll|)\supphöra att gälla vid utgången av \w+ \d{4}, nämligen(.*?)\n\n',
                     'att (.*) skall upphöra att gälla (denna dag|vid utgången av \w+ \d{4})']
                    }
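        # As an illustrative example, the first
        # 'rpubl:ikrafttradandedatum' pattern matches a phrase like
        # "Dessa föreskrifter träder i kraft den 1 juli 2004" and
        # captures "1 juli 2004".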

        cnt = 0
        for page in pagesrev:
            cnt += 1
            # Normalize the whitespace in each paragraph so that a
            # linebreak in the middle of the natural language
            # expression doesn't break our regexes.
            page = "\n\n".join(
                [util.normalize_space(x) for x in page.split("\n\n")])

            for (prop, tests) in list(revtests.items()):
                if prop in props:
                    continue
                for test in tests:
                    # Not re.DOTALL -- we've normalized whitespace and
                    # don't want to match across paragraphs
                    m = re.search(test, page, re.MULTILINE | re.UNICODE)
                    if m:
                        props[prop] = util.normalize_space(m.group(1))
                        # print u"%s: '%s' resulted in match '%s' at page %s from end" %
                        # (prop,test,props[prop], cnt)

            # Single required property. If we find this, we're done
            if 'rpubl:ikrafttradandedatum' in props:
                break

        # 3: Clean up data - converting strings to Literals or
        # URIRefs, find legal references, etc
        if 'dcterms:identifier' in props:
            (publication, year, ordinal) = re.split('[ :]',
                                                    props['dcterms:identifier'])
            # FIXME: Read resources graph instead
            fs = resources.value(predicate=self.ns['skos'].altLabel,
                                 object=Literal(publication, lang='sv'))
            props['rpubl:forfattningssamling'] = fs
            publ = resources.value(subject=fs,
                                   predicate=self.ns['dcterms'].publisher)
            props['dcterms:publisher'] = publ

            props['rpubl:arsutgava'] = Literal(
                year)  # conversion to int, date not needed
            props['rpubl:lopnummer'] = Literal(ordinal)
            props['dcterms:identifier'] = Literal(props['dcterms:identifier'])

            # Now we can mint the uri (should be done through LegalURI)
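            # For example (illustrative, assuming the
            # forfattningssamling resource URI ends in "nfs"): the
            # identifier "NFS 2004:5" yields
            # http://rinfo.lagrummet.se/publ/nfs/2004:5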
            uri = ("http://rinfo.lagrummet.se/publ/%s/%s:%s" %
                   (props['rpubl:forfattningssamling'].split('/')[-1],
                    props['rpubl:arsutgava'],
                    props['rpubl:lopnummer']))
            self.log.debug("URI: %s" % uri)
        else:
            self.log.error(
                "Couldn't find dcterms:identifier, cannot create URI, giving up")
            return None

        tracelog.info("Cleaning rpubl:beslutadAv")
        if 'rpubl:beslutadAv' in props:
            agency = resources.value(predicate=self.ns['foaf'].name,
                                     object=Literal(props['rpubl:beslutadAv'],
                                                    lang="sv"))
            if agency:
                props['rpubl:beslutadAv'] = agency
            else:
                self.log.warning(
                    "Cannot find URI for rpubl:beslutadAv value %r" % props['rpubl:beslutadAv'])
                del props['rpubl:beslutadAv']

        tracelog.info("Cleaning dcterms:issn")
        if 'dcterms:issn' in props:
            props['dcterms:issn'] = Literal(props['dcterms:issn'])

        tracelog.info("Cleaning dcterms:title")

        # common false positive
        if 'dcterms:title' in props and 'denna författning har beslutats den' in props['dcterms:title']:
            del props['dcterms:title']

        if 'dcterms:title' in props:
            tracelog.info("Inspecting dcterms:title %r" % props['dcterms:title'])
            # sometimes the title isn't separated with two newlines from the rest of the text
            if "\nbeslutade den " in props['dcterms:title']:
                props['dcterms:title'] = props[
                    'dcterms:title'].split("\nbeslutade den ")[0]
            props['dcterms:title'] = Literal(
                util.normalize_space(props['dcterms:title']), lang="sv")

            if re.search('^(Föreskrifter|[\w ]+s föreskrifter) om ändring i ', props['dcterms:title'], re.UNICODE):
                tracelog.info("Finding rpubl:andrar in dcterms:title")
                orig = re.search(
                    '([A-ZÅÄÖ-]+FS \d{4}:\d+)', props['dcterms:title']).group(0)
                (publication, year, ordinal) = re.split('[ :]', orig)
                origuri = "http://rinfo.lagrummet.se/publ/%s/%s:%s" % (self.rpubl_uri_transform(publication),
                                                                       year, ordinal)
                props['rpubl:andrar'] = URIRef(origuri)
                if 'rpubl:omtryckAv' in props:
                    props['rpubl:omtryckAv'] = URIRef(origuri)
            if (re.search('^(Föreskrifter|[\w ]+s föreskrifter) om upphävande av', props['dcterms:title'], re.UNICODE)
                    and 'rpubl:upphaver' not in props):
                tracelog.info("Finding rpubl:upphaver in dcterms:title")
                props['rpubl:upphaver'] = six.text_type(
                    props['dcterms:title'])  # cleaned below

        tracelog.info("Cleaning date properties")
        for prop in ('rpubl:utkomFranTryck', 'rpubl:beslutsdatum', 'rpubl:ikrafttradandedatum'):
            if prop in props:
                if (props[prop] == 'denna dag' and
                        prop == 'rpubl:ikrafttradandedatum'):
                    props[prop] = props['rpubl:beslutsdatum']
                elif (props[prop] == 'utkom från trycket' and
                      prop == 'rpubl:ikrafttradandedatum'):
                    props[prop] = props['rpubl:utkomFranTryck']
                else:
                    props[prop] = Literal(
                        self.parse_swedish_date(props[prop].lower()))

        tracelog.info("Cleaning rpubl:genomforDirektiv")
        if 'rpubl:genomforDirektiv' in props:
            props['rpubl:genomforDirektiv'] = URIRef("http://rinfo.lagrummet.se/ext/eur-lex/%s" %
                                                     props['rpubl:genomforDirektiv'])

        tracelog.info("Cleaning rpubl:bemyndigande")
        has_bemyndiganden = False

        if 'rpubl:bemyndigande' in props:
            # SimpleParse can't handle the Unicode en dash character;
            # transform it into a regular ASCII hyphen
            props['rpubl:bemyndigande'] = props[
                'rpubl:bemyndigande'].replace('\u2013', '-')
            parser = LegalRef(LegalRef.LAGRUM)
            result = parser.parse(props['rpubl:bemyndigande'])
            bemyndigande_uris = [x.uri for x in result if hasattr(x, 'uri')]

            # some of these uris need to be filtered away due to
            # over-matching by parser.parse
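            # For example (hypothetical URIs): if parser.parse returned
            # both ".../publ/sfs/1998:808" and ".../publ/sfs/1998:808#P7",
            # the shorter URI is dropped below because it is a prefix
            # of the more specific one.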
            filtered_bemyndigande_uris = []
            for bem_uri in bemyndigande_uris:
                keep = True
                for compare in bemyndigande_uris:
                    if (len(compare) > len(bem_uri) and
                            compare.startswith(bem_uri)):
                        keep = False
                if keep:
                    filtered_bemyndigande_uris.append(bem_uri)

            for bem_uri in filtered_bemyndigande_uris:
                g.add((URIRef(
                    uri), self.ns['rpubl']['bemyndigande'], URIRef(bem_uri)))
                has_bemyndiganden = True
            del props['rpubl:bemyndigande']

        tracelog.info("Cleaning rpubl:upphaver")
        if 'rpubl:upphaver' in props:
            for upph in re.findall('([A-ZÅÄÖ-]+FS \d{4}:\d+)', util.normalize_space(props['rpubl:upphaver'])):
                (publication, year, ordinal) = re.split('[ :]', upph)
                upphuri = "http://rinfo.lagrummet.se/publ/%s/%s:%s" % (publication.lower(),
                                                                       year, ordinal)
                g.add((URIRef(
                    uri), self.ns['rpubl']['upphaver'], URIRef(upphuri)))
            del props['rpubl:upphaver']

        tracelog.info("Deciding rdf:type")
        if ('dcterms:title' in props and
            "allmänna råd" in props['dcterms:title'] and
                "föreskrifter" not in props['dcterms:title']):
            props['rdf:type'] = self.ns['rpubl']['AllmannaRad']
        else:
            props['rdf:type'] = self.ns['rpubl']['Myndighetsforeskrift']

        # 3.5: Check to see that we have all properties that we expect
        # (should maybe be done elsewhere later?)
        tracelog.info("Checking required properties")
        for prop in ('dcterms:identifier', 'dcterms:title', 'rpubl:arsutgava',
                     'dcterms:publisher', 'rpubl:beslutadAv', 'rpubl:beslutsdatum',
                     'rpubl:forfattningssamling', 'rpubl:ikrafttradandedatum',
                     'rpubl:lopnummer', 'rpubl:utkomFranTryck'):
            if prop not in props:
                self.log.warning("%s: Failed to find %s" % (basefile, prop))

        tracelog.info("Checking rpubl:bemyndigande")
        if props['rdf:type'] == self.ns['rpubl']['Myndighetsforeskrift']:
            if not has_bemyndiganden:
                self.log.warning(
                    "%s: Failed to find rpubl:bemyndigande" % (basefile))

        # 4: Add the cleaned data to a RDFLib Graph
        # (maybe we should do that as early as possible?)
        tracelog.info("Adding items to rdflib.Graph")
        for (prop, value) in list(props.items()):
            (prefix, term) = prop.split(":", 1)
            p = self.ns[prefix][term]
            if not (isinstance(value, URIRef) or isinstance(value, Literal)):
                self.log.warning("%s: %s is a %s, not a URIRef or Literal" %
                                 (basefile, prop, type(value)))
            g.add((URIRef(uri), p, value))

        # 5: Create data for the body, removing various control characters
        # TODO: Use pdftohtml to create a nice viewable HTML
        # version instead of this plaintext stuff
        reader.seek(0)
        body = []

        # A fairly involved way of filtering out all control
        # characters from a string
        import unicodedata
        if six.PY3:
            all_chars = (chr(i) for i in range(0x10000))
        else:
            all_chars = (unichr(i) for i in range(0x10000))
        control_chars = ''.join(
            c for c in all_chars if unicodedata.category(c) == 'Cc')
        # tab and newline are technically Control characters in
        # unicode, but we want to keep them.
        control_chars = control_chars.replace("\t", "").replace("\n", "")
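        # (For example, stray form feed characters, U+000C, from the
        # pdftotext output fall in the 'Cc' category and would be
        # stripped here.)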

        control_char_re = re.compile('[%s]' % re.escape(control_chars))
        for page in reader.getiterator(reader.readpage):
            text = xml_escape(control_char_re.sub('', page))
            body.append("<pre>%s</pre>\n\n" % text)

        # 6: Done!
        #
        doc.body = body
        doc.uri = uri
        return doc

    def tabs(self, primary=False):
        return [['Myndighetsföreskrifter', '/myndfskr/']]


class SJVFS(MyndFskr):
    alias = "sjvfs"

    start_url = "http://www.jordbruksverket.se/forfattningar/forfattningssamling.4.5aec661121e2613852800012537.html"

    def download(self, basefile=None):
        soup = BeautifulSoup(requests.get(self.start_url).text)
        main = soup.find("ul", "c112")
        extra = []
        for a in list(main.findAll("a")):
            url = urllib.parse.urljoin(self.start_url, a['href'])
            self.log.info("Fetching %s %s" % (a.text, url))
            extra.extend(self.download_indexpage(url))

        extra2 = []
        for url in list(set(extra)):
            self.log.info("Extrafetching %s" % (url))
            extra2.extend(self.download_indexpage(url))

        for url in list(set(extra2)):
            self.log.info("Extra2fetching %s" % (url))
            self.download_indexpage(url)

    def download_indexpage(self, url):
        subsoup = BeautifulSoup(requests.get(url).text)
        submain = subsoup.find("div", "pagecontent")
        extrapages = []
        for a in submain.findAll("a"):
            if a['href'].endswith(".pdf") or a['href'].endswith(".PDF"):
                if re.search('\d{4}:\d+', a.text):
                    m = re.search('(\w+FS|) ?(\d{4}:\d+)', a.text)
                    fs = m.group(1).lower()
                    fsnr = m.group(2)
                    if not fs:
                        fs = "sjvfs"
                    basefile = "%s/%s" % (fs, fsnr)
                    suburl = urllib.parse.unquote(
                        urllib.parse.urljoin(url, a['href'])).encode('utf-8')
                    self.download_single(basefile, url=suburl)
                elif a.text == "Beslut":
                    basefile = a.findParent(
                        "td").findPreviousSibling("td").find("a").text
                    self.log.debug(
                        "Will download beslut to %s (later)" % basefile)
                elif a.text == "Bilaga":
                    basefile = a.findParent(
                        "td").findPreviousSibling("td").find("a").text
                    self.log.debug(
                        "Will download bilaga to %s (later)" % basefile)
                elif a.text == "Rättelseblad":
                    basefile = a.findParent(
                        "td").findPreviousSibling("td").find("a").text
                    self.log.debug(
                        "Will download rättelseblad to %s (later)" % basefile)
                else:
                    self.log.debug("I don't know what to do with %s" % a.text)
            else:
                suburl = urljoin(url, a['href'])
                extrapages.append(suburl)
        return extrapages


class DVFS(MyndFskr):
    alias = "dvfs"


class FFFS(MyndFskr):
    alias = "fffs"
    start_url = "http://www.fi.se/Regler/FIs-forfattningar/Forteckning-FFFS/"
    document_url = "http://www.fi.se/Regler/FIs-forfattningar/Samtliga-forfattningar/%s/"

    def download(self, basefile=None):
        soup = BeautifulSoup(requests.get(self.start_url).text)
        main = soup.find(id="mainarea")
        docs = []
        for numberlabel in main.findAll(text='NUMMER'):
            numberdiv = numberlabel.findParent('div').parent
            typediv = numberdiv.findNextSibling()
            if typediv.find('div', 'FFFSListAreaLeft').get_text(strip=True) != "TYP":
                self.log.error("Expected TYP in div, found %s" %
                               typediv.get_text(strip=True))
                continue
            titlediv = typediv.findNextSibling()
            if titlediv.find('div', 'FFFSListAreaLeft').get_text(strip=True) != "RUBRIK":
                self.log.error("Expected RUBRIK in div, found %s" %
                               titlediv.get_text(strip=True))
                continue
            number = numberdiv.find('div', 'FFFSListAreaRight').get_text(strip=True)
            tmpfile = mktemp()
            snippetfile = self.store.downloaded_path(
                number).replace(".pdf", ".snippet.html")
            fp = codecs.open(tmpfile, "w", encoding="utf-8")
            fp.write(str(numberdiv))
            fp.write(str(typediv))
            fp.write(str(titlediv))
            fp.close()
            util.replace_if_different(tmpfile, snippetfile)
            self.download_single(number)

    def download_single(self, basefile, usecache=False):
        self.log.debug("%s: download_single..." % basefile)
        pdffile = self.store.downloaded_path(basefile)
        existed = os.path.exists(pdffile)
        if usecache and existed:
            self.log.debug("%s: already exists, not downloading" % basefile)
            return
        snippetfile = pdffile.replace(".pdf", ".snippet.html")
        descriptionfile = pdffile.replace(".pdf", ".html")
        soup = BeautifulSoup(open(snippetfile))
        href = soup.find(text="RUBRIK").findParent(
            "div").findPreviousSibling().find('a')['href']
        url = urljoin("http://www.fi.se/Regler/FIs-forfattningar/Forteckning-FFFS/",
                      href)
        if href.endswith(".pdf"):
            if self.download_if_needed(url, pdffile):
                if existed:
                    self.log.info("%s: downloaded new version from %s" %
                                  (basefile, url))
                else:
                    self.log.info("%s: downloaded from %s" % (basefile, url))
        elif "/Samtliga-forfattningar/" in href:
            self.log.debug("%s: Separate page" % basefile)
            self.download_if_needed(url, descriptionfile)
            soup = BeautifulSoup(open(descriptionfile))
            for link in soup.find("div", id="mainarea").findAll("a"):
                suburl = urljoin(url, link['href']).replace(" ", "%20")
                if link.text == 'Grundförfattning':
                    if self.download_if_needed(suburl, pdffile):
                        self.log.info("%s: downloaded main PDF" % basefile)
                elif link.text == 'Konsoliderad version':
                    conspdffile = pdffile.replace(".pdf", "_k.pdf")
                    if self.download_if_needed(suburl, conspdffile):
                        self.log.info(
                            "%s: downloaded consolidated PDF" % basefile)
                elif link.text == 'Ändringsförfattning':
                    self.log.info("Skipping change regulation")
                elif link['href'].endswith(".pdf"):
                    filename = link['href'].split("/")[-1]
                    otherpdffile = pdffile.replace(".pdf", "-" + filename)
                    if self.download_if_needed(suburl, otherpdffile):
                        self.log.info("%s: downloaded '%s' to %s" %
                                      (basefile, link.text, otherpdffile))
        else:
            self.log.warning("%s: No idea!" % basefile)


class ELSAKFS(MyndFskr):
    # the real name is ELSÄK-FS, but the alias avoids Swedish
    # characters, uppercase and dashes
    alias = "elsakfs"
    uri_slug = "elsaek-fs"  # for use in URIs
    start_url = "http://www.elsakerhetsverket.se/sv/Lag-och-ratt/Foreskrifter/Elsakerhetsverkets-foreskrifter-listade-i-nummerordning/"


class NFS(MyndFskr):
    alias = "nfs"
    start_url = "http://www.naturvardsverket.se/sv/Start/Lagar-och-styrning/Foreskrifter-och-allmanna-rad/Foreskrifter/"


class STAFS(MyndFskr):
    alias = "stafs"
    re_identifier = re.compile('STAFS (\d{4})[:/_-](\d+)')

    start_url = "http://www.swedac.se/sv/Det-handlar-om-fortroende/Lagar-och-regler/Alla-foreskrifter-i-nummerordning/"

    def download(self, basefile=None):
        soup = BeautifulSoup(requests.get(self.start_url).text)
        for link in list(soup.find_all("a", href=re.compile('/STAFS/'))):
            basefile = re.search('\d{4}:\d+', link.text).group(0)
            self.download_single(basefile,
                                 urljoin(self.start_url, link['href']))

    def download_single(self, basefile, url):
        self.log.info("%s: %s" % (basefile, url))
        consolidated_link = None
        newest = ""
        soup = BeautifulSoup(requests.get(url).text)
        for link in soup.find_all("a", text=self.re_identifier):
            linkurl = urljoin(url, link['href'])
            self.log.info(" %s: %s %s" % (basefile, link.text, linkurl))
            if "konso" in link.text:
                consolidated_link = link
            else:
                m = self.re_identifier.search(link.text)
                assert m
                if linkurl.endswith(".pdf"):
                    basefile = m.group(1) + ":" + m.group(2)
                    filename = self.store.downloaded_path(basefile)
                    self.log.info(" Downloading to %s" % filename)
                    self.download_if_needed(linkurl, filename)
                    if basefile > newest:
                        self.log.debug(
                            "%s larger than %s" % (basefile, newest))
                        consolidated_basefile = (basefile +
                                                 "/konsoliderad/" + basefile)
                        newest = basefile
                    else:
                        self.log.debug(
                            "%s not larger than %s" % (basefile, newest))
                else:
                    # not pdf - link to yet another page
                    subsoup = BeautifulSoup(requests.get(linkurl).text)
                    for sublink in subsoup.find_all("a",
                                                    text=self.re_identifier):
                        self.log.info(" Sub %s: %s %s" %
                                      (basefile, sublink.text,
                                       sublink['href']))
                        m = self.re_identifier.search(sublink.text)
                        assert m
                        if sublink['href'].endswith(".pdf"):
                            subbasefile = m.group(1) + ":" + m.group(2)
                            self.download_if_needed(
                                urljoin(linkurl, sublink['href']),
                                subbasefile)
        if consolidated_link:
            filename = self.store.downloaded_path(consolidated_basefile)
            self.log.info(" Downloading consd to %s" % filename)
            self.download_if_needed(
                urljoin(url, consolidated_link['href']),
                consolidated_basefile, filename=filename)


class SKVFS(MyndFskr):
    alias = "skvfs"
    source_encoding = "utf-8"
    downloaded_suffix = ".pdf"

    # start_url = "http://www.skatteverket.se/rattsinformation/foreskrifter/tidigarear.4.1cf57160116817b976680001670.html"
    # This url contains slightly more (older) links (and a different layout)?
    start_url = "http://www.skatteverket.se/rattsinformation/lagrummet/foreskriftergallande/aldrear.4.19b9f599116a9e8ef3680003547.html"

    # also consolidated versions
    # http://www.skatteverket.se/rattsinformation/lagrummet/foreskrifterkonsoliderade/aldrear.4.19b9f599116a9e8ef3680004242.html

    # URLs are highly unpredictable. We must find the URL for every
    # resource we want to download; we cannot transform the resource
    # id into a URL.
    def download(self, basefile=None):
        self.log.info("Starting at %s" % self.start_url)
        years = {}
        soup = BeautifulSoup(requests.get(self.start_url).text)
        for link in sorted(list(soup.find_all("a", text=re.compile('^\d{4}$'))),
                           key=attrgetter('text')):
            year = int(link.text)
            # Documents for the years 1985-2003 are all on one page
            # (with links leading to different anchors). To avoid
            # re-downloading stuff when usecache=False, make sure we
            # haven't seen this url (sans fragment) before
            url = urljoin(self.start_url, link['href']).split("#")[0]
            if year not in years and url not in list(years.values()):
                self.download_year(year, url)
                years[year] = url

    # just download the most recent year
    def download_new(self):
        self.log.info("Starting at %s" % self.start_url)
        soup = BeautifulSoup(requests.get(self.start_url).text)
        link = sorted(list(soup.find_all("a", text=re.compile('^\d{4}$'))),
                      key=attrgetter('text'), reverse=True)[0]
        self.download_year(int(link.text),
                           urljoin(self.start_url, link['href']))

    def download_year(self, year, url):
        self.log.info("Downloading year %s from %s" % (year, url))
        soup = BeautifulSoup(requests.get(url).text)
        for link in soup.find_all("a", text=re.compile('FS \d+:\d+')):
            if "bilaga" in link.text:
                self.log.warning("Skipping attachment in %s" % link.text)
                continue
            # sanitize trailing junk
            linktext = re.match("\w+FS \d+:\d+", link.text).group(0)
            # something like skvfs/2010/23 or rsfs/1996/9
            basefile = linktext.strip(
            ).lower().replace(" ", "/").replace(":", "/")
            self.download_single(basefile, urljoin(url, link['href']))

    def download_single(self, basefile, url):
        self.log.info("Downloading %s from %s" % (basefile, url))
        self.document_url = url + "#%s"
        html_downloaded = super(SKVFS, self).download_single(basefile)
        year = int(basefile.split("/")[1])
        if year >= 2007:  # download pdf as well
            filename = self.store.downloaded_path(basefile)
            pdffilename = os.path.splitext(filename)[0] + ".pdf"
            if not os.path.exists(pdffilename):
                soup = self.soup_from_basefile(basefile)
                pdflink = soup.find(href=re.compile('\.pdf$'))
                if not pdflink:
                    self.log.debug("No PDF file could be found")
                    return html_downloaded
                pdftext = pdflink.get_text(strip=True)
                pdfurl = urljoin(url, pdflink['href'])
                self.log.debug("Found %s at %s" % (pdftext, pdfurl))
                pdf_downloaded = self.download_if_needed(pdfurl, pdffilename)
                return html_downloaded and pdf_downloaded
            else:
                return False
        else:
            return html_downloaded