Lacie/image/avatar.py
Lilac-Rose be54be9c51 add help and quote commands, fix type errors, clean up birthdays
added /help with a category dropdown that covers all non-mod commands; also
added /quote, which renders the quoted message with the user's avatar fading in from the left side at an angle.

went through every file and fixed pylance errors - Optional type hints on
params that default to None, made cog_unload async everywhere, added isinstance
guards before accessing Member-only attributes, and narrowed get_channel()
return types before calling send().

birthday list now removes db entries for users who have left the server when
someone queries a month.
2026-04-13 04:12:41 +02:00

811 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import discord
from discord import app_commands
from discord.ext import commands
from PIL import Image, ImageOps, ImageSequence
import io
from embed.embed_color import get_embed_color
import asyncio
import aiohttp
import os
import tempfile
from pathlib import Path
from typing import Optional
import numpy as np
from scipy.ndimage import uniform_filter
import cv2
import imageio_ffmpeg
from utils.logger import get_logger
logger = get_logger(__name__)
def _composite_frame(dark, changed, dual, frame_h, frame_w, t1_rgb, t1_alpha, t2_rgb, t2_alpha):
    """Composite one video frame and return it as raw RGB24 bytes.

    Pixels that are dark in the source frame, marked as changed, and where
    tile canvas 1 is opaque take tile 1's color. When ``dual`` is set, light
    changed pixels likewise take tile 2's color. All other pixels stay black.
    """
    canvas = np.zeros((frame_h, frame_w, 3), dtype=np.uint8)
    primary = (t1_alpha > 0) & dark & changed
    canvas[primary] = t1_rgb[primary]
    if dual:
        secondary = (t2_alpha > 0) & ~dark & changed
        canvas[secondary] = t2_rgb[secondary]
    return canvas.tobytes()
class AvatarCommands(commands.Cog):
    """Slash commands that fetch and transform user avatars."""

    def __init__(self, bot):
        self.bot = bot
        # Shared aiohttp session; created in cog_load and recreated on demand.
        self.session = None
        # Static media assets bundled with the bot.
        self.explosion_path = Path(__file__).parent.parent / "media" / "explosion-deltarune.gif"
        self.obama_path = Path(__file__).parent.parent / "media" / "obama.jpg"
        self.bad_apple_path = Path(__file__).parent.parent / "media" / "bad-apple"
        self.bad_apple_audio_path = Path(__file__).parent.parent / "media" / "bad_apple.mp3"
        # Cached sorted list of Bad Apple frame images (filled in cog_load).
        self._bad_apple_frames: list[Path] = []
        # In-flight renders / encoders, cancelled or killed in cog_unload.
        self._active_renders: set[asyncio.Task] = set()
        self._active_ffmpeg: set = set()  # set of asyncio.subprocess.Process
async def cog_load(self):
self.session = aiohttp.ClientSession()
if self.bad_apple_path.exists():
self._bad_apple_frames = sorted(self.bad_apple_path.glob("*.jpg"))
async def cog_unload(self):
if self.session:
await self.session.close()
for task in list(self._active_renders):
task.cancel()
for proc in list(self._active_ffmpeg):
try:
proc.kill()
except Exception:
pass
def get_avatar_url(self, user, avatar_type_choice):
"""Returns a valid avatar object (never None)."""
use_global = avatar_type_choice and avatar_type_choice.value == "global"
if use_global:
# user.avatar is always the global avatar; fall back for default-avatar accounts
return user.avatar or user.display_avatar
if isinstance(user, discord.Member) and user.guild_avatar:
return user.guild_avatar
return user.display_avatar
    # Parent slash-command group: all commands below register as /avatar <sub>.
    avatar_group = app_commands.Group(name="avatar", description="Avatar manipulation commands")

    @avatar_group.command(name="show", description="Show your avatar or another user's avatar")
    @app_commands.describe(
        user="The user whose avatar to show (defaults to you)",
        avatar_type="Choose between server or global avatar"
    )
    @app_commands.choices(
        avatar_type=[
            app_commands.Choice(name="Server Avatar", value="server"),
            app_commands.Choice(name="Global Avatar", value="global")
        ]
    )
    async def avatar_show(self, interaction: discord.Interaction, user: Optional[discord.User] = None, avatar_type: Optional[app_commands.Choice[str]] = None):
        """Post an embed showing the chosen user's avatar with a direct link."""
        await interaction.response.defer(thinking=True)
        target = user or interaction.user
        avatar = self.get_avatar_url(target, avatar_type)
        avatar_url = avatar.url
        embed = discord.Embed(
            title=f"{target.display_name}'s Avatar",
            color=get_embed_color(interaction.user.id)
        )
        embed.set_image(url=avatar_url)
        embed.add_field(name="Direct Link", value=f"[Open Avatar]({avatar_url})")
        await interaction.followup.send(embed=embed)
