Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file added src/common/__pycache__/settings.cpython-313.pyc
Binary file not shown.
3 changes: 2 additions & 1 deletion src/indexing/bib_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def extract_bib_entries(path: Path) -> list[BibEntry]:
"title": latex_to_unicode(entry.fields.get("title")),
"year": entry.fields.get("year"),
"author": authors if authors else None,
"doi": entry.fields.get("doi"),
"language": entry.fields.get("language"),
}
)
Expand All @@ -68,4 +69,4 @@ def extract_bib_entries(path: Path) -> list[BibEntry]:
df = df[(df["language"] == "eng") | (df["language"].isna())]
df = df.dropna(subset=["url", "title", "year", "author"])

return df[["url", "title", "year", "author"]].to_dict(orient="records")
return df[["url", "title", "year", "author", "doi"]].to_dict(orient="records")
145 changes: 145 additions & 0 deletions src/indexing/compute_pagerank.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""compute_pagerank.py
Batch job that enriches an existing Elasticsearch index with PageRank scores.

Steps
-----
1. Fetch all documents (with a DOI) from the configured index.
2. Retrieve outbound references for every DOI via the Crossref API (async).
3. Build the citation graph (optionally adding virtual nodes for external papers).
4. Run PageRank.
5. Bulk-update each indexed document with its PageRank score (field ``pagerank``).

Run as a one-off script *after* the main indexing pipeline has finished:

