Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 210 additions & 1 deletion impresso_essentials/io/s3_set_timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
--all-lines: If False, only the first timestamp is considered.
--output: Optional S3 URI for the output file with updated metadata (only for --s3-file).
--force: Force reprocessing even if metadata is already up-to-date (default: False).
--from-manifest: Use modification dates from a JSON manifest file and process files
in the same directory as the manifest. The S3 prefix is automatically derived
from the manifest location.
"""

import os
Expand All @@ -54,7 +57,7 @@
import boto3
from botocore.client import Config

# Configure logging
# Configure logging (default level, will be updated from CLI args)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)-15s %(filename)s:%(lineno)d %(levelname)s: %(message)s",
Expand Down Expand Up @@ -526,6 +529,189 @@ def report_missing_metadata_dirs(
print(f"s3://{bucket}/{dir_path}/")


def load_manifest_timestamps(manifest_s3_uri: str) -> dict:
"""
Load modification timestamps from a JSON manifest file.

Args:
manifest_s3_uri: S3 URI of the manifest JSON file.

Returns:
dict: Dictionary mapping newspaper-year keys to modification timestamps.

Raises:
ValueError: If manifest cannot be loaded or parsed.
"""
parsed = urlparse(manifest_s3_uri)
bucket = parsed.netloc
key = parsed.path.lstrip("/")

s3 = get_s3_client()

log.info("Loading manifest from: %s", manifest_s3_uri)

try:
# Download manifest file
response = s3.get_object(Bucket=bucket, Key=key)
manifest_data = json.loads(response['Body'].read().decode('utf-8'))

# Extract newspaper-year modification dates
timestamps = {}

for media in manifest_data.get('media_list', []):
media_title = media.get('media_title')
if not media_title:
continue

# Look for year-level statistics
for stats in media.get('media_statistics', []):
if stats.get('granularity') == 'year':
element = stats.get('element', '')
last_mod = stats.get('last_modification_date')

if element and last_mod:
# Convert to ISO format with Z suffix
try:
# Parse the timestamp and convert to UTC ISO format
dt = datetime.strptime(last_mod, "%Y-%m-%d %H:%M:%S")
iso_timestamp = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
timestamps[element] = iso_timestamp
log.debug("Found timestamp for %s: %s", element, iso_timestamp)
except ValueError as e:
log.warning("Invalid timestamp format for %s: %s (%s)", element, last_mod, e)

log.info("Loaded %d newspaper-year timestamps from manifest", len(timestamps))
return timestamps

except Exception as e:
log.error("Failed to load manifest: %s", e)
raise ValueError(f"Failed to load manifest: {e}")


def update_metadata_from_manifest(
manifest_s3_uri: str,
metadata_key: str,
force: bool = False,
):
"""
Updates metadata for S3 objects using timestamps from a manifest file.
The S3 prefix is automatically derived from the manifest location.

Args:
manifest_s3_uri: S3 URI of the manifest JSON file.
metadata_key: The metadata key to update with the timestamp.
force: Force reprocessing even if metadata is already up-to-date.

