Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 241 additions & 94 deletions apps/minio/scan/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import argparse
import importlib.util
import io
import json
Expand Down Expand Up @@ -30,19 +31,33 @@ def ensure_packages(packages: Dict[str, str]) -> List[str]:

# Result of the start-up dependency check, evaluated once at import time.
# NOTE(review): presumably the names of the packages that had to be installed
# on the fly (ensure_packages is annotated to return List[str]); confirm
# against the ensure_packages implementation.
INSTALLED_PACKAGES = ensure_packages(REQUIRED_PACKAGES)

import pandas as pd # noqa: E402 # pylint: disable=wrong-import-position
import streamlit as st # noqa: E402 # pylint: disable=wrong-import-position
from minio import Minio # noqa: E402 # pylint: disable=wrong-import-position
from streamlit.delta_generator import DeltaGenerator # noqa: E402 # pylint: disable=wrong-import-position

import pandas as pd
import streamlit as st
from minio import Minio
from streamlit.delta_generator import DeltaGenerator

# Virtual-environment location reported to the user. In the code visible here
# it is only printed/displayed, never used to locate or activate anything.
# NOTE(review): "devel_opemnt" looks like a misspelling of "development" —
# confirm against the real directory name before changing the string.
VENV_PATH_DISPLAY = "~/devel_opemnt/venv/bin"
# Runs at import time, so the path is echoed to stdout whenever this module
# is loaded.
print(f"Virtual environment path: {VENV_PATH_DISPLAY}")


def running_in_streamlit() -> bool:
    """Tell whether this module is being executed by Streamlit.

    Two probes are tried in order:

    1. the ``_is_running_with_streamlit`` attribute on the ``streamlit``
       module (a truthy value short-circuits to ``True``);
    2. the script-run context returned by
       ``streamlit.runtime.scriptrunner.get_script_run_ctx``.

    Returns:
        ``True`` if either probe indicates a Streamlit run, ``False``
        otherwise — including when the second probe cannot be imported or
        raises for any reason.
    """
    flag_set = getattr(st, "_is_running_with_streamlit", False)
    if flag_set:
        return True

    try:
        # Imported lazily: any failure here simply means "not in Streamlit".
        from streamlit.runtime.scriptrunner import get_script_run_ctx

        script_ctx = get_script_run_ctx()
    except Exception:  # pylint: disable=broad-except
        return False

    return script_ctx is not None


def get_client(endpoint: str, access_key: str, secret_key: str, secure: bool) -> Minio:
    """Build a ``Minio`` client for the given endpoint and credentials.

    Args:
        endpoint: Passed to ``Minio`` as its positional endpoint argument.
        access_key: Forwarded as the ``access_key`` keyword.
        secret_key: Forwarded as the ``secret_key`` keyword.
        secure: Forwarded as the ``secure`` keyword.

    Returns:
        The newly constructed ``Minio`` instance.
    """
    connection_options = {
        "access_key": access_key,
        "secret_key": secret_key,
        "secure": secure,
    }
    return Minio(endpoint, **connection_options)


