Switch to message buffer / dispatcher for Twitch event sub messages

This commit is contained in:
WarmUpTill 2024-01-30 23:20:40 +01:00 committed by WarmUpTill
parent 41033f2230
commit a62883153c
4 changed files with 77 additions and 76 deletions

View File

@ -3,7 +3,6 @@
#include "twitch-helpers.hpp"
#include <log-helper.hpp>
#include <plugin-state-helpers.hpp>
namespace advss {
@ -27,6 +26,8 @@ static constexpr std::string_view registerSubscriptionPath =
#endif
static const int reconnectDelay = 15;
#undef DispatchMessage
EventSub::EventSub() : QObject(nullptr)
{
_client.get_alog().clear_channels(
@ -59,20 +60,6 @@ EventSub::~EventSub()
UnregisterInstance();
}
static bool setupEventSubMessageClear()
{
AddIntervalResetStep(&EventSub::ClearAllEvents);
return true;
}
bool EventSub::_setupDone = setupEventSubMessageClear();
void EventSub::ClearEvents()
{
std::lock_guard<std::mutex> lock(_messageMtx);
_messages.clear();
}
std::mutex EventSub::_instancesMtx;
std::vector<EventSub *> EventSub::_instances;
@ -89,14 +76,6 @@ void EventSub::UnregisterInstance()
_instances.erase(it, _instances.end());
}
void EventSub::ClearAllEvents()
{
std::lock_guard<std::mutex> lock(_instancesMtx);
for (const auto &eventSub : _instances) {
eventSub->ClearEvents();
}
}
void EventSub::ConnectThread()
{
while (!_disconnect) {
@ -169,10 +148,9 @@ void EventSub::Disconnect()
ClearActiveSubscriptions();
}
std::vector<Event> EventSub::Events()
EventSubMessageBuffer EventSub::RegisterForEvents()
{
std::lock_guard<std::mutex> lock(_messageMtx);
return _messages;
return _dispatcher.RegisterClient();
}
bool EventSub::SubscriptionIsActive(const std::string &id)
@ -358,8 +336,7 @@ void EventSub::HandleNotification(obs_data_t *data)
event.type = obs_data_get_string(subscription, "type");
OBSDataAutoRelease eventData = obs_data_get_obj(data, "event");
event.data = eventData;
std::lock_guard<std::mutex> lock(_messageMtx);
_messages.emplace_back(event);
_dispatcher.DispatchMessage(event);
}
void EventSub::HandleReconnect(obs_data_t *data)

View File

@ -1,4 +1,6 @@
#pragma once
#include "message-dispatcher.hpp"
#include <obs.hpp>
#include <websocketpp/client.hpp>
#include <QObject>
@ -23,9 +25,13 @@ typedef websocketpp::client<websocketpp::config::asio_tls_client>
EventSubWSClient;
#endif
using websocketpp::connection_hdl;
struct Event;
class TwitchToken;
using websocketpp::connection_hdl;
using EventSubMessageBuffer = std::shared_ptr<MessageBuffer<Event>>;
using EventSubMessageDispatcher = MessageDispatcher<Event>;
struct Event {
std::string id;
std::string type;
@ -47,11 +53,10 @@ public:
void Connect();
void Disconnect();
std::vector<Event> Events();
[[nodiscard]] EventSubMessageBuffer RegisterForEvents();
bool SubscriptionIsActive(const std::string &id);
static std::string AddEventSubscribtion(std::shared_ptr<TwitchToken>,
Subscription);
static void ClearAllEvents();
void ClearActiveSubscriptions();
private:
@ -71,8 +76,6 @@ private:
void HandleReconnect(obs_data_t *);
void HanldeRevocation(obs_data_t *);
void ClearEvents();
void RegisterInstance();
void UnregisterInstance();
@ -87,14 +90,12 @@ private:
std::string _url;
std::string _sessionID;
std::mutex _messageMtx;
std::vector<Event> _messages;
std::deque<std::string> _messageIDs;
std::mutex _subscriptionMtx;
std::set<Subscription> _activeSubscriptions;
static std::mutex _instancesMtx;
static std::vector<EventSub *> _instances;
static bool _setupDone;
EventSubMessageDispatcher _dispatcher;
};
} // namespace advss

View File

