Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions apps/singlestore/minio_pandasai_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""Analyze MinIO CSV data with PandasAI in a Streamlit app.

This script downloads a CSV file from a MinIO bucket into a Pandas
``DataFrame`` and exposes it via a small web application powered by
`streamlit`. The app shows basic statistics and lets the user provide an
arbitrary natural-language prompt for `pandas-ai` to answer. It demonstrates
how to compute statistics such as:

* number of unique ``dev_id`` values
* per ``dev_id`` row counts
* oldest and most recent ``coll_dt`` timestamp per ``dev_id``
* total memory footprint (in bytes) per ``dev_id``

The script requires the following environment variables:

``MINIO_ENDPOINT`` – URL of the MinIO service (e.g. "play.min.io:9000")
``MINIO_ACCESS_KEY`` – MinIO access key
``MINIO_SECRET_KEY`` – MinIO secret key
``MINIO_BUCKET`` – Bucket containing the CSV file
``MINIO_OBJECT`` – Object name of the CSV file
``MINIO_SECURE`` – Set to "true" to use HTTPS (optional)
``OPENAI_API_KEY`` – API token used by pandas-ai for LLM access
``OPENAI_MODEL`` – Optional OpenAI model name (defaults to
``gpt-3.5-turbo``)

Running ``streamlit run minio_pandasai_stats.py`` will start a web interface
that displays the statistics table and, when the user submits a prompt, shows
the LLM's reply along with the active GPU and model information.
"""

from __future__ import annotations

import io
import os
from dataclasses import dataclass

import pandas as pd
from minio import Minio

try: # pandas-ai and streamlit are optional so the module can compile without them
import streamlit as st
from pandasai import SmartDataframe
from pandasai.llm.openai import OpenAI
except Exception: # pragma: no cover - fallback when optional deps missing
st = None # type: ignore
SmartDataframe = None # type: ignore
OpenAI = None # type: ignore


@dataclass
class MinioConfig:
    """Location and credentials for one CSV object stored in MinIO."""

    # Host:port of the MinIO service, e.g. "play.min.io:9000" (no scheme).
    endpoint: str
    # Credentials used to authenticate against MinIO.
    access_key: str
    secret_key: str
    # Bucket and object key identifying the CSV file to download.
    bucket: str
    obj_name: str
    # When True, connect over HTTPS instead of plain HTTP.
    secure: bool = False


def read_csv_from_minio(cfg: MinioConfig, time_field: str) -> pd.DataFrame:
    """Download ``cfg.obj_name`` from MinIO and parse it into a DataFrame.

    The column named ``time_field`` is parsed as datetimes.
    """
    client = Minio(
        cfg.endpoint,
        access_key=cfg.access_key,
        secret_key=cfg.secret_key,
        secure=cfg.secure,
    )
    response = client.get_object(cfg.bucket, cfg.obj_name)
    try:
        # Pull the whole object into memory before releasing the connection.
        text = response.read().decode("utf-8")
    finally:
        response.close()
        response.release_conn()
    return pd.read_csv(io.StringIO(text), parse_dates=[time_field])


def compute_stats(df: pd.DataFrame, id_field: str, time_field: str) -> pd.DataFrame:
    """Return per-id row counts, time range, and memory usage.

    The result is indexed by ``id_field`` with columns:
    ``rows`` (row count), ``oldest``/``newest`` (min/max of ``time_field``),
    and ``bytes`` (deep memory usage of each group, index included).
    """
    # Group once and reuse it; the original grouped the frame a second time
    # just to compute the memory column.
    grouped = df.groupby(id_field)
    stats = grouped.agg(
        rows=(id_field, "size"),
        oldest=(time_field, "min"),
        newest=(time_field, "max"),
    )
    # Deep usage counts actual string payloads, not just object pointers.
    stats["bytes"] = grouped.apply(lambda g: g.memory_usage(deep=True).sum())
    return stats


def analyze_with_pandasai(df: pd.DataFrame, prompt: str, model_name: str) -> str:
    """Send *prompt* about *df* to an OpenAI-backed pandas-ai session.

    Raises ``RuntimeError`` when the optional pandas-ai dependency is absent.
    Requires ``OPENAI_API_KEY`` in the environment.
    """
    if OpenAI is None or SmartDataframe is None:
        raise RuntimeError("pandas-ai is not installed")
    llm = OpenAI(api_token=os.environ["OPENAI_API_KEY"], model_name=model_name)
    smart = SmartDataframe(df, config={"llm": llm})
    return smart.chat(prompt)


def gpu_info() -> str:
    """Describe the compute device: CUDA GPU name, "CPU", or "unknown"."""
    try:
        import torch
    except Exception:
        # torch missing entirely — we cannot say anything about the hardware.
        return "unknown"
    try:
        if not torch.cuda.is_available():
            return "CPU"
        return torch.cuda.get_device_name(0)
    except Exception:
        return "unknown"


def main() -> None:  # pragma: no cover - entry point for streamlit
    """Render the Streamlit UI: stats table plus an optional LLM prompt box."""
    if st is None:
        raise RuntimeError("streamlit is required to run this script")

    env = os.environ
    id_field = env.get("ID_FIELD", "dev_id")
    time_field = env.get("TIME_FIELD", "coll_dt")
    model_name = env.get("OPENAI_MODEL", "gpt-3.5-turbo")
    cfg = MinioConfig(
        endpoint=env["MINIO_ENDPOINT"],
        access_key=env["MINIO_ACCESS_KEY"],
        secret_key=env["MINIO_SECRET_KEY"],
        bucket=env["MINIO_BUCKET"],
        obj_name=env["MINIO_OBJECT"],
        secure=env.get("MINIO_SECURE", "false").lower() == "true",
    )

    frame = read_csv_from_minio(cfg, time_field=time_field)
    stats = compute_stats(frame, id_field=id_field, time_field=time_field)

    st.title("MinIO CSV statistics")
    sidebar = st.sidebar
    sidebar.write(f"GPU: {gpu_info()}")
    sidebar.write(f"Model: {model_name}")

    st.write(f"Unique {id_field} count: {len(stats)}")
    st.dataframe(stats)

    default_prompt = (
        f"Count the unique values in '{id_field}'. For each '{id_field}', give the row count, "
        f"earliest '{time_field}', latest '{time_field}', and total memory usage in bytes. "
        "Return the answer as a table with columns id, rows, oldest, newest, bytes."
    )
    question = st.text_area("LLM Prompt", value=default_prompt)

    if st.button("Run LLM"):
        try:
            answer = analyze_with_pandasai(frame, question, model_name)
            st.subheader("LLM Reply")
            st.write(answer)
        except Exception as exc:  # pragma: no cover - runtime failures
            st.error(f"pandas-ai analysis skipped: {exc}")


if __name__ == "__main__": # pragma: no cover - entry point
main()
159 changes: 159 additions & 0 deletions apps/singlestore/minio_to_singlestore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""Load CSV data from MinIO into SingleStore and compute storage periods.

This script demonstrates how to read a CSV object stored in a MinIO bucket,
insert the rows into a SingleStore (MySQL compatible) table and calculate
for each record identifier the period during which data has been stored.

The script expects the following environment variables for configuration:

MINIO_ENDPOINT - URL to the MinIO service (e.g. "play.min.io:9000")
MINIO_ACCESS_KEY - MinIO access key
MINIO_SECRET_KEY - MinIO secret key
MINIO_BUCKET - Name of the bucket containing the CSV file
MINIO_OBJECT - Object name of the CSV file

S2_HOST - SingleStore host address
S2_PORT - SingleStore port (default: 3306)
S2_USER - Username for SingleStore
S2_PASSWORD - Password for SingleStore
S2_DATABASE - Target database name
S2_TABLE - Target table name
S2_ID_FIELD - Field name representing the identifier (default: "dev_id")
S2_TIME_FIELD - Field name representing the timestamp (default: "coll_dt")

The table is expected to already exist in the database with columns matching
those found in the CSV file.
"""

