ace/ingest.py: 59 changes (43 additions & 16 deletions)
@@ -2,36 +2,53 @@
 import logging
 from . import sources, config
 from .scrape import _validate_scrape
+import multiprocessing as mp
+from functools import partial
 
 logger = logging.getLogger(__name__)
 
 # The actual function that takes articles and adds them to the database
 # imports sources; sources is a module that contains the classes for each
 # source of articles.
+def _process_file(f):
+    """Helper function to read and validate a single file."""
+    logger.info("Processing article %s..." % f)
+    try:
+        html = open(f).read()
+    except Exception as e:
+        logger.warning("Failed to read file %s: %s" % (f, str(e)))
+        return f, None
+
+    if not _validate_scrape(html):
+        logger.warning("Invalid HTML for %s" % f)
+        return f, None
+
+    return f, html
+
+
 def add_articles(db, files, commit=True, table_dir=None, limit=None,
-                 pmid_filenames=False, metadata_dir=None, force_ingest=True, **kwargs):
+                 pmid_filenames=False, metadata_dir=None, force_ingest=True, parallel=True, num_workers=None, **kwargs):
     ''' Process articles and add their data to the DB.
     Args:
         files: The path to the article(s) to process. Can be a single
             filename (string), a list of filenames, or a path to pass
             to glob (e.g., "article_dir/NIMG*html")
         commit: Whether or not to save records to the DB file after adding them.
         table_dir: Directory to store downloaded tables in (if None, tables
             will not be saved.)
         limit: Optional integer indicating max number of articles to add
             (selected randomly from all available). When None, will add all
             available articles.
         pmid_filenames: When True, assume that the file basename is a PMID.
             This saves us from having to retrieve metadata from PubMed when
             checking if a file is already in the DB, and greatly speeds up
             batch processing when overwrite is off.
         metadata_dir: Location to read/write PubMed metadata for articles.
             When None (default), retrieves new metadata each time. If a
             path is provided, will check there first before querying PubMed,
             and will save the result of the query if it doesn't already
             exist.
         force_ingest: Ingest even if no source is identified.
+        parallel: Whether to process articles in parallel (default: True).
+        num_workers: Number of worker processes to use when processing in
+            parallel. If None (default), uses the number of CPUs available
+            on the system.
         kwargs: Additional keyword arguments to pass to parse_article.
     '''
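
Since `add_articles` now spins up an `mp.Pool` by default, callers on spawn-based platforms (Windows, and macOS on recent Python versions) need the usual entry-point guard. A minimal driver sketch, not taken from this diff: the `db` handle and the glob pattern are assumptions, and only `parallel` and `num_workers` come from the new signature.

```python
from ace import ingest

def main():
    db = ...  # hypothetical: an ACE database handle constructed elsewhere
    ingest.add_articles(db, "article_dir/NIMG*html",
                        parallel=True,    # new flag added in this PR
                        num_workers=4)    # None would use all available CPUs

if __name__ == "__main__":
    # Under the spawn start method, workers re-import this module;
    # without the guard, the Pool would be re-created recursively.
    main()
```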

@@ -46,12 +63,22 @@ def add_articles(db, files, commit=True, table_dir=None, limit=None,
         files = files[:limit]
 
     missing_sources = []
-    for i, f in enumerate(files):
-        logger.info("Processing article %s..." % f)
-        html = open(f).read()
-
-        if not _validate_scrape(html):
-            logger.warning("Invalid HTML for %s" % f)
-
+    if parallel:
+        # Process files in parallel to extract HTML content
+        with mp.Pool(processes=num_workers) as pool:
+            file_html_pairs = pool.map(_process_file, files)
+    else:
+        # Process files sequentially
+        file_html_pairs = []
+        for f in files:
+            file_html_pairs.append(_process_file(f))
+
+    # Process each file's HTML content
+    for i, (f, html) in enumerate(file_html_pairs):
+        if html is None:
+            # File reading or validation failed
+            missing_sources.append(f)
+            continue
 
         source = manager.identify_source(html)
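
The helper deliberately returns `(f, None)` instead of raising, so one unreadable or invalid file cannot abort the whole `pool.map` call, and since `map` preserves input order the results still line up with `files`. A self-contained sketch of that contract, with hypothetical file names:

```python
import multiprocessing as mp

def _read(path):
    """Mimic _process_file's contract: never raise, return (path, payload or None)."""
    try:
        with open(path) as fh:
            return path, fh.read()
    except OSError:
        return path, None

if __name__ == "__main__":
    paths = ["a.html", "b.html", "missing.html"]
    with mp.Pool(processes=2) as pool:
        # map() preserves input order, so results pair up with `paths`
        for path, payload in pool.map(_read, paths):
            print(path, "ok" if payload is not None else "skipped")
```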
@@ -67,7 +94,7 @@ def add_articles(db, files, commit=True, table_dir=None, limit=None,
             article = source.parse_article(html, pmid, metadata_dir=metadata_dir, **kwargs)
             if article and (config.SAVE_ARTICLES_WITHOUT_ACTIVATIONS or article.tables):
                 db.add(article)
-                if commit and (i % 100 == 0 or i == len(files) - 1):
+                if commit and (i % 100 == 0 or i == len(file_html_pairs) - 1):
                     db.save()
     db.save()
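
The save cadence itself is unchanged: a commit every 100 articles plus a final commit. `len(file_html_pairs)` is substituted only because the loop now iterates over the pairs; every input file yields exactly one pair, so the two lengths are always equal. A toy illustration of the batching condition, with hypothetical names:

```python
def commit_points(n, batch=100):
    """Indices at which the ingest loop would call db.save()."""
    return [i for i in range(n) if i % batch == 0 or i == n - 1]

print(commit_points(5, batch=2))  # [0, 2, 4]: note the save after the very first item
```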
