diff --git a/.gitignore b/.gitignore index 0e4eeeb..0f2a5d7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,16 +1,55 @@ +# Ignore archive files archive + +# Ignore private key files *.pem + +# Ignore VSCode settings .vscode + +# Ignore Python bytecode files *.pyc +__pycache__/ + +# Ignore output and input directories outputs inputs + +# Ignore specific model files model-user.json + +# Ignore Jupyter Notebook checkpoints .ipynb_checkpoints + +# Ignore log files *.log + +# Ignore macOS system files .DS_Store + +# Ignore development directories dev + +# Ignore pickle and data files *.pkl *.dat -ab_testing_example/ml-1m -*/ml-1m -.env \ No newline at end of file + +# Ignore specific directories +mlruns +mlartifacts +ml-1m + +# Ignore environment files +.env + +# Ignore virtual environment directories +venv/ +.env/ +env/ +ENV/ +env.bak/ +venv.bak/ + +# Ignore temporary files created by editors +*.swp +*~ \ No newline at end of file diff --git a/ab_testing_example_posthog/requirements.txt b/ab_testing_example_posthog/requirements.txt index 7deaf3b..7e55383 100644 --- a/ab_testing_example_posthog/requirements.txt +++ b/ab_testing_example_posthog/requirements.txt @@ -1 +1,3 @@ -Flask==1.1.2 \ No newline at end of file +Flask==1.1.2 +posthog +python-dotenv \ No newline at end of file diff --git a/bigquery_example/bigquery_example.ipynb b/bigquery_example/bigquery_example.ipynb deleted file mode 100644 index 6e2e567..0000000 --- a/bigquery_example/bigquery_example.ipynb +++ /dev/null @@ -1,68 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "df = pd.DataFrame(data={'uid':[1],'rec':[1],'pred_time':[1]})\n", - "df" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "#Send predictions to BigQuery\n", - "from google.oauth2 import service_account\n", - "import pandas_gbq\n", - "table_id = \"movie_recommendation_service.predicted_movies\" #change this\n", - "project_id = \"authentic-realm-276822\" #change this\n", - "credentials = service_account.Credentials.from_service_account_file('../model-user.json') #change this\n", - "pandas_gbq.to_gbq(df, table_id, project_id=project_id, if_exists = 'replace', credentials=credentials)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "#Read predictions from BigQuery\n", - "import pandas as pd\n", - "from google.cloud import bigquery\n", - "import os\n", - "os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './model-user.json'\n", - "client = bigquery.Client()\n", - "sql = \"select * from movie_recommendation_service.predicted_movies\"\n", - "df = client.query(sql).to_dataframe()\n", - "df.head()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.7.6" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} diff --git a/bigquery_example/requirements.txt b/bigquery_example/requirements.txt deleted file mode 100644 index bce4cb8..0000000 --- a/bigquery_example/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -scikit-surprise -pandas -pandas_gbq \ No newline at end of file diff --git a/kafka_client_example/Untitled.ipynb b/kafka_client_example/Untitled.ipynb deleted file mode 100644 index f4a59b6..0000000 --- a/kafka_client_example/Untitled.ipynb +++ /dev/null @@ -1,90 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "c8be037a-946a-4174-b264-a6fb23692a3f", - "metadata": {}, - "outputs": [], - "source": [ - "from confluent_kafka import Consumer\n", - "\n", - "conf = {'bootstrap.servers': \"localhost:9092\",\n", - " 'group.id': \"foo4\",\n", - " 'enable.auto.commit': False,\n", - " 'auto.offset.reset': 'earliest'}\n", - "\n", - "consumer = Consumer(conf)" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "6b446731-c8a4-4402-89f3-8beb9352ddb3", - "metadata": {}, - "outputs": [], - "source": [ - "consumer.subscribe([\"mlops-topic\"])" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "cc869a19-6959-47b1-a664-0b3336785fc0", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "b'this is mlops course 3'" - ] - }, - "execution_count": 5, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "msg = consumer.poll(timeout=1.0)\n", - "msg.value()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c0e34d9b-d004-47e9-8c02-3337837b8a2c", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "2b92160e-9991-4f61-96ff-79c08cb5af99", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.2" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/kafka_client_example/consumer_local_example.ipynb b/kafka_example/client_notebooks/consumer_local_example.ipynb similarity index 100% rename from kafka_client_example/consumer_local_example.ipynb rename to kafka_example/client_notebooks/consumer_local_example.ipynb diff --git a/kafka_client_example/producer_local_example.ipynb b/kafka_example/client_notebooks/producer_local_example.ipynb similarity index 100% rename from kafka_client_example/producer_local_example.ipynb rename to kafka_example/client_notebooks/producer_local_example.ipynb diff --git a/kafka_example/docker-compose.yml b/kafka_example/docker-compose.yml index a91362a..30a64d1 100644 --- a/kafka_example/docker-compose.yml +++ b/kafka_example/docker-compose.yml @@ -1,3 +1,5 @@ +version: '3.8' + services: zookeeper: image: 'confluentinc/cp-zookeeper:latest' @@ -6,6 +8,11 @@ services: ZOOKEEPER_TICK_TIME: 2000 ports: - "2181:2181" + networks: + - kafka-net + volumes: + - zookeeper_data:/var/lib/zookeeper/data + - zookeeper_log:/var/lib/zookeeper/log kafka: image: 'confluentinc/cp-kafka:latest' @@ -16,5 +23,19 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: /var/lib/kafka/data ports: - "9092:9092" + networks: + - kafka-net + volumes: + - kafka_data:/var/lib/kafka/data + +networks: + kafka-net: + driver: bridge + +volumes: + zookeeper_data: + zookeeper_log: + kafka_data: \ No newline at end of file diff --git a/kafka_example/readme.md b/kafka_example/readme.md index e610c94..7b3a734 100644 --- a/kafka_example/readme.md +++ b/kafka_example/readme.md @@ -10,30 +10,7 @@ Ensure you have the following installed on your machine: ## Setup Instructions -1. **Clone the repository** or create the following `docker-compose.yml` file in your project directory: - - ```yaml - services: - zookeeper: - image: 'confluentinc/cp-zookeeper:latest' - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - ports: - - "2181:2181" - - kafka: - image: 'confluentinc/cp-kafka:latest' - depends_on: - - zookeeper - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - ports: - - "9092:9092" - ``` +1. **Clone the repository** using git clone and navigate to kafka example directory. 2. **Start the Services**: diff --git a/model_example_recommendation_pytorch/recommend_pytorch_inf.py b/model_example_recommendation_pytorch/recommend_pytorch_inf.py deleted file mode 100644 index 5e43a88..0000000 --- a/model_example_recommendation_pytorch/recommend_pytorch_inf.py +++ /dev/null @@ -1,91 +0,0 @@ -from recommend_pytorch_train import MF -from surprise import Dataset -import numpy as np -import torch -import torch.nn as nn -import pandas as pd -import pprint - - -def get_top_n(model, testset, trainset, uid_input, movies_df, n=10): - - preds = [] - try: - uid_input = int(trainset.to_inner_uid(uid_input)) - except KeyError: - return preds - - # First map the predictions to each user. - for uid, iid, _ in testset: # inefficient - try: - uid_internal = int(trainset.to_inner_uid(uid)) - except KeyError: - continue - if uid_internal == uid_input: - try: - iid_internal = int(trainset.to_inner_iid(iid)) - movie_name = movies_df.loc[int(iid), 'name'] - preds.append((iid, movie_name, float( - model(torch.tensor([[uid_input, iid_internal]]))))) - except KeyError: - pass - # Then sort the predictions for each user and retrieve the k highest ones - if preds is not None: - preds.sort(key=lambda x: x[1], reverse=True) - if len(preds) > n: - preds = preds[:n] - return preds - - -def get_previously_seen(trainset, uid, movies_df): - seen = [] - for (iid, _) in trainset.ur[int(uid)]: - try: - seen.append(movies_df.loc[int(iid), 'name']) - except KeyError: - pass - if len(seen) > 10: - break - return seen - - -def main(): - # Data - movies_df = pd.read_csv('../data/ml-1m/movies.dat', sep="::", - header=None, engine='python') - movies_df.columns = ['iid', 'name', 'genre'] - movies_df.set_index('iid', inplace=True) - data = Dataset.load_builtin('ml-1m') - trainset = data.build_full_trainset() - testset = trainset.build_anti_testset() - - k = 100 # latent dimension - c_bias = 1e-6 - c_vector = 1e-6 - - model = MF(trainset.n_users, trainset.n_items, - k=k, c_bias=c_bias, c_vector=c_vector) - model.load_state_dict(torch.load('../data/models/recommendation_model_pytorch.pkl')) - model.eval() - - # Print the recommended items for sample users - sample_users = list(set([x[0] for x in testset]))[:4] - - for uid in sample_users: - - print('User:', uid) - print('\n') - - print('\tSeen:') - seen = get_previously_seen(trainset, uid, movies_df) - pprint.pprint(seen) - print('\n') - - print('\tRecommendations:') - recommended = get_top_n(model, testset, trainset, uid, movies_df, n=10) - pprint.pprint([x[1] for x in recommended]) - print('\n') - - -if __name__ == "__main__": - main() diff --git a/model_example_recommendation_pytorch/recommend_pytorch_train.py b/model_example_recommendation_pytorch/recommend_pytorch_train.py deleted file mode 100644 index 72e3604..0000000 --- a/model_example_recommendation_pytorch/recommend_pytorch_train.py +++ /dev/null @@ -1,136 +0,0 @@ -# https://github.com/NicolasHug/Surprise -# can be replaced by explicitly importing the movielens data -from surprise import Dataset -import numpy as np -import torch -import torch.nn as nn -import torch.nn.functional as F -from sklearn.utils import shuffle - -class Loader(): - current = 0 - - def __init__(self, x, y, batchsize=1024, do_shuffle=True): - self.shuffle = shuffle - self.x = x - self.y = y - self.batchsize = batchsize - self.batches = range(0, len(self.y), batchsize) - if do_shuffle: - # Every epoch re-shuffle the dataset - self.x, self.y = shuffle(self.x, self.y) - - def __iter__(self): - # Reset & return a new iterator - self.x, self.y = shuffle(self.x, self.y, random_state=0) - self.current = 0 - return self - - def __len__(self): - # Return the number of batches - return int(len(self.x) / self.batchsize) - - def __next__(self): - n = self.batchsize - if self.current + n >= len(self.y): - raise StopIteration - i = self.current - xs = torch.from_numpy(self.x[i:i + n]) - ys = torch.from_numpy(self.y[i:i + n]) - self.current += n - return (xs, ys) - - -class MF(nn.Module): - - def __init__(self, n_user, n_item, k=18, c_vector=1.0, c_bias=1.0): - super(MF, self).__init__() - self.k = k - self.n_user = n_user - self.n_item = n_item - self.c_bias = c_bias - self.c_vector = c_vector - - self.user = nn.Embedding(n_user, k) - self.item = nn.Embedding(n_item, k) - - # We've added new terms here: - self.bias_user = nn.Embedding(n_user, 1) - self.bias_item = nn.Embedding(n_item, 1) - self.bias = nn.Parameter(torch.ones(1)) - - def forward(self, train_x): - user_id = train_x[:, 0] - item_id = train_x[:, 1] - vector_user = self.user(user_id) - vector_item = self.item(item_id) - - # Pull out biases - bias_user = self.bias_user(user_id).squeeze() - bias_item = self.bias_item(item_id).squeeze() - biases = (self.bias + bias_user + bias_item) - - ui_interaction = torch.sum(vector_user * vector_item, dim=1) - - # Add bias prediction to the interaction prediction - prediction = ui_interaction + biases - return prediction - - def loss(self, prediction, target): - - def l2_regularize(array): - loss = torch.sum(array**2) - return loss - - loss_mse = F.mse_loss(prediction, target.squeeze()) - - # Add new regularization to the biases - prior_bias_user = l2_regularize(self.bias_user.weight) * self.c_bias - prior_bias_item = l2_regularize(self.bias_item.weight) * self.c_bias - - prior_user = l2_regularize(self.user.weight) * self.c_vector - prior_item = l2_regularize(self.item.weight) * self.c_vector - total = loss_mse + prior_user + prior_item + prior_bias_user + prior_bias_item - return total - - -def main(): - # Data - data = Dataset.load_builtin('ml-1m') - trainset = data.build_full_trainset() - uir = np.array([x for x in trainset.all_ratings()]) - train_x = test_x = uir[:, :2].astype(np.int64) # for simplicity - train_y = test_y = uir[:, 2].astype(np.float32) - - # Parameters - lr = 5e-3 - k = 100 # latent dimension - c_bias = 1e-6 - c_vector = 1e-6 - batchsize = 1024 - num_epochs = 40 - - model = MF(trainset.n_users, trainset.n_items, - k=k, c_bias=c_bias, c_vector=c_vector) - optimizer = torch.optim.Adam(model.parameters(), lr=lr) - - - for epoch in range(num_epochs): - dataloader = Loader(train_x, train_y, batchsize=batchsize) - itr = 0 - for batch in dataloader: - itr += 1 - prediction = model(batch[0]) - loss = model.loss(prediction, batch[1]) - optimizer.zero_grad() - loss.backward() - optimizer.step() - if itr % 100 == 0: - print(f"epoch: {epoch}. iteration: {itr}. training loss: {loss}") - - torch.save(model.state_dict(), - "../data/models/recommendation_model_pytorch.pkl") - - -if __name__ == '__main__': - main() diff --git a/ray_cluster_example/readme.md b/ray_cluster_example/readme.md index c12d979..866cdbc 100644 --- a/ray_cluster_example/readme.md +++ b/ray_cluster_example/readme.md @@ -29,7 +29,8 @@ Make sure you have the following installed on your system: ### 1. Clone the Repository ```bash -git clone https://github.com/yourusername/ray_cluster_example.git +git clone https://github.com/thejat/mlops-code-examples.git +cd mlops-code-examples cd ray_cluster_example ``` @@ -76,7 +77,7 @@ if __name__ == "__main__": To run the script inside the head node container: ```bash -docker exec -it ray-head python /app/example.py +docker exec -it ray_cluster_example-ray-head-1 python /app/example.py ``` This will distribute the computation across the Ray cluster and return the results.