-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathdocdbExportUsers.py
More file actions
133 lines (100 loc) · 4.91 KB
/
docdbExportUsers.py
File metadata and controls
133 lines (100 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import argparse
import json
import sys

import pymongo
# User-defined roles collected while users are exported, keyed by role name.
# checkRole() adds entries; exportRoles() writes them to the roles file.
rolesToExport = {}
def _exportDatabaseUsers(client, database_name, f):
    """Write the createUser statements for one database to the open file f."""
    print(f"Checking database: {database_name}")
    database = client[database_name]
    users = database.command('usersInfo')['users']
    if not users:
        print(f"No users in database: {database_name}")
        return

    use_db_printed = False
    for user in users:
        name = user['user']
        # Exclude the serviceadmin user.
        if name == "serviceadmin":
            continue
        if database_name == "$external" and not name.startswith("arn:aws:iam::"):
            print(f"Skipping user: {name}, user must start with 'arn:aws:iam::'")
            continue

        print(f"Exporting user: {name}")
        if not use_db_printed:
            # Written lazily so a database whose users were all skipped does
            # not leave a dangling "use <db>" line in the script.
            print(f"use {database_name}", file=f)
            use_db_printed = True

        # json.dumps produces valid JavaScript literals and escapes quotes and
        # backslashes, so an unusual user or role name cannot break the script.
        print(
            'db.createUser({user: ' + json.dumps(name, ensure_ascii=False)
            + ', pwd: "REPLACE_THIS_PASS", roles: '
            + json.dumps(user['roles'], ensure_ascii=False) + '});',
            file=f,
        )

        print(f"Checking roles for user: {name}")
        for userRole in user['roles']:
            checkRole(database, userRole, database_name)


def exportUsers(appConfig):
    """Export the cluster's users as mongo shell createUser statements.

    Connects to appConfig['uri'], walks every database reported by
    list_database_names() plus "$external" (skipping "local"), and writes one
    db.createUser() statement per exported user to appConfig['usersFile'].
    The statements carry the placeholder password "REPLACE_THIS_PASS".
    Each role of an exported user is handed to checkRole().

    Args:
        appConfig: dict with the keys 'uri' (connection string) and
            'usersFile' (path of the output script, overwritten if present).

    Errors raised by pymongo or while opening/writing the output file
    propagate to the caller; the file and the client are closed either way.
    """
    client = pymongo.MongoClient(host=appConfig['uri'], appname='userexp')
    try:
        database_names = client.list_database_names()
        # "$external" is added explicitly so its users are examined as well.
        database_names.append("$external")

        with open(appConfig['usersFile'], "w", encoding='utf-8') as f:
            for database_name in database_names:
                print("")
                if database_name == 'local':
                    print(f"Skipping database: {database_name}")
                    continue
                _exportDatabaseUsers(client, database_name, f)
    finally:
        client.close()

    print(f"Done! Users exported to {appConfig['usersFile']}")
def checkRole(database, userRole, database_name):
    """Queue a user's role for export when it is user-defined.

    Runs the rolesInfo command for the given role reference. When exactly one
    definition comes back and it is not built in, the definition is stored in
    the module-level rolesToExport dict under its role name. A role assigned
    to several users is stored only once.

    Args:
        database: database handle the rolesInfo command is run against.
        userRole: role reference with 'role' and 'db' keys, as found in a
            user's 'roles' list.
        database_name: name of the database being processed.

    Raises:
        pymongo.errors.OperationFailure: re-raised unless database_name is
            "$external".
    """
    print (f"Checking role {userRole}")

    rolesInfoCommand = {
        'rolesInfo': {'role': userRole['role'], 'db': userRole['db']},
        'showPrivileges': True,
        'showBuiltinRoles': False,
    }
    try:
        found = database.command(rolesInfoCommand)['roles']
    except pymongo.errors.OperationFailure as failure:
        # DocumentDB does not allow custom roles in $external database
        if database_name != "$external":
            raise failure
        return

    if len(found) != 1:
        return

    definition = found[0]
    # Explicit comparison against False keeps the original equality semantics.
    if definition['isBuiltin'] != False:  # noqa: E712
        return

    # setdefault leaves an already-recorded definition untouched, so each
    # user-defined role is marked for export once.
    rolesToExport.setdefault(definition['role'], definition)
def exportRoles(appConfig):
    """Write a db.createRole() statement for every role in rolesToExport.

    The output script starts with "use admin" and then holds one createRole
    statement per collected role, carrying its name, privileges and inherited
    roles.

    Args:
        appConfig: dict with the key 'rolesFile' (path of the output script,
            overwritten if present).
    """
    with open(appConfig['rolesFile'], "w", encoding='utf-8') as f:
        print("use admin", file=f)
        for name, role in rolesToExport.items():
            print(f"Exporting role: {name}")
            # json.dumps renders True/False/None as true/false/null wherever
            # they occur and escapes string contents, so the result is always
            # a valid JavaScript literal.
            roleName = json.dumps(role['role'], ensure_ascii=False)
            privileges = json.dumps(role['privileges'], ensure_ascii=False)
            inheritedRoles = json.dumps(role['roles'], ensure_ascii=False)
            print(
                'db.createRole({role: ' + roleName
                + ', privileges: ' + privileges
                + ', roles: ' + inheritedRoles + '});',
                file=f,
            )
    print(f"Done! Roles exported to {appConfig['rolesFile']}")
def main():
    """Parse the command line, then export users followed by roles.

    Exits with an error message when the interpreter is older than Python 3.7,
    unless --skip-python-version-check was given.
    """
    MIN_PYTHON = (3, 7)

    parser = argparse.ArgumentParser(description='Export Amazon DocumentDB users and user defined roles to user_output.js file, can be used to import them to other instance. Note: Passwords are not exported.')
    parser.add_argument(
        '--skip-python-version-check',
        required=False,
        action='store_true',
        help='Permit execution on Python 3.6 and prior',
    )
    # The remaining options are all mandatory strings; register them in order.
    requiredStringOptions = (
        ('--uri', 'MongoDB Connection URI'),
        ('--users-file', 'The users output file'),
        ('--roles-file', 'The roles output file'),
    )
    for flag, helpText in requiredStringOptions:
        parser.add_argument(flag, required=True, type=str, help=helpText)
    args = parser.parse_args()

    interpreterTooOld = sys.version_info < MIN_PYTHON
    if interpreterTooOld and not args.skip_python_version_check:
        sys.exit("\nPython %s.%s or later is required.\n" % MIN_PYTHON)

    appConfig = {
        'uri': args.uri,
        'usersFile': args.users_file,
        'rolesFile': args.roles_file,
    }

    # Users go first: exporting them is what collects the roles to export.
    exportUsers(appConfig)
    exportRoles(appConfig)
# Run the export only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()