From 503deae5ef953434a476f32c9925de2f9b85f6dd Mon Sep 17 00:00:00 2001 From: Martin Carlsson Date: Sun, 4 Jan 2026 12:24:56 +0100 Subject: [PATCH] Fix PEP 8 compliance across all Python files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit addresses PEP 8 compliance issues throughout the codebase to ensure the project follows the coding standards documented in CONTRIBUTING.md. ## Changes made: ### Whitespace and formatting (900+ fixes) - Remove trailing whitespace from all lines (W291, W293) - Fix newline issues at end of files (W292, W391) - Fix blank lines between functions (E302, E303, E305) - Fix block comment formatting (E265) ### Line length (60 fixes) - Break long lines where possible using autopep8 (E501) - Reduced from 174 to 114 E501 violations - Remaining violations are long strings/comments per CONTRIBUTING.md flexibility ### Indentation (20 fixes) - Fix continuation line indentation (E128, E129) ## Remaining issues: - 114 E501 (line too long) violations remain, primarily for: - Long string literals - Complex regex patterns - Descriptive comments These are acceptable per CONTRIBUTING.md: "Maximal radlängd: 100 tecken (flexibelt för långa strängar)" 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- downloaders/download_sfs_docs.py | 93 ++++--- downloaders/eur_lex_api.py | 109 +++++---- downloaders/fetch_new_sfs_docs.py | 58 ++--- downloaders/riksdagen_api.py | 128 +++++----- downloaders/rkrattsbaser_api.py | 26 +- exporters/git/__init__.py | 10 +- exporters/git/batch_export_to_git.py | 20 +- exporters/git/generate_commits.py | 202 ++++++++------- exporters/git/git_utils.py | 214 ++++++++-------- exporters/git/init_commits_batch_processor.py | 64 +++-- .../git/temporal_commits_batch_processor.py | 53 ++-- exporters/html/eli_utils.py | 67 ++--- exporters/html/html_diff_page.py | 12 +- exporters/html/html_export.py | 173 +++++++------ exporters/html/populate_index_pages.py | 90 ++++--- exporters/html/styling_constants.py | 45 ++-- exporters/html/upload_to_r2.py | 77 +++--- formatters/add_pdf_url_to_frontmatter.py | 124 +++++----- formatters/apply_links.py | 106 ++++---- formatters/format_sfs_text.py | 230 ++++++++++-------- formatters/frontmatter_manager.py | 75 +++--- formatters/predocs_parser.py | 58 +++-- formatters/sort_frontmatter.py | 136 ++++++----- formatters/table_converter.py | 161 ++++++------ scripts/analyze_forarbeten.py | 49 ++-- .../analyze_ikraft_overgangsbestammelse.py | 43 ++-- scripts/analyze_ikraft_years.py | 41 ++-- scripts/analyze_kap_upphavd_context.py | 70 +++--- scripts/analyze_law_name_issues.py | 43 ++-- .../analyze_link_patterns_with_documents.py | 79 +++--- scripts/analyze_unmatched_law_names.py | 26 +- scripts/dry_run_git_commits.py | 49 ++-- scripts/extract_unmatched_laws.py | 6 +- scripts/run_3year_commits.py | 102 +++++--- scripts/temporal_commits_batch.py | 13 +- scripts/validate_law_names.py | 81 +++--- sfs_processor.py | 185 +++++++++----- temporal/__init__.py | 2 +- temporal/amendments.py | 28 ++- temporal/apply_temporal.py | 136 ++++++----- temporal/find_expiring_docs.py | 52 ++-- temporal/get_temporal_date_range.py | 3 +- temporal/overgangsbestammelser.py | 35 ++- temporal/title_temporal.py | 2 +- temporal/upcoming_changes.py | 134 +++++----- util/datetime_utils.py | 11 +- util/file_utils.py | 6 +- util/text_utils.py | 4 +- util/yaml_utils.py | 2 - 49 files changed, 1939 insertions(+), 1594 deletions(-) diff --git 
a/downloaders/download_sfs_docs.py b/downloaders/download_sfs_docs.py index 0d96c281..bf61060c 100644 --- a/downloaders/download_sfs_docs.py +++ b/downloaders/download_sfs_docs.py @@ -14,8 +14,8 @@ # Importera funktioner från de specifika nedladdningsmodulerna from riksdagen_api import fetch_document_ids, download_documents as download_riksdagen_documents from rkrattsbaser_api import ( - fetch_document_by_rkrattsbaser, - save_document_from_rkrattsbaser, + fetch_document_by_rkrattsbaser, + save_document_from_rkrattsbaser, convert_riksdagen_id_to_rkrattsbaser_format, download_documents as download_rkrattsbaser_documents ) @@ -28,84 +28,85 @@ def download_test_docs(): """ test_docs_file = "data/test-doc-ids.json" output_dir = "data/testdocs" - + print("=== Laddar ner testdokument ===") - + # Kontrollera att filen med test-dokument-ID:n finns if not os.path.exists(test_docs_file): print(f"✗ Filen {test_docs_file} hittades inte.") return False - + try: # Läs test-dokument-ID:n från JSON-filen (med kommentarstöd) with open(test_docs_file, 'r', encoding='utf-8') as f: content = f.read() - + # Ta bort /* */ kommentarer content = re.sub(r'/\*.*?\*/', '', content, flags=re.DOTALL) # Ta bort // kommentarer (endast från början av rad eller efter whitespace) content = re.sub(r'^\s*//.*$', '', content, flags=re.MULTILINE) content = re.sub(r'\s+//.*$', '', content, flags=re.MULTILINE) - + test_docs = json.loads(content) - + if not test_docs: print("Inga testdokument att ladda ner.") return True - + print(f"Hittade {len(test_docs)} testdokument att ladda ner") print(f"Sparar i katalog: {output_dir}") - + # Skapa katalog om den inte finns os.makedirs(output_dir, exist_ok=True) - + successful_downloads = 0 failed_downloads = 0 - + # Ladda ner varje testdokument for i, doc_info in enumerate(test_docs, 1): document_id = doc_info.get("document_id") comment = doc_info.get("comment", "") - + if not document_id: print(f"⚠ Dokument {i} saknar document_id, hoppar över") failed_downloads += 1 continue - + print(f"[{i}/{len(test_docs)}] {document_id}") if comment: print(f" Kommentar: {comment}") # Konvertera dokument-ID till rätt format för Regeringskansliet converted_id = convert_riksdagen_id_to_rkrattsbaser_format(document_id) - + # Ladda ner dokumentet från Regeringskansliet document_data = fetch_document_by_rkrattsbaser(converted_id) if document_data: rkrattsbaser_dir = os.path.join(output_dir, "rkrattsbaser") - success = save_document_from_rkrattsbaser(document_id, document_data, rkrattsbaser_dir) + success = save_document_from_rkrattsbaser( + document_id, document_data, rkrattsbaser_dir) else: success = False - + if success: successful_downloads += 1 else: failed_downloads += 1 - + # Kort paus mellan nedladdningar time.sleep(0.5) - + # Sammanfattning print("\n=== Sammanfattning testdokument ===") print(f"Totalt testdokument: {len(test_docs)}") print(f"Lyckade nedladdningar: {successful_downloads}") print(f"Misslyckade nedladdningar: {failed_downloads}") - + if successful_downloads > 0: print(f"Testdokument sparade i: {os.path.abspath(output_dir)}") - + return failed_downloads == 0 - + except json.JSONDecodeError as e: print(f"✗ Fel vid parsing av {test_docs_file}: {e}") return False @@ -118,17 +119,29 @@ def main(): """ Huvudfunktion som koordinerar hämtning av dokument-ID:n och nedladdning av dokument. 
""" - parser = argparse.ArgumentParser(description='Ladda ner SFS-dokument från Regeringskansliets söktjänst eller Riksdagens öppna API') - parser.add_argument('--ids', default='all', - help='Kommaseparerad lista med dokument-ID:n att ladda ner, eller "all" för att hämta alla från Riksdagen (default: all)') + parser = argparse.ArgumentParser( + description='Ladda ner SFS-dokument från Regeringskansliets söktjänst eller Riksdagens öppna API') + parser.add_argument( + '--ids', + default='all', + help='Kommaseparerad lista med dokument-ID:n att ladda ner, eller "all" för att hämta alla från Riksdagen (default: all)') parser.add_argument('--out', default='sfs_docs', help='Mapp att spara nedladdade dokument i (default: sfs_docs)') - parser.add_argument('--source', choices=['riksdagen', 'rkrattsbaser'], default='rkrattsbaser', - help='Välj källa för nedladdning: riksdagen (HTML) eller rkrattsbaser (JSON via Elasticsearch) (default: rkrattsbaser)') - parser.add_argument('--year', type=int, - help='Filtrera dokument för specifikt årtal (t.ex. 2025 för sfs-2025-xxx). Fungerar endast med --ids all och --source riksdagen') - parser.add_argument('--test-docs', action='store_true', - help='Ladda ner testdokument från data/test-doc-ids.json till data/testdocs') + parser.add_argument( + '--source', + choices=[ + 'riksdagen', + 'rkrattsbaser'], + default='rkrattsbaser', + help='Välj källa för nedladdning: riksdagen (HTML) eller rkrattsbaser (JSON via Elasticsearch) (default: rkrattsbaser)') + parser.add_argument( + '--year', + type=int, + help='Filtrera dokument för specifikt årtal (t.ex. 2025 för sfs-2025-xxx). Fungerar endast med --ids all och --source riksdagen') + parser.add_argument( + '--test-docs', + action='store_true', + help='Ladda ner testdokument från data/test-doc-ids.json till data/testdocs') args = parser.parse_args() @@ -141,7 +154,7 @@ def main(): print(f"Källa: {args.source}") if args.year: print(f"Filtrerar för år: {args.year}") - + # Hämta dokument-ID:n if args.ids == 'all': document_ids = fetch_document_ids(args.year) @@ -153,30 +166,32 @@ def main(): # Varning om --year används med specifika IDs if args.year: print("⚠ --year parameter ignoreras när specifika dokument-ID:n anges med --ids.") - + if not document_ids: print("Inga dokument-ID:n hittades. 
Avslutar.") return - + # Skapa katalog för nedladdade författningar output_dir = args.out print(f"\nLaddar ner författningar till katalogen: {output_dir}") # Ladda ner författningar baserat på källa if args.source == 'riksdagen': - successful_downloads, failed_downloads = download_riksdagen_documents(document_ids, output_dir) + successful_downloads, failed_downloads = download_riksdagen_documents( + document_ids, output_dir) elif args.source == 'rkrattsbaser': - successful_downloads, failed_downloads = download_rkrattsbaser_documents(document_ids, output_dir) - + successful_downloads, failed_downloads = download_rkrattsbaser_documents( + document_ids, output_dir) + # Sammanfattning print("\n=== Sammanfattning ===") print(f"Totalt dokument-ID:n: {len(document_ids)}") print(f"Lyckade nedladdningar: {successful_downloads}") print(f"Misslyckade nedladdningar: {failed_downloads}") - + if successful_downloads > 0: print(f"Författningar sparade i katalogen: {os.path.abspath(output_dir)}") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/downloaders/eur_lex_api.py b/downloaders/eur_lex_api.py index 7a946613..fc039554 100644 --- a/downloaders/eur_lex_api.py +++ b/downloaders/eur_lex_api.py @@ -25,7 +25,7 @@ def parse_eu_regulation_to_celex(regulation_text: str) -> Optional[str]: """ Konverterar EU-förordningstext till CELEX-nummer. - + Stöder format som: - "(EU) nr 651/2014" - "(EU) Nr 651/2014" @@ -33,19 +33,19 @@ def parse_eu_regulation_to_celex(regulation_text: str) -> Optional[str]: - "651/2014" - "Förordning (EU) nr 651/2014" - "Rådets förordning (EU) nr 651/2014" - + Returnerar CELEX-nummer med sektor 3 (lagstiftning) som standard. fetch_eur_lex_document_info() kommer att försöka med sektor 2 (rättspraxis) om det inte hittas. - + Args: regulation_text (str): Text som innehåller EU-förordningsreferens - + Returns: Optional[str]: CELEX-nummer (t.ex. "32014R0651") eller None om inget hittas """ # Normalisera texten text = regulation_text.strip() - + # Mönster för att hitta EU-förordningar # Matchar olika format av EU-förordningar patterns = [ @@ -60,121 +60,128 @@ def parse_eu_regulation_to_celex(regulation_text: str) -> Optional[str]: # "Rådets förordning (EU) nr 651/2014" etc. r'[Rr]ådets\s*förordning\s*\(EU\)\s*[Nn]r\s*(\d+)/(\d{4})', ] - + for pattern in patterns: match = re.search(pattern, text) if match: number = match.group(1) year = match.group(2) - + # Konvertera till CELEX-format (börjar alltid med sektor 3) return eu_regulation_to_celex(number, year) - + return None def eu_regulation_to_celex(number: str, year: str, regulation_type: str = "R") -> str: """ Konverterar EU-förordningsnummer och år till CELEX-format. - + Args: number (str): Förordningsnummer (t.ex. "651") year (str): År (t.ex. "2014") regulation_type (str): Typ av förordning ("R" för regulation, "L" för directive, "D" för decision) - + Returns: str: CELEX-nummer (t.ex. "32014R0651") """ # Sektor 3 för lagstiftning sector = "3" - + # Formatera löpnummer med fyllnad till 4 siffror formatted_number = number.zfill(4) - + # Bygga CELEX-nummer: sektor + år + typ + löpnummer celex = f"{sector}{year}{regulation_type}{formatted_number}" - + return celex def generate_eur_lex_url(celex_number: str, language: str = "SV") -> str: """ Genererar URL till EUR-Lex-dokument baserat på CELEX-nummer. - + Args: celex_number (str): CELEX-nummer (t.ex. "32014R0651") language (str): Språkkod (t.ex. 
"SV" för svenska, "EN" för engelska) - + Returns: str: URL till EUR-Lex-dokument """ # URL-koda CELEX-numret encoded_celex = quote(celex_number) - + # Bygga URL url = f"https://eur-lex.europa.eu/legal-content/{language}/ALL/?uri=celex%3A{encoded_celex}" - + return url -def fetch_eur_lex_document_info(celex_number: str, language: str = "SV") -> Optional[Dict[str, Any]]: +def fetch_eur_lex_document_info( + celex_number: str, language: str = "SV") -> Optional[Dict[str, Any]]: """ Hämtar grundläggande information om ett EUR-Lex-dokument. - + Om det ursprungliga CELEX-numret (med sektor 3 för lagstiftning) inte hittas, försöker funktionen med sektor 2 (rättspraxis). - + Args: celex_number (str): CELEX-nummer (t.ex. "32014R0651") language (str): Språkkod (t.ex. "SV" för svenska, "EN" för engelska) - + Returns: Optional[Dict[str, Any]]: Grundläggande dokumentinformation eller None om misslyckad """ # Försök först med det ursprungliga CELEX-numret url = generate_eur_lex_url(celex_number, language) - + try: response = requests.get(url, timeout=30) - + # Om det fungerar, använd det ursprungliga numret if response.status_code == 200: return _create_document_info(celex_number, url, language, response.status_code) - - # Om vi får 404 eller 400 och det är sektor 3 (lagstiftning), försök med sektor 2 (rättspraxis) + + # Om vi får 404 eller 400 och det är sektor 3 (lagstiftning), försök med + # sektor 2 (rättspraxis) elif response.status_code in [400, 404] and celex_number.startswith('3'): print(f"CELEX {celex_number} (lagstiftning) hittades inte, försöker med rättspraxis...") - + # Skapa nytt CELEX-nummer med sektor 2 istället för 3 alternative_celex = '2' + celex_number[1:] alternative_url = generate_eur_lex_url(alternative_celex, language) - + # Försök med det alternativa numret alt_response = requests.get(alternative_url, timeout=30) alt_response.raise_for_status() - + print(f"Hittade dokument med CELEX {alternative_celex} (rättspraxis)") - return _create_document_info(alternative_celex, alternative_url, language, alt_response.status_code) - + return _create_document_info( + alternative_celex, + alternative_url, + language, + alt_response.status_code) + else: # Försök att göra normal error handling response.raise_for_status() - + except requests.exceptions.RequestException as e: print(f"Fel vid hämtning av EUR-Lex-dokument {celex_number}: {e}") return None -def _create_document_info(celex_number: str, url: str, language: str, status_code: int) -> Dict[str, Any]: +def _create_document_info(celex_number: str, url: str, language: str, + status_code: int) -> Dict[str, Any]: """ Hjälpfunktion för att skapa dokumentinformation baserat på CELEX-nummer. 
- + Args: celex_number (str): CELEX-nummer url (str): URL till dokumentet language (str): Språkkod status_code (int): HTTP-statuskod - + Returns: Dict[str, Any]: Dokumentinformation """ @@ -185,14 +192,14 @@ def _create_document_info(celex_number: str, url: str, language: str, status_cod "language": language, "status": "found" if status_code == 200 else "not_found" } - + # Extrahera år och typ från CELEX-numret if len(celex_number) >= 8: sector = celex_number[0] year = celex_number[1:5] regulation_type = celex_number[5] number = celex_number[6:].lstrip('0') - + info.update({ "sector": sector, "year": year, @@ -201,23 +208,23 @@ def _create_document_info(celex_number: str, url: str, language: str, status_cod "formatted_reference": f"(EU) nr {number}/{year}" if regulation_type == "R" else f"(EU) {number}/{year}", "sector_description": "lagstiftning" if sector == "3" else "rättspraxis" if sector == "2" else "okänd" }) - + return info def validate_celex_number(celex_number: str) -> bool: """ Validerar att ett CELEX-nummer har korrekt format. - + Args: celex_number (str): CELEX-nummer att validera - + Returns: bool: True om giltigt format, False annars """ # CELEX-nummer ska ha format: sektor (1) + år (4) + typ (1) + löpnummer (4) pattern = r'^[1-9]\d{4}[A-Z]\d{4}$' - + return bool(re.match(pattern, celex_number)) @@ -225,22 +232,22 @@ def validate_celex_number(celex_number: str) -> bool: if __name__ == "__main__": # Test med exempel från beskrivningen test_regulation = "(EU) nr 651/2014" - + print(f"Input: {test_regulation}") - + # Konvertera till CELEX celex = parse_eu_regulation_to_celex(test_regulation) print(f"CELEX: {celex}") - + if celex: # Generera URL url = generate_eur_lex_url(celex) print(f"URL: {url}") - + # Validera CELEX-numret is_valid = validate_celex_number(celex) print(f"Giltigt CELEX: {is_valid}") - + # Hämta dokumentinformation (med automatisk fallback till sektor 2 om sektor 3 inte hittas) print("Hämtar dokumentinformation...") info = fetch_eur_lex_document_info(celex) @@ -250,17 +257,17 @@ def validate_celex_number(celex_number: str) -> bool: print(f" {key}: {value}") else: print("Kunde inte hämta dokumentinformation") - + # Test med flera format - print("\n" + "="*50) + print("\n" + "=" * 50) print("Test med olika format:") - + test_cases = [ "(EU) nr 651/2014", - "(EU) Nr 1234/2020", + "(EU) Nr 1234/2020", "Rådets förordning (EU) nr 999/2023" ] - + for test in test_cases: celex = parse_eu_regulation_to_celex(test) - print(f"{test:<35} -> {celex}") \ No newline at end of file + print(f"{test:<35} -> {celex}") diff --git a/downloaders/fetch_new_sfs_docs.py b/downloaders/fetch_new_sfs_docs.py index a26c3042..c2bdbd56 100644 --- a/downloaders/fetch_new_sfs_docs.py +++ b/downloaders/fetch_new_sfs_docs.py @@ -19,19 +19,19 @@ def _post(payload_dict: Dict[str, Any]) -> Optional[Dict]: """ Gör en POST-förfrågan till Regeringskansliets Elasticsearch API. 
- + Args: payload_dict (Dict[str, Any]): Payload för API-anropet - + Returns: Optional[Dict]: API-svar eller None vid fel """ url = "https://beta.rkrattsbaser.gov.se/elasticsearch/SearchEsByRawJson" - + headers = { 'content-type': 'application/json' } - + try: response = requests.post(url, headers=headers, json=payload_dict, timeout=30) response.raise_for_status() @@ -50,7 +50,7 @@ def get_newer_items(date: str) -> Optional[Dict]: Args: date (str): Datum i ISO-format (YYYY-MM-DD eller YYYY-MM-DDTHH:MM:SS) - + Returns: Optional[Dict]: API-svar med författningar eller None vid fel """ @@ -82,10 +82,10 @@ def save_document_as_json(document: Dict[str, Any], output_dir: Path) -> bool: Args: document (Dict[str, Any]): Författningsdata från API:et output_dir (Path): Katalog att spara filen i - + Returns: bool: True om sparningen lyckades, False annars - + Raises: ValueError: Om beteckning saknas eller är tom """ @@ -94,22 +94,22 @@ def save_document_as_json(document: Dict[str, Any], output_dir: Path) -> bool: beteckning = document.get('beteckning') if not beteckning: raise ValueError("Beteckning saknas eller är tom i dokumentet") - + # Konvertera beteckning till filnamn (t.ex. "2024:123" -> "sfs-2024-123.json") safe_filename = "sfs-" + re.sub(r'[^\w\-]', '-', beteckning) + '.json' output_file = output_dir / safe_filename - + # Kontrollera om filen redan finns if output_file.exists(): print(f"⚠ {output_file.name} finns redan, skriver över") - + # Skriv JSON-filen with open(output_file, 'w', encoding='utf-8') as f: json.dump(document, f, ensure_ascii=False, indent=2) - + print(f"✓ Sparade {beteckning} -> {output_file}") return True - + except ValueError as e: print(f"✗ Fel vid sparning av författning: {e}") return False @@ -122,13 +122,13 @@ def save_document_as_json(document: Dict[str, Any], output_dir: Path) -> bool: def parse_date(date_str: str) -> str: """ Parsar och validerar ett datumformat. 
- + Args: date_str (str): Datum som sträng - + Returns: str: Validerat datum i ISO-format - + Raises: ValueError: Om datumet inte kan parsas """ @@ -138,14 +138,14 @@ def parse_date(date_str: str) -> str: '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S' ] - + for fmt in formats: try: parsed_date = datetime.strptime(date_str, fmt) return parsed_date.isoformat() except ValueError: continue - + raise ValueError(f"Kunde inte parsa datum: {date_str}") @@ -164,24 +164,24 @@ def main(): Efter att JSON-filerna sparats kan de bearbetas med sfs_processor.py: python sfs_processor.py --input /path/to/json --output /path/to/markdown - """ - ) - + """) + # Datum-alternativ (antingen --date eller --days) date_group = parser.add_mutually_exclusive_group(required=True) - date_group.add_argument('--date', - help='Hämta författningar uppdaterade efter detta datum (YYYY-MM-DD eller YYYY-MM-DDTHH:MM:SS)') + date_group.add_argument( + '--date', + help='Hämta författningar uppdaterade efter detta datum (YYYY-MM-DD eller YYYY-MM-DDTHH:MM:SS)') date_group.add_argument('--days', type=int, - help='Hämta författningar uppdaterade de senaste X dagarna') + help='Hämta författningar uppdaterade de senaste X dagarna') parser.add_argument('--output', '-o', default='sfs_json', help='Mapp att spara JSON-filer i (default: sfs_json)') # Remove year folder option since we're saving JSON files directly - + args = parser.parse_args() print("=== Dokumenthämtare (nya/uppdaterade) ===") - + # Bestäm datum att söka från if args.date: try: @@ -193,7 +193,8 @@ def main(): else: # args.days cutoff_date = datetime.now() - timedelta(days=args.days) search_date = cutoff_date.isoformat() - print(f"Hämtar författningar uppdaterade de senaste {args.days} dagarna (efter {search_date})") + print( + f"Hämtar författningar uppdaterade de senaste {args.days} dagarna (efter {search_date})") # Skapa output-katalog tidigt så den finns även om inga dokument hittas output_dir = Path(args.output) @@ -238,7 +239,8 @@ def main(): # Skip documents with beteckning starting with 'N' (myndighetsföreskrifter) if beteckning.startswith('N'): - print(f"\n[{i}/{len(documents)}] ⚠ Hoppar över {beteckning} - myndighetsföreskrift (N-beteckning)") + print( + f"\n[{i}/{len(documents)}] ⚠ Hoppar över {beteckning} - myndighetsföreskrift (N-beteckning)") skipped_n += 1 continue @@ -248,7 +250,7 @@ def main(): successful_saves += 1 else: failed_saves += 1 - + # Sammanfattning print("\n=== Sammanfattning ===") print(f"Totalt författningar: {len(documents)}") diff --git a/downloaders/riksdagen_api.py b/downloaders/riksdagen_api.py index c55f5d7d..6eef6068 100644 --- a/downloaders/riksdagen_api.py +++ b/downloaders/riksdagen_api.py @@ -22,12 +22,12 @@ class RiksdagenAPIError(Exception): def construct_rd_docid(doc_type: str, rm: str, bet: str) -> Optional[str]: """ Construct a Riksdag document ID (rd_docid) from document type, riksmötesår and beteckning. - + Args: doc_type: Document type ('prop', 'bet', 'rskr', etc.) 
rm: Riksmötesår (e.g., "2024/25") bet: Beteckning (document number/designation) - + Returns: Constructed rd_docid string or None if construction fails """ @@ -43,7 +43,7 @@ def construct_rd_docid(doc_type: str, rm: str, bet: str) -> Optional[str]: "2000/01": "GN", "1999/00": "GM", "1998/99": "GL", "1997/98": "GK", "1996/97": "GJ", "1995/96": "GI", "1994/95": "GH", "1993/94": "GG" } - + # Mapping from document types to series codes doc_type_mappings = { 'prop': '03', # Government propositions @@ -53,35 +53,36 @@ def construct_rd_docid(doc_type: str, rm: str, bet: str) -> Optional[str]: 'ip': '10', # Interpellations 'fr': '11', # Questions } - + # Get year code year_code = year_mappings.get(rm) if not year_code: return None - + # Get document series code series_code = doc_type_mappings.get(doc_type) if not series_code: return None - + # Construct the rd_docid # Format: [year_code][series_code][beteckning] rd_docid = f"{year_code}{series_code}{bet}" - + return rd_docid -def fetch_document_info(doc_type: str, rm: str, bet: str, max_retries: int = 3, delay: float = 0.5) -> Optional[Dict[str, str]]: +def fetch_document_info(doc_type: str, rm: str, bet: str, max_retries: int = 3, + delay: float = 0.5) -> Optional[Dict[str, str]]: """ Fetch document information from Riksdag API using document type, riksmötesår and beteckning. - + Args: doc_type: Document type ('prop', 'bet', 'rskr', etc.) rm: Riksmötesår (e.g., "2024/25") bet: Beteckning (document number) max_retries: Maximum number of retry attempts delay: Delay between requests in seconds - + Returns: Dictionary with document info: {'dokumentnamn': '...', 'titel': '...'} Returns None if document not found or on error. @@ -91,36 +92,36 @@ def fetch_document_info(doc_type: str, rm: str, bet: str, max_retries: int = 3, if not rd_docid: print(f"Varning: Kunde inte konstruera rd_docid för {doc_type} {rm}:{bet}") return None - + url = f"https://data.riksdagen.se/dokument/{rd_docid}.json" - + for attempt in range(max_retries): try: # Add delay to be respectful to the API if attempt > 0: time.sleep(delay) - + response = requests.get(url, timeout=10) response.raise_for_status() - + data = response.json() - + # Extract document information from JSON response if 'dokumentstatus' in data and 'dokument' in data['dokumentstatus']: doc = data['dokumentstatus']['dokument'] - + # Extract the information we need dokumentnamn = doc.get('dokumentnamn', '') titel = doc.get('titel', '') - + if dokumentnamn and titel: return { 'dokumentnamn': dokumentnamn, 'titel': titel } - + return None - + except requests.exceptions.RequestException as e: print(f"Varning: HTTP-fel vid hämtning av {rd_docid} ({doc_type} {rm}:{bet}): {e}") if attempt == max_retries - 1: @@ -133,25 +134,25 @@ def fetch_document_info(doc_type: str, rm: str, bet: str, max_retries: int = 3, print(f"Varning: Oväntat fel vid hämtning av {rd_docid} ({doc_type} {rm}:{bet}): {e}") if attempt == max_retries - 1: return None - + return None -def fetch_predocs_details(predocs_list: List[Dict[str, str]], - delay_between_requests: float = 0.5) -> List[Dict[str, str]]: +def fetch_predocs_details(predocs_list: List[Dict[str, str]], + delay_between_requests: float = 0.5) -> List[Dict[str, str]]: """ Fetch detailed information for a list of förarbeten references. 
- + Args: predocs_list: List of parsed förarbeten dictionaries delay_between_requests: Delay between requests in seconds - + Returns: List of dictionaries with detailed information: [ { 'type': 'prop', - 'rm': '2024/25', + 'rm': '2024/25', 'bet': '1', 'original': 'Prop. 2024/25:1', 'dokumentnamn': 'Prop. 2024/25:1', @@ -161,24 +162,24 @@ def fetch_predocs_details(predocs_list: List[Dict[str, str]], ] """ detailed_results = [] - + for i, predoc in enumerate(predocs_list): # Add delay between requests to be respectful if i > 0: time.sleep(delay_between_requests) - + rm = predoc.get('rm') bet = predoc.get('bet') - + if not all([rm, bet]): # Keep original entry if we can't fetch details detailed_results.append(predoc) continue - + print(f"Hämtar information för {rm}:{bet}...") - + doc_info = fetch_document_info(predoc.get('type'), rm, bet) - + if doc_info: # Merge the original information with the fetched details result = predoc.copy() @@ -189,27 +190,27 @@ def fetch_predocs_details(predocs_list: List[Dict[str, str]], # Keep original entry if we couldn't fetch details detailed_results.append(predoc) print(f" - Kunde inte hämta information för {predoc['original']}") - + return detailed_results def format_predocs_for_frontmatter(detailed_predocs: List[Dict[str, str]]) -> List[str]: """ Format detailed förarbeten information for use in frontmatter. - + Args: detailed_predocs: List of dictionaries with document details - + Returns: List of formatted strings in the format "(Dokumentnamn): (titel)" """ formatted = [] - + for predoc in detailed_predocs: dokumentnamn = predoc.get('dokumentnamn', '') titel = predoc.get('titel', '') original = predoc.get('original', '') - + if titel: if dokumentnamn: # Extract part after the first period from original @@ -221,14 +222,14 @@ def format_predocs_for_frontmatter(detailed_predocs: List[Dict[str, str]]) -> Li else: # Fallback to original reference if we don't have full details formatted.append(original) - + return formatted def fetch_document_ids(year: Optional[int] = None) -> List[str]: """ Hämtar författnings-ID:n från Riksdagens dokumentlista. - + Args: year (Optional[int]): Filtrera författningar för specifikt årtal (t.ex. 2025 för sfs-2025-xxx) @@ -236,26 +237,27 @@ def fetch_document_ids(year: Optional[int] = None) -> List[str]: List[str]: Lista med författnings-ID:n """ url = "https://data.riksdagen.se/dokumentlista/?sok=&doktyp=SFS&utformat=iddump&a=s#soktraff" - + print(f"Hämtar författnings-ID:n från: {url}") - + try: response = requests.get(url, timeout=30) response.raise_for_status() - + # Parsa kommaseparerade värden och trimma mellanslag content = response.text.strip() document_ids = [doc_id.strip() for doc_id in content.split(',') if doc_id.strip()] - + # Filtrera baserat på årtal om specificerat if year is not None: original_count = len(document_ids) document_ids = [doc_id for doc_id in document_ids if doc_id.startswith(f"sfs-{year}-")] - print(f"Filtrerade för år {year}: {len(document_ids)} av {original_count} författningar") + print( + f"Filtrerade för år {year}: {len(document_ids)} av {original_count} författningar") print(f"Hittade {len(document_ids)} författnings-ID:n") return document_ids - + except requests.RequestException as e: print(f"Fel vid hämtning av författnings-ID:n: {e}") return [] @@ -264,18 +266,18 @@ def fetch_document_ids(year: Optional[int] = None) -> List[str]: def download_doc_as_html(document_id: str, output_dir: str = "documents") -> bool: """ Laddar ner textinnehållet för en specifik författning. 
- + Args: document_id (str): Författnings-ID att ladda ner output_dir (str): Katalog att spara filen i - + Returns: bool: True om nedladdningen lyckades, False annars """ url = f"https://data.riksdagen.se/dokument/{document_id}.html" filename = f"{document_id}.html" filepath = os.path.join(output_dir, filename) - + # Kontrollera om filen redan finns if os.path.exists(filepath): print(f"⚠ {filename} finns redan, hoppar över") @@ -284,17 +286,17 @@ def download_doc_as_html(document_id: str, output_dir: str = "documents") -> boo try: response = requests.get(url, timeout=30) response.raise_for_status() - + # Skapa katalog om den inte finns os.makedirs(output_dir, exist_ok=True) - + # Spara textinnehållet till fil with open(filepath, 'w', encoding='utf-8') as f: f.write(response.text) - + print(f"✓ Sparade {filename}") return True - + except requests.RequestException as e: print(f"✗ Fel vid hämtning av {document_id}: {e}") return False @@ -306,48 +308,48 @@ def download_doc_as_html(document_id: str, output_dir: str = "documents") -> boo def download_documents(document_ids: List[str], output_dir: str = "documents") -> Tuple[int, int]: """ Laddar ner en lista med dokument från Riksdagen. - + Args: document_ids (List[str]): Lista med dokument-ID:n att ladda ner output_dir (str): Katalog att spara filerna i - + Returns: Tuple[int, int]: (successful_downloads, failed_downloads) """ successful_downloads = 0 failed_downloads = 0 - + for i, document_id in enumerate(document_ids, 1): print(f"[{i}/{len(document_ids)}] Laddar ner {document_id}...") - + success = download_doc_as_html(document_id, output_dir) - + if success: successful_downloads += 1 else: failed_downloads += 1 - + # Kort paus mellan nedladdningar för att vara snäll mot servern time.sleep(0.5) - + return successful_downloads, failed_downloads if __name__ == "__main__": # Test the API functions from formatters.predocs_parser import parse_predocs_string - + test_string = "Prop. 2024/25:1, bet. 2024/25:FiU1" print(f"Testing with: {test_string}") - + parsed = parse_predocs_string(test_string) print(f"Parsed: {parsed}") - + if parsed: detailed = fetch_predocs_details(parsed) print(f"Detailed: {detailed}") - + formatted = format_predocs_for_frontmatter(detailed) print(f"Formatted for frontmatter:") for item in formatted: - print(f" - {item}") \ No newline at end of file + print(f" - {item}") diff --git a/downloaders/rkrattsbaser_api.py b/downloaders/rkrattsbaser_api.py index d859a364..30daae4c 100644 --- a/downloaders/rkrattsbaser_api.py +++ b/downloaders/rkrattsbaser_api.py @@ -69,7 +69,10 @@ def fetch_document_by_rkrattsbaser(doc_id: str) -> Optional[Dict]: return None -def save_document_from_rkrattsbaser(doc_id: str, document_data: Dict, output_dir: str = "rkrattsbaser") -> bool: +def save_document_from_rkrattsbaser( + doc_id: str, + document_data: Dict, + output_dir: str = "rkrattsbaser") -> bool: """ Sparar dokumentdata från Regeringskansliets API till fil. @@ -128,39 +131,40 @@ def convert_riksdagen_id_to_rkrattsbaser_format(doc_id: str) -> str: return doc_id -def download_documents(document_ids: List[str], output_dir: str = "rkrattsbaser") -> tuple[int, int]: +def download_documents(document_ids: List[str], + output_dir: str = "rkrattsbaser") -> tuple[int, int]: """ Laddar ner en lista med dokument från rkrattsbaser. 
- + Args: document_ids (List[str]): Lista med dokument-ID:n att ladda ner (i Riksdagen-format) output_dir (str): Katalog att spara filerna i - + Returns: tuple[int, int]: (successful_downloads, failed_downloads) """ successful_downloads = 0 failed_downloads = 0 - + for i, document_id in enumerate(document_ids, 1): print(f"[{i}/{len(document_ids)}] Laddar ner {document_id}...") - + # Konvertera dokument-ID till rätt format för rkrattsbaser converted_id = convert_riksdagen_id_to_rkrattsbaser_format(document_id) - + # Ladda ner dokumentet från rkrattsbaser document_data = fetch_document_by_rkrattsbaser(converted_id) if document_data: success = save_document_from_rkrattsbaser(document_id, document_data, output_dir) else: success = False - + if success: successful_downloads += 1 else: failed_downloads += 1 - + # Kort paus mellan nedladdningar för att vara snäll mot servern time.sleep(0.5) - - return successful_downloads, failed_downloads \ No newline at end of file + + return successful_downloads, failed_downloads diff --git a/exporters/git/__init__.py b/exporters/git/__init__.py index c1b7666f..516c4b32 100644 --- a/exporters/git/__init__.py +++ b/exporters/git/__init__.py @@ -1,8 +1,8 @@ """Git export functionality for SFS documents.""" from .git_utils import ( - prepare_git_branch, - restore_original_branch, + prepare_git_branch, + restore_original_branch, remove_all_commits_on_branch, get_target_repository, configure_git_remote, @@ -19,11 +19,11 @@ from .generate_commits import create_init_git_commit __all__ = [ - 'prepare_git_branch', - 'restore_original_branch', + 'prepare_git_branch', + 'restore_original_branch', 'remove_all_commits_on_branch', 'get_target_repository', - 'configure_git_remote', + 'configure_git_remote', 'push_to_target_repository', 'clone_target_repository_to_temp', 'is_file_tracked', diff --git a/exporters/git/batch_export_to_git.py b/exporters/git/batch_export_to_git.py index b6de312e..0bec93d0 100644 --- a/exporters/git/batch_export_to_git.py +++ b/exporters/git/batch_export_to_git.py @@ -69,8 +69,7 @@ def year_range_to_date_range(year_range: str) -> tuple[str, str]: def main(): parser = argparse.ArgumentParser( - description='Batch export SFS documents to Git repository with initial and temporal commits.' - ) + description='Batch export SFS documents to Git repository with initial and temporal commits.') parser.add_argument( '--years', help='Year range to export (e.g., "2024-2026" or "2024"). Filters both documents and temporal commits (ikraft/upphör dates) to this period.' 
@@ -195,9 +194,9 @@ def main(): # Step 1: Create initial commits (if not skipped) if not args.skip_initial: - print("\n" + "="*80) + print("\n" + "=" * 80) print("STEG 1: SKAPAR INITIALA COMMITS") - print("="*80 + "\n") + print("=" * 80 + "\n") try: process_files_with_git_batch( @@ -217,9 +216,9 @@ def main(): # Step 2: Create temporal commits (if not skipped) if not args.skip_temporal: - print("\n" + "="*80) + print("\n" + "=" * 80) print("STEG 2: SKAPAR TEMPORAL COMMITS (UPCOMING CHANGES)") - print("="*80 + "\n") + print("=" * 80 + "\n") # Check if markers directory exists if not markers_dir.exists(): @@ -233,7 +232,8 @@ def main(): temporal_to_date = None if args.years: temporal_from_date, temporal_to_date = year_range_to_date_range(args.years) - print(f"Filtrerar temporal commits för perioden: {temporal_from_date} till {temporal_to_date}") + print( + f"Filtrerar temporal commits för perioden: {temporal_from_date} till {temporal_to_date}") try: process_temporal_commits_batch( @@ -253,15 +253,15 @@ def main(): print("\n⏭️ Hoppar över temporal commits (--skip-temporal)") # Summary - print("\n" + "="*80) + print("\n" + "=" * 80) print("✅ BATCH EXPORT KLAR!") - print("="*80) + print("=" * 80) print(f"Branch: {args.branch}") print(f"Antal filer bearbetade: {len(json_files)}") print("\nNästa steg:") print(f"1. Gå till target repository och skapa en Pull Request från branch '{args.branch}'") print(f"2. Granska ändringarna och merga till main") - print("="*80 + "\n") + print("=" * 80 + "\n") return 0 diff --git a/exporters/git/generate_commits.py b/exporters/git/generate_commits.py index d29b29fb..b381477a 100644 --- a/exporters/git/generate_commits.py +++ b/exporters/git/generate_commits.py @@ -29,16 +29,16 @@ def create_init_git_commit( ) -> str: """ Create the initial git commit for an SFS document. - - It handles creating commits for individual documents and assumes + + It handles creating commits for individual documents and assumes we're already in a git repository and on the correct branch. 
- + Args: data: JSON data containing document information markdown_content: The markdown content to commit and save output_file: Path to the output markdown file (for local reference) verbose: Enable verbose output - + Returns: str: The final markdown content (cleaned, without selex tags) """ @@ -46,11 +46,11 @@ def create_init_git_commit( beteckning = data.get('beteckning') if not beteckning: raise ValueError("Beteckning saknas i dokumentdata") - + rubrik = data.get('rubrik_after_temporal', data.get('rubrik')) if not rubrik: raise ValueError("Rubrik saknas i dokumentdata") - + # Always expect utfardad_datum to exist utfardad_datum = format_datetime(data.get('fulltext', {}).get('utfardadDateTime')) if not utfardad_datum: @@ -63,24 +63,28 @@ def create_init_git_commit( if is_document_content_empty(temporal_content): temporal_content = add_empty_document_message(temporal_content, data, utfardad_datum) if verbose: - print(f"Info: Tomt dokument efter temporal processing för {beteckning} vid {utfardad_datum}, lade till förklarande meddelande") + print( + f"Info: Tomt dokument efter temporal processing för {beteckning} vid {utfardad_datum}, lade till förklarande meddelande") # Apply temporal title processing for frontmatter rubrik temporal_rubrik = title_temporal(rubrik, utfardad_datum) # Update rubrik in frontmatter with temporal title - temporal_content_with_rubrik = set_prop_in_frontmatter(temporal_content, "rubrik", temporal_rubrik) + temporal_content_with_rubrik = set_prop_in_frontmatter( + temporal_content, "rubrik", temporal_rubrik) # Add ikraft_datum to frontmatter (even if it's a future date) ikraft_datum = format_datetime(data.get('ikraftDateTime')) if ikraft_datum: - temporal_content_with_ikraft = set_prop_in_frontmatter(temporal_content_with_rubrik, "ikraft_datum", ikraft_datum) + temporal_content_with_ikraft = set_prop_in_frontmatter( + temporal_content_with_rubrik, "ikraft_datum", ikraft_datum) else: temporal_content_with_ikraft = temporal_content_with_rubrik # Remove andringsforfattningar from frontmatter in git mode - temporal_content_clean = remove_prop_from_frontmatter(temporal_content_with_ikraft, "andringsforfattningar") - + temporal_content_clean = remove_prop_from_frontmatter( + temporal_content_with_ikraft, "andringsforfattningar") + # Prepare final content for local save (always clean selex tags in git mode) final_content = clean_selex_tags(temporal_content_clean) @@ -105,7 +109,7 @@ def create_init_git_commit( if verbose: print(f"Varning: Filen {relative_path} finns redan i git repository, skippar") return final_content - + # Also check if file is already tracked by git (in case it was deleted locally) if is_file_tracked(str(relative_path)): if verbose: @@ -132,7 +136,7 @@ def create_init_git_commit( predocs = register_data.get('forarbeten') if predocs: commit_message += (f"\n\nHar tillkommit i Svensk författningssamling " - f"efter dessa förarbeten: {predocs}") + f"efter dessa förarbeten: {predocs}") # Format date for git commit_date = format_datetime_for_git(utfardad_datum) @@ -147,7 +151,7 @@ def create_init_git_commit( def format_section_list(sections): """Format a list of sections with proper Swedish enumeration (commas and 'och' before last). - + If more than 3 sections, return count instead of listing them all. """ if not sections: @@ -183,24 +187,24 @@ def generate_descriptive_commit_message( ) -> str: """ Generate a descriptive commit message based on the changes. 
- + Args: doc_name: The document ID (e.g., "2024:123") changes: List of changes for this date - + Returns: A descriptive commit message with emoji """ has_ikraft = any(c['type'] == 'ikraft' for c in changes) has_upphor = any(c['type'] in ['upphor', 'upphor_villkor'] for c in changes) - + # Collect sections with titles and check for article-level changes ikraft_sections = [] upphor_sections = [] upphavd_sections = [] # Sections with selex:upphavd="true" has_article_changes = False has_article_revoked = False # Article-level active revocation - + for change in changes: # Check if this is an article-level change (whole document) if change.get('source') == 'article_tag': @@ -209,16 +213,16 @@ def generate_descriptive_commit_message( if change.get('is_revoked'): has_article_revoked = True continue - + section_id = change.get('section_id') section_title = change.get('section_title', section_id or '') - + if not section_id: continue - + # Use section title display_text = section_title if section_title else f"{section_id} §" - + if change['type'] == 'ikraft': ikraft_sections.append(display_text) elif change['type'] == 'upphor': @@ -230,30 +234,31 @@ def generate_descriptive_commit_message( # Handle conditional expiry - treat similar to upphor but with different messaging upphor_sections.append(display_text) else: - raise ValueError(f"Okänd ändringstyp '{change['type']}' för {section_id}. Kända typer: 'ikraft', 'upphor', 'upphor_villkor'") - + raise ValueError( + f"Okänd ändringstyp '{change['type']}' för {section_id}. Kända typer: 'ikraft', 'upphor', 'upphor_villkor'") + # Build commit message if has_ikraft and has_upphor: # Both entry into force and expiration emoji = "🔄" - + # Check if same sections are both taking effect and expiring ikraft_set = set(ikraft_sections) upphor_set = set(upphor_sections) updated_sections = ikraft_set & upphor_set only_ikraft = ikraft_set - upphor_set only_upphor = upphor_set - ikraft_set - + message_parts = [] - + if updated_sections: sections_str = format_section_list(list(updated_sections)) message_parts.append(f"{sections_str} uppdateras") - + if only_ikraft: sections_str = format_section_list(list(only_ikraft)) message_parts.append(f"{sections_str} träder i kraft") - + if only_upphor: sections_str = format_section_list(list(only_upphor)) # Use specific terminology if all are actively revoked @@ -261,12 +266,13 @@ def generate_descriptive_commit_message( message_parts.append(f"{sections_str} upphävs") else: message_parts.append(f"{sections_str} upphör att gälla") - + if message_parts: message = f"{emoji} {doc_name}: {', och '.join(message_parts)}" else: - raise ValueError(f"Ikraft- och upphör-ändringar på samma datum, borde inte vara möjligt för {doc_name}. Kontrollera ändringarna.") - + raise ValueError( + f"Ikraft- och upphör-ändringar på samma datum, borde inte vara möjligt för {doc_name}. 
Kontrollera ändringarna.") + elif has_ikraft: # Entry into force emoji = "✅" @@ -280,8 +286,9 @@ def generate_descriptive_commit_message( # Article-level change - whole document comes into force message = f"{emoji} {doc_name} träder i kraft" else: - raise ValueError(f"Ikraft-ändringar hittades för {doc_name} men varken sections eller article-ändringar kunde identifieras") - + raise ValueError( + f"Ikraft-ändringar hittades för {doc_name} men varken sections eller article-ändringar kunde identifieras") + else: # has_upphor # Expiration emoji = "🚫" @@ -298,7 +305,8 @@ def generate_descriptive_commit_message( if set(upphor_sections).issubset(set(upphavd_sections)): message = f"{emoji} {doc_name}: {sections_str} upphävs" else: - # Mixed or temporal expiration - use general term but indicate if some are actively revoked + # Mixed or temporal expiration - use general term but indicate if some are + # actively revoked if upphavd_sections: message = f"{emoji} {doc_name}: {sections_str} upphävs" else: @@ -310,8 +318,9 @@ def generate_descriptive_commit_message( else: message = f"{emoji} {doc_name} upphör att gälla" else: - raise ValueError(f"Upphor-ändringar hittades för {doc_name} men varken sections eller article-ändringar kunde identifieras") - + raise ValueError( + f"Upphor-ändringar hittades för {doc_name} men varken sections eller article-ändringar kunde identifieras") + return message @@ -324,17 +333,17 @@ def generate_temporal_commits( ) -> None: """ Generate Git commits for temporal changes in a markdown file. - + This function reads a markdown file, identifies upcoming changes using identify_upcoming_changes, and creates Git commits on the appropriate dates with suitable emojis. - + Args: markdown_file: Path to the markdown file to process from_date: Start date (inclusive) in YYYY-MM-DD format. If None, no lower bound. to_date: End date (inclusive) in YYYY-MM-DD format. If None, no upper bound. dry_run: If True, show what would be committed without making actual commits - + Raises: ValueError: If date format is invalid subprocess.CalledProcessError: If git commands fail @@ -345,62 +354,63 @@ def generate_temporal_commits( datetime.strptime(from_date, '%Y-%m-%d') except ValueError: raise ValueError(f"Invalid from_date format: {from_date}. Expected YYYY-MM-DD") - + if to_date: try: datetime.strptime(to_date, '%Y-%m-%d') except ValueError: raise ValueError(f"Invalid to_date format: {to_date}. 
Expected YYYY-MM-DD") - + # Read the markdown file if not markdown_file.exists(): print(f"Fel: Filen {markdown_file} finns inte") return - + try: content = read_file_content(markdown_file) except IOError as e: print(str(e)) return - + # Check if selex tags are present (required for temporal processing) if ' to_date: continue - + filtered_changes.append(change) - + if not filtered_changes: print(f"Inga ändringar inom datumintervallet {from_date or 'början'} - {to_date or 'slut'}") return - + # Extract doc_name and rubrik from frontmatter doc_name = extract_frontmatter_property(content, 'beteckning') rubrik = extract_frontmatter_property(content, 'rubrik') - + if not doc_name: print(f"Varning: Ingen doc_name hittades i frontmatter för {markdown_file}") return - + print(f"Använder doc_name: {doc_name}") - + # Group changes by date changes_by_date = {} for change in filtered_changes: @@ -408,98 +418,104 @@ def generate_temporal_commits( if date not in changes_by_date: changes_by_date[date] = [] changes_by_date[date].append(change) - + if dry_run: # Dry run mode - show what would be committed without actually committing print(f"\n{'='*80}") print(f"DRY RUN: Visar planerade commits för {markdown_file.name}") print(f"{'='*80}") - + # Table headers print(f"{'Datum':<12} {'Meddelande':<150}") print(f"{'-'*12} {'-'*150}") - + for date in sorted(changes_by_date.keys()): date_changes = changes_by_date[date] - + # Apply temporal changes for this date (includes H1 title processing) try: filtered_content = apply_temporal(content, date, dry_run) # No verbose for dry run # Check if document is empty after temporal processing and add explanatory message if is_document_content_empty(filtered_content): - filtered_content = add_empty_document_message(filtered_content, data=None, target_date=date) + filtered_content = add_empty_document_message( + filtered_content, data=None, target_date=date) # Apply temporal title processing for frontmatter rubrik if it exists if rubrik: temporal_rubrik = title_temporal(rubrik, date) - filtered_content = set_prop_in_frontmatter(filtered_content, "rubrik", temporal_rubrik) + filtered_content = set_prop_in_frontmatter( + filtered_content, "rubrik", temporal_rubrik) # Remove andringsforfattningar from frontmatter in git mode - filtered_content = remove_prop_from_frontmatter(filtered_content, "andringsforfattningar") - + filtered_content = remove_prop_from_frontmatter( + filtered_content, "andringsforfattningar") + # Clean selex tags for final content clean_content = clean_selex_tags(filtered_content) - + # Generate descriptive commit message message = generate_descriptive_commit_message(doc_name, date_changes) - + # Truncate message if too long for table display_message = message[:147] + "..." 
if len(message) > 150 else message - + print(f"{date:<12} {display_message:<150}") - + except Exception as e: print(f"{date:<12} {'FEL: ' + str(e)[:147]:<150}") - + print(f"\nTotalt {len(changes_by_date)} commits skulle skapas.") print("Kör utan --dry-run för att utföra commits på riktigt.") return - + # Normal mode - create actual commits original_content = content # Store original content for restoration - + # Create commits for each date for date in sorted(changes_by_date.keys()): date_changes = changes_by_date[date] - + # Apply temporal changes for this date (includes H1 title processing) try: filtered_content = apply_temporal(content, date, False) # Check if document is empty after temporal processing and add explanatory message if is_document_content_empty(filtered_content): - filtered_content = add_empty_document_message(filtered_content, data=None, target_date=date) + filtered_content = add_empty_document_message( + filtered_content, data=None, target_date=date) # Apply temporal title processing for frontmatter rubrik if it exists if rubrik: temporal_rubrik = title_temporal(rubrik, date) - filtered_content = set_prop_in_frontmatter(filtered_content, "rubrik", temporal_rubrik) + filtered_content = set_prop_in_frontmatter( + filtered_content, "rubrik", temporal_rubrik) # Remove andringsforfattningar from frontmatter in git mode - filtered_content = remove_prop_from_frontmatter(filtered_content, "andringsforfattningar") + filtered_content = remove_prop_from_frontmatter( + filtered_content, "andringsforfattningar") # Clean selex tags before committing to git clean_content = clean_selex_tags(filtered_content) - + # Write the file (use clean content without selex tags for git) save_to_disk(markdown_file, clean_content) except Exception as e: print(f"Fel vid tillämpning av temporal ändringar för {date}: {e}") continue - + # Generate descriptive commit message message = generate_descriptive_commit_message(doc_name, date_changes) - + # Stage the file if not stage_file(str(markdown_file)): continue - + # Check if there are any changes to commit if not has_staged_changes(): print(f"Inga ändringar att committa för {date}") continue - + # Create commit with the appropriate date git_date = format_datetime_for_git(date) if not git_date: @@ -507,7 +523,7 @@ def generate_temporal_commits( if not create_commit_with_date(message, git_date, verbose=True): print(f"Fel vid commit för {date}") - + # Restore original content after all commits try: save_to_disk(markdown_file, original_content) @@ -523,7 +539,7 @@ def generate_commits_for_directory( ) -> None: """ Generate Git commits for all markdown files in a directory. - + Args: directory: Path to directory containing markdown files from_date: Start date (inclusive) in YYYY-MM-DD format. If None, no lower bound. 
@@ -533,23 +549,23 @@ def generate_commits_for_directory( if not directory.exists(): print(f"Fel: Katalogen {directory} finns inte") return - + if not directory.is_dir(): print(f"Fel: {directory} är inte en katalog") return - + # Find all markdown files md_files = list(directory.rglob("*.md")) - + if not md_files: print(f"Inga markdown-filer hittades i {directory}") return - + print(f"Bearbetar {len(md_files)} markdown-filer...") - + for md_file in md_files: print(f"\nBearbetar {md_file.name}...") - + try: generate_temporal_commits(md_file, None, from_date, to_date, dry_run) except Exception as e: @@ -558,7 +574,7 @@ def generate_commits_for_directory( if __name__ == "__main__": import argparse - + parser = argparse.ArgumentParser( description='Generera Git-commits baserat på temporala ändringar i svenska lagdokument.' ) @@ -571,7 +587,7 @@ def generate_commits_for_directory( help='Startdatum (inklusivt) i formatet YYYY-MM-DD' ) parser.add_argument( - '--to-date', + '--to-date', help='Slutdatum (inklusivt) i formatet YYYY-MM-DD' ) parser.add_argument( @@ -579,14 +595,14 @@ def generate_commits_for_directory( action='store_true', help='Visa planerade commits utan att utföra dem' ) - + args = parser.parse_args() - + path = Path(args.path) - + if path.is_file(): generate_temporal_commits(path, None, args.from_date, args.to_date, args.dry_run) elif path.is_dir(): generate_commits_for_directory(path, args.from_date, args.to_date, args.dry_run) else: - print(f"Fel: {path} finns inte") \ No newline at end of file + print(f"Fel: {path} finns inte") diff --git a/exporters/git/git_utils.py b/exporters/git/git_utils.py index b526cc17..8d9fd0b2 100644 --- a/exporters/git/git_utils.py +++ b/exporters/git/git_utils.py @@ -21,17 +21,17 @@ def prepare_git_branch(git_branch, remove_all_commits_first=True, verbose=False) Ensures that git commits are made in a different branch than the current one. Creates a new branch if needed and switches to it. Returns the original branch name and the commit branch name. - + Args: - git_branch: The branch name to use. If it contains "(date)", + git_branch: The branch name to use. If it contains "(date)", that will be replaced with current date. remove_all_commits_first: If True, removes all commits on the branch before proceeding. verbose: If True, print detailed information. 
""" try: # Get current branch name - result = subprocess.run(['git', 'rev-parse', '--abbrev-ref', 'HEAD'], - capture_output=True, text=True, check=True, timeout=GIT_TIMEOUT) + result = subprocess.run(['git', 'rev-parse', '--abbrev-ref', 'HEAD'], + capture_output=True, text=True, check=True, timeout=GIT_TIMEOUT) current_branch = result.stdout.strip() # Generate commit branch name @@ -42,17 +42,17 @@ def prepare_git_branch(git_branch, remove_all_commits_first=True, verbose=False) commit_branch = git_branch # Create and switch to the new branch - subprocess.run(['git', 'checkout', '-b', commit_branch], - check=True, capture_output=True, timeout=GIT_TIMEOUT) + subprocess.run(['git', 'checkout', '-b', commit_branch], + check=True, capture_output=True, timeout=GIT_TIMEOUT) print(f"Skapade och bytte till branch '{commit_branch}' för git-commits") - + # Remove all commits on branch if requested if remove_all_commits_first: removed_commits = remove_all_commits_on_branch(verbose=verbose) if removed_commits > 0: print(f"Tog bort {removed_commits} tidigare commits från branchen") - + return current_branch, commit_branch except subprocess.CalledProcessError as e: @@ -69,10 +69,10 @@ def restore_original_branch(original_branch): """ if not original_branch: return - + try: - subprocess.run(['git', 'checkout', original_branch], - check=True, capture_output=True, timeout=GIT_TIMEOUT) + subprocess.run(['git', 'checkout', original_branch], + check=True, capture_output=True, timeout=GIT_TIMEOUT) print(f"Bytte tillbaka till ursprunglig branch '{original_branch}'") except subprocess.CalledProcessError as e: print(f"Varning: Kunde inte byta tillbaka till ursprunglig branch: {e}") @@ -83,11 +83,11 @@ def restore_original_branch(original_branch): def remove_all_commits_on_branch(branch_name=None, verbose=False): """ Remove all commits on the specified branch (or current branch) that are not on the main branch. - + Args: branch_name: The branch to remove commits from. If None, uses current branch. verbose: If True, print detailed information about removed commits. 
- + Returns: int: Number of commits removed """ @@ -96,55 +96,55 @@ def remove_all_commits_on_branch(branch_name=None, verbose=False): original_branch = None if branch_name: # Get current branch - result = subprocess.run(['git', 'rev-parse', '--abbrev-ref', 'HEAD'], - capture_output=True, text=True, check=True, timeout=GIT_TIMEOUT) + result = subprocess.run(['git', 'rev-parse', '--abbrev-ref', 'HEAD'], + capture_output=True, text=True, check=True, timeout=GIT_TIMEOUT) original_branch = result.stdout.strip() - + # Switch to target branch if different if original_branch != branch_name: - subprocess.run(['git', 'checkout', branch_name], - check=True, capture_output=True, timeout=GIT_TIMEOUT) - + subprocess.run(['git', 'checkout', branch_name], + check=True, capture_output=True, timeout=GIT_TIMEOUT) + # Get the merge base with main branch (the point where current branch diverged) result = subprocess.run([ 'git', 'merge-base', 'HEAD', GIT_MAIN_BRANCH ], capture_output=True, text=True, check=True, timeout=GIT_TIMEOUT) - + merge_base = result.stdout.strip() - + # Find all commits on current branch since merge base result = subprocess.run([ - 'git', 'log', + 'git', 'log', f'{merge_base}..HEAD', '--format=%H %s', '--reverse' # Show oldest first ], capture_output=True, text=True, check=True, timeout=GIT_TIMEOUT) - + commits_to_remove = result.stdout.strip().split('\n') if result.stdout.strip() else [] - + if not commits_to_remove: if verbose: print("Inga commits att ta bort på denna branch") return 0 - + if verbose: print(f"Tar bort {len(commits_to_remove)} commits på branchen:") for commit_info in commits_to_remove: print(f" - {commit_info}") - + # Reset to merge base (hard reset to remove all changes) - subprocess.run(['git', 'reset', '--hard', merge_base], - check=True, capture_output=True, timeout=GIT_TIMEOUT) - + subprocess.run(['git', 'reset', '--hard', merge_base], + check=True, capture_output=True, timeout=GIT_TIMEOUT) + print(f"Tog bort {len(commits_to_remove)} commits från branchen") - + # Switch back to original branch if we switched if original_branch and original_branch != branch_name: - subprocess.run(['git', 'checkout', original_branch], - check=True, capture_output=True, timeout=GIT_TIMEOUT) - + subprocess.run(['git', 'checkout', original_branch], + check=True, capture_output=True, timeout=GIT_TIMEOUT) + return len(commits_to_remove) - + except subprocess.CalledProcessError as e: print(f"Varning: Kunde inte ta bort commits: {e}") if hasattr(e, 'stderr') and e.stderr: @@ -161,7 +161,7 @@ def remove_all_commits_on_branch(branch_name=None, verbose=False): def get_target_repository() -> str: """ Get the target repository URL from environment variable or use default. - + Returns: str: Repository URL to push to """ @@ -171,35 +171,35 @@ def get_target_repository() -> str: def configure_git_remote(repo_url: str, remote_name: str = 'target', verbose: bool = False) -> bool: """ Configure a git remote for pushing commits. 
- + Args: repo_url: URL of the target repository remote_name: Name for the remote (default: 'target') verbose: Enable verbose output - + Returns: bool: True if successful, False otherwise """ try: # Check if remote already exists - result = subprocess.run(['git', 'remote', 'get-url', remote_name], - capture_output=True, timeout=GIT_TIMEOUT) - + result = subprocess.run(['git', 'remote', 'get-url', remote_name], + capture_output=True, timeout=GIT_TIMEOUT) + if result.returncode == 0: # Remote exists, update it - subprocess.run(['git', 'remote', 'set-url', remote_name, repo_url], - check=True, capture_output=True, timeout=GIT_TIMEOUT) + subprocess.run(['git', 'remote', 'set-url', remote_name, repo_url], + check=True, capture_output=True, timeout=GIT_TIMEOUT) if verbose: print(f"Uppdaterade remote '{remote_name}' till {repo_url}") else: # Remote doesn't exist, add it - subprocess.run(['git', 'remote', 'add', remote_name, repo_url], - check=True, capture_output=True, timeout=GIT_TIMEOUT) + subprocess.run(['git', 'remote', 'add', remote_name, repo_url], + check=True, capture_output=True, timeout=GIT_TIMEOUT) if verbose: print(f"Lade till remote '{remote_name}': {repo_url}") - + return True - + except subprocess.CalledProcessError as e: print(f"Fel vid konfiguration av git remote: {e}") if hasattr(e, 'stderr') and e.stderr: @@ -210,17 +210,17 @@ def configure_git_remote(repo_url: str, remote_name: str = 'target', verbose: bo def create_authenticated_url(repo_url: str, pat_token: str) -> str: """ Create an authenticated URL using PAT token. - + Args: repo_url: Original repository URL pat_token: Personal Access Token - + Returns: str: Authenticated URL """ if not pat_token: return repo_url - + parsed = urlparse(repo_url) if parsed.hostname == 'github.com': # For GitHub, use token as username @@ -233,20 +233,20 @@ def create_authenticated_url(repo_url: str, pat_token: str) -> str: def clone_target_repository_to_temp(verbose: bool = False) -> tuple[Path, str]: """ Clone target repository to a temporary directory. 
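The hunk cuts off just as create_authenticated_url starts handling github.com. A plausible completion of that branch is the token-as-username form; note that everything past the lines shown in the patch is an assumption, not the project's confirmed code:

```python
from urllib.parse import urlparse

def create_authenticated_url(repo_url: str, pat_token: str) -> str:
    # Without a token there is nothing to embed
    if not pat_token:
        return repo_url
    parsed = urlparse(repo_url)
    if parsed.hostname == 'github.com':
        # GitHub accepts a PAT as the username in HTTPS clone URLs
        return f'https://{pat_token}@github.com{parsed.path}'
    # Other hosts: assumed to pass through unchanged in this sketch
    return repo_url
```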
- + Args: verbose: Enable verbose output - + Returns: tuple[Path, str]: (repo_directory_path, original_cwd) or (None, None) if failed """ import tempfile - + try: # Get repository URL and PAT token repo_url = get_target_repository() pat_token = os.getenv('GIT_GITHUB_PAT') - + # Try to load PAT from .env file if not in environment if not pat_token: try: @@ -255,7 +255,7 @@ def clone_target_repository_to_temp(verbose: bool = False) -> tuple[Path, str]: pat_token = os.getenv('GIT_GITHUB_PAT') except ImportError: pass # dotenv not available - + # Create authenticated URL if PAT is available if pat_token: auth_url = create_authenticated_url(repo_url, pat_token) @@ -263,24 +263,24 @@ def clone_target_repository_to_temp(verbose: bool = False) -> tuple[Path, str]: auth_url = repo_url if verbose: print("Varning: Ingen PAT token hittades, använder okrypterad URL") - + # Create temporary directory for cloning temp_dir = tempfile.mkdtemp() repo_dir = Path(temp_dir) / "target_repo" - + if verbose: print(f"Klonar {repo_url} till temporär katalog...") - + # Clone the repository subprocess.run([ 'git', 'clone', auth_url, str(repo_dir) ], check=True, capture_output=True, timeout=GIT_TIMEOUT) - + # Remember original directory original_cwd = os.getcwd() - + return repo_dir, original_cwd - + except subprocess.CalledProcessError as e: print(f"Fel vid kloning av target repository: {e}") if hasattr(e, 'stderr') and e.stderr: @@ -294,16 +294,16 @@ def clone_target_repository_to_temp(verbose: bool = False) -> tuple[Path, str]: def is_file_tracked(file_path: str) -> bool: """ Check if a file is already tracked by git. - + Args: file_path: Path to the file to check - + Returns: bool: True if file is tracked, False otherwise """ try: - result = subprocess.run(['git', 'ls-files', file_path], - capture_output=True, text=True, timeout=GIT_TIMEOUT) + result = subprocess.run(['git', 'ls-files', file_path], + capture_output=True, text=True, timeout=GIT_TIMEOUT) return result.returncode == 0 and result.stdout.strip() != "" except subprocess.CalledProcessError: return False @@ -314,13 +314,13 @@ def is_file_tracked(file_path: str) -> bool: def has_staged_changes() -> bool: """ Check if there are any staged changes ready to commit. - + Returns: bool: True if there are staged changes, False otherwise """ try: - result = subprocess.run(['git', 'diff', '--cached', '--quiet'], - capture_output=True, timeout=GIT_TIMEOUT) + result = subprocess.run(['git', 'diff', '--cached', '--quiet'], + capture_output=True, timeout=GIT_TIMEOUT) # git diff --cached --quiet returns 0 if there are no changes, 1 if there are changes return result.returncode != 0 except subprocess.CalledProcessError: @@ -332,23 +332,23 @@ def has_staged_changes() -> bool: def stage_file(file_path: str, verbose: bool = False) -> bool: """ Stage a file for git commit. 
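A detail behind the is_file_tracked check above: git ls-files exits 0 even for paths it does not track, printing nothing, which is why the function requires both a zero return code and non-empty stdout. In isolation:

```python
import subprocess

result = subprocess.run(['git', 'ls-files', 'maybe_tracked.py'],
                        capture_output=True, text=True)
# returncode == 0 alone is not enough: untracked files also exit 0,
# just with empty output, so both conditions are required
is_tracked = result.returncode == 0 and result.stdout.strip() != ""
```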
- + Args: file_path: Path to the file to stage verbose: Enable verbose output - + Returns: bool: True if staging was successful, False otherwise """ try: subprocess.run(['git', 'add', file_path], - check=True, capture_output=True, timeout=GIT_TIMEOUT) - + check=True, capture_output=True, timeout=GIT_TIMEOUT) + if verbose: print(f"Stagade fil: {file_path}") - + return True - + except subprocess.CalledProcessError as e: print(f"Fel vid staging av {file_path}: {e}") if hasattr(e, 'stderr') and e.stderr: @@ -359,23 +359,26 @@ def stage_file(file_path: str, verbose: bool = False) -> bool: return False -def checkout_branch(branch_name: str, create_if_missing: bool = True, verbose: bool = False) -> bool: +def checkout_branch( + branch_name: str, + create_if_missing: bool = True, + verbose: bool = False) -> bool: """ Checkout to a git branch, optionally creating it if it doesn't exist. - + Args: branch_name: Name of the branch to checkout create_if_missing: If True, create the branch if it doesn't exist verbose: Enable verbose output - + Returns: bool: True if checkout was successful, False otherwise """ try: # Try to checkout the branch first result = subprocess.run(['git', 'checkout', branch_name], - capture_output=True, timeout=GIT_TIMEOUT) - + capture_output=True, timeout=GIT_TIMEOUT) + if result.returncode == 0: if verbose: print(f"Bytte till branch '{branch_name}'") @@ -383,7 +386,7 @@ def checkout_branch(branch_name: str, create_if_missing: bool = True, verbose: b elif create_if_missing: # Branch doesn't exist, create it subprocess.run(['git', 'checkout', '-b', branch_name], - check=True, capture_output=True, timeout=GIT_TIMEOUT) + check=True, capture_output=True, timeout=GIT_TIMEOUT) if verbose: print(f"Skapade och bytte till branch '{branch_name}'") return True @@ -391,7 +394,7 @@ def checkout_branch(branch_name: str, create_if_missing: bool = True, verbose: b if verbose: print(f"Branch '{branch_name}' finns inte och create_if_missing=False") return False - + except subprocess.CalledProcessError as e: print(f"Fel vid checkout av branch '{branch_name}': {e}") if hasattr(e, 'stderr') and e.stderr: @@ -405,11 +408,11 @@ def checkout_branch(branch_name: str, create_if_missing: bool = True, verbose: b def check_duplicate_commit_message(message: str, verbose: bool = False) -> bool: """ Check if a commit with the given message already exists in the current branch. - + Args: message: Commit message to check verbose: Enable verbose output - + Returns: bool: True if a duplicate exists, False otherwise """ @@ -418,15 +421,15 @@ def check_duplicate_commit_message(message: str, verbose: bool = False) -> bool: result = subprocess.run([ 'git', 'log', '--grep', f'^{message}$', '--format=%H', '-n', '1' ], capture_output=True, text=True, timeout=GIT_TIMEOUT) - + has_duplicate = result.returncode == 0 and result.stdout.strip() != "" - + if has_duplicate and verbose: commit_hash = result.stdout.strip() print(f"Varning: En commit med samma meddelande finns redan: {commit_hash}") - + return has_duplicate - + except subprocess.CalledProcessError as e: if verbose: print(f"Fel vid sökning efter duplicerad commit: {e}") @@ -440,12 +443,12 @@ def check_duplicate_commit_message(message: str, verbose: bool = False) -> bool: def create_commit_with_date(message: str, date: str, verbose: bool = False) -> bool: """ Create a git commit with a specified date. 
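check_duplicate_commit_message wraps the message in ^...$ anchors before handing it to git log --grep, which interprets its argument as a regex. Messages containing metacharacters can therefore mismatch. A defensive variant (an observation about the pattern, not a change this patch makes, and assuming a git build with PCRE support for --perl-regexp) would escape the message first:

```python
import re
import subprocess

def has_commit_with_message(message: str) -> bool:
    # re.escape keeps a literal message literal even if it contains
    # metacharacters such as '(' or '.'; --perl-regexp makes git's
    # matcher agree with Python-style escaping
    pattern = f'^{re.escape(message)}$'
    result = subprocess.run(
        ['git', 'log', '--perl-regexp', '--grep', pattern,
         '--format=%H', '-n', '1'],
        capture_output=True, text=True)
    return result.returncode == 0 and result.stdout.strip() != ""
```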
- + Args: message: Commit message date: Date string in format that git accepts (e.g., "2024-01-01 12:00:00 +0100") verbose: Enable verbose output - + Returns: bool: True if commit was successful, False otherwise """ @@ -453,19 +456,19 @@ def create_commit_with_date(message: str, date: str, verbose: bool = False) -> b # Check for duplicate commit message if check_duplicate_commit_message(message, verbose): raise ValueError(f"En commit med meddelandet '{message}' finns redan!") - + # Set both author and committer dates env = {**os.environ, 'GIT_AUTHOR_DATE': date, 'GIT_COMMITTER_DATE': date} - + subprocess.run([ 'git', 'commit', '-m', message ], check=True, capture_output=True, env=env, timeout=GIT_TIMEOUT) - + if verbose: print(f"Git-commit skapad: '{message}' daterad {date}") - + return True - + except ValueError as e: print(f"❌ Fel: {e}") return False @@ -479,15 +482,18 @@ def create_commit_with_date(message: str, date: str, verbose: bool = False) -> b return False -def push_to_target_repository(branch_name: str, remote_name: str = 'target', verbose: bool = False) -> bool: +def push_to_target_repository( + branch_name: str, + remote_name: str = 'target', + verbose: bool = False) -> bool: """ Push the specified branch to the target repository. - + Args: branch_name: Name of the branch to push remote_name: Name of the remote to push to verbose: Enable verbose output - + Returns: bool: True if push was successful, False otherwise """ @@ -497,11 +503,11 @@ def push_to_target_repository(branch_name: str, remote_name: str = 'target', ver print(f"❌ Fel: Kan inte pusha till main branch '{GIT_MAIN_BRANCH}' för säkerhet") print(f"Använd ensure_git_branch_for_commits() för att skapa en separat branch först") return False - + # Get repository URL and PAT token repo_url = get_target_repository() pat_token = os.getenv('GIT_GITHUB_PAT') - + # Create authenticated URL if PAT is available if pat_token: auth_url = create_authenticated_url(repo_url, pat_token) @@ -512,14 +518,14 @@ def push_to_target_repository(branch_name: str, remote_name: str = 'target', ver # Configure remote without authentication if not configure_git_remote(repo_url, remote_name, verbose): return False - + # Push the branch if verbose: print(f"Pushar branch '{branch_name}' till remote '{remote_name}'...") - - result = subprocess.run(['git', 'push', remote_name, branch_name], - capture_output=True, text=True, timeout=GIT_TIMEOUT) - + + result = subprocess.run(['git', 'push', remote_name, branch_name], + capture_output=True, text=True, timeout=GIT_TIMEOUT) + if result.returncode == 0: if verbose: print(f"Lyckades pusha branch '{branch_name}' till {repo_url}") @@ -531,7 +537,7 @@ def push_to_target_repository(branch_name: str, remote_name: str = 'target', ver if result.stderr: print(f"Git stderr: {result.stderr}") return False - + except subprocess.CalledProcessError as e: print(f"Fel vid push till target repository: {e}") if hasattr(e, 'stderr') and e.stderr: diff --git a/exporters/git/init_commits_batch_processor.py b/exporters/git/init_commits_batch_processor.py index ed150269..556b48bd 100644 --- a/exporters/git/init_commits_batch_processor.py +++ b/exporters/git/init_commits_batch_processor.py @@ -17,7 +17,13 @@ from util.file_utils import read_file_content -def process_files_with_git_batch(json_files, output_dir, verbose, fetch_predocs_from_api, batch_size=100, branch_name=None): +def process_files_with_git_batch( + json_files, + output_dir, + verbose, + fetch_predocs_from_api, + batch_size=100, + branch_name=None): """Process 
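The date handling in create_commit_with_date is the heart of the temporal-commit feature: git reads both timestamps from environment variables rather than from a command-line flag. The step in isolation:

```python
import os
import subprocess

date = "2024-01-01 12:00:00 +0100"  # any date format git accepts

# GIT_AUTHOR_DATE is what `git log` shows by default;
# GIT_COMMITTER_DATE is what ancestry ordering and reflogs use.
env = {**os.environ, 'GIT_AUTHOR_DATE': date, 'GIT_COMMITTER_DATE': date}

subprocess.run(['git', 'commit', '-m', 'Backdated commit'],
               check=True, capture_output=True, env=env)
```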
files with git batch workflow, using same branch but pushing after each batch.""" # Clone target repository once for all batches repo_dir, original_cwd = clone_target_repository_to_temp(verbose=verbose) @@ -47,26 +53,43 @@ def process_files_with_git_batch(json_files, output_dir, verbose, fetch_predocs_ print(f"Delar upp {total_files} filer i batcher om {batch_size} filer var") batches = [json_files[i:i + batch_size] for i in range(0, total_files, batch_size)] print(f"Skapade {len(batches)} batcher") - + # Process each batch in the same repository and branch, pushing after each for i, batch in enumerate(batches, 1): print(f"\nBearbetar batch {i}/{len(batches)} ({len(batch)} filer)...") - _process_batch_files(batch, output_dir, verbose, fetch_predocs_from_api, original_cwd, i, len(batches)) - + _process_batch_files( + batch, + output_dir, + verbose, + fetch_predocs_from_api, + original_cwd, + i, + len(batches)) + # Push after each batch print(f"Pushar batch {i}/{len(batches)} till target repository...") if push_to_target_repository(unique_branch, 'origin', verbose): - print(f"Batch {i}/{len(batches)} pushad till target repository som branch '{unique_branch}'") + print( + f"Batch {i}/{len(batches)} pushad till target repository som branch '{unique_branch}'") else: - print(f"Misslyckades med att pusha batch {i}/{len(batches)} till target repository") + print( + f"Misslyckades med att pusha batch {i}/{len(batches)} till target repository") else: print(f"Bearbetar {total_files} filer i en enda batch...") - _process_batch_files(json_files, output_dir, verbose, fetch_predocs_from_api, original_cwd, 1, 1) - + _process_batch_files( + json_files, + output_dir, + verbose, + fetch_predocs_from_api, + original_cwd, + 1, + 1) + # Push the single batch print(f"Pushar alla {total_files} filer till target repository...") if push_to_target_repository(unique_branch, 'origin', verbose): - print(f"Alla {total_files} filer pushade till target repository som branch '{unique_branch}'") + print( + f"Alla {total_files} filer pushade till target repository som branch '{unique_branch}'") else: print(f"Misslyckades med att pusha till target repository") @@ -78,7 +101,14 @@ def process_files_with_git_batch(json_files, output_dir, verbose, fetch_predocs_ os.chdir(original_cwd) -def _process_batch_files(json_files, output_dir, verbose, fetch_predocs_from_api, original_cwd, batch_num, total_batches): +def _process_batch_files( + json_files, + output_dir, + verbose, + fetch_predocs_from_api, + original_cwd, + batch_num, + total_batches): """Process batch files in the current repository without creating new branches.""" # Process each JSON file in the current git repository from sfs_processor import make_document @@ -98,9 +128,15 @@ def _process_batch_files(json_files, output_dir, verbose, fetch_predocs_from_api original_output_dir = Path(original_cwd) / Path(output_dir).name else: original_output_dir = Path(output_dir) - make_document(data, original_output_dir, ["git"], True, verbose, True, fetch_predocs_from_api, True) - + make_document( + data, + original_output_dir, + ["git"], + True, + verbose, + True, + fetch_predocs_from_api, + True) + if verbose: print(f"Batch {batch_num}/{total_batches} bearbetad ({len(json_files)} filer)") - - diff --git a/exporters/git/temporal_commits_batch_processor.py b/exporters/git/temporal_commits_batch_processor.py index 1ca8195a..ff46314d 100644 --- a/exporters/git/temporal_commits_batch_processor.py +++ b/exporters/git/temporal_commits_batch_processor.py @@ -19,11 +19,11 @@ def 
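Both batch processors rely on the same list-slicing idiom: fixed-size chunks, with a shorter final chunk, each pushed before the next begins. Traced with a hypothetical file list:

```python
batch_size = 100
json_files = [f"doc_{i}.json" for i in range(250)]  # hypothetical input list

batches = [json_files[i:i + batch_size]
           for i in range(0, len(json_files), batch_size)]

# 250 files -> two full batches and one remainder batch
assert [len(b) for b in batches] == [100, 100, 50]
```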
_create_temporal_branch_name(from_date: Optional[str], to_date: Optional[str]) -> str: """ Create a descriptive branch name based on the date range. - + Args: from_date: Start date in YYYY-MM-DD format to_date: End date in YYYY-MM-DD format - + Returns: str: Branch name for temporal commits """ @@ -57,11 +57,11 @@ def process_temporal_commits_batch( ) -> None: """ Process temporal commits for markdown files in batch, with git operations. - + Args: markdown_dir: Directory containing markdown files to process from_date: Start date (inclusive) in YYYY-MM-DD format - to_date: End date (inclusive) in YYYY-MM-DD format + to_date: End date (inclusive) in YYYY-MM-DD format dry_run: If True, show what would be committed without making actual commits verbose: Enable verbose output batch_size: Number of files to process per batch @@ -76,7 +76,7 @@ def process_temporal_commits_batch( # Find all markdown files md_files = list(markdown_dir.rglob("*.md")) - + if not md_files: print(f"Inga markdown-filer hittades i {markdown_dir}") return @@ -87,7 +87,8 @@ def process_temporal_commits_batch( print("KÖR I DRY-RUN LÄGE - inga commits kommer att skapas") _process_temporal_commits_dry_run(md_files, from_date, to_date) else: - _process_temporal_commits_with_git(md_files, from_date, to_date, verbose, batch_size, branch_name) + _process_temporal_commits_with_git( + md_files, from_date, to_date, verbose, batch_size, branch_name) def _process_temporal_commits_dry_run( @@ -99,7 +100,7 @@ def _process_temporal_commits_dry_run( print(f"\n{'='*80}") print(f"DRY RUN: Visar planerade temporal commits") print(f"{'='*80}") - + for md_file in md_files: print(f"\nBearbetar {md_file.name}...") try: @@ -108,7 +109,7 @@ def _process_temporal_commits_dry_run( # Note: generate_temporal_commits handles its own dry-run output except Exception as e: print(f"Fel vid bearbetning av {md_file}: {e}") - + print(f"\nDRY RUN KLAR") @@ -147,26 +148,30 @@ def _process_temporal_commits_with_git( print(f"Delar upp {total_files} filer i batcher om {batch_size} filer var") batches = [md_files[i:i + batch_size] for i in range(0, total_files, batch_size)] print(f"Skapade {len(batches)} batcher") - + # Process each batch in the same repository and branch, pushing after each for i, batch in enumerate(batches, 1): print(f"\nBearbetar temporal batch {i}/{len(batches)} ({len(batch)} filer)...") - _process_temporal_batch_files(batch, from_date, to_date, verbose, original_cwd, i, len(batches)) - + _process_temporal_batch_files( + batch, from_date, to_date, verbose, original_cwd, i, len(batches)) + # Push after each batch print(f"Pushar temporal batch {i}/{len(batches)} till target repository...") if push_to_target_repository(unique_branch, 'origin', verbose): - print(f"Temporal batch {i}/{len(batches)} pushad till target repository som branch '{unique_branch}'") + print( + f"Temporal batch {i}/{len(batches)} pushad till target repository som branch '{unique_branch}'") else: - print(f"Misslyckades med att pusha temporal batch {i}/{len(batches)} till target repository") + print( + f"Misslyckades med att pusha temporal batch {i}/{len(batches)} till target repository") else: print(f"Bearbetar {total_files} filer i en enda temporal batch...") _process_temporal_batch_files(md_files, from_date, to_date, verbose, original_cwd, 1, 1) - + # Push the single batch print(f"Pushar alla {total_files} temporal commits till target repository...") if push_to_target_repository(unique_branch, 'origin', verbose): - print(f"Alla {total_files} temporal commits pushade till 
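The patch touches only this function's docstring, so its body is not visible here. For orientation, a hypothetical implementation matching the documented contract could look like the sketch below; the naming scheme is a guess, not the project's actual code:

```python
from typing import Optional

def _create_temporal_branch_name(from_date: Optional[str],
                                 to_date: Optional[str]) -> str:
    # Hypothetical: encode the date range in the branch name so runs
    # over different ranges never collide.
    if from_date and to_date:
        return f"temporal-commits-{from_date}-to-{to_date}"
    if from_date:
        return f"temporal-commits-from-{from_date}"
    if to_date:
        return f"temporal-commits-until-{to_date}"
    return "temporal-commits-all"
```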
target repository som branch '{unique_branch}'") + print( + f"Alla {total_files} temporal commits pushade till target repository som branch '{unique_branch}'") else: print(f"Misslyckades med att pusha temporal commits till target repository") @@ -189,36 +194,36 @@ def _process_temporal_batch_files( ) -> None: """Process batch of markdown files for temporal commits in the current repository.""" import shutil - + for md_file in md_files: # Use absolute path since we changed working directory abs_md_file = Path(original_cwd) / md_file - + # Copy file to year folder in cloned repo and remove -markers from filename # Extract year from file path (e.g., 2013/sfs-2013-xxx-markers.md -> 2013) year_dir = md_file.parent.name filename = md_file.name - + # Remove -markers from filename if present if "-markers" in filename: filename = filename.replace("-markers", "") - + # Target structure: year/filename (directly in year folder at root) target_file = Path.cwd() / year_dir / filename - + # Create directory structure if needed target_file.parent.mkdir(parents=True, exist_ok=True) - + try: # Copy the file to the git repo shutil.copy2(abs_md_file, target_file) - + print(f"Bearbetar {md_file.name} för temporal commits...") # Run generate_temporal_commits on the copied file in the repo generate_temporal_commits(target_file, None, from_date, to_date, dry_run=False) except Exception as e: print(f"Fel vid temporal bearbetning av {abs_md_file}: {e}") continue - + if verbose: - print(f"Temporal batch {batch_num}/{total_batches} bearbetad ({len(md_files)} filer)") \ No newline at end of file + print(f"Temporal batch {batch_num}/{total_batches} bearbetad ({len(md_files)} filer)") diff --git a/exporters/html/eli_utils.py b/exporters/html/eli_utils.py index defa6da5..beb11bd1 100644 --- a/exporters/html/eli_utils.py +++ b/exporters/html/eli_utils.py @@ -18,7 +18,7 @@ def get_eli_host() -> str: """ Returnerar ELI host från environment variabel eller default. - + Returns: str: ELI host (default: selex.se) """ @@ -28,7 +28,7 @@ def get_eli_host() -> str: def get_eli_base_url() -> str: """ Returnerar bas-URL:en för ELI-systemet. - + Returns: str: ELI bas-URL """ @@ -38,7 +38,7 @@ def get_eli_base_url() -> str: def get_sfs_eli_namespace() -> str: """ Returnerar namespace för SFS-dokument i ELI-systemet. - + Returns: str: SFS namespace inom ELI """ @@ -48,14 +48,14 @@ def get_sfs_eli_namespace() -> str: def generate_eli_canonical_url(beteckning: str, output_format: str = 'html') -> Optional[str]: """ Genererar en ELI canonical URL för ett SFS-dokument. - + Args: beteckning (str): Dokument beteckning i format "YYYY:NNN" (t.ex. "2024:1000") output_format (str): Format för URL:en ('html', 'pdf', etc). 'html' ger bas-URL utan suffix. 
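The path handling in _process_temporal_batch_files is easy to misread in diff form: the year folder comes from the source file's parent directory, and the -markers suffix is dropped from the file name before the copy into the cloned repo. Traced with a hypothetical path:

```python
from pathlib import Path

md_file = Path("data/markdown/2013/sfs-2013-100-markers.md")  # hypothetical

year_dir = md_file.parent.name                    # "2013"
filename = md_file.name.replace("-markers", "")   # "sfs-2013-100.md"

# Target layout in the cloned repo: <repo-root>/<year>/<file>
target_file = Path.cwd() / year_dir / filename
```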
- + Returns: Optional[str]: ELI URL eller None om beteckningen är ogiltig - + Example: >>> generate_eli_canonical_url("2024:1000") 'http://selex.se/eli/sfs/2024/1000/' @@ -65,37 +65,37 @@ def generate_eli_canonical_url(beteckning: str, output_format: str = 'html') -> """ if not beteckning or not isinstance(beteckning, str): return None - + # Validera format med regex pattern = r'^(\d{4}):(\d+)$' match = re.match(pattern, beteckning.strip()) - + if not match: return None - + year, nummer = match.groups() - + # Bygg bas-URL med konfigurerad host base_url = f"{get_sfs_eli_namespace()}/{year}/{nummer}" - + # För 'html' format, lägg bara till avslutande slash if output_format == 'html': base_url += "/" elif output_format: # För andra format, lägg till format-suffix base_url += f"/{output_format}" - + return base_url def generate_eli_canonical_url_from_data(data: dict, output_format: str = 'html') -> Optional[str]: """ Genererar en ELI canonical URL från SFS-dokumentdata. - + Args: data (dict): SFS-dokumentdata med 'beteckning' fält output_format (str): Format för URL:en ('html', 'pdf', 'oj/swe') - + Returns: Optional[str]: ELI URL eller None om beteckningen saknas eller är ogiltig @@ -106,7 +106,7 @@ def generate_eli_canonical_url_from_data(data: dict, output_format: str = 'html' """ if not isinstance(data, dict): return None - + beteckning = data.get('beteckning') if not beteckning: return None @@ -117,27 +117,28 @@ def generate_eli_canonical_url_from_data(data: dict, output_format: str = 'html' def validate_eli_url(url: str) -> bool: """ Validerar om en URL följer ELI-standarden för SFS-dokument. - + Args: url (str): URL att validera - + Returns: bool: True om URL:en är en giltig ELI URL för SFS - + Example: >>> validate_eli_url("http://selex.se/eli/sfs/2024/1000/") True - + >>> validate_eli_url("https://example.com/doc") False """ if not url or not isinstance(url, str): return False - + # Få aktuell host från konfiguration current_host = get_eli_host().replace('.', r'\.') - - # Regex för att matcha ELI SFS URL-format med konfigurerad host och tillåtna format (html, pdf, md) + + # Regex för att matcha ELI SFS URL-format med konfigurerad host och + # tillåtna format (html, pdf, md) pattern = f'^https?://{current_host}/eli/sfs/\\d{{4}}/\\d+(?:/(html|pdf|md))?/?$' return bool(re.match(pattern, url.strip())) @@ -145,38 +146,38 @@ def validate_eli_url(url: str) -> bool: def extract_beteckning_from_eli_url(url: str) -> Optional[str]: """ Extraherar beteckning (YYYY:NNN) från en ELI URL. - + Args: url (str): ELI URL att parsa - + Returns: Optional[str]: Beteckning i format "YYYY:NNN" eller None om URL:en är ogiltig """ if not validate_eli_url(url): return None - + # Få aktuell host från konfiguration current_host = get_eli_host().replace('.', r'\.') - + # Regex för att extrahera år och nummer med konfigurerad host pattern = f'https?://{current_host}/eli/sfs/(\\d{{4}})/(\\d+)' match = re.search(pattern, url.strip()) - + if match: year, nummer = match.groups() return f"{year}:{nummer}" - + return None def generate_eli_metadata_html(beteckning: str, output_format: str = 'html') -> Optional[str]: """ Genererar HTML meta-taggar för ELI canonical URL. - + Args: beteckning (str): Dokument beteckning i format "YYYY:NNN" output_format (str): Format för URL:en ('html', 'pdf', etc). 'html' ger bas-URL utan suffix. 
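All the ELI helpers above hinge on the same YYYY:NNN regex. A condensed round trip, with the host hard-coded to the default from the docstring example (the real code resolves it through get_eli_host() and an environment variable):

```python
import re
from typing import Optional

def beteckning_to_eli(beteckning: str) -> Optional[str]:
    match = re.match(r'^(\d{4}):(\d+)$', beteckning.strip())
    if not match:
        return None
    year, nummer = match.groups()
    # Trailing slash marks the 'html' (base) format variant
    return f"http://selex.se/eli/sfs/{year}/{nummer}/"

assert beteckning_to_eli("2024:1000") == "http://selex.se/eli/sfs/2024/1000/"
assert beteckning_to_eli("not-a-beteckning") is None
```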
- + Returns: Optional[str]: HTML meta-taggar eller None om beteckningen är ogiltig @@ -187,10 +188,10 @@ def generate_eli_metadata_html(beteckning: str, output_format: str = 'html') -> eli_url = generate_eli_canonical_url(beteckning, output_format) if not eli_url: return None - + html_parts = [ f'', f'' ] - - return '\n'.join(html_parts) \ No newline at end of file + + return '\n'.join(html_parts) diff --git a/exporters/html/html_diff_page.py b/exporters/html/html_diff_page.py index 207104aa..5afd81c8 100644 --- a/exporters/html/html_diff_page.py +++ b/exporters/html/html_diff_page.py @@ -1,3 +1,4 @@ +from exporters.html.styling_constants import get_css_variables, COLORS import difflib import html import re @@ -9,9 +10,14 @@ project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) -from exporters.html.styling_constants import get_css_variables, COLORS -def create_html_diff(text_before: str, text_after: str, beteckning: str, rubrik: str, ikraft_datum: str, output_dir: Path = None) -> str: +def create_html_diff( + text_before: str, + text_after: str, + beteckning: str, + rubrik: str, + ikraft_datum: str, + output_dir: Path = None) -> str: """ Create an HTML diff file showing changes between before and after text. @@ -56,7 +62,7 @@ def create_html_diff(text_before: str, text_after: str, beteckning: str, rubrik: Ändringsförfattning {beteckning} - Textändringar
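html_diff_page.py imports difflib for the comparison itself and styles the resulting page with get_css_variables and COLORS. As a rough stand-in for what create_html_diff produces, difflib's stock generator shows the mechanics; the before/after texts here are made up:

```python
import difflib

text_before = "1 § Gammal lydelse."  # hypothetical earlier wording
text_after = "1 § Ny lydelse."       # hypothetical amended wording

# make_file returns a complete HTML page with a side-by-side diff table;
# the project swaps the stock styling for its own constants.
page = difflib.HtmlDiff().make_file(
    text_before.splitlines(), text_after.splitlines(),
    fromdesc="Före", todesc="Efter")
```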