mirror of
https://github.com/Lilac-Rose/Lacie.git
synced 2026-03-21 17:54:50 -05:00
ping protect and minesweeper stats
This commit is contained in:
parent
f2bfd09042
commit
a7dde9ba2d
1
bot.py
1
bot.py
|
|
@ -22,6 +22,7 @@ TOKEN = os.getenv("TOKEN")
|
|||
# All cog folders to load on startup and reload
|
||||
COG_FOLDERS = [
|
||||
"commands",
|
||||
"games",
|
||||
"moderation",
|
||||
"xp",
|
||||
"sparkle",
|
||||
|
|
|
|||
|
|
@ -13,43 +13,53 @@ class ChainDetector(commands.Cog):
|
|||
# Ignore bot messages
|
||||
if message.author.bot:
|
||||
return
|
||||
|
||||
# Ignore images, stickers, files, etc.
|
||||
if message.attachments or message.stickers:
|
||||
|
||||
# Ignore images, files, etc. (but allow stickers)
|
||||
if message.attachments:
|
||||
return
|
||||
|
||||
content = message.content.strip()
|
||||
if not content:
|
||||
return
|
||||
|
||||
|
||||
if message.mentions or message.role_mentions or message.channel_mentions or message.mention_everyone:
|
||||
return
|
||||
|
||||
|
||||
# Determine chain key and how to send
|
||||
if message.stickers:
|
||||
sticker = message.stickers[0]
|
||||
chain_key = f"sticker:{sticker.id}"
|
||||
async def send_chain():
|
||||
await message.channel.send(stickers=[sticker])
|
||||
else:
|
||||
content = message.content.strip()
|
||||
if not content:
|
||||
return
|
||||
chain_key = content
|
||||
async def send_chain():
|
||||
await message.channel.send(chain_key)
|
||||
|
||||
channel_id = message.channel.id
|
||||
|
||||
|
||||
# Initialize cache for this channel
|
||||
if channel_id not in self.cache:
|
||||
self.cache[channel_id] = {
|
||||
"last_message": content,
|
||||
"last_message": chain_key,
|
||||
"users": [message.author.id]
|
||||
}
|
||||
return
|
||||
|
||||
|
||||
chain = self.cache[channel_id]
|
||||
|
||||
|
||||
# If message matches the chain message
|
||||
if content == chain["last_message"]:
|
||||
if chain_key == chain["last_message"]:
|
||||
# Only count if it's a DIFFERENT user
|
||||
if message.author.id not in chain["users"]:
|
||||
chain["users"].append(message.author.id)
|
||||
else:
|
||||
# Reset chain
|
||||
chain["last_message"] = content
|
||||
chain["last_message"] = chain_key
|
||||
chain["users"] = [message.author.id]
|
||||
|
||||
|
||||
# If three different users said the same thing
|
||||
if len(chain["users"]) == 3:
|
||||
await message.channel.send(content)
|
||||
await send_chain()
|
||||
# Reset the chain completely
|
||||
chain["last_message"] = ""
|
||||
chain["users"] = []
|
||||
|
|
|
|||
|
|
@ -1,62 +1,151 @@
|
|||
import sqlite3
|
||||
import discord
|
||||
from discord import app_commands
|
||||
from discord.ext import commands
|
||||
from pathlib import Path
|
||||
import aiosqlite
|
||||
|
||||
# The user ID to protect from pings
|
||||
PROTECTED_USER_ID = 252130669919076352
|
||||
NO_PINGS_ROLE_ID = 1439583411517001819
|
||||
|
||||
# Users allowed to ping the protected user
|
||||
ALLOWED_PINGERS = {
|
||||
PROTECTED_USER_ID = 252130669919076352 # only lilac gets ping tracking
|
||||
|
||||
# initial allowlist for lilac, gets seeded into DB on load
|
||||
INITIAL_ALLOWED_PINGERS = {
|
||||
505390548232699906,
|
||||
771709136051372032,
|
||||
692030310644187206,
|
||||
1153235432813895730,
|
||||
252130669919076352,
|
||||
1409637508689563689,
|
||||
1407793866559721532,
|
||||
1265042492865122358,
|
||||
547143614099226626,
|
||||
}
|
||||
|
||||
DB_PATH = Path(__file__).parent.parent / "data" / "ping_protect.db"
|
||||
|
||||
def get_db():
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS ping_counts (
|
||||
user_id INTEGER PRIMARY KEY,
|
||||
count INTEGER DEFAULT 0
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
return conn
|
||||
|
||||
def record_ping(user_id: int):
|
||||
conn = get_db()
|
||||
conn.execute("""
|
||||
INSERT INTO ping_counts (user_id, count) VALUES (?, 1)
|
||||
ON CONFLICT(user_id) DO UPDATE SET count = count + 1
|
||||
""", (user_id,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
class PingProtectListener(commands.Cog):
|
||||
"""Responds when the protected user is pinged by someone not on the allowlist."""
|
||||
|
||||
class PingProtect(commands.GroupCog, name="noping"):
|
||||
def __init__(self, bot: commands.Bot):
|
||||
self.bot = bot
|
||||
|
||||
async def cog_load(self):
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
await db.execute("""
|
||||
CREATE TABLE IF NOT EXISTS ping_counts (
|
||||
user_id INTEGER PRIMARY KEY,
|
||||
count INTEGER DEFAULT 0
|
||||
)
|
||||
""")
|
||||
await db.execute("""
|
||||
CREATE TABLE IF NOT EXISTS allowlists (
|
||||
protected_user_id INTEGER,
|
||||
allowed_user_id INTEGER,
|
||||
PRIMARY KEY (protected_user_id, allowed_user_id)
|
||||
)
|
||||
""")
|
||||
for uid in INITIAL_ALLOWED_PINGERS:
|
||||
await db.execute("""
|
||||
INSERT OR IGNORE INTO allowlists (protected_user_id, allowed_user_id)
|
||||
VALUES (?, ?)
|
||||
""", (PROTECTED_USER_ID, uid))
|
||||
await db.commit()
|
||||
|
||||
def _has_permission(self, member: discord.Member) -> bool:
|
||||
return (
|
||||
member.id == PROTECTED_USER_ID or
|
||||
any(r.id == NO_PINGS_ROLE_ID for r in member.roles)
|
||||
)
|
||||
|
||||
async def _is_allowed(self, protected_user_id: int, pinger_id: int) -> bool:
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
cursor = await db.execute(
|
||||
"SELECT 1 FROM allowlists WHERE protected_user_id = ? AND allowed_user_id = ?",
|
||||
(protected_user_id, pinger_id)
|
||||
)
|
||||
return await cursor.fetchone() is not None
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_message(self, message: discord.Message):
|
||||
if message.author.bot:
|
||||
if message.author.bot or not message.guild:
|
||||
return
|
||||
|
||||
# Check if the protected user is mentioned
|
||||
if not any(user.id == PROTECTED_USER_ID for user in message.mentions):
|
||||
for user in message.mentions:
|
||||
member = message.guild.get_member(user.id)
|
||||
|
||||
is_protected = (
|
||||
user.id == PROTECTED_USER_ID or
|
||||
(member and any(r.id == NO_PINGS_ROLE_ID for r in member.roles))
|
||||
)
|
||||
if not is_protected:
|
||||
continue
|
||||
|
||||
if await self._is_allowed(user.id, message.author.id):
|
||||
continue
|
||||
|
||||
if user.id == PROTECTED_USER_ID:
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
await db.execute("""
|
||||
INSERT INTO ping_counts (user_id, count) VALUES (?, 1)
|
||||
ON CONFLICT(user_id) DO UPDATE SET count = count + 1
|
||||
""", (message.author.id,))
|
||||
await db.commit()
|
||||
await message.reply("Please don't ping faer", mention_author=False)
|
||||
else:
|
||||
name = member.display_name if member else user.name
|
||||
await message.reply(f"Please don't ping {name}, they have pings disabled.", mention_author=False)
|
||||
|
||||
@app_commands.command(name="allow", description="Allow someone to ping you")
|
||||
@app_commands.describe(user="The user to allow")
|
||||
async def allow(self, interaction: discord.Interaction, user: discord.Member):
|
||||
if not self._has_permission(interaction.user):
|
||||
await interaction.response.send_message("You need the no-pings role to use this.", ephemeral=True)
|
||||
return
|
||||
|
||||
# Allow users on the allowlist (don't track them)
|
||||
if message.author.id in ALLOWED_PINGERS:
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
await db.execute("""
|
||||
INSERT OR IGNORE INTO allowlists (protected_user_id, allowed_user_id)
|
||||
VALUES (?, ?)
|
||||
""", (interaction.user.id, user.id))
|
||||
await db.commit()
|
||||
|
||||
await interaction.response.send_message(f"✅ {user.display_name} can now ping you.", ephemeral=True)
|
||||
|
||||
@app_commands.command(name="remove", description="Remove someone from your ping allowlist")
|
||||
@app_commands.describe(user="The user to remove")
|
||||
async def remove(self, interaction: discord.Interaction, user: discord.Member):
|
||||
if not self._has_permission(interaction.user):
|
||||
await interaction.response.send_message("You need the no-pings role to use this.", ephemeral=True)
|
||||
return
|
||||
|
||||
record_ping(message.author.id)
|
||||
await message.reply("Please don't ping faer", mention_author=False)
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
await db.execute(
|
||||
"DELETE FROM allowlists WHERE protected_user_id = ? AND allowed_user_id = ?",
|
||||
(interaction.user.id, user.id)
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
await interaction.response.send_message(f"✅ {user.display_name} can no longer ping you.", ephemeral=True)
|
||||
|
||||
@app_commands.command(name="list", description="View your ping allowlist")
|
||||
async def list_allowed(self, interaction: discord.Interaction):
|
||||
if not self._has_permission(interaction.user):
|
||||
await interaction.response.send_message("You need the no-pings role to use this.", ephemeral=True)
|
||||
return
|
||||
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
cursor = await db.execute(
|
||||
"SELECT allowed_user_id FROM allowlists WHERE protected_user_id = ?",
|
||||
(interaction.user.id,)
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
|
||||
if not rows:
|
||||
await interaction.response.send_message("Your allowlist is empty.", ephemeral=True)
|
||||
return
|
||||
|
||||
mentions = "\n".join(f"<@{row[0]}>" for row in rows)
|
||||
await interaction.response.send_message(f"**Your ping allowlist:**\n{mentions}", ephemeral=True)
|
||||
|
||||
|
||||
async def setup(bot: commands.Bot):
|
||||
await bot.add_cog(PingProtectListener(bot))
|
||||
await bot.add_cog(PingProtect(bot))
|
||||
|
|
|
|||
|
|
@ -58,37 +58,33 @@ class MinesweeperGame:
|
|||
self.rows = rows
|
||||
self.cols = cols
|
||||
self.mine_count = mines
|
||||
|
||||
|
||||
# Board: -1 = mine, 0-8 = number of adjacent mines
|
||||
self.board = [[0 for _ in range(cols)] for _ in range(rows)]
|
||||
# Revealed: False = hidden, True = revealed
|
||||
self.revealed = [[False for _ in range(cols)] for _ in range(rows)]
|
||||
# Flags: False = no flag, True = flagged
|
||||
self.flags = [[False for _ in range(cols)] for _ in range(rows)]
|
||||
|
||||
|
||||
self.game_over = False
|
||||
self.won = False
|
||||
self.first_move = True
|
||||
|
||||
# Column number emojis (11-13 use custom bot emojis)
|
||||
|
||||
# 11-13 use custom bot emojis
|
||||
self.col_emojis = [
|
||||
"1️⃣", "2️⃣", "3️⃣", "4️⃣", "5️⃣", "6️⃣", "7️⃣", "8️⃣", "9️⃣", "🔟",
|
||||
"<:11:1470841923785719952>",
|
||||
"<:12:1470841922716172299>",
|
||||
"<:13:1470841927409729600>"
|
||||
]
|
||||
|
||||
|
||||
def setup_board(self, safe_row: int, safe_col: int):
|
||||
"""Generate mines avoiding the first click position"""
|
||||
positions = [(r, c) for r in range(self.rows) for c in range(self.cols)
|
||||
if not (abs(r - safe_row) <= 1 and abs(c - safe_col) <= 1)]
|
||||
|
||||
|
||||
mine_positions = random.sample(positions, self.mine_count)
|
||||
|
||||
|
||||
for r, c in mine_positions:
|
||||
self.board[r][c] = -1
|
||||
|
||||
# Calculate numbers
|
||||
|
||||
for r in range(self.rows):
|
||||
for c in range(self.cols):
|
||||
if self.board[r][c] != -1:
|
||||
|
|
@ -102,102 +98,85 @@ class MinesweeperGame:
|
|||
if self.board[nr][nc] == -1:
|
||||
count += 1
|
||||
self.board[r][c] = count
|
||||
|
||||
|
||||
def reveal(self, row: int, col: int) -> bool:
|
||||
"""Reveal a cell. Returns True if hit a mine."""
|
||||
if self.first_move:
|
||||
self.setup_board(row, col)
|
||||
self.first_move = False
|
||||
|
||||
|
||||
if self.revealed[row][col] or self.flags[row][col]:
|
||||
return False
|
||||
|
||||
|
||||
self.revealed[row][col] = True
|
||||
|
||||
|
||||
if self.board[row][col] == -1:
|
||||
self.game_over = True
|
||||
return True
|
||||
|
||||
# Flood fill: Auto-reveal adjacent cells if it's a 0
|
||||
|
||||
if self.board[row][col] == 0:
|
||||
self._flood_fill(row, col)
|
||||
|
||||
# Check win condition
|
||||
|
||||
if self.check_win():
|
||||
self.game_over = True
|
||||
self.won = True
|
||||
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _flood_fill(self, start_row: int, start_col: int):
|
||||
"""
|
||||
Flood fill algorithm to reveal all connected empty cells and their borders.
|
||||
Reveals all 0-cells connected to the starting position, plus the numbered
|
||||
cells that border them (the 'edge' of the empty region).
|
||||
"""
|
||||
from collections import deque
|
||||
|
||||
|
||||
queue = deque([(start_row, start_col)])
|
||||
visited = set()
|
||||
visited.add((start_row, start_col))
|
||||
|
||||
|
||||
while queue:
|
||||
row, col = queue.popleft()
|
||||
|
||||
|
||||
for dr in [-1, 0, 1]:
|
||||
for dc in [-1, 0, 1]:
|
||||
if dr == 0 and dc == 0:
|
||||
continue
|
||||
|
||||
|
||||
nr, nc = row + dr, col + dc
|
||||
|
||||
|
||||
if not (0 <= nr < self.rows and 0 <= nc < self.cols):
|
||||
continue
|
||||
if (nr, nc) in visited:
|
||||
continue
|
||||
|
||||
# Skip if flagged
|
||||
if self.flags[nr][nc]:
|
||||
continue
|
||||
|
||||
# Mark as visited
|
||||
|
||||
visited.add((nr, nc))
|
||||
|
||||
# Reveal the cell
|
||||
self.revealed[nr][nc] = True
|
||||
|
||||
# If it's also a 0, add it to the queue to continue flood fill
|
||||
|
||||
if self.board[nr][nc] == 0:
|
||||
queue.append((nr, nc))
|
||||
|
||||
|
||||
def toggle_flag(self, row: int, col: int):
|
||||
"""Toggle flag on a cell"""
|
||||
if self.revealed[row][col]:
|
||||
return
|
||||
self.flags[row][col] = not self.flags[row][col]
|
||||
|
||||
|
||||
def check_win(self) -> bool:
|
||||
"""Check if all non-mine cells are revealed"""
|
||||
for r in range(self.rows):
|
||||
for c in range(self.cols):
|
||||
if self.board[r][c] != -1 and not self.revealed[r][c]:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def get_cell_display(self, row: int, col: int, show_all: bool = False) -> str:
|
||||
"""Get the display string for a cell"""
|
||||
if show_all and self.board[row][col] == -1:
|
||||
return "💣"
|
||||
|
||||
|
||||
if self.flags[row][col]:
|
||||
return "🚩"
|
||||
|
||||
|
||||
if not self.revealed[row][col]:
|
||||
return "⬛"
|
||||
|
||||
|
||||
if self.board[row][col] == -1:
|
||||
return "💥"
|
||||
|
||||
|
||||
num_to_emoji = {
|
||||
0: "⬜",
|
||||
1: "1️⃣",
|
||||
|
|
@ -210,71 +189,58 @@ class MinesweeperGame:
|
|||
8: "8️⃣"
|
||||
}
|
||||
return num_to_emoji.get(self.board[row][col], "⬜")
|
||||
|
||||
|
||||
def render_board(self) -> str:
|
||||
"""Render the board as a string"""
|
||||
# Column headers using numbers
|
||||
col_headers = "⬛" + "".join(self.col_emojis[:self.cols])
|
||||
|
||||
|
||||
lines = [col_headers]
|
||||
for r in range(self.rows):
|
||||
# Row headers using letters (A-M)
|
||||
row_header = chr(0x1F1E6 + r) # Regional indicator A-M
|
||||
row_header = chr(0x1F1E6 + r) # regional indicator A-M
|
||||
row_str = row_header + "".join(
|
||||
self.get_cell_display(r, c, self.game_over)
|
||||
for c in range(self.cols)
|
||||
)
|
||||
lines.append(row_str)
|
||||
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def get_stats(self) -> str:
|
||||
"""Get game statistics"""
|
||||
total_flags = sum(sum(row) for row in self.flags)
|
||||
flags_remaining = self.mine_count - total_flags
|
||||
cells_remaining = sum(
|
||||
1 for r in range(self.rows)
|
||||
for c in range(self.cols)
|
||||
1 for r in range(self.rows)
|
||||
for c in range(self.cols)
|
||||
if not self.revealed[r][c] and self.board[r][c] != -1
|
||||
)
|
||||
|
||||
|
||||
return f"🚩 Flags: {total_flags}/{self.mine_count} | 💣 Mines: {self.mine_count} | ⬛ Cells left: {cells_remaining}"
|
||||
|
||||
|
||||
class MinesweeperView(discord.ui.View):
|
||||
def __init__(self, player: discord.Member, game: MinesweeperGame):
|
||||
super().__init__(timeout=None) # Disable built-in timeout, we'll manage it ourselves
|
||||
super().__init__(timeout=None) # timeout handled manually
|
||||
self.player = player
|
||||
self.game = game
|
||||
self.message: discord.Message = None
|
||||
self.timeout_seconds = 1800 # 30 minutes
|
||||
self.timeout_task = None
|
||||
self.timed_out = False
|
||||
|
||||
|
||||
async def start_timeout(self):
|
||||
"""Start or restart the timeout timer"""
|
||||
# Cancel existing timeout task if any
|
||||
if self.timeout_task and not self.timeout_task.done():
|
||||
self.timeout_task.cancel()
|
||||
|
||||
# Create new timeout task
|
||||
self.timeout_task = asyncio.create_task(self._timeout_handler())
|
||||
|
||||
|
||||
async def _timeout_handler(self):
|
||||
"""Handle the timeout after waiting"""
|
||||
try:
|
||||
await asyncio.sleep(self.timeout_seconds)
|
||||
# If we reach here, the timeout expired
|
||||
await self.handle_timeout()
|
||||
except asyncio.CancelledError:
|
||||
# Timeout was reset, this is expected
|
||||
pass
|
||||
|
||||
|
||||
async def handle_timeout(self):
|
||||
"""Handle when the game times out"""
|
||||
if self.timed_out or self.game.game_over:
|
||||
return
|
||||
|
||||
|
||||
self.timed_out = True
|
||||
self.game.game_over = True
|
||||
await _update_stats(self.player.id, False)
|
||||
|
|
@ -293,28 +259,26 @@ class MinesweeperView(discord.ui.View):
|
|||
try:
|
||||
await self.message.edit(embed=embed, view=self)
|
||||
except discord.errors.NotFound:
|
||||
# Message was deleted
|
||||
# message was deleted
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating timed out game: {e}")
|
||||
|
||||
|
||||
def reset_timeout(self):
|
||||
"""Reset the timeout timer - called when a move is made"""
|
||||
if not self.game.game_over and not self.timed_out:
|
||||
asyncio.create_task(self.start_timeout())
|
||||
|
||||
|
||||
@discord.ui.button(label="Forfeit", style=discord.ButtonStyle.danger, emoji="🏳️")
|
||||
async def forfeit(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||||
if interaction.user.id != self.player.id:
|
||||
await interaction.response.send_message("Only the player can forfeit!", ephemeral=True)
|
||||
return
|
||||
|
||||
|
||||
self.game.game_over = True
|
||||
|
||||
# Cancel timeout task
|
||||
|
||||
if self.timeout_task and not self.timeout_task.done():
|
||||
self.timeout_task.cancel()
|
||||
|
||||
|
||||
for child in self.children:
|
||||
child.disabled = True
|
||||
self.stop()
|
||||
|
|
@ -325,7 +289,7 @@ class MinesweeperView(discord.ui.View):
|
|||
embed.title = "💀 Game Over - Forfeited"
|
||||
|
||||
await interaction.response.edit_message(embed=embed, view=self)
|
||||
|
||||
|
||||
@discord.ui.button(label="How to Play", style=discord.ButtonStyle.secondary, emoji="❓")
|
||||
async def help_button(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||||
help_embed = discord.Embed(
|
||||
|
|
@ -359,11 +323,10 @@ class MinesweeperView(discord.ui.View):
|
|||
),
|
||||
inline=False
|
||||
)
|
||||
|
||||
|
||||
await interaction.response.send_message(embed=help_embed, ephemeral=True)
|
||||
|
||||
|
||||
def create_embed(self) -> discord.Embed:
|
||||
"""Create the game embed"""
|
||||
if self.game.game_over:
|
||||
if self.game.won:
|
||||
embed = discord.Embed(
|
||||
|
|
@ -383,27 +346,25 @@ class MinesweeperView(discord.ui.View):
|
|||
description=f"{self.player.mention}'s game",
|
||||
color=get_embed_color(self.player.id)
|
||||
)
|
||||
|
||||
|
||||
board = self.game.render_board()
|
||||
# Discord has a 4096 character limit for embed descriptions
|
||||
# Split board if needed
|
||||
# Discord embed fields max out around 1024 chars, board can get big
|
||||
if len(board) < 1900:
|
||||
embed.add_field(name="Board", value=board, inline=False)
|
||||
else:
|
||||
# Split into chunks
|
||||
lines = board.split('\n')
|
||||
chunk_size = len(lines) // 2
|
||||
chunk1 = '\n'.join(lines[:chunk_size])
|
||||
chunk2 = '\n'.join(lines[chunk_size:])
|
||||
embed.add_field(name="Board (1/2)", value=chunk1, inline=False)
|
||||
embed.add_field(name="Board (2/2)", value=chunk2, inline=False)
|
||||
|
||||
|
||||
stats = self.game.get_stats()
|
||||
embed.add_field(name="Stats", value=stats, inline=False)
|
||||
|
||||
|
||||
if not self.game.game_over:
|
||||
embed.set_footer(text="Send your move in chat: [column number] [row letter] [flag] | Timeout resets with each move (30min)")
|
||||
|
||||
|
||||
return embed
|
||||
|
||||
|
||||
|
|
@ -411,7 +372,7 @@ class Minesweeper(commands.Cog):
|
|||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.active_games = {} # channel_id: (player_id, MinesweeperView)
|
||||
|
||||
|
||||
@app_commands.command(name="minesweeper", description="Start a game of Minesweeper")
|
||||
@app_commands.describe(
|
||||
difficulty="Choose difficulty level (Easy: 13x13 with 20 mines, Medium: 13x13 with 35 mines, Hard: 13x13 with 50 mines)"
|
||||
|
|
@ -422,86 +383,71 @@ class Minesweeper(commands.Cog):
|
|||
app_commands.Choice(name="Hard (50 mines)", value=3)
|
||||
])
|
||||
async def minesweeper(self, interaction: discord.Interaction, difficulty: int = 1):
|
||||
# Check if there's already an active game in this channel
|
||||
if interaction.channel_id in self.active_games:
|
||||
player_id, _ = self.active_games[interaction.channel_id]
|
||||
await interaction.response.send_message(
|
||||
f"There's already an active game in this channel! Please wait for it to finish.",
|
||||
ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
# Set difficulty
|
||||
|
||||
mine_counts = {1: 20, 2: 35, 3: 50}
|
||||
mines = mine_counts.get(difficulty, 20)
|
||||
|
||||
|
||||
await interaction.response.defer()
|
||||
|
||||
|
||||
game = MinesweeperGame(rows=13, cols=13, mines=mines)
|
||||
view = MinesweeperView(interaction.user, game)
|
||||
|
||||
|
||||
embed = view.create_embed()
|
||||
message = await interaction.followup.send(embed=embed, view=view)
|
||||
view.message = message
|
||||
|
||||
# Start the timeout timer
|
||||
|
||||
await view.start_timeout()
|
||||
|
||||
|
||||
self.active_games[interaction.channel_id] = (interaction.user.id, view)
|
||||
|
||||
# Wait for the game to finish
|
||||
|
||||
await view.wait()
|
||||
|
||||
# Clean up
|
||||
|
||||
if interaction.channel_id in self.active_games:
|
||||
del self.active_games[interaction.channel_id]
|
||||
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_message(self, message: discord.Message):
|
||||
# Ignore bot messages
|
||||
if message.author.bot:
|
||||
return
|
||||
|
||||
# Check if there's an active game in this channel
|
||||
|
||||
if message.channel.id not in self.active_games:
|
||||
return
|
||||
|
||||
|
||||
player_id, view = self.active_games[message.channel.id]
|
||||
|
||||
# Check if message is from the player
|
||||
|
||||
if message.author.id != player_id:
|
||||
return
|
||||
|
||||
# Check if game is over
|
||||
|
||||
if view.game.game_over:
|
||||
return
|
||||
|
||||
|
||||
move = self.parse_move(message.content)
|
||||
|
||||
|
||||
if move is None:
|
||||
# Invalid format, don't delete the message - it's not an input attempt
|
||||
# not a move attempt, leave the message alone
|
||||
return
|
||||
|
||||
|
||||
col, row, is_flag = move
|
||||
|
||||
# Valid input - delete the player's message to reduce clutter
|
||||
|
||||
try:
|
||||
await message.delete()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Make the move
|
||||
|
||||
if is_flag:
|
||||
view.game.toggle_flag(row, col)
|
||||
else:
|
||||
view.game.reveal(row, col)
|
||||
|
||||
# Reset the timeout timer since a valid move was made
|
||||
|
||||
view.reset_timeout()
|
||||
|
||||
# Update the embed
|
||||
|
||||
if view.game.game_over:
|
||||
# Cancel timeout task since game is over
|
||||
if view.timeout_task and not view.timeout_task.done():
|
||||
view.timeout_task.cancel()
|
||||
|
||||
|
|
@ -510,41 +456,32 @@ class Minesweeper(commands.Cog):
|
|||
for child in view.children:
|
||||
child.disabled = True
|
||||
view.stop()
|
||||
|
||||
|
||||
embed = view.create_embed()
|
||||
try:
|
||||
await view.message.edit(embed=embed, view=view)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def parse_move(self, text: str) -> tuple[int, int, bool] | None:
|
||||
"""
|
||||
Parse a move from text.
|
||||
Returns (col, row, is_flag) or None if invalid.
|
||||
Format: [column number] [row letter] [flag]
|
||||
Examples: "1 A", "6 B F", "7D FLAG", "2Hf"
|
||||
"""
|
||||
text = text.upper().strip()
|
||||
|
||||
# Try to match pattern with optional spaces and flag
|
||||
# Pattern: number (1-13), letter (A-M), optional flag
|
||||
|
||||
pattern = r'(\d+)\s*([A-M])\s*(F|FLAG)?'
|
||||
match = re.match(pattern, text)
|
||||
|
||||
|
||||
if not match:
|
||||
return None
|
||||
|
||||
|
||||
col_str, row_letter, flag = match.groups()
|
||||
|
||||
col = int(col_str) - 1 # Convert to 0-indexed
|
||||
row = ord(row_letter) - ord('A') # Convert letter to 0-indexed number
|
||||
|
||||
# Validate
|
||||
|
||||
col = int(col_str) - 1
|
||||
row = ord(row_letter) - ord('A')
|
||||
|
||||
if not (0 <= col < 13 and 0 <= row < 13):
|
||||
return None
|
||||
|
||||
|
||||
is_flag = flag is not None
|
||||
|
||||
|
||||
return (col, row, is_flag)
|
||||
|
||||
|
||||
|
|
@ -587,6 +524,31 @@ class Minesweeper(commands.Cog):
|
|||
embed.add_field(name="Best Streak", value=str(max_streak))
|
||||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
@app_commands.command(name="minesweeper_serverstats", description="View server-wide Minesweeper stats")
|
||||
async def minesweeper_serverstats(self, interaction: discord.Interaction):
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
cursor = await db.execute("SELECT played, wins FROM minesweeper_stats")
|
||||
rows = await cursor.fetchall()
|
||||
|
||||
if not rows:
|
||||
await interaction.response.send_message("Nobody has played Minesweeper yet!", ephemeral=True)
|
||||
return
|
||||
|
||||
total_played = sum(r[0] for r in rows)
|
||||
total_wins = sum(r[1] for r in rows)
|
||||
total_losses = total_played - total_wins
|
||||
win_rate = round(total_wins / total_played * 100, 1) if total_played else 0
|
||||
|
||||
embed = discord.Embed(
|
||||
title="Server Minesweeper Stats",
|
||||
color=get_embed_color(interaction.user.id)
|
||||
)
|
||||
embed.add_field(name="Total Games Played", value=str(total_played))
|
||||
embed.add_field(name="Total Wins", value=str(total_wins))
|
||||
embed.add_field(name="Total Losses", value=str(total_losses))
|
||||
embed.add_field(name="Overall Win Rate", value=f"{win_rate}%")
|
||||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
|
||||
async def setup(bot):
|
||||
await bot.add_cog(Minesweeper(bot))
|
||||
await bot.add_cog(Minesweeper(bot))
|
||||
|
|
@ -9,27 +9,23 @@ class TicTacToeButton(discord.ui.Button):
|
|||
super().__init__(style=discord.ButtonStyle.secondary, label='\u200b', row=y)
|
||||
self.x = x
|
||||
self.y = y
|
||||
|
||||
|
||||
async def callback(self, interaction: discord.Interaction):
|
||||
view: TicTacToeView = self.view
|
||||
|
||||
# Check if it's the correct player's turn
|
||||
if interaction.user.id != view.current_player.id:
|
||||
await interaction.response.send_message("It's not your turn!", ephemeral=True)
|
||||
return
|
||||
|
||||
# Check if the spot is already taken
|
||||
|
||||
if view.board[self.y][self.x] != 0:
|
||||
await interaction.response.send_message("That spot is already taken!", ephemeral=True)
|
||||
return
|
||||
|
||||
# Make the move
|
||||
|
||||
view.board[self.y][self.x] = view.current_mark
|
||||
self.label = view.current_symbol
|
||||
self.style = discord.ButtonStyle.primary if view.current_mark == 1 else discord.ButtonStyle.danger
|
||||
self.disabled = True
|
||||
|
||||
# Check for winner
|
||||
winner = view.check_winner()
|
||||
if winner:
|
||||
for child in view.children:
|
||||
|
|
@ -42,8 +38,7 @@ class TicTacToeButton(discord.ui.Button):
|
|||
winner_user = view.player1 if winner == 1 else view.player2
|
||||
await interaction.response.edit_message(content=f"{winner_user.mention} wins! 🎉", view=view)
|
||||
return
|
||||
|
||||
# Switch turns
|
||||
|
||||
view.current_mark = 2 if view.current_mark == 1 else 1
|
||||
view.current_player = view.player2 if view.current_mark == 2 else view.player1
|
||||
view.current_symbol = "O" if view.current_mark == 2 else "X"
|
||||
|
|
@ -55,11 +50,10 @@ class TicTacToeButton(discord.ui.Button):
|
|||
|
||||
class TicTacToeView(discord.ui.View):
|
||||
def __init__(self, player1: discord.Member, player2: discord.Member):
|
||||
super().__init__(timeout=300) # 5 minute timeout
|
||||
super().__init__(timeout=300) # 5 minute timeout
|
||||
self.player1 = player1
|
||||
self.player2 = player2
|
||||
|
||||
# Randomly decide who goes first
|
||||
self.current_mark = 1
|
||||
self.current_player = player1
|
||||
self.current_symbol = "X"
|
||||
|
|
@ -67,7 +61,6 @@ class TicTacToeView(discord.ui.View):
|
|||
# 0 = empty, 1 = player1 (X), 2 = player2 (O)
|
||||
self.board = [[0, 0, 0], [0, 0, 0], [0, 0, 0]]
|
||||
|
||||
# Create the 3x3 grid of buttons
|
||||
for y in range(3):
|
||||
for x in range(3):
|
||||
self.add_item(TicTacToeButton(x, y))
|
||||
|
|
@ -77,24 +70,24 @@ class TicTacToeView(discord.ui.View):
|
|||
for row in self.board:
|
||||
if row[0] == row[1] == row[2] != 0:
|
||||
return row[0]
|
||||
|
||||
|
||||
# Check columns
|
||||
for col in range(3):
|
||||
if self.board[0][col] == self.board[1][col] == self.board[2][col] != 0:
|
||||
return self.board[0][col]
|
||||
|
||||
# Check diagonals
|
||||
|
||||
# Check diagonals
|
||||
if self.board[0][0] == self.board[1][1] == self.board[2][2] != 0:
|
||||
return self.board[0][0]
|
||||
if self.board[0][2] == self.board[1][1] == self.board[2][0] != 0:
|
||||
return self.board[0][2]
|
||||
|
||||
|
||||
# Check for tie
|
||||
if all(self.board[y][x] != 0 for y in range(3) for x in range(3)):
|
||||
return 3
|
||||
|
||||
|
||||
return None
|
||||
|
||||
|
||||
async def on_timeout(self):
|
||||
for child in self.children:
|
||||
child.disabled = True
|
||||
|
|
@ -105,22 +98,22 @@ class ConfirmView(discord.ui.View):
|
|||
super().__init__(timeout=60)
|
||||
self.player2 = player2
|
||||
self.value = None
|
||||
|
||||
|
||||
@discord.ui.button(label="Accept", style=discord.ButtonStyle.green)
|
||||
async def accept(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||||
if interaction.user.id != self.player2.id:
|
||||
await interaction.response.send_message("Only the challenged player can accept!", ephemeral=True)
|
||||
return
|
||||
|
||||
|
||||
self.value = True
|
||||
self.stop()
|
||||
|
||||
|
||||
@discord.ui.button(label="Decline", style=discord.ButtonStyle.red)
|
||||
async def decline(self, interaction: discord.Interaction, button: discord.ui.Button):
|
||||
if interaction.user.id != self.player2.id:
|
||||
await interaction.response.send_message("Only the challenged player can decline!", ephemeral=True)
|
||||
return
|
||||
|
||||
|
||||
self.value = False
|
||||
self.stop()
|
||||
|
||||
|
|
@ -131,24 +124,20 @@ class TicTacToe(commands.Cog):
|
|||
@app_commands.command(name="tictactoe", description="Start a game of Tic-Tac-Toe with another user")
|
||||
@app_commands.describe(user="The user you want to play against")
|
||||
async def tictactoe(self, interaction: discord.Interaction, user: discord.Member):
|
||||
# Check if user is trying to play against themselves
|
||||
if user.id == interaction.user.id:
|
||||
await interaction.response.send_message("You can't play against yourself!", ephemeral=True)
|
||||
return
|
||||
|
||||
# Check if user is a bot
|
||||
|
||||
if user.bot:
|
||||
await interaction.response.send_message("You can't play against a bot!", ephemeral=True)
|
||||
return
|
||||
|
||||
# Send confirmation request
|
||||
|
||||
confirm_view = ConfirmView(user)
|
||||
await interaction.response.send_message(
|
||||
f"{user.mention}, {interaction.user.mention} has challenged you to Tic-Tac-Toe! Do you accept?",
|
||||
view=confirm_view
|
||||
)
|
||||
|
||||
# Wait for a response
|
||||
await confirm_view.wait()
|
||||
|
||||
if confirm_view.value is None:
|
||||
|
|
@ -157,18 +146,16 @@ class TicTacToe(commands.Cog):
|
|||
view=None
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
if not confirm_view.value:
|
||||
await interaction.edit_original_response(
|
||||
content=f"{user.mention} declined the challenge.",
|
||||
view=None
|
||||
)
|
||||
return
|
||||
|
||||
# Start the game
|
||||
|
||||
game_view = TicTacToeView(interaction.user, user)
|
||||
|
||||
# Randomly decide who goes first
|
||||
if random.choice([True, False]):
|
||||
game_view.current_player = user
|
||||
game_view.current_mark = 2
|
||||
|
|
@ -180,4 +167,4 @@ class TicTacToe(commands.Cog):
|
|||
)
|
||||
|
||||
async def setup(bot):
|
||||
await bot.add_cog(TicTacToe(bot))
|
||||
await bot.add_cog(TicTacToe(bot))
|
||||
|
|
@ -13,7 +13,7 @@ logger = get_logger(__name__)
|
|||
|
||||
ADMIN_ID = 252130669919076352
|
||||
ADMIN_CHANNEL_ID = 1470441786810826884
|
||||
ADMIN_ROLE_ID = 1470439484549234866
|
||||
BOT_DEV_ROLE_ID = 1470439484549234866
|
||||
|
||||
|
||||
class DenyModal(discord.ui.Modal, title="Reason for denying suggestion"):
|
||||
|
|
@ -31,9 +31,8 @@ class DenyModal(discord.ui.Modal, title="Reason for denying suggestion"):
|
|||
|
||||
async def on_submit(self, interaction: discord.Interaction):
|
||||
try:
|
||||
# Defer the response immediately to prevent timeout
|
||||
await interaction.response.defer(ephemeral=True)
|
||||
|
||||
|
||||
reason_text = self.reason.value or None
|
||||
|
||||
db_path = Path(__file__).parent.parent / "data" / "suggestions.db"
|
||||
|
|
@ -41,7 +40,6 @@ class DenyModal(discord.ui.Modal, title="Reason for denying suggestion"):
|
|||
await db.execute("UPDATE suggestions SET status = ?, reason = ? WHERE id = ?", ("Denied", reason_text, self.suggestion_id))
|
||||
await db.commit()
|
||||
|
||||
# Use followup instead of response since we deferred
|
||||
await interaction.followup.send(f"❌ Suggestion #{self.suggestion_id} denied.", ephemeral=True)
|
||||
|
||||
if self.admin_message_id:
|
||||
|
|
@ -49,36 +47,33 @@ class DenyModal(discord.ui.Modal, title="Reason for denying suggestion"):
|
|||
admin_channel = self.bot.get_channel(ADMIN_CHANNEL_ID)
|
||||
if admin_channel:
|
||||
orig_msg = await admin_channel.fetch_message(self.admin_message_id)
|
||||
|
||||
# Update embed
|
||||
|
||||
updated_embed = self.original_embed.copy()
|
||||
updated_embed.color = discord.Color.red()
|
||||
updated_embed.title = f"❌ Denied Suggestion (ID: {self.suggestion_id})"
|
||||
|
||||
# Add or update status field
|
||||
|
||||
updated_embed.set_field_at(0, name="Suggested by", value=updated_embed.fields[0].value, inline=True)
|
||||
updated_embed.set_field_at(1, name="Channel", value=updated_embed.fields[1].value, inline=True)
|
||||
updated_embed.add_field(name="Status", value="Denied", inline=False)
|
||||
updated_embed.add_field(name="Denied by", value=f"{interaction.user.mention}", inline=True)
|
||||
updated_embed.add_field(name="Denied at", value=f"<t:{int(datetime.utcnow().timestamp())}:F>", inline=True)
|
||||
|
||||
|
||||
if reason_text:
|
||||
updated_embed.add_field(name="Reason", value=reason_text, inline=False)
|
||||
|
||||
|
||||
disabled_view = SuggestionButtons(
|
||||
self.bot,
|
||||
suggestion_id=self.suggestion_id,
|
||||
user_id=self.user_id,
|
||||
suggestion_text=self.suggestion_text,
|
||||
channel_id=self.channel_id,
|
||||
admin_message_id=self.admin_message_id,
|
||||
self.bot,
|
||||
suggestion_id=self.suggestion_id,
|
||||
user_id=self.user_id,
|
||||
suggestion_text=self.suggestion_text,
|
||||
channel_id=self.channel_id,
|
||||
admin_message_id=self.admin_message_id,
|
||||
disabled=True
|
||||
)
|
||||
await orig_msg.edit(embed=updated_embed, view=disabled_view)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to edit admin message: {e}")
|
||||
|
||||
# Send DM to user
|
||||
try:
|
||||
user = await self.bot.fetch_user(self.user_id)
|
||||
dm_note = f"❌ Your suggestion (ID: {self.suggestion_id}) — `{self.suggestion_text}` has been **denied**."
|
||||
|
|
@ -88,7 +83,6 @@ class DenyModal(discord.ui.Modal, title="Reason for denying suggestion"):
|
|||
except Exception as e:
|
||||
logger.error(f"Failed to DM user: {e}")
|
||||
|
||||
# Send message in original channel
|
||||
channel = self.bot.get_channel(self.channel_id)
|
||||
if channel:
|
||||
try:
|
||||
|
|
@ -122,7 +116,6 @@ class SuggestionButtons(discord.ui.View):
|
|||
deny_cid = f"suggest_deny_{suggestion_id}" if suggestion_id else "suggest_deny"
|
||||
complete_cid = f"suggest_complete_{suggestion_id}" if suggestion_id else "suggest_complete"
|
||||
|
||||
# Only show approve/deny if not disabled
|
||||
if not show_complete:
|
||||
approve_btn = discord.ui.Button(label="Approve ✅", style=discord.ButtonStyle.success, custom_id=approve_cid, disabled=disabled)
|
||||
approve_btn.callback = self.approve
|
||||
|
|
@ -132,19 +125,17 @@ class SuggestionButtons(discord.ui.View):
|
|||
deny_btn.callback = self.deny
|
||||
self.add_item(deny_btn)
|
||||
else:
|
||||
# Show complete button for approved suggestions
|
||||
complete_btn = discord.ui.Button(label="Mark Complete 🎉", style=discord.ButtonStyle.primary, custom_id=complete_cid, disabled=disabled)
|
||||
complete_btn.callback = self.complete
|
||||
self.add_item(complete_btn)
|
||||
|
||||
async def approve(self, interaction: discord.Interaction):
|
||||
try:
|
||||
# Check if user has admin role or is the admin user
|
||||
has_permission = (
|
||||
interaction.user.id == ADMIN_ID or
|
||||
any(role.id == ADMIN_ROLE_ID for role in interaction.user.roles)
|
||||
any(role.id == BOT_DEV_ROLE_ID for role in interaction.user.roles)
|
||||
)
|
||||
|
||||
|
||||
if not has_permission:
|
||||
await interaction.response.send_message("You can't approve suggestions.", ephemeral=True)
|
||||
return
|
||||
|
|
@ -153,7 +144,6 @@ class SuggestionButtons(discord.ui.View):
|
|||
await interaction.response.send_message("⚠️ This button is no longer active.", ephemeral=True)
|
||||
return
|
||||
|
||||
# Defer immediately to prevent timeout
|
||||
await interaction.response.defer(ephemeral=True)
|
||||
|
||||
db_path = Path(__file__).parent.parent / "data" / "suggestions.db"
|
||||
|
|
@ -161,7 +151,6 @@ class SuggestionButtons(discord.ui.View):
|
|||
await db.execute("UPDATE suggestions SET status = ? WHERE id = ?", ("Approved", self.suggestion_id))
|
||||
await db.commit()
|
||||
|
||||
# Use followup since we deferred
|
||||
await interaction.followup.send(f"✅ Suggestion #{self.suggestion_id} approved.", ephemeral=True)
|
||||
|
||||
if self.admin_message_id:
|
||||
|
|
@ -169,25 +158,22 @@ class SuggestionButtons(discord.ui.View):
|
|||
admin_channel = self.bot.get_channel(ADMIN_CHANNEL_ID)
|
||||
if admin_channel:
|
||||
orig_msg = await admin_channel.fetch_message(self.admin_message_id)
|
||||
|
||||
# Update embed
|
||||
|
||||
updated_embed = orig_msg.embeds[0].copy()
|
||||
updated_embed.color = discord.Color.green()
|
||||
updated_embed.title = f"✅ Approved Suggestion (ID: {self.suggestion_id})"
|
||||
|
||||
# Add status fields
|
||||
|
||||
updated_embed.add_field(name="Status", value="Approved", inline=False)
|
||||
updated_embed.add_field(name="Approved by", value=f"{interaction.user.mention}", inline=True)
|
||||
updated_embed.add_field(name="Approved at", value=f"<t:{int(datetime.utcnow().timestamp())}:F>", inline=True)
|
||||
|
||||
# Show complete button instead of approve/deny
|
||||
|
||||
complete_view = SuggestionButtons(
|
||||
self.bot,
|
||||
suggestion_id=self.suggestion_id,
|
||||
user_id=self.user_id,
|
||||
suggestion_text=self.suggestion_text,
|
||||
channel_id=self.channel_id,
|
||||
admin_message_id=self.admin_message_id,
|
||||
self.bot,
|
||||
suggestion_id=self.suggestion_id,
|
||||
user_id=self.user_id,
|
||||
suggestion_text=self.suggestion_text,
|
||||
channel_id=self.channel_id,
|
||||
admin_message_id=self.admin_message_id,
|
||||
disabled=False,
|
||||
show_complete=True
|
||||
)
|
||||
|
|
@ -195,14 +181,12 @@ class SuggestionButtons(discord.ui.View):
|
|||
except Exception as e:
|
||||
logger.error(f"Failed to edit admin message: {e}")
|
||||
|
||||
# Send DM to user
|
||||
try:
|
||||
user = await self.bot.fetch_user(self.user_id)
|
||||
await user.send(f"✅ Your suggestion (ID: {self.suggestion_id}) — `{self.suggestion_text}` has been **approved!**")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to DM user: {e}")
|
||||
|
||||
# Send message in original channel
|
||||
channel = self.bot.get_channel(self.channel_id)
|
||||
if channel:
|
||||
try:
|
||||
|
|
@ -223,12 +207,11 @@ class SuggestionButtons(discord.ui.View):
|
|||
|
||||
async def deny(self, interaction: discord.Interaction):
|
||||
try:
|
||||
# Check if user has admin role or is the admin user
|
||||
has_permission = (
|
||||
interaction.user.id == ADMIN_ID or
|
||||
any(role.id == ADMIN_ROLE_ID for role in interaction.user.roles)
|
||||
any(role.id == BOT_DEV_ROLE_ID for role in interaction.user.roles)
|
||||
)
|
||||
|
||||
|
||||
if not has_permission:
|
||||
await interaction.response.send_message("You can't deny suggestions.", ephemeral=True)
|
||||
return
|
||||
|
|
@ -237,7 +220,6 @@ class SuggestionButtons(discord.ui.View):
|
|||
await interaction.response.send_message("⚠️ This button is no longer active.", ephemeral=True)
|
||||
return
|
||||
|
||||
# Get the original embed to pass to the modal
|
||||
admin_channel = self.bot.get_channel(ADMIN_CHANNEL_ID)
|
||||
if admin_channel and self.admin_message_id:
|
||||
try:
|
||||
|
|
@ -249,11 +231,11 @@ class SuggestionButtons(discord.ui.View):
|
|||
original_embed = None
|
||||
|
||||
modal = DenyModal(
|
||||
suggestion_id=self.suggestion_id,
|
||||
user_id=self.user_id,
|
||||
suggestion_text=self.suggestion_text,
|
||||
channel_id=self.channel_id,
|
||||
admin_message_id=self.admin_message_id,
|
||||
suggestion_id=self.suggestion_id,
|
||||
user_id=self.user_id,
|
||||
suggestion_text=self.suggestion_text,
|
||||
channel_id=self.channel_id,
|
||||
admin_message_id=self.admin_message_id,
|
||||
bot=self.bot,
|
||||
original_embed=original_embed
|
||||
)
|
||||
|
|
@ -271,12 +253,11 @@ class SuggestionButtons(discord.ui.View):
|
|||
|
||||
async def complete(self, interaction: discord.Interaction):
|
||||
try:
|
||||
# Check if user has admin role or is the admin user
|
||||
has_permission = (
|
||||
interaction.user.id == ADMIN_ID or
|
||||
any(role.id == ADMIN_ROLE_ID for role in interaction.user.roles)
|
||||
any(role.id == BOT_DEV_ROLE_ID for role in interaction.user.roles)
|
||||
)
|
||||
|
||||
|
||||
if not has_permission:
|
||||
await interaction.response.send_message("You can't mark suggestions as complete.", ephemeral=True)
|
||||
return
|
||||
|
|
@ -285,22 +266,19 @@ class SuggestionButtons(discord.ui.View):
|
|||
await interaction.response.send_message("⚠️ This button is no longer active.", ephemeral=True)
|
||||
return
|
||||
|
||||
# Defer immediately to prevent timeout
|
||||
await interaction.response.defer(ephemeral=True)
|
||||
|
||||
db_path = Path(__file__).parent.parent / "data" / "suggestions.db"
|
||||
async with aiosqlite.connect(db_path) as db:
|
||||
# Check if it's approved
|
||||
async with db.execute("SELECT status FROM suggestions WHERE id = ?", (self.suggestion_id,)) as cursor:
|
||||
row = await cursor.fetchone()
|
||||
if not row or row[0] != "Approved":
|
||||
await interaction.followup.send("⚠️ This suggestion must be approved before marking as complete.", ephemeral=True)
|
||||
return
|
||||
|
||||
|
||||
await db.execute("UPDATE suggestions SET status = ? WHERE id = ?", ("Completed", self.suggestion_id))
|
||||
await db.commit()
|
||||
|
||||
# Use followup since we deferred
|
||||
await interaction.followup.send(f"🎉 Suggestion #{self.suggestion_id} marked as completed!", ephemeral=True)
|
||||
|
||||
if self.admin_message_id:
|
||||
|
|
@ -308,30 +286,26 @@ class SuggestionButtons(discord.ui.View):
|
|||
admin_channel = self.bot.get_channel(ADMIN_CHANNEL_ID)
|
||||
if admin_channel:
|
||||
orig_msg = await admin_channel.fetch_message(self.admin_message_id)
|
||||
|
||||
# Update embed
|
||||
|
||||
updated_embed = orig_msg.embeds[0].copy()
|
||||
updated_embed.color = discord.Color.blue()
|
||||
updated_embed.title = f"🎉 Completed Suggestion (ID: {self.suggestion_id})"
|
||||
|
||||
# Update status field (should be at index 2)
|
||||
|
||||
for i, field in enumerate(updated_embed.fields):
|
||||
if field.name == "Status":
|
||||
updated_embed.set_field_at(i, name="Status", value="Completed", inline=field.inline)
|
||||
break
|
||||
|
||||
# Add completion info
|
||||
|
||||
updated_embed.add_field(name="Completed by", value=f"{interaction.user.mention}", inline=True)
|
||||
updated_embed.add_field(name="Completed at", value=f"<t:{int(datetime.utcnow().timestamp())}:F>", inline=True)
|
||||
|
||||
# Disable the complete button
|
||||
|
||||
disabled_view = SuggestionButtons(
|
||||
self.bot,
|
||||
suggestion_id=self.suggestion_id,
|
||||
user_id=self.user_id,
|
||||
suggestion_text=self.suggestion_text,
|
||||
channel_id=self.channel_id,
|
||||
admin_message_id=self.admin_message_id,
|
||||
self.bot,
|
||||
suggestion_id=self.suggestion_id,
|
||||
user_id=self.user_id,
|
||||
suggestion_text=self.suggestion_text,
|
||||
channel_id=self.channel_id,
|
||||
admin_message_id=self.admin_message_id,
|
||||
disabled=True,
|
||||
show_complete=True
|
||||
)
|
||||
|
|
@ -339,14 +313,12 @@ class SuggestionButtons(discord.ui.View):
|
|||
except Exception as e:
|
||||
logger.error(f"Failed to edit admin message: {e}")
|
||||
|
||||
# Send DM to user
|
||||
try:
|
||||
user = await self.bot.fetch_user(self.user_id)
|
||||
await user.send(f"🎉 Your suggestion (ID: {self.suggestion_id}) — `{self.suggestion_text}` has been **implemented!**")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to DM user: {e}")
|
||||
|
||||
# Send message in original channel
|
||||
channel = self.bot.get_channel(self.channel_id)
|
||||
if channel:
|
||||
try:
|
||||
|
|
@ -404,7 +376,6 @@ class PaginationView(discord.ui.View):
|
|||
|
||||
|
||||
class Suggestion(commands.GroupCog, name="suggest"):
|
||||
"""Suggestion commands"""
|
||||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
|
|
@ -426,18 +397,17 @@ class Suggestion(commands.GroupCog, name="suggest"):
|
|||
""")
|
||||
await self.db.commit()
|
||||
|
||||
# Re-register persistent views for all pending and approved suggestions
|
||||
async with self.db.execute("SELECT id, user_id, suggestion, channel_id, admin_message_id, status FROM suggestions WHERE status IN (?, ?)", ("Pending", "Approved")) as cursor:
|
||||
rows = await cursor.fetchall()
|
||||
|
||||
for sid, uid, suggestion_text, channel_id, admin_msg_id, status in rows:
|
||||
if admin_msg_id: # Only register if we have a message ID
|
||||
if admin_msg_id:
|
||||
view = SuggestionButtons(
|
||||
self.bot,
|
||||
suggestion_id=sid,
|
||||
user_id=uid,
|
||||
suggestion_text=suggestion_text,
|
||||
channel_id=channel_id,
|
||||
self.bot,
|
||||
suggestion_id=sid,
|
||||
user_id=uid,
|
||||
suggestion_text=suggestion_text,
|
||||
channel_id=channel_id,
|
||||
admin_message_id=admin_msg_id,
|
||||
show_complete=(status == "Approved")
|
||||
)
|
||||
|
|
@ -482,13 +452,12 @@ class Suggestion(commands.GroupCog, name="suggest"):
|
|||
await self.db.execute("UPDATE suggestions SET admin_message_id = ? WHERE id = ?", (sent.id, suggestion_id))
|
||||
await self.db.commit()
|
||||
|
||||
# Register the persistent view with the message_id
|
||||
persistent_view = SuggestionButtons(
|
||||
self.bot,
|
||||
suggestion_id,
|
||||
interaction.user.id,
|
||||
idea,
|
||||
interaction.channel_id,
|
||||
self.bot,
|
||||
suggestion_id,
|
||||
interaction.user.id,
|
||||
idea,
|
||||
interaction.channel_id,
|
||||
admin_message_id=sent.id
|
||||
)
|
||||
self.bot.add_view(persistent_view, message_id=sent.id)
|
||||
|
|
@ -522,25 +491,25 @@ class Suggestion(commands.GroupCog, name="suggest"):
|
|||
"Denied": discord.Color.red(),
|
||||
"Completed": discord.Color.blue()
|
||||
}
|
||||
|
||||
|
||||
embed = discord.Embed(
|
||||
title=f"Suggestion #{suggestion_id}",
|
||||
description=suggestion_text,
|
||||
color=status_colors.get(status, discord.Color.greyple())
|
||||
)
|
||||
|
||||
|
||||
try:
|
||||
user = await self.bot.fetch_user(user_id)
|
||||
embed.add_field(name="Suggested by", value=f"{user.mention} ({user})", inline=True)
|
||||
except Exception:
|
||||
embed.add_field(name="Suggested by", value=f"<@{user_id}>", inline=True)
|
||||
|
||||
|
||||
embed.add_field(name="Status", value=status, inline=True)
|
||||
embed.add_field(name="Channel", value=f"<#{channel_id}>", inline=True)
|
||||
|
||||
|
||||
if reason:
|
||||
embed.add_field(name="Reason", value=reason, inline=False)
|
||||
|
||||
|
||||
await interaction.followup.send(embed=embed)
|
||||
|
||||
except Exception as e:
|
||||
|
|
@ -553,12 +522,11 @@ class Suggestion(commands.GroupCog, name="suggest"):
|
|||
await interaction.response.defer(ephemeral=False)
|
||||
|
||||
try:
|
||||
# Check if user has admin role or is the admin user
|
||||
has_permission = (
|
||||
interaction.user.id == ADMIN_ID or
|
||||
any(role.id == ADMIN_ROLE_ID for role in interaction.user.roles)
|
||||
any(role.id == BOT_DEV_ROLE_ID for role in interaction.user.roles)
|
||||
)
|
||||
|
||||
|
||||
if not has_permission:
|
||||
await interaction.followup.send("You don't have permission to do that.")
|
||||
return
|
||||
|
|
@ -580,36 +548,31 @@ class Suggestion(commands.GroupCog, name="suggest"):
|
|||
|
||||
await interaction.followup.send(f"✅ Suggestion #{suggestion_id} marked as completed!")
|
||||
|
||||
# Update the embed if there's an admin message
|
||||
if admin_message_id:
|
||||
try:
|
||||
admin_channel = self.bot.get_channel(ADMIN_CHANNEL_ID)
|
||||
if admin_channel:
|
||||
orig_msg = await admin_channel.fetch_message(admin_message_id)
|
||||
|
||||
# Update embed
|
||||
|
||||
updated_embed = orig_msg.embeds[0].copy()
|
||||
updated_embed.color = discord.Color.blue()
|
||||
updated_embed.title = f"🎉 Completed Suggestion (ID: {suggestion_id})"
|
||||
|
||||
# Update status field
|
||||
|
||||
for i, field in enumerate(updated_embed.fields):
|
||||
if field.name == "Status":
|
||||
updated_embed.set_field_at(i, name="Status", value="Completed", inline=field.inline)
|
||||
break
|
||||
|
||||
# Add completion info
|
||||
|
||||
updated_embed.add_field(name="Completed by", value=f"{interaction.user.mention}", inline=True)
|
||||
updated_embed.add_field(name="Completed at", value=f"<t:{int(datetime.utcnow().timestamp())}:F>", inline=True)
|
||||
|
||||
# Disable the complete button
|
||||
|
||||
disabled_view = SuggestionButtons(
|
||||
self.bot,
|
||||
suggestion_id=suggestion_id,
|
||||
user_id=user_id,
|
||||
suggestion_text=suggestion_text,
|
||||
channel_id=channel_id,
|
||||
admin_message_id=admin_message_id,
|
||||
self.bot,
|
||||
suggestion_id=suggestion_id,
|
||||
user_id=user_id,
|
||||
suggestion_text=suggestion_text,
|
||||
channel_id=channel_id,
|
||||
admin_message_id=admin_message_id,
|
||||
disabled=True,
|
||||
show_complete=True
|
||||
)
|
||||
|
|
@ -685,28 +648,21 @@ class Suggestion(commands.GroupCog, name="suggest"):
|
|||
await interaction.response.defer(ephemeral=False)
|
||||
|
||||
try:
|
||||
# If no member specified, show for the command user
|
||||
target_user = member if member else interaction.user
|
||||
|
||||
# Check if user has admin role or is the admin user when checking others
|
||||
if member and member != interaction.user:
|
||||
has_permission = (
|
||||
interaction.user.id == ADMIN_ID or
|
||||
any(role.id == ADMIN_ROLE_ID for role in interaction.user.roles)
|
||||
)
|
||||
|
||||
if not has_permission:
|
||||
await interaction.followup.send("You can only view your own to-do list.", ephemeral=True)
|
||||
return
|
||||
has_permission = (
|
||||
interaction.user.id == ADMIN_ID or
|
||||
any(role.id == BOT_DEV_ROLE_ID for role in interaction.user.roles)
|
||||
)
|
||||
if not has_permission:
|
||||
await interaction.followup.send("This command is restricted to Lacie bot devs.", ephemeral=True)
|
||||
return
|
||||
|
||||
target_user = member if member else interaction.user
|
||||
|
||||
# We need to track who approved what, so we'll need to update the database schema
|
||||
# For now, we'll fetch from the admin message embeds
|
||||
admin_channel = self.bot.get_channel(ADMIN_CHANNEL_ID)
|
||||
if not admin_channel:
|
||||
await interaction.followup.send("❌ Admin channel not found.")
|
||||
return
|
||||
|
||||
# Get all approved suggestions
|
||||
async with self.db.execute(
|
||||
"SELECT id, user_id, suggestion, admin_message_id FROM suggestions WHERE status = ? ORDER BY id DESC",
|
||||
("Approved",)
|
||||
|
|
@ -717,7 +673,6 @@ class Suggestion(commands.GroupCog, name="suggest"):
|
|||
await interaction.followup.send(f"No approved suggestions found.")
|
||||
return
|
||||
|
||||
# Filter by who approved them
|
||||
user_approved = []
|
||||
for sid, uid, suggestion_text, admin_msg_id in rows:
|
||||
if admin_msg_id:
|
||||
|
|
@ -725,7 +680,6 @@ class Suggestion(commands.GroupCog, name="suggest"):
|
|||
msg = await admin_channel.fetch_message(admin_msg_id)
|
||||
if msg.embeds:
|
||||
embed = msg.embeds[0]
|
||||
# Check if target_user approved this
|
||||
for field in embed.fields:
|
||||
if field.name == "Approved by" and target_user.mention in field.value:
|
||||
user_approved.append((sid, uid, suggestion_text))
|
||||
|
|
@ -741,7 +695,6 @@ class Suggestion(commands.GroupCog, name="suggest"):
|
|||
await interaction.followup.send("You haven't approved any suggestions yet.")
|
||||
return
|
||||
|
||||
# Create paginated embeds
|
||||
embeds = []
|
||||
per_page = 10
|
||||
for i in range(0, len(user_approved), per_page):
|
||||
|
|
@ -751,14 +704,14 @@ class Suggestion(commands.GroupCog, name="suggest"):
|
|||
color=discord.Color.gold()
|
||||
)
|
||||
embed.set_thumbnail(url=target_user.display_avatar.url)
|
||||
|
||||
|
||||
for sid, uid, suggestion_text in user_approved[i:i+per_page]:
|
||||
embed.add_field(
|
||||
name=f"ID: {sid}",
|
||||
value=f"**Suggested by:** <@{uid}>\n**Idea:** {suggestion_text[:150]}{'...' if len(suggestion_text) > 150 else ''}",
|
||||
inline=False
|
||||
)
|
||||
|
||||
|
||||
embed.set_footer(text=f"Total approved: {len(user_approved)}")
|
||||
embeds.append(embed)
|
||||
|
||||
|
|
@ -772,4 +725,4 @@ class Suggestion(commands.GroupCog, name="suggest"):
|
|||
|
||||
|
||||
async def setup(bot):
|
||||
await bot.add_cog(Suggestion(bot))
|
||||
await bot.add_cog(Suggestion(bot))
|
||||
|
|
|
|||
|
|
@ -17,7 +17,8 @@ COMMIT_CHANNEL_IDS = [876777562599194644, 1437941632849940563, 14704417868108268
|
|||
# --- Roles ---
|
||||
BIRTHDAY_ROLE_ID = 1113751318918602762
|
||||
BOT_TRAP_ROLE_ID = 1439354601672282335
|
||||
ADMIN_ROLE_ID = 1470439484549234866
|
||||
SERVER_ADMIN_ROLE_ID = 952560403970416722
|
||||
BOT_DEV_ROLE_ID = 1470439484549234866
|
||||
|
||||
# --- Emojis ---
|
||||
SALT_EMOJI_ID = 1074583707459010560
|
||||
|
|
|
|||
134
wordle/wordle.py
134
wordle/wordle.py
|
|
@ -11,21 +11,16 @@ from utils.logger import get_logger
|
|||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Configuration constants
|
||||
WORD_LIST_URL = "https://raw.githubusercontent.com/tabatkins/wordle-list/main/words"
|
||||
WORDLE_DIR = "wordle"
|
||||
WORD_LIST_PATH = os.path.join(WORDLE_DIR, "words.txt")
|
||||
DB_PATH = Path(__file__).parent.parent / "data" / "wordle.db"
|
||||
|
||||
# Emoji squares for displaying guess results
|
||||
SQUARES = {"green": "🟩", "yellow": "🟨", "gray": "⬛"}
|
||||
|
||||
class Wordle(commands.Cog):
|
||||
"""A Discord bot cog that implements the Wordle word-guessing game."""
|
||||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
# Create wordle directory if it doesn't exist
|
||||
os.makedirs(WORDLE_DIR, exist_ok=True)
|
||||
|
||||
async def cog_load(self):
|
||||
|
|
@ -55,84 +50,56 @@ class Wordle(commands.Cog):
|
|||
await db.commit()
|
||||
|
||||
async def _ensure_wordlist(self):
|
||||
"""Download the word list if it doesn't exist, then load it into memory."""
|
||||
# Download word list from GitHub if not present
|
||||
if not os.path.exists(WORD_LIST_PATH):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(WORD_LIST_URL) as resp:
|
||||
words = await resp.text()
|
||||
with open(WORD_LIST_PATH, "w", encoding="utf-8") as f:
|
||||
f.write(words)
|
||||
|
||||
|
||||
with open(WORD_LIST_PATH, "r", encoding="utf-8") as f:
|
||||
self.words = [w.strip() for w in f.read().splitlines() if len(w.strip()) == 5]
|
||||
logger.info(f"Loaded {len(self.words)} words.")
|
||||
|
||||
def get_daily_word(self):
|
||||
"""
|
||||
Get the word of the day based on today's date.
|
||||
Uses the date's ordinal value to deterministically select a word.
|
||||
"""
|
||||
today = datetime.date.today()
|
||||
index = today.toordinal() % len(self.words)
|
||||
return self.words[index]
|
||||
|
||||
def compare_guess(self, guess, target):
|
||||
"""
|
||||
Compare a guess to the target word and return color results.
|
||||
Properly handles duplicate letters - only marks as many yellows/greens
|
||||
as actually exist in the target word.
|
||||
|
||||
Args:
|
||||
guess: The user's guessed word
|
||||
target: The correct word
|
||||
|
||||
Returns:
|
||||
List of colors: "green" (correct position), "yellow" (wrong position), "gray" (not in word)
|
||||
"""
|
||||
result = ["gray"] * 5
|
||||
target_chars = list(target)
|
||||
|
||||
# First pass: mark all greens (correct position)
|
||||
|
||||
for i, ch in enumerate(guess):
|
||||
if ch == target[i]:
|
||||
result[i] = "green"
|
||||
target_chars[i] = None # Mark this letter as used
|
||||
|
||||
# Second pass: mark yellows (wrong position)
|
||||
target_chars[i] = None
|
||||
|
||||
for i, ch in enumerate(guess):
|
||||
if result[i] == "gray" and ch in target_chars:
|
||||
result[i] = "yellow"
|
||||
target_chars[target_chars.index(ch)] = None # Mark this letter as used
|
||||
|
||||
target_chars[target_chars.index(ch)] = None
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def get_keyboard_display(self, guesses, target):
|
||||
"""
|
||||
Generate a visual keyboard showing the status of each letter.
|
||||
Green = correct position, Yellow = wrong position, Gray = not in word, White = unused.
|
||||
"""
|
||||
keyboard_rows = [
|
||||
"qwertyuiop",
|
||||
"asdfghjkl",
|
||||
"zxcvbnm"
|
||||
]
|
||||
|
||||
# Track the best status for each letter (green beats yellow beats gray)
|
||||
|
||||
letter_status = {}
|
||||
status_priority = {"green": 3, "yellow": 2, "gray": 1}
|
||||
|
||||
# Process all guesses to determine letter statuses
|
||||
|
||||
for guess in guesses:
|
||||
result = self.compare_guess(guess, target)
|
||||
for ch, status in zip(guess, result):
|
||||
current_priority = status_priority.get(letter_status.get(ch), 0)
|
||||
new_priority = status_priority[status]
|
||||
# Only update if the new status is better
|
||||
if new_priority > current_priority:
|
||||
letter_status[ch] = status
|
||||
|
||||
# Build the keyboard display with colored squares
|
||||
|
||||
display_lines = []
|
||||
for row in keyboard_rows:
|
||||
row_display = []
|
||||
|
|
@ -143,24 +110,21 @@ class Wordle(commands.Cog):
|
|||
row_display.append(f"{letter.upper()}🟩")
|
||||
elif status == "yellow":
|
||||
row_display.append(f"{letter.upper()}🟨")
|
||||
else: # gray
|
||||
else:
|
||||
row_display.append(f"{letter.upper()}⬛")
|
||||
else:
|
||||
# Letter hasn't been used yet
|
||||
row_display.append(f"{letter.upper()}⬜")
|
||||
display_lines.append(" ".join(row_display))
|
||||
|
||||
|
||||
return "\n".join(display_lines)
|
||||
|
||||
async def get_user_game(self, user_id):
|
||||
"""Retrieve the current game state for a user on today's date."""
|
||||
today = str(datetime.date.today())
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
cursor = await db.execute("SELECT guesses, finished FROM wordle_games WHERE user_id=? AND date=?", (user_id, today))
|
||||
return await cursor.fetchone()
|
||||
|
||||
async def update_user_game(self, user_id, guesses, finished):
|
||||
"""Save the current game state for a user."""
|
||||
today = str(datetime.date.today())
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
await db.execute("""
|
||||
|
|
@ -171,30 +135,23 @@ class Wordle(commands.Cog):
|
|||
await db.commit()
|
||||
|
||||
async def update_stats(self, user_id, win):
|
||||
"""
|
||||
Update user statistics after completing a game.
|
||||
Tracks wins, losses, and win streaks.
|
||||
"""
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
cursor = await db.execute("SELECT played, wins, current_streak, max_streak FROM wordle_stats WHERE user_id=?", (user_id,))
|
||||
row = await cursor.fetchone()
|
||||
|
||||
# Initialize stats if user hasn't played before
|
||||
|
||||
if not row:
|
||||
played, wins, streak, max_streak = 0, 0, 0, 0
|
||||
else:
|
||||
played, wins, streak, max_streak = row
|
||||
|
||||
# Update stats based on game result
|
||||
|
||||
played += 1
|
||||
if win:
|
||||
wins += 1
|
||||
streak += 1
|
||||
max_streak = max(max_streak, streak)
|
||||
else:
|
||||
streak = 0 # Reset streak on loss
|
||||
|
||||
# Save updated stats
|
||||
streak = 0
|
||||
|
||||
await db.execute("""
|
||||
INSERT INTO wordle_stats (user_id, played, wins, current_streak, max_streak)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
|
|
@ -204,11 +161,6 @@ class Wordle(commands.Cog):
|
|||
await db.commit()
|
||||
|
||||
async def handle_guess(self, interaction, guess):
|
||||
"""
|
||||
Process a user's guess and update game state accordingly.
|
||||
Handles validation, game completion, and progress updates.
|
||||
"""
|
||||
# Validate the guess
|
||||
if len(guess) != 5 or guess not in self.words:
|
||||
await interaction.response.send_message("That's not a valid word.", ephemeral=True)
|
||||
return
|
||||
|
|
@ -216,27 +168,22 @@ class Wordle(commands.Cog):
|
|||
user_id = interaction.user.id
|
||||
target = self.get_daily_word()
|
||||
row = await self.get_user_game(user_id)
|
||||
|
||||
# Parse existing guesses
|
||||
|
||||
guesses = [] if not row else (row[0].split(",") if row[0] else [])
|
||||
finished = row[1] if row else 0
|
||||
|
||||
# Check if game is already completed
|
||||
if finished:
|
||||
await interaction.response.send_message("You've already finished today's Wordle!", ephemeral=True)
|
||||
return
|
||||
|
||||
# Add new guess and evaluate it
|
||||
guesses.append(guess)
|
||||
result = self.compare_guess(guess, target)
|
||||
|
||||
# Win condition: guess matches target
|
||||
if guess == target:
|
||||
await self.update_user_game(user_id, ",".join(guesses), 1)
|
||||
await self.update_stats(user_id, True)
|
||||
await interaction.response.send_message(f"✅ You got it! Wordle {datetime.date.today()} solved in {len(guesses)} tries.", ephemeral=True)
|
||||
|
||||
# Post public result embed
|
||||
embed = discord.Embed(
|
||||
title=f"{interaction.user.display_name}'s Wordle {datetime.date.today()} Result",
|
||||
description="\n".join(["".join(SQUARES[c] for c in self.compare_guess(g, target)) for g in guesses]),
|
||||
|
|
@ -245,13 +192,11 @@ class Wordle(commands.Cog):
|
|||
await interaction.channel.send(embed=embed)
|
||||
return
|
||||
|
||||
# Loss condition: used all 6 guesses
|
||||
if len(guesses) >= 6:
|
||||
await self.update_user_game(user_id, ",".join(guesses), 1)
|
||||
await self.update_stats(user_id, False)
|
||||
await interaction.response.send_message(f"❌ You've used all 6 guesses. The word was **{target.upper()}**.", ephemeral=True)
|
||||
|
||||
# Post public result embed
|
||||
embed = discord.Embed(
|
||||
title=f"{interaction.user.display_name}'s Wordle {datetime.date.today()} Result",
|
||||
description="\n".join(["".join(SQUARES[c] for c in self.compare_guess(g, target)) for g in guesses]),
|
||||
|
|
@ -260,16 +205,14 @@ class Wordle(commands.Cog):
|
|||
await interaction.channel.send(embed=embed)
|
||||
return
|
||||
|
||||
# Game continues: show progress with guess history and keyboard
|
||||
await self.update_user_game(user_id, ",".join(guesses), 0)
|
||||
|
||||
# Build visual grid showing guesses and words
|
||||
|
||||
grid_lines = []
|
||||
for g in guesses:
|
||||
squares = "".join(SQUARES[c] for c in self.compare_guess(g, target))
|
||||
grid_lines.append(f"{squares} `{g.upper()}`")
|
||||
grid = "\n".join(grid_lines)
|
||||
|
||||
|
||||
keyboard = self.get_keyboard_display(guesses, target)
|
||||
await interaction.response.send_message(
|
||||
f"Your guesses so far:\n{grid}\n\nGuesses: {len(guesses)}/6\n\n**Keyboard:**\n{keyboard}",
|
||||
|
|
@ -278,74 +221,64 @@ class Wordle(commands.Cog):
|
|||
|
||||
@app_commands.command(name="wordle", description="Start today's Wordle game.")
|
||||
async def wordle(self, interaction: discord.Interaction):
|
||||
"""Start a new Wordle game or check progress on current game."""
|
||||
user_id = interaction.user.id
|
||||
row = await self.get_user_game(user_id)
|
||||
|
||||
# Check if user already finished today's game
|
||||
if row and row[1]: # finished
|
||||
|
||||
if row and row[1]:
|
||||
await interaction.response.send_message("You've already finished today's Wordle! Check back tomorrow.", ephemeral=True)
|
||||
return
|
||||
|
||||
|
||||
guesses = [] if not row else (row[0].split(",") if row[0] else [])
|
||||
|
||||
# Show current progress if game is in progress
|
||||
|
||||
if guesses:
|
||||
target = self.get_daily_word()
|
||||
|
||||
# Build visual grid with guesses and words
|
||||
|
||||
grid_lines = []
|
||||
for g in guesses:
|
||||
squares = "".join(SQUARES[c] for c in self.compare_guess(g, target))
|
||||
grid_lines.append(f"{squares} `{g.upper()}`")
|
||||
grid = "\n".join(grid_lines)
|
||||
|
||||
|
||||
keyboard = self.get_keyboard_display(guesses, target)
|
||||
await interaction.response.send_message(
|
||||
f"You have a game in progress! Use `/wordle_guess <word>` to continue.\n\n{grid}\n\nGuesses: {len(guesses)}/6\n\n**Keyboard:**\n{keyboard}",
|
||||
ephemeral=True
|
||||
)
|
||||
else:
|
||||
# Start a new game
|
||||
await interaction.response.send_message(
|
||||
f"🎮 Started Wordle for {datetime.date.today()}!\n\nUse `/wordle_guess <word>` to make your guesses. You have 6 tries!",
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
|
||||
@app_commands.command(name="wordle_guess", description="Submit a guess for today's Wordle.")
|
||||
@app_commands.describe(guess="Your 5-letter word guess")
|
||||
async def wordle_guess(self, interaction: discord.Interaction, guess: str):
|
||||
"""Submit a guess for your current Wordle game."""
|
||||
await self.handle_guess(interaction, guess.lower())
|
||||
|
||||
@app_commands.command(name="wordle_stats", description="View your Wordle stats.")
|
||||
async def wordle_stats(self, interaction: discord.Interaction):
|
||||
"""Display personal Wordle statistics including wins, streaks, and averages."""
|
||||
user_id = interaction.user.id
|
||||
|
||||
# Fetch user stats from database
|
||||
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
cursor = await db.execute("SELECT played, wins, current_streak, max_streak FROM wordle_stats WHERE user_id=?", (user_id,))
|
||||
row = await cursor.fetchone()
|
||||
|
||||
|
||||
if not row:
|
||||
await interaction.response.send_message("You haven't played any games yet!", ephemeral=True)
|
||||
return
|
||||
|
||||
played, wins, streak, max_streak = row
|
||||
win_rate = (wins / played * 100) if played else 0
|
||||
|
||||
# Calculate average guesses for won games
|
||||
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
cursor = await db.execute("SELECT guesses FROM wordle_games WHERE user_id=? AND finished=1", (user_id,))
|
||||
rows = await cursor.fetchall()
|
||||
|
||||
|
||||
avg_guesses = 0
|
||||
if rows:
|
||||
valid = [len(r[0].split(",")) for r in rows if r[0]]
|
||||
avg_guesses = round(sum(valid) / len(valid), 2) if valid else 0
|
||||
|
||||
# Build and send stats embed
|
||||
embed = discord.Embed(
|
||||
title=f"{interaction.user.display_name}'s Wordle Stats",
|
||||
color=get_embed_color(interaction.user.id)
|
||||
|
|
@ -360,24 +293,20 @@ class Wordle(commands.Cog):
|
|||
|
||||
@app_commands.command(name="wordle_serverstats", description="View server-wide Wordle stats.")
|
||||
async def wordle_serverstats(self, interaction: discord.Interaction):
|
||||
"""Display aggregated statistics for all players on the server."""
|
||||
async with aiosqlite.connect(DB_PATH) as db:
|
||||
cursor = await db.execute("SELECT played, wins FROM wordle_stats")
|
||||
stats_rows = await cursor.fetchall()
|
||||
cursor = await db.execute("SELECT guesses FROM wordle_games WHERE finished=1")
|
||||
guess_rows = await cursor.fetchall()
|
||||
|
||||
# Calculate aggregate statistics
|
||||
total_played = sum(r[0] for r in stats_rows)
|
||||
total_wins = sum(r[1] for r in stats_rows)
|
||||
total_losses = total_played - total_wins
|
||||
win_rate = (total_wins / total_played * 100) if total_played else 0
|
||||
|
||||
# Calculate average guesses across all won games
|
||||
guesses = [len(r[0].split(",")) for r in guess_rows if r[0]]
|
||||
avg_guesses = round(sum(guesses) / len(guesses), 2) if guesses else 0
|
||||
|
||||
# Build and send server stats embed
|
||||
embed = discord.Embed(
|
||||
title="Server Wordle Stats",
|
||||
color=get_embed_color(interaction.user.id)
|
||||
|
|
@ -391,5 +320,4 @@ class Wordle(commands.Cog):
|
|||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
async def setup(bot):
|
||||
"""Load the Wordle cog into the bot."""
|
||||
await bot.add_cog(Wordle(bot))
|
||||
await bot.add_cog(Wordle(bot))
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user