📝 Cogs and utils

Improved code style and added comments and docstrings to cogs and utils.
This commit is contained in:
NikolajDanger
2021-04-15 15:16:56 +02:00
parent 35b2446a10
commit 43f26ec383
11 changed files with 725 additions and 255 deletions
+6 -3
View File
@@ -1,7 +1,10 @@
"""A collections of utilities used by Gwendolyn and her functions"""
"""A collections of utilities used by Gwendolyn and her functions."""
__all__ = ["Options", "Credentials", "databaseFuncs", "EventHandler", "ErrorHandler", "getParams", "logThis", "cap", "makeFiles", "replaceMultiple", "emojiToCommand"]
__all__ = ["Options", "Credentials", "databaseFuncs", "EventHandler",
"ErrorHandler", "getParams", "logThis", "cap", "makeFiles",
"replaceMultiple", "emojiToCommand"]
from .helperClasses import Options, Credentials, databaseFuncs
from .eventHandlers import EventHandler, ErrorHandler
from .utilFunctions import getParams, logThis, cap, makeFiles, replaceMultiple, emojiToCommand
from .utilFunctions import (getParams, logThis, cap, makeFiles,
replaceMultiple, emojiToCommand, longStrings)
+120 -64
View File
@@ -1,13 +1,52 @@
import discord, traceback, discord_slash, sys
from discord.ext import commands
"""
Classes used to handle bot events and errors.
*Classes*
---------
EventHandler
ErrorHandler
"""
import discord # Used to init discord.Game and discord.Status, as well
# as compare errors to discord errors and as typehints
import traceback # Used to get the traceback of errors
import sys # Used to get traceback when the specific error is not
# available
from discord.ext import commands # Used to compare errors with command
# errors
from discord_slash.context import SlashContext
from utils.utilFunctions import emojiToCommand
from .utilFunctions import emojiToCommand
class EventHandler():
"""
Handles bot events.
*Methods*
---------
on_ready()
on_slash_command(ctx: discord_slash.context.SlashContext)
on_reaction_add(ctx: discord_slash.context.SlashContext)
"""
def __init__(self, bot):
"""Initialize the handler."""
self.bot = bot
async def on_slash_command(self, ctx):
async def on_ready(self):
"""Log and sets status when it logs in."""
await self.bot.databaseFuncs.syncCommands()
name = self.bot.user.name
userid = str(self.bot.user.id)
loggedInMessage = f"Logged in as {name}, {userid}"
self.bot.log(loggedInMessage, level=25)
game = discord.Game("Use /help for commands")
onlineStatus = discord.Status.online
await self.bot.change_presence(activity=game, status=onlineStatus)
async def on_slash_command(self, ctx: SlashContext):
"""Log when a slash command is given."""
if ctx.subcommand_name is not None:
subcommand = f" {ctx.subcommand_name} "
else:
@@ -21,100 +60,117 @@ class EventHandler():
args = " ".join([str(i) for i in ctx.args])
fullCommand = f"/{ctx.command}{subcommand}{subcommandGroup}{args}"
logMessage = f"{ctx.author.display_name} ran {fullCommand}"
self.bot.log(logMessage, str(ctx.channel_id), level = 25)
self.bot.log(logMessage, str(ctx.channel_id), level=25)
async def on_ready(self):
await self.bot.databaseFuncs.syncCommands()
self.bot.log("Logged in as "+self.bot.user.name+", "+str(self.bot.user.id), level = 25)
game = discord.Game("Use /help for commands")
await self.bot.change_presence(activity=game, status = discord.Status.online)
async def on_reaction_add(self, reaction : discord.Reaction, user):
if user.bot == False:
async def on_reaction_add(self, reaction: discord.Reaction,
user: discord.User):
"""Take action if the reaction is on a command message."""
if not user.bot:
tests = self.bot.databaseFuncs
message = reaction.message
channel = message.channel
self.bot.log(f"{user.display_name} reacted to a message",str(channel.id))
try:
connectFourTheirTurn, piece = self.bot.databaseFuncs.connectFourReactionTest(channel,message,"#"+str(user.id))
except:
connectFourTheirTurn = False
reactedMessage = f"{user.display_name} reacted to a message"
self.bot.log(reactedMessage, str(channel.id))
plexData = tests.bedreNetflixReactionTest(message)
# plexData is a list containing 3 elements: whether it was
# the addshow/addmovie command message the reaction was to
# (bool), whether it's a movie (bool) (if false, it's a
# show), and the imdb ids/names for the for the movies or
# shows listed in the message (list).
bedreNetflixMessage, addMovie, imdbIds = self.bot.databaseFuncs.bedreNetflixReactionTest(channel, message)
reactionTestParams = [message, f"#{str(user.id)}"]
if connectFourTheirTurn:
if tests.connectFourReactionTest(*reactionTestParams):
column = emojiToCommand(reaction.emoji)
await self.bot.games.connectFour.placePiece(message, f"#{user.id}", column-1)
elif bedreNetflixMessage and addMovie:
moviePick = emojiToCommand(reaction.emoji)
if moviePick == "none":
imdbID = None
else:
imdbID = imdbIds[moviePick-1]
params = [message, f"#{user.id}", column-1]
await self.bot.games.connectFour.placePiece(*params)
if isinstance(channel, discord.DMChannel):
await message.delete()
await self.bot.other.bedreNetflix.addMovie(message, imdbID, False)
else:
await message.clear_reactions()
await self.bot.other.bedreNetflix.addMovie(message, imdbID)
elif bedreNetflixMessage and not addMovie:
showPick = emojiToCommand(reaction.emoji)
if showPick == "none":
imdbName = None
else:
imdbName = imdbIds[showPick-1]
if plexData[0]:
plexFuncs = self.bot.other.bedreNetflix
if plexData[1]:
moviePick = emojiToCommand(reaction.emoji)
if moviePick == "none":
imdbID = None
else:
imdbID = plexData[2][moviePick-1]
if isinstance(channel, discord.DMChannel):
await message.delete()
await self.bot.other.bedreNetflix.addShow(message, imdbName, False)
if isinstance(channel, discord.DMChannel):
await message.delete()
await plexFuncs.addMovie(message, imdbID, False)
else:
await message.clear_reactions()
await plexFuncs.addMovie(message, imdbID)
else:
await message.clear_reactions()
await self.bot.other.bedreNetflix.addShow(message, imdbName)
showPick = emojiToCommand(reaction.emoji)
if showPick == "none":
imdbName = None
else:
imdbName = plexData[2][showPick-1]
elif self.bot.databaseFuncs.hangmanReactionTest(channel, message, f"#{user.id}"):
if isinstance(channel, discord.DMChannel):
await message.delete()
await plexFuncs.addShow(message, imdbName, False)
else:
await message.clear_reactions()
await plexFuncs.addShow(message, imdbName)
elif tests.hangmanReactionTest(*reactionTestParams):
self.bot.log("They reacted to the hangman message")
if ord(reaction.emoji) in range(127462,127488):
if ord(reaction.emoji) in range(127462, 127488):
# The range is letter-emojis
guess = chr(ord(reaction.emoji)-127397)
await self.bot.games.hangman.guess(message, f"#{user.id}", guess)
# Converts emoji to letter
params = [message, f"#{user.id}", guess]
await self.bot.games.hangman.guess(*params)
else:
self.bot.log("Bot they didn't react with a valid guess")
class ErrorHandler():
"""
Handles errors.
*Methods*
---------
on_slash_command_error(ctx: discord_slash.context.SlashContext,
error: Exception)
on_error(method: str)
"""
def __init__(self, bot):
"""Initialize the handler."""
self.bot = bot
async def on_slash_command_error(self, ctx, error):
async def on_slash_command_error(self, ctx: SlashContext,
error: Exception):
"""Log when there's a slash command."""
if isinstance(error, commands.CommandNotFound):
await ctx.send("That's not a command (error code 001)")
await ctx.send("That's not a command")
elif isinstance(error, discord.errors.NotFound):
self.bot.log("Deleted message before I could add all reactions")
elif isinstance(error, commands.errors.MissingRequiredArgument):
self.bot.log(f"{error}",str(ctx.channel_id))
await ctx.send("Missing command parameters (error code 002). Try using `!help [command]` to find out how to use the command.")
self.bot.log(f"{error}", str(ctx.channel_id))
await ctx.send(self.bot.longStrings["missing parameters"])
else:
exception = traceback.format_exception(type(error), error, error.__traceback__)
stopAt = "\nThe above exception was the direct cause of the following exception:\n\n"
if stopAt in exception:
index = exception.index(stopAt)
exception = exception[:index]
params = [type(error), error, error.__traceback__]
exception = traceback.format_exception(*params)
exceptionString = "".join(exception)
self.bot.log([f"exception in /{ctx.name}", f"{exceptionString}"],str(ctx.channel_id), 40)
logMessages = [f"exception in /{ctx.name}", f"{exceptionString}"]
self.bot.log(logMessages, str(ctx.channel_id), 40)
if isinstance(error, discord.errors.NotFound):
self.bot.log("Context is non-existant", level = 40)
self.bot.log("Context is non-existant", level=40)
else:
await ctx.send("Something went wrong (error code 000)")
async def on_error(self, method):
async def on_error(self, method: str):
"""Log when there's an error."""
errorType = sys.exc_info()[0]
if errorType == discord.errors.NotFound:
self.bot.log("Deleted message before I could add all reactions")
else:
exception = traceback.format_exc()
stopAt = "\nThe above exception was the direct cause of the following exception:\n\n"
if stopAt in exception:
index = exception.index(stopAt)
exception = exception[:index]
exceptionString = "".join(exception)
self.bot.log([f"exception in {method}", f"{exceptionString}"], level = 40)
logMessages = [f"exception in {method}", f"{exceptionString}"]
self.bot.log(logMessages, level=40)
+208 -30
View File
@@ -1,6 +1,44 @@
import re, git, os, json, time
"""
Contains classes used for utilities.
def sanitize(data : str, options : bool = False):
*Functions*
-----------
Sanitize(data: str, lowerCaseValue: bool = false) -> dict
*Classes*
---------
Options()
Credentials()
DatabaseFuncs()
"""
import re # Used in getID
import git # Used to pull when stopping
import os # Used to test if files exist
import json # Used to read the data about addmovie/addshow
import time # Used to test how long it's been since commands were synced
import discord # Used for type hints
def sanitize(data: str, lowerCaseValue: bool = False):
"""
Sanitize and create a dictionary from a string.
Each element is created from a line with a : in it. The key is left
of the :, the value is right of it.
*Parameters*
------------
data: str
The string to create a dict from.
lowerCaseValue: bool = False
Whether the value of each element should be lowercase.
*Returns*
---------
dct: dict
The sanitized dictionary of elements.
"""
data = data.splitlines()
dct = {}
for line in data:
@@ -8,7 +46,7 @@ def sanitize(data : str, options : bool = False):
lineValues = line.split(":")
lineValues[0] = lineValues[0].lower()
lineValues[1] = lineValues[1].replace(" ", "")
if options:
if lowerCaseValue:
lineValues[1] = lineValues[1].lower()
if lineValues[0] in ["testing guild ids", "admins"]:
@@ -23,18 +61,26 @@ def sanitize(data : str, options : bool = False):
return dct
class Options():
"""Contains the options for the bot."""
def __init__(self):
with open("options.txt","r") as f:
"""Initialize the options."""
with open("options.txt", "r") as f:
data = sanitize(f.read(), True)
self.testing = data["testing"]
self.guildIds = data["testing guild ids"]
self.admins = data["admins"]
class Credentials():
"""Contains the credentials for the bot and apis."""
def __init__(self):
with open("credentials.txt","r") as f:
"""Initialize the credentials."""
with open("credentials.txt", "r") as f:
data = sanitize(f.read())
self.token = data["bot token"]
@@ -46,34 +92,99 @@ class Credentials():
self.radarrKey = data["radarr api key"]
self.sonarrKey = data["sonarr api key"]
class databaseFuncs():
"""
Manages database functions.
*Methods*
---------
getName(userID: str) -> str
getID(userName: str) -> str
deleteGame(gameType: str, channel: str)
wipeGames()
connectFourReactionTest(message: discord.Message,
user: discord.User) -> bool
hangmanReactionTest(message: discord.Message,
user: discord.User) -> bool
BedreNetflixReactionTest(message: discord.Message,
user: discord.User) -> bool, bool,
list
syncCommands()
"""
def __init__(self, bot):
"""Initialize the class."""
self.bot = bot
def getName(self, userID):
user = self.bot.database["users"].find_one({"_id":userID})
def getName(self, userID: str):
"""
Get the name of a user you have the # id of.
*Parameters:
------------
userID: str
The id of the user you want the name of. The format is
"#" + str(discord.User.id)
*Returns*
---------
userName: str
The name of the user. If the user couldn't be found,
returns the userID.
"""
user = self.bot.database["users"].find_one({"_id": userID})
if userID == f"#{self.bot.user.id}":
return "Gwendolyn"
elif user != None:
elif user is not None:
return user["user name"]
else:
self.bot.log(f"Couldn't find user {userID}")
return userID
def getID(self,userName):
user = self.bot.database["users"].find_one({"user name":re.compile(userName, re.IGNORECASE)})
def getID(self, userName: str):
"""
Get the id of a user you have the username of.
if user != None:
*Parameters:
------------
userName: str
The name of the user you want the id of.
*Returns*
---------
userID: str
The id of the user in the format "#" +
str(discord.User.id). If the user couldn't be found,
returns the userName.
"""
userSearch = {"user name": re.compile(userName, re.IGNORECASE)}
user = self.bot.database["users"].find_one(userSearch)
if user is not None:
return user["_id"]
else:
self.bot.log("Couldn't find user "+userName)
return None
def deleteGame(self, gameType, channel):
self.bot.database[gameType].delete_one({"_id":channel})
def deleteGame(self, gameType: str, channel: str):
"""
Remove a game from the database.
def stopServer(self):
*Parameters*
------------
gameType: str
The name of the collection the game is in, like
"hangman games", "blackjack games" etc.
channel: str
The channel id of the channel the game is on as a
string.
"""
self.bot.database[gameType].delete_one({"_id": channel})
def wipeGames(self):
"""Delete all running games and pull from git."""
self.bot.database["trivia questions"].delete_many({})
self.bot.database["blackjack games"].delete_many({})
self.bot.database["connect 4 games"].delete_many({})
@@ -84,35 +195,80 @@ class databaseFuncs():
g = git.cmd.Git("")
g.pull()
def connectFourReactionTest(self,channel,message,user):
game = self.bot.database["connect 4 games"].find_one({"_id":str(channel.id)})
def connectFourReactionTest(self, message: discord.Message,
user: discord.User):
"""
Test if the given message is the current connect four game.
with open("resources/games/oldImages/connectFour"+str(channel.id), "r") as f:
Also tests if the given user is the one who's turn it is.
*Parameters*
------------
message: discord.Message
The message to test.
user: discord.User
The user to test.
*Returns*
---------
: bool
Whether the given message is the current connect four
game and if the user who reacted is the user who's turn
it is.
"""
channel = message.channel
channelSearch = {"_id": str(channel.id)}
game = self.bot.database["connect 4 games"].find_one(channelSearch)
filePath = f"resources/games/oldImages/connectFour{channel.id}"
with open(filePath, "r") as f:
oldImage = int(f.read())
if message.id == oldImage:
self.bot.log("They reacted to the connectFour game")
turn = game["turn"]
if user == game["players"][turn]:
return True, turn+1
return True
else:
self.bot.log("It wasn't their turn")
return False, 0
return False
else:
return False, 0
return False
def hangmanReactionTest(self, channel, message, user):
try:
with open("resources/games/oldImages/hangman"+str(channel.id), "r") as f:
def hangmanReactionTest(self, message: discord.Message,
user: discord.User):
"""
Test if the given message is the current hangman game.
Also tests if the given user is the one who's playing hangman.
*Parameters*
------------
message: discord.Message
The message to test.
user: discord.User
The user to test.
*Returns*
---------
: bool
Whether the given message is the current hangman game
and if the user who reacted is the user who's playing
hangman.
"""
channel = message.channel
filePath = f"resources/games/oldImages/hangman{channel.id}"
if os.path.isfile(filePath):
with open(filePath, "r") as f:
oldMessages = f.read().splitlines()
except:
else:
return False
gameMessage = False
for oldMessage in oldMessages:
oldMessageID = int(oldMessage)
if message.id == oldMessageID:
game = self.bot.database["hangman games"].find_one({"_id":str(channel.id)})
database = self.bot.database["hangman games"]
channelSearch = {"_id": str(channel.id)}
game = database.find_one(channelSearch)
if user == game["player"]:
gameMessage = True
@@ -120,9 +276,30 @@ class databaseFuncs():
return gameMessage
def bedreNetflixReactionTest(self, channel, message):
if os.path.isfile(f"resources/bedreNetflix/oldMessage{str(channel.id)}"):
with open("resources/bedreNetflix/oldMessage"+str(channel.id),"r") as f:
def bedreNetflixReactionTest(self, message: discord.Message):
"""
Test if the given message is the response to a plex request.
*Parameters*
------------
message: discord.Message
The message to test.
*Returns*
---------
: bool
Whether the message is the response to a plex request.
: bool
Whether it was a movie request (false for a show
request)
: list
A list of ids or names of the shows or movies that
Gwendolyn presented after the request.
"""
channel = message.channel
filePath = f"resources/bedreNetflix/oldMessage{str(channel.id)}"
if os.path.isfile(filePath):
with open(filePath, "r") as f:
data = json.load(f)
else:
return False, None, None
@@ -136,6 +313,7 @@ class databaseFuncs():
return False, None, None
async def syncCommands(self):
"""Sync the slash commands with the discord API."""
collection = self.bot.database["last synced"]
lastSynced = collection.find_one()
now = time.time()
@@ -144,6 +322,6 @@ class databaseFuncs():
self.bot.log(f"Updating commands: {slashCommandList}")
await self.bot.slash.sync_all_commands()
idNumber = lastSynced["_id"]
queryFilter = {"_id" : idNumber}
update = {"$set" : {"last synced" : now}}
queryFilter = {"_id": idNumber}
update = {"$set": {"last synced": now}}
collection.update_one(queryFilter, update)
+145 -29
View File
@@ -1,3 +1,18 @@
"""
Contains utility functions used by parts of the bot.
*Functions*
-----------
longstrings() -> dict
getParams() -> dict
logThis(messages: Union[str, list], channel: str = "",
level: int = 20)
cap(s: str) -> str
makeFiles()
replaceMultiple(mainString: str, toBeReplaced: list,
newString: str) -> str
emojiToCommand(emoji: str) -> str
"""
import json
import logging
import os
@@ -5,22 +20,55 @@ import sys
import imdb
from .helperClasses import Options
# All of this is logging configuration
FORMAT = " %(asctime)s | %(name)-16s | %(levelname)-8s | %(message)s"
PRINTFORMAT = "%(asctime)s - %(message)s"
DATEFORMAT = "%Y-%m-%d %H:%M:%S"
logging.addLevelName(25, "PRINT")
logging.basicConfig(format=FORMAT, datefmt=DATEFORMAT, level=logging.INFO, filename="gwendolyn.log")
loggingConfigParams = {
"format": FORMAT,
"datefmt": DATEFORMAT,
"level": logging.INFO,
"filename": "gwendolyn.log"
}
logging.basicConfig(**loggingConfigParams)
logger = logging.getLogger("Gwendolyn")
printer = logging.getLogger("printer")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(fmt = PRINTFORMAT, datefmt=DATEFORMAT))
handler.setFormatter(logging.Formatter(fmt=PRINTFORMAT, datefmt=DATEFORMAT))
printer.addHandler(handler)
printer.propagate = False
imdb._logging.setLevel("CRITICAL")
imdb._logging.setLevel("CRITICAL") # Basically disables imdbpy
# logging, since it's printed to the terminal.
def longStrings():
"""
Get the data from resources/longStrings.json.
*Returns*
---------
data: dict
The long strings and their keys.
"""
with open("resources/longStrings.json", "r") as f:
data = json.load(f)
return data
def getParams():
"""
Get the slash command parameters.
*Returns*
---------
params: dict
The parameters for every slash command.
"""
with open("resources/slashParameters.json", "r") as f:
params = json.load(f)
@@ -32,8 +80,25 @@ def getParams():
return params
def logThis(messages, channel : str = "", level : int = 20):
channel = channel.replace("Direct Message with ","")
def logThis(messages, channel: str = "", level: int = 20):
"""
Log something in Gwendolyn's logs.
*Parameters*
------------
messages: Union[str, list]
A string or list of strings to be logged. If there are
multiple strings and the level is PRINT (25) or higher,
only the first string will be printed.
channel: str = ""
The channel the event to be logged occurred in. Will be
logged along with the message(s).
level: int = 20
The level to log the message(s) at. If PRINT (25) or
higher, the first message will be printed to the console.
"""
channel = channel.replace("Direct Message with ", "")
if type(messages) is str:
messages = [messages]
@@ -41,9 +106,11 @@ def logThis(messages, channel : str = "", level : int = 20):
for x, msg in enumerate(messages):
if channel != "":
messages[x] = f"{msg} - ({channel})"
messages[x] = f"{msg} - ({channel})" # Adds channel to log
# messages
if len(messages) > 1:
if len(messages) > 1: # Tells user to check the log if there are
# more messages there
printMessage += " (details in log)"
if level >= 25:
@@ -52,10 +119,24 @@ def logThis(messages, channel : str = "", level : int = 20):
for logMessage in messages:
logger.log(level, logMessage)
# Capitalizes all words except some of them
def cap(s):
no_caps_list = ["of","the"]
# Capitalizes a strink like a movie title
def cap(s: str):
"""
Capitalize a string like a movie title.
That means "of" and "the" are not capitalized.
*Parameters*
------------
s: str
The string to capitalized.
*Returns*
---------
res: str
The capitalized string.
"""
no_caps_list = ["of", "the"]
word_number = 0
lst = s.split()
res = ''
@@ -67,22 +148,25 @@ def cap(s):
res = res[:-1]
return res
def makeFiles():
def makeJsonFile(path,content):
# Creates json file if it doesn't exist
if not os.path.isfile(path):
logThis(path.split("/")[-1]+" didn't exist. Making it now.")
with open(path,"w") as f:
json.dump(content,f,indent = 4)
def makeTxtFile(path,content):
# Creates txt file if it doesn't exist
def makeFiles():
"""Create all the files and directories needed by Gwendolyn."""
def makeJsonFile(path, content):
"""Create json file if it doesn't exist."""
if not os.path.isfile(path):
logThis(path.split("/")[-1]+" didn't exist. Making it now.")
with open(path,"w") as f:
with open(path, "w") as f:
json.dump(content, f, indent=4)
def makeTxtFile(path, content):
"""Create txt file if it doesn't exist."""
if not os.path.isfile(path):
logThis(path.split("/")[-1]+" didn't exist. Making it now.")
with open(path, "w") as f:
f.write(content)
def directory(path):
"""Create directory if it doesn't exist."""
if not os.path.isdir(path):
os.makedirs(path)
logThis("The "+path.split("/")[-1]+" directory didn't exist")
@@ -91,26 +175,57 @@ def makeFiles():
data = json.load(f)
for path, content in data["json"].items():
makeJsonFile(path,content)
makeJsonFile(path, content)
for path, content in data["txt"].items():
makeTxtFile(path,content)
makeTxtFile(path, content)
for path in data["folder"]:
directory(path)
# Replaces multiple things with the same thing
def replaceMultiple(mainString, toBeReplaces, newString):
def replaceMultiple(mainString: str, toBeReplaced: list, newString: str):
"""
Replace multiple substrings in a string with the same substring.
*Parameters*
------------
mainString: str
The string to replace substrings in.
toBeReplaced: list
The substrings to replace.
newString: str
The string to replace the substrings with.
*Returns*
---------
mainString: str
The string with the substrings replaced.
"""
# Iterate over the strings to be replaced
for elem in toBeReplaces :
for elem in toBeReplaced:
# Check if string is in the main string
if elem in mainString :
if elem in mainString:
# Replace the string
mainString = mainString.replace(elem, newString)
return mainString
def emojiToCommand(emoji):
def emojiToCommand(emoji: str):
"""
Convert emoji to text.
*Parameters*
------------
emoji: str
The emoji to decipher.
*Returns*
---------
: str
The deciphered string.
"""
if emoji == "1️⃣":
return 1
elif emoji == "2️⃣":
@@ -131,4 +246,5 @@ def emojiToCommand(emoji):
return "none"
elif emoji == "✔️":
return 1
else: return ""
else:
return ""