@avatar_group.command(name="bitcrush", description="Bitcrush a user's avatar to a lower bits-per-pixel value")
@app_commands.describe(
user="The user whose avatar to bitcrush (defaults to you)",
bpp="Bits per pixel (18, default 8)",
avatar_type="Choose between server or global avatar"
)
@app_commands.choices(
avatar_type=[
app_commands.Choice(name="Server Avatar", value="server"),
app_commands.Choice(name="Global Avatar", value="global")
]
)
async def avatar_bitcrush(self, interaction: discord.Interaction, bpp: int = 8, user: Optional[discord.User] = None, avatar_type: Optional[app_commands.Choice[str]] = None):
await interaction.response.defer(thinking=True)
user = user or interaction.user
if bpp < 1 or bpp > 8:
await interaction.followup.send("Please choose a bit depth between 1 and 8.", ephemeral=True)
return
try:
avatar = self.get_avatar_url(user, avatar_type)
avatar_url = avatar.with_format("png").with_size(512)
if not self.session or self.session.closed:
self.session = aiohttp.ClientSession()
async with self.session.get(str(avatar_url)) as resp:
resp.raise_for_status()
image_bytes = await resp.read()
crushed_bytes = await asyncio.to_thread(self._bitcrush_image, image_bytes, bpp)
file = discord.File(io.BytesIO(crushed_bytes), filename=f"bitcrushed_{bpp}bit.png")
await interaction.followup.send(
f"{user.display_name}'s avatar, bitcrushed to {bpp} bit(s):",
file=file
)
except Exception:
logger.exception("Error in avatar bitcrush")
await interaction.followup.send("An error occurred while processing the image.", ephemeral=True)
def _bitcrush_image(self, image_bytes: bytes, bits: int) -> bytes:
img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
colors = 2 ** bits
crushed = img.quantize(colors=colors, method=Image.MEDIANCUT)
out = io.BytesIO()
crushed.save(out, format="PNG")
out.seek(0)
return out.getvalue()
    @avatar_group.command(name="canny_edge", description="Apply Canny edge detection to a user's avatar")
    @app_commands.describe(
        user="The user whose avatar to process (defaults to you)",
        threshold1="Lower threshold for edge detection (default 100)",
        threshold2="Upper threshold for edge detection (default 200)",
        avatar_type="Choose between server or global avatar"
    )
    @app_commands.choices(
        avatar_type=[
            app_commands.Choice(name="Server Avatar", value="server"),
            app_commands.Choice(name="Global Avatar", value="global")
        ]
    )
    async def avatar_canny_edge(self, interaction: discord.Interaction, threshold1: int = 100, threshold2: int = 200, user: Optional[discord.User] = None, avatar_type: Optional[app_commands.Choice[str]] = None):
        """Fetch the avatar, run Canny edge detection off-thread, and post it."""
        await interaction.response.defer(thinking=True)
        user = user or interaction.user
        # Validate both thresholds before doing any network work.
        if threshold1 < 0 or threshold1 > 500:
            await interaction.followup.send("threshold1 must be between 0 and 500.", ephemeral=True)
            return
        if threshold2 < 0 or threshold2 > 500:
            await interaction.followup.send("threshold2 must be between 0 and 500.", ephemeral=True)
            return
        if threshold1 >= threshold2:
            await interaction.followup.send("threshold1 must be less than threshold2.", ephemeral=True)
            return
        try:
            avatar = self.get_avatar_url(user, avatar_type)
            avatar_url = avatar.with_format("png").with_size(512)
            # Recreate the session if it was never opened or has been closed.
            if not self.session or self.session.closed:
                self.session = aiohttp.ClientSession()
            async with self.session.get(str(avatar_url)) as resp:
                resp.raise_for_status()
                image_bytes = await resp.read()
            # Heavy image work runs in a thread to keep the event loop free.
            edge_bytes = await asyncio.to_thread(self._canny_edge_detection, image_bytes, threshold1, threshold2)
            file = discord.File(io.BytesIO(edge_bytes), filename="canny_edges.png")
            await interaction.followup.send(
                f"{user.display_name}'s avatar with Canny edge detection:",
                file=file
            )
        except Exception:
            logger.exception("Error in avatar canny_edge")
            await interaction.followup.send("An error occurred while processing the image.", ephemeral=True)
