diff --git a/airflow_pipeline_example/Dockerfile b/airflow_pipeline_example/Dockerfile deleted file mode 100644 index 52d388c..0000000 --- a/airflow_pipeline_example/Dockerfile +++ /dev/null @@ -1,18 +0,0 @@ -FROM debian:buster-slim - -MAINTAINER Theja Tulabandhula - -RUN apt-get update \ - && apt-get install -y python3-pip python3-dev \ - && cd /usr/local/bin \ - && ln -s /usr/bin/python3 python - -RUN pip3 install pandas-gbq numpy - -RUN pip3 install torch==1.7.1+cpu torchvision==0.8.2+cpu torchaudio==0.7.2 -f https://download.pytorch.org/whl/torch_stable.html - -COPY . . - -RUN chmod +x run_transient_pipeline.sh - -CMD ./run_transient_pipeline.sh \ No newline at end of file diff --git a/airflow_pipeline_example/airflow_install.sh b/airflow_pipeline_example/airflow_install.sh deleted file mode 100755 index a190561..0000000 --- a/airflow_pipeline_example/airflow_install.sh +++ /dev/null @@ -1,31 +0,0 @@ -# airflow needs a home, ~/airflow is the default, -# but you can lay foundation somewhere else if you prefer -# (optional) -export AIRFLOW_HOME=~/airflow - -AIRFLOW_VERSION=2.1.4 -PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)" -# For example: 3.6 -CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt" -# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.1.4/constraints-3.6.txt -pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}" - -# initialize the database -airflow db init - -airflow users create \ - --username admin \ - --firstname Peter \ - --lastname Parker \ - --role Admin \ - --email spiderman@superhero.org - -# start the web server, default port is 8080 -airflow webserver --port 8080 - -# start the scheduler -# open a new terminal or else run webserver with ``-D`` option to run it as a daemon -airflow scheduler - -# visit localhost:8080 in the browser and use the admin account you just -# created to login. Enable the example_bash_operator dag in the home page diff --git a/airflow_pipeline_example/recommend_airflow.py b/airflow_pipeline_example/recommend_airflow.py deleted file mode 100644 index f65b1ba..0000000 --- a/airflow_pipeline_example/recommend_airflow.py +++ /dev/null @@ -1,67 +0,0 @@ -# [START import_module] -from datetime import timedelta - -# The DAG object; we'll need this to instantiate a DAG -from airflow import DAG -# Operators; we need this to operate! -from airflow.operators.bash_operator import BashOperator -from airflow.utils.dates import days_ago - -# [END import_module] - -# [START default_args] -# These args will get passed on to each operator -# You can override them on a per-task basis during operator initialization -default_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'start_date': days_ago(2), - 'email': ['myself@theja.org'], - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=5), - # 'queue': 'bash_queue', - # 'pool': 'backfill', - # 'priority_weight': 10, - # 'end_date': datetime(2016, 1, 1), - # 'wait_for_downstream': False, - # 'dag': dag, - # 'sla': timedelta(hours=2), - # 'execution_timeout': timedelta(seconds=300), - # 'on_failure_callback': some_function, - # 'on_success_callback': some_other_function, - # 'on_retry_callback': another_function, - # 'sla_miss_callback': yet_another_function, - # 'trigger_rule': 'all_success' -} -# [END default_args] - -# [START instantiate_dag] -dag = DAG( - 'recommend-pipeline', - default_args=default_args, - description='Run the transient training pipeline', - schedule_interval=timedelta(days=1), -) -# [END instantiate_dag] - -t1 = BashOperator( - task_id='docker-pipeline-run', - bash_command='docker run recommend_pipeline', - dag=dag, -) - - -# [START documentation] -dag.doc_md = __doc__ - -t1.doc_md = """\ -#### Transient Pipeline -Downloads movielens-100k, trains a recommendation model and saves top 10 recommendations to Google BigQuery. -""" -# [END documentation] - - -t1 -# [END tutorial] diff --git a/airflow_pipeline_example/recommend_pytorch_inf.py b/airflow_pipeline_example/recommend_pytorch_inf.py deleted file mode 100644 index 5e43a88..0000000 --- a/airflow_pipeline_example/recommend_pytorch_inf.py +++ /dev/null @@ -1,91 +0,0 @@ -from recommend_pytorch_train import MF -from surprise import Dataset -import numpy as np -import torch -import torch.nn as nn -import pandas as pd -import pprint - - -def get_top_n(model, testset, trainset, uid_input, movies_df, n=10): - - preds = [] - try: - uid_input = int(trainset.to_inner_uid(uid_input)) - except KeyError: - return preds - - # First map the predictions to each user. - for uid, iid, _ in testset: # inefficient - try: - uid_internal = int(trainset.to_inner_uid(uid)) - except KeyError: - continue - if uid_internal == uid_input: - try: - iid_internal = int(trainset.to_inner_iid(iid)) - movie_name = movies_df.loc[int(iid), 'name'] - preds.append((iid, movie_name, float( - model(torch.tensor([[uid_input, iid_internal]]))))) - except KeyError: - pass - # Then sort the predictions for each user and retrieve the k highest ones - if preds is not None: - preds.sort(key=lambda x: x[1], reverse=True) - if len(preds) > n: - preds = preds[:n] - return preds - - -def get_previously_seen(trainset, uid, movies_df): - seen = [] - for (iid, _) in trainset.ur[int(uid)]: - try: - seen.append(movies_df.loc[int(iid), 'name']) - except KeyError: - pass - if len(seen) > 10: - break - return seen - - -def main(): - # Data - movies_df = pd.read_csv('../data/ml-1m/movies.dat', sep="::", - header=None, engine='python') - movies_df.columns = ['iid', 'name', 'genre'] - movies_df.set_index('iid', inplace=True) - data = Dataset.load_builtin('ml-1m') - trainset = data.build_full_trainset() - testset = trainset.build_anti_testset() - - k = 100 # latent dimension - c_bias = 1e-6 - c_vector = 1e-6 - - model = MF(trainset.n_users, trainset.n_items, - k=k, c_bias=c_bias, c_vector=c_vector) - model.load_state_dict(torch.load('../data/models/recommendation_model_pytorch.pkl')) - model.eval() - - # Print the recommended items for sample users - sample_users = list(set([x[0] for x in testset]))[:4] - - for uid in sample_users: - - print('User:', uid) - print('\n') - - print('\tSeen:') - seen = get_previously_seen(trainset, uid, movies_df) - pprint.pprint(seen) - print('\n') - - print('\tRecommendations:') - recommended = get_top_n(model, testset, trainset, uid, movies_df, n=10) - pprint.pprint([x[1] for x in recommended]) - print('\n') - - -if __name__ == "__main__": - main() diff --git a/airflow_pipeline_example/recommend_pytorch_to_database.py b/airflow_pipeline_example/recommend_pytorch_to_database.py deleted file mode 100644 index f5e6ed2..0000000 --- a/airflow_pipeline_example/recommend_pytorch_to_database.py +++ /dev/null @@ -1,67 +0,0 @@ -from recommend_pytorch_train import MF -from recommend_pytorch_inf import get_top_n -import torch -import pandas as pd -import surprise -import datetime -import time -from google.oauth2 import service_account -import pandas_gbq - -def get_model_from_disk(): - start_time = time.time() - - # data preload - data = surprise.Dataset.load_builtin('ml-1m') - trainset = data.build_full_trainset() - testset = trainset.build_anti_testset() - movies_df = pd.read_csv('./movies.dat', - sep="::", header=None, engine='python') - movies_df.columns = ['iid', 'name', 'genre'] - movies_df.set_index('iid', inplace=True) - - # model preload - k = 100 # latent dimension - c_bias = 1e-6 - c_vector = 1e-6 - model = MF(trainset.n_users, trainset.n_items, - k=k, c_bias=c_bias, c_vector=c_vector) - model.load_state_dict(torch.load( - './recommendation_model_pytorch.pkl')) # TODO: prevent overwriting - model.eval() - - print('Model and data preloading completed in ', time.time()-start_time) - - return model, testset, trainset, movies_df - - -def get_predictions(model, testset, trainset, movies_df): - - # save the recommended items for a given set of users - sample_users = list(set([x[0] for x in testset]))[:4] - - - df_list = [] - for uid in sample_users: - recommended = get_top_n(model, testset, trainset, uid, movies_df, n=10) - df_list.append(pd.DataFrame(data={'uid':[uid]*len(recommended), - 'recommended': [x[1] for x in recommended]}, - columns=['uid','recommended'])) - - df = pd.concat(df_list, sort=False) - df['pred_time'] = str(datetime.datetime.now()) - return df - -def upload_to_bigquery(df): - #Send predictions to BigQuery - #requires a credential file in the current working directory - table_id = "movie_recommendation_service.predicted_movies" - project_id = "authentic-realm-276822" - credentials = service_account.Credentials.from_service_account_file('../model-user.json') - pandas_gbq.to_gbq(df, table_id, project_id=project_id, if_exists = 'replace', credentials=credentials) - -if __name__ == '__main__': - model, testset, trainset, movies_df = get_model_from_disk() - df = get_predictions(model, testset, trainset, movies_df) - print(df) - upload_to_bigquery(df) diff --git a/airflow_pipeline_example/recommend_pytorch_train.py b/airflow_pipeline_example/recommend_pytorch_train.py deleted file mode 100644 index 72e3604..0000000 --- a/airflow_pipeline_example/recommend_pytorch_train.py +++ /dev/null @@ -1,136 +0,0 @@ -# https://github.com/NicolasHug/Surprise -# can be replaced by explicitly importing the movielens data -from surprise import Dataset -import numpy as np -import torch -import torch.nn as nn -import torch.nn.functional as F -from sklearn.utils import shuffle - -class Loader(): - current = 0 - - def __init__(self, x, y, batchsize=1024, do_shuffle=True): - self.shuffle = shuffle - self.x = x - self.y = y - self.batchsize = batchsize - self.batches = range(0, len(self.y), batchsize) - if do_shuffle: - # Every epoch re-shuffle the dataset - self.x, self.y = shuffle(self.x, self.y) - - def __iter__(self): - # Reset & return a new iterator - self.x, self.y = shuffle(self.x, self.y, random_state=0) - self.current = 0 - return self - - def __len__(self): - # Return the number of batches - return int(len(self.x) / self.batchsize) - - def __next__(self): - n = self.batchsize - if self.current + n >= len(self.y): - raise StopIteration - i = self.current - xs = torch.from_numpy(self.x[i:i + n]) - ys = torch.from_numpy(self.y[i:i + n]) - self.current += n - return (xs, ys) - - -class MF(nn.Module): - - def __init__(self, n_user, n_item, k=18, c_vector=1.0, c_bias=1.0): - super(MF, self).__init__() - self.k = k - self.n_user = n_user - self.n_item = n_item - self.c_bias = c_bias - self.c_vector = c_vector - - self.user = nn.Embedding(n_user, k) - self.item = nn.Embedding(n_item, k) - - # We've added new terms here: - self.bias_user = nn.Embedding(n_user, 1) - self.bias_item = nn.Embedding(n_item, 1) - self.bias = nn.Parameter(torch.ones(1)) - - def forward(self, train_x): - user_id = train_x[:, 0] - item_id = train_x[:, 1] - vector_user = self.user(user_id) - vector_item = self.item(item_id) - - # Pull out biases - bias_user = self.bias_user(user_id).squeeze() - bias_item = self.bias_item(item_id).squeeze() - biases = (self.bias + bias_user + bias_item) - - ui_interaction = torch.sum(vector_user * vector_item, dim=1) - - # Add bias prediction to the interaction prediction - prediction = ui_interaction + biases - return prediction - - def loss(self, prediction, target): - - def l2_regularize(array): - loss = torch.sum(array**2) - return loss - - loss_mse = F.mse_loss(prediction, target.squeeze()) - - # Add new regularization to the biases - prior_bias_user = l2_regularize(self.bias_user.weight) * self.c_bias - prior_bias_item = l2_regularize(self.bias_item.weight) * self.c_bias - - prior_user = l2_regularize(self.user.weight) * self.c_vector - prior_item = l2_regularize(self.item.weight) * self.c_vector - total = loss_mse + prior_user + prior_item + prior_bias_user + prior_bias_item - return total - - -def main(): - # Data - data = Dataset.load_builtin('ml-1m') - trainset = data.build_full_trainset() - uir = np.array([x for x in trainset.all_ratings()]) - train_x = test_x = uir[:, :2].astype(np.int64) # for simplicity - train_y = test_y = uir[:, 2].astype(np.float32) - - # Parameters - lr = 5e-3 - k = 100 # latent dimension - c_bias = 1e-6 - c_vector = 1e-6 - batchsize = 1024 - num_epochs = 40 - - model = MF(trainset.n_users, trainset.n_items, - k=k, c_bias=c_bias, c_vector=c_vector) - optimizer = torch.optim.Adam(model.parameters(), lr=lr) - - - for epoch in range(num_epochs): - dataloader = Loader(train_x, train_y, batchsize=batchsize) - itr = 0 - for batch in dataloader: - itr += 1 - prediction = model(batch[0]) - loss = model.loss(prediction, batch[1]) - optimizer.zero_grad() - loss.backward() - optimizer.step() - if itr % 100 == 0: - print(f"epoch: {epoch}. iteration: {itr}. training loss: {loss}") - - torch.save(model.state_dict(), - "../data/models/recommendation_model_pytorch.pkl") - - -if __name__ == '__main__': - main() diff --git a/airflow_pipeline_example/run_transient_pipeline.sh b/airflow_pipeline_example/run_transient_pipeline.sh deleted file mode 100644 index b906195..0000000 --- a/airflow_pipeline_example/run_transient_pipeline.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash - -export GOOGLE_APPLICATION_CREDENTIALS=/model-user.json -python recommend_pytorch_to_database.py \ No newline at end of file diff --git a/airflow_setup/setup.sh b/airflow_setup/setup.sh new file mode 100755 index 0000000..75c469c --- /dev/null +++ b/airflow_setup/setup.sh @@ -0,0 +1,10 @@ +AIRFLOW_VERSION=2.10.5 + +# Extract the version of Python you have installed. If you're currently using a Python version that is not supported by Airflow, you may want to set this manually. +# See above for supported versions. +PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')" + +CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt" +# For example this would install 2.10.5 with python 3.8: https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-3.8.txt + +pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}" \ No newline at end of file diff --git a/cron_example/date_job_every_minute.txt b/cron_example/date_job_every_minute.txt index 9b1c40b..f12ee39 100644 --- a/cron_example/date_job_every_minute.txt +++ b/cron_example/date_job_every_minute.txt @@ -19,3 +19,5 @@ Wed Feb 14 04:01:01 PM CST 2024 Wed Feb 14 04:02:01 PM CST 2024 Wed Feb 14 04:03:01 PM CST 2024 Wed Feb 14 04:04:01 PM CST 2024 +Wed Feb 12 22:17:01 UTC 2025 +Wed Feb 12 22:18:01 UTC 2025 diff --git a/cron_example/run_this_job.sh b/cron_example/run_this_job.sh index d6f79cc..59d381a 100755 --- a/cron_example/run_this_job.sh +++ b/cron_example/run_this_job.sh @@ -1 +1 @@ -date >> /home/theja/class/mlops-code-examples/cron_example/date_job_every_minute.txt +date >> /home/theja/mlops-code-examples/cron_example/date_job_every_minute.txt diff --git a/cron_example/run_this_job.txt b/cron_example/run_this_job.txt index 3500d6b..8b3b3da 100644 --- a/cron_example/run_this_job.txt +++ b/cron_example/run_this_job.txt @@ -4,5 +4,5 @@ Step 1: open cron editor on the command line with `crontab -e` Step 2: insert the following line (replace localmachine with your username) -* * * * * /home/localmachine/run_this_job.sh +* * * * * /home/theja/mlops-code-examples/cron_example/run_this_job.sh diff --git a/flask_examples/flask_example_regression/flask_simple_regression_service.py b/flask_examples/flask_example_regression/flask_simple_regression_service.py index a5c0f33..6a8e578 100644 --- a/flask_examples/flask_example_regression/flask_simple_regression_service.py +++ b/flask_examples/flask_example_regression/flask_simple_regression_service.py @@ -1,5 +1,6 @@ from flask import Flask, jsonify, request import flask +import os def model(x): return 2*x+1 @@ -10,6 +11,19 @@ def model(x): def hello(): return f"flask version: {flask.__version__}" +""" +Endpoint to make a prediction based on the input parameter 'x'. + +This function handles GET requests to the /mypredict route. It checks if the +'x' parameter is present in the request arguments. If 'x' is present, it attempts +to convert 'x' to a float and pass it to the model for prediction. The result is +returned as a JSON response containing the input and the prediction. If 'x' is +not present or an error occurs during processing, an error message is returned. + +Returns: + Response: A JSON response containing the input and prediction if successful, + or an error message if 'x' is not provided or an error occurs. +""" @app.route("/mypredict", methods=["GET"]) def predict(): # check if x is in the arguments @@ -24,4 +38,7 @@ def predict(): if __name__ == '__main__': - app.run() + host = os.getenv('FLASK_RUN_HOST', '127.0.0.1') + port = int(os.getenv('FLASK_RUN_PORT', 5000)) + + app.run(host=host, port=port) diff --git a/flask_examples/flask_example_regression/requirements.txt b/flask_examples/flask_example_regression/requirements.txt new file mode 100644 index 0000000..174be8e --- /dev/null +++ b/flask_examples/flask_example_regression/requirements.txt @@ -0,0 +1 @@ +flask==3.1.0 \ No newline at end of file diff --git a/mlflow_example/mlflow_example.py b/mlflow_example/mlflow_example.py index 12c6e22..70bc9d0 100644 --- a/mlflow_example/mlflow_example.py +++ b/mlflow_example/mlflow_example.py @@ -1,11 +1,9 @@ import mlflow from mlflow.models import infer_signature - -import pandas as pd from sklearn import datasets from sklearn.model_selection import train_test_split from sklearn.linear_model import LogisticRegression -from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score +from sklearn.metrics import accuracy_score # Load the Iris dataset @@ -19,7 +17,7 @@ # Define the model hyperparameters params = { "solver": "lbfgs", - "max_iter": 1000, + "max_iter": 10, "multi_class": "auto", "random_state": 8888, } @@ -39,7 +37,7 @@ mlflow.set_tracking_uri(uri="http://127.0.0.1:8080") # Create a new MLflow Experiment -mlflow.set_experiment("MLflow Quickstart") +mlflow.set_experiment("MLops Course") # Start an MLflow run with mlflow.start_run(): diff --git a/mlflow_example/requirements.txt b/mlflow_example/requirements.txt index b86c37f..f2e9409 100644 --- a/mlflow_example/requirements.txt +++ b/mlflow_example/requirements.txt @@ -1 +1 @@ -mlflow==2.10.1 +mlflow==2.20.1