diff --git a/tools/test_email.py b/tools/test_email.py index 5cad846..ab29442 100644 --- a/tools/test_email.py +++ b/tools/test_email.py @@ -10,6 +10,7 @@ from colorama import Fore from pathlib import Path import argparse +import shelve try: import readline # noqa: F401 @@ -19,10 +20,7 @@ import typechat from typeagent.aitools import utils -from typeagent.knowpro import ( - kplib, - searchlang, -) +from typeagent.knowpro import kplib, searchlang, search_query_schema, convknowledge from typeagent.knowpro.interfaces import IConversation from typeagent.emails.email_import import import_email_from_file, import_emails_from_dir from typeagent.emails.email_memory import EmailMemory @@ -39,22 +37,41 @@ def __init__( self, base_path: Path, db_name: str, conversation: EmailMemory ) -> None: self.base_path = base_path + self.db_name = db_name self.db_path = base_path.joinpath(db_name) self.conversation = conversation + self.query_translator: ( + typechat.TypeChatJsonTranslator[search_query_schema.SearchQuery] | None + ) = None + self.index_log = load_index_log(str(self.db_path), create_new=False) + + def get_translator(self): + if self.query_translator is None: + model = convknowledge.create_typechat_model() + self.query_translator = utils.create_translator( + model, search_query_schema.SearchQuery + ) + return self.query_translator async def load_conversation(self, db_name: str, create_new: bool = False): - await self.conversation.settings.conversation_settings.storage_provider.close() + await self.conversation.settings.storage_provider.close() + self.db_name = db_name self.db_path = self.base_path.joinpath(db_name) self.conversation = await load_or_create_email_index( str(self.db_path), create_new ) + self.index_log = load_index_log(str(self.db_path), create_new) # Delete the current conversation and re-create it async def restart_conversation(self): - await self.conversation.settings.conversation_settings.storage_provider.close() - self.conversation = await 
load_or_create_email_index( - str(self.db_path), create_new=True - ) + await self.load_conversation(self.db_name, create_new=True) + + def is_indexed(self, email_id: str | None) -> bool: + return bool(email_id and self.index_log.get(email_id)) + + def log_indexed(self, email_id: str | None) -> None: + if email_id is not None: + self.index_log[email_id] = True CommandHandler = Callable[[EmailContext, list[str]], Awaitable[None]] @@ -69,8 +86,6 @@ def decorator(func: Callable): return decorator -# Just simple test code -# TODO : Once stable, move creation etc to query.py async def main(): if sys.argv[1:2]: @@ -91,10 +106,11 @@ async def main(): print("Email Memory Demo") print("Type @help for a list of commands") - db_path = str(base_path.joinpath("pyEmails.db")) + default_db = "gmail.db" # "pyEmails.db" + db_path = str(base_path.joinpath(default_db)) context = EmailContext( base_path, - "pyEmails.db", + default_db, conversation=await load_or_create_email_index(db_path, create_new=False), ) print(f"Using email memory at: {db_path}") @@ -107,7 +123,6 @@ async def main(): "@add_messages": add_messages, # Add messages "@parse_messages": parse_messages, "@load_index": load_index, - "@build_index": build_index, # Build index "@reset_index": reset_index, # Delete index and start over "@search": search_index, # Search index "@answer": generate_answer, # Question answer @@ -162,6 +177,10 @@ def _add_messages_def() -> argparse.ArgumentParser: default="", help="Path to an .eml file or to a directory with .eml files", ) + cmd.add_argument("--ignore_error", action=argparse.BooleanOptionalAction, default=True, help="Ignore errors") + cmd.add_argument( + "--knowledge", action=argparse.BooleanOptionalAction, default=True, help="Automatically extract knowledge" + ) return cmd @@ -174,34 +193,46 @@ async def add_messages(context: EmailContext, args: list[str]): # Get the path to the email file or directory of emails to ingest src_path = Path(named_args.path) - emails: list[EmailMessage] + emails: Iterable[EmailMessage] if
src_path.is_file(): emails = [import_email_from_file(str(src_path))] else: emails = import_emails_from_dir(str(src_path)) - print(Fore.CYAN, f"Importing {len(emails)} emails".capitalize()) - print() - - conversation = context.conversation - for email in emails: - print_email(email) - print() - # knowledge = email.metadata.get_knowledge() - # print_knowledge(knowledge) + print(Fore.CYAN, f"Importing from {src_path}" + Fore.RESET) - print("Adding email...") - await conversation.add_message(email) - - await print_conversation_stats(conversation) + semantic_settings = context.conversation.settings.semantic_ref_index_settings + auto_knowledge = semantic_settings.auto_extract_knowledge + semantic_settings.auto_extract_knowledge = named_args.knowledge + try: + conversation = context.conversation + # Add one at a time for debugging etc. + for i, email in enumerate(emails): + email_id = email.metadata.id + email_src = email.src_url if email.src_url is not None else "" + print_progress(i + 1, None, email.src_url) + print() + if context.is_indexed(email_id): + print(Fore.GREEN + email_src + "[Already indexed]" + Fore.RESET) + continue + try: + await conversation.add_messages_with_indexing([email]) + context.log_indexed(email_id) + except Exception as e: + if named_args.ignore_error: + print_error(f"{email.src_url}\n{e}") + print( + Fore.GREEN + + f"ignore_error = {named_args.ignore_error}" + + Fore.RESET + ) + else: + raise + finally: + semantic_settings.auto_extract_knowledge = auto_knowledge -async def build_index(context: EmailContext, args: list[str]): - conversation = context.conversation - print(Fore.GREEN, "Building index") await print_conversation_stats(conversation) - await conversation.build_index() - print(Fore.GREEN + "Built index.") async def search_index(context: EmailContext, args: list[str]): @@ -215,8 +246,10 @@ print(Fore.CYAN, f"Searching for:\n{search_text} ") debug_context =
searchlang.LanguageSearchDebugContext() - results = await context.conversation.search_with_language( - search_text=search_text, debug_context=debug_context + results = await context.conversation.query_debug( + search_text=search_text, + query_translator=context.get_translator(), + debug_context=debug_context, ) await print_search_results(context.conversation, debug_context, results) @@ -230,13 +263,9 @@ async def generate_answer(context: EmailContext, args: list[str]): return print(Fore.CYAN, f"Getting answer for:\n{question} ") - result = await context.conversation.get_answer_with_language(question=question) - if isinstance(result, typechat.Failure): - print_error(result.message) - return - all_answers, _ = result.value - utils.pretty_print(all_answers) + answer = await context.conversation.query(question) + print(Fore.GREEN + answer) async def reset_index(context: EmailContext, args: list[str]): @@ -326,7 +355,6 @@ def help(handlers: dict[str, CommandHandler], args: list[str]): # async def load_or_create_email_index(db_path: str, create_new: bool) -> EmailMemory: if create_new: - print(f"Deleting {db_path}") delete_sqlite_db(db_path) settings = ConversationSettings() @@ -336,7 +364,16 @@ async def load_or_create_email_index(db_path: str, create_new: bool) -> EmailMem db_path, EmailMessage, ) - return await EmailMemory.create(settings) + email_memory = await EmailMemory.create(settings) + return email_memory + + +def load_index_log(db_path: str, create_new: bool) -> shelve.Shelf[Any]: + log_path = db_path + ".index_log" + index_log = shelve.open(log_path) + if create_new: + index_log.clear() + return index_log def delete_sqlite_db(db_path: str): @@ -398,29 +435,6 @@ def print_knowledge(knowledge: kplib.KnowledgeResponse): print(Fore.RESET) -def print_list( - color, list: Iterable[Any], title: str, type: Literal["plain", "ol", "ul"] = "plain" -): - print(color) - if title: - print(f"# {title}\n") - if type == "plain": - for item in list: - print(item) - elif type 
== "ul": - for item in list: - print(f"- {item}") - elif type == "ol": - for i, item in enumerate(list): - print(f"{i + 1}. {item}") - print(Fore.RESET) - - -def print_error(msg: str): - print(Fore.RED + msg) - print(Fore.RESET) - - async def print_conversation_stats(conversation: IConversation): print(f"Conversation index stats".upper()) print(f"Message count: {await conversation.messages.size()}") @@ -447,6 +461,37 @@ async def print_search_results( print(Fore.RESET) +def print_list( + color, list: Iterable[Any], title: str, type: Literal["plain", "ol", "ul"] = "plain" +): + print(color) + if title: + print(f"# {title}\n") + if type == "plain": + for item in list: + print(item) + elif type == "ul": + for item in list: + print(f"- {item}") + elif type == "ol": + for i, item in enumerate(list): + print(f"{i + 1}. {item}") + print(Fore.RESET) + + +def print_error(msg: str): + print(Fore.RED + msg + Fore.RESET) + + +def print_progress(cur: int, total: int | None = None, suffix: str | None = "") -> None: + if suffix is None: + suffix = "" + if total is not None: + print(f"[{cur} / {total}] {suffix}\r", end="", flush=True) + else: + print(f"[{cur}] {suffix}\r", end="", flush=True) + + if __name__ == "__main__": try: asyncio.run(main()) diff --git a/typeagent/emails/email_import.py b/typeagent/emails/email_import.py index 47a6f04..cdf547e 100644 --- a/typeagent/emails/email_import.py +++ b/typeagent/emails/email_import.py @@ -14,14 +14,10 @@ def import_emails_from_dir( dir_path: str, max_chunk_length: int = 4096 -) -> list[EmailMessage]: - messages: list[EmailMessage] = [] +) -> Iterable[EmailMessage]: for file_path in Path(dir_path).iterdir(): if file_path.is_file(): - messages.append( - import_email_from_file(str(file_path.resolve()), max_chunk_length) - ) - return messages + yield import_email_from_file(str(file_path.resolve()), max_chunk_length) # Imports an email file (.eml) as a list of EmailMessage objects @@ -32,7 +28,9 @@ def import_email_from_file( with 
open(file_path, "r") as f: email_string = f.read() - return import_email_string(email_string, max_chunk_length) + email = import_email_string(email_string, max_chunk_length) + email.src_url = file_path + return email # Imports a single email MIME string and returns an EmailMessage object @@ -65,6 +63,7 @@ def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage: cc=_import_address_headers(msg.get_all("Cc", [])), bcc=_import_address_headers(msg.get_all("Bcc", [])), subject=msg.get("Subject"), + id=msg.get("Message-ID", None), ) timestamp: str | None = None timestamp_date = msg.get("Date", None) diff --git a/typeagent/emails/email_memory.py b/typeagent/emails/email_memory.py index 9bfd83b..19f078f 100644 --- a/typeagent/emails/email_memory.py +++ b/typeagent/emails/email_memory.py @@ -4,6 +4,7 @@ import os from dataclasses import dataclass import json +import copy from pydantic.dataclasses import dataclass as pydantic_dataclass import typechat from ..aitools import utils @@ -25,6 +26,7 @@ ITermToSemanticRefIndex, Term, ) +from ..knowpro.conversation_base import ConversationBase from ..storage.memory import semrefindex from typeagent.storage.sqlite.provider import SqliteStorageProvider @@ -47,121 +49,10 @@ def __init__(self, conversation_settings: ConversationSettings) -> None: @dataclass -class EmailMemory(IConversation[EmailMessage, ITermToSemanticRefIndex]): - settings: EmailMemorySettings - name_tag: str - messages: IMessageCollection[EmailMessage] - semantic_refs: ISemanticRefCollection - tags: list[str] - semantic_ref_index: ITermToSemanticRefIndex - secondary_indexes: IConversationSecondaryIndexes[EmailMessage] | None - - noise_terms: set[str] - - @classmethod - async def create( - cls, - settings: ConversationSettings, - name_tag: str | None = None, - messages: IMessageCollection[EmailMessage] | None = None, - semantic_refs: ISemanticRefCollection | None = None, - semantic_ref_index: ITermToSemanticRefIndex | None = None, - tags: list[str] 
| None = None, - secondary_indexes: IConversationSecondaryIndexes[EmailMessage] | None = None, - ) -> "EmailMemory": - - noise_terms = set() - storage_provider = await settings.get_storage_provider() - email_memory = cls( - EmailMemorySettings(settings), - name_tag or "", - messages or await storage_provider.get_message_collection(), - semantic_refs or await storage_provider.get_semantic_ref_collection(), - tags if tags is not None else [], - semantic_ref_index or await storage_provider.get_semantic_ref_index(), - secondary_indexes - or await secindex.ConversationSecondaryIndexes.create( - storage_provider, settings.related_term_index_settings - ), - noise_terms, - ) - - # Add aliases for all the ways in which people can say 'send' and 'received' - await _add_synonyms_file_as_aliases(email_memory, "emailVerbs.json") - # Remove common terms used in email search that can make retrieval noisy - _add_noise_words_from_file(email_memory.noise_terms, "noiseTerms.txt") - email_memory.noise_terms.add("email") - email_memory.noise_terms.add("message") - - return email_memory - - # Add an email message to the memory. 
- async def add_message(self, message: EmailMessage) -> None: - await self.messages.append(message) - self._commit() - - # Build an index using ALL messages in the memory - async def build_index( - self, - ) -> None: - await semrefindex.add_metadata_to_index( - self.messages, - self.semantic_refs, - self.semantic_ref_index, - ) - assert ( - self.settings is not None - ), "Settings must be initialized before building index" - - self._commit() - await semrefindex.build_semantic_ref(self, self.settings.conversation_settings) - self._commit() - await secindex.build_transient_secondary_indexes( - self, self.settings.conversation_settings - ) - self._commit() - - # Search email memory using language - async def search_with_language( - self, - search_text: str, - options: searchlang.LanguageSearchOptions | None = None, - lang_search_filter: searchlang.LanguageSearchFilter | None = None, - debug_context: searchlang.LanguageSearchDebugContext | None = None, - ) -> typechat.Result[list[searchlang.ConversationSearchResult]]: - return await searchlang.search_conversation_with_language( - self, - self.settings.query_translator, - search_text, - self._adjust_search_options(options), - lang_search_filter, - debug_context, - ) - - async def get_answer_with_language( - self, - question: str, - search_options: searchlang.LanguageSearchOptions | None = None, - lang_search_filter: searchlang.LanguageSearchFilter | None = None, - answer_context_options: answers.AnswerContextOptions | None = None, - ) -> typechat.Result[tuple[list[answers.AnswerResponse], answers.AnswerResponse]]: - search_results = await self.search_with_language( - question, search_options, lang_search_filter, None - ) - if isinstance(search_results, typechat.Failure): - return search_results - - if answer_context_options is None: - answer_context_options = EmailMemory.create_answer_context_options() - - answer = await answers.generate_answers( - self.settings.answer_translator, - search_results.value, - self, - 
question, - answer_context_options, - ) - return typechat.Success(answer) +class EmailMemory(ConversationBase[EmailMessage]): + def __init__(self, settings, name, tags): + super().__init__(settings, name, tags) + self.noise_terms: set[str] = set() @staticmethod def create_lang_search_options() -> searchlang.LanguageSearchOptions: @@ -184,24 +75,71 @@ def create_answer_context_options() -> answers.AnswerContextOptions: entities_top_k=50, topics_top_k=50, messages_top_k=None, chunking=None ) - def _get_secondary_indexes(self) -> IConversationSecondaryIndexes[EmailMessage]: - """Get secondary indexes, asserting they are initialized.""" - assert ( - self.secondary_indexes is not None - ), "Use await f.create() to create an initialized instance" - return self.secondary_indexes + @classmethod + async def create( + cls, + settings: ConversationSettings, + name: str | None = None, + tags: list[str] | None = None, + ) -> "EmailMemory": + instance = await super().create(settings, name, tags) + await instance._configure_memory() + return instance + + async def query( + self, + question: str, + search_options: searchlang.LanguageSearchOptions | None = None, + answer_options: answers.AnswerContextOptions | None = None, + ) -> str: + return await super().query( + question, + self._adjust_search_options(search_options), + ( + answer_options + if answer_options is not None + else EmailMemory.create_answer_context_options() + ), + ) + + # Search email memory using language + async def query_debug( + self, + search_text: str, + query_translator: typechat.TypeChatJsonTranslator[ + search_query_schema.SearchQuery + ], + debug_context: searchlang.LanguageSearchDebugContext | None = None, + ) -> typechat.Result[list[searchlang.ConversationSearchResult]]: + return await searchlang.search_conversation_with_language( + self, + query_translator, + search_text, + self._adjust_search_options(None), + None, + debug_context, + ) - def _commit(self): - provider = 
self.settings.conversation_settings.storage_provider - if isinstance(provider, SqliteStorageProvider): - provider.db.commit() + async def _configure_memory(self): + # Adjust settings to support knowledge extraction from message ext + self.settings.semantic_ref_index_settings.auto_extract_knowledge = True + # Add aliases for all the ways in which people can say 'send' and 'received' + await _add_synonyms_file_as_aliases(self, "emailVerbs.json", clean=True) + # Remove common terms used in email search that can make retrieval noisy + _add_noise_words_from_file(self.noise_terms, "noiseTerms.txt") - def _adjust_search_options(self, options: searchlang.LanguageSearchOptions | None): + def _adjust_search_options( + self, options: searchlang.LanguageSearchOptions | None + ) -> searchlang.LanguageSearchOptions: + # TODO: should actually clone the object the caller passed if options is None: options = EmailMemory.create_lang_search_options() if options.compile_options is None: options.compile_options = EmailMemory.create_lang_search_compile_options() + else: + # Copy for modification + options.compile_options = copy.copy(options.compile_options) options.compile_options.term_filter = lambda term: self._is_searchable_term( term @@ -220,7 +158,7 @@ def _is_searchable_term(self, term: str) -> bool: # Load synonyms from a file and add them as aliases async def _add_synonyms_file_as_aliases( - conversation: IConversation, file_name: str + conversation: ConversationBase, file_name: str, clean: bool ) -> None: secondary_indexes = conversation.secondary_indexes assert secondary_indexes is not None @@ -234,13 +172,17 @@ async def _add_synonyms_file_as_aliases( with open(synonym_file) as f: data: list[dict] = json.load(f) if data: - for obj in data: - text = obj.get("term") - synonyms = obj.get("relatedTerms") - if text and synonyms: - related_term = Term(text=text.lower()) - for synonym in synonyms: - await aliases.add_related_term(synonym.lower(), related_term) + storage_provider = 
conversation.settings.storage_provider + async with storage_provider: + if clean: + await aliases.clear() + for obj in data: + text = obj.get("term") + synonyms = obj.get("relatedTerms") + if text and synonyms: + related_term = Term(text=text.lower()) + for synonym in synonyms: + await aliases.add_related_term(synonym.lower(), related_term) def _add_noise_words_from_file( diff --git a/typeagent/emails/email_message.py b/typeagent/emails/email_message.py index f4cc0d7..893115f 100644 --- a/typeagent/emails/email_message.py +++ b/typeagent/emails/email_message.py @@ -28,6 +28,7 @@ class EmailMessageMeta(IKnowledgeSource, IMessageMetadata): cc: list[str] = Field(default_factory=list) bcc: list[str] = Field(default_factory=list) subject: str | None = None + id: str | None = None @property def source(self) -> str | None: # type: ignore[reportIncompatibleVariableOverride] @@ -155,6 +156,7 @@ def __init__(self, **data: Any) -> None: "Tags associated with the message", default_factory=list ) timestamp: str | None = None # Use metadata.sent_on for the actual sent time + src_url: str | None = None # Source file or uri for this email def get_knowledge(self) -> kplib.KnowledgeResponse: return self.metadata.get_knowledge() diff --git a/typeagent/emails/noiseTerms.txt b/typeagent/emails/noiseTerms.txt index 1128e32..2f42bad 100644 --- a/typeagent/emails/noiseTerms.txt +++ b/typeagent/emails/noiseTerms.txt @@ -49,4 +49,7 @@ each every any some -none \ No newline at end of file +none + +email +message diff --git a/typeagent/knowpro/conversation_base.py b/typeagent/knowpro/conversation_base.py index ee7ca75..b09a500 100644 --- a/typeagent/knowpro/conversation_base.py +++ b/typeagent/knowpro/conversation_base.py @@ -304,7 +304,13 @@ async def _update_message_index_incremental( # The message index add_messages handles the ordinal tracking internally await self.secondary_indexes.message_index.add_messages(new_messages) - async def query(self, question: str) -> str: + # Use options to 
customize number of messages to match, topK etc. + async def query( + self, + question: str, + search_options: searchlang.LanguageSearchOptions | None = None, + answer_options: answers.AnswerContextOptions | None = None, + ) -> str: """ Run an end-to-end query on the conversation. @@ -335,13 +341,17 @@ async def query(self, question: str) -> str: ) # Stage 1-3: Search the conversation with the natural language query - search_options = searchlang.LanguageSearchOptions( - compile_options=searchlang.LanguageQueryCompileOptions( - exact_scope=False, verb_scope=True, term_filter=None, apply_scope=True - ), - exact_match=False, - max_message_matches=25, - ) + if search_options is None: + search_options = searchlang.LanguageSearchOptions( + compile_options=searchlang.LanguageQueryCompileOptions( + exact_scope=False, + verb_scope=True, + term_filter=None, + apply_scope=True, + ), + exact_match=False, + max_message_matches=25, + ) result = await searchlang.search_conversation_with_language( self, @@ -356,9 +366,10 @@ async def query(self, question: str) -> str: search_results = result.value # Stage 4: Generate answer from search results - answer_options = answers.AnswerContextOptions( - entities_top_k=50, topics_top_k=50, messages_top_k=None, chunking=None - ) + if answer_options is None: + answer_options = answers.AnswerContextOptions( + entities_top_k=50, topics_top_k=50, messages_top_k=None, chunking=None + ) all_answers, combined_answer = await answers.generate_answers( self._answer_translator,