From 4480026bbd77d25dde2478095a834ae838ff4836 Mon Sep 17 00:00:00 2001 From: Simon Clematide Date: Sat, 6 Sep 2025 10:19:41 +0200 Subject: [PATCH] Add support for loading timestamps from a JSON manifest file and update S3 metadata accordingly --- impresso_essentials/io/s3_set_timestamp.py | 211 ++++++++++++++++++++- 1 file changed, 210 insertions(+), 1 deletion(-) diff --git a/impresso_essentials/io/s3_set_timestamp.py b/impresso_essentials/io/s3_set_timestamp.py index c816a15..646b8ea 100644 --- a/impresso_essentials/io/s3_set_timestamp.py +++ b/impresso_essentials/io/s3_set_timestamp.py @@ -39,6 +39,9 @@ --all-lines: If False, only the first timestamp is considered. --output: Optional S3 URI for the output file with updated metadata (only for --s3-file). --force: Force reprocessing even if metadata is already up-to-date (default: False). + --from-manifest: Use modification dates from a JSON manifest file and process files + in the same directory as the manifest. The S3 prefix is automatically derived + from the manifest location. """ import os @@ -54,7 +57,7 @@ import boto3 from botocore.client import Config -# Configure logging +# Configure logging (default level, will be updated from CLI args) logging.basicConfig( level=logging.INFO, format="%(asctime)-15s %(filename)s:%(lineno)d %(levelname)s: %(message)s", @@ -526,6 +529,189 @@ def report_missing_metadata_dirs( print(f"s3://{bucket}/{dir_path}/") +def load_manifest_timestamps(manifest_s3_uri: str) -> dict: + """ + Load modification timestamps from a JSON manifest file. + + Args: + manifest_s3_uri: S3 URI of the manifest JSON file. + + Returns: + dict: Dictionary mapping newspaper-year keys to modification timestamps. + + Raises: + ValueError: If manifest cannot be loaded or parsed. + """ + parsed = urlparse(manifest_s3_uri) + bucket = parsed.netloc + key = parsed.path.lstrip("/") + + s3 = get_s3_client() + + log.info("Loading manifest from: %s", manifest_s3_uri) + + try: + # Download manifest file + response = s3.get_object(Bucket=bucket, Key=key) + manifest_data = json.loads(response['Body'].read().decode('utf-8')) + + # Extract newspaper-year modification dates + timestamps = {} + + for media in manifest_data.get('media_list', []): + media_title = media.get('media_title') + if not media_title: + continue + + # Look for year-level statistics + for stats in media.get('media_statistics', []): + if stats.get('granularity') == 'year': + element = stats.get('element', '') + last_mod = stats.get('last_modification_date') + + if element and last_mod: + # Convert to ISO format with Z suffix + try: + # Parse the timestamp and convert to UTC ISO format + dt = datetime.strptime(last_mod, "%Y-%m-%d %H:%M:%S") + iso_timestamp = dt.strftime("%Y-%m-%dT%H:%M:%SZ") + timestamps[element] = iso_timestamp + log.debug("Found timestamp for %s: %s", element, iso_timestamp) + except ValueError as e: + log.warning("Invalid timestamp format for %s: %s (%s)", element, last_mod, e) + + log.info("Loaded %d newspaper-year timestamps from manifest", len(timestamps)) + return timestamps + + except Exception as e: + log.error("Failed to load manifest: %s", e) + raise ValueError(f"Failed to load manifest: {e}") + + +def update_metadata_from_manifest( + manifest_s3_uri: str, + metadata_key: str, + force: bool = False, +): + """ + Updates metadata for S3 objects using timestamps from a manifest file. + The S3 prefix is automatically derived from the manifest location. + + Args: + manifest_s3_uri: S3 URI of the manifest JSON file. 
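+            The manifest is expected to follow the structure read by
+            load_manifest_timestamps: a top-level "media_list" whose entries
+            contain year-granularity "media_statistics" with an "element"
+            (the NEWSPAPER-YEAR key) and a "last_modification_date" in
+            "%Y-%m-%d %H:%M:%S" format.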
+ metadata_key: The metadata key to update with the timestamp. + force: Force reprocessing even if metadata is already up-to-date. + + Returns: + None + """ + # Load timestamps from manifest + timestamps = load_manifest_timestamps(manifest_s3_uri) + + # Derive S3 prefix from manifest location + manifest_parsed = urlparse(manifest_s3_uri) + bucket = manifest_parsed.netloc + manifest_key = manifest_parsed.path.lstrip("/") + + # Get the directory containing the manifest file + manifest_dir = "/".join(manifest_key.split("/")[:-1]) + s3_prefix = f"s3://{bucket}/{manifest_dir}/" + + log.info("Derived S3 prefix from manifest location: %s", s3_prefix) + + # Calculate manifest directory level for validation + manifest_dir_level = len([p for p in manifest_key.split("/")[:-1] if p]) + + log.debug("Fetching S3 objects with prefix: %s", s3_prefix) + log.debug("Manifest directory level: %d", manifest_dir_level) + + s3 = get_s3_client() + + # Use a paginator to handle S3 object listing with paging + paginator = s3.get_paginator("list_objects_v2") + page_iterator = paginator.paginate(Bucket=bucket, Prefix=manifest_dir) + + skipped = 0 + processed = 0 + not_found = 0 + + for page in page_iterator: + for obj in page.get("Contents", []): + key = obj["Key"] + if key.endswith(".jsonl.bz2"): + # Validate directory nesting level + key_dir_level = len([p for p in key.split("/")[:-1] if p]) + expected_level_same = manifest_dir_level + 1 # manifest_dir/NEWSPAPER-YEAR.jsonl.bz2 + expected_level_nested = manifest_dir_level + 2 # manifest_dir/NEWSPAPER/NEWSPAPER-YEAR.jsonl.bz2 + + if key_dir_level not in [expected_level_same, expected_level_nested]: + log.debug("Skipping file due to incorrect nesting level: %s (level %d, expected %d or %d)", + key, key_dir_level, expected_level_same, expected_level_nested) + continue + + # Extract newspaper-year from filename + filename = key.split("/")[-1] + # Remove .jsonl.bz2 extension + base_filename = filename.replace(".jsonl.bz2", "") + + # Extract newspaper-year pattern from filename ending with -NEWSPAPER-YEAR + # Look for the last occurrence of a pattern like -NEWSPAPER-YEAR + parts = base_filename.split("-") + if len(parts) >= 2: + # Take the last two parts as NEWSPAPER-YEAR + newspaper_year = "-".join(parts[-2:]) + else: + # Fallback to the whole filename if pattern doesn't match + newspaper_year = base_filename + + # Look for timestamp in manifest + if newspaper_year in timestamps: + timestamp = timestamps[newspaper_year] + log.info("Processing file: %s with timestamp: %s", key, timestamp) + + s3_uri = f"s3://{bucket}/{key}" + try: + # Check if metadata already exists + head = s3.head_object(Bucket=bucket, Key=key) + existing_metadata = head.get("Metadata", {}) + + if metadata_key in existing_metadata and not force: + log.info("[SKIP] Metadata key '%s' already exists for %s", metadata_key, key) + skipped += 1 + continue + + # Update metadata with timestamp from manifest + updated_metadata = existing_metadata.copy() + updated_metadata[metadata_key] = timestamp + + log.debug("[UPDATE] Setting %s=%s on %s", metadata_key, timestamp, s3_uri) + + with disable_interrupts(): + s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={"Bucket": bucket, "Key": key}, + Metadata=updated_metadata, + MetadataDirective="REPLACE", + ContentType=head.get("ContentType", "application/octet-stream"), + ) + + processed += 1 + log.debug("[DONE] Metadata updated for %s", key) + + except Exception as e: + log.warning("Failed to update metadata for %s: %s", key, e) + skipped += 1 + else: + 
log.warning("No timestamp found in manifest for: %s", newspaper_year) + not_found += 1 + + log.info("Manifest-based update statistics:") + log.info("Total files processed: %d", processed) + log.info("Total files skipped: %d", skipped) + log.info("Files without manifest timestamp: %d", not_found) + + def main(): """ Parses command-line arguments and triggers the metadata update process. @@ -541,6 +727,8 @@ def main(): - --force: Force reprocessing even if metadata is already up-to-date. - --report: Report all files missing the specified metadata key. - --report-dirs: Report all directories containing files missing the specified metadata key. + - --from-manifest: Use modification dates from a JSON manifest file instead of extracting + timestamps from records. The S3 prefix is automatically derived from the manifest location. Returns: None @@ -598,9 +786,24 @@ def main(): action="store_true", help="Report all directories containing files missing the specified metadata key. Only valid with --s3-prefix.", ) + group.add_argument( + "--from-manifest", + help="S3 URI of manifest JSON file to use for modification timestamps. S3 prefix is automatically derived from manifest location.", + ) + parser.add_argument( + "--log-level", + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + help="Set the logging level (default: %(default)s).", + ) args = parser.parse_args() + # Update logging level based on CLI argument + log_level = getattr(logging, args.log_level.upper()) + logging.getLogger().setLevel(log_level) + log.setLevel(log_level) + if (args.report or args.report_dirs) and not args.s3_prefix: parser.error("The --report and --report-dirs options require --s3-prefix.") @@ -631,6 +834,12 @@ def main(): all_lines=args.all_lines, force=args.force, ) + elif args.from_manifest: + update_metadata_from_manifest( + args.from_manifest, + metadata_key=args.metadata_key, + force=args.force, + ) elif args.s3_file: update_metadata_if_needed( args.s3_file,