From 3c7a512007c70d82c4583769919cca727c955093 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sat, 15 Nov 2025 22:19:54 +0000 Subject: [PATCH 1/5] feat: Implement inventory and listing automation tool Co-authored-by: davidlozzi1 --- .env.example | 10 +++ README.md | 56 ++++++++++++++- __init__.py | 0 app.py | 97 ++++++++++++++++++++++++++ core/__init__.py | 0 core/csv_io.py | 88 +++++++++++++++++++++++ core/env.py | 60 ++++++++++++++++ core/logging.py | 16 +++++ core/models.py | 161 +++++++++++++++++++++++++++++++++++++++++++ gpt/__init__.py | 0 gpt/analyze.py | 123 +++++++++++++++++++++++++++++++++ gpt/organize.py | 99 ++++++++++++++++++++++++++ mcp/__init__.py | 0 mcp/client.py | 91 ++++++++++++++++++++++++ mcp/images.py | 8 +++ mcp/listings.py | 108 +++++++++++++++++++++++++++++ pyproject.toml | 20 ++++++ utils/__init__.py | 0 utils/concurrency.py | 41 +++++++++++ utils/files.py | 51 ++++++++++++++ 20 files changed, 1027 insertions(+), 2 deletions(-) create mode 100644 .env.example create mode 100644 __init__.py create mode 100644 app.py create mode 100644 core/__init__.py create mode 100644 core/csv_io.py create mode 100644 core/env.py create mode 100644 core/logging.py create mode 100644 core/models.py create mode 100644 gpt/__init__.py create mode 100644 gpt/analyze.py create mode 100644 gpt/organize.py create mode 100644 mcp/__init__.py create mode 100644 mcp/client.py create mode 100644 mcp/images.py create mode 100644 mcp/listings.py create mode 100644 pyproject.toml create mode 100644 utils/__init__.py create mode 100644 utils/concurrency.py create mode 100644 utils/files.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..fd7fe69 --- /dev/null +++ b/.env.example @@ -0,0 +1,10 @@ + # OpenAI configuration + OPENAI_API_KEY= + OPENAI_API_BASE= + + # MCP server configuration + MCP_ENDPOINT= + MCP_API_KEY= + + # Optional overrides + LOG_LEVEL=INFO diff --git a/README.md b/README.md index 27adbbf..20f5fe3 100644 --- a/README.md +++ 
b/README.md @@ -1,2 +1,54 @@ -# ebay- -AI driven ebay listing creator +# Inventory & Listing Automation Tool + +AI-assisted workflow for organizing raw photo drops, generating inventory CSVs, and creating/publishing eBay listings through the MCP server. + +## Prerequisites + +- Python 3.11+ +- Access to the eBay MCP server (`MCP_ENDPOINT`, `MCP_API_KEY`) +- OpenAI API key with GPT-5 access + +## Setup + +```bash +git clone +cd +python -m venv .venv +source .venv/bin/activate +pip install -e . +cp .env.example .env # fill in secrets +``` + +## CLI Overview + +``` +python app.py --organize /path/to/raw_images +python app.py /path/to/organized_items +python app.py --upload /path/to/listings.csv [--draft|--publish] +``` + +- `--organize`: groups loose photos into SKU folders (max 50 images per GPT batch) and renames label shots to `SKU.jpg`. +- `inventory` (default): scans organized folders, calls GPT for metadata, queries MCP for sold comps, and writes `listings_YYYYMMDD_HHMMSS.csv` in the provided root. Prints absolute CSV path on success. +- `--upload`: reads a CSV, uploads images via MCP, and creates/publishes listings. + - Omit flags to create + publish in one pass. + - `--draft`: create drafts only. + - `--publish`: publish previously-created drafts (`mcp_listing_id` required per row). + +## Environment Variables + +See `.env.example`. Required per mode: + +| Mode | Variables | +|-----------|------------------------------------------| +| Organize | `OPENAI_API_KEY` | +| Inventory | `OPENAI_API_KEY`, `MCP_ENDPOINT`, `MCP_API_KEY` | +| Publish | `MCP_ENDPOINT`, `MCP_API_KEY` | + +Optional: `OPENAI_API_BASE`, `LOG_LEVEL`, `CONCURRENCY_LIMIT`. + +## Architecture Highlights + +- Async `httpx` client with retry/backoff for MCP interactions. +- Pydantic models for CSV rows, price data, and listing payloads. +- Structured concurrency helpers for bounded parallelism. +- GPT-driven vision prompts for organization and metadata extraction (OpenAI Responses API). 
\ No newline at end of file diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app.py b/app.py new file mode 100644 index 0000000..fac3196 --- /dev/null +++ b/app.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path +from typing import Optional + +import typer + +from core.csv_io import write_inventory_csv +from core.env import ConfigError, get_config_for_mode +from core.logging import setup_logging +from gpt.analyze import analyze_inventory +from gpt.organize import organize_images +from mcp.listings import process_uploads +from utils.files import move_assignments + +app = typer.Typer(add_completion=False, help="Inventory and listing automation tool.") + + +def _run_async(coro): + return asyncio.run(coro) + + +@app.command() +def main( + items_path: Optional[Path] = typer.Argument( + None, + help="Path to organized item folders (inventory mode).", + ), + organize: Optional[Path] = typer.Option( + None, + "--organize", + exists=True, + dir_okay=True, + file_okay=False, + help="Path to raw images root.", + ), + upload: Optional[Path] = typer.Option( + None, + "--upload", + exists=True, + file_okay=True, + dir_okay=False, + help="Path to inventory CSV for publish mode.", + ), + draft: bool = typer.Option(False, "--draft", help="Create draft listings only."), + publish: bool = typer.Option(False, "--publish", help="Publish existing drafts."), +) -> None: + try: + if organize: + _run_organize_mode(organize) + elif upload: + _run_publish_mode(upload, draft, publish) + elif items_path: + _run_inventory_mode(items_path) + else: + raise typer.BadParameter("Provide --organize, --upload, or an items path.") + except ConfigError as exc: + typer.echo(str(exc), err=True) + raise typer.Exit(code=1) from exc + + +def _run_organize_mode(path: Path) -> None: + if not path.exists(): + raise typer.BadParameter(f"{path} not found.") + config = get_config_for_mode("organize") + 
setup_logging(config.log_level) + assignments = _run_async(organize_images(path, config)) + move_assignments(assignments, path) + typer.echo(f"Organized {len(assignments)} images under {path}") + + +def _run_inventory_mode(path: Path) -> None: + if not path.exists(): + raise typer.BadParameter(f"{path} not found.") + config = get_config_for_mode("inventory") + setup_logging(config.log_level) + rows = _run_async(analyze_inventory(path, config)) + csv_path = write_inventory_csv(rows, path) + typer.echo(f"Created CSV at {csv_path.resolve()}") + + +def _run_publish_mode(csv_path: Path, draft: bool, publish: bool) -> None: + if draft and publish: + raise typer.BadParameter("Use either --draft or --publish, not both.") + config = get_config_for_mode("publish") + setup_logging(config.log_level) + updated_rows = _run_async(process_uploads(csv_path, draft_only=draft, publish_only=publish, config=config)) + typer.echo(f"Updated {csv_path} for {len(updated_rows)} rows") + + +def main_cli() -> None: + app() + + +if __name__ == "__main__": + main_cli() diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/csv_io.py b/core/csv_io.py new file mode 100644 index 0000000..250c02d --- /dev/null +++ b/core/csv_io.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +import csv +from datetime import datetime +from pathlib import Path +from typing import Sequence + +from core.models import CSV_HEADERS, InventoryRow + + +def _parse_float_list(value: str) -> list[float]: + if not value: + return [] + return [float(part) for part in value.split(";") if part] + + +def _parse_str_list(value: str) -> list[str]: + if not value: + return [] + return [part for part in value.split(";") if part] + + +def read_inventory_csv(csv_path: Path) -> list[InventoryRow]: + with csv_path.open("r", newline="", encoding="utf-8") as handle: + reader = csv.DictReader(handle) + rows: list[InventoryRow] = [] + for row in reader: + rows.append( + 
InventoryRow( + item_number=row.get("item_number", ""), + sku=row.get("sku", ""), + folder=row.get("folder", ""), + title=row.get("title", ""), + description=row.get("description", ""), + condition=row.get("condition", "USED"), + condition_note=row.get("condition_note", "") or "", + recommended_price=float(row.get("recommended_price") or 0), + price_rationale=row.get("price_rationale", "") or "", + suggested_category_id=row.get("suggested_category_id", "") or "", + suggested_category_path=row.get("suggested_category_path", "") or "", + ebay_search_query=row.get("ebay_search_query", "") or "", + last_sold_prices=_parse_float_list(row.get("last_sold_prices", "")), + price_source=row.get("price_source", "") or "", + highest_sold_price=( + float(row["highest_sold_price"]) + if row.get("highest_sold_price") + else None + ), + image_urls=_parse_str_list(row.get("image_urls", "")), + notes_path=row.get("notes_path", "") or "", + listing_format=row.get("listing_format", "") or "", + marketplace_id=row.get("marketplace_id", "") or "", + category_id=row.get("category_id", "") or "", + quantity=int(row.get("quantity") or 1), + listing_duration=row.get("listing_duration", "") or "", + best_offer_enabled=( + row.get("best_offer_enabled", "true").lower() == "true" + ), + mcp_listing_id=row.get("mcp_listing_id", "") or "", + ) + ) + return rows + + +def _timestamped_filename(root: Path) -> Path: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + return root / f"listings_{timestamp}.csv" + + +def write_inventory_csv(rows: Sequence[InventoryRow], root_path: Path) -> Path: + csv_path = _timestamped_filename(root_path) + root_path.mkdir(parents=True, exist_ok=True) + + with csv_path.open("w", newline="", encoding="utf-8") as handle: + writer = csv.DictWriter(handle, fieldnames=CSV_HEADERS) + writer.writeheader() + for row in rows: + writer.writerow(row.csv_row()) + + return csv_path + + +def overwrite_inventory_csv(csv_path: Path, rows: Sequence[InventoryRow]) -> None: + with 
csv_path.open("w", newline="", encoding="utf-8") as handle: + writer = csv.DictWriter(handle, fieldnames=CSV_HEADERS) + writer.writeheader() + for row in rows: + writer.writerow(row.csv_row()) \ No newline at end of file diff --git a/core/env.py b/core/env.py new file mode 100644 index 0000000..e1fc047 --- /dev/null +++ b/core/env.py @@ -0,0 +1,60 @@ + from __future__ import annotations + +from functools import lru_cache +from pathlib import Path +from typing import Iterable, Sequence + +from dotenv import load_dotenv +from pydantic import BaseModel, Field, HttpUrl, PositiveInt, ValidationError + + +class ConfigError(RuntimeError): + """Raised when required configuration is missing.""" + + +class AppConfig(BaseModel): + openai_api_key: str | None = Field(default=None, alias="OPENAI_API_KEY") + openai_api_base: HttpUrl | None = Field(default=None, alias="OPENAI_API_BASE") + mcp_endpoint: HttpUrl | None = Field(default=None, alias="MCP_ENDPOINT") + mcp_api_key: str | None = Field(default=None, alias="MCP_API_KEY") + log_level: str = Field(default="INFO", alias="LOG_LEVEL") + concurrency_limit: PositiveInt = Field(default=5, alias="CONCURRENCY_LIMIT") + + model_config = {"populate_by_name": True} + + def require(self, fields: Sequence[str]) -> None: + missing = [field for field in fields if not getattr(self, field)] + if missing: + raise ConfigError( + f"Missing required environment variables: {', '.join(missing)}" + ) + + +def _load_env_files() -> None: + env_path = Path(".env") + load_dotenv(dotenv_path=env_path if env_path.exists() else None, override=False) + + +REQUIRED_BY_MODE = { + "organize": ("openai_api_key",), + "inventory": ("openai_api_key", "mcp_endpoint", "mcp_api_key"), + "publish": ("mcp_endpoint", "mcp_api_key"), +} + + +@lru_cache(maxsize=1) +def get_config(required: Iterable[str] | None = None) -> AppConfig: + _load_env_files() + try: + config = AppConfig() # type: ignore[call-arg] + except ValidationError as exc: + raise ConfigError(str(exc)) from 
exc + + if required: + config.require(tuple(required)) + return config + + +def get_config_for_mode(mode: str) -> AppConfig: + requirements = REQUIRED_BY_MODE.get(mode, ()) + return get_config(requirements) \ No newline at end of file diff --git a/core/logging.py b/core/logging.py new file mode 100644 index 0000000..0a91975 --- /dev/null +++ b/core/logging.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +import logging + +LOG_FORMAT = "%(asctime)s | %(levelname)s | %(name)s | %(message)s" + + +def setup_logging(level: str = "INFO") -> None: + numeric_level = logging.getLevelName(level.upper()) + if isinstance(numeric_level, str): + numeric_level = logging.INFO + logging.basicConfig(level=numeric_level, format=LOG_FORMAT) + + +def get_logger(name: str | None = None) -> logging.Logger: + return logging.getLogger(name or "ebay-agent") \ No newline at end of file diff --git a/core/models.py b/core/models.py new file mode 100644 index 0000000..d3608bc --- /dev/null +++ b/core/models.py @@ -0,0 +1,161 @@ + from __future__ import annotations + +from datetime import datetime +from pathlib import Path + + from pydantic import BaseModel, Field + + + class ImageAssignment(BaseModel): + source_path: Path + destination_folder: str + is_label_image: bool = False + + + class FolderImages(BaseModel): + folder: Path + sku: str + label_image: Path | None = None + images: list[Path] = Field(default_factory=list) + + @property + def non_label_images(self) -> list[Path]: + if not self.label_image: + return self.images + return [img for img in self.images if img != self.label_image] + + + CSV_HEADERS: list[str] = [ + "item_number", + "sku", + "folder", + "title", + "description", + "condition", + "condition_note", + "recommended_price", + "price_rationale", + "suggested_category_id", + "suggested_category_path", + "ebay_search_query", + "last_sold_prices", + "price_source", + "highest_sold_price", + "image_urls", + "notes_path", + "listing_format", + "marketplace_id", + 
"category_id", + "quantity", + "listing_duration", + "best_offer_enabled", + "mcp_listing_id", + ] + + + class InventoryRow(BaseModel): + item_number: str + sku: str + folder: str + title: str + description: str + condition: str = "USED" + condition_note: str = "" + recommended_price: float = 0.0 + price_rationale: str = "" + suggested_category_id: str = "" + suggested_category_path: str = "" + ebay_search_query: str = "" + last_sold_prices: list[float] = Field(default_factory=list) + price_source: str = "ebay-sold" + highest_sold_price: float | None = None + image_urls: list[str] = Field(default_factory=list) + notes_path: str = "" + listing_format: str = "BIN" + marketplace_id: str = "" + category_id: str = "" + quantity: int = 1 + listing_duration: str = "GTC" + best_offer_enabled: bool = True + mcp_listing_id: str = "" + + def csv_row(self) -> dict[str, str]: + return { + "item_number": self.item_number, + "sku": self.sku, + "folder": self.folder, + "title": self.title, + "description": self.description, + "condition": self.condition, + "condition_note": self.condition_note, + "recommended_price": f"{self.recommended_price:.2f}", + "price_rationale": self.price_rationale, + "suggested_category_id": self.suggested_category_id, + "suggested_category_path": self.suggested_category_path, + "ebay_search_query": self.ebay_search_query, + "last_sold_prices": ";".join(f"{price:.2f}" for price in self.last_sold_prices), + "price_source": self.price_source, + "highest_sold_price": ( + f"{self.highest_sold_price:.2f}" if self.highest_sold_price else "" + ), + "image_urls": ";".join(self.image_urls), + "notes_path": self.notes_path, + "listing_format": self.listing_format, + "marketplace_id": self.marketplace_id, + "category_id": self.category_id, + "quantity": str(self.quantity), + "listing_duration": self.listing_duration, + "best_offer_enabled": "true" if self.best_offer_enabled else "false", + "mcp_listing_id": self.mcp_listing_id, + } + + + class 
SoldListing(BaseModel): + title: str + price: float + url: str | None = None + sold_date: datetime | None = None + + + class PriceRecommendation(BaseModel): + recommended_price: float + rationale: str + sold_listings: list[SoldListing] = Field(default_factory=list) + + @property + def last_sold_prices(self) -> list[float]: + return [listing.price for listing in self.sold_listings] + + @property + def highest_price(self) -> float | None: + prices = self.last_sold_prices + return max(prices) if prices else None + + + class MCPImageUpload(BaseModel): + image_path: Path + remote_url: str + + + class MCPListingRequest(BaseModel): + sku: str + title: str + description: str + price: float + listing_format: str + image_urls: list[str] + + + class MCPListingResponse(BaseModel): + listing_id: str + status: str + + + def blank_inventory_row(folder: FolderImages) -> InventoryRow: + return InventoryRow( + item_number=folder.sku, + sku=folder.sku, + folder=str(folder.folder), + title="", + description=f"\n\nItem: {folder.sku}", + ) \ No newline at end of file diff --git a/gpt/__init__.py b/gpt/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gpt/analyze.py b/gpt/analyze.py new file mode 100644 index 0000000..0f0bbfc --- /dev/null +++ b/gpt/analyze.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +import base64 +import json +from pathlib import Path +from typing import Any + +from openai import AsyncOpenAI + +from core.env import AppConfig +from core.logging import get_logger +from core.models import FolderImages, InventoryRow, blank_inventory_row +from mcp.client import MCPClient +from utils.concurrency import run_with_concurrency +from utils.files import gather_folders + +logger = get_logger(__name__) + + +class InventoryAnalyzer: + def __init__(self, config: AppConfig) -> None: + self.config = config + self.client = AsyncOpenAI( + api_key=config.openai_api_key, + base_url=str(config.openai_api_base) if config.openai_api_base else None, + ) + + 
async def analyze_folder( + self, + folder: FolderImages, + mcp_client: MCPClient, + ) -> InventoryRow: + llm_data = await self._describe_folder(folder) + row = blank_inventory_row(folder) + row.title = llm_data.get("title", "") + row.description = f"{llm_data.get('description', '')}\n\nItem: {folder.sku}" + row.condition = llm_data.get("condition", "USED") + row.condition_note = llm_data.get("condition_note", "") + row.suggested_category_id = llm_data.get("suggested_category_id", "") + row.suggested_category_path = llm_data.get("suggested_category_path", "") + row.ebay_search_query = llm_data.get("ebay_search_query", row.title) + row.listing_format = llm_data.get("listing_format", "BIN") + row.marketplace_id = llm_data.get("marketplace_id", "") + row.category_id = llm_data.get("category_id", "") + row.notes_path = str(folder.folder / "notes.txt") if (folder.folder / "notes.txt").exists() else "" + row.image_urls = [image.name for image in folder.non_label_images] + row.price_source = "mcp-sold" + + quantity_value = llm_data.get("quantity") + if quantity_value: + try: + row.quantity = int(quantity_value) + except (TypeError, ValueError): + logger.debug("Unable to parse quantity '%s' for %s", quantity_value, folder.sku) + + price = await mcp_client.fetch_sold_prices(row.ebay_search_query or row.title) + row.recommended_price = price.recommended_price + row.price_rationale = price.rationale + row.last_sold_prices = price.last_sold_prices + row.highest_sold_price = price.highest_price + + return row + + async def _describe_folder(self, folder: FolderImages) -> dict[str, Any]: + images = folder.non_label_images[:10] + if not images and folder.label_image: + images = [folder.label_image] + content = [ + { + "type": "input_text", + "text": ( + "Provide listing metadata JSON with keys: title, description, condition, condition_note, " + "suggested_category_id, suggested_category_path, ebay_search_query, listing_format, marketplace_id, category_id." 
+ ), + } + ] + for path in images: + encoded = base64.b64encode(path.read_bytes()).decode("utf-8") + mime = "image/png" if path.suffix.lower() == ".png" else "image/jpeg" + content.append( + { + "type": "input_image", + "image_base64": encoded, + "mime_type": mime, + } + ) + response = await self.client.responses.create( + model="gpt-5", + input=[ + { + "role": "system", + "content": [{"type": "input_text", "text": "You write structured JSON only."}], + }, + {"role": "user", "content": content}, + ], + ) + text = self._extract_text(response) + return json.loads(text) + + @staticmethod + def _extract_text(response) -> str: + for item in response.output: + for output in item.content: + if output.type == "output_text": + return output.text + raise ValueError("No text output found.") + + +async def analyze_inventory(root: Path, config: AppConfig) -> list[InventoryRow]: + folders = gather_folders(root) + analyzer = InventoryAnalyzer(config) + async with MCPClient.from_config(config) as mcp_client: + + async def worker(folder: FolderImages) -> InventoryRow: + return await analyzer.analyze_folder(folder, mcp_client) + + rows = await run_with_concurrency( + folders, + limit=config.concurrency_limit, + worker=worker, + ) + + return rows diff --git a/gpt/organize.py b/gpt/organize.py new file mode 100644 index 0000000..6625fa0 --- /dev/null +++ b/gpt/organize.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import base64 +import json +from pathlib import Path +from typing import Iterable + +from openai import AsyncOpenAI + +from core.env import AppConfig +from core.logging import get_logger +from core.models import ImageAssignment +from utils.files import iter_image_files + +logger = get_logger(__name__) + +BATCH_SIZE = 50 +SYSTEM_PROMPT = ( + "You are an assistant that groups raw product photos into folders based on SKU labels. " + "Return JSON with entries: filename, folder, is_label." 
+) + + +def _encode_image(path: Path) -> str: + return base64.b64encode(path.read_bytes()).decode("utf-8") + + +def _image_payload(path: Path) -> dict[str, str]: + mime = "image/png" if path.suffix.lower() == ".png" else "image/jpeg" + return {"type": "input_image", "image_base64": _encode_image(path), "mime_type": mime} + + +def _chunked(iterable: list[Path], size: int) -> Iterable[list[Path]]: + for idx in range(0, len(iterable), size): + yield iterable[idx : idx + size] + + +def _build_user_content(chunk: list[Path]) -> list[dict[str, str]]: + content: list[dict[str, str]] = [ + { + "type": "input_text", + "text": ( + "Group these images. Use existing alphanumeric labels written in the photo when present, " + "otherwise create a new folder label when the subject changes. Respond with JSON array " + "like [{\"filename\": \"IMG_1234.jpg\", \"folder\": \"A12\", \"is_label\": true}]." + ), + } + ] + for path in chunk: + content.append(_image_payload(path)) + return content + + +def _extract_text(response) -> str: + for item in response.output: + for output in item.content: + if output.type == "output_text": + return output.text + raise ValueError("No text output found in response.") + + +def _parse_assignments(text: str, root: Path) -> list[ImageAssignment]: + data = json.loads(text) + assignments: list[ImageAssignment] = [] + for entry in data: + filename = entry["filename"] + folder = entry["folder"] + is_label = entry.get("is_label", False) + source_path = root / filename + assignments.append( + ImageAssignment( + source_path=source_path, + destination_folder=folder, + is_label_image=is_label, + ) + ) + return assignments + + +async def organize_images(root: Path, config: AppConfig) -> list[ImageAssignment]: + client = AsyncOpenAI( + api_key=config.openai_api_key, + base_url=str(config.openai_api_base) if config.openai_api_base else None, + ) + images = list(iter_image_files(root)) + assignments: list[ImageAssignment] = [] + + for chunk in _chunked(images, 
BATCH_SIZE): + response = await client.responses.create( + model="gpt-5", + input=[ + {"role": "system", "content": [{"type": "input_text", "text": SYSTEM_PROMPT}]}, + {"role": "user", "content": _build_user_content(chunk)}, + ], + ) + text = _extract_text(response) + assignments.extend(_parse_assignments(text, root)) + + return assignments diff --git a/mcp/__init__.py b/mcp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mcp/client.py b/mcp/client.py new file mode 100644 index 0000000..403051f --- /dev/null +++ b/mcp/client.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import asyncio +import base64 +import random +from pathlib import Path +from typing import Any + +import httpx + +from core.env import AppConfig +from core.logging import get_logger +from core.models import MCPImageUpload, MCPListingRequest, MCPListingResponse, PriceRecommendation, SoldListing + +logger = get_logger(__name__) + + +class MCPClient: + def __init__( + self, + base_url: str, + api_key: str, + timeout: float = 30.0, + retries: int = 3, + ) -> None: + self._client = httpx.AsyncClient( + base_url=base_url, + headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, + timeout=timeout, + ) + self._retries = retries + + @classmethod + def from_config(cls, config: AppConfig) -> "MCPClient": + if not config.mcp_endpoint or not config.mcp_api_key: + raise ValueError("MCP credentials are not configured.") + return cls(str(config.mcp_endpoint), config.mcp_api_key) + + async def __aenter__(self) -> "MCPClient": + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: # type: ignore[override] + await self.close() + + async def close(self) -> None: + await self._client.aclose() + + async def _request(self, method: str, path: str, json: Any | None = None) -> Any: + for attempt in range(self._retries): + try: + response = await self._client.request(method, path, json=json) + response.raise_for_status() + return response.json() + 
except (httpx.RequestError, httpx.HTTPStatusError) as exc: + if attempt == self._retries - 1: + raise + backoff = min(2 ** attempt + random.random(), 10) + logger.warning("MCP request failed (%s), retrying in %.2fs", exc, backoff) + await asyncio.sleep(backoff) + + async def invoke(self, tool: str, payload: dict[str, Any]) -> Any: + return await self._request("POST", "/invoke", {"tool": tool, "input": payload}) + + async def upload_image(self, sku: str, image_path: Path) -> MCPImageUpload: + data = image_path.read_bytes() + encoded = base64.b64encode(data).decode("utf-8") + payload = {"sku": sku, "filename": image_path.name, "content": encoded} + response = await self.invoke("upload_image", payload) + return MCPImageUpload(image_path=image_path, remote_url=response["url"]) + + async def create_listing(self, request: MCPListingRequest) -> MCPListingResponse: + response = await self.invoke("create_listing", request.model_dump()) + return MCPListingResponse(**response) + + async def publish_listing(self, listing_id: str) -> MCPListingResponse: + response = await self.invoke("publish_listing", {"listing_id": listing_id}) + return MCPListingResponse(**response) + + async def fetch_sold_prices(self, query: str) -> PriceRecommendation: + response = await self.invoke("search_sold", {"query": query}) + sold_listings = [SoldListing(**item) for item in response.get("results", [])] + if not sold_listings: + return PriceRecommendation(recommended_price=0.0, rationale="No comps found.") + highest = max(sold.price for sold in sold_listings) + recommended = round(highest * 0.9, 2) + rationale = f"Recommended price is 10% below highest sold price (${highest:.2f})." 
+ return PriceRecommendation( + recommended_price=recommended, + rationale=rationale, + sold_listings=sold_listings, + ) \ No newline at end of file diff --git a/mcp/images.py b/mcp/images.py new file mode 100644 index 0000000..5884921 --- /dev/null +++ b/mcp/images.py @@ -0,0 +1,8 @@ +from __future__ import annotations + +import base64 +from pathlib import Path + + +def encode_image_to_base64(path: Path) -> str: + return base64.b64encode(path.read_bytes()).decode("utf-8") diff --git a/mcp/listings.py b/mcp/listings.py new file mode 100644 index 0000000..afa22d4 --- /dev/null +++ b/mcp/listings.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +from pathlib import Path +from urllib.parse import urlparse + +from core.env import AppConfig +from core.logging import get_logger +from core.models import InventoryRow, MCPListingRequest +from core.csv_io import overwrite_inventory_csv, read_inventory_csv +from mcp.client import MCPClient +from utils.concurrency import run_with_concurrency +from utils.files import is_image + +logger = get_logger(__name__) + + +def _resolve_image_paths(row: InventoryRow) -> list[Path]: + folder = Path(row.folder) + if not folder.exists(): + raise FileNotFoundError(f"Folder {folder} does not exist for row {row.sku}") + + paths: list[Path] = [] + if row.image_urls: + for image_ref in row.image_urls: + if image_ref.startswith("http"): + continue + image_path = Path(image_ref) + if not image_path.is_absolute(): + image_path = folder / image_ref + paths.append(image_path) + else: + paths = [path for path in folder.iterdir() if path.is_file() and is_image(path)] + return paths + + +def _is_remote(url: str) -> bool: + try: + return urlparse(url).scheme in {"http", "https"} + except ValueError: + return False + + +async def _create_listing_for_row( + client: MCPClient, + row: InventoryRow, + publish_after_create: bool, + upload_limit: int, +) -> InventoryRow: + image_paths = _resolve_image_paths(row) + uploads = await run_with_concurrency( 
+ image_paths, + limit=max(1, min(upload_limit, len(image_paths) or 1)), + worker=lambda path: client.upload_image(row.sku, path), + ) + remote_urls = [upload.remote_url for upload in uploads] + row.image_urls = remote_urls + + request = MCPListingRequest( + sku=row.sku, + title=row.title, + description=row.description, + price=row.recommended_price, + listing_format=row.listing_format, + image_urls=remote_urls, + ) + listing = await client.create_listing(request) + row.mcp_listing_id = listing.listing_id + if publish_after_create: + await client.publish_listing(listing.listing_id) + return row + + +async def _publish_existing_listing(client: MCPClient, row: InventoryRow) -> InventoryRow: + if not row.mcp_listing_id: + logger.warning("Row %s has no listing id; skipping publish.", row.sku) + return row + await client.publish_listing(row.mcp_listing_id) + return row + + +async def process_uploads( + csv_path: Path, + draft_only: bool, + publish_only: bool, + config: AppConfig, +) -> list[InventoryRow]: + rows = read_inventory_csv(csv_path) + async with MCPClient.from_config(config) as client: + + async def worker(row: InventoryRow) -> InventoryRow: + if publish_only: + return await _publish_existing_listing(client, row) + publish_after_create = not draft_only + return await _create_listing_for_row( + client, + row, + publish_after_create, + upload_limit=config.concurrency_limit, + ) + + updated_rows = await run_with_concurrency( + rows, + limit=config.concurrency_limit, + worker=worker, + ) + + overwrite_inventory_csv(csv_path, updated_rows) + return updated_rows diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..7b6fd37 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "ebay-agent" +version = "0.1.0" +description = "Inventory and listing automation tool powered by MCP" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "typer>=0.9.0", + "httpx>=0.27.0", + "pydantic>=2.6.0", + 
"python-dotenv>=1.0.1", + "openai>=1.13.3", +] + +[project.scripts] +ebay-agent = "app:main" + +[build-system] +requires = ["setuptools>=65.0"] +build-backend = "setuptools.build_meta" diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/concurrency.py b/utils/concurrency.py new file mode 100644 index 0000000..35b280e --- /dev/null +++ b/utils/concurrency.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable, Iterable +from typing import TypeVar, cast + +T = TypeVar("T") +R = TypeVar("R") + + +async def bounded_gather( + tasks: Iterable[Callable[[], Awaitable[R]]], + limit: int, +) -> list[R]: + semaphore = asyncio.Semaphore(limit) + factories = list(tasks) + results: list[R | None] = [None] * len(factories) + + async def runner(index: int, task_factory: Callable[[], Awaitable[R]]) -> None: + async with semaphore: + results[index] = await task_factory() + + await asyncio.gather(*(runner(idx, factory) for idx, factory in enumerate(factories))) + return [cast(R, result) for result in results] + + +async def run_with_concurrency( + items: Iterable[T], + limit: int, + worker: Callable[[T], Awaitable[R]], +) -> list[R]: + semaphore = asyncio.Semaphore(limit) + item_list = list(items) + results: list[R | None] = [None] * len(item_list) + + async def bound_work(index: int, item: T) -> None: + async with semaphore: + results[index] = await worker(item) + + await asyncio.gather(*(bound_work(idx, item) for idx, item in enumerate(item_list))) + return [cast(R, result) for result in results] diff --git a/utils/files.py b/utils/files.py new file mode 100644 index 0000000..f5fd06b --- /dev/null +++ b/utils/files.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +import shutil +from pathlib import Path +from typing import Iterable + +from core.models import FolderImages, ImageAssignment + +IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".heic", 
".webp"} + + +def is_image(path: Path) -> bool: + return path.suffix.lower() in IMAGE_EXTENSIONS + + +def iter_image_files(root: Path) -> Iterable[Path]: + for entry in sorted(root.iterdir()): + if entry.is_file() and is_image(entry): + yield entry + + +def ensure_folder(path: Path) -> Path: + path.mkdir(parents=True, exist_ok=True) + return path + + +def move_assignments(assignments: Iterable[ImageAssignment], root: Path) -> None: + for assignment in assignments: + if not assignment.source_path.exists(): + continue + target_dir = ensure_folder(root / assignment.destination_folder) + destination = target_dir / assignment.source_path.name + shutil.move(str(assignment.source_path), destination) + if assignment.is_label_image: + label_name = f"{assignment.destination_folder}{assignment.source_path.suffix}" + destination.rename(target_dir / label_name) + + +def gather_folders(root: Path) -> list[FolderImages]: + folders: list[FolderImages] = [] + for entry in sorted(root.iterdir()): + if not entry.is_dir(): + continue + images = [img for img in entry.iterdir() if img.is_file() and is_image(img)] + label_image = next( + (img for img in images if img.stem.lower() == entry.name.lower()), None + ) + folders.append( + FolderImages(folder=entry, sku=entry.name, label_image=label_image, images=images) + ) + return folders \ No newline at end of file From 421e448d0fa5686636cf714810bcfd92a7b5752c Mon Sep 17 00:00:00 2001 From: David Lozzi Date: Sat, 15 Nov 2025 17:59:26 -0500 Subject: [PATCH 2/5] new files --- .gitignore | 5 +++++ AGENTS.md | 31 +++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 .gitignore create mode 100644 AGENTS.md diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4302edc --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.env +.venv +__pycache__ +*.log +*.csv diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..2585840 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,31 @@ +# eBay Inventory & 
Listing Automation Tool + +A tool to automate the process of adding inventory to eBay and creating listings for it. + +## details + +- Organize raw photo drops into SKU-specific folders, rename label shots to `SKU.jpg`, and cap GPT batches at 50 images. +- Generate inventory CSVs by extracting metadata with GPT, pulling sold comps from MCP, and writing timestamped CSV files. +- Upload listing CSVs via MCP, supporting draft-only, publish-only, or combined create-and-publish workflows. + +## coding requirements + +### Python + +- always use `httpx` over `requests` +- if performing multiple API calls, consider if it can be done in parallel and use `asyncio` to run multiple calls in parallel + +## requirements + +- CLI workflow spans three modes: `--organize` for raw photo sorting, default inventory run for CSV generation, and `--upload` for MCP listing creation or publishing. +- Python 3.11+ in a virtual environment (`python -m venv .venv`, `pip install -e .`). +- Access to the eBay MCP server (`MCP_ENDPOINT`, `MCP_API_KEY`) for inventory generation and publishing. +- OpenAI API key with GPT-5 access (optionally configure `OPENAI_API_BASE`, `LOG_LEVEL`, `CONCURRENCY_LIMIT`). +- Mode-specific environment variables: `OPENAI_API_KEY` required for organize/inventory flows; MCP credentials required for inventory/publish steps. + +## learnings + +As you interact with the user and if you find anything that is out of the norm or an area where you and the user went back-and-forth, or the user had to correct you, save your learnings below. 
+ +### learnings +Let your learnings in bullets below \ No newline at end of file From 3ad6e1477a1ac3ed86b819c985cd655549b900ed Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sat, 15 Nov 2025 23:01:16 +0000 Subject: [PATCH 3/5] Refactor: Add agent context and operational notes to README Co-authored-by: davidlozzi1 --- AGENTS.md | 71 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 20 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 2585840..5c72c7c 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,31 +1,62 @@ -# eBay Inventory & Listing Automation Tool +# eBay Inventory & Listing Automation Tool — Agent Context -A tool to automate the process of adding inventory to eBay and creating listings for it. +This file gives LLM-based agents the minimum context needed to navigate and extend the project safely. -## details +## Mission -- Organize raw photo drops into SKU-specific folders, rename label shots to `SKU.jpg`, and cap GPT batches at 50 images. -- Generate inventory CSVs by extracting metadata with GPT, pulling sold comps from MCP, and writing timestamped CSV files. -- Upload listing CSVs via MCP, supporting draft-only, publish-only, or combined create-and-publish workflows. +Automate the pipeline from raw photo dumps to live eBay listings: -## coding requirements +1. **Organize mode (`--organize`)**: GPT-vision batches (≤50 images) identify handwritten SKU labels, group images, and rename the label shot to `SKU.<ext>`. +2. **Inventory mode (default argument path)**: For each organized folder, GPT summarizes metadata while the MCP server provides sold-price comps; results are written to `listings_YYYYMMDD_HHMMSS.csv`. +3. **Publish mode (`--upload`)**: Reads a CSV, uploads images via MCP, creates listings (draft or live), publishes when requested, and persists the returned `mcp_listing_id` back into the CSV.
-### Python +## Architecture Cheat Sheet -- always use `httpx` over `requests` -- if performing multiple API calls, consider if it can be done in parallel and use `asyncio` to run multiple calls in parallel +| Module | Purpose | +| --- | --- | +| `app.py` | Typer CLI entrypoint. Determines mode, sets up logging/config, and runs async workflows. | +| `core/env.py` | Loads `.env`, validates mode-specific requirements (`OPENAI_API_KEY`, `MCP_ENDPOINT`, etc.), exposes `AppConfig`. | +| `core/models.py` | Pydantic models for folders, image assignments, CSV rows (see `CSV_HEADERS`), MCP payloads, and pricing responses. | +| `core/csv_io.py` | Timestamped CSV writer, CSV reader, and overwrite helper for publish mode. | +| `gpt/organize.py` | Handles batching, image base64 encoding, prompt construction, and JSON parsing for folder assignments. | +| `gpt/analyze.py` | Builds listing metadata via GPT, appends description suffix `\n\nItem: <SKU>`, and combines MCP sold-price data. | +| `mcp/client.py` | Async `httpx` client with retry/backoff wrapping MCP tools (`upload_image`, `create_listing`, `publish_listing`, `search_sold`). | +| `mcp/listings.py` | Orchestrates CSV-driven listing creation/publishing, resolves local image paths, enforces concurrency limits, and rewrites CSV rows. | +| `utils/files.py` | Filesystem helpers to detect images, ensure folders, move/rename files after organize mode, and collect SKU folders. | +| `utils/concurrency.py` | Semaphore-bound gather helpers that preserve ordering for deterministic CSV updates. | -## requirements +## External Services -- CLI workflow spans three modes: `--organize` for raw photo sorting, default inventory run for CSV generation, and `--upload` for MCP listing creation or publishing. -- Python 3.11+ in a virtual environment (`python -m venv .venv`, `pip install -e .`). -- Access to the eBay MCP server (`MCP_ENDPOINT`, `MCP_API_KEY`) for inventory generation and publishing. 
-- OpenAI API key with GPT-5 access (optionally configure `OPENAI_API_BASE`, `LOG_LEVEL`, `CONCURRENCY_LIMIT`). -- Mode-specific environment variables: `OPENAI_API_KEY` required for organize/inventory flows; MCP credentials required for inventory/publish steps. +- **OpenAI Responses API** (`gpt-5`): used for both organization (vision) and inventory analysis (vision + text). Always request JSON, respect batch limits, and send base64 images. +- **eBay MCP server** (`MCP_ENDPOINT`, `MCP_API_KEY`): provides search/comps, image upload, listing creation, and publishing. All eBay actions go through MCP tool invocations—never call eBay REST APIs directly. -## learnings +## Environment & Configuration -As you interact with the user and if you find anything that is out of the norm or an area where you and the user went back-and-forth, or the user had to correct you, save your learnings below. +- Copy `.env.example` → `.env`. Variables per mode: + - Organize: `OPENAI_API_KEY` + - Inventory: `OPENAI_API_KEY`, `MCP_ENDPOINT`, `MCP_API_KEY` + - Publish: `MCP_ENDPOINT`, `MCP_API_KEY` +- Optional: `OPENAI_API_BASE`, `LOG_LEVEL`, `CONCURRENCY_LIMIT` (defaults to 5; controls folder/image parallelism). +- Config access pattern: `config = get_config_for_mode("<mode>")`, then pass through to downstream modules. -### learnings -Let your learnings in bullets below \ No newline at end of file +## Operational Notes + +- **Concurrency:** Use helpers in `utils.concurrency`; don’t spawn unbounded tasks. Inventory folders and MCP uploads must respect `config.concurrency_limit`. +- **HTTP layer:** Always use `httpx.AsyncClient` with retries (already implemented in `mcp/client.py`). No `requests`. +- **Filesystem:** `utils.files.move_assignments` renames the label image to match folder name and skips missing files gracefully. +- **CSV schema:** `core/models.CSV_HEADERS` defines 24 columns. Descriptions must end with `Item: <SKU>`. `mcp_listing_id` starts blank and is set during publish flow. 
+- **Logging:** `core/logging.setup_logging` provides a uniform format. Use `get_logger(__name__)`. +- **Error handling:** Missing env vars raise `ConfigError`. CLI catches and prints messages via Typer before exiting with status 1. + +## How to Extend Safely + +- Keep functions small/pure where possible; inject side effects (clients, paths, configs) for easier testing. +- When adding new MCP interactions, update `mcp/client.py` so retries/backoff stay consistent. +- Any new GPT flow should mirror existing JSON-only prompting style to keep parsing predictable. +- Update both `README.md` (user-facing) and this file (agent-facing) when workflows change. + +## Learnings + +Record notable clarifications or user-specific expectations below. + +- _None yet_ \ No newline at end of file From 12a2884654536b2873480a4141e4ac454084b299 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sat, 15 Nov 2025 23:08:38 +0000 Subject: [PATCH 4/5] Configure setuptools to find app modules Co-authored-by: davidlozzi1 --- pyproject.toml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 7b6fd37..fc9f36b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,13 @@ dependencies = [ [project.scripts] ebay-agent = "app:main" +[tool.setuptools] +py-modules = ["app"] + +[tool.setuptools.packages.find] +where = ["."] +include = ["core*", "gpt*", "mcp*", "utils*"] + [build-system] requires = ["setuptools>=65.0"] build-backend = "setuptools.build_meta" From 1b359456e6bbbb25c54bfecc62a712d342810a72 Mon Sep 17 00:00:00 2001 From: David Lozzi Date: Sat, 15 Nov 2025 18:24:47 -0500 Subject: [PATCH 5/5] install --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 4302edc..0407c96 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ __pycache__ *.log *.csv +*.egg-info \ No newline at end of file