def _canny_edge_detection(self, image_bytes: bytes, threshold1: int, threshold2: int) -> bytes:
# Load image and convert to grayscale
img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
img_array = np.array(img)
# Convert RGB to BGR for OpenCV
img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
# Convert to grayscale
gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
# Apply Canny edge detection
edges = cv2.Canny(gray, threshold1, threshold2)
# Convert back to PIL Image
edges_img = Image.fromarray(edges)
out = io.BytesIO()
edges_img.save(out, format="PNG")
out.seek(0)
return out.getvalue()
    @avatar_group.command(name="explode", description="Make a user's avatar explode")
    @app_commands.describe(
        user="The user whose avatar to explode (defaults to you)",
        avatar_type="Choose between server or global avatar"
    )
    @app_commands.choices(
        avatar_type=[
            app_commands.Choice(name="Server Avatar", value="server"),
            app_commands.Choice(name="Global Avatar", value="global")
        ]
    )
    async def avatar_explode(self, interaction: discord.Interaction, user: Optional[discord.User] = None, avatar_type: Optional[app_commands.Choice[str]] = None):
        """Overlay the explosion GIF on the avatar and post the animation."""
        await interaction.response.defer(thinking=True)
        user = user or interaction.user
        try:
            avatar = self.get_avatar_url(user, avatar_type)
            # 256px is enough here; the GIF gets resized to match anyway.
            avatar_url = avatar.with_format("png").with_size(256)
            # Recreate the session if it was never opened or has been closed.
            if not self.session or self.session.closed:
                self.session = aiohttp.ClientSession()
            async with self.session.get(str(avatar_url)) as resp:
                resp.raise_for_status()
                avatar_bytes = await resp.read()
            # GIF compositing runs in a thread to keep the event loop free.
            exploded_bytes = await asyncio.to_thread(self._explode_avatar, avatar_bytes)
            file = discord.File(io.BytesIO(exploded_bytes), filename="exploded.gif")
            await interaction.followup.send(f"{user.display_name} just got exploded!", file=file)
        except Exception:
            logger.exception("Error in avatar explode")
            await interaction.followup.send("An error occurred while processing the explosion.", ephemeral=True)
