Switch to message buffer / dispatcher for Twitch chat messages

This commit is contained in:
WarmUpTill 2024-02-21 21:43:55 +01:00 committed by WarmUpTill
parent 04631964d3
commit 782791b013
4 changed files with 40 additions and 106 deletions

View File

@ -5,6 +5,8 @@
#include <log-helper.hpp>
#include <plugin-state-helpers.hpp>
#undef DispatchMessage
namespace advss {
using websocketpp::lib::placeholders::_1;
@ -354,43 +356,11 @@ TwitchChatConnection::TwitchChatConnection(const TwitchToken &token,
asio::ssl::context::sslv23_client);
});
_url = defaultURL.data();
RegisterInstance();
}
TwitchChatConnection::~TwitchChatConnection()
{
Disconnect();
UnregisterInstance();
}
static bool setupChatMessageClear()
{
AddIntervalResetStep(&TwitchChatConnection::ClearAllMessages);
return true;
}
bool TwitchChatConnection::_setupDone = setupChatMessageClear();
void TwitchChatConnection::ClearMessages()
{
std::lock_guard<std::mutex> lock(_messageMtx);
_messages.clear();
}
std::mutex TwitchChatConnection::_instancesMtx;
std::vector<TwitchChatConnection *> TwitchChatConnection::_instances;
void TwitchChatConnection::RegisterInstance()
{
std::lock_guard<std::mutex> lock(_instancesMtx);
_instances.emplace_back(this);
}
void TwitchChatConnection::UnregisterInstance()
{
std::lock_guard<std::mutex> lock(_instancesMtx);
auto it = std::remove(_instances.begin(), _instances.end(), this);
_instances.erase(it, _instances.end());
}
std::shared_ptr<TwitchChatConnection>
@ -414,17 +384,10 @@ TwitchChatConnection::GetChatConnection(const TwitchToken &token_,
auto connection = std::shared_ptr<TwitchChatConnection>(
new TwitchChatConnection(token_, channel));
_chatMap[key] = connection;
connection->ConnectToChat();
return connection;
}
void TwitchChatConnection::ClearAllMessages()
{
std::lock_guard<std::mutex> lock(_instancesMtx);
for (const auto &TwitchChatConnection : _instances) {
TwitchChatConnection->ClearMessages();
}
}
void TwitchChatConnection::ConnectThread()
{
while (!_disconnect) {
@ -503,55 +466,29 @@ static std::string toLowerCase(const std::string &str)
return result;
}
std::vector<IRCMessage> TwitchChatConnection::Messages()
void TwitchChatConnection::ConnectToChat()
{
if (!_connected) {
Connect();
return {};
return;
}
if (_joinedChannelName !=
"#" + toLowerCase(std::string(_channel.GetName()))) {
// TODO: disconnect previous channel?
JoinChannel(_channel.GetName());
return {};
}
std::lock_guard<std::mutex> lock(_messageMtx);
return _messages;
}
std::vector<IRCMessage> TwitchChatConnection::Whispers()
ChatMessageBuffer TwitchChatConnection::RegisterForMessages()
{
if (!_connected) {
Connect();
return {};
}
ConnectToChat();
return _messageDispatcher.RegisterClient();
}
if (_joinedChannelName !=
"#" + toLowerCase(std::string(_channel.GetName()))) {
// TODO: disconnect previous channel?
JoinChannel(_channel.GetName());
return {};
}
std::lock_guard<std::mutex> lock(_messageMtx);
return _whispers;
ChatMessageBuffer TwitchChatConnection::RegisterForWhispers()
{
ConnectToChat();
return _whisperDispatcher.RegisterClient();
}
void TwitchChatConnection::SendChatMessage(const std::string &message)
{
if (!_connected) {
Connect();
return;
}
if (_joinedChannelName !=
"#" + toLowerCase(std::string(_channel.GetName()))) {
// TODO: disconnect previous channel?
JoinChannel(_channel.GetName());
return;
}
ConnectToChat();
Send("PRIVMSG " + _joinedChannelName + " :" + message);
}
@ -582,23 +519,20 @@ void TwitchChatConnection::JoinChannel(const std::string &channelName)
void TwitchChatConnection::HandleJoin(const IRCMessage &message)
{
std::lock_guard<std::mutex> lock(_messageMtx);
_joinedChannelName = std::get<std::string>(message.command.parameters);
vblog(LOG_INFO, "Twitch chat join was successful!");
}
void TwitchChatConnection::HandleNewMessage(const IRCMessage &message)
{
std::lock_guard<std::mutex> lock(_messageMtx);
_messages.emplace_back(message);
_messageDispatcher.DispatchMessage(message);
vblog(LOG_INFO, "Received new chat message %s",
message.message.c_str());
}
void TwitchChatConnection::HandleWhisper(const IRCMessage &message)
{
std::lock_guard<std::mutex> lock(_messageMtx);
_whispers.emplace_back(message);
_whisperDispatcher.DispatchMessage(message);
vblog(LOG_INFO, "Received new chat whisper message %s",
message.message.c_str());
}
@ -662,6 +596,7 @@ void TwitchChatConnection::OnMessage(
vblog(LOG_INFO,
"Twitch chat connection authenticated!");
_authenticated = true;
JoinChannel(_channel.GetName());
} else if (message.command.command == pingCommand) {
Send("PONG " +
std::get<std::string>(message.command.parameters));

View File

@ -2,13 +2,13 @@
#include "channel-selection.hpp"
#include "token.hpp"
#include <websocketpp/client.hpp>
#include <QObject>
#include <mutex>
#include <condition_variable>
#include <websocketpp/config/asio_client.hpp>
#include <mutex>
#include <message-buffer.hpp>
#include <QObject>
#include <websocketpp/client.hpp>
#include <websocketpp/config/asio.hpp>
#include <websocketpp/config/asio_client.hpp>
namespace advss {
@ -38,6 +38,9 @@ struct IRCMessage {
std::string message;
};
using ChatMessageBuffer = std::shared_ptr<MessageBuffer<IRCMessage>>;
using ChatMessageDispatcher = MessageDispatcher<IRCMessage>;
class TwitchChatConnection : public QObject {
public:
~TwitchChatConnection();
@ -45,12 +48,10 @@ public:
static std::shared_ptr<TwitchChatConnection>
GetChatConnection(const TwitchToken &token,
const TwitchChannel &channel);
std::vector<IRCMessage> Messages();
std::vector<IRCMessage> Whispers();
[[nodiscard]] ChatMessageBuffer RegisterForMessages();
[[nodiscard]] ChatMessageBuffer RegisterForWhispers();
void SendChatMessage(const std::string &message);
static void ClearAllMessages();
void ClearMessages();
void ConnectToChat();
private:
TwitchChatConnection(const TwitchToken &token,
@ -77,9 +78,6 @@ private:
void HandleNotice(const IRCMessage &) const;
void HandleReconnect();
void RegisterInstance();
void UnregisterInstance();
struct ChatMapKey {
std::string channelName;
std::string token;
@ -103,12 +101,8 @@ private:
std::atomic_bool _disconnect{false};
std::string _url;
std::mutex _messageMtx;
std::vector<IRCMessage> _messages;
std::vector<IRCMessage> _whispers;
static std::mutex _instancesMtx;
static std::vector<TwitchChatConnection *> _instances;
static bool _setupDone;
ChatMessageDispatcher _messageDispatcher;
ChatMessageDispatcher _whisperDispatcher;
};
} // namespace advss

View File

@ -363,14 +363,18 @@ bool MacroConditionTwitch::CheckChatMessages(TwitchToken &token)
if (!_chatConnection) {
_chatConnection = TwitchChatConnection::GetChatConnection(
token, _channel);
_chatBuffer = _chatConnection->RegisterForMessages();
return false;
}
auto messages = _chatConnection->Messages();
for (const auto &message : messages) {
if (stringMatches(_regexChat, message.message, _chatMessage)) {
SetTempVarValue("chatter", message.source.nick);
SetTempVarValue("chat_message", message.message);
while (!_chatBuffer->Empty()) {
auto message = _chatBuffer->ConsumeMessage();
if (!message) {
continue;
}
if (stringMatches(_regexChat, message->message, _chatMessage)) {
SetTempVarValue("chatter", message->source.nick);
SetTempVarValue("chat_message", message->message);
return true;
}
}

View File

@ -131,6 +131,7 @@ private:
std::future<std::string> _subscriptionIDFuture;
std::string _subscriptionID;
ChatMessageBuffer _chatBuffer;
std::shared_ptr<TwitchChatConnection> _chatConnection;
static bool _registered;