From c1c27630f9852ff077932d12ec082998de8a6499 Mon Sep 17 00:00:00 2001 From: Chrezm Date: Thu, 30 Sep 2021 20:12:48 -0400 Subject: [PATCH 1/9] Start refactor --- src/functionality.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/functionality.py b/src/functionality.py index ee2985e..08e16f7 100644 --- a/src/functionality.py +++ b/src/functionality.py @@ -77,7 +77,6 @@ async def on_ready(): # This is a task loop, where it will self-update every 10 minutes. @tasks.loop(seconds=600.0) async def second_passing(): - print(time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(round(time.time())))) update_rp_list() await inform_update_list() @@ -166,14 +165,13 @@ async def _relay_message(message, prefix='', suffix=''): # Checks Admin Status def admin_check(ctx): - admin_roles = [role for role in ctx.guild.roles if role.permissions.administrator] - user_roles = ctx.author.roles - dro_bot_maintainer_role = ctx.guild.get_role(892524724230434826) - - for role in user_roles: - if role == dro_bot_maintainer_role: + # Bot Maintainer always gets privilege + for role in ctx.author.roles: + if role == guild_details.bot_maintainer_role_id: return True + # Otherwise, check if user has admin privilges + admin_roles = [role for role in ctx.guild.roles if role.permissions.administrator] for admin_role in admin_roles: if admin_role in ctx.author.roles: return True From 7536b13287af5a1cf6623f6c4d3c518e61856319 Mon Sep 17 00:00:00 2001 From: Chrezm Date: Thu, 30 Sep 2021 20:18:00 -0400 Subject: [PATCH 2/9] Finish first pass --- src/functionality.py | 407 ++++++++++++++++++++++++++++++------------- 1 file changed, 285 insertions(+), 122 deletions(-) diff --git a/src/functionality.py b/src/functionality.py index 08e16f7..f980b51 100644 --- a/src/functionality.py +++ b/src/functionality.py @@ -4,10 +4,9 @@ import csv import string import random -import asyncio from discord.ext import commands, tasks -from typing import Dict +from typing import Dict, List # This is a string generator for RP Serial Codes. But it can be used for something more in the future. @@ -54,13 +53,36 @@ def rp_id_check(): except FileNotFoundError: with open("rp_collection.csv", "w") as file: print("rp_collection.csv does not exist; the bot will now create one...") - fieldnames = ["serial_code", "approved", "approved_id", "local", "rp_name", "main_host", "main_host_id", - "rp_start_date", "rp_start_time", "rp_duration", "doc", "sign_up", "ongoing", "ended"] + fieldnames = [ + "serial_code", + "approved", + "approved_id", + "local", + "rp_name", + "main_host", + "main_host_id", + "rp_start_date", + "rp_start_time", + "rp_duration", + "doc", + "sign_up", + "ongoing", + "ended" + ] writer = csv.DictWriter(file, fieldnames=fieldnames) writer.writeheader() return rp_list +# Converts Seconds to Days +def second_to_day(second: int) -> int: + answer = second / 86400 + return round(answer) + +# Converts Seconds to Hours +def second_to_hour(second: int) -> int: + answer = second / 3600 + return round(answer) class Functionality: def __init__(self, bot: commands.Bot = None, guild_details: Dict = None): @@ -104,11 +126,13 @@ async def on_message(message): return if message.channel.name in guild_details.relaying_channels: - # Right here, it will delete the message and notify the user who tried using the bot that they are banned. + # Right here, it will delete the message and notify the user who tried using the + # bot that they are banned. for user in ban_id_: if message.author.id == user["discord_id"] and not int(user["ended"]): await message.delete(message) - return await message.author.send("**You cannot use the bot due to your ban.**") + return await message.author.send("**You cannot use the bot due to your " + "ban.**") await _relay_message( message, @@ -133,6 +157,8 @@ def _validate_command(ctx: commands.Context) -> bool: return True for role in ctx.author.roles: + if role.id == guild_details.bot_maintainer_role_id: + return True if role.id in guild_details.command_always_accept_from_roles: return True @@ -164,7 +190,7 @@ async def _relay_message(message, prefix='', suffix=''): await to_channel.send(final_message) # Checks Admin Status - def admin_check(ctx): + def _check_bot_admin(ctx) -> bool: # Bot Maintainer always gets privilege for role in ctx.author.roles: if role == guild_details.bot_maintainer_role_id: @@ -176,38 +202,18 @@ def admin_check(ctx): if admin_role in ctx.author.roles: return True - return None - - # Converts Seconds to Days - def second_to_day(second: int): - answer = second / 86400 - return round(answer) - - # Converts Seconds to Hours - def second_to_hour(second: int): - answer = second / 3600 - return round(answer) + return False - def ban_profile_check(_id): + def _browse_ban_profile(user_id: int = None) -> List: ban_list = ban_id_check() found = [] for user in ban_list: - if str(_id) == user["discord_id"]: + if user_id is None or user['discord_id'] == str(user_id): found.append(user) return found - def ban_profile_check_all(): - - ban_list = ban_id_check() - found = [] - - for ban in ban_list: - found.append(ban) - - return found - def update_unban(_id): ban_ids = ban_id_check() updated_ban_list = list() @@ -215,9 +221,14 @@ def update_unban(_id): for user in ban_ids: if str(_id) == user["discord_id"]: - update_dict = {"discord_id": user["discord_id"], "discord_name": user["discord_name"], "ban_timestamp": user["ban_timestamp"], - "ban_length": user["ban_length"], "reason": user["reason"], "ended": 1} - + update_dict = { + "discord_id": user["discord_id"], + "discord_name": user["discord_name"], + "ban_timestamp": user["ban_timestamp"], + "ban_length": user["ban_length"], + "reason": user["reason"], + "ended": 1 + } inform_ban_list.append(update_dict) else: @@ -226,7 +237,14 @@ def update_unban(_id): updated_ban_list.append(update_dict) with open("ban_ids.csv", "w+") as file: - fieldnames = ["discord_id", "discord_name", "ban_timestamp", "ban_length", "reason", "ended"] + fieldnames = [ + "discord_id", + "discord_name", + "ban_timestamp", + "ban_length", + "reason", + "ended" + ] writer = csv.DictWriter(file, fieldnames=fieldnames) writer.writeheader() @@ -250,8 +268,14 @@ def update_ban_list(): update_dict = user if user["ended"] == "0": - update_dict = {"discord_id": user["discord_id"], "discord_name": user["discord_name"], "ban_timestamp": user["ban_timestamp"], - "ban_length": user["ban_length"], "reason": user["reason"], "ended": 1} + update_dict = { + "discord_id": user["discord_id"], + "discord_name": user["discord_name"], + "ban_timestamp": user["ban_timestamp"], + "ban_length": user["ban_length"], + "reason": user["reason"], + "ended": 1 + } inform_ban_list.append(update_dict) @@ -261,7 +285,14 @@ def update_ban_list(): updated_ban_list.append(update_dict) with open("ban_ids.csv", "w+") as file: - fieldnames = ["discord_id", "discord_name", "ban_timestamp", "ban_length", "reason", "ended"] + fieldnames = [ + "discord_id", + "discord_name", + "ban_timestamp", + "ban_length", + "reason", + "ended" + ] writer = csv.DictWriter(file, fieldnames=fieldnames) writer.writeheader() @@ -280,7 +311,8 @@ async def inform_update_list(): if answer >= 0: target = await bot.fetch_user(int(user['discord_id'])) - await target.send(f'**You are now unbanned from using the Server Bot. Please do not make the same offense again.**') + await target.send("**You are now unbanned from using the Server Bot. " + "Please do not commit the same offense again.**") return @@ -297,10 +329,22 @@ def update_rp_list(): update_dict = user if user["ongoing"] == "0": - update_dict = {"serial_code": user['serial_code'], "approved": user['approved'], "approved_id": user['approved_id'], "local": user['local'], - "rp_name": user['rp_name'], "main_host": user['main_host'], "main_host_id": user['main_host_id'], "rp_start_date": user['rp_start_date'], - "rp_start_time": user['rp_start_time'], "rp_duration": user['rp_duration'], "doc": user['doc'], - "sign_up": 0, "ongoing": 1, "ended": 0} + update_dict = { + "serial_code": user['serial_code'], + "approved": user['approved'], + "approved_id": user['approved_id'], + "local": user['local'], + "rp_name": user['rp_name'], + "main_host": user['main_host'], + "main_host_id": user['main_host_id'], + "rp_start_date": user['rp_start_date'], + "rp_start_time": user['rp_start_time'], + "rp_duration": user['rp_duration'], + "doc": user['doc'], + "sign_up": 0, + "ongoing": 1, + "ended": 0 + } else: update_dict = user @@ -308,8 +352,22 @@ def update_rp_list(): updated_rp_list.append(update_dict) with open("rp_collection.csv", "w+") as file: - fieldnames = ["serial_code", "approved", "approved_id", "local", "rp_name", "main_host", "main_host_id", - "rp_start_date", "rp_start_time", "rp_duration", "doc", "sign_up", "ongoing", "ended"] + fieldnames = [ + "serial_code", + "approved", + "approved_id", + "local", + "rp_name", + "main_host", + "main_host_id", + "rp_start_date", + "rp_start_time", + "rp_duration", + "doc", + "sign_up", + "ongoing", + "ended" + ] writer = csv.DictWriter(file, fieldnames=fieldnames) writer.writeheader() @@ -335,10 +393,22 @@ def update_rp_list_choice(_id, val): value_change = change_dict[val] if str(_id) == user["serial_code"]: - update_dict = {"serial_code": user['serial_code'], "approved": user['approved'], "approved_id": user['approved_id'], "local": user['local'], - "rp_name": user['rp_name'], "main_host": user['main_host'], "main_host_id": user['main_host_id'], "rp_start_date": user['rp_start_date'], - "rp_start_time": user['rp_start_time'], "rp_duration": user['rp_duration'], "doc": user['doc'], - "sign_up": value_change[0], "ongoing": value_change[1], "ended": value_change[2]} + update_dict = { + "serial_code": user['serial_code'], + "approved": user['approved'], + "approved_id": user['approved_id'], + "local": user['local'], + "rp_name": user['rp_name'], + "main_host": user['main_host'], + "main_host_id": user['main_host_id'], + "rp_start_date": user['rp_start_date'], + "rp_start_time": user['rp_start_time'], + "rp_duration": user['rp_duration'], + "doc": user['doc'], + "sign_up": value_change[0], + "ongoing": value_change[1], + "ended": value_change[2] + } inform_update.append(update_dict) @@ -348,8 +418,22 @@ def update_rp_list_choice(_id, val): updated_rp_list.append(update_dict) with open("rp_collection.csv", "w+") as file: - fieldnames = ["serial_code", "approved", "approved_id", "local", "rp_name", "main_host", "main_host_id", - "rp_start_date", "rp_start_time", "rp_duration", "doc", "sign_up", "ongoing", "ended"] + fieldnames = [ + "serial_code", + "approved", + "approved_id", + "local", + "rp_name", + "main_host", + "main_host_id", + "rp_start_date", + "rp_start_time", + "rp_duration", + "doc", + "sign_up", + "ongoing", + "ended" + ] writer = csv.DictWriter(file, fieldnames=fieldnames) writer.writeheader() @@ -443,15 +527,18 @@ def timezone_time_check(hour_change: int, inc_time: int): @bot.command( name='ban_id', brief='Bans a Discord User from using the Bot.', - help=('Bans a Discord User from using the Bot. There are multiple arguments needed to be filled.' + help=('Bans a Discord User from using the Bot. There are multiple arguments needed to be ' + 'filled.' '\nArguments: $ban_id ' - '\nExample: $ban_id 332456386946531328 259200 "There must be a open and close quotes for reason."') + '\nExample: $ban_id 332456386946531328 259200 "There must be a open and close ' + 'quotes for reason."') ) - async def ban_id(ctx: commands.Context, user_id: int, ban_length: int = 259200, reason="Unstated Reason"): + async def ban_id(ctx: commands.Context, user_id: int, ban_length: int = 259200, + reason="Unstated Reason"): if not _validate_command(ctx): return - if not admin_check(ctx): + if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") ban_ids = ban_id_check() @@ -459,16 +546,32 @@ async def ban_id(ctx: commands.Context, user_id: int, ban_length: int = 259200, try: target = await bot.fetch_user(user_id) except discord.NotFound as p: - return await ctx.send(f"`{p}`\n**Please input a valid Discord ID that is in the server.**") + return await ctx.send(f"`{p}`\n**Please input a valid Discord ID that is in the " + "server.**") for dict_ban in ban_ids: if str(user_id) == str(dict_ban["discord_id"]) and not int(dict_ban["ended"]): - return await ctx.send(f"**{target.name} is already in the list and his ban has not ended.**") + return await ctx.send(f"**{target.name} is already in the list and his ban has " + "not ended.**") with open("ban_ids.csv", "a") as file_: - fieldnames = ["discord_id", "discord_name", "ban_timestamp", "ban_length", "reason", "ended"] + fieldnames = [ + "discord_id", + "discord_name", + "ban_timestamp", + "ban_length", + "reason", + "ended" + ] writer = csv.DictWriter(file_, fieldnames=fieldnames) - writer.writerow({"discord_id": user_id, "discord_name": target.name, "ban_timestamp": round(time.time()), "ban_length": ban_length, "reason": reason, "ended": 0}) + writer.writerow({ + "discord_id": user_id, + "discord_name": target.name, + "ban_timestamp": round(time.time()), + "ban_length": ban_length, + "reason": reason, + "ended": 0 + }) if ban_length < 86400: days = second_to_hour(ban_length) @@ -478,7 +581,8 @@ async def ban_id(ctx: commands.Context, user_id: int, ban_length: int = 259200, days = second_to_day(ban_length) word_ = f"{days} days" - await ctx.channel.send(f'**{target.name} ({user_id}) is now banned from using the Server Bot for {word_}.**' + await ctx.channel.send(f'**{target.name} ({user_id}) is now banned from using the ' + f'Server Bot for {word_}.**' f'\n`Reason: {reason}`') await target.send(f'**You are now banned from using the Server Bot for {word_}**' f'\n`Reason: {reason}`') @@ -486,7 +590,8 @@ async def ban_id(ctx: commands.Context, user_id: int, ban_length: int = 259200, @bot.command( name='unban', brief='Unbans a Discord User from using the Bot.', - help=('Unbans a Discord User from using the Bot. There is only a single argument needed to be filled.' + help=('Unbans a Discord User from using the Bot. There is only a single argument needed ' + 'to be filled.' '\nArguments: $unban ' '\nExample: $unban 332456386946531328'), ) @@ -494,10 +599,10 @@ async def unban(ctx: commands.Context, _id: int): if not _validate_command(ctx): return - if not admin_check(ctx): + if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") - initial_check = ban_profile_check(_id) + initial_check = _browse_ban_profile(user_id=_id) if not initial_check: return await ctx.send("`Invalid Discord ID.`") @@ -506,14 +611,16 @@ async def unban(ctx: commands.Context, _id: int): for user in updated_list: target = await bot.fetch_user(int(user['discord_id'])) - await target.send(f'**You are now unbanned from using the Server Bot. Please do not make the same offense again.**') + await target.send('**You are now unbanned from using the Server Bot. Please do not ' + 'commit the same offense again.**') return await ctx.send("**Updated! Those whose ban is revoked will be notified.**") @bot.command( name='ban_profile', brief='Returns a profile of the banned user.', - help=('Returns a profile of the banned user. There is only a single mandatory argument needed to be filled.' + help=('Returns a profile of the banned user. There is only a single mandatory argument ' + 'needed to be filled.' '\nArguments: $ban_profile <_all>' '\nExample: $ban_profile 332456386946531328 False' "\n\n<_all> is optional, but it must be a boolean of either True or False. " @@ -524,10 +631,10 @@ async def ban_profile(ctx: commands.Context, _id: int, _all=False): if not _validate_command(ctx): return - if not admin_check(ctx): + if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") - profile = ban_profile_check(_id) + profile = _browse_ban_profile(user_id=_id) embed = None if not profile: @@ -535,14 +642,16 @@ async def ban_profile(ctx: commands.Context, _id: int, _all=False): for user in profile: date_ = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(user["ban_timestamp"]))) + description = ( + f'**Discord Name**: {user["discord_name"]}\n' + f'**Discord ID**: {user["discord_id"]}\n' + f'**Ban Date**: {date_}\n' + f'**Ban Length**: {second_to_day(int(user["ban_length"]))}\n' + f'**Reason**: {user["reason"]}\n' + f'**Ban Ended**: {user["ended"]}' + ) embed = discord.Embed(title=f'Ban Profile : {user["discord_name"]}', - description=f'**Discord Name**: {user["discord_name"]}\n' - f'**Discord ID**: {user["discord_id"]}\n' - f'**Ban Date**: {date_}\n' - f'**Ban Length**: {second_to_day(int(user["ban_length"]))}\n' - f'**Reason**: {user["reason"]}\n' - f'**Ban Ended**: {user["ended"]}', - + description=description, colour=discord.Color.dark_blue()) embed.set_thumbnail(url=ctx.author.avatar_url) embed.set_footer(text=ctx.author) @@ -558,7 +667,8 @@ async def ban_profile(ctx: commands.Context, _id: int, _all=False): @bot.command( name='ban_profile_all', brief='Returns all ban profiles from all banned or previously banned users.', - help=('Returns all ban profiles from all banned or previously banned users. An argument is not necessary.' + help=('Returns all ban profiles from all banned or previously banned users. An argument ' + 'is not necessary.' '\nArguments: $ban_profile_all' '\nExample: $ban_profile_all') ) @@ -566,24 +676,26 @@ async def ban_profile_all(ctx: commands.Context): if not _validate_command(ctx): return - if not admin_check(ctx): + if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") - profile = ban_profile_check_all() + profile = _browse_ban_profile() if not profile: return await ctx.send("`There are no bans in the Database.`") for user in profile: date_ = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(user["ban_timestamp"]))) + description = ( + f'**Discord Name**: {user["discord_name"]}\n' + f'**Discord ID**: {user["discord_id"]}\n' + f'**Ban Date**: {date_}\n' + f'**Ban Length**: {second_to_day(int(user["ban_length"]))}\n' + f'**Reason**: {user["reason"]}\n' + f'**Ban Ended**: {user["ended"]}' + ) embed = discord.Embed(title=f'Ban Profile : {user["discord_name"]}', - description=f'**Discord Name**: {user["discord_name"]}\n' - f'**Discord ID**: {user["discord_id"]}\n' - f'**Ban Date**: {date_}\n' - f'**Ban Length**: {second_to_day(int(user["ban_length"]))}\n' - f'**Reason**: {user["reason"]}\n' - f'**Ban Ended**: {user["ended"]}', - + description=description, colour=discord.Color.dark_blue()) embed.set_thumbnail(url=ctx.author.avatar_url) embed.set_footer(text=ctx.author) @@ -603,7 +715,7 @@ async def ban_list_update(ctx: commands.Context): if not _validate_command(ctx): return - if not admin_check(ctx): + if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") updated_list = update_ban_list()[1] @@ -615,33 +727,44 @@ async def ban_list_update(ctx: commands.Context): if answer >= 0: target = await bot.fetch_user(int(user['discord_id'])) - await target.send(f'**You are now unbanned from using the Server Bot. Please do not make the same offense again.**') + await target.send('**You are now unbanned from using the Server Bot. ' + 'Please do not make the same offense again.**') await ctx.send("**Updated! Those whose ban is over will be notified.**") @bot.command( name='add_roleplay', brief='Adds a roleplay into the Database.', - help=('Adds a roleplay into the Database. There are a lot of arguments needed to be filled; ensure you do "open and close" quotes IF they are not integers.' - '\nArguments: $add_roleplay ' - ' ' - '\nExample: $add_roleplay "Helvetica Neue" 332456386946531328 42069 5 "https://www.epochconverter.com"' + help=('Adds a roleplay into the Database. There are a lot of arguments needed to be ' + 'filled; ensure you do "open and close" quotes IF they are not integers.' + '\nArguments: $add_roleplay ' + ' ' + ' ' + ' ' + '\nExample: $add_roleplay "Helvetica Neue" 332456386946531328 42069 5 ' + '"https://www.epochconverter.com"' '"HN" 1 0 0' "\n\n is the Main Host's Discord ID." "\n is an epoch number. Use https://www.epochconverter.com" "\n is an integer. In hours, how long the Roleplay is." - "\n is an optional field, but can be customized. It is used to find your RP in [rp_profile] and store in the database." - "\n is an optional field, can only be 0 or 1. --0: False (Hosted outside ODROS) | 1: True (Hosted within ODROS)--" - "\n is an optional field, can only be 0 or 1. --0: False (Sign ups are closed) | 1: True (Sign ups are open)--" - "\n is an optional field, can only be 0 or 1. --0: False (RP is not ongoing) | 1: True (RP is ongoing)--" - "\n is an optional field, can only be 0 or 1. --0: False (RP has not ended) | 1: True (RP ended)--"), + "\n is an optional field, but can be customized. It is used to find " + "your RP in [rp_profile] and store in the database." + "\n is an optional field, can only be 0 or 1. --0: False " + "(Hosted outside ODROS) | 1: True (Hosted within ODROS)--" + "\n is an optional field, can only be 0 or 1. --0: False " + "(Sign ups are closed) | 1: True (Sign ups are open)--" + "\n is an optional field, can only be 0 or 1. --0: False " + "(RP is not ongoing) | 1: True (RP is ongoing)--" + "\n is an optional field, can only be 0 or 1. --0: False " + "(RP has not ended) | 1: True (RP ended)--"), ) - async def add_roleplay(ctx: commands.Context, rp_name, main_host_id: int, rp_start_date: int, rp_duration: int, - doc, serial_code=None, local: int = 1, sign_up: int = 1, ongoing: int = 0, ended: int = 0): + async def add_roleplay(ctx: commands.Context, rp_name, main_host_id: int, + rp_start_date: int, rp_duration: int, doc, serial_code=None, + local: int = 1, sign_up: int = 1, ongoing: int = 0, ended: int = 0): if not _validate_command(ctx): return - if not admin_check(ctx): + if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") rp_list = rp_id_check() @@ -651,7 +774,8 @@ async def add_roleplay(ctx: commands.Context, rp_name, main_host_id: int, rp_sta try: arg = int(arg) if arg not in [0, 1]: - return await ctx.send(f"`, , , and Arguments must be either 0 or 1.`") + return await ctx.send("`, , , and " + "Arguments must be either 0 or 1.`") except ValueError: return await ctx.send(f"`Please input a valid integer. {arg} is not valid.`") @@ -668,16 +792,21 @@ async def add_roleplay(ctx: commands.Context, rp_name, main_host_id: int, rp_sta ongoing_note = "still on its sign up phase" if serial_code == rp["serial_code"]: - return await ctx.send(f"**Serial code {rp['serial_code']} was already used. RP Name: `{rp['rp_name']}`" - f"\n**Hosted by {rp['main_host']}. Please use a different Serial Code.**") + return await ctx.send(f"**Serial code {rp['serial_code']} was already used. " + f"RP Name: `{rp['rp_name']}`" + f"\n**Hosted by {rp['main_host']}. Please use a different " + f"Serial Code.**") if rp_name.lower() == str(rp["rp_name"]).lower() and not int(rp["ended"]): - return await ctx.send(f"**{rp['rp_name']} is already in the Database and {ongoing_note} under the serial code:** `{rp['serial_code']}`**" + return await ctx.send(f"**{rp['rp_name']} is already in the Database and " + f"{ongoing_note} under the serial code:** " + f"`{rp['serial_code']}`**" f"\n**Hosted by {rp['main_host']} {local_note}.**") try: target = await bot.fetch_user(int(main_host_id)) except discord.NotFound as p: - return await ctx.send(f"`{p}`\n**Please input a valid Discord ID that is in the server.**") + return await ctx.send(f"`{p}`\n**Please input a valid Discord ID that is in the " + "server.**") irl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(rp_start_date))) other_timezones = str(irl_time) @@ -687,17 +816,45 @@ async def add_roleplay(ctx: commands.Context, rp_name, main_host_id: int, rp_sta noted_time = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(rp_start_date))) with open("rp_collection.csv", "a") as file: - fieldnames = ["serial_code", "approved", "approved_id", "local", "rp_name", "main_host", - "main_host_id", "rp_start_date", "rp_start_time", "rp_duration", "doc", "sign_up", "ongoing", "ended"] + fieldnames = [ + "serial_code", + "approved", + "approved_id", + "local", + "rp_name", + "main_host", + "main_host_id", + "rp_start_date", + "rp_start_time", + "rp_duration", + "doc", + "sign_up", + "ongoing", + "ended"] writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writerow({"serial_code": serial_code, "approved": ctx.author.name, "approved_id": ctx.author.id, "local": local, "rp_name": rp_name, "main_host": target.name, - "main_host_id": main_host_id, "rp_start_date": rp_start_date, "rp_start_time": f"{hour_split[0]}:{hour_split[1]}", - "rp_duration": rp_duration, "doc": doc, "sign_up": sign_up, "ongoing": ongoing, "ended": ended}) - - await ctx.channel.send(f'**{rp_name} ({serial_code}) was inserted to the Database; hosted by {target.name}.**' + writer.writerow({ + "serial_code": serial_code, + "approved": ctx.author.name, + "approved_id": ctx.author.id, + "local": local, + "rp_name": rp_name, + "main_host": target.name, + "main_host_id": main_host_id, + "rp_start_date": rp_start_date, + "rp_start_time": f"{hour_split[0]}:{hour_split[1]}", + "rp_duration": rp_duration, + "doc": doc, + "sign_up": sign_up, + "ongoing": ongoing, + "ended": ended + }) + + await ctx.channel.send(f'**{rp_name} ({serial_code}) was inserted to the Database; ' + f'hosted by {target.name}.**' f'\n**First Session Date: {noted_time}**' f'\n`Doc:` {doc}') - await target.send(f'**Your RP; {rp_name} ({serial_code}) was inserted to the Database; approved by {ctx.author.name}.**' + await target.send(f'**Your RP; {rp_name} ({serial_code}) was inserted to the Database; ' + f'approved by {ctx.author.name}.**' f'\n**You may announce it now and ensure to include the serial code.**' f'\n**First Session Date: {noted_time}**') @@ -707,7 +864,8 @@ async def add_roleplay(ctx: commands.Context, rp_name, main_host_id: int, rp_sta help=('Returns a profile of an RP. There is only one required argument.' '\nArguments: $rp_profile <_id>' '\nExample: $rp_profile HN' - '\n\n<_id> is the Serial Code of said RP. It is case sensitive, so make sure it is correct.') + '\n\n<_id> is the Serial Code of said RP. It is case sensitive, so make sure it is ' + 'correct.') ) async def rp_profile(ctx: commands.Context, _id): if not _validate_command(ctx): @@ -836,7 +994,7 @@ async def rp_change_status(ctx: commands.Context, _id, value="0"): string_inform = None - if not admin_check(ctx): + if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") initial_check = rp_profile_check(_id) @@ -852,8 +1010,10 @@ async def rp_change_status(ctx: commands.Context, _id, value="0"): update = update[1] for rp in update: - string_inform = f"**You have updated {rp['rp_name']} ({rp['serial_code']}) by {change_dict[value]}!**" - + string_inform = ( + f"**You have updated {rp['rp_name']} ({rp['serial_code']}) by " + f"{change_dict[value]}!**" + ) return await ctx.send(string_inform) @bot.command( @@ -958,17 +1118,17 @@ async def timezone(ctx: commands.Context, seconds=None): hour_split = other_timezones[1].split(":") hour_change = int(hour_split[0]) text_ = [ - f"**-- Standard Time --**", + "**-- Standard Time --**", f"**EST**: {timezone_time_check(hour_change, -5)}:{hour_split[1]}:{hour_split[2]}", f"**CST**: {timezone_time_check(hour_change, -6)}:{hour_split[1]}:{hour_split[2]}", f"**MST**: {timezone_time_check(hour_change, -7)}:{hour_split[1]}:{hour_split[2]}", f"**PST**: {timezone_time_check(hour_change, -8)}:{hour_split[1]}:{hour_split[2]}", - f"**-- Daylight Time --**", + "**-- Daylight Time --**", f"**EDT**: {timezone_time_check(hour_change, -4)}:{hour_split[1]}:{hour_split[2]}", f"**CDT**: {timezone_time_check(hour_change, -5)}:{hour_split[1]}:{hour_split[2]}", f"**MDT**: {timezone_time_check(hour_change, -6)}:{hour_split[1]}:{hour_split[2]}", f"**PDT**: {timezone_time_check(hour_change, -7)}:{hour_split[1]}:{hour_split[2]}", - f"**-- Europe Time --**", + "**-- Europe Time --**", f"**UTC-1**: {timezone_time_check(hour_change, -1)}:{hour_split[1]}:{hour_split[2]}", f"**UTC**: {timezone_time_check(hour_change, 0)}:{hour_split[1]}:{hour_split[2]}", f"**UTC+1**: {timezone_time_check(hour_change, 1)}:{hour_split[1]}:{hour_split[2]}", @@ -989,11 +1149,13 @@ async def timezone(ctx: commands.Context, seconds=None): @bot.command( name='utc', brief='Lists the time for UTC timezones', - help=('Lists the time for UTC timezones. There is an optional argument; if you input an epoch/unix time after the command, ' + help=('Lists the time for UTC timezones. There is an optional argument; if you input an ' + 'epoch/unix time after the command, ' 'you will get said time instead. Date format: YYYY-MM-DD.' '\nArguments: $utc ' '\nExample: $utc 1' - '\n\n is an optional field; it is an epoch/unix second argument and MUST be an integer.') + '\n\n is an optional field; it is an epoch/unix second argument and MUST ' + 'be an integer.') ) async def utc(ctx: commands.Context, seconds=None): if not _validate_command(ctx): @@ -1054,7 +1216,8 @@ async def utc(ctx: commands.Context, seconds=None): @ban_profile.error async def ban_profile_error(ctx: commands.Context, error): if isinstance(error, commands.BadBoolArgument): - return await ctx.send("`Please input <_all> as either 0, 1 or False, True respectively.`") + return await ctx.send("`Please input <_all> as either 0, 1 or False, True " + "respectively.`") if isinstance(error, commands.BadArgument): return await ctx.send("`Please input <_id> as an integer, their Discord ID.`") @@ -1062,8 +1225,8 @@ async def ban_profile_error(ctx: commands.Context, error): @add_roleplay.error async def add_roleplay_error(ctx: commands.Context, error): if isinstance(error, commands.BadArgument): - return await ctx.send("`Please input , , , , " - ", , and as integers.`") + return await ctx.send("`Please input , , , " + ", , , and as integers.`") @ban_id.error async def ban_id_error(ctx: commands.Context, error): From 058dc1b28b081ba49daf49bbf53f77786116b4d5 Mon Sep 17 00:00:00 2001 From: Chrezm Date: Thu, 30 Sep 2021 20:50:47 -0400 Subject: [PATCH 3/9] Separate roleplays specific code to new file --- src/functionality.py | 441 +--------------------------------------- src/roleplays.py | 468 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 479 insertions(+), 430 deletions(-) create mode 100644 src/roleplays.py diff --git a/src/functionality.py b/src/functionality.py index f980b51..0d6705c 100644 --- a/src/functionality.py +++ b/src/functionality.py @@ -2,18 +2,11 @@ import discord import time import csv -import string -import random from discord.ext import commands, tasks from typing import Dict, List - -# This is a string generator for RP Serial Codes. But it can be used for something more in the future. -def create_code(): - lowercase_letter, uppercase_letter, digits = string.ascii_lowercase, string.ascii_uppercase, str(string.digits) - code = ''.join(random.sample(lowercase_letter + uppercase_letter + digits, 12)) - return code +from src import roleplays # This is the initial check for ban_ids.csv and obtaining its data. @@ -38,42 +31,6 @@ def ban_id_check(): return ban_list -# This is the initial check for rp_collection.csv and obtaining its data. -def rp_id_check(): - rp_list = None - try: - with open("rp_collection.csv", "r+", newline="") as file: - reader = csv.DictReader(file) - rp_list = [] - for x in reader: - rp_list.append(x) - - return rp_list - - except FileNotFoundError: - with open("rp_collection.csv", "w") as file: - print("rp_collection.csv does not exist; the bot will now create one...") - fieldnames = [ - "serial_code", - "approved", - "approved_id", - "local", - "rp_name", - "main_host", - "main_host_id", - "rp_start_date", - "rp_start_time", - "rp_duration", - "doc", - "sign_up", - "ongoing", - "ended" - ] - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - - return rp_list - # Converts Seconds to Days def second_to_day(second: int) -> int: answer = second / 86400 @@ -90,7 +47,7 @@ def __init__(self, bot: commands.Bot = None, guild_details: Dict = None): async def on_ready(): # This is to start the checks and if said file does not exist, will create one. ban_id_check() - rp_id_check() + roleplays.rp_id_check() # This is to begin the task loop. second_passing.start() @@ -99,7 +56,7 @@ async def on_ready(): # This is a task loop, where it will self-update every 10 minutes. @tasks.loop(seconds=600.0) async def second_passing(): - update_rp_list() + roleplays.update_rp_list() await inform_update_list() # This will execute before the function will run. @@ -316,193 +273,6 @@ async def inform_update_list(): return - def update_rp_list(): - rp_list = rp_id_check() - updated_rp_list = list() - update_dict = None - - for user in rp_list: - answer = round(time.time()) - int(user["rp_start_date"]) - - if answer >= 0: - if user["ongoing"] == "1": - update_dict = user - - if user["ongoing"] == "0": - update_dict = { - "serial_code": user['serial_code'], - "approved": user['approved'], - "approved_id": user['approved_id'], - "local": user['local'], - "rp_name": user['rp_name'], - "main_host": user['main_host'], - "main_host_id": user['main_host_id'], - "rp_start_date": user['rp_start_date'], - "rp_start_time": user['rp_start_time'], - "rp_duration": user['rp_duration'], - "doc": user['doc'], - "sign_up": 0, - "ongoing": 1, - "ended": 0 - } - - else: - update_dict = user - - updated_rp_list.append(update_dict) - - with open("rp_collection.csv", "w+") as file: - fieldnames = [ - "serial_code", - "approved", - "approved_id", - "local", - "rp_name", - "main_host", - "main_host_id", - "rp_start_date", - "rp_start_time", - "rp_duration", - "doc", - "sign_up", - "ongoing", - "ended" - ] - - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - - for update in updated_rp_list: - writer.writerow(update) - - return updated_rp_list - - def update_rp_list_choice(_id, val): - change_dict = { - 0: [0, 0, 0], # Close Sign Ups - 1: [1, 0, 0], # Open Sign Ups - 2: [0, 1, 0], # Ongoing RP - 3: [0, 0, 1] # Ends RP - } - - rp_list = rp_id_check() - updated_rp_list = list() - inform_update = list() - - for user in rp_list: - value_change = change_dict[val] - - if str(_id) == user["serial_code"]: - update_dict = { - "serial_code": user['serial_code'], - "approved": user['approved'], - "approved_id": user['approved_id'], - "local": user['local'], - "rp_name": user['rp_name'], - "main_host": user['main_host'], - "main_host_id": user['main_host_id'], - "rp_start_date": user['rp_start_date'], - "rp_start_time": user['rp_start_time'], - "rp_duration": user['rp_duration'], - "doc": user['doc'], - "sign_up": value_change[0], - "ongoing": value_change[1], - "ended": value_change[2] - } - - inform_update.append(update_dict) - - else: - update_dict = user - - updated_rp_list.append(update_dict) - - with open("rp_collection.csv", "w+") as file: - fieldnames = [ - "serial_code", - "approved", - "approved_id", - "local", - "rp_name", - "main_host", - "main_host_id", - "rp_start_date", - "rp_start_time", - "rp_duration", - "doc", - "sign_up", - "ongoing", - "ended" - ] - - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - - for update in updated_rp_list: - writer.writerow(update) - - return updated_rp_list, inform_update - - def rp_profile_check(_id): - rp_list = rp_id_check() - found = [] - - for rp in rp_list: - if str(_id) == rp["serial_code"]: - found.append(rp) - - return found - - def rp_profile_check_sign_up(val): - if val != 0: - return - - rp_list = rp_id_check() - found = [] - - for rp in rp_list: - if rp["sign_up"] == "1": - found.append(rp) - - return found - - def rp_profile_check_ongoing(val): - if val != 1: - return - - rp_list = rp_id_check() - found = [] - - for rp in rp_list: - if rp["ongoing"] == "1": - found.append(rp) - - return found - - def rp_profile_check_ended(val): - if val != 2: - return - - rp_list = rp_id_check() - found = [] - - for rp in rp_list: - if rp["ended"] == "1": - found.append(rp) - - return found - - def rp_profile_check_all(val): - if val != 3: - return - - rp_list = rp_id_check() - found = [] - - for rp in rp_list: - found.append(rp) - - return found def timezone_time_check(hour_change: int, inc_time: int): hour_change = hour_change + inc_time @@ -767,96 +537,9 @@ async def add_roleplay(ctx: commands.Context, rp_name, main_host_id: int, if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") - rp_list = rp_id_check() - - arg_check = [local, sign_up, ongoing, ended] - for arg in arg_check: - try: - arg = int(arg) - if arg not in [0, 1]: - return await ctx.send("`, , , and " - "Arguments must be either 0 or 1.`") - except ValueError: - return await ctx.send(f"`Please input a valid integer. {arg} is not valid.`") - - if not serial_code: - serial_code = create_code() - - for rp in rp_list: - local_note = "in DRO servers" - if rp["local"] == "0": - local_note = "outside of DRO servers" - - ongoing_note = "currently ongoing" - if rp["ongoing"] == "0": - ongoing_note = "still on its sign up phase" - - if serial_code == rp["serial_code"]: - return await ctx.send(f"**Serial code {rp['serial_code']} was already used. " - f"RP Name: `{rp['rp_name']}`" - f"\n**Hosted by {rp['main_host']}. Please use a different " - f"Serial Code.**") - if rp_name.lower() == str(rp["rp_name"]).lower() and not int(rp["ended"]): - return await ctx.send(f"**{rp['rp_name']} is already in the Database and " - f"{ongoing_note} under the serial code:** " - f"`{rp['serial_code']}`**" - f"\n**Hosted by {rp['main_host']} {local_note}.**") - - try: - target = await bot.fetch_user(int(main_host_id)) - except discord.NotFound as p: - return await ctx.send(f"`{p}`\n**Please input a valid Discord ID that is in the " - "server.**") - - irl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(rp_start_date))) - other_timezones = str(irl_time) - other_timezones = other_timezones.split(" ") - hour_split = other_timezones[1].split(":") - - noted_time = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(rp_start_date))) - - with open("rp_collection.csv", "a") as file: - fieldnames = [ - "serial_code", - "approved", - "approved_id", - "local", - "rp_name", - "main_host", - "main_host_id", - "rp_start_date", - "rp_start_time", - "rp_duration", - "doc", - "sign_up", - "ongoing", - "ended"] - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writerow({ - "serial_code": serial_code, - "approved": ctx.author.name, - "approved_id": ctx.author.id, - "local": local, - "rp_name": rp_name, - "main_host": target.name, - "main_host_id": main_host_id, - "rp_start_date": rp_start_date, - "rp_start_time": f"{hour_split[0]}:{hour_split[1]}", - "rp_duration": rp_duration, - "doc": doc, - "sign_up": sign_up, - "ongoing": ongoing, - "ended": ended - }) - - await ctx.channel.send(f'**{rp_name} ({serial_code}) was inserted to the Database; ' - f'hosted by {target.name}.**' - f'\n**First Session Date: {noted_time}**' - f'\n`Doc:` {doc}') - await target.send(f'**Your RP; {rp_name} ({serial_code}) was inserted to the Database; ' - f'approved by {ctx.author.name}.**' - f'\n**You may announce it now and ensure to include the serial code.**' - f'\n**First Session Date: {noted_time}**') + await roleplays.command_add_roleplay( + bot, ctx, rp_name, main_host_id, rp_start_date, rp_duration, doc + ) @bot.command( name='rp_profile', @@ -867,39 +550,11 @@ async def add_roleplay(ctx: commands.Context, rp_name, main_host_id: int, '\n\n<_id> is the Serial Code of said RP. It is case sensitive, so make sure it is ' 'correct.') ) - async def rp_profile(ctx: commands.Context, _id): + async def rp_profile(ctx: commands.Context, _id: str): if not _validate_command(ctx): return - profile = rp_profile_check(_id) - - embed = None - bool_string = {"0": False, "1": True} - - if not profile: - return await ctx.send("`That Serial Code does not exist in the database.`") - - for rp in profile: - date_ = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(rp["rp_start_date"]))) - embed = discord.Embed(title=f'RP Profile : {rp["rp_name"]}', - description=f'**RP Name**: {rp["rp_name"]}\n' - f'**Serial Code**: {rp["serial_code"]}\n' - f'**Approved by**: {rp["approved"]}\n' - f'**Main Host**: {rp["main_host"]}\n' - f'**RP First Session Date**: {date_}\n' - f'**RP Start Time**: {rp["rp_start_time"]}\n' - f'**RP Length in Hours**: {rp["rp_duration"]}\n' - f'**Local ODROS**: {bool_string[rp["local"]]}\n' - f'**Sign Up Open**: {bool_string[rp["sign_up"]]}\n' - f'**Ongoing**: {bool_string[rp["ongoing"]]}\n' - f'**Ended**: {bool_string[rp["ended"]]}\n' - f'**Document**: {rp["doc"]}', - - colour=discord.Color.dark_blue()) - embed.set_thumbnail(url=ctx.author.avatar_url) - embed.set_footer(text=ctx.author) - - return await ctx.send(embed=embed) + await roleplays.command_rp_profile(bot, ctx, _id) @bot.command( name='rp_profile_filter', @@ -917,51 +572,8 @@ async def rp_profile_filter(ctx: commands.Context, value="0"): if not _validate_command(ctx): return - try: - value = int(value) - except ValueError: - return await ctx.send("`Input a valid value from 0-3 only.`") - - rp_dict = { - 0: rp_profile_check_sign_up(value), - 1: rp_profile_check_ongoing(value), - 2: rp_profile_check_ended(value), - 3: rp_profile_check_all(value) - } + await roleplays.command_rp_profile_filter(bot, ctx, value) - bool_string = {"0": False, "1": True} - - try: - profile = rp_dict[value] - except KeyError: - return await ctx.send("`Input a valid value from 0-3 only.`") - - if not profile: - return await ctx.send("`Unfortunately, there is none in the database as of yet.`") - - for rp in profile: - date_ = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(rp["rp_start_date"]))) - embed = discord.Embed(title=f'RP Profile : {rp["rp_name"]}', - description=f'**RP Name**: {rp["rp_name"]}\n' - f'**Serial Code**: {rp["serial_code"]}\n' - f'**Approved by**: {rp["approved"]}\n' - f'**Main Host**: {rp["main_host"]}\n' - f'**RP First Session Date**: {date_}\n' - f'**RP Start Time**: {rp["rp_start_time"]}\n' - f'**RP Length in Hours**: {rp["rp_duration"]}\n' - f'**Local ODROS**: {bool_string[rp["local"]]}\n' - f'**Sign Up Open**: {bool_string[rp["sign_up"]]}\n' - f'**Ongoing**: {bool_string[rp["ongoing"]]}\n' - f'**Ended**: {bool_string[rp["ended"]]}\n' - f'**Document**: {rp["doc"]}', - - colour=discord.Color.dark_blue()) - embed.set_thumbnail(url=ctx.author.avatar_url) - embed.set_footer(text=ctx.author) - - await ctx.send(embed=embed) - - return @bot.command( name='rp_change_status', @@ -980,41 +592,10 @@ async def rp_change_status(ctx: commands.Context, _id, value="0"): if not _validate_command(ctx): return - change_dict = { - 0: "closing sign ups", - 1: "opening sign ups", - 2: "labelling the RP as ongoing", - 3: "ending the RP" - } - - try: - value = int(value) - except ValueError: - return await ctx.send("`Input a valid value from 0-3 only.`") - - string_inform = None - if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") - initial_check = rp_profile_check(_id) - - if not initial_check: - return await ctx.send('`Invalid RP Serial Code, cannot update.`') - - try: - update = update_rp_list_choice(_id, value) - except KeyError: - return await ctx.send("`Input a valid value from 0-3 only.`") - - update = update[1] - - for rp in update: - string_inform = ( - f"**You have updated {rp['rp_name']} ({rp['serial_code']}) by " - f"{change_dict[value]}!**" - ) - return await ctx.send(string_inform) + await roleplays.command_rp_change_status(bot, ctx, _id, value=value) @bot.command( name='ping', @@ -1104,7 +685,7 @@ async def timezone(ctx: commands.Context, seconds=None): title_embed = f'It is {irl_time} UTC' except ValueError: - rp_list = rp_id_check() + rp_list = roleplays.rp_id_check() for rp in rp_list: if seconds == rp["serial_code"]: irl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(rp["rp_start_date"]))) diff --git a/src/roleplays.py b/src/roleplays.py new file mode 100644 index 0000000..1a27961 --- /dev/null +++ b/src/roleplays.py @@ -0,0 +1,468 @@ +# -*- coding: utf-8 -*- + +import csv +import random +import string +import time + +from typing import List, Tuple + +import discord +from discord.ext import commands + + +async def command_add_roleplay(bot, ctx: commands.Context, rp_name, main_host_id: int, + rp_start_date: int, rp_duration: int, doc, serial_code=None, + local: int = 1, sign_up: int = 1, ongoing: int = 0, ended: int = 0): + rp_list = rp_id_check() + + arg_check = [local, sign_up, ongoing, ended] + for arg in arg_check: + try: + arg = int(arg) + if arg not in [0, 1]: + return await ctx.send("`, , , and " + "Arguments must be either 0 or 1.`") + except ValueError: + return await ctx.send(f"`Please input a valid integer. {arg} is not valid.`") + + if not serial_code: + serial_code = create_code() + + for rp in rp_list: + local_note = "in DRO servers" + if rp["local"] == "0": + local_note = "outside of DRO servers" + + ongoing_note = "currently ongoing" + if rp["ongoing"] == "0": + ongoing_note = "still on its sign up phase" + + if serial_code == rp["serial_code"]: + return await ctx.send(f"**Serial code {rp['serial_code']} was already used. " + f"RP Name: `{rp['rp_name']}`" + f"\n**Hosted by {rp['main_host']}. Please use a different " + f"Serial Code.**") + if rp_name.lower() == str(rp["rp_name"]).lower() and not int(rp["ended"]): + return await ctx.send(f"**{rp['rp_name']} is already in the Database and " + f"{ongoing_note} under the serial code:** " + f"`{rp['serial_code']}`**" + f"\n**Hosted by {rp['main_host']} {local_note}.**") + + try: + target = await bot.fetch_user(int(main_host_id)) + except discord.NotFound as p: + return await ctx.send(f"`{p}`\n**Please input a valid Discord ID that is in the " + "server.**") + + irl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(rp_start_date))) + other_timezones = str(irl_time) + other_timezones = other_timezones.split(" ") + hour_split = other_timezones[1].split(":") + + noted_time = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(rp_start_date))) + + with open("rp_collection.csv", "a") as file: + fieldnames = [ + "serial_code", + "approved", + "approved_id", + "local", + "rp_name", + "main_host", + "main_host_id", + "rp_start_date", + "rp_start_time", + "rp_duration", + "doc", + "sign_up", + "ongoing", + "ended"] + writer = csv.DictWriter(file, fieldnames=fieldnames) + writer.writerow({ + "serial_code": serial_code, + "approved": ctx.author.name, + "approved_id": ctx.author.id, + "local": local, + "rp_name": rp_name, + "main_host": target.name, + "main_host_id": main_host_id, + "rp_start_date": rp_start_date, + "rp_start_time": f"{hour_split[0]}:{hour_split[1]}", + "rp_duration": rp_duration, + "doc": doc, + "sign_up": sign_up, + "ongoing": ongoing, + "ended": ended + }) + + await ctx.channel.send(f'**{rp_name} ({serial_code}) was inserted to the Database; ' + f'hosted by {target.name}.**' + f'\n**First Session Date: {noted_time}**' + f'\n`Doc:` {doc}') + await target.send(f'**Your RP; {rp_name} ({serial_code}) was inserted to the Database; ' + f'approved by {ctx.author.name}.**' + f'\n**You may announce it now and ensure to include the serial code.**' + f'\n**First Session Date: {noted_time}**') + + +async def command_rp_profile(bot, ctx: commands.Context, _id): + profile = rp_profile_check(_id) + + embed = None + bool_string = {"0": False, "1": True} + + if not profile: + await ctx.send("`That Serial Code does not exist in the database.`") + return + + for rp in profile: + date_ = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(rp["rp_start_date"]))) + embed = discord.Embed(title=f'RP Profile : {rp["rp_name"]}', + description=f'**RP Name**: {rp["rp_name"]}\n' + f'**Serial Code**: {rp["serial_code"]}\n' + f'**Approved by**: {rp["approved"]}\n' + f'**Main Host**: {rp["main_host"]}\n' + f'**RP First Session Date**: {date_}\n' + f'**RP Start Time**: {rp["rp_start_time"]}\n' + f'**RP Length in Hours**: {rp["rp_duration"]}\n' + f'**Local ODROS**: {bool_string[rp["local"]]}\n' + f'**Sign Up Open**: {bool_string[rp["sign_up"]]}\n' + f'**Ongoing**: {bool_string[rp["ongoing"]]}\n' + f'**Ended**: {bool_string[rp["ended"]]}\n' + f'**Document**: {rp["doc"]}', + + colour=discord.Color.dark_blue()) + embed.set_thumbnail(url=ctx.author.avatar_url) + embed.set_footer(text=ctx.author) + + await ctx.send(embed=embed) + + +async def command_rp_profile_filter(bot, ctx: commands.Context, value): + try: + value = int(value) + except ValueError: + await ctx.send("`Input a valid value from 0-3 only.`") + return + + rp_dict = { + 0: rp_profile_check_sign_up(value), + 1: rp_profile_check_ongoing(value), + 2: rp_profile_check_ended(value), + 3: rp_profile_check_all(value) + } + + bool_string = {"0": False, "1": True} + + try: + profile = rp_dict[value] + except KeyError: + await ctx.send("`Input a valid value from 0-3 only.`") + return + + if not profile: + await ctx.send("`Unfortunately, there is none in the database as of yet.`") + return + + for rp in profile: + date_ = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(rp["rp_start_date"]))) + embed = discord.Embed(title=f'RP Profile : {rp["rp_name"]}', + description=f'**RP Name**: {rp["rp_name"]}\n' + f'**Serial Code**: {rp["serial_code"]}\n' + f'**Approved by**: {rp["approved"]}\n' + f'**Main Host**: {rp["main_host"]}\n' + f'**RP First Session Date**: {date_}\n' + f'**RP Start Time**: {rp["rp_start_time"]}\n' + f'**RP Length in Hours**: {rp["rp_duration"]}\n' + f'**Local ODROS**: {bool_string[rp["local"]]}\n' + f'**Sign Up Open**: {bool_string[rp["sign_up"]]}\n' + f'**Ongoing**: {bool_string[rp["ongoing"]]}\n' + f'**Ended**: {bool_string[rp["ended"]]}\n' + f'**Document**: {rp["doc"]}', + + colour=discord.Color.dark_blue()) + embed.set_thumbnail(url=ctx.author.avatar_url) + embed.set_footer(text=ctx.author) + + await ctx.send(embed=embed) + + +async def command_rp_change_status(bot, ctx: commands.Context, _id, value="0"): + change_dict = { + 0: "closing sign ups", + 1: "opening sign ups", + 2: "labelling the RP as ongoing", + 3: "ending the RP" + } + + try: + value = int(value) + except ValueError: + await ctx.send("`Input a valid value from 0-3 only.`") + return + + string_inform = None + + initial_check = rp_profile_check(_id) + + if not initial_check: + await ctx.send('`Invalid RP Serial Code, cannot update.`') + return + + try: + update = update_rp_list_choice(_id, value) + except KeyError: + await ctx.send("`Input a valid value from 0-3 only.`") + return + + update = update[1] + + for rp in update: + string_inform = ( + f"**You have updated {rp['rp_name']} ({rp['serial_code']}) by " + f"{change_dict[value]}!**" + ) + await ctx.send(string_inform) + return + + +# This is a string generator for RP Serial Codes. But it can be used for something more in the future. +def create_code() -> str: + lowercase_letter = string.ascii_lowercase + uppercase_letter = string.ascii_uppercase + digits = str(string.digits) + code = ''.join(random.sample(lowercase_letter + uppercase_letter + digits, 12)) + return code + + +# This is the initial check for rp_collection.csv and obtaining its data. +def rp_id_check() -> List: + rp_list = None + try: + with open("rp_collection.csv", "r+", newline="") as file: + reader = csv.DictReader(file) + rp_list = [] + for x in reader: + rp_list.append(x) + + return rp_list + + except FileNotFoundError: + with open("rp_collection.csv", "w") as file: + print("rp_collection.csv does not exist; the bot will now create one...") + fieldnames = [ + "serial_code", + "approved", + "approved_id", + "local", + "rp_name", + "main_host", + "main_host_id", + "rp_start_date", + "rp_start_time", + "rp_duration", + "doc", + "sign_up", + "ongoing", + "ended" + ] + writer = csv.DictWriter(file, fieldnames=fieldnames) + writer.writeheader() + + return rp_list + + +def update_rp_list() -> List: + rp_list = rp_id_check() + updated_rp_list = list() + update_dict = None + + for user in rp_list: + answer = round(time.time()) - int(user["rp_start_date"]) + + if answer >= 0: + if user["ongoing"] == "1": + update_dict = user + + if user["ongoing"] == "0": + update_dict = { + "serial_code": user['serial_code'], + "approved": user['approved'], + "approved_id": user['approved_id'], + "local": user['local'], + "rp_name": user['rp_name'], + "main_host": user['main_host'], + "main_host_id": user['main_host_id'], + "rp_start_date": user['rp_start_date'], + "rp_start_time": user['rp_start_time'], + "rp_duration": user['rp_duration'], + "doc": user['doc'], + "sign_up": 0, + "ongoing": 1, + "ended": 0 + } + + else: + update_dict = user + + updated_rp_list.append(update_dict) + + with open("rp_collection.csv", "w+") as file: + fieldnames = [ + "serial_code", + "approved", + "approved_id", + "local", + "rp_name", + "main_host", + "main_host_id", + "rp_start_date", + "rp_start_time", + "rp_duration", + "doc", + "sign_up", + "ongoing", + "ended" + ] + + writer = csv.DictWriter(file, fieldnames=fieldnames) + writer.writeheader() + + for update in updated_rp_list: + writer.writerow(update) + + return updated_rp_list + + +def update_rp_list_choice(_id, val) -> Tuple[List, List]: + change_dict = { + 0: [0, 0, 0], # Close Sign Ups + 1: [1, 0, 0], # Open Sign Ups + 2: [0, 1, 0], # Ongoing RP + 3: [0, 0, 1] # Ends RP + } + + rp_list = rp_id_check() + updated_rp_list = list() + inform_update = list() + + for user in rp_list: + value_change = change_dict[val] + + if str(_id) == user["serial_code"]: + update_dict = { + "serial_code": user['serial_code'], + "approved": user['approved'], + "approved_id": user['approved_id'], + "local": user['local'], + "rp_name": user['rp_name'], + "main_host": user['main_host'], + "main_host_id": user['main_host_id'], + "rp_start_date": user['rp_start_date'], + "rp_start_time": user['rp_start_time'], + "rp_duration": user['rp_duration'], + "doc": user['doc'], + "sign_up": value_change[0], + "ongoing": value_change[1], + "ended": value_change[2] + } + + inform_update.append(update_dict) + + else: + update_dict = user + + updated_rp_list.append(update_dict) + + with open("rp_collection.csv", "w+") as file: + fieldnames = [ + "serial_code", + "approved", + "approved_id", + "local", + "rp_name", + "main_host", + "main_host_id", + "rp_start_date", + "rp_start_time", + "rp_duration", + "doc", + "sign_up", + "ongoing", + "ended" + ] + + writer = csv.DictWriter(file, fieldnames=fieldnames) + writer.writeheader() + + for update in updated_rp_list: + writer.writerow(update) + + return updated_rp_list, inform_update + + +def rp_profile_check(_id) -> List: + rp_list = rp_id_check() + found = [] + + for rp in rp_list: + if str(_id) == rp["serial_code"]: + found.append(rp) + + return found + + +def rp_profile_check_sign_up(val) -> List: + if val != 0: + return + + rp_list = rp_id_check() + found = [] + + for rp in rp_list: + if rp["sign_up"] == "1": + found.append(rp) + + return found + + +def rp_profile_check_ongoing(val) -> List: + if val != 1: + return + + rp_list = rp_id_check() + found = [] + + for rp in rp_list: + if rp["ongoing"] == "1": + found.append(rp) + + return found + + +def rp_profile_check_ended(val) -> List: + if val != 2: + return + + rp_list = rp_id_check() + found = [] + + for rp in rp_list: + if rp["ended"] == "1": + found.append(rp) + + return found + + +def rp_profile_check_all(val) -> List: + if val != 3: + return + + rp_list = rp_id_check() + found = [] + + for rp in rp_list: + found.append(rp) + + return found From c607deaceac408213e7b7d61d092b9c57651cefb Mon Sep 17 00:00:00 2001 From: Chrezm Date: Thu, 30 Sep 2021 20:58:44 -0400 Subject: [PATCH 4/9] Separate roles functionality to separate file --- src/functionality.py | 43 +++++++++++++------------------------------ src/roleplays.py | 8 ++++---- src/roles.py | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 34 deletions(-) create mode 100644 src/roles.py diff --git a/src/functionality.py b/src/functionality.py index 0d6705c..3f1190d 100644 --- a/src/functionality.py +++ b/src/functionality.py @@ -6,6 +6,7 @@ from typing import Dict, List +from src import roles from src import roleplays @@ -538,7 +539,7 @@ async def add_roleplay(ctx: commands.Context, rp_name, main_host_id: int, return await ctx.send("`Insufficient Privileges.`") await roleplays.command_add_roleplay( - bot, ctx, rp_name, main_host_id, rp_start_date, rp_duration, doc + bot, guild_details, ctx, rp_name, main_host_id, rp_start_date, rp_duration, doc ) @bot.command( @@ -554,7 +555,7 @@ async def rp_profile(ctx: commands.Context, _id: str): if not _validate_command(ctx): return - await roleplays.command_rp_profile(bot, ctx, _id) + await roleplays.command_rp_profile(bot, guild_details, ctx, _id) @bot.command( name='rp_profile_filter', @@ -572,8 +573,7 @@ async def rp_profile_filter(ctx: commands.Context, value="0"): if not _validate_command(ctx): return - await roleplays.command_rp_profile_filter(bot, ctx, value) - + await roleplays.command_rp_profile_filter(bot, guild_details, ctx, value) @bot.command( name='rp_change_status', @@ -595,7 +595,7 @@ async def rp_change_status(ctx: commands.Context, _id, value="0"): if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") - await roleplays.command_rp_change_status(bot, ctx, _id, value=value) + await roleplays.command_rp_change_status(bot, guild_details, ctx, _id, value=value) @bot.command( name='ping', @@ -611,25 +611,6 @@ async def ping(ctx: commands.Context): await ctx.channel.send('Pong.') - async def _optin_role(ctx: commands.Context, role_name: str, role_id: int): - if not _validate_command(ctx): - return - role = discord.utils.get(ctx.message.guild.roles, name=role_name) - - user = ctx.author - has_role = False - - for user_role in user.roles: - if user_role.id == role_id: - has_role = True - - if has_role: - await user.remove_roles(role) - await ctx.send(f'Removed role **{role_name}**.') - else: - await user.add_roles(role) - await ctx.send(f'Added role **{role_name}**.') - @bot.command( name='rpactive', brief='Changes your RP Active status', @@ -639,9 +620,10 @@ async def _optin_role(ctx: commands.Context, role_name: str, role_id: int): '\nExample: $rpactive'), ) async def rpactive(ctx: commands.Context): - await _optin_role(ctx, - guild_details.rp_active_role_name, - guild_details.rp_active_role_id) + if not _validate_command(ctx): + return + + await roles.command_rpactive(bot, guild_details, ctx) @bot.command( name='devtester', @@ -652,9 +634,10 @@ async def rpactive(ctx: commands.Context): '\nExample: $devtester'), ) async def devtester(ctx: commands.Context): - await _optin_role(ctx, - guild_details.dev_tester_role_name, - guild_details.dev_tester_role_id) + if not _validate_command(ctx): + return + + await roles.command_devtester(bot, guild_details, ctx) @bot.command( name='timezone', diff --git a/src/roleplays.py b/src/roleplays.py index 1a27961..c947fdc 100644 --- a/src/roleplays.py +++ b/src/roleplays.py @@ -11,7 +11,7 @@ from discord.ext import commands -async def command_add_roleplay(bot, ctx: commands.Context, rp_name, main_host_id: int, +async def command_add_roleplay(bot, guild_details, ctx: commands.Context, rp_name, main_host_id: int, rp_start_date: int, rp_duration: int, doc, serial_code=None, local: int = 1, sign_up: int = 1, ongoing: int = 0, ended: int = 0): rp_list = rp_id_check() @@ -106,7 +106,7 @@ async def command_add_roleplay(bot, ctx: commands.Context, rp_name, main_host_id f'\n**First Session Date: {noted_time}**') -async def command_rp_profile(bot, ctx: commands.Context, _id): +async def command_rp_profile(bot, guild_details, ctx: commands.Context, _id): profile = rp_profile_check(_id) embed = None @@ -139,7 +139,7 @@ async def command_rp_profile(bot, ctx: commands.Context, _id): await ctx.send(embed=embed) -async def command_rp_profile_filter(bot, ctx: commands.Context, value): +async def command_rp_profile_filter(bot, guild_details, ctx: commands.Context, value): try: value = int(value) except ValueError: @@ -188,7 +188,7 @@ async def command_rp_profile_filter(bot, ctx: commands.Context, value): await ctx.send(embed=embed) -async def command_rp_change_status(bot, ctx: commands.Context, _id, value="0"): +async def command_rp_change_status(bot, guild_details, ctx: commands.Context, _id, value="0"): change_dict = { 0: "closing sign ups", 1: "opening sign ups", diff --git a/src/roles.py b/src/roles.py new file mode 100644 index 0000000..5803266 --- /dev/null +++ b/src/roles.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- + +import discord +from discord.ext import commands + + +async def _optin_role(ctx: commands.Context, role_name: str, role_id: int): + role = discord.utils.get(ctx.message.guild.roles, name=role_name) + + user = ctx.author + has_role = False + + for user_role in user.roles: + if user_role.id == role_id: + has_role = True + + if has_role: + await user.remove_roles(role) + await ctx.send(f'Removed role **{role_name}**.') + else: + await user.add_roles(role) + await ctx.send(f'Added role **{role_name}**.') + + +async def command_rpactive(bot, guild_details, ctx): + await _optin_role(ctx, + guild_details.rp_active_role_name, + guild_details.rp_active_role_id) + + +async def command_devtester(bot, guild_details, ctx): + await _optin_role(ctx, + guild_details.dev_tester_role_name, + guild_details.dev_tester_role_id) From ab042b75ad7c1e6c4c9e1025ff9325794406f82c Mon Sep 17 00:00:00 2001 From: Chrezm Date: Thu, 30 Sep 2021 21:48:36 -0400 Subject: [PATCH 5/9] Separate bans from functionality code --- src/bans.py | 294 +++++++++++++++++++++++++++++++++++++++++++ src/functionality.py | 288 ++---------------------------------------- src/roleplays.py | 2 +- 3 files changed, 308 insertions(+), 276 deletions(-) create mode 100644 src/bans.py diff --git a/src/bans.py b/src/bans.py new file mode 100644 index 0000000..5f96b05 --- /dev/null +++ b/src/bans.py @@ -0,0 +1,294 @@ +# -*- coding: utf-8 -*- +import csv +import time + +from typing import List + +import discord +from discord.ext import commands + + +async def command_ban_id(bot, guild_details, ctx: commands.Context, user_id: int, + ban_length: int = 259200, reason="Unstated Reason"): + ban_ids = ban_id_check() + + try: + target = await bot.fetch_user(user_id) + except discord.NotFound as p: + return await ctx.send(f"`{p}`\n**Please input a valid Discord ID that is in the " + "server.**") + + for dict_ban in ban_ids: + if str(user_id) == str(dict_ban["discord_id"]) and not int(dict_ban["ended"]): + return await ctx.send(f"**{target.name} is already in the list and his ban has " + "not ended.**") + + with open("ban_ids.csv", "a") as file_: + fieldnames = [ + "discord_id", + "discord_name", + "ban_timestamp", + "ban_length", + "reason", + "ended" + ] + writer = csv.DictWriter(file_, fieldnames=fieldnames) + writer.writerow({ + "discord_id": user_id, + "discord_name": target.name, + "ban_timestamp": round(time.time()), + "ban_length": ban_length, + "reason": reason, + "ended": 0 + }) + + if ban_length < 86400: + days = second_to_hour(ban_length) + word_ = f"{days} hours" + + else: + days = second_to_day(ban_length) + word_ = f"{days} days" + + await ctx.channel.send(f'**{target.name} ({user_id}) is now banned from using the ' + f'Server Bot for {word_}.**' + f'\n`Reason: {reason}`') + await target.send(f'**You are now banned from using the Server Bot for {word_}**' + f'\n`Reason: {reason}`') + + +async def command_unban(bot, guild_details, ctx: commands.Context, _id: int): + initial_check = _browse_ban_profile(user_id=_id) + if not initial_check: + return await ctx.send("`Invalid Discord ID.`") + + updated_list = update_unban(_id) + updated_list = updated_list[1] + + for user in updated_list: + target = await bot.fetch_user(int(user['discord_id'])) + await target.send('**You are now unbanned from using the Server Bot. Please do not ' + 'commit the same offense again.**') + + return await ctx.send("**Updated! Those whose ban is revoked will be notified.**") + + +async def command_ban_profile(bot, guild_details, ctx: commands.Context, _id: int, _all=False): + profile = _browse_ban_profile(user_id=_id) + embed = None + + if not profile: + return await ctx.send("`That ID does not exist in the database.`") + + for user in profile: + date_ = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(user["ban_timestamp"]))) + description = ( + f'**Discord Name**: {user["discord_name"]}\n' + f'**Discord ID**: {user["discord_id"]}\n' + f'**Ban Date**: {date_}\n' + f'**Ban Length**: {second_to_day(int(user["ban_length"]))}\n' + f'**Reason**: {user["reason"]}\n' + f'**Ban Ended**: {user["ended"]}' + ) + embed = discord.Embed(title=f'Ban Profile : {user["discord_name"]}', + description=description, + colour=discord.Color.dark_blue()) + embed.set_thumbnail(url=ctx.author.avatar_url) + embed.set_footer(text=ctx.author) + + if _all: + await ctx.send(embed=embed) + + if not _all: + await ctx.send(embed=embed) + + +async def command_ban_profile_all(bot, guild_details, ctx: commands.Context): + profile = _browse_ban_profile() + + if not profile: + return await ctx.send("`There are no bans in the Database.`") + + for user in profile: + date_ = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(user["ban_timestamp"]))) + description = ( + f'**Discord Name**: {user["discord_name"]}\n' + f'**Discord ID**: {user["discord_id"]}\n' + f'**Ban Date**: {date_}\n' + f'**Ban Length**: {second_to_day(int(user["ban_length"]))}\n' + f'**Reason**: {user["reason"]}\n' + f'**Ban Ended**: {user["ended"]}' + ) + embed = discord.Embed(title=f'Ban Profile : {user["discord_name"]}', + description=description, + colour=discord.Color.dark_blue()) + embed.set_thumbnail(url=ctx.author.avatar_url) + embed.set_footer(text=ctx.author) + + await ctx.send(embed=embed) + + +async def command_ban_list_update(bot, guild_details, ctx: commands.Context): + updated_list = update_ban_list()[1] + + if updated_list: + for user in updated_list: + answer = int(user["ban_timestamp"]) + int(user["ban_length"]) + answer = round(time.time()) - answer + + if answer >= 0: + target = await bot.fetch_user(int(user['discord_id'])) + await target.send('**You are now unbanned from using the Server Bot. ' + 'Please do not make the same offense again.**') + + await ctx.send("**Updated! Those whose ban is over will be notified.**") + + +# This is the initial check for ban_ids.csv and obtaining its data. +def ban_id_check(): + ban_list = None + try: + with open("ban_ids.csv", "r+", newline="") as file: + reader = csv.DictReader(file) + ban_list = [] + for x in reader: + ban_list.append(x) + + return ban_list + + except FileNotFoundError: + with open("ban_ids.csv", "w") as file: + print("ban_ids.csv does not exist; the bot will now create one...") + fieldnames = ["discord_id", "discord_name", "ban_timestamp", "ban_length", "reason", "ended"] + writer = csv.DictWriter(file, fieldnames=fieldnames) + writer.writeheader() + + return ban_list + + +# Converts Seconds to Days +def second_to_day(second: int) -> int: + answer = second / 86400 + return round(answer) + + +# Converts Seconds to Hours +def second_to_hour(second: int) -> int: + answer = second / 3600 + return round(answer) + + +def _browse_ban_profile(user_id: int = None) -> List: + ban_list = ban_id_check() + found = [] + + for user in ban_list: + if user_id is None or user['discord_id'] == str(user_id): + found.append(user) + + return found + + +def update_unban(_id): + ban_ids = ban_id_check() + updated_ban_list = list() + inform_ban_list = list() # This is to inform players when their ban is over. + + for user in ban_ids: + if str(_id) == user["discord_id"]: + update_dict = { + "discord_id": user["discord_id"], + "discord_name": user["discord_name"], + "ban_timestamp": user["ban_timestamp"], + "ban_length": user["ban_length"], + "reason": user["reason"], + "ended": 1 + } + inform_ban_list.append(update_dict) + + else: + update_dict = user + + updated_ban_list.append(update_dict) + + with open("ban_ids.csv", "w+") as file: + fieldnames = [ + "discord_id", + "discord_name", + "ban_timestamp", + "ban_length", + "reason", + "ended" + ] + writer = csv.DictWriter(file, fieldnames=fieldnames) + writer.writeheader() + + for update in updated_ban_list: + writer.writerow(update) + + return updated_ban_list, inform_ban_list + + +def update_ban_list(): + ban_ids = ban_id_check() + updated_ban_list = list() + inform_ban_list = list() # This is to inform players when their ban is over. + update_dict = None + + for user in ban_ids: + answer = int(user["ban_timestamp"]) + int(user["ban_length"]) + answer = round(time.time()) - answer + + if answer >= 0: + if user["ended"] == "1": + update_dict = user + + if user["ended"] == "0": + update_dict = { + "discord_id": user["discord_id"], + "discord_name": user["discord_name"], + "ban_timestamp": user["ban_timestamp"], + "ban_length": user["ban_length"], + "reason": user["reason"], + "ended": 1 + } + + inform_ban_list.append(update_dict) + + else: + update_dict = user + + updated_ban_list.append(update_dict) + + with open("ban_ids.csv", "w+") as file: + fieldnames = [ + "discord_id", + "discord_name", + "ban_timestamp", + "ban_length", + "reason", + "ended" + ] + writer = csv.DictWriter(file, fieldnames=fieldnames) + writer.writeheader() + + for update in updated_ban_list: + writer.writerow(update) + + return updated_ban_list, inform_ban_list + + +async def inform_update_list(bot): + updated_list = update_ban_list()[1] + + if updated_list: + for user in updated_list: + answer = int(user["ban_timestamp"]) + int(user["ban_length"]) + answer = round(time.time()) - answer + + if answer >= 0: + target = await bot.fetch_user(int(user['discord_id'])) + await target.send("**You are now unbanned from using the Server Bot. " + "Please do not commit the same offense again.**") + + return diff --git a/src/functionality.py b/src/functionality.py index 3f1190d..84068ed 100644 --- a/src/functionality.py +++ b/src/functionality.py @@ -6,48 +6,16 @@ from typing import Dict, List +from src import bans from src import roles from src import roleplays - -# This is the initial check for ban_ids.csv and obtaining its data. -def ban_id_check(): - ban_list = None - try: - with open("ban_ids.csv", "r+", newline="") as file: - reader = csv.DictReader(file) - ban_list = [] - for x in reader: - ban_list.append(x) - - return ban_list - - except FileNotFoundError: - with open("ban_ids.csv", "w") as file: - print("ban_ids.csv does not exist; the bot will now create one...") - fieldnames = ["discord_id", "discord_name", "ban_timestamp", "ban_length", "reason", "ended"] - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - - return ban_list - - -# Converts Seconds to Days -def second_to_day(second: int) -> int: - answer = second / 86400 - return round(answer) - -# Converts Seconds to Hours -def second_to_hour(second: int) -> int: - answer = second / 3600 - return round(answer) - class Functionality: def __init__(self, bot: commands.Bot = None, guild_details: Dict = None): @bot.event async def on_ready(): # This is to start the checks and if said file does not exist, will create one. - ban_id_check() + bans.ban_id_check() roleplays.rp_id_check() # This is to begin the task loop. @@ -58,7 +26,7 @@ async def on_ready(): @tasks.loop(seconds=600.0) async def second_passing(): roleplays.update_rp_list() - await inform_update_list() + await bans.inform_update_list(bot) # This will execute before the function will run. @second_passing.before_loop @@ -78,7 +46,7 @@ async def on_command_error(ctx, error): @bot.event async def on_message(message): # Looks through the ban_id.csv - ban_id_ = ban_id_check() + ban_id_ = bans.ban_id_check() if message.author == bot.user: return @@ -162,119 +130,6 @@ def _check_bot_admin(ctx) -> bool: return False - def _browse_ban_profile(user_id: int = None) -> List: - ban_list = ban_id_check() - found = [] - - for user in ban_list: - if user_id is None or user['discord_id'] == str(user_id): - found.append(user) - - return found - - def update_unban(_id): - ban_ids = ban_id_check() - updated_ban_list = list() - inform_ban_list = list() # This is to inform players when their ban is over. - - for user in ban_ids: - if str(_id) == user["discord_id"]: - update_dict = { - "discord_id": user["discord_id"], - "discord_name": user["discord_name"], - "ban_timestamp": user["ban_timestamp"], - "ban_length": user["ban_length"], - "reason": user["reason"], - "ended": 1 - } - inform_ban_list.append(update_dict) - - else: - update_dict = user - - updated_ban_list.append(update_dict) - - with open("ban_ids.csv", "w+") as file: - fieldnames = [ - "discord_id", - "discord_name", - "ban_timestamp", - "ban_length", - "reason", - "ended" - ] - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - - for update in updated_ban_list: - writer.writerow(update) - - return updated_ban_list, inform_ban_list - - def update_ban_list(): - ban_ids = ban_id_check() - updated_ban_list = list() - inform_ban_list = list() # This is to inform players when their ban is over. - update_dict = None - - for user in ban_ids: - answer = int(user["ban_timestamp"]) + int(user["ban_length"]) - answer = round(time.time()) - answer - - if answer >= 0: - if user["ended"] == "1": - update_dict = user - - if user["ended"] == "0": - update_dict = { - "discord_id": user["discord_id"], - "discord_name": user["discord_name"], - "ban_timestamp": user["ban_timestamp"], - "ban_length": user["ban_length"], - "reason": user["reason"], - "ended": 1 - } - - inform_ban_list.append(update_dict) - - else: - update_dict = user - - updated_ban_list.append(update_dict) - - with open("ban_ids.csv", "w+") as file: - fieldnames = [ - "discord_id", - "discord_name", - "ban_timestamp", - "ban_length", - "reason", - "ended" - ] - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - - for update in updated_ban_list: - writer.writerow(update) - - return updated_ban_list, inform_ban_list - - async def inform_update_list(): - updated_list = update_ban_list()[1] - - if updated_list: - for user in updated_list: - answer = int(user["ban_timestamp"]) + int(user["ban_length"]) - answer = round(time.time()) - answer - - if answer >= 0: - target = await bot.fetch_user(int(user['discord_id'])) - await target.send("**You are now unbanned from using the Server Bot. " - "Please do not commit the same offense again.**") - - return - - def timezone_time_check(hour_change: int, inc_time: int): hour_change = hour_change + inc_time @@ -312,51 +167,8 @@ async def ban_id(ctx: commands.Context, user_id: int, ban_length: int = 259200, if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") - ban_ids = ban_id_check() - - try: - target = await bot.fetch_user(user_id) - except discord.NotFound as p: - return await ctx.send(f"`{p}`\n**Please input a valid Discord ID that is in the " - "server.**") - - for dict_ban in ban_ids: - if str(user_id) == str(dict_ban["discord_id"]) and not int(dict_ban["ended"]): - return await ctx.send(f"**{target.name} is already in the list and his ban has " - "not ended.**") - - with open("ban_ids.csv", "a") as file_: - fieldnames = [ - "discord_id", - "discord_name", - "ban_timestamp", - "ban_length", - "reason", - "ended" - ] - writer = csv.DictWriter(file_, fieldnames=fieldnames) - writer.writerow({ - "discord_id": user_id, - "discord_name": target.name, - "ban_timestamp": round(time.time()), - "ban_length": ban_length, - "reason": reason, - "ended": 0 - }) - - if ban_length < 86400: - days = second_to_hour(ban_length) - word_ = f"{days} hours" - - else: - days = second_to_day(ban_length) - word_ = f"{days} days" - - await ctx.channel.send(f'**{target.name} ({user_id}) is now banned from using the ' - f'Server Bot for {word_}.**' - f'\n`Reason: {reason}`') - await target.send(f'**You are now banned from using the Server Bot for {word_}**' - f'\n`Reason: {reason}`') + await bans.command_ban_id(bot, guild_details, ctx, user_id, ban_length=ban_length, + reason=reason) @bot.command( name='unban', @@ -373,19 +185,7 @@ async def unban(ctx: commands.Context, _id: int): if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") - initial_check = _browse_ban_profile(user_id=_id) - if not initial_check: - return await ctx.send("`Invalid Discord ID.`") - - updated_list = update_unban(_id) - updated_list = updated_list[1] - - for user in updated_list: - target = await bot.fetch_user(int(user['discord_id'])) - await target.send('**You are now unbanned from using the Server Bot. Please do not ' - 'commit the same offense again.**') - - return await ctx.send("**Updated! Those whose ban is revoked will be notified.**") + await bans.unban(bot, guild_details, ctx, _id) @bot.command( name='ban_profile', @@ -405,35 +205,7 @@ async def ban_profile(ctx: commands.Context, _id: int, _all=False): if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") - profile = _browse_ban_profile(user_id=_id) - embed = None - - if not profile: - return await ctx.send("`That ID does not exist in the database.`") - - for user in profile: - date_ = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(user["ban_timestamp"]))) - description = ( - f'**Discord Name**: {user["discord_name"]}\n' - f'**Discord ID**: {user["discord_id"]}\n' - f'**Ban Date**: {date_}\n' - f'**Ban Length**: {second_to_day(int(user["ban_length"]))}\n' - f'**Reason**: {user["reason"]}\n' - f'**Ban Ended**: {user["ended"]}' - ) - embed = discord.Embed(title=f'Ban Profile : {user["discord_name"]}', - description=description, - colour=discord.Color.dark_blue()) - embed.set_thumbnail(url=ctx.author.avatar_url) - embed.set_footer(text=ctx.author) - - if _all: - await ctx.send(embed=embed) - - if not _all: - await ctx.send(embed=embed) - - return + await bans.command_ban_profile(bot, guild_details, _id, _all=_all) @bot.command( name='ban_profile_all', @@ -450,30 +222,7 @@ async def ban_profile_all(ctx: commands.Context): if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") - profile = _browse_ban_profile() - - if not profile: - return await ctx.send("`There are no bans in the Database.`") - - for user in profile: - date_ = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(user["ban_timestamp"]))) - description = ( - f'**Discord Name**: {user["discord_name"]}\n' - f'**Discord ID**: {user["discord_id"]}\n' - f'**Ban Date**: {date_}\n' - f'**Ban Length**: {second_to_day(int(user["ban_length"]))}\n' - f'**Reason**: {user["reason"]}\n' - f'**Ban Ended**: {user["ended"]}' - ) - embed = discord.Embed(title=f'Ban Profile : {user["discord_name"]}', - description=description, - colour=discord.Color.dark_blue()) - embed.set_thumbnail(url=ctx.author.avatar_url) - embed.set_footer(text=ctx.author) - - await ctx.send(embed=embed) - - return + await bans.command_ban_profile_all(bot, guild_details, ctx) @bot.command( name='ban_list_update', @@ -489,19 +238,7 @@ async def ban_list_update(ctx: commands.Context): if not _check_bot_admin(ctx): return await ctx.send("`Insufficient Privileges.`") - updated_list = update_ban_list()[1] - - if updated_list: - for user in updated_list: - answer = int(user["ban_timestamp"]) + int(user["ban_length"]) - answer = round(time.time()) - answer - - if answer >= 0: - target = await bot.fetch_user(int(user['discord_id'])) - await target.send('**You are now unbanned from using the Server Bot. ' - 'Please do not make the same offense again.**') - - await ctx.send("**Updated! Those whose ban is over will be notified.**") + await bans.command_ban_list_update(bot, guild_details, ctx) @bot.command( name='add_roleplay', @@ -539,7 +276,8 @@ async def add_roleplay(ctx: commands.Context, rp_name, main_host_id: int, return await ctx.send("`Insufficient Privileges.`") await roleplays.command_add_roleplay( - bot, guild_details, ctx, rp_name, main_host_id, rp_start_date, rp_duration, doc + bot, guild_details, ctx, rp_name, main_host_id, rp_start_date, rp_duration, doc, + serial_code=serial_code, local=local, sign_up=sign_up, ongoing=ongoing, ended=ended, ) @bot.command( @@ -573,7 +311,7 @@ async def rp_profile_filter(ctx: commands.Context, value="0"): if not _validate_command(ctx): return - await roleplays.command_rp_profile_filter(bot, guild_details, ctx, value) + await roleplays.command_rp_profile_filter(bot, guild_details, ctx, value=value) @bot.command( name='rp_change_status', diff --git a/src/roleplays.py b/src/roleplays.py index c947fdc..92ef297 100644 --- a/src/roleplays.py +++ b/src/roleplays.py @@ -139,7 +139,7 @@ async def command_rp_profile(bot, guild_details, ctx: commands.Context, _id): await ctx.send(embed=embed) -async def command_rp_profile_filter(bot, guild_details, ctx: commands.Context, value): +async def command_rp_profile_filter(bot, guild_details, ctx: commands.Context, value="0"): try: value = int(value) except ValueError: From 5b04b5c2d4945511a4b3392a909165b0b1e515d3 Mon Sep 17 00:00:00 2001 From: Chrezm Date: Thu, 30 Sep 2021 21:57:32 -0400 Subject: [PATCH 6/9] Separate times from functionality code --- src/functionality.py | 137 +++---------------------------------------- src/times.py | 137 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 129 deletions(-) create mode 100644 src/times.py diff --git a/src/functionality.py b/src/functionality.py index 84068ed..9f5906d 100644 --- a/src/functionality.py +++ b/src/functionality.py @@ -1,14 +1,13 @@ # -*- coding: utf-8 -*- import discord -import time -import csv from discord.ext import commands, tasks -from typing import Dict, List +from typing import Dict from src import bans from src import roles from src import roleplays +from src import times class Functionality: def __init__(self, bot: commands.Bot = None, guild_details: Dict = None): @@ -130,26 +129,7 @@ def _check_bot_admin(ctx) -> bool: return False - def timezone_time_check(hour_change: int, inc_time: int): - hour_change = hour_change + inc_time - - if hour_change > 23: - hour_change = hour_change - 24 - if len(str(hour_change)) < 2: - hour_change = f"0{hour_change}" - - elif hour_change < 0: - hour_change = hour_change + 24 - if len(str(hour_change)) < 2: - hour_change = f"0{hour_change}" - - if len(str(hour_change)) < 2: - hour_change = f"0{hour_change}" - - return hour_change - # -- Commands Area -- # - @bot.command( name='ban_id', brief='Bans a Discord User from using the Bot.', @@ -383,70 +363,17 @@ async def devtester(ctx: commands.Context): help=('Lists the time based on timezones. Date format: YYYY-MM-DD.' '\nArguments: $timezone ' '\nExample: $timezone HN' - '\n\n is an optional field; it is an epoch/unix second argument. It can also be an RP Serial Code.' - "\nInputting as an RP Serial Code will return you said RP's First Session Date." + '\n\n is an optional field; it is an epoch/unix second argument. It can ' + 'also be an RP Serial Code.' + "\nInputting as an RP Serial Code will return you said RP's First Session " + "Date." ) ) async def timezone(ctx: commands.Context, seconds=None): if not _validate_command(ctx): return - irl_time = None - title_embed = None - - if not seconds: - seconds = round(time.time()) - - try: - seconds = int(seconds) - try: - irl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(seconds)) - except OSError: - return await ctx.send("`Integer cannot be over 2,147,483,647`") - - title_embed = f'It is {irl_time} UTC' - except ValueError: - rp_list = roleplays.rp_id_check() - for rp in rp_list: - if seconds == rp["serial_code"]: - irl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(rp["rp_start_date"]))) - title_embed = f'RP Hosted at {irl_time} UTC' - - if not irl_time: - return await ctx.send("`Invalid RP Serial Code`") - - other_timezones = str(irl_time) - other_timezones = other_timezones.split(" ") - hour_split = other_timezones[1].split(":") - hour_change = int(hour_split[0]) - text_ = [ - "**-- Standard Time --**", - f"**EST**: {timezone_time_check(hour_change, -5)}:{hour_split[1]}:{hour_split[2]}", - f"**CST**: {timezone_time_check(hour_change, -6)}:{hour_split[1]}:{hour_split[2]}", - f"**MST**: {timezone_time_check(hour_change, -7)}:{hour_split[1]}:{hour_split[2]}", - f"**PST**: {timezone_time_check(hour_change, -8)}:{hour_split[1]}:{hour_split[2]}", - "**-- Daylight Time --**", - f"**EDT**: {timezone_time_check(hour_change, -4)}:{hour_split[1]}:{hour_split[2]}", - f"**CDT**: {timezone_time_check(hour_change, -5)}:{hour_split[1]}:{hour_split[2]}", - f"**MDT**: {timezone_time_check(hour_change, -6)}:{hour_split[1]}:{hour_split[2]}", - f"**PDT**: {timezone_time_check(hour_change, -7)}:{hour_split[1]}:{hour_split[2]}", - "**-- Europe Time --**", - f"**UTC-1**: {timezone_time_check(hour_change, -1)}:{hour_split[1]}:{hour_split[2]}", - f"**UTC**: {timezone_time_check(hour_change, 0)}:{hour_split[1]}:{hour_split[2]}", - f"**UTC+1**: {timezone_time_check(hour_change, 1)}:{hour_split[1]}:{hour_split[2]}", - f"**UTC+2**: {timezone_time_check(hour_change, 2)}:{hour_split[1]}:{hour_split[2]}", - f"**UTC+3**: {timezone_time_check(hour_change, 3)}:{hour_split[1]}:{hour_split[2]}", - ] - - text_ = "\n".join(text_) - - embed = discord.Embed(title=title_embed, - description=f'{text_}', - colour=discord.Color.dark_gold()) - embed.set_thumbnail(url=ctx.author.avatar_url) - embed.set_footer(text=ctx.author) - - await ctx.send(embed=embed) + await times.command_timezone(bot, guild_details, ctx, seconds=seconds) @bot.command( name='utc', @@ -463,55 +390,7 @@ async def utc(ctx: commands.Context, seconds=None): if not _validate_command(ctx): return - if not seconds: - seconds = round(time.time()) - - try: - seconds = int(seconds) - try: - irl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(seconds)) - except OSError: - return await ctx.send("`Integer cannot be over 2,147,483,647`") - except ValueError: - return await ctx.send("`You can only input integers as an argument.`") - - other_timezones = str(irl_time) - other_timezones = other_timezones.split(" ") - hour_split = other_timezones[1].split(":") - text_ = [] - - for i in range(1, 13): - hour_change = int(hour_split[0]) + i - hour_change_negative = hour_change - (i * 2) - - if hour_change > 23: - hour_change = (int(hour_split[0]) + i) - 24 - if len(str(hour_change)) < 2: - hour_change = f"0{hour_change}" - - if len(str(hour_change)) < 2: - hour_change = f"0{hour_change}" - - if hour_change_negative < 0: - hour_change_negative = hour_change_negative + 24 - if len(str(hour_change_negative)) < 2: - hour_change_negative = f"0{hour_change_negative}" - - if len(str(hour_change_negative)) < 2: - hour_change_negative = f"0{hour_change_negative}" - - other_tz = f"**UTC+{i} | -{i}**: {hour_change}:{hour_split[1]}:{hour_split[2]} | {hour_change_negative}:{hour_split[1]}:{hour_split[2]}" - text_.append(other_tz) - - text_ = "\n".join(text_) - - embed = discord.Embed(title=f'It is {irl_time} UTC', - description=f'{text_}', - colour=discord.Color.dark_blue()) - embed.set_thumbnail(url=ctx.author.avatar_url) - embed.set_footer(text=ctx.author) - - await ctx.send(embed=embed) + await times.command_utc(bot, guild_details, ctx, seconds=seconds) # -- Command Error Area -- # diff --git a/src/times.py b/src/times.py new file mode 100644 index 0000000..4550283 --- /dev/null +++ b/src/times.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- + +import time + +import discord +from discord.ext import commands + +from src import roleplays + + +async def command_timezone(bot, guild_details, ctx: commands.Context, seconds=None): + irl_time = None + title_embed = None + + if not seconds: + seconds = round(time.time()) + + try: + seconds = int(seconds) + try: + irl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(seconds)) + except OSError: + return await ctx.send("`Integer cannot be over 2,147,483,647`") + + title_embed = f'It is {irl_time} UTC' + except ValueError: + rp_list = roleplays.rp_id_check() + for rp in rp_list: + if seconds == rp["serial_code"]: + irl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(rp["rp_start_date"]))) + title_embed = f'RP Hosted at {irl_time} UTC' + + if not irl_time: + return await ctx.send("`Invalid RP Serial Code`") + + other_timezones = str(irl_time) + other_timezones = other_timezones.split(" ") + hour_split = other_timezones[1].split(":") + hour_change = int(hour_split[0]) + text_ = [ + "**-- Standard Time --**", + f"**EST**: {timezone_time_check(hour_change, -5)}:{hour_split[1]}:{hour_split[2]}", + f"**CST**: {timezone_time_check(hour_change, -6)}:{hour_split[1]}:{hour_split[2]}", + f"**MST**: {timezone_time_check(hour_change, -7)}:{hour_split[1]}:{hour_split[2]}", + f"**PST**: {timezone_time_check(hour_change, -8)}:{hour_split[1]}:{hour_split[2]}", + "**-- Daylight Time --**", + f"**EDT**: {timezone_time_check(hour_change, -4)}:{hour_split[1]}:{hour_split[2]}", + f"**CDT**: {timezone_time_check(hour_change, -5)}:{hour_split[1]}:{hour_split[2]}", + f"**MDT**: {timezone_time_check(hour_change, -6)}:{hour_split[1]}:{hour_split[2]}", + f"**PDT**: {timezone_time_check(hour_change, -7)}:{hour_split[1]}:{hour_split[2]}", + "**-- Europe Time --**", + f"**UTC-1**: {timezone_time_check(hour_change, -1)}:{hour_split[1]}:{hour_split[2]}", + f"**UTC**: {timezone_time_check(hour_change, 0)}:{hour_split[1]}:{hour_split[2]}", + f"**UTC+1**: {timezone_time_check(hour_change, 1)}:{hour_split[1]}:{hour_split[2]}", + f"**UTC+2**: {timezone_time_check(hour_change, 2)}:{hour_split[1]}:{hour_split[2]}", + f"**UTC+3**: {timezone_time_check(hour_change, 3)}:{hour_split[1]}:{hour_split[2]}", + ] + + text_ = "\n".join(text_) + + embed = discord.Embed(title=title_embed, + description=f'{text_}', + colour=discord.Color.dark_gold()) + embed.set_thumbnail(url=ctx.author.avatar_url) + embed.set_footer(text=ctx.author) + + await ctx.send(embed=embed) + + +async def commands_utc(bot, guild_details, ctx: commands.Context, seconds=None): + if not seconds: + seconds = round(time.time()) + + try: + seconds = int(seconds) + try: + irl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(seconds)) + except OSError: + return await ctx.send("`Integer cannot be over 2,147,483,647`") + except ValueError: + return await ctx.send("`You can only input integers as an argument.`") + + other_timezones = str(irl_time) + other_timezones = other_timezones.split(" ") + hour_split = other_timezones[1].split(":") + text_ = [] + + for i in range(1, 13): + hour_change = int(hour_split[0]) + i + hour_change_negative = hour_change - (i * 2) + + if hour_change > 23: + hour_change = (int(hour_split[0]) + i) - 24 + if len(str(hour_change)) < 2: + hour_change = f"0{hour_change}" + + if len(str(hour_change)) < 2: + hour_change = f"0{hour_change}" + + if hour_change_negative < 0: + hour_change_negative = hour_change_negative + 24 + if len(str(hour_change_negative)) < 2: + hour_change_negative = f"0{hour_change_negative}" + + if len(str(hour_change_negative)) < 2: + hour_change_negative = f"0{hour_change_negative}" + + other_tz = f"**UTC+{i} | -{i}**: {hour_change}:{hour_split[1]}:{hour_split[2]} | {hour_change_negative}:{hour_split[1]}:{hour_split[2]}" + text_.append(other_tz) + + text_ = "\n".join(text_) + + embed = discord.Embed(title=f'It is {irl_time} UTC', + description=f'{text_}', + colour=discord.Color.dark_blue()) + embed.set_thumbnail(url=ctx.author.avatar_url) + embed.set_footer(text=ctx.author) + + await ctx.send(embed=embed) + +def timezone_time_check(hour_change: int, inc_time: int) -> int: + hour_change = hour_change + inc_time + + if hour_change > 23: + hour_change = hour_change - 24 + if len(str(hour_change)) < 2: + hour_change = f"0{hour_change}" + + elif hour_change < 0: + hour_change = hour_change + 24 + if len(str(hour_change)) < 2: + hour_change = f"0{hour_change}" + + if len(str(hour_change)) < 2: + hour_change = f"0{hour_change}" + + return hour_change From 86c33c691b84b8b04f63fc1221d8c5ecd937b798 Mon Sep 17 00:00:00 2001 From: Chrezm Date: Thu, 30 Sep 2021 22:04:43 -0400 Subject: [PATCH 7/9] Mark private-like functions as such --- src/bans.py | 22 +++++++++++----------- src/roleplays.py | 30 +++++++++++++++--------------- src/roles.py | 24 ++++++++++++------------ src/times.py | 30 +++++++++++++++--------------- 4 files changed, 53 insertions(+), 53 deletions(-) diff --git a/src/bans.py b/src/bans.py index 5f96b05..f5d0290 100644 --- a/src/bans.py +++ b/src/bans.py @@ -43,11 +43,11 @@ async def command_ban_id(bot, guild_details, ctx: commands.Context, user_id: int }) if ban_length < 86400: - days = second_to_hour(ban_length) + days = _second_to_hour(ban_length) word_ = f"{days} hours" else: - days = second_to_day(ban_length) + days = _second_to_day(ban_length) word_ = f"{days} days" await ctx.channel.send(f'**{target.name} ({user_id}) is now banned from using the ' @@ -62,7 +62,7 @@ async def command_unban(bot, guild_details, ctx: commands.Context, _id: int): if not initial_check: return await ctx.send("`Invalid Discord ID.`") - updated_list = update_unban(_id) + updated_list = _update_unban(_id) updated_list = updated_list[1] for user in updated_list: @@ -86,7 +86,7 @@ async def command_ban_profile(bot, guild_details, ctx: commands.Context, _id: in f'**Discord Name**: {user["discord_name"]}\n' f'**Discord ID**: {user["discord_id"]}\n' f'**Ban Date**: {date_}\n' - f'**Ban Length**: {second_to_day(int(user["ban_length"]))}\n' + f'**Ban Length**: {_second_to_day(int(user["ban_length"]))}\n' f'**Reason**: {user["reason"]}\n' f'**Ban Ended**: {user["ended"]}' ) @@ -115,7 +115,7 @@ async def command_ban_profile_all(bot, guild_details, ctx: commands.Context): f'**Discord Name**: {user["discord_name"]}\n' f'**Discord ID**: {user["discord_id"]}\n' f'**Ban Date**: {date_}\n' - f'**Ban Length**: {second_to_day(int(user["ban_length"]))}\n' + f'**Ban Length**: {_second_to_day(int(user["ban_length"]))}\n' f'**Reason**: {user["reason"]}\n' f'**Ban Ended**: {user["ended"]}' ) @@ -129,7 +129,7 @@ async def command_ban_profile_all(bot, guild_details, ctx: commands.Context): async def command_ban_list_update(bot, guild_details, ctx: commands.Context): - updated_list = update_ban_list()[1] + updated_list = _update_ban_list()[1] if updated_list: for user in updated_list: @@ -167,13 +167,13 @@ def ban_id_check(): # Converts Seconds to Days -def second_to_day(second: int) -> int: +def _second_to_day(second: int) -> int: answer = second / 86400 return round(answer) # Converts Seconds to Hours -def second_to_hour(second: int) -> int: +def _second_to_hour(second: int) -> int: answer = second / 3600 return round(answer) @@ -189,7 +189,7 @@ def _browse_ban_profile(user_id: int = None) -> List: return found -def update_unban(_id): +def _update_unban(_id): ban_ids = ban_id_check() updated_ban_list = list() inform_ban_list = list() # This is to inform players when their ban is over. @@ -229,7 +229,7 @@ def update_unban(_id): return updated_ban_list, inform_ban_list -def update_ban_list(): +def _update_ban_list(): ban_ids = ban_id_check() updated_ban_list = list() inform_ban_list = list() # This is to inform players when their ban is over. @@ -279,7 +279,7 @@ def update_ban_list(): async def inform_update_list(bot): - updated_list = update_ban_list()[1] + updated_list = _update_ban_list()[1] if updated_list: for user in updated_list: diff --git a/src/roleplays.py b/src/roleplays.py index 92ef297..ca0338f 100644 --- a/src/roleplays.py +++ b/src/roleplays.py @@ -27,7 +27,7 @@ async def command_add_roleplay(bot, guild_details, ctx: commands.Context, rp_nam return await ctx.send(f"`Please input a valid integer. {arg} is not valid.`") if not serial_code: - serial_code = create_code() + serial_code = _create_code() for rp in rp_list: local_note = "in DRO servers" @@ -107,7 +107,7 @@ async def command_add_roleplay(bot, guild_details, ctx: commands.Context, rp_nam async def command_rp_profile(bot, guild_details, ctx: commands.Context, _id): - profile = rp_profile_check(_id) + profile = _rp_profile_check(_id) embed = None bool_string = {"0": False, "1": True} @@ -147,10 +147,10 @@ async def command_rp_profile_filter(bot, guild_details, ctx: commands.Context, v return rp_dict = { - 0: rp_profile_check_sign_up(value), - 1: rp_profile_check_ongoing(value), - 2: rp_profile_check_ended(value), - 3: rp_profile_check_all(value) + 0: _rp_profile_check_sign_up(value), + 1: _rp_profile_check_ongoing(value), + 2: _rp_profile_check_ended(value), + 3: _rp_profile_check_all(value) } bool_string = {"0": False, "1": True} @@ -204,14 +204,14 @@ async def command_rp_change_status(bot, guild_details, ctx: commands.Context, _i string_inform = None - initial_check = rp_profile_check(_id) + initial_check = _rp_profile_check(_id) if not initial_check: await ctx.send('`Invalid RP Serial Code, cannot update.`') return try: - update = update_rp_list_choice(_id, value) + update = _update_rp_list_choice(_id, value) except KeyError: await ctx.send("`Input a valid value from 0-3 only.`") return @@ -228,7 +228,7 @@ async def command_rp_change_status(bot, guild_details, ctx: commands.Context, _i # This is a string generator for RP Serial Codes. But it can be used for something more in the future. -def create_code() -> str: +def _create_code() -> str: lowercase_letter = string.ascii_lowercase uppercase_letter = string.ascii_uppercase digits = str(string.digits) @@ -335,7 +335,7 @@ def update_rp_list() -> List: return updated_rp_list -def update_rp_list_choice(_id, val) -> Tuple[List, List]: +def _update_rp_list_choice(_id, val) -> Tuple[List, List]: change_dict = { 0: [0, 0, 0], # Close Sign Ups 1: [1, 0, 0], # Open Sign Ups @@ -402,7 +402,7 @@ def update_rp_list_choice(_id, val) -> Tuple[List, List]: return updated_rp_list, inform_update -def rp_profile_check(_id) -> List: +def _rp_profile_check(_id) -> List: rp_list = rp_id_check() found = [] @@ -413,7 +413,7 @@ def rp_profile_check(_id) -> List: return found -def rp_profile_check_sign_up(val) -> List: +def _rp_profile_check_sign_up(val) -> List: if val != 0: return @@ -427,7 +427,7 @@ def rp_profile_check_sign_up(val) -> List: return found -def rp_profile_check_ongoing(val) -> List: +def _rp_profile_check_ongoing(val) -> List: if val != 1: return @@ -441,7 +441,7 @@ def rp_profile_check_ongoing(val) -> List: return found -def rp_profile_check_ended(val) -> List: +def _rp_profile_check_ended(val) -> List: if val != 2: return @@ -455,7 +455,7 @@ def rp_profile_check_ended(val) -> List: return found -def rp_profile_check_all(val) -> List: +def _rp_profile_check_all(val) -> List: if val != 3: return diff --git a/src/roles.py b/src/roles.py index 5803266..77894f0 100644 --- a/src/roles.py +++ b/src/roles.py @@ -4,6 +4,18 @@ from discord.ext import commands +async def command_rpactive(bot, guild_details, ctx): + await _optin_role(ctx, + guild_details.rp_active_role_name, + guild_details.rp_active_role_id) + + +async def command_devtester(bot, guild_details, ctx): + await _optin_role(ctx, + guild_details.dev_tester_role_name, + guild_details.dev_tester_role_id) + + async def _optin_role(ctx: commands.Context, role_name: str, role_id: int): role = discord.utils.get(ctx.message.guild.roles, name=role_name) @@ -20,15 +32,3 @@ async def _optin_role(ctx: commands.Context, role_name: str, role_id: int): else: await user.add_roles(role) await ctx.send(f'Added role **{role_name}**.') - - -async def command_rpactive(bot, guild_details, ctx): - await _optin_role(ctx, - guild_details.rp_active_role_name, - guild_details.rp_active_role_id) - - -async def command_devtester(bot, guild_details, ctx): - await _optin_role(ctx, - guild_details.dev_tester_role_name, - guild_details.dev_tester_role_id) diff --git a/src/times.py b/src/times.py index 4550283..46413e1 100644 --- a/src/times.py +++ b/src/times.py @@ -39,21 +39,21 @@ async def command_timezone(bot, guild_details, ctx: commands.Context, seconds=No hour_change = int(hour_split[0]) text_ = [ "**-- Standard Time --**", - f"**EST**: {timezone_time_check(hour_change, -5)}:{hour_split[1]}:{hour_split[2]}", - f"**CST**: {timezone_time_check(hour_change, -6)}:{hour_split[1]}:{hour_split[2]}", - f"**MST**: {timezone_time_check(hour_change, -7)}:{hour_split[1]}:{hour_split[2]}", - f"**PST**: {timezone_time_check(hour_change, -8)}:{hour_split[1]}:{hour_split[2]}", + f"**EST**: {_timezone_time_check(hour_change, -5)}:{hour_split[1]}:{hour_split[2]}", + f"**CST**: {_timezone_time_check(hour_change, -6)}:{hour_split[1]}:{hour_split[2]}", + f"**MST**: {_timezone_time_check(hour_change, -7)}:{hour_split[1]}:{hour_split[2]}", + f"**PST**: {_timezone_time_check(hour_change, -8)}:{hour_split[1]}:{hour_split[2]}", "**-- Daylight Time --**", - f"**EDT**: {timezone_time_check(hour_change, -4)}:{hour_split[1]}:{hour_split[2]}", - f"**CDT**: {timezone_time_check(hour_change, -5)}:{hour_split[1]}:{hour_split[2]}", - f"**MDT**: {timezone_time_check(hour_change, -6)}:{hour_split[1]}:{hour_split[2]}", - f"**PDT**: {timezone_time_check(hour_change, -7)}:{hour_split[1]}:{hour_split[2]}", + f"**EDT**: {_timezone_time_check(hour_change, -4)}:{hour_split[1]}:{hour_split[2]}", + f"**CDT**: {_timezone_time_check(hour_change, -5)}:{hour_split[1]}:{hour_split[2]}", + f"**MDT**: {_timezone_time_check(hour_change, -6)}:{hour_split[1]}:{hour_split[2]}", + f"**PDT**: {_timezone_time_check(hour_change, -7)}:{hour_split[1]}:{hour_split[2]}", "**-- Europe Time --**", - f"**UTC-1**: {timezone_time_check(hour_change, -1)}:{hour_split[1]}:{hour_split[2]}", - f"**UTC**: {timezone_time_check(hour_change, 0)}:{hour_split[1]}:{hour_split[2]}", - f"**UTC+1**: {timezone_time_check(hour_change, 1)}:{hour_split[1]}:{hour_split[2]}", - f"**UTC+2**: {timezone_time_check(hour_change, 2)}:{hour_split[1]}:{hour_split[2]}", - f"**UTC+3**: {timezone_time_check(hour_change, 3)}:{hour_split[1]}:{hour_split[2]}", + f"**UTC-1**: {_timezone_time_check(hour_change, -1)}:{hour_split[1]}:{hour_split[2]}", + f"**UTC**: {_timezone_time_check(hour_change, 0)}:{hour_split[1]}:{hour_split[2]}", + f"**UTC+1**: {_timezone_time_check(hour_change, 1)}:{hour_split[1]}:{hour_split[2]}", + f"**UTC+2**: {_timezone_time_check(hour_change, 2)}:{hour_split[1]}:{hour_split[2]}", + f"**UTC+3**: {_timezone_time_check(hour_change, 3)}:{hour_split[1]}:{hour_split[2]}", ] text_ = "\n".join(text_) @@ -67,7 +67,7 @@ async def command_timezone(bot, guild_details, ctx: commands.Context, seconds=No await ctx.send(embed=embed) -async def commands_utc(bot, guild_details, ctx: commands.Context, seconds=None): +async def command_utc(bot, guild_details, ctx: commands.Context, seconds=None): if not seconds: seconds = round(time.time()) @@ -118,7 +118,7 @@ async def commands_utc(bot, guild_details, ctx: commands.Context, seconds=None): await ctx.send(embed=embed) -def timezone_time_check(hour_change: int, inc_time: int) -> int: +def _timezone_time_check(hour_change: int, inc_time: int) -> int: hour_change = hour_change + inc_time if hour_change > 23: From 919ac18170a2ce653ed2b1840bed008cd7e6fb09 Mon Sep 17 00:00:00 2001 From: Chrezm Date: Thu, 30 Sep 2021 22:13:47 -0400 Subject: [PATCH 8/9] Rename methods --- src/bans.py | 19 +++++++++++++------ src/functionality.py | 10 +++++----- src/roleplays.py | 18 +++++++++--------- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/bans.py b/src/bans.py index f5d0290..5031cad 100644 --- a/src/bans.py +++ b/src/bans.py @@ -10,7 +10,7 @@ async def command_ban_id(bot, guild_details, ctx: commands.Context, user_id: int, ban_length: int = 259200, reason="Unstated Reason"): - ban_ids = ban_id_check() + ban_ids = fetch_ban_ids() try: target = await bot.fetch_user(user_id) @@ -145,7 +145,7 @@ async def command_ban_list_update(bot, guild_details, ctx: commands.Context): # This is the initial check for ban_ids.csv and obtaining its data. -def ban_id_check(): +def fetch_ban_ids(): ban_list = None try: with open("ban_ids.csv", "r+", newline="") as file: @@ -159,7 +159,14 @@ def ban_id_check(): except FileNotFoundError: with open("ban_ids.csv", "w") as file: print("ban_ids.csv does not exist; the bot will now create one...") - fieldnames = ["discord_id", "discord_name", "ban_timestamp", "ban_length", "reason", "ended"] + fieldnames = [ + "discord_id", + "discord_name", + "ban_timestamp", + "ban_length", + "reason", + "ended" + ] writer = csv.DictWriter(file, fieldnames=fieldnames) writer.writeheader() @@ -179,7 +186,7 @@ def _second_to_hour(second: int) -> int: def _browse_ban_profile(user_id: int = None) -> List: - ban_list = ban_id_check() + ban_list = fetch_ban_ids() found = [] for user in ban_list: @@ -190,7 +197,7 @@ def _browse_ban_profile(user_id: int = None) -> List: def _update_unban(_id): - ban_ids = ban_id_check() + ban_ids = fetch_ban_ids() updated_ban_list = list() inform_ban_list = list() # This is to inform players when their ban is over. @@ -230,7 +237,7 @@ def _update_unban(_id): def _update_ban_list(): - ban_ids = ban_id_check() + ban_ids = fetch_ban_ids() updated_ban_list = list() inform_ban_list = list() # This is to inform players when their ban is over. update_dict = None diff --git a/src/functionality.py b/src/functionality.py index 9f5906d..a2cfb5d 100644 --- a/src/functionality.py +++ b/src/functionality.py @@ -14,8 +14,8 @@ def __init__(self, bot: commands.Bot = None, guild_details: Dict = None): @bot.event async def on_ready(): # This is to start the checks and if said file does not exist, will create one. - bans.ban_id_check() - roleplays.rp_id_check() + bans.fetch_ban_ids() + roleplays.fetch_rps() # This is to begin the task loop. second_passing.start() @@ -44,13 +44,13 @@ async def on_command_error(ctx, error): @bot.event async def on_message(message): - # Looks through the ban_id.csv - ban_id_ = bans.ban_id_check() - if message.author == bot.user: return if message.channel.name in guild_details.relaying_channels: + # Looks through the ban_id.csv + ban_id_ = bans.fetch_ban_ids() + # Right here, it will delete the message and notify the user who tried using the # bot that they are banned. for user in ban_id_: diff --git a/src/roleplays.py b/src/roleplays.py index ca0338f..7c6f6a8 100644 --- a/src/roleplays.py +++ b/src/roleplays.py @@ -14,7 +14,7 @@ async def command_add_roleplay(bot, guild_details, ctx: commands.Context, rp_name, main_host_id: int, rp_start_date: int, rp_duration: int, doc, serial_code=None, local: int = 1, sign_up: int = 1, ongoing: int = 0, ended: int = 0): - rp_list = rp_id_check() + rp_list = fetch_rps() arg_check = [local, sign_up, ongoing, ended] for arg in arg_check: @@ -237,7 +237,7 @@ def _create_code() -> str: # This is the initial check for rp_collection.csv and obtaining its data. -def rp_id_check() -> List: +def fetch_rps() -> List: rp_list = None try: with open("rp_collection.csv", "r+", newline="") as file: @@ -274,7 +274,7 @@ def rp_id_check() -> List: def update_rp_list() -> List: - rp_list = rp_id_check() + rp_list = fetch_rps() updated_rp_list = list() update_dict = None @@ -343,7 +343,7 @@ def _update_rp_list_choice(_id, val) -> Tuple[List, List]: 3: [0, 0, 1] # Ends RP } - rp_list = rp_id_check() + rp_list = fetch_rps() updated_rp_list = list() inform_update = list() @@ -403,7 +403,7 @@ def _update_rp_list_choice(_id, val) -> Tuple[List, List]: def _rp_profile_check(_id) -> List: - rp_list = rp_id_check() + rp_list = fetch_rps() found = [] for rp in rp_list: @@ -417,7 +417,7 @@ def _rp_profile_check_sign_up(val) -> List: if val != 0: return - rp_list = rp_id_check() + rp_list = fetch_rps() found = [] for rp in rp_list: @@ -431,7 +431,7 @@ def _rp_profile_check_ongoing(val) -> List: if val != 1: return - rp_list = rp_id_check() + rp_list = fetch_rps() found = [] for rp in rp_list: @@ -445,7 +445,7 @@ def _rp_profile_check_ended(val) -> List: if val != 2: return - rp_list = rp_id_check() + rp_list = fetch_rps() found = [] for rp in rp_list: @@ -459,7 +459,7 @@ def _rp_profile_check_all(val) -> List: if val != 3: return - rp_list = rp_id_check() + rp_list = fetch_rps() found = [] for rp in rp_list: From 5ba43e75260c63e5ac0fc9d34150343b403b6ab4 Mon Sep 17 00:00:00 2001 From: Chrezm Date: Thu, 30 Sep 2021 22:41:08 -0400 Subject: [PATCH 9/9] Condense duplicate database code --- src/bans.py | 113 ++++++++++++++---------------- src/roleplays.py | 178 ++++++++++++++++++----------------------------- 2 files changed, 122 insertions(+), 169 deletions(-) diff --git a/src/bans.py b/src/bans.py index 5031cad..37da761 100644 --- a/src/bans.py +++ b/src/bans.py @@ -23,24 +23,16 @@ async def command_ban_id(bot, guild_details, ctx: commands.Context, user_id: int return await ctx.send(f"**{target.name} is already in the list and his ban has " "not ended.**") - with open("ban_ids.csv", "a") as file_: - fieldnames = [ - "discord_id", - "discord_name", - "ban_timestamp", - "ban_length", - "reason", - "ended" - ] - writer = csv.DictWriter(file_, fieldnames=fieldnames) - writer.writerow({ - "discord_id": user_id, - "discord_name": target.name, - "ban_timestamp": round(time.time()), - "ban_length": ban_length, - "reason": reason, - "ended": 0 - }) + ban_entry = { + "discord_id": user_id, + "discord_name": target.name, + "ban_timestamp": round(time.time()), + "ban_length": ban_length, + "reason": reason, + "ended": 0 + } + + _append_ban_database([ban_entry]) if ban_length < 86400: days = _second_to_hour(ban_length) @@ -157,18 +149,8 @@ def fetch_ban_ids(): return ban_list except FileNotFoundError: - with open("ban_ids.csv", "w") as file: - print("ban_ids.csv does not exist; the bot will now create one...") - fieldnames = [ - "discord_id", - "discord_name", - "ban_timestamp", - "ban_length", - "reason", - "ended" - ] - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() + print("ban_ids.csv does not exist; the bot will now create one...") + _set_ban_database([]) return ban_list @@ -218,21 +200,7 @@ def _update_unban(_id): updated_ban_list.append(update_dict) - with open("ban_ids.csv", "w+") as file: - fieldnames = [ - "discord_id", - "discord_name", - "ban_timestamp", - "ban_length", - "reason", - "ended" - ] - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - - for update in updated_ban_list: - writer.writerow(update) - + _set_ban_database(updated_ban_list) return updated_ban_list, inform_ban_list @@ -267,21 +235,7 @@ def _update_ban_list(): updated_ban_list.append(update_dict) - with open("ban_ids.csv", "w+") as file: - fieldnames = [ - "discord_id", - "discord_name", - "ban_timestamp", - "ban_length", - "reason", - "ended" - ] - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - - for update in updated_ban_list: - writer.writerow(update) - + _set_ban_database(updated_ban_list) return updated_ban_list, inform_ban_list @@ -299,3 +253,42 @@ async def inform_update_list(bot): "Please do not commit the same offense again.**") return + + +_BAN_DATABASE_FIELDS = [ + "discord_id", + "discord_name", + "ban_timestamp", + "ban_length", + "reason", + "ended" + ] + +_BAN_DATABASE_FILENAME = "ban_ids.csv" + + +def _set_ban_database(bans: List): + with open(_BAN_DATABASE_FILENAME, "w+") as file: + writer = csv.DictWriter(file, fieldnames=_BAN_DATABASE_FILENAME) + writer.writeheader() + + for update in bans: + writer.writerow(update) + + +def _append_ban_database(bans: List): + with open(_BAN_DATABASE_FILENAME, "a") as file: + writer = csv.DictWriter(file, fieldnames=_BAN_DATABASE_FILENAME) + + for update in bans: + writer.writerow(update) + + +def _read_ban_database() -> List: + with open(_BAN_DATABASE_FILENAME, "r+", newline="") as file: + reader = csv.DictReader(file) + ban_list = [] + for x in reader: + ban_list.append(x) + + return ban_list diff --git a/src/roleplays.py b/src/roleplays.py index 7c6f6a8..c8a2b65 100644 --- a/src/roleplays.py +++ b/src/roleplays.py @@ -62,39 +62,24 @@ async def command_add_roleplay(bot, guild_details, ctx: commands.Context, rp_nam noted_time = time.strftime('%d-%B-%Y %H:%M:%S', time.gmtime(int(rp_start_date))) - with open("rp_collection.csv", "a") as file: - fieldnames = [ - "serial_code", - "approved", - "approved_id", - "local", - "rp_name", - "main_host", - "main_host_id", - "rp_start_date", - "rp_start_time", - "rp_duration", - "doc", - "sign_up", - "ongoing", - "ended"] - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writerow({ - "serial_code": serial_code, - "approved": ctx.author.name, - "approved_id": ctx.author.id, - "local": local, - "rp_name": rp_name, - "main_host": target.name, - "main_host_id": main_host_id, - "rp_start_date": rp_start_date, - "rp_start_time": f"{hour_split[0]}:{hour_split[1]}", - "rp_duration": rp_duration, - "doc": doc, - "sign_up": sign_up, - "ongoing": ongoing, - "ended": ended - }) + database_entry = { + "serial_code": serial_code, + "approved": ctx.author.name, + "approved_id": ctx.author.id, + "local": local, + "rp_name": rp_name, + "main_host": target.name, + "main_host_id": main_host_id, + "rp_start_date": rp_start_date, + "rp_start_time": f"{hour_split[0]}:{hour_split[1]}", + "rp_duration": rp_duration, + "doc": doc, + "sign_up": sign_up, + "ongoing": ongoing, + "ended": ended + } + + _append_roleplay_database([database_entry]) await ctx.channel.send(f'**{rp_name} ({serial_code}) was inserted to the Database; ' f'hosted by {target.name}.**' @@ -240,35 +225,10 @@ def _create_code() -> str: def fetch_rps() -> List: rp_list = None try: - with open("rp_collection.csv", "r+", newline="") as file: - reader = csv.DictReader(file) - rp_list = [] - for x in reader: - rp_list.append(x) - - return rp_list - + return _read_roleplay_database() except FileNotFoundError: - with open("rp_collection.csv", "w") as file: - print("rp_collection.csv does not exist; the bot will now create one...") - fieldnames = [ - "serial_code", - "approved", - "approved_id", - "local", - "rp_name", - "main_host", - "main_host_id", - "rp_start_date", - "rp_start_time", - "rp_duration", - "doc", - "sign_up", - "ongoing", - "ended" - ] - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() + print("rp_collection.csv does not exist; the bot will now create one...") + _set_roleplay_database([]) return rp_list @@ -308,30 +268,7 @@ def update_rp_list() -> List: updated_rp_list.append(update_dict) - with open("rp_collection.csv", "w+") as file: - fieldnames = [ - "serial_code", - "approved", - "approved_id", - "local", - "rp_name", - "main_host", - "main_host_id", - "rp_start_date", - "rp_start_time", - "rp_duration", - "doc", - "sign_up", - "ongoing", - "ended" - ] - - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - - for update in updated_rp_list: - writer.writerow(update) - + _set_roleplay_database(updated_rp_list) return updated_rp_list @@ -375,30 +312,7 @@ def _update_rp_list_choice(_id, val) -> Tuple[List, List]: updated_rp_list.append(update_dict) - with open("rp_collection.csv", "w+") as file: - fieldnames = [ - "serial_code", - "approved", - "approved_id", - "local", - "rp_name", - "main_host", - "main_host_id", - "rp_start_date", - "rp_start_time", - "rp_duration", - "doc", - "sign_up", - "ongoing", - "ended" - ] - - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - - for update in updated_rp_list: - writer.writerow(update) - + _set_roleplay_database(updated_rp_list) return updated_rp_list, inform_update @@ -466,3 +380,49 @@ def _rp_profile_check_all(val) -> List: found.append(rp) return found + + +_ROLEPLAY_DATABASE_FIELDS = [ + "serial_code", + "approved", + "approved_id", + "local", + "rp_name", + "main_host", + "main_host_id", + "rp_start_date", + "rp_start_time", + "rp_duration", + "doc", + "sign_up", + "ongoing", + "ended" + ] + +_ROLEPLAY_DATABASE_FILENAME = "rp_collection.csv" + + +def _set_roleplay_database(roleplays: List): + with open(_ROLEPLAY_DATABASE_FILENAME, "w+") as file: + writer = csv.DictWriter(file, fieldnames=_ROLEPLAY_DATABASE_FIELDS) + writer.writeheader() + + for update in roleplays: + writer.writerow(update) + + +def _append_roleplay_database(roleplays: List): + with open(_ROLEPLAY_DATABASE_FILENAME, "a") as file: + writer = csv.DictWriter(file, fieldnames=_ROLEPLAY_DATABASE_FIELDS) + for update in roleplays: + writer.writerow(update) + + +def _read_roleplay_database() -> List: + with open(_ROLEPLAY_DATABASE_FILENAME, "r+", newline="") as file: + reader = csv.DictReader(file) + rp_list = [] + for x in reader: + rp_list.append(x) + + return rp_list