diff --git a/concall_parser/utils/file_utils.py b/concall_parser/utils/file_utils.py
index de70096..dab3a89 100644
--- a/concall_parser/utils/file_utils.py
+++ b/concall_parser/utils/file_utils.py
@@ -1,13 +1,16 @@
+import asyncio
 import json
 import os
+import tempfile
 
+import aiofiles
+import httpx
 import pdfplumber
-import requests
 
 from concall_parser.log_config import logger
 
 
-def get_document_transcript(filepath: str) -> dict[int, str]:
+async def get_document_transcript(filepath: str) -> dict[int, str]:
     """Extracts text of a pdf document.
 
     Args:
@@ -16,24 +19,33 @@ def get_document_transcript(filepath: str) -> dict[int, str]:
     Returns:
         transcript: Dictionary of page number, page text pair.
     """
-    transcript = {}
-    try:
-        with pdfplumber.open(filepath) as pdf:
-            logger.debug("Loaded document")
-            page_number = 1
-            for page in pdf.pages:
-                text = page.extract_text()
-                if text:
-                    transcript[page_number] = text
-                    page_number += 1
-        return transcript
-    except FileNotFoundError:
-        raise FileNotFoundError("Please check if file exists.")
-    except Exception:
-        logger.exception("Could not load file %s", filepath)
-
-def save_output(
+    def _extract_pdf_text(filepath: str) -> dict[int, str]:
+        transcript = {}
+        try:
+            with pdfplumber.open(filepath) as pdf:
+                logger.debug(f"Loaded document {filepath}")
+                # ? Do we need a manual counter, or can we just enumerate pdf.pages?
+                page_number = 1
+                for page in pdf.pages:
+                    text = page.extract_text()
+                    if text:
+                        transcript[page_number] = text
+                        page_number += 1
+            return transcript
+        except FileNotFoundError:
+            logger.exception(
+                f"Could not find file with path {filepath}. Please check if it exists."
+            )
+            raise FileNotFoundError("Please check if file exists.")
+        except Exception:
+            logger.exception("Could not load file %s", filepath)
+
+    loop = asyncio.get_event_loop()
+    return await loop.run_in_executor(None, _extract_pdf_text, filepath)
+
+
+async def save_output(
     dialogues: dict, document_name: str, output_base_path: str = "output"
 ) -> None:
     """Save dialogues to JSON files in the specified output path.
@@ -46,18 +58,20 @@ def save_output(
         output_base_path (str): Path to directory in which outputs are to be saved.
        document_name (str): Name of the file being parsed, corresponds to company name for now.
     """
-    for dialogue_type, dialogue in dialogues.items():
+    try:
         output_dir_path = os.path.join(
             output_base_path, os.path.basename(document_name)[:-4]
         )
         os.makedirs(output_dir_path, exist_ok=True)
-        with open(
-            os.path.join(output_dir_path, f"{dialogue_type}.json"), "w"
-        ) as file:
-            json.dump(dialogue, file, indent=4)
+        for dialogue_type, dialogue in dialogues.items():
+            output_file_path = os.path.join(output_dir_path, f"{dialogue_type}.json")
+            async with aiofiles.open(output_file_path, "w") as file:
+                await file.write(json.dumps(dialogue, indent=4))
+    except Exception:
+        logger.exception(f"Failed to save outputs for file {output_base_path}.")
 
 
-def save_transcript(
+async def save_transcript(
     transcript: dict,
     document_path: str,
     output_base_path: str = "raw_transcript",
@@ -75,44 +89,55 @@
         document_name = os.path.basename(document_path)[:-4]  # remove the .pdf
         output_dir_path = os.path.join(output_base_path, document_name)
         os.makedirs(output_base_path, exist_ok=True)
-        with open(f"{output_dir_path}.txt", "w") as file:
+        # ? concatenate all transcript texts before writing at once? IO overhead?
+        async with aiofiles.open(f"{output_dir_path}.txt", "w") as file:
             for _, text in transcript.items():
-                file.write(text)
-                file.write("\n\n")
+                await file.write(text)
+                await file.write("\n\n")
+        # ? Do we gather all tasks before asynchronously executing?
         logger.info("Saved transcript text to file\n")
     except Exception:
         logger.exception("Could not save document transcript")
 
 
-def get_transcript_from_link(link:str) -> dict[int, str]:
+async def get_transcript_from_link(link: str) -> dict[int, str]:
     """Extracts transcript by downloading pdf from a given link.
-
+
     Args:
         link: Link to the pdf document of earnings call report.
-
+
     Returns:
         transcript: A page number-page text mapping.
-
+
     Raises:
         Http error, if encountered during downloading document.
     """
     try:
+        # TODO: expand error handling - file operations
         logger.debug("Request to get transcript from link.")
         headers = {
-            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"# noqa: E501
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"  # noqa: E501
         }
-        response = requests.get(url=link, headers=headers, timeout=30, stream=True)
-        response.raise_for_status()
+        with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as file:
+            temp_file_path = file.name
+
+        # TODO: add some checks to check correct file is being downloaded
+        async with httpx.AsyncClient(headers=headers, follow_redirects=True) as client:
+            response = await client.get(url=link, timeout=30)
+            response.raise_for_status()
 
-        temp_doc_path = "temp_document.pdf"
-        with open(temp_doc_path, 'wb') as temp_pdf:
-            for chunk in response.iter_content(chunk_size=8192):
-                temp_pdf.write(chunk)
-        transcript = get_document_transcript(filepath=temp_doc_path)
-        os.remove(temp_doc_path)
+            async with aiofiles.open(temp_file_path, "wb") as file:
+                async for chunk in response.aiter_bytes(chunk_size=8192):
+                    await file.write(chunk)
+        transcript = await get_document_transcript(filepath=temp_file_path)
         return transcript
+
     except Exception:
         logger.exception("Could not get transcript from link")
-        return dict()
\ No newline at end of file
+        return dict()
+
+    finally:
+        if os.path.exists(temp_file_path):
+            os.remove(temp_file_path)
diff --git a/pyproject.toml b/pyproject.toml
index d254bc5..cd739ee 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,49 +1,44 @@
-[tool.poetry]
+[project]
 name = "concall-parser"
 version = "1.0.4"
 description = "A parser for extracting analyst discussion and management commentary efficiently from concalls."
 authors = [
-    "Jay Shah <jayshah0726@gmail.com>",
-    "Pranshu Raj <pranshuraj65536@gmail.com>"
-]
-maintainers = [
-    "Pranshu Raj <pranshuraj65536@gmail.com>"
+    { name = "Jay Shah", email = "jayshah0726@gmail.com" },
+    { name = "Pranshu Raj", email = "pranshuraj65536@gmail.com" },
 ]
+maintainers = [{ name = "Pranshu Raj", email = "pranshuraj65536@gmail.com" }]
 readme = "README.md"
-packages = [
-    { include = "concall_parser", from = "." },
+requires-python = ">=3.10"
+dependencies = [
+    "aiofiles>=24.1.0",
+    "groq==0.22.0",
+    "httpx>=0.28.1",
+    "pdfplumber==0.11.5",
+    "python-dotenv==1.1.0",
 ]
 
-[tool.poetry.dependencies]
-python = "^3.10"
-groq = "0.22.0"
-pdfplumber = "0.11.5"
-python-dotenv = "1.1.0"
-requests = "2.32.2"
-
-[tool.poetry.group.dev.dependencies]
-ruff = "0.4.1"
-pre-commit = "3.7.0"
+[project.optional-dependencies]
+dev = ["ruff==0.4.1", "pre-commit==3.7.0"]
 
 [build-system]
-requires = ["poetry-core"]
-build-backend = "poetry.core.masonry.api"
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["concall_parser"]
 
 [tool.ruff]
 line-length = 80
 indent-width = 4
 target-version = "py310"
-extend-exclude = [
-    "__init__.py",
-    "migrations",
-]
+extend-exclude = ["__init__.py", "migrations"]
 
 [tool.ruff.lint]
 extend-select = [
     "UP", # pyupgrade
-    "E", # pycodestyle
-    "I", # isort
-    "D", # pydocstyle
+    "E",  # pycodestyle
+    "I",  # isort
+    "D",  # pydocstyle
 ]
 ignore = [
diff --git a/tests/perf/time_logger.py b/tests/perf/time_logger.py
new file mode 100644
index 0000000..248e4df
--- /dev/null
+++ b/tests/perf/time_logger.py
@@ -0,0 +1,157 @@
+import asyncio
+import os
+import time
+from dataclasses import dataclass
+
+from concall_parser.log_config import logger
+from concall_parser.utils import file_utils
+
+
+@dataclass
+class BenchmarkResult:
+    """Structured benchmark result."""
+
+    func_name: str
+    duration: float
+    mode: str  # 'sync' or 'async'
+    input_identifier: str  # identifier for the specific input used
+    arg_count: int
+    kwarg_count: int
+    success: bool
+    error_message: str | None = None
+    timestamp: float | None = None
+
+    def __post_init__(self):
+        """Override post init for benchmark results."""
+        if self.timestamp is None:
+            self.timestamp = time.time()
+
+
+class Benchmark:
+    """Benchmark function execution time to measure performance improvements."""
+
+    def __init__(self):
+        self.results = []
+
+    def time_sync(self, func, *args, **kwargs):
+        """Time a synchronous function call."""
+        start_time = time.perf_counter()
+        exec_result = func(*args, **kwargs)
+        end_time = time.perf_counter()
+        duration = end_time - start_time
+        result = {
+            "func": func.__name__,
+            "duration": duration,
+            "mode": "sync",
+            "arg_count": len(args),
+            "kwarg_count": len(kwargs),
+        }
+
+        logger.info(f"Executed sync func {func.__name__}", extra=result)
+        self.results.append(result)
+        return exec_result
+
+    async def time_async(self, func, *args, **kwargs):
+        """Time an asynchronous function."""
+        start_time = time.perf_counter()
+        exec_result = await func(*args, **kwargs)
+        end_time = time.perf_counter()
+        duration = end_time - start_time
+        result = {
+            "func": func.__name__,
+            "duration": duration,
+            "mode": "async",
+            "arg_count": len(args),
+            "kwarg_count": len(kwargs),
+        }
+        logger.info(f"Executed async func {func.__name__}", extra=result)
+        self.results.append(result)
+        return exec_result
+
+    def print_results(self):
+        """Prints benchmark results."""
+        print("\n" + "=" * 60)
+        print("BENCHMARK RESULTS")
+        print("=" * 60)
+
+        for result in self.results:
+            print(f"Function: {result['func']}")
+            print(f"Duration: {result['duration']:.4f} seconds")
+            print(f"Args: {result['arg_count']}, Kwargs: {result['kwarg_count']}")
+            print("-" * 40)
+
+    def get_average_time(self, function_name: str) -> float:
+        """Get average time for a specific function."""
+        times = [r["duration"] for r in self.results if r["func"] == function_name]
+        return sum(times) / len(times) if times else 0
+
+    async def run_batch_benchmark(self, func, times: int, *args, **kwargs):
+        """Run the benchmark multiple times to check average performance."""
+        if not times:
+            times = 5
+        durations = []
+        for _ in range(times):
+            start_time = time.perf_counter()
+            if asyncio.iscoroutinefunction(func):
+                result = await func(*args, **kwargs)
+            else:
+                result = func(*args, **kwargs)
+            end_time = time.perf_counter()
+            durations.append(end_time - start_time)
+        avg_duration = sum(durations) / len(durations) if durations else 0
+        logger.info(
+            f"Batch benchmark for {func.__name__}: avg {avg_duration:.4f}s over {times} runs"
+        )
+        return avg_duration
+
+
+async def run_file_utils_perf():
+    """Benchmarking for file util functions defined in concall_parser/utils/file_utils.py."""
+    benchmark = Benchmark()
+    test_documents_dir = "tests/test_documents"
+    test_links = [
+        "https://www.adanigas.com/-/media/Project/AdaniGas/Investors/Financials/Earnings-Call-Transcript-and--Recordings/AdaniTotalGas-Earnings-Q3FY25.pdf",
+        "https://www.indusind.com/content/dam/indusind-corporate/investors/QuarterFinancialResults/FY2024-2025/Quarter4/IndusInd-Bank-Analyst-Call-Q4FY25-20250521.pdf",
+        "https://www.bseindia.com/stockinfo/AnnPdfOpen.aspx?Pname=a809b7d3-ca44-4410-acf7-af064786fe5a.pdf",
+    ]
+    test_files = [
+        os.path.join(test_documents_dir, f)
+        for f in os.listdir(test_documents_dir)
+        if f.endswith(".pdf")
+    ]
+
+    transcripts = []
+
+    # Test get_document_transcript (path)
+    for file_path in test_files:
+        try:
+            transcript = await benchmark.time_async(
+                file_utils.get_document_transcript, file_path
+            )
+            transcripts.append(transcript)
+        except Exception:
+            logger.exception(f"Error in get_document_transcript for {file_path}")
+
+    # Test get_transcript_from_link
+    for link in test_links:
+        try:
+            transcript = await benchmark.time_async(file_utils.get_transcript_from_link, link)
+            transcripts.append(transcript)
+        except Exception:
+            logger.error(f"Error in get_transcript_from_link for {link}")
+
+    # TODO: benchmarking for save_output
+
+    # Test save_transcript (async)
+    for i, transcript in enumerate(transcripts):
+        try:
+            await benchmark.time_async(
+                file_utils.save_transcript, transcript, "sample.pdf", "test_perf"
+            )
+        except Exception as e:
+            logger.error(f"Error in save_transcript for transcript {i}: {e}")
+
+    benchmark.print_results()
+
+
+asyncio.run(run_file_utils_perf())
diff --git a/tests/test_breaking_changes.py b/tests/test_breaking_changes.py
index 525a7dd..4ddb1b5 100644
--- a/tests/test_breaking_changes.py
+++ b/tests/test_breaking_changes.py
@@ -1,5 +1,6 @@
 import filecmp
 
+from concall_parser.log_config import logger
 from tests.test_parsing import process_single_file
 
 
@@ -19,9 +20,9 @@ def test_single_file_processing(filepath, output_dir, expected_output_dir):
             output_dir, expected_output_dir
         ), "Output does not match expected"
     except AssertionError:
-        print("Broken -- fix it")
+        logger.exception(f"Failed to process file {filepath}")
         return
-    print("Test passed")
+    logger.info(f"Parsing successful for {filepath}")
 
 
 def test_multiple_files_processing(input_files, output_dir, expected_output_dirs):
@@ -30,7 +31,6 @@ def test_multiple_files_processing(input_files, output_dir, expected_output_dirs
         test_single_file_processing(input_file, output_dir, expected_output_dir)
 
 
-# TODO: upgrade to pytest-regressions
 if __name__ == "__main__":
     test_single_file_processing(
         filepath="test_documents/ambuja_cement.pdf",