Returns:
None
"""
# Load timestamps from manifest
timestamps = load_manifest_timestamps(manifest_s3_uri)

# Derive S3 prefix from manifest location
manifest_parsed = urlparse(manifest_s3_uri)
bucket = manifest_parsed.netloc
manifest_key = manifest_parsed.path.lstrip("/")

# Get the directory containing the manifest file
manifest_dir = "/".join(manifest_key.split("/")[:-1])
s3_prefix = f"s3://{bucket}/{manifest_dir}/"

log.info("Derived S3 prefix from manifest location: %s", s3_prefix)

# Calculate manifest directory level for validation
manifest_dir_level = len([p for p in manifest_key.split("/")[:-1] if p])

log.debug("Fetching S3 objects with prefix: %s", s3_prefix)
log.debug("Manifest directory level: %d", manifest_dir_level)

s3 = get_s3_client()

# Use a paginator to handle S3 object listing with paging
paginator = s3.get_paginator("list_objects_v2")
page_iterator = paginator.paginate(Bucket=bucket, Prefix=manifest_dir)

skipped = 0
processed = 0
not_found = 0

for page in page_iterator:
for obj in page.get("Contents", []):
key = obj["Key"]
if key.endswith(".jsonl.bz2"):
# Validate directory nesting level
key_dir_level = len([p for p in key.split("/")[:-1] if p])
expected_level_same = manifest_dir_level + 1 # manifest_dir/NEWSPAPER-YEAR.jsonl.bz2
expected_level_nested = manifest_dir_level + 2 # manifest_dir/NEWSPAPER/NEWSPAPER-YEAR.jsonl.bz2

if key_dir_level not in [expected_level_same, expected_level_nested]:
log.debug("Skipping file due to incorrect nesting level: %s (level %d, expected %d or %d)",
key, key_dir_level, expected_level_same, expected_level_nested)
continue

# Extract newspaper-year from filename
filename = key.split("/")[-1]
# Remove .jsonl.bz2 extension
base_filename = filename.replace(".jsonl.bz2", "")

# Extract newspaper-year pattern from filename ending with -NEWSPAPER-YEAR
# Look for the last occurrence of a pattern like -NEWSPAPER-YEAR
parts = base_filename.split("-")
if len(parts) >= 2:
# Take the last two parts as NEWSPAPER-YEAR
newspaper_year = "-".join(parts[-2:])
else:
# Fallback to the whole filename if pattern doesn't match
newspaper_year = base_filename

# Look for timestamp in manifest
if newspaper_year in timestamps:
timestamp = timestamps[newspaper_year]
log.info("Processing file: %s with timestamp: %s", key, timestamp)

s3_uri = f"s3://{bucket}/{key}"
try:
# Check if metadata already exists
head = s3.head_object(Bucket=bucket, Key=key)
existing_metadata = head.get("Metadata", {})

if metadata_key in existing_metadata and not force:
log.info("[SKIP] Metadata key '%s' already exists for %s", metadata_key, key)
skipped += 1
continue

# Update metadata with timestamp from manifest
updated_metadata = existing_metadata.copy()
updated_metadata[metadata_key] = timestamp

log.debug("[UPDATE] Setting %s=%s on %s", metadata_key, timestamp, s3_uri)

with disable_interrupts():
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={"Bucket": bucket, "Key": key},
Metadata=updated_metadata,
MetadataDirective="REPLACE",
ContentType=head.get("ContentType", "application/octet-stream"),
)

processed += 1
log.debug("[DONE] Metadata updated for %s", key)

except Exception as e:
log.warning("Failed to update metadata for %s: %s", key, e)
skipped += 1
else:
log.warning("No timestamp found in manifest for: %s", newspaper_year)
not_found += 1

log.info("Manifest-based update statistics:")
log.info("Total files processed: %d", processed)
log.info("Total files skipped: %d", skipped)
log.info("Files without manifest timestamp: %d", not_found)


def main():
"""
Parses command-line arguments and triggers the metadata update process.
Expand All @@ -541,6 +727,8 @@ def main():
- --force: Force reprocessing even if metadata is already up-to-date.
- --report: Report all files missing the specified metadata key.
- --report-dirs: Report all directories containing files missing the specified metadata key.
- --from-manifest: Use modification dates from a JSON manifest file instead of extracting
timestamps from records. The S3 prefix is automatically derived from the manifest location.

Returns:
None
Expand Down Expand Up @@ -598,9 +786,24 @@ def main():
action="store_true",
help="Report all directories containing files missing the specified metadata key. Only valid with --s3-prefix.",
)
group.add_argument(
"--from-manifest",
help="S3 URI of manifest JSON file to use for modification timestamps. S3 prefix is automatically derived from manifest location.",
)
parser.add_argument(
"--log-level",
default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Set the logging level (default: %(default)s).",
)

args = parser.parse_args()

# Update logging level based on CLI argument
log_level = getattr(logging, args.log_level.upper())
logging.getLogger().setLevel(log_level)
log.setLevel(log_level)

if (args.report or args.report_dirs) and not args.s3_prefix:
parser.error("The --report and --report-dirs options require --s3-prefix.")

Expand Down Expand Up @@ -631,6 +834,12 @@ def main():
all_lines=args.all_lines,
force=args.force,
)
elif args.from_manifest:
update_metadata_from_manifest(
args.from_manifest,
metadata_key=args.metadata_key,
force=args.force,
)
elif args.s3_file:
update_metadata_if_needed(
args.s3_file,
Expand Down