def _explode_avatar(self, avatar_bytes: bytes) -> bytes:
avatar = Image.open(io.BytesIO(avatar_bytes)).convert("RGBA")
explosion = Image.open(self.explosion_path)
avatar_size = avatar.size
frames = []
for frame in ImageSequence.Iterator(explosion):
frame = frame.convert("RGBA")
frame_resized = frame.resize(avatar_size, Image.Resampling.LANCZOS)
combined = Image.new("RGBA", avatar_size)
combined.paste(avatar, (0, 0))
combined.paste(frame_resized, (0, 0), frame_resized)
frames.append(combined)
out = io.BytesIO()
frames[0].save(
out,
format="GIF",
save_all=True,
append_images=frames[1:],
duration=explosion.info.get("duration", 50),
loop=0,
disposal=2
)
out.seek(0)
return out.getvalue()
@avatar_group.command(name="grayscale", description="Grayscale a user's avatar")
@app_commands.describe(
user="The user whose avatar to grayscale (defaults to you)",
avatar_type="Choose between server or global avatar"
)
@app_commands.choices(
avatar_type=[
app_commands.Choice(name="Server Avatar", value="server"),
app_commands.Choice(name="Global Avatar", value="global")
]
)
async def avatar_grayscale(self, interaction: discord.Interaction, user: Optional[discord.User] = None, avatar_type: Optional[app_commands.Choice[str]] = None):
await interaction.response.defer(thinking=True)
user = user or interaction.user
try:
avatar = self.get_avatar_url(user, avatar_type)
avatar_url = avatar.with_format("png").with_size(512)
async with self.session.get(str(avatar_url)) as resp:
resp.raise_for_status()
image_bytes = await resp.read()
grayscaled_bytes = await asyncio.to_thread(self._grayscale_image, image_bytes)
file = discord.File(io.BytesIO(grayscaled_bytes), filename="grayscaled.png")
await interaction.followup.send(
f"{user.display_name}'s avatar, grayscaled:",
file=file
)
except Exception:
logger.exception("Error in avatar grayscale")
await interaction.followup.send("An error occurred while processing the image.", ephemeral=True)
def _grayscale_image(self, image_bytes: bytes) -> bytes:
img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
grayscaled = ImageOps.grayscale(img)
out = io.BytesIO()
grayscaled.save(out, format="PNG")
out.seek(0)
return out.getvalue()
@avatar_group.command(name="inverse", description="Invert the colors of a user's avatar")
@app_commands.describe(
user="The user whose avatar to invert (defaults to you)",
avatar_type="Choose between server or global avatar"
)
@app_commands.choices(
avatar_type=[
app_commands.Choice(name="Server Avatar", value="server"),
app_commands.Choice(name="Global Avatar", value="global")
]
)
async def avatar_inverse(self, interaction: discord.Interaction, user: Optional[discord.User] = None, avatar_type: Optional[app_commands.Choice[str]] = None):
await interaction.response.defer(thinking=True)
user = user or interaction.user
try:
avatar = self.get_avatar_url(user, avatar_type)
avatar_url = avatar.with_format("png").with_size(512)
async with self.session.get(str(avatar_url)) as resp:
resp.raise_for_status()
image_bytes = await resp.read()
inverted_bytes = await asyncio.to_thread(self._invert_image, image_bytes)
file = discord.File(io.BytesIO(inverted_bytes), filename="inverted.png")
await interaction.followup.send(
f"{user.display_name}'s avatar, color-inverted:",
file=file
)
except Exception:
logger.exception("Error in avatar inverse")
await interaction.followup.send("An error occurred while processing the image.", ephemeral=True)
def _invert_image(self, image_bytes: bytes) -> bytes:
img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
inverted = ImageOps.invert(img)
out = io.BytesIO()
inverted.save(out, format="PNG")
out.seek(0)
return out.getvalue()
    @avatar_group.command(name="kuwahara", description="Apply a Kuwahara filter to a user's avatar for a painterly effect")
    @app_commands.describe(
        user="The user whose avatar to filter (defaults to you)",
        kernel_size="Filter kernel size (3-15, odd numbers only, default 5)",
        avatar_type="Choose between server or global avatar"
    )
    @app_commands.choices(
        avatar_type=[
            app_commands.Choice(name="Server Avatar", value="server"),
            app_commands.Choice(name="Global Avatar", value="global")
        ]
    )
    async def avatar_kuwahara(self, interaction: discord.Interaction, kernel_size: int = 5, user: Optional[discord.User] = None, avatar_type: Optional[app_commands.Choice[str]] = None):
        """Apply the Kuwahara filter off-thread and post the painterly result."""
        await interaction.response.defer(thinking=True)
        user = user or interaction.user
        # Validate before doing any network work (odd sizes only).
        if kernel_size < 3 or kernel_size > 15 or kernel_size % 2 == 0:
            await interaction.followup.send("Kernel size must be an odd number between 3 and 15.", ephemeral=True)
            return
        try:
            avatar = self.get_avatar_url(user, avatar_type)
            avatar_url = avatar.with_format("png").with_size(512)
            # Recreate the session if it was never opened or has been closed.
            if not self.session or self.session.closed:
                self.session = aiohttp.ClientSession()
            async with self.session.get(str(avatar_url)) as resp:
                resp.raise_for_status()
                image_bytes = await resp.read()
            # Heavy image work runs in a thread to keep the event loop free.
            filtered_bytes = await asyncio.to_thread(self._kuwahara_filter, image_bytes, kernel_size)
            file = discord.File(io.BytesIO(filtered_bytes), filename="kuwahara.png")
            await interaction.followup.send(
                f"{user.display_name}'s avatar with Kuwahara filter (kernel size {kernel_size}):",
                file=file
            )
        except Exception:
            logger.exception("Error in avatar kuwahara")
            await interaction.followup.send("An error occurred while processing the image.", ephemeral=True)