Expand Down Expand Up @@ -192,7 +207,8 @@ def copy_csv_without_field(
return original_fields, list(df.columns)


def load_config() -> dict:
def load_config(*, show_feedback: bool = True) -> dict:

"""Load connection settings from nocommit_minio.json.

The configuration file is stored alongside this module in
Expand All @@ -201,7 +217,6 @@ def load_config() -> dict:
"""

cfg_path = Path(__file__).resolve().parent / "nocommit_minio.json"

required_fields = {
"endpoint": "your-minio-endpoint:port",
"access_key": "your-access-key",
Expand All @@ -211,42 +226,53 @@ def load_config() -> dict:

if not cfg_path.exists():
cfg_path.write_text(json.dumps(required_fields, indent=4), encoding="utf-8")
msg = (
message = (
f"Configuration file not found. Created template configuration at {cfg_path}"
)
st.error(msg)
st.write(
"Update the following values in the generated file (set secure to true if"
" your deployment uses HTTPS):"

)
st.code(json.dumps(required_fields, indent=4), language="json")
print(msg)
print(message)
print("Required configuration fields:")
print(json.dumps(required_fields, indent=4))
if show_feedback:
st.error(message)
st.write(
"Update the following values in the generated file (set secure to true if"
" your deployment uses HTTPS):"
)
st.code(json.dumps(required_fields, indent=4), language="json")
else:
print(
"Update the generated configuration file with your MinIO credentials and"
" rerun the command."
)

return {}

try:
with open(cfg_path, encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError as exc:
error_msg = f"Invalid JSON in {cfg_path}: {exc}"
print(error_msg)
if show_feedback:
st.error(error_msg)
return {}

except json.JSONDecodeError as e:
st.error(f"Invalid JSON in {cfg_path}: {e}")
return {}

def run_streamlit_app() -> None:
"""Render the Streamlit interface."""

st.title("MinIO CSV scanner")
st.write(f"Virtual environment path: {VENV_PATH_DISPLAY}")
st.title("MinIO CSV scanner")
st.write(f"Virtual environment path: {VENV_PATH_DISPLAY}")

if INSTALLED_PACKAGES:
st.success(
"Installed missing packages: " + ", ".join(sorted(set(INSTALLED_PACKAGES)))
)
if INSTALLED_PACKAGES:
st.success(
"Installed missing packages: " + ", ".join(sorted(set(INSTALLED_PACKAGES)))
)

config = load_config(show_feedback=True)
if not config:
return

config = load_config()
client = None
if config:
secure = bool(config.get("secure", False))
conn_msg = (
f"Connecting to MinIO at {config['endpoint']} as {config['access_key']} "
Expand All @@ -261,75 +287,196 @@ def load_config() -> dict:
secure,
)

fields_tab, stats_tab, modify_tab = st.tabs(
["List fields", "Field stats", "Copy without field"]
)

with fields_tab:
bucket1 = st.text_input("Bucket", key="bucket1")
prefix1 = st.text_input("Path prefix", key="prefix1")
if st.button("Scan", key="scan_btn") and client and bucket1:
progress = st.progress(0)
status = st.empty()
results = list_csv_fields(client, bucket1, prefix1, progress, status)
for path, fields in results:
st.write(f"**{path}**")
st.write(", ".join(fields))

with stats_tab:
bucket2 = st.text_input("Bucket", key="bucket2")
prefix2 = st.text_input("Path prefix", key="prefix2")
field_name = st.text_input("Field name")
if st.button("Analyze", key="analyze_btn") and client and bucket2 and field_name:
progress = st.progress(0)
status = st.empty()
total, missing = field_stats(
client, bucket2, prefix2, field_name, progress, status
)
st.write(f"Total rows: {total}")
st.write(f"Missing values: {missing}")
if total:
st.write(f"Missing ratio: {missing / total:.2%}")

with modify_tab:
bucket3 = st.text_input("Source bucket", key="bucket3")
object_path = st.text_input("CSV object path", key="object_path")
field_to_remove = st.text_input("Field to remove", key="field_to_remove")
dest_bucket = st.text_input(
"Destination bucket (optional)", key="dest_bucket"
fields_tab, stats_tab, modify_tab = st.tabs(
["List fields", "Field stats", "Copy without field"]
)
dest_object = st.text_input("Destination object name", key="dest_object")
if st.button("Copy CSV without field", key="copy_btn") and client:
if not bucket3 or not object_path or not field_to_remove or not dest_object:
st.error(
"Please provide bucket, object path, field, and destination object name."

with fields_tab:
bucket1 = st.text_input("Bucket", key="bucket1")
prefix1 = st.text_input("Path prefix", key="prefix1")
if st.button("Scan", key="scan_btn") and bucket1:
progress = st.progress(0)
status = st.empty()
results = list_csv_fields(client, bucket1, prefix1, progress, status)
for path, fields in results:
st.write(f"**{path}**")
st.write(", ".join(fields))

with stats_tab:
bucket2 = st.text_input("Bucket", key="bucket2")
prefix2 = st.text_input("Path prefix", key="prefix2")
field_name = st.text_input("Field name")
if st.button("Analyze", key="analyze_btn") and bucket2 and field_name:
progress = st.progress(0)
status = st.empty()
total, missing = field_stats(
client, bucket2, prefix2, field_name, progress, status
)
else:
try:
original_fields, remaining_fields = copy_csv_without_field(
client,
bucket3,
object_path,
field_to_remove,
dest_object,
dest_bucket or None,
st.write(f"Total rows: {total}")
st.write(f"Missing values: {missing}")
if total:
st.write(f"Missing ratio: {missing / total:.2%}")

with modify_tab:
bucket3 = st.text_input("Source bucket", key="bucket3")
object_path = st.text_input("CSV object path", key="object_path")
field_to_remove = st.text_input("Field to remove", key="field_to_remove")
dest_bucket = st.text_input(
"Destination bucket (optional)", key="dest_bucket"
)
dest_object = st.text_input("Destination object name", key="dest_object")
if st.button("Copy CSV without field", key="copy_btn"):
if not bucket3 or not object_path or not field_to_remove or not dest_object:
st.error(
"Please provide bucket, object path, field, and destination object name."
)
except ValueError as exc:
st.error(str(exc))
except Exception as exc: # pylint: disable=broad-except
st.error(f"Failed to copy object: {exc}")
else:
target_bucket = dest_bucket or bucket3
st.success(
f"Copied to {target_bucket}/{dest_object} with field '{field_to_remove}' removed."
)
st.write("Original fields:")
st.write(", ".join(original_fields))
st.write("Remaining fields:")
st.write(", ".join(remaining_fields))
print(
"Copied",
f"{bucket3}/{object_path} -> {target_bucket}/{dest_object}",
f"without field '{field_to_remove}'",
)
try:
original_fields, remaining_fields = copy_csv_without_field(
client,
bucket3,
object_path,
field_to_remove,
dest_object,
dest_bucket or None,
)
except ValueError as exc:
st.error(str(exc))
except Exception as exc: # pylint: disable=broad-except
st.error(f"Failed to copy object: {exc}")
else:
target_bucket = dest_bucket or bucket3
st.success(
f"Copied to {target_bucket}/{dest_object} with field '{field_to_remove}' removed."
)
st.write("Original fields:")
st.write(", ".join(original_fields))
st.write("Remaining fields:")
st.write(", ".join(remaining_fields))
print(
"Copied",
f"{bucket3}/{object_path} -> {target_bucket}/{dest_object}",
f"without field '{field_to_remove}'",
)


def _build_cli_parser() -> argparse.ArgumentParser:
    """Create the argument parser with the ``list``, ``stats`` and ``copy`` sub-commands."""
    parser = argparse.ArgumentParser(
        description=(
            "MinIO CSV utilities. Use `streamlit run main.py` for the web UI or the"
            " commands below for terminal usage."
        )
    )
    subparsers = parser.add_subparsers(dest="command")

    list_parser = subparsers.add_parser(
        "list",
        help="List CSV files and show their fields",
    )
    list_parser.add_argument("bucket", help="Bucket name")
    list_parser.add_argument(
        "prefix",
        nargs="?",
        default="",
        help="Prefix path to scan (defaults to entire bucket)",
    )

    stats_parser = subparsers.add_parser(
        "stats",
        help="Show total rows and missing values for a field",
    )
    stats_parser.add_argument("bucket", help="Bucket name")
    stats_parser.add_argument("field", help="Field/column to analyze")
    stats_parser.add_argument(
        "prefix",
        nargs="?",
        default="",
        help="Prefix path to scan (defaults to entire bucket)",
    )

    copy_parser = subparsers.add_parser(
        "copy",
        help="Copy a CSV object while removing a column",
    )
    copy_parser.add_argument("src_bucket", help="Source bucket")
    copy_parser.add_argument("src_object", help="Source CSV object path")
    copy_parser.add_argument("field", help="Field/column to remove")
    copy_parser.add_argument("dest_object", help="Destination object path")
    copy_parser.add_argument(
        "--dest-bucket",
        dest="dest_bucket",
        help="Destination bucket (defaults to the source bucket)",
    )
    return parser


def run_cli() -> None:
    """Provide a CLI alternative to the Streamlit interface.

    Parses ``sys.argv`` and dispatches to one of three sub-commands:

    * ``list BUCKET [PREFIX]`` — print every CSV object with its fields;
    * ``stats BUCKET FIELD [PREFIX]`` — print row and missing-value counts;
    * ``copy SRC_BUCKET SRC_OBJECT FIELD DEST_OBJECT [--dest-bucket B]`` —
      copy a CSV object with one column removed.

    When no sub-command is given, the help text is printed and the function
    returns immediately, without reading the configuration file or creating
    a client. Otherwise the connection settings are read through
    ``load_config(show_feedback=False)``; an empty result, or one lacking
    ``endpoint``/``access_key``/``secret_key``, ends the run with a message
    on stdout.

    Returns:
        None. All results and errors are reported on stdout. A ``ValueError``
        raised by the copy helper is printed as ``Error: ...``; any other
        exception propagates to the caller.
    """
    parser = _build_cli_parser()
    args = parser.parse_args()

    # Asking for help must not require credentials or touch the config file.
    if args.command is None:
        parser.print_help()
        return

    if INSTALLED_PACKAGES:
        print(
            "Installed missing packages:", ", ".join(sorted(set(INSTALLED_PACKAGES)))
        )

    config = load_config(show_feedback=False)
    if not config:
        return

    # Fail with a readable message instead of a KeyError traceback when the
    # configuration file is incomplete.
    missing_keys = [
        key for key in ("endpoint", "access_key", "secret_key") if key not in config
    ]
    if missing_keys:
        print(
            "Error: configuration is missing required keys: "
            + ", ".join(missing_keys)
        )
        return

    secure = bool(config.get("secure", False))
    conn_msg = (
        f"Connecting to MinIO at {config['endpoint']} as {config['access_key']} "
        f"(secure={secure})"
    )
    print(conn_msg)
    client = get_client(
        config["endpoint"],
        config["access_key"],
        config["secret_key"],
        secure,
    )

    if args.command == "list":
        results = list_csv_fields(client, args.bucket, args.prefix)
        if results:
            for path, fields in results:
                print(f"{path}: {', '.join(fields)}")
            # NOTE(review): scanned_dirs.txt is presumably written by
            # list_csv_fields; that function is not visible here — confirm.
            print(
                "Scanned directory list written to",
                Path(__file__).resolve().parent / "scanned_dirs.txt",
            )
        else:
            print("No CSV files found for the provided bucket and prefix.")
    elif args.command == "stats":
        total, missing = field_stats(client, args.bucket, args.prefix, args.field)
        print(f"Total rows: {total}")
        print(f"Missing values: {missing}")
        if total:
            print(f"Missing ratio: {missing / total:.2%}")
    elif args.command == "copy":
        try:
            original_fields, remaining_fields = copy_csv_without_field(
                client,
                args.src_bucket,
                args.src_object,
                args.field,
                args.dest_object,
                args.dest_bucket,
            )
        except ValueError as exc:
            print(f"Error: {exc}")
            return
        # The destination bucket falls back to the source bucket when omitted.
        target_bucket = args.dest_bucket or args.src_bucket
        print(
            "Copied",
            f"{args.src_bucket}/{args.src_object} -> {target_bucket}/{args.dest_object}",
            f"without field '{args.field}'",
        )
        print("Original fields:", ", ".join(original_fields))
        print("Remaining fields:", ", ".join(remaining_fields))


# Entry-point dispatch: a Streamlit run always renders the web UI; otherwise
# the CLI is started only when the file is executed directly as a script, so
# a plain ``import`` of this module triggers neither interface.
if not running_in_streamlit():
    if __name__ == "__main__":
        run_cli()
else:
    run_streamlit_app()

Loading