diff --git a/plugins/base/macro-condition-websocket.cpp b/plugins/base/macro-condition-websocket.cpp index 4cde5425..af8d16a6 100644 --- a/plugins/base/macro-condition-websocket.cpp +++ b/plugins/base/macro-condition-websocket.cpp @@ -20,38 +20,33 @@ const static std::map "AdvSceneSwitcher.condition.websocket.type.event"}, }; +MacroConditionWebsocket::MacroConditionWebsocket(Macro *m) + : MacroCondition(m, true) +{ + _messageBuffer = RegisterForWebsocketMessages(); +} + bool MacroConditionWebsocket::CheckCondition() { - std::vector *messages; - switch (_type) { - case MacroConditionWebsocket::Type::REQUEST: - messages = &GetWebsocketMessages(); - break; - case MacroConditionWebsocket::Type::EVENT: { - auto connection = _connection.lock(); - if (!connection) { - return false; - } - messages = &connection->Events(); - break; - } - default: - break; + if (!_messageBuffer) { + return false; } - for (auto &m : *messages) { + while (!_messageBuffer->Empty()) { + auto message = _messageBuffer->ConsumeMessage(); + if (!message) { + continue; + } if (_regex.Enabled()) { - if (_regex.Matches(m.message, _message)) { - SetTempVarValue("message", m.message); - SetVariableValue(m.message); - m.processed = true; + if (_regex.Matches(*message, _message)) { + SetTempVarValue("message", *message); + SetVariableValue(*message); return true; } } else { - if (m.message == std::string(_message)) { - SetTempVarValue("message", m.message); - SetVariableValue(m.message); - m.processed = true; + if (*message == std::string(_message)) { + SetTempVarValue("message", *message); + SetVariableValue(*message); return true; } } @@ -84,6 +79,8 @@ bool MacroConditionWebsocket::Load(obs_data_t *obj) } _connection = GetWeakConnectionByName(obs_data_get_string(obj, "connection")); + + SetType(_type); return true; } @@ -95,6 +92,41 @@ std::string MacroConditionWebsocket::GetShortDesc() const return GetWeakConnectionName(_connection); } +void MacroConditionWebsocket::SetType(Type type) +{ + _type = type; + if (_type == Type::REQUEST) { + _messageBuffer = RegisterForWebsocketMessages(); + return; + } + + auto connection = _connection.lock(); + if (!connection) { + return; + } + _messageBuffer = connection->RegisterForEvents(); +} + +void MacroConditionWebsocket::SetConnection(const std::string &connectionName) +{ + _connection = GetWeakConnectionByName(connectionName); + if (_type == Type::REQUEST) { + // This should not really happen, but let's be safe + return; + } + + auto connection = _connection.lock(); + if (!connection) { + return; + } + _messageBuffer = connection->RegisterForEvents(); +} + +std::weak_ptr MacroConditionWebsocket::GetConnection() const +{ + return _connection; +} + void MacroConditionWebsocket::SetupTempVars() { MacroCondition::SetupTempVars(); @@ -187,12 +219,12 @@ void MacroConditionWebsocketEdit::UpdateEntryData() return; } - _conditions->setCurrentIndex(static_cast(_entryData->_type)); + _conditions->setCurrentIndex(static_cast(_entryData->GetType())); _message->setPlainText(_entryData->_message); _regex->SetRegexConfig(_entryData->_regex); - _connection->SetConnection(_entryData->_connection); + _connection->SetConnection(_entryData->GetConnection()); - if (_entryData->_type == MacroConditionWebsocket::Type::REQUEST) { + if (_entryData->GetType() == MacroConditionWebsocket::Type::REQUEST) { SetupRequestEdit(); } else { SetupEventEdit(); @@ -209,8 +241,8 @@ void MacroConditionWebsocketEdit::ConditionChanged(int index) } auto lock = LockContext(); - _entryData->_type = static_cast(index); - if (_entryData->_type == MacroConditionWebsocket::Type::REQUEST) { + _entryData->SetType(static_cast(index)); + if (_entryData->GetType() == MacroConditionWebsocket::Type::REQUEST) { SetupRequestEdit(); } else { SetupEventEdit(); @@ -240,7 +272,7 @@ void MacroConditionWebsocketEdit::ConnectionSelectionChanged( } auto lock = LockContext(); - _entryData->_connection = GetWeakConnectionByQString(connection); + _entryData->SetConnection(connection.toStdString()); emit(HeaderInfoChanged(connection)); } diff --git a/plugins/base/macro-condition-websocket.hpp b/plugins/base/macro-condition-websocket.hpp index 21d60b96..bc2eba78 100644 --- a/plugins/base/macro-condition-websocket.hpp +++ b/plugins/base/macro-condition-websocket.hpp @@ -3,6 +3,7 @@ #include "connection-manager.hpp" #include "variable-text-edit.hpp" #include "regex-config.hpp" +#include "websocket-helpers.hpp" #include @@ -10,7 +11,7 @@ namespace advss { class MacroConditionWebsocket : public MacroCondition { public: - MacroConditionWebsocket(Macro *m) : MacroCondition(m, true) {} + MacroConditionWebsocket(Macro *m); bool CheckCondition(); bool Save(obs_data_t *obj) const; bool Load(obs_data_t *obj); @@ -26,14 +27,20 @@ public: EVENT, }; - Type _type = Type::REQUEST; + void SetType(Type); + Type GetType() const { return _type; } + void SetConnection(const std::string &); + std::weak_ptr GetConnection() const; StringVariable _message = obs_module_text("AdvSceneSwitcher.enterText"); RegexConfig _regex; - std::weak_ptr _connection; private: void SetupTempVars(); + Type _type = Type::REQUEST; + std::weak_ptr _connection; + + WebsocketMessageBuffer _messageBuffer; static bool _registered; static const std::string id; }; diff --git a/plugins/base/utils/connection-manager.cpp b/plugins/base/utils/connection-manager.cpp index 30dfcb7c..1818f803 100644 --- a/plugins/base/utils/connection-manager.cpp +++ b/plugins/base/utils/connection-manager.cpp @@ -199,6 +199,11 @@ void Connection::Save(obs_data_t *obj) const obs_data_set_int(obj, "version", 1); } +WebsocketMessageBuffer Connection::RegisterForEvents() +{ + return _client.RegisterForEvents(); +} + void Connection::UseOBSWebsocketProtocol(bool useOBSWSProtocol) { _useOBSWSProtocol = useOBSWSProtocol; diff --git a/plugins/base/utils/connection-manager.hpp b/plugins/base/utils/connection-manager.hpp index 821f5099..2e7cd4b3 100644 --- a/plugins/base/utils/connection-manager.hpp +++ b/plugins/base/utils/connection-manager.hpp @@ -40,9 +40,9 @@ public: void SendMsg(const std::string &msg); void Load(obs_data_t *obj); void Save(obs_data_t *obj) const; - std::string GetName() { return _name; } - std::vector &Events() { return _client.Events(); } - bool IsUsingOBSProtocol() { return _useOBSWSProtocol; } + std::string GetName() const { return _name; } + WebsocketMessageBuffer RegisterForEvents(); + bool IsUsingOBSProtocol() const { return _useOBSWSProtocol; } private: void UseOBSWebsocketProtocol(bool); diff --git a/plugins/base/utils/websocket-helpers.cpp b/plugins/base/utils/websocket-helpers.cpp index 18d36358..a83e3bf4 100644 --- a/plugins/base/utils/websocket-helpers.cpp +++ b/plugins/base/utils/websocket-helpers.cpp @@ -14,14 +14,14 @@ using websocketpp::lib::placeholders::_2; using websocketpp::lib::bind; #define RPC_VERSION 1 +#undef DispatchMessage constexpr char VendorName[] = "AdvancedSceneSwitcher"; constexpr char VendorRequest[] = "AdvancedSceneSwitcherMessage"; constexpr char VendorEvent[] = "AdvancedSceneSwitcherEvent"; obs_websocket_vendor vendor; -static void clearWebsocketMessages(); -static std::vector websocketMessages; +static WebsocketMessageDispatcher websocketMessageDispatcher; static void registerWebsocketVendor(); static bool setup(); @@ -29,36 +29,13 @@ static bool setupDone = setup(); bool setup() { - AddIntervalResetStep(clearWebsocketMessages); AddPluginPostLoadStep(registerWebsocketVendor); return true; } -std::vector &GetWebsocketMessages() +WebsocketMessageBuffer RegisterForWebsocketMessages() { - return websocketMessages; -} - -static void clearWebsocketMessages() -{ - websocketMessages.erase(std::remove_if(websocketMessages.begin(), - websocketMessages.end(), - [](const WSMessage &message) { - return message.processed; - }), - websocketMessages.end()); - for (auto &connection : GetConnections()) { - auto c = dynamic_cast(connection.get()); - if (!c) { - continue; - } - auto &messages = c->Events(); - messages.erase(std::remove_if(messages.begin(), messages.end(), - [](const WSMessage &message) { - return message.processed; - }), - messages.end()); - } + return websocketMessageDispatcher.RegisterClient(); } void SendWebsocketEvent(const std::string &eventMsg) @@ -79,8 +56,7 @@ static void receiveWebsocketMessage(obs_data_t *request_data, obs_data_t *, } auto msg = obs_data_get_string(request_data, "message"); - auto lock = LockContext(); - websocketMessages.emplace_back(msg); + websocketMessageDispatcher.DispatchMessage(msg); vblog(LOG_INFO, "received message: %s", msg); } @@ -247,6 +223,11 @@ void WSConnection::SendRequest(const std::string &msg) Send(msg); } +WebsocketMessageBuffer WSConnection::RegisterForEvents() +{ + return _dispatcher.RegisterClient(); +} + WSConnection::Status WSConnection::GetStatus() const { return _status; @@ -329,8 +310,8 @@ void WSConnection::HandleEvent(obs_data_t *msg) return; } auto eventDataNested = obs_data_get_obj(eventData, "eventData"); - auto lock = LockContext(); - _messages.emplace_back(obs_data_get_string(eventDataNested, "message")); + _dispatcher.DispatchMessage( + obs_data_get_string(eventDataNested, "message")); vblog(LOG_INFO, "received event msg \"%s\"", obs_data_get_string(eventDataNested, "message")); obs_data_release(eventDataNested); @@ -361,9 +342,8 @@ void WSConnection::OnGenericMessage(connection_hdl, client::message_ptr message) return; } - auto lock = LockContext(); const auto payload = message->get_payload(); - _messages.emplace_back(payload); + _dispatcher.DispatchMessage(payload); vblog(LOG_INFO, "received event msg \"%s\"", payload.c_str()); } diff --git a/plugins/base/utils/websocket-helpers.hpp b/plugins/base/utils/websocket-helpers.hpp index 8dafba85..126eb15b 100644 --- a/plugins/base/utils/websocket-helpers.hpp +++ b/plugins/base/utils/websocket-helpers.hpp @@ -1,4 +1,6 @@ #pragma once +#include "message-buffer.hpp" +#include "message-dispatcher.hpp" #include #include @@ -21,16 +23,12 @@ namespace advss { using websocketpp::connection_hdl; - -struct WSMessage { - WSMessage(const std::string &m) : message(m) {} - std::string message = ""; - bool processed = false; -}; +using WebsocketMessageBuffer = std::shared_ptr>; +using WebsocketMessageDispatcher = MessageDispatcher; void SendWebsocketEvent(const std::string &); std::string ConstructVendorRequestMessage(const std::string &message); -std::vector &GetWebsocketMessages(); +[[nodiscard]] WebsocketMessageBuffer RegisterForWebsocketMessages(); class WSConnection : public QObject { using server = websocketpp::server; @@ -44,7 +42,7 @@ public: bool _reconnect, int reconnectDelay = 10); void Disconnect(); void SendRequest(const std::string &msg); - std::vector &Events() { return _messages; } + [[nodiscard]] WebsocketMessageBuffer RegisterForEvents(); std::string GetFail() { return _failMsg; } enum class Status { @@ -82,7 +80,7 @@ private: std::atomic _status = {Status::DISCONNECTED}; std::atomic_bool _disconnect{false}; - std::vector _messages; + WebsocketMessageDispatcher _dispatcher; }; } // namespace advss