def _kuwahara_filter(self, image_bytes: bytes, kernel_size: int) -> bytes:
img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
img_array = np.array(img, dtype=np.float32)
h, w, c = img_array.shape
result = np.zeros_like(img_array)
radius = kernel_size // 2
# Process each color channel separately
for ch in range(c):
channel = img_array[:, :, ch]
# Calculate mean and variance for the four quadrants
mean = uniform_filter(channel, kernel_size, mode='reflect')
mean_sq = uniform_filter(channel**2, kernel_size, mode='reflect')
variance = mean_sq - mean**2
# Create four quadrants by shifting the variance map
padded_var = np.pad(variance, radius, mode='reflect')
padded_mean = np.pad(mean, radius, mode='reflect')
# Extract four overlapping regions (quadrants)
vars = []
means = []
for dy in [0, radius]:
for dx in [0, radius]:
vars.append(padded_var[dy:dy+h, dx:dx+w])
means.append(padded_mean[dy:dy+h, dx:dx+w])
# Stack and find minimum variance quadrant
vars_stack = np.stack(vars, axis=0)
means_stack = np.stack(means, axis=0)
min_var_idx = np.argmin(vars_stack, axis=0)
# Select mean from quadrant with minimum variance
for i in range(4):
mask = (min_var_idx == i)
result[:, :, ch][mask] = means_stack[i][mask]
result = np.clip(result, 0, 255).astype(np.uint8)
filtered_img = Image.fromarray(result)
out = io.BytesIO()
filtered_img.save(out, format="PNG")
out.seek(0)
return out.getvalue()
@avatar_group.command(name="obamify", description="Turn a user's avatar into a tile-based Obama mosaic")
@app_commands.describe(
user="The user whose avatar to obamify (defaults to you)",
tile_count="Number of tiles per row/column (default 32), 1-256",
avatar_type="Choose between server or global avatar"
)
@app_commands.choices(
avatar_type=[
app_commands.Choice(name="Server Avatar", value="server"),
app_commands.Choice(name="Global Avatar", value="global")
]
)
async def avatar_obamify(self, interaction: discord.Interaction, tile_count: int = 32, user: Optional[discord.User] = None, avatar_type: Optional[app_commands.Choice[str]] = None):
await interaction.response.defer(thinking=True)
user = user or interaction.user
if tile_count < 1 or tile_count > 256:
await interaction.followup.send("Tile count must be 1256.", ephemeral=True)
return
if not os.path.exists(self.obama_path):
await interaction.followup.send("Error: obama.jpg not found.", ephemeral=True)
return
try:
avatar = self.get_avatar_url(user, avatar_type)
avatar_url = avatar.url
avatar_img = await self._fetch_avatar(avatar_url)
obama_img = Image.open(self.obama_path).convert("RGB")
buf = await asyncio.to_thread(self._generate_mosaic, avatar_img, obama_img, tile_count)
await interaction.followup.send(file=discord.File(buf, filename="obama_mosaic.png"))
except Exception:
logger.exception("Error in avatar obamify")
await interaction.followup.send("An error occurred during mosaic generation.", ephemeral=True)
async def _fetch_avatar(self, url: str) -> Image.Image:
if not self.session or self.session.closed:
self.session = aiohttp.ClientSession()
async with self.session.get(url) as resp:
resp.raise_for_status()
data = await resp.read()
return Image.open(io.BytesIO(data)).convert("RGB")
    def _generate_mosaic(self, avatar_img: Image.Image, obama_img: Image.Image, tile_count: int) -> io.BytesIO:
        """Tile the avatar across the base image, tinting each tile toward the
        average color of the region it covers. Returns a PNG buffer."""
        obama_img = obama_img.convert("RGB")
        obama_w, obama_h = obama_img.size
        # Integer division: any remainder pixels at the edges are dropped.
        tile_w = obama_w // tile_count
        tile_h = obama_h // tile_count
        avatar_tile = avatar_img.resize((tile_w, tile_h))
        output = Image.new("RGB", (tile_w * tile_count, tile_h * tile_count))
        obama_array = np.array(obama_img)
        for y in range(tile_count):
            for x in range(tile_count):
                # Average color of the base-image region this tile replaces.
                tile_array = obama_array[y*tile_h:(y+1)*tile_h, x*tile_w:(x+1)*tile_w]
                avg_color = tile_array.mean(axis=(0,1))
                tile = avatar_tile.copy()
                tile_arr = np.array(tile).astype(np.float32)
                # Per-channel scale so the tile's mean matches the region's
                # (epsilon avoids division by zero on black tiles).
                tint_factor = avg_color / (tile_arr.mean(axis=(0,1)) + 1e-6)
                tile_arr = np.clip(tile_arr * tint_factor, 0, 255).astype(np.uint8)
                tile = Image.fromarray(tile_arr)
                output.paste(tile, (x*tile_w, y*tile_h))
        buf = io.BytesIO()
        output.save(buf, format="PNG")
        buf.seek(0)
        return buf
