-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpush_data.py
More file actions
57 lines (48 loc) · 1.92 KB
/
push_data.py
File metadata and controls
57 lines (48 loc) · 1.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import os
import sys
import json
import certifi
import pandas as pd
import numpy as np
import pymongo
from dotenv import load_dotenv
from Networksecurity.exception.exception import NetworkSecurityException
from Networksecurity.logging.logger import logger
dotenv_path = os.path.join(os.path.dirname(__file__), "../.env")
load_dotenv(dotenv_path)
MONGO_DB_URL = os.getenv("MONGO_DB_URL")
print(f"MongoDB URL: {MONGO_DB_URL}")
class NetworkDataExtract():
def __init__(self):
try:
logger.info("Initializing NetworkDataExtract class")
except Exception as e:
raise NetworkSecurityException(e, sys)
def csv_to_json_convertor(self, file_path):
try:
logger.info(f"Converting CSV to JSON: {file_path}")
data = pd.read_csv(file_path)
data.reset_index(drop=True, inplace=True)
records = list(json.loads(data.T.to_json()).values())
return records
except Exception as e:
raise NetworkSecurityException(e, sys)
def insert_data_mongodb(self, records, database, collection):
try:
logger.info(f"Inserting {len(records)} records into MongoDB: {database}.{collection}")
self.mongo_client = pymongo.MongoClient(MONGO_DB_URL, tlsCAFile=certifi.where())
db = self.mongo_client[database]
col = db[collection]
col.insert_many(records)
return len(records)
except Exception as e:
raise NetworkSecurityException(e, sys)
if __name__ == '__main__':
FILE_PATH = os.path.join(os.path.dirname(__file__), "Network_Data/phisingData.csv")
DATABASE = "OMISDAMI"
COLLECTION = "NetworkData"
networkobj = NetworkDataExtract()
records = networkobj.csv_to_json_convertor(file_path=FILE_PATH)
print(records)
no_of_records = networkobj.insert_data_mongodb(records, DATABASE, COLLECTION)
print(f"Inserted Records: {no_of_records}")