177 changes: 111 additions & 66 deletions tools/test_email.py
@@ -10,6 +10,7 @@
from colorama import Fore
from pathlib import Path
import argparse
import shelve

try:
import readline # noqa: F401
@@ -19,10 +20,7 @@
import typechat

from typeagent.aitools import utils
from typeagent.knowpro import (
kplib,
searchlang,
)
from typeagent.knowpro import kplib, searchlang, search_query_schema, convknowledge
from typeagent.knowpro.interfaces import IConversation
from typeagent.emails.email_import import import_email_from_file, import_emails_from_dir
from typeagent.emails.email_memory import EmailMemory
@@ -39,22 +37,41 @@ def __init__(
self, base_path: Path, db_name: str, conversation: EmailMemory
) -> None:
self.base_path = base_path
self.db_name = db_name
self.db_path = base_path.joinpath(db_name)
self.conversation = conversation
self.query_translator: (
typechat.TypeChatJsonTranslator[search_query_schema.SearchQuery] | None
) = None
self.index_log = load_index_log(str(self.db_path), create_new=False)

def get_translator(self):
if self.query_translator is None:
model = convknowledge.create_typechat_model()
self.query_translator = utils.create_translator(
model, search_query_schema.SearchQuery
)
return self.query_translator

async def load_conversation(self, db_name: str, create_new: bool = False):
await self.conversation.settings.conversation_settings.storage_provider.close()
await self.conversation.settings.storage_provider.close()
self.db_name = db_name
self.db_path = self.base_path.joinpath(db_name)
self.conversation = await load_or_create_email_index(
str(self.db_path), create_new
)
self.index_log = load_index_log(str(self.db_path), create_new)
Collaborator comment: I take it this holds a persistent, mutable mapping of email IDs to booleans? (Or rather a set of email IDs that have been indexed.)


# Delete the current conversation and re-create it
async def restart_conversation(self):
await self.conversation.settings.conversation_settings.storage_provider.close()
self.conversation = await load_or_create_email_index(
str(self.db_path), create_new=True
)
await self.load_conversation(self.db_name, create_new=True)
Collaborator comment: So restart is now a synonym for load?


def is_indexed(self, email_id: str | None) -> bool:
return bool(email_id and self.index_log.get(email_id))

def log_indexed(self, email_id: str | None) -> None:
if email_id is not None:
self.index_log[email_id] = True


CommandHandler = Callable[[EmailContext, list[str]], Awaitable[None]]
@@ -69,8 +86,6 @@ def decorator(func: Callable):
return decorator
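
The CommandHandler alias above, together with the handler table built in main() further down, suggests a simple string-to-coroutine dispatch. Below is a minimal, self-contained sketch of that pattern; EmailContext is stubbed and the dispatch loop is an assumption for illustration, not code from this PR.

# Self-contained sketch of "@command" dispatch: a dict maps command strings to
# async handlers of type CommandHandler, and user input picks which to await.
# EmailContext is a stub here; the real class carries the conversation state.
import asyncio
from typing import Awaitable, Callable


class EmailContext:
    pass


CommandHandler = Callable[[EmailContext, list[str]], Awaitable[None]]


async def search_index(context: EmailContext, args: list[str]) -> None:
    print("searching for:", " ".join(args))


handlers: dict[str, CommandHandler] = {"@search": search_index}


async def dispatch(context: EmailContext, line: str) -> None:
    cmd, *args = line.split()
    handler = handlers.get(cmd)
    if handler is None:
        print(f"Unknown command: {cmd}")
        return
    await handler(context, args)


asyncio.run(dispatch(EmailContext(), "@search quarterly report"))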


# Just simple test code
# TODO : Once stable, move creation etc to query.py
async def main():

if sys.argv[1:2]:
@@ -91,10 +106,11 @@ async def main():
print("Email Memory Demo")
print("Type @help for a list of commands")

db_path = str(base_path.joinpath("pyEmails.db"))
default_db = "gmail.db" # "pyEmails.db"
Collaborator comment: Why not email.db. :-)

db_path = str(base_path.joinpath(default_db))
context = EmailContext(
base_path,
"pyEmails.db",
default_db,
conversation=await load_or_create_email_index(db_path, create_new=False),
)
print(f"Using email memory at: {db_path}")
Expand All @@ -107,7 +123,6 @@ async def main():
"@add_messages": add_messages, # Add messages
"@parse_messages": parse_messages,
"@load_index": load_index,
"@build_index": build_index, # Build index
"@reset_index": reset_index, # Delete index and start over
"@search": search_index, # Search index
"@answer": generate_answer, # Question answer
@@ -162,6 +177,10 @@ def _add_messages_def() -> argparse.ArgumentParser:
default="",
help="Path to an .eml file or to a directory with .eml files",
)
cmd.add_argument("--ignore_error", type=bool, default=True, help="Ignore errors")
cmd.add_argument(
"--knowledge", type=bool, default=True, help="Automatically extract knowledge"
)
return cmd


@@ -174,34 +193,46 @@ async def add_messages(context: EmailContext, args: list[str]):

# Get the path to the email file or directory of emails to ingest
src_path = Path(named_args.path)
emails: list[EmailMessage]
emails: Iterable[EmailMessage]
if src_path.is_file():
emails = [import_email_from_file(str(src_path))]
else:
emails = import_emails_from_dir(str(src_path))

print(Fore.CYAN, f"Importing {len(emails)} emails".capitalize())
print()

conversation = context.conversation
for email in emails:
print_email(email)
print()
# knowledge = email.metadata.get_knowledge()
# print_knowledge(knowledge)
print(Fore.CYAN, f"Importing from {src_path}" + Fore.RESET)

print("Adding email...")
await conversation.add_message(email)

await print_conversation_stats(conversation)
semantic_settings = context.conversation.settings.semantic_ref_index_settings
auto_knowledge = semantic_settings.auto_extract_knowledge
print(Fore.CYAN, f"auto_extract_knowledge={auto_knowledge}" + Fore.RESET)
try:
conversation = context.conversation
# Add one at a time for debugging etc.
for i, email in enumerate(emails):
email_id = email.metadata.id
email_src = email.src_url if email.src_url is not None else ""
print_progress(i + 1, None, email.src_url)
print()
if context.is_indexed(email_id):
print(Fore.GREEN + email_src + "[Already indexed]" + Fore.RESET)
continue

try:
await conversation.add_messages_with_indexing([email])
context.log_indexed(email_id)
Collaborator comment (on lines +220 to +221): Yeah, so these two aren't transactional (because the second doesn't use SQLite3) and in theory the first could succeed and the second could fail. Not sure what to do about it other than adding an extra column to the messages table for the email_ID. :-(

except Exception as e:
if named_args.ignore_error:
print_error(f"{email.src_url}\n{e}")
print(
Fore.GREEN
+ f"ignore_error = {named_args.ignore_error}"
+ Fore.RESET
)
else:
raise
finally:
semantic_settings.auto_extract_knowledge = auto_knowledge
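
The review comment above points out that add_messages_with_indexing (SQLite) and log_indexed (shelve) are not atomic: the first write can succeed and the second fail. Here is a minimal sketch of the mitigation it hints at, keeping the indexed-ID log in the same SQLite database so both writes share one transaction. The table and column names are invented for illustration and are not typeagent's schema.

# Hypothetical sketch: record the message and its email ID in one transaction.
import sqlite3


def add_message_and_log(conn: sqlite3.Connection, email_id: str, body: str) -> None:
    with conn:  # commits if both INSERTs succeed, rolls back if either raises
        conn.execute("INSERT INTO messages (email_id, body) VALUES (?, ?)", (email_id, body))
        conn.execute("INSERT OR IGNORE INTO indexed_emails (email_id) VALUES (?)", (email_id,))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (email_id TEXT, body TEXT)")
conn.execute("CREATE TABLE indexed_emails (email_id TEXT PRIMARY KEY)")
add_message_and_log(conn, "<msg-1@example.com>", "hello")
print(conn.execute("SELECT email_id FROM indexed_emails").fetchall())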

async def build_index(context: EmailContext, args: list[str]):
conversation = context.conversation
print(Fore.GREEN, "Building index")
await print_conversation_stats(conversation)
await conversation.build_index()
print(Fore.GREEN + "Built index.")


async def search_index(context: EmailContext, args: list[str]):
@@ -215,8 +246,10 @@ async def search_index(context: EmailContext, args: list[str]):
print(Fore.CYAN, f"Searching for:\n{search_text} ")

debug_context = searchlang.LanguageSearchDebugContext()
results = await context.conversation.search_with_language(
search_text=search_text, debug_context=debug_context
results = await context.conversation.query_debug(
search_text=search_text,
query_translator=context.get_translator(),
debug_context=debug_context,
)
await print_search_results(context.conversation, debug_context, results)

@@ -230,13 +263,9 @@ async def generate_answer(context: EmailContext, args: list[str]):
return

print(Fore.CYAN, f"Getting answer for:\n{question} ")
result = await context.conversation.get_answer_with_language(question=question)
if isinstance(result, typechat.Failure):
print_error(result.message)
return

all_answers, _ = result.value
utils.pretty_print(all_answers)
answer = await context.conversation.query(question)
print(Fore.GREEN + answer)


async def reset_index(context: EmailContext, args: list[str]):
@@ -326,7 +355,6 @@ def help(handlers: dict[str, CommandHandler], args: list[str]):
#
async def load_or_create_email_index(db_path: str, create_new: bool) -> EmailMemory:
if create_new:
print(f"Deleting {db_path}")
delete_sqlite_db(db_path)

settings = ConversationSettings()
@@ -336,7 +364,16 @@ async def load_or_create_email_index(db_path: str, create_new: bool) -> EmailMem
db_path,
EmailMessage,
)
return await EmailMemory.create(settings)
email_memory = await EmailMemory.create(settings)
return email_memory


def load_index_log(db_path: str, create_new: bool) -> shelve.Shelf[Any]:
log_path = db_path + ".index_log"
index_log = shelve.open(log_path)
if create_new:
index_log.clear()
return index_log
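
As the earlier review comment surmises, the shelve-backed log behaves as a persistent, mutable mapping from email IDs to an "already indexed" flag. A standalone illustration of that behavior follows; the file name is arbitrary.

# The shelf persists across runs, so is_indexed()-style lookups can skip
# emails that were ingested in an earlier session.
import shelve

with shelve.open("/tmp/demo.index_log") as log:
    log["<msg-1@example.com>"] = True

with shelve.open("/tmp/demo.index_log") as log:  # a later run
    print(log.get("<msg-1@example.com>", False))  # True
    print(log.get("<msg-2@example.com>", False))  # False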


def delete_sqlite_db(db_path: str):
@@ -398,29 +435,6 @@ def print_knowledge(knowledge: kplib.KnowledgeResponse):
print(Fore.RESET)


def print_list(
color, list: Iterable[Any], title: str, type: Literal["plain", "ol", "ul"] = "plain"
):
print(color)
if title:
print(f"# {title}\n")
if type == "plain":
for item in list:
print(item)
elif type == "ul":
for item in list:
print(f"- {item}")
elif type == "ol":
for i, item in enumerate(list):
print(f"{i + 1}. {item}")
print(Fore.RESET)


def print_error(msg: str):
print(Fore.RED + msg)
print(Fore.RESET)


async def print_conversation_stats(conversation: IConversation):
print(f"Conversation index stats".upper())
print(f"Message count: {await conversation.messages.size()}")
@@ -447,6 +461,37 @@ async def print_search_results(
print(Fore.RESET)


def print_list(
color, list: Iterable[Any], title: str, type: Literal["plain", "ol", "ul"] = "plain"
):
print(color)
if title:
print(f"# {title}\n")
if type == "plain":
for item in list:
print(item)
elif type == "ul":
for item in list:
print(f"- {item}")
elif type == "ol":
for i, item in enumerate(list):
print(f"{i + 1}. {item}")
print(Fore.RESET)


def print_error(msg: str):
print(Fore.RED + msg + Fore.RESET)


def print_progress(cur: int, total: int | None = None, suffix: str | None = "") -> None:
if suffix is None:
suffix = ""
if total is not None:
print(f"[{cur} / {total}] {suffix}\r", end="", flush=True)
else:
print(f"[{cur}] {suffix}\r", end="", flush=True)
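
A quick usage note on the helper above: the trailing \r with end="" keeps the counter rewriting a single console line, so a final plain print() is needed to move past it. A short example using the print_progress function defined above:

import time

for i in range(1, 6):
    print_progress(i, 5, f"email_{i}.eml")  # overwrites the same line
    time.sleep(0.1)                          # stand-in for real indexing work
print()                                      # drop to a fresh line when done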


if __name__ == "__main__":
try:
asyncio.run(main())
13 changes: 6 additions & 7 deletions typeagent/emails/email_import.py
@@ -14,14 +14,10 @@

def import_emails_from_dir(
Collaborator comment: Eventually this file and these functions need to switch to 'ingest'

dir_path: str, max_chunk_length: int = 4096
) -> list[EmailMessage]:
messages: list[EmailMessage] = []
) -> Iterable[EmailMessage]:
for file_path in Path(dir_path).iterdir():
if file_path.is_file():
messages.append(
import_email_from_file(str(file_path.resolve()), max_chunk_length)
)
return messages
yield import_email_from_file(str(file_path.resolve()), max_chunk_length)
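
Because import_emails_from_dir now yields messages lazily instead of returning a list, callers can no longer take len() of the result up front; they count as they iterate (which is what the progress counter in test_email.py does). A small sketch of that consumption pattern, with a dummy loader standing in for the real importer:

# Iterate a lazy importer and count as you go; a generator has no length.
from pathlib import Path
from typing import Iterator


def fake_import_emails_from_dir(dir_path: str) -> Iterator[str]:
    for file_path in Path(dir_path).iterdir():
        if file_path.is_file():
            yield file_path.name  # stand-in for an EmailMessage


count = 0
for count, name in enumerate(fake_import_emails_from_dir("."), start=1):
    print(f"[{count}] {name}")
print(f"Imported {count} emails")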


# Imports an email file (.eml) as a list of EmailMessage objects
@@ -32,7 +28,9 @@ def import_email_from_file(
with open(file_path, "r") as f:
email_string = f.read()

return import_email_string(email_string, max_chunk_length)
email = import_email_string(email_string, max_chunk_length)
email.src_url = file_path
return email


# Imports a single email MIME string and returns an EmailMessage object
@@ -65,6 +63,7 @@ def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage:
cc=_import_address_headers(msg.get_all("Cc", [])),
bcc=_import_address_headers(msg.get_all("Bcc", [])),
subject=msg.get("Subject"),
id=msg.get("Message-ID", None),
)
timestamp: str | None = None
timestamp_date = msg.get("Date", None)