@avatar_group.command(name="bad_apple", description="Play Bad Apple with avatar(s) tiled as the fill")
@app_commands.describe(
user="User for the black silhouette (defaults to you)",
user2="User for the white background (omit for transparent background)",
tile_count="Number of avatar tiles per row (default 16, range 164)",
delta_only="Only show pixels that changed from the previous frame",
avatar_type="Choose between server or global avatar"
)
@app_commands.choices(
avatar_type=[
app_commands.Choice(name="Server Avatar", value="server"),
app_commands.Choice(name="Global Avatar", value="global")
]
)
async def avatar_bad_apple(self, interaction: discord.Interaction, user: Optional[discord.User] = None, user2: Optional[discord.User] = None, tile_count: int = 16, delta_only: bool = False, invert: bool = False, avatar_type: Optional[app_commands.Choice[str]] = None):
await interaction.response.defer(thinking=True)
user = user or interaction.user
if tile_count < 1 or tile_count > 64:
await interaction.followup.send("Tile count must be 164.", ephemeral=True)
return
if not self.bad_apple_path.exists():
await interaction.followup.send("Error: Bad Apple frames not found.", ephemeral=True)
return
task = asyncio.current_task()
self._active_renders.add(task)
try:
if not self.session or self.session.closed:
self.session = aiohttp.ClientSession()
avatar = self.get_avatar_url(user, avatar_type)
async with self.session.get(str(avatar.with_format("png").with_size(512))) as resp:
resp.raise_for_status()
avatar_bytes = await resp.read()
avatar2_bytes = None
if user2 is not None:
avatar2 = self.get_avatar_url(user2, avatar_type)
async with self.session.get(str(avatar2.with_format("png").with_size(512))) as resp:
resp.raise_for_status()
avatar2_bytes = await resp.read()
tmp_raw = tempfile.NamedTemporaryFile(suffix=".raw", delete=False)
tmp_raw.close()
raw_path = tmp_raw.name
ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
audio_offset = await self._detect_audio_offset(ffmpeg_exe)
frame_skip = int(audio_offset * 30)
loop = asyncio.get_event_loop()
async def update_progress(done: int, total: int) -> None:
pct = done / total * 100
filled = int(pct / 5)
bar = "" * filled + "" * (20 - filled)
try:
await interaction.edit_original_response(
content=f"-# Rendering... `[{bar}]` {pct:.0f}% ({done}/{total} frames)"
)
except Exception:
pass
def progress_callback(done: int, total: int) -> None:
asyncio.run_coroutine_threadsafe(update_progress(done, total), loop)
frame_w, frame_h = await asyncio.to_thread(
self._process_bad_apple_frames, avatar_bytes, tile_count, raw_path, avatar2_bytes, frame_skip, delta_only, progress_callback, invert
)
total_frames = len(self._bad_apple_frames) - frame_skip
async def update_encode_progress(done: int, total: int) -> None:
pct = done / total * 100
filled = int(pct / 5)
bar = "" * filled + "" * (20 - filled)
try:
await interaction.edit_original_response(
content=f"-# Encoding... `[{bar}]` {pct:.0f}% ({done}/{total} frames)"
)
except Exception:
pass
def encode_progress_callback(done: int, total: int) -> None:
asyncio.run_coroutine_threadsafe(update_encode_progress(done, total), loop)
buf = await self._encode_bad_apple_video(raw_path, frame_w, frame_h, ffmpeg_exe, audio_offset, total_frames, encode_progress_callback)
label = f"{user.display_name} vs {user2.display_name}" if user2 else user.display_name
file_size = buf.getbuffer().nbytes
limit = interaction.guild.filesize_limit if interaction.guild else 8_388_608
if file_size > limit:
mb = file_size / 1_048_576
limit_mb = limit / 1_048_576
await interaction.followup.send(
f"The rendered video is {mb:.1f} MB, which exceeds this server's upload limit of {limit_mb:.0f} MB.",
ephemeral=True
)
return
await interaction.followup.send(
f"Bad Apple, but it's {label}:",
file=discord.File(buf, filename="bad_apple.mp4")
)
except asyncio.CancelledError:
pass
except Exception:
logger.exception("Error in avatar bad_apple")
await interaction.followup.send("An error occurred while generating the video.", ephemeral=True)
finally:
self._active_renders.discard(task)
def _build_tiled(self, avatar_bytes: bytes, frame_w: int, frame_h: int, tile_count: int) -> np.ndarray:
"""Build a tiled RGBA canvas of the avatar at frame resolution."""
img = Image.open(io.BytesIO(avatar_bytes)).convert("RGBA")
tile_w = frame_w // tile_count
tile_h = tile_w
tile_arr = np.array(img.resize((tile_w, tile_h), Image.Resampling.LANCZOS))
tiles_x = -(-frame_w // tile_w) # ceiling division
tiles_y = -(-frame_h // tile_h)
return np.tile(tile_arr, (tiles_y, tiles_x, 1))[:frame_h, :frame_w]
    def _process_bad_apple_frames(self, avatar_bytes: bytes, tile_count: int, raw_path: str, avatar2_bytes: bytes | None = None, frame_skip: int = 0, delta_only: bool = False, progress_callback=None, invert: bool = False) -> tuple[int, int]:
        """Composite every frame and write raw RGB24 video data to raw_path.

        Runs in a worker thread. Returns (frame_w, frame_h).
        """
        from concurrent.futures import ThreadPoolExecutor
        # Prefer the cached frame list; re-glob as a fallback. frame_skip
        # drops the frames matching leading audio silence.
        frame_files = (self._bad_apple_frames or sorted(self.bad_apple_path.glob("*.jpg")))[frame_skip:]
        if not frame_files:
            raise ValueError("No Bad Apple frames found")
        first = Image.open(frame_files[0])
        frame_w, frame_h = first.size
        # Tiled avatar canvases: t1 fills dark pixels, t2 (optional) light ones.
        t1 = self._build_tiled(avatar_bytes, frame_w, frame_h, tile_count)
        t1_rgb, t1_alpha = t1[:, :, :3], t1[:, :, 3]
        dual = avatar2_bytes is not None
        if dual:
            t2 = self._build_tiled(avatar2_bytes, frame_w, frame_h, tile_count)
            t2_rgb, t2_alpha = t2[:, :, :3], t2[:, :, 3]
        else:
            t2_rgb = t2_alpha = None
        workers = min(8, os.cpu_count() or 4)
        # Process in chunks so only ~CHUNK frames are in RAM at once
        CHUNK = 240  # ~160 MB peak for RGBA at 480x360
        ones = np.ones((frame_h, frame_w), dtype=bool)
        def load_mask(path):
            # Grayscale source frame resized to the working resolution.
            return np.array(Image.open(path).convert("L").resize((frame_w, frame_h), Image.Resampling.NEAREST))
        def load_and_composite(path):
            mask = load_mask(path) < 128
            return _composite_frame(~mask if invert else mask, ones, dual, frame_h, frame_w, t1_rgb, t1_alpha, t2_rgb, t2_alpha)
        total = len(frame_files)
        done = 0
        with open(raw_path, "wb") as f:
            if not delta_only:
                for i in range(0, total, CHUNK):
                    chunk = frame_files[i:i + CHUNK]
                    with ThreadPoolExecutor(max_workers=workers) as ex:
                        # ex.map preserves order, so frames write in sequence.
                        for fb in ex.map(load_and_composite, chunk):
                            f.write(fb)
                    done += len(chunk)
                    if progress_callback:
                        progress_callback(done, total)
            else:
                # Delta mode: only draw pixels whose brightness moved more
                # than 20 levels versus the previous frame.
                prev_mask = None
                for i in range(0, total, CHUNK):
                    chunk = frame_files[i:i + CHUNK]
                    with ThreadPoolExecutor(max_workers=workers) as ex:
                        chunk_masks = list(ex.map(load_mask, chunk))
                    for j, mask in enumerate(chunk_masks):
                        dark = ~(mask < 128) if invert else (mask < 128)
                        # Reference: previous frame in this chunk, else the
                        # last of the prior chunk; first frame draws fully.
                        ref = chunk_masks[j - 1] if j > 0 else prev_mask
                        changed = np.abs(mask.astype(np.int16) - ref.astype(np.int16)) > 20 if ref is not None else ones
                        f.write(_composite_frame(dark, changed, dual, frame_h, frame_w, t1_rgb, t1_alpha, t2_rgb, t2_alpha))
                    prev_mask = chunk_masks[-1]
                    done += len(chunk)
                    if progress_callback:
                        progress_callback(done, total)
        return frame_w, frame_h
async def _detect_audio_offset(self, ffmpeg_exe: str) -> float:
"""Detect duration of silence at the start of the Bad Apple audio."""
proc = await asyncio.create_subprocess_exec(
ffmpeg_exe,
"-i", str(self.bad_apple_audio_path),
"-af", "silencedetect=n=-50dB:d=0.1",
"-f", "null", "-",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE,
)
_, stderr = await proc.communicate()
for line in stderr.decode().splitlines():
if "silence_end" in line:
try:
return float(line.split("silence_end:")[1].split("|")[0].strip())
except (IndexError, ValueError):
pass
return 0.0
    async def _encode_bad_apple_video(self, raw_path: str, frame_w: int, frame_h: int, ffmpeg_exe: Optional[str] = None, audio_offset: float = 0.0, total_frames: int = 0, progress_callback=None) -> io.BytesIO:
        """Encode raw RGB24 frames plus the Bad Apple audio into an MP4.

        Trims the detected leading silence off the audio, reports encode
        progress through progress_callback, and removes both temp files
        once encoding finishes.
        """
        if ffmpeg_exe is None:
            ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
        suffix = ".mp4"
        tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        tmp.close()
        out_path = tmp.name
        # Use half the cores so the event loop / other work stays responsive.
        encode_threads = str(max(1, (os.cpu_count() or 4) // 2))
        video_flags = ["-vcodec", "libx264", "-pix_fmt", "yuv420p", "-preset", "ultrafast", "-crf", "23", "-threads", encode_threads]
        # Drop the leading silence so audio lines up with the skipped frames.
        audio_filter = f"atrim=start={audio_offset:.6f},asetpts=PTS-STARTPTS" if audio_offset > 0 else "anull"
        cmd = [
            ffmpeg_exe, "-y",
            # Machine-readable progress on stdout; errors only on stderr.
            "-progress", "pipe:1", "-nostats", "-loglevel", "error",
            "-f", "rawvideo", "-vcodec", "rawvideo",
            "-s", f"{frame_w}x{frame_h}",
            "-pix_fmt", "rgb24", "-r", "30",
            "-i", raw_path,
            "-i", str(self.bad_apple_audio_path),
            "-map", "0:v", "-map", "1:a",
            "-af", audio_filter,
            *video_flags,
            "-shortest",
            out_path,
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        # Track the process so cog_unload can kill it.
        self._active_ffmpeg.add(proc)
        try:
            async def read_progress():
                # Parse the "frame=N" lines emitted by "-progress pipe:1".
                while True:
                    line = await proc.stdout.readline()
                    if not line:
                        break
                    text = line.decode().strip()
                    if text.startswith("frame=") and progress_callback and total_frames:
                        try:
                            done = int(text.split("=", 1)[1])
                            progress_callback(done, total_frames)
                        except ValueError:
                            pass
            await asyncio.gather(proc.wait(), read_progress())
        finally:
            self._active_ffmpeg.discard(proc)
        with open(out_path, "rb") as f:
            video_bytes = f.read()
        os.unlink(out_path)
        os.unlink(raw_path)
        return io.BytesIO(video_bytes)
async def setup(bot):
    """Extension entry point: register the AvatarCommands cog on the bot."""
    await bot.add_cog(AvatarCommands(bot))