@ -229,6 +229,11 @@ void MacroConditionTwitch::SetCondition(const Condition &condition)
ResetSubscription();
}
void MacroConditionTwitch::SetToken(const std::weak_ptr<TwitchToken> &t)
{
_token = t;
}
void MacroConditionTwitch::SetChannel(const TwitchChannel &channel)
{
_channel = channel;
@ -281,19 +286,21 @@ setTempVarsHelper(obs_data_t *data,
bool MacroConditionTwitch::CheckChannelGenericEvents(TwitchToken &token)
{
auto eventSub = token.GetEventSub();
if (!eventSub) {
if (!_eventBuffer) {
return false;
}
auto events = eventSub->Events();
for (const auto &event : events) {
if (_subscriptionID != event.id) {
while (!_eventBuffer->Empty()) {
auto event = _eventBuffer->ConsumeMessage();
if (!event) {
continue;
}
SetVariableValue(event.ToString());
if (_subscriptionID != event->id) {
continue;
}
SetVariableValue(event->ToString());
setTempVarsHelper(
event.data,
event->data,
std::bind(&MacroConditionTwitch::SetTempVarValue, this,
std::placeholders::_1,
std::placeholders::_2));
@ -305,14 +312,16 @@ bool MacroConditionTwitch::CheckChannelGenericEvents(TwitchToken &token)
bool MacroConditionTwitch::CheckChannelLiveEvents(TwitchToken &token)
{
auto eventSub = token.GetEventSub();
if (!eventSub) {
if (!_eventBuffer) {
return false;
}
auto events = eventSub->Events();
for (const auto &event : events) {
if (_subscriptionID != event.id) {
while (!_eventBuffer->Empty()) {
auto event = _eventBuffer->ConsumeMessage();
if (!event) {
continue;
}
if (_subscriptionID != event->id) {
continue;
}
@ -321,15 +330,15 @@ bool MacroConditionTwitch::CheckChannelLiveEvents(TwitchToken &token)
continue;
}
auto type = obs_data_get_string(event.data, "type");
auto type = obs_data_get_string(event->data, "type");
const auto &typeId = it->second;
if (type != typeId) {
continue;
}
SetVariableValue(event.ToString());
SetVariableValue(event->ToString());
setTempVarsHelper(
event.data,
event->data,
std::bind(&MacroConditionTwitch::SetTempVarValue, this,
std::placeholders::_1,
std::placeholders::_2));
@ -422,6 +431,20 @@ void MacroConditionTwitch::SetTempVarValues(const ChannelInfo &info)
info.is_branded_content ? "true" : "false");
}
bool advss::MacroConditionTwitch::EventSubscriptionIsSetup(
const std::shared_ptr<EventSub> &eventSub)
{
if (!eventSub) {
return false;
}
SetupEventSubscription(*eventSub);
if (_subscriptionIDFuture.valid()) {
// Still waiting for the subscription to be registered
return false;
}
return true;
}
bool MacroConditionTwitch::CheckCondition()
{
SetVariableValue("");
@ -432,15 +455,8 @@ bool MacroConditionTwitch::CheckCondition()
auto eventSub = token->GetEventSub();
if (IsUsingEventSubCondition()) {
if (!eventSub) {
return false;
}
CheckEventSubscription(*eventSub);
if (_subscriptionIDFuture.valid()) {
// Still waiting for the subscription to be registered
return false;
}
if (IsUsingEventSubCondition() && !EventSubscriptionIsSetup(eventSub)) {
return false;
}
switch (_condition) {
@ -688,7 +704,7 @@ bool MacroConditionTwitch::ConditionIsSupportedByToken()
return token->AnyOptionIsEnabled(options);
}
void MacroConditionTwitch::SetupEventSubscriptions()
void MacroConditionTwitch::RegisterEventSubscription()
{
if (!IsUsingEventSubCondition()) {
return;
@ -773,10 +789,11 @@ void MacroConditionTwitch::SetupEventSubscriptions()
void MacroConditionTwitch::ResetSubscription()
{
_eventBuffer.reset();
_subscriptionID = "";
}
void MacroConditionTwitch::CheckEventSubscription(EventSub &eventSub)
void MacroConditionTwitch::SetupEventSubscription(EventSub &eventSub)
{
if (_subscriptionIDFuture.valid()) {
if (_subscriptionIDFuture.wait_for(std::chrono::seconds(0)) !=
@ -788,7 +805,8 @@ void MacroConditionTwitch::CheckEventSubscription(EventSub &eventSub)
if (eventSub.SubscriptionIsActive(_subscriptionID)) {
return;
}
SetupEventSubscriptions();
RegisterEventSubscription();
_eventBuffer = eventSub.RegisterForEvents();
}
bool MacroConditionTwitch::IsUsingEventSubCondition()
@ -1298,10 +1316,10 @@ void MacroConditionTwitchEdit::TwitchTokenChanged(const QString &token)
}
auto lock = LockContext();
_entryData->_token = GetWeakTwitchTokenByQString(token);
_category->SetToken(_entryData->_token);
_channel->SetToken(_entryData->_token);
_pointsReward->SetToken(_entryData->_token);
_entryData->SetToken(GetWeakTwitchTokenByQString(token));
_category->SetToken(_entryData->GetToken());
_channel->SetToken(_entryData->GetToken());
_pointsReward->SetToken(_entryData->GetToken());
_entryData->ResetChatConnection();
SetWidgetVisibility();
@ -1322,14 +1340,14 @@ void MacroConditionTwitchEdit::CheckToken()
if (!_entryData) {
return;
}
if (_entryData->_token.expired()) {
if (_entryData->GetToken().expired()) {
SetTokenWarning(
true,
obs_module_text(
"AdvSceneSwitcher.twitchToken.noSelection"));
return;
}
if (!TokenIsValid(_entryData->_token)) {
if (!TokenIsValid(_entryData->GetToken())) {
SetTokenWarning(
true, obs_module_text(
"AdvSceneSwitcher.twitchToken.notValid"));
@ -1470,17 +1488,17 @@ void MacroConditionTwitchEdit::UpdateEntryData()
_conditions->setCurrentIndex(_conditions->findData(
static_cast<int>(_entryData->GetCondition())));
_tokens->SetToken(_entryData->_token);
_channel->SetToken(_entryData->_token);
_tokens->SetToken(_entryData->GetToken());
_channel->SetToken(_entryData->GetToken());
_channel->SetChannel(_entryData->_channel);
_pointsReward->SetToken(_entryData->_token);
_pointsReward->SetToken(_entryData->GetToken());
_pointsReward->SetChannel(_entryData->_channel);
_pointsReward->SetPointsReward(_entryData->_pointsReward);
_streamTitle->setText(_entryData->_streamTitle);
_regexTitle->SetRegexConfig(_entryData->_regexTitle);
_chatMessage->setPlainText(_entryData->_chatMessage);
_regexChat->SetRegexConfig(_entryData->_regexChat);
_category->SetToken(_entryData->_token);
_category->SetToken(_entryData->GetToken());
_category->SetCategory(_entryData->_category);
SetWidgetVisibility();

View File

@ -82,6 +82,8 @@ public:
void SetCondition(const Condition &condition);
Condition GetCondition() const { return _condition; }
void SetToken(const std::weak_ptr<TwitchToken> &);
std::weak_ptr<TwitchToken> GetToken() const { return _token; }
void SetChannel(const TwitchChannel &channel);
TwitchChannel GetChannel() const { return _channel; }
void SetPointsReward(const TwitchPointsReward &pointsReward);
@ -93,7 +95,6 @@ public:
bool Load(obs_data_t *obj);
bool ConditionIsSupportedByToken();
std::weak_ptr<TwitchToken> _token;
TwitchChannel _channel;
TwitchPointsReward _pointsReward;
StringVariable _streamTitle = obs_module_text(
@ -108,10 +109,11 @@ private:
bool CheckChannelLiveEvents(TwitchToken &token);
bool CheckChatMessages(TwitchToken &token);
void SetupEventSubscriptions();
void RegisterEventSubscription();
void ResetSubscription();
void CheckEventSubscription(EventSub &);
void SetupEventSubscription(EventSub &);
bool IsUsingEventSubCondition();
bool EventSubscriptionIsSetup(const std::shared_ptr<EventSub> &);
void AddChannelGenericEventSubscription(
const char *version, bool includeModeratorId = false,
const char *mainUserIdFieldName = "broadcaster_user_id",
@ -123,6 +125,9 @@ private:
Condition _condition = Condition::LIVE_POLLING;
std::weak_ptr<TwitchToken> _token;
EventSubMessageBuffer _eventBuffer;
std::future<std::string> _subscriptionIDFuture;
std::string _subscriptionID;