python -m src.indexing.compute_pagerank
"""

from __future__ import annotations

import asyncio
import logging
from collections import defaultdict
from typing import Dict, List, Tuple

import aiohttp

import networkx as nx

from src.indexing.settings import settings
from src.indexing.elastic_search_indexer import ElasticSearchIndexer

from tqdm import tqdm

logger = logging.getLogger(__name__)


async def _fetch_single_reference_list(
    session: aiohttp.ClientSession, doi: str, timeout_s: float = 30.0
) -> Tuple[str, List[str]]:
    """Fetch the outbound reference DOIs for *doi* from the Crossref API.

    Parameters
    ----------
    session:
        Shared aiohttp client session used for the request.
    doi:
        DOI whose reference list should be retrieved.
    timeout_s:
        Per-request timeout in seconds (default 30, matching the previous
        hard-coded value).

    Returns
    -------
    ``(doi, references)``; ``references`` is an empty list when the lookup
    fails for any reason (network error, HTTP error, malformed payload), so
    callers never have to handle exceptions themselves.
    """
    url = f"https://api.crossref.org/works/{doi}"
    # Crossref asks for an identifying User-Agent with a contact address so
    # they can reach out instead of throttling anonymous traffic.
    headers = {"User-Agent": "Academic Research Tool (mailto:your-email@example.com)"}

    try:
        # aiohttp deprecates passing a bare number as ``timeout=``;
        # ClientTimeout is the supported way to bound a single request.
        async with session.get(
            url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout_s)
        ) as resp:
            resp.raise_for_status()
            data = await resp.json()
            refs = data.get("message", {}).get("reference", [])
            ref_dois = [r["DOI"] for r in refs if "DOI" in r]
            return doi, ref_dois
    except Exception as exc:  # noqa: BLE001 - best-effort lookup, never raise
        logger.warning("Crossref lookup failed for %s: %s", doi, exc)
        return doi, []


async def fetch_references(
    dois: List[str], concurrency: int = 20
) -> Dict[str, List[str]]:
    """Fetch Crossref reference lists for *dois* concurrently, with progress bar.

    A single ``ClientSession`` is shared across all requests (connection
    pooling) and a semaphore caps the number of in-flight lookups at
    *concurrency*.  The previous implementation rebuilt the session for every
    batch and forced a barrier after each batch, so one slow request stalled
    all of its batch-mates.

    Returns
    -------
    Mapping of ``doi -> list of referenced DOIs`` (empty list when the
    lookup failed).
    """
    if not dois:
        return {}

    semaphore = asyncio.Semaphore(concurrency)
    results: Dict[str, List[str]] = {}

    async with aiohttp.ClientSession() as session:
        with tqdm(total=len(dois), desc="Crossref", unit="doi") as pbar:

            async def _bounded_fetch(doi: str) -> None:
                # Limit concurrency only around the network call itself.
                async with semaphore:
                    key, refs = await _fetch_single_reference_list(session, doi)
                results[key] = refs
                pbar.update(1)

            await asyncio.gather(*(_bounded_fetch(d) for d in dois))

    return results


def pagerank_with_virtual_nodes(
    references_dict: Dict[str, List[str]],
    alpha: float = 0.85,
    min_external_citations: int = 1,
) -> Dict[str, float]:
    """Run PageRank over the citation graph; return scores for original papers only.

    Papers that are cited but not present in *references_dict* ("external"
    papers) are added as virtual nodes so the citation mass flowing to them
    is modelled, but they are excluded from the returned mapping.

    Parameters
    ----------
    references_dict:
        Mapping of ``doi -> list of DOIs it cites``.
    alpha:
        PageRank damping factor.
    min_external_citations:
        Minimum number of inbound citations an external paper needs before a
        virtual node is created for it.  The default of 1 keeps every
        external paper, matching the previous behaviour (the old code
        documented a threshold but never applied one).

    Returns
    -------
    Mapping of ``doi -> PageRank score`` for papers in *references_dict*.
    """
    original_nodes = set(references_dict)

    # Count inbound citations to papers that are not indexed themselves.
    external_counter: Dict[str, int] = defaultdict(int)
    for cited_list in references_dict.values():
        for cited in cited_list:
            if cited not in original_nodes:
                external_counter[cited] += 1

    # Apply the threshold that the first version only promised in a comment.
    virtual_nodes = {
        doi for doi, cnt in external_counter.items() if cnt >= min_external_citations
    }

    all_nodes = original_nodes | virtual_nodes

    G = nx.DiGraph()
    G.add_nodes_from(all_nodes)
    for src, cited_list in references_dict.items():
        for tgt in cited_list:
            # Skip edges to external papers that were filtered out above.
            if tgt in all_nodes:
                G.add_edge(src, tgt)

    pagerank_full = nx.pagerank(G, alpha=alpha, max_iter=1000, tol=1e-9)
    return {doi: score for doi, score in pagerank_full.items() if doi in original_nodes}


def main() -> None:
    """Entry point: enrich every document in the configured index with PageRank."""
    index = settings.index_name
    es_indexer = ElasticSearchIndexer(settings.es_host)

    logger.info("Scanning index '%s' for documents …", index)
    id_map = es_indexer.build_internal_id_map(index)
    if not id_map:
        logger.error("No documents found in index '%s'.", index)
        return

    # Only real DOIs can be resolved via Crossref; synthetic IDs are local-only.
    resolvable = [doc_id for doc_id in id_map if not doc_id.startswith("SYNTH_")]

    logger.info("Fetching Crossref references for %d DOIs …", len(resolvable))
    reference_map = asyncio.run(
        fetch_references(resolvable, concurrency=settings.crossref_concurrency)
    )

    # Every document must appear as a graph node, even without outbound refs.
    for doc_id in id_map:
        reference_map.setdefault(doc_id, [])

    logger.info("Running PageRank on citation graph …")
    scores = pagerank_with_virtual_nodes(reference_map, alpha=settings.pagerank_alpha)

    # Translate internal IDs back to Elasticsearch _ids for the bulk update.
    es_updates = {
        id_map[doc_id]: score for doc_id, score in scores.items() if doc_id in id_map
    }

    logger.info("Updating Elasticsearch documents with PageRank scores …")
    es_indexer.bulk_update_field(index, es_updates, field="pagerank")
    logger.info(
        "PageRank enrichment completed: %d documents updated.", len(es_updates)
    )


if __name__ == "__main__":
    # Without explicit configuration the root logger only emits WARNING and
    # above, so every logger.info() progress message in this script would be
    # silently dropped when it is run as documented.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    main()
72 changes: 71 additions & 1 deletion src/indexing/elastic_search_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import logging
import uuid
from typing import Any, Iterable, Optional
from typing import Any, Iterable, Optional, Dict

from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import NotFoundError
Expand All @@ -19,6 +19,8 @@

from src.indexing.entities import IndexedDocument

import hashlib

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -74,6 +76,8 @@ def create_index(
"author": {"type": "text"},
"year": {"type": "keyword"},
"url": {"type": "keyword"},
"doi": {"type": "keyword"},
"pagerank": {"type": "float"},
}
if embedding_dim:
properties["text_embedding"] = {
Expand Down Expand Up @@ -119,3 +123,69 @@ def index_documents(
]
helpers.bulk(self._client, actions)
logger.info("Indexed %d documents into '%s'.", len(chunk), index_name)

@staticmethod
def _create_synth_id(title: str | None, year: str | int | None, author: list[str] | str | None) -> str:
"""Generate a stable synthetic identifier for a paper without DOI."""
title_part = (title or "").strip()[:50]
year_part = str(year or "")
first_author = ""
if isinstance(author, list) and author:
first_author = author[0]
elif isinstance(author, str):
first_author = author.split(",")[0]
first_author = first_author[:30]
base = f"{title_part}_{year_part}_{first_author}".lower().replace(" ", "_")
return f"SYNTH_{hashlib.md5(base.encode()).hexdigest()[:12]}"

def build_internal_id_map(
    self,
    index_name: str,
    *,
    include_fields: list[str] | None = None,
) -> Dict[str, str]:
    """Return a mapping of *internal_id -> Elasticsearch _id*.

    *internal_id* is the DOI when the document has one, otherwise a
    synthetic ID derived from title/year/author (``_create_synth_id``) so
    that every document can participate in graph enrichment jobs.

    Parameters
    ----------
    index_name:
        Name of the index to scan.
    include_fields:
        Extra ``_source`` fields to fetch in addition to the ones needed
        for ID construction (doi/title/year/author).

    Notes
    -----
    If two documents map to the same internal ID (duplicate DOI or a
    synthetic-ID collision) the later document wins and a warning is
    logged; the previous version overwrote silently.
    """

    src_fields = ["doi", "title", "year", "author"]
    if include_fields:
        src_fields.extend(include_fields)

    id_map: Dict[str, str] = {}
    for hit in helpers.scan(self._client, index=index_name, _source=src_fields):
        src = hit["_source"]
        doc_doi = src.get("doi")
        if doc_doi:
            internal_id = doc_doi
        else:
            internal_id = self._create_synth_id(
                src.get("title"), src.get("year"), src.get("author")
            )

        # Surface duplicates instead of silently clobbering: they usually
        # mean the same paper was indexed twice.
        if internal_id in id_map and id_map[internal_id] != hit["_id"]:
            logger.warning(
                "Duplicate internal ID %s (ES docs %s and %s); keeping the latter.",
                internal_id,
                id_map[internal_id],
                hit["_id"],
            )
        id_map[internal_id] = hit["_id"]

    return id_map

def bulk_update_field(
    self,
    index_name: str,
    id_value_map: Dict[str, float],
    field: str = "pagerank",
) -> None:
    """Update *field* for each ES doc where `_id` is a key in id_value_map."""

    # Nothing to send — skip the bulk call entirely (matches prior behaviour).
    if not id_value_map:
        return

    actions = []
    for es_id, value in id_value_map.items():
        actions.append(
            {
                "_op_type": "update",
                "_index": index_name,
                "_id": es_id,
                "doc": {field: value},
            }
        )

    # refresh=True makes the updated scores searchable immediately.
    helpers.bulk(self._client, actions, refresh=True)
    logger.info("Updated %d documents (%s field).", len(actions), field)
3 changes: 3 additions & 0 deletions src/indexing/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class IndexedDocument(TypedDict, total=False):
author: list[str]
url: str
year: Union[int, str]
doi: str
pagerank: float


class BibEntry(TypedDict):
Expand All @@ -25,3 +27,4 @@ class BibEntry(TypedDict):
title: str
author: list[str]
year: Union[int, str]
doi: str | None
Loading