from __future__ import annotations

import csv
import io
import os
import sys
from dataclasses import dataclass
from typing import Iterable, List, Tuple

from minio import Minio
import pymysql

# Allow very large CSV cells (up to 10 MB) without tripping csv's default
# field-size limit; cap at sys.maxsize on narrow platforms.
_MAX_CSV_FIELD = min(sys.maxsize, 10 ** 7)
csv.field_size_limit(_MAX_CSV_FIELD)


@dataclass
class MinioConfig:
    """Location and credentials for one CSV object stored in MinIO."""

    # Host:port of the MinIO service, e.g. "play.min.io:9000" (no scheme).
    endpoint: str
    # Credentials used to authenticate against MinIO.
    access_key: str
    secret_key: str
    # Bucket and object key identifying the CSV file to download.
    bucket: str
    obj_name: str
    # When True, connect over HTTPS instead of plain HTTP.
    secure: bool = False


@dataclass
class DBConfig:
    """Connection settings and schema hints for the SingleStore target."""

    # MySQL-protocol connection parameters for SingleStore.
    host: str
    user: str
    password: str
    database: str
    # Destination table; expected to exist with columns matching the CSV.
    table: str
    port: int = 3306
    # Column holding the record identifier used for grouping.
    id_field: str = "dev_id"
    # Column holding the timestamp used to compute storage periods.
    time_field: str = "coll_dt"


def read_csv_from_minio(cfg: MinioConfig) -> List[dict]:
    """Download ``cfg.obj_name`` from MinIO and return its sanitized CSV rows."""
    client = Minio(
        cfg.endpoint,
        access_key=cfg.access_key,
        secret_key=cfg.secret_key,
        secure=cfg.secure,
    )
    response = client.get_object(cfg.bucket, cfg.obj_name)
    try:
        # Read the entire object before handing the connection back.
        text = response.read().decode("utf-8")
    finally:
        response.close()
        response.release_conn()
    reader = csv.DictReader(io.StringIO(text))
    return [sanitize_row(record) for record in reader]


def sanitize_row(row: dict) -> dict:
    """Map empty-string cells to ``None`` so they load as SQL NULL values."""
    cleaned = {}
    for key, value in row.items():
        cleaned[key] = None if value == "" else value
    return cleaned


