Diffusez les métriques des vidéos YouTube dans Kafka et traitez-les avec ksqlDB, avec en option l'envoi vers Slack via le connecteur HTTP de Kafka Connect.
- Producteurs: Scripts Python (
list.py,youtubeanalytic.py) qui récupèrent les statistiques via l'API YouTube Data et publient dans le topic Kafkayoutube_videos. - Pile Kafka: Zookeeper, Broker Kafka, Schema Registry, Kafka Connect, ksqlDB Server et Confluent Control Center sont définis dans
docker-compose.yaml. - Traitement de flux: Les streams/tables ksqlDB définis dans
ksql/create-stream-table.mdcalculent les deltas de métriques et formatent les messages. - Sorties: Le sink HTTP Slack via Kafka Connect lisant depuis
slack_output.
docker-compose.yaml– Services Confluent Platform.constants.py– Lit les valeurs de config/clé API depuisconfig/config.local. Le config est privélist.py– Publie les métriques des vidéos d'une playlist vers Kafka (youtube_videos).youtubeanalytic.py– Publie les métriques d'une seule vidéo vers Kafka.ksql/create-stream-table.md– SQL ksqlDB pour créer les streams/tables + le config du sink Slack.connectors/– Point de montage des plugins personnalisés pour Kafka Connect.requirement.txt– Dépendances Python.
- Docker et Docker Compose
- Python 3.10+
- Une clé API YouTube Data
Créez config/config.local avec votre clé API et vos identifiants :
[youtube]
API_KEY = <VOTRE_CLE_API_YOUTUBE>
PLAYLIST_ID = <PLAYLIST_ID_OPTIONNEL>
VIDEO_ID = <VIDEO_ID_OPTIONNEL>constants.py charge ces valeurs pour les producteurs.
python -m venv .venv
source .venv/bin/activate
pip install -r requirement.txtdocker compose up -dServices (ports par défaut) :
- Broker Kafka : 9092 (hôte), 29092 (interne)
- Zookeeper : 2181
- Schema Registry : 8081
- Kafka Connect : 8083
- ksqlDB Server : 8088
- Control Center : 9021 (UI: http://localhost:9021)
Pour tous les détails ksqlDB, consultez :
Utilisez le SQL dans ksql/create-stream-table.md. Le topic source youtube_videos doit exister (les producteurs le créeront en envoyant des messages).
Flux standard défini dans le document :
youtube_videosentrée des producteurs (JSON)youtube_analytics_changestable (deux dernières mesures par vidéo)youtube_analytics_change_stream_basestream (sur le changelog de la table)youtube_analytics_change_streamstream dérivé avec texte formaté et filtresslack_outputstream (Avro) pour le sink Slack
Flow des topics:
Avec la pile démarrée et config/config.local renseigné :
-
Publier les métriques d'une seule vidéo
python3 youtubeanalytic.py
-
Publier les métriques pour tous les éléments d'une playlist
python3 list.py
Les deux scripts envoient des messages JSON vers le topic Kafka youtube_videos avec la clé = video_id (dans list.py).
La configuration du connecteur se trouve en bas du fichier ksql/create-stream-table.md. Une fois que les topics/streams sont créés et que vous disposez d’une URL de webhook Slack, envoyez cette configuration à Kafka Connect.
Le connecteur doit lire depuis le topic slack_output et appliquer une transformation pour renommer le champ TEXT en text.


