Files
bundeshaushalt/extract_hh2026.py

215 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Extract HH 2026 (Regierungsentwurf) from XML into clean JSON.
- Parses hierarchy: Einzelplan -> Kapitel -> Titel
- Converts values to EUR (XML uses Tsd. EUR)
- Optional category tagging by keywords
- Outputs:
* HH2026_titel_eur.json (full titles)
* EP01_24_Summen_Mrd.json (totals for EP 0124)
* EP{EP}_TopN_Titel.json (per-EP top-N list)
Usage:
python extract_hh2026.py --xml regierungsentwurf_2026.xml --outdir ./out --topN 50 --tag
"""
import argparse, json, re, os, sys
from lxml import etree
from collections import defaultdict
CAT_RULES = {
"Soziales & Arbeit": [
r"Rente", r"Rentenversicherung", r"Bundeszuschuss.*Rente", r"Arbeitslos", r"Bürgergeld", r"Grundsicherung",
r"Jobcenter", r"BAföG", r"Arbeitsförderung", r"SGB"
],
"Gesundheit & Pflege": [
r"Gesundheit", r"Gesundheitsfonds", r"GKV", r"Krankenkasse", r"Krankenhaus|Klinik",
r"Pflege", r"Pflegeversicherung", r"Hebamme"
],
"Familie & Bildung": [
r"Familie", r"Kindergeld", r"Elterngeld", r"Kita|Kindergarten", r"Schule|Bildung",
r"Hochschule|Universität|Uni", r"Jugend"
],
"Innere Sicherheit": [
r"Bundespolizei|Polizei|BKA|Verfassungsschutz|BfV", r"Katastrophenschutz", r"Justizvollzug|Strafvollzug"
],
"Verteidigung": [
r"Bundeswehr|Verteidigung|Rüst(ung|ungs)|Marine|Heer|Luftwaffe|NATO|Beschaffung.*milit"
],
"Verkehr & Infrastruktur": [
r"Straß|Autobahn|Brücke|Bahn|Schiene|ÖPNV|Verkehr", r"Digital|Breitband|Netz"
],
"Subventionen & Wirtschaftsförderung": [
r"Subvention|Förderung|Zuschuss.*Unternehmen|Mittelstand|Industrie", r"Agrar|Landwirtschaft",
r"Energiehilf|Wasserstoff|Batterie|Elektromobil|Ladeinfrastruktur|Start-?up|Gründer|Kohle|Bergbau|Schiffbau"
],
"Öffentliche Verwaltung & Politik": [
r"Bundestag|Bundesrat|Kanzleramt|Ministerium|Behörde|Amt|Regierungspräsidium|politische.*Stiftung"
],
"Zinsen & Finanzverwaltung": [
r"Zins|Bundesschuld|Schuldendienst|Anleihe|Tilgung|Finanzagentur"
],
"Internationale Zusammenarbeit & Entwicklung": [
r"Entwicklungszusammenarbeit|BMZ|ODA|Auslandshilfe|UNO|Vereinte Nationen|EU-?Beitrag|Auslandszahlung|Weltbank|IWF"
],
"Kultur, Religion & Medien": [
r"Kirche|Religionsgemeinschaft|Staatsleistung", r"Kultur|Theater|Museum|Filmförder",
r"Rundfunk|Medien|Presse"
],
"Umwelt & Klima": [
r"Umwelt|Klimaschutz|Nachhaltigkeit|Naturschutz|Energiewende|Ökologie|CO2|Emission"
],
}
def compile_rules():
return {cat: [re.compile(pat, re.I) for pat in pats] for cat, pats in CAT_RULES.items()}
def categorize(texts):
"""Return best-fit category by first match priority."""
if not texts: return None
blob = " | ".join([t for t in texts if t]).strip()
if not blob: return None
for cat, patterns in compile_rules().items():
for rx in patterns:
if rx.search(blob):
return cat
return None
def parse_xml(xml_path):
parser = etree.XMLParser(recover=True, huge_tree=True, encoding="utf-8")
tree = etree.parse(xml_path, parser)
root = tree.getroot()
records = []
for ep in root.findall(".//einzelplan"):
ep_nr = ep.get("nr")
ep_name = (ep.findtext("text") or "").strip()
for kap in ep.findall(".//kapitel"):
kap_nr = kap.get("nr")
kap_name = (kap.findtext("text") or "").strip()
for titel in kap.findall(".//titel"):
titel_nr = titel.get("nr")
fkt = titel.get("fkt")
seite = titel.get("seite")
bez = (titel.findtext("text") or "").strip()
soll = titel.find(".//soll")
wert = None
if soll is not None and "wert" in soll.attrib:
try:
wert = float(soll.attrib["wert"].replace(",", "."))
except:
wert = None
if wert is None:
continue
rec = {
"einzelplan_nr": str(ep_nr).zfill(2) if ep_nr else "",
"einzelplan_name": ep_name,
"kapitel_nr": kap_nr,
"kapitel_name": kap_name,
"titel_nr": titel_nr,
"bezeichnung": bez,
"seite": int(seite) if seite and re.match(r"^\d+$", str(seite)) else None,
"betrag_eur": wert * 1000.0, # XML uses Tsd. EUR
}
records.append(rec)
return records
def add_categories(records, do_tag=False):
if not do_tag:
return records
out = []
for r in records:
cat = categorize([r.get("bezeichnung"), r.get("kapitel_name"), r.get("einzelplan_name")])
r2 = dict(r)
r2["category"] = cat or "Sonstiges / nicht zuordenbar"
out.append(r2)
return out
def build_ep_totals(records, ep_range=("01","24")):
lo, hi = map(int, ep_range)
sums = defaultdict(float)
names = {}
for r in records:
ep = r.get("einzelplan_nr","")
if not ep: continue
try:
epi = int(ep)
except:
continue
if epi < int(lo) or epi > int(hi):
continue
sums[ep] += float(r.get("betrag_eur") or 0.0)
names[ep] = r.get("einzelplan_name","")
rows = []
for ep, val in sorted(sums.items(), key=lambda x: -x[1]):
rows.append({
"einzelplan_nr": ep,
"einzelplan_name": names.get(ep, ""),
"etat_mrd_eur": val / 1e9
})
return rows
def top_n_per_ep(records, N=20, outdir="."):
# group by EP, sort by betrag
by_ep = defaultdict(list)
for r in records:
ep = r.get("einzelplan_nr","")
by_ep[ep].append(r)
index = []
for ep, rows in by_ep.items():
rows_sorted = sorted(rows, key=lambda x: float(x.get("betrag_eur") or 0.0), reverse=True)[:N]
# enrich with Mio for readability
for rr in rows_sorted:
rr["betrag_mio_eur"] = (float(rr["betrag_eur"]) / 1e6) if rr.get("betrag_eur") is not None else None
path = os.path.join(outdir, f"EP{ep}_Top{N}_Titel.json")
with open(path, "w", encoding="utf-8") as f:
json.dump(rows_sorted, f, ensure_ascii=False, indent=2)
index.append({"einzelplan_nr": ep, "topN_path": path})
return index
def main():
ap = argparse.ArgumentParser(description="Extract Regierungsentwurf 2026 XML to JSON")
ap.add_argument("--xml", required=True, help="Path to regierungsentwurf_2026.xml")
ap.add_argument("--outdir", default=".", help="Output directory")
ap.add_argument("--topN", type=int, default=20, help="Top-N Titel pro EP exportieren")
ap.add_argument("--tag", action="store_true", help="Einfache Kategorien per Keyword zuweisen")
ap.add_argument("--ep_lo", default="01", help="EP-Start (inklusive) für Totals")
ap.add_argument("--ep_hi", default="24", help="EP-Ende (inklusive) für Totals")
args = ap.parse_args()
os.makedirs(args.outdir, exist_ok=True)
# Parse XML fully
records = parse_xml(args.xml)
# Optional tagging
records = add_categories(records, do_tag=args.tag)
# Save full titles
titles_json = os.path.join(args.outdir, "HH2026_titel_eur.json")
with open(titles_json, "w", encoding="utf-8") as f:
json.dump(records, f, ensure_ascii=False, indent=2)
# Build EP totals
totals = build_ep_totals(records, ep_range=(args.ep_lo, args.ep_hi))
totals_json = os.path.join(args.outdir, f"EP{args.ep_lo}_{args.ep_hi}_Summen_Mrd.json")
with open(totals_json, "w", encoding="utf-8") as f:
json.dump(totals, f, ensure_ascii=False, indent=2)
# Build per-EP TopN
index = top_n_per_ep(records, N=args.topN, outdir=args.outdir)
index_json = os.path.join(args.outdir, "Ministerien_TopN_Index.json")
with open(index_json, "w", encoding="utf-8") as f:
json.dump(index, f, ensure_ascii=False, indent=2)
# Small stdout summary
total_sum = sum(float(r.get("betrag_eur") or 0.0) for r in records)
print(f"[OK] Parsed titles: {len(records)}")
print(f"[OK] Sum of all titles (EUR): {total_sum:,.2f}")
print(f"[OK] Wrote: {titles_json}")
print(f"[OK] Wrote: {totals_json}")
print(f"[OK] Wrote per-EP Top{args.topN} files (index -> {index_json})")
if __name__ == "__main__":
main()