def insert_rows(conn, table: str, rows: Iterable[dict]) -> None:
    """Bulk-insert dictionaries into *table* and commit.

    Column names are taken from the first row; values are passed through
    parameterized placeholders. Rows missing a key insert NULL for it.
    No-op on an empty iterable.
    """
    rows = list(rows)
    if not rows:
        return  # nothing to insert; skip the empty executemany/commit
    columns = list(rows[0].keys())
    # Backtick-quote column names: they come from the CSV header and may
    # collide with SQL keywords or contain unusual characters.
    # NOTE(review): the table name is still interpolated unquoted because
    # callers may pass a qualified "db.table"; it comes from configuration,
    # not user input — confirm that assumption holds for all callers.
    column_list = ",".join(f"`{col}`" for col in columns)
    placeholders = ",".join(["%s"] * len(columns))
    sql = f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"
    # .get() tolerates ragged rows instead of raising KeyError mid-batch.
    values = [tuple(row.get(col) for col in columns) for row in rows]
    with conn.cursor() as cur:
        cur.executemany(sql, values)
    conn.commit()


def calculate_periods(conn, cfg: DBConfig) -> List[Tuple]:
    """Fetch (id, start_time, end_time, duration_seconds) per identifier."""
    # Build the aggregate query from the configured identifier/time columns.
    sql = (
        "SELECT {id}, MIN({t}) AS start_time, MAX({t}) AS end_time, "
        "TIMESTAMPDIFF(SECOND, MIN({t}), MAX({t})) AS duration_seconds "
        "FROM {table} GROUP BY {id}"
    ).format(id=cfg.id_field, t=cfg.time_field, table=cfg.table)
    with conn.cursor() as cursor:
        cursor.execute(sql)
        return cursor.fetchall()


def load_csv_to_singlestore(minio_cfg: MinioConfig, db_cfg: DBConfig) -> List[Tuple]:
    """Copy the MinIO CSV into SingleStore, then report per-id storage periods."""
    records = read_csv_from_minio(minio_cfg)
    connection = pymysql.connect(
        host=db_cfg.host,
        user=db_cfg.user,
        password=db_cfg.password,
        database=db_cfg.database,
        port=db_cfg.port,
        cursorclass=pymysql.cursors.Cursor,
    )
    try:
        insert_rows(connection, db_cfg.table, records)
        periods = calculate_periods(connection, db_cfg)
    finally:
        # Always release the connection, even if insert or query failed.
        connection.close()
    return periods


def main() -> None:  # pragma: no cover - convenience wrapper
    """Build configs from the environment, run the load, and print periods."""
    env = os.environ
    minio_cfg = MinioConfig(
        endpoint=env["MINIO_ENDPOINT"],
        access_key=env["MINIO_ACCESS_KEY"],
        secret_key=env["MINIO_SECRET_KEY"],
        bucket=env["MINIO_BUCKET"],
        obj_name=env["MINIO_OBJECT"],
        secure=env.get("MINIO_SECURE", "false").lower() == "true",
    )
    db_cfg = DBConfig(
        host=env["S2_HOST"],
        user=env["S2_USER"],
        password=env["S2_PASSWORD"],
        database=env["S2_DATABASE"],
        table=env["S2_TABLE"],
        port=int(env.get("S2_PORT", "3306")),
        id_field=env.get("S2_ID_FIELD", "dev_id"),
        time_field=env.get("S2_TIME_FIELD", "coll_dt"),
    )

    for ident, start, end, seconds in load_csv_to_singlestore(minio_cfg, db_cfg):
        print(f"{ident}: {start} -> {end} ({seconds} seconds)")


if __name__ == "__main__": # pragma: no cover - manual execution
main()
34 changes: 34 additions & 0 deletions apps/singlestore/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# MinIO Tools

This module contains example scripts for interacting with CSV data stored in
MinIO buckets.

## `minio_to_singlestore.py`

1. Downloads a CSV file from a MinIO bucket.
2. Inserts the CSV rows into a SingleStore database table.
3. Calculates the period of stored data for each identifier by taking the
difference between the minimum and maximum timestamp values.

Configuration is done via environment variables as documented in the script's
module docstring. By default the script expects the identifier field to be
`dev_id` and the timestamp field to be `coll_dt` as shown in the provided CSV
header. Empty cells in the CSV are converted to SQL NULL values during upload.

## `minio_pandasai_stats.py`

Launches a small [Streamlit](https://streamlit.io) web app that reads a CSV
object from MinIO into a Pandas DataFrame and uses
[pandas-ai](https://github.com/gventuri/pandas-ai) to answer user-supplied
natural-language prompts. The app displays a per-`dev_id` statistics table and
shows the active GPU and LLM model in the sidebar. The prompt used for the LLM
can be changed directly in the browser and the reply is rendered beneath the
table.

Set the same `MINIO_*` variables as above and provide `OPENAI_API_KEY` for LLM
access. Optionally set `OPENAI_MODEL` to select a different OpenAI model. Run
the app with:

```bash
streamlit run apps/singlestore/minio_pandasai_stats.py
```
Loading