Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# AgentOps Configuration
AGENTOPS_API_KEY=your_agentops_api_key_here

# YouTube API Configuration
YOUTUBE_CREDENTIALS_PATH=path/to/your/youtube_credentials.json
YOUTUBE_TOKEN_PATH=token.json

# Optional: Default upload settings
DEFAULT_PRIVACY_STATUS=private
DEFAULT_CATEGORY_ID=22

# Security Settings
MAX_FILE_SIZE_MB=128
AUTO_CLEANUP_TEMP_FILES=true
90 changes: 90 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
import json
from typing import Optional, Dict, Any
import streamlit as st

class ConfigManager:
def __init__(self):
self.config_file = "app_config.json"
self.load_config()

def load_config(self) -> Dict[str, Any]:
"""Load configuration from file or create default"""
if os.path.exists(self.config_file):
try:
with open(self.config_file, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
pass

return {
"agentops_api_key": "",
"youtube_credentials_path": "",
"default_privacy": "private",
"default_category": "People & Blogs",
"auto_cleanup_temp_files": True,
"max_file_size_mb": 128,
"supported_video_formats": ["mp4", "avi", "mov", "mkv", "webm"],
"supported_thumbnail_formats": ["png", "jpg", "jpeg", "gif"]
}

def save_config(self, config: Dict[str, Any]):
"""Save configuration to file"""
try:
with open(self.config_file, 'w') as f:
json.dump(config, f, indent=2)
except Exception as e:
st.error(f"Failed to save configuration: {str(e)}")

def get_env_vars(self) -> Dict[str, Optional[str]]:
"""Get configuration from environment variables"""
return {
"AGENTOPS_API_KEY": os.getenv("AGENTOPS_API_KEY"),
"YOUTUBE_CREDENTIALS_PATH": os.getenv("YOUTUBE_CREDENTIALS_PATH"),
"YOUTUBE_TOKEN_PATH": os.getenv("YOUTUBE_TOKEN_PATH", "token.json"),
}

def validate_api_key(self, api_key: str) -> bool:
"""Basic validation for AgentOps API key format"""
return len(api_key) > 10 and api_key.replace('-', '').replace('_', '').isalnum()

def validate_credentials_file(self, file_content: bytes) -> bool:
"""Validate YouTube API credentials file"""
try:
credentials = json.loads(file_content)
required_fields = ["client_id", "client_secret", "auth_uri", "token_uri"]

if "installed" in credentials:
client_info = credentials["installed"]
elif "web" in credentials:
client_info = credentials["web"]
else:
return False

return all(field in client_info for field in required_fields)
except (json.JSONDecodeError, KeyError):
return False

def get_secure_api_key(self) -> Optional[str]:
"""Get API key from session state or environment"""
if 'agentops_api_key' in st.session_state:
return st.session_state.agentops_api_key
return os.getenv("AGENTOPS_API_KEY")

def cleanup_temp_files(self):
"""Clean up temporary files"""
temp_patterns = ["temp_video_", "temp_thumbnail_", "scheduled_video_", "scheduled_thumbnail_"]

for filename in os.listdir("."):
if any(filename.startswith(pattern) for pattern in temp_patterns):
try:
os.remove(filename)
except OSError:
pass # File might be in use

def get_file_size_mb(self, file_obj) -> float:
"""Get file size in MB"""
file_obj.seek(0, 2) # Seek to end
size = file_obj.tell()
file_obj.seek(0) # Reset to beginning
return size / (1024 * 1024) # Convert to MB
9 changes: 8 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
streamlit
streamlit
agentops
google-auth
google-auth-oauthlib
google-auth-httplib2
google-api-python-client
schedule
python-dateutil
251 changes: 248 additions & 3 deletions streamlit_app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,251 @@
import streamlit as st
import os
import datetime
import json
from youtube_uploader import YouTubeUploaderService, VideoSchedule

st.title("🎈 My new app")
st.write(
"Let's start building! For help and inspiration, head over to [docs.streamlit.io](https://docs.streamlit.io/)."
st.set_page_config(
page_title="YouTube Video Uploader & Scheduler",
page_icon="📹",
layout="wide"
)

st.title("📹 YouTube Video Uploader & Scheduler")
st.write("Upload videos to YouTube immediately or schedule them for later.")

if 'uploader_service' not in st.session_state:
st.session_state.uploader_service = None
st.session_state.authenticated = False

st.sidebar.header("🔑 Configuration")

agentops_api_key = st.sidebar.text_input(
"AgentOps API Key",
type="password",
help="Get your API key from https://www.agentops.ai/"
)

credentials_file = st.sidebar.file_uploader(
"YouTube API Credentials (JSON)",
type=['json'],
help="Download from Google Cloud Console"
)

if agentops_api_key and credentials_file and not st.session_state.authenticated:
if st.sidebar.button("Initialize Service"):
try:
credentials_content = credentials_file.read()
credentials_path = "temp_credentials.json"

with open(credentials_path, 'wb') as f:
f.write(credentials_content)

st.session_state.uploader_service = YouTubeUploaderService(
agentops_api_key=agentops_api_key,
credentials_path=credentials_path
)

if st.session_state.uploader_service.initialize():
st.session_state.authenticated = True
st.sidebar.success("✅ Service initialized successfully!")
st.rerun()
else:
st.sidebar.error("❌ Failed to authenticate with YouTube API")

except Exception as e:
st.sidebar.error(f"❌ Initialization failed: {str(e)}")

if st.session_state.authenticated and st.session_state.uploader_service:
tab1, tab2, tab3 = st.tabs(["📤 Upload Now", "⏰ Schedule Upload", "📊 Status"])

with tab1:
st.header("Upload Video Now")

col1, col2 = st.columns(2)

with col1:
video_file = st.file_uploader("Select Video File", type=['mp4', 'avi', 'mov', 'mkv'])
thumbnail_file = st.file_uploader("Select Thumbnail (Optional)", type=['png', 'jpg', 'jpeg'])

title = st.text_input("Video Title", max_chars=100)
description = st.text_area("Video Description", max_chars=5000)

with col2:
tags_input = st.text_area("Tags (one per line)")
tags = [tag.strip() for tag in tags_input.split('\n') if tag.strip()]

category_options = {
"Film & Animation": "1",
"Autos & Vehicles": "2",
"Music": "10",
"Pets & Animals": "15",
"Sports": "17",
"Travel & Events": "19",
"Gaming": "20",
"People & Blogs": "22",
"Comedy": "23",
"Entertainment": "24",
"News & Politics": "25",
"Howto & Style": "26",
"Education": "27",
"Science & Technology": "28",
"Nonprofits & Activism": "29"
}

category = st.selectbox("Category", list(category_options.keys()))
privacy = st.selectbox("Privacy", ["private", "unlisted", "public"])

if st.button("Upload Video", type="primary"):
if video_file and title:
with st.spinner("Uploading video..."):
video_path = f"temp_video_{video_file.name}"
with open(video_path, 'wb') as f:
f.write(video_file.read())

thumbnail_path = None
if thumbnail_file:
thumbnail_path = f"temp_thumbnail_{thumbnail_file.name}"
with open(thumbnail_path, 'wb') as f:
f.write(thumbnail_file.read())

try:
result = st.session_state.uploader_service.upload_video_now(
video_file_path=video_path,
title=title,
description=description,
tags=tags,
category_id=category_options[category],
privacy_status=privacy,
thumbnail_path=thumbnail_path
)

os.remove(video_path)
if thumbnail_path:
os.remove(thumbnail_path)

if result['success']:
st.success(f"✅ Video uploaded successfully!")
st.info(f"Video URL: {result['video_url']}")
else:
st.error(f"❌ Upload failed: {result['error']}")

except Exception as e:
st.error(f"❌ Upload error: {str(e)}")
else:
st.error("Please provide both video file and title")

with tab2:
st.header("Schedule Video Upload")

col1, col2 = st.columns(2)

with col1:
sched_video_file = st.file_uploader("Select Video File", type=['mp4', 'avi', 'mov', 'mkv'], key="sched")
sched_thumbnail_file = st.file_uploader("Select Thumbnail (Optional)", type=['png', 'jpg', 'jpeg'], key="sched_thumb")

sched_title = st.text_input("Video Title", max_chars=100, key="sched_title")
sched_description = st.text_area("Video Description", max_chars=5000, key="sched_desc")

with col2:
sched_tags_input = st.text_area("Tags (one per line)", key="sched_tags")
sched_tags = [tag.strip() for tag in sched_tags_input.split('\n') if tag.strip()]

sched_category = st.selectbox("Category", list(category_options.keys()), key="sched_cat")
sched_privacy = st.selectbox("Privacy", ["private", "unlisted", "public"], key="sched_priv")

schedule_date = st.date_input("Upload Date", min_value=datetime.date.today())
schedule_time = st.time_input("Upload Time")

if st.button("Schedule Upload", type="primary"):
if sched_video_file and sched_title:
try:
sched_video_path = f"scheduled_video_{sched_video_file.name}"
with open(sched_video_path, 'wb') as f:
f.write(sched_video_file.read())

sched_thumbnail_path = None
if sched_thumbnail_file:
sched_thumbnail_path = f"scheduled_thumbnail_{sched_thumbnail_file.name}"
with open(sched_thumbnail_path, 'wb') as f:
f.write(sched_thumbnail_file.read())

scheduled_datetime = datetime.datetime.combine(schedule_date, schedule_time)

success = st.session_state.uploader_service.schedule_video_upload(
scheduled_time=scheduled_datetime.isoformat(),
title=sched_title,
description=sched_description,
tags=sched_tags,
category_id=category_options[sched_category],
privacy_status=sched_privacy,
video_file_path=sched_video_path,
thumbnail_path=sched_thumbnail_path
)

if success:
st.success(f"✅ Video scheduled for {scheduled_datetime}")
else:
st.error("❌ Failed to schedule upload")

except Exception as e:
st.error(f"❌ Scheduling error: {str(e)}")
else:
st.error("Please provide both video file and title")

with tab3:
st.header("Upload Status")

if st.button("Refresh Status"):
st.rerun()

try:
status = st.session_state.uploader_service.get_upload_status()

st.subheader("Scheduler Status")
if status['scheduler_running']:
st.success("🟢 Scheduler is running")
else:
st.info("🟡 Scheduler is stopped")

st.subheader("Scheduled Uploads")
if status['scheduled_uploads']:
for upload in status['scheduled_uploads']:
with st.expander(f"📹 {upload['title']}"):
st.write(f"**Scheduled Time:** {upload['scheduled_time']}")
st.write(f"**Video File:** {upload['video_file']}")

if st.button(f"Cancel", key=f"cancel_{upload['title']}"):
if st.session_state.uploader_service.uploader.cancel_scheduled_upload(upload['title']):
st.success("Upload cancelled")
st.rerun()
else:
st.error("Failed to cancel upload")
else:
st.info("No scheduled uploads")

except Exception as e:
st.error(f"Error getting status: {str(e)}")

else:
st.info("👆 Please configure your API keys and credentials in the sidebar to get started.")

with st.expander("📖 Setup Instructions"):
st.markdown("""
### 1. Get AgentOps API Key
1. Visit [AgentOps](https://www.agentops.ai/)
2. Create an account
3. Generate an API key from settings

### 2. Get YouTube API Credentials
1. Go to [Google Cloud Console](https://console.cloud.google.com/)
2. Create a new project or select existing
3. Enable YouTube Data API v3
4. Create credentials (OAuth 2.0 Client ID)
5. Download the JSON credentials file

### 3. Upload and Initialize
1. Enter your AgentOps API key
2. Upload your YouTube credentials JSON file
3. Click "Initialize Service"
4. Complete OAuth flow in browser if prompted
""")
Loading