Switch to message buffer / dispatcher for websocket messages

This commit is contained in:
WarmUpTill 2024-01-30 22:18:24 +01:00 committed by WarmUpTill
parent 394e2500e8
commit 41033f2230
6 changed files with 100 additions and 78 deletions

View File

@ -20,38 +20,33 @@ const static std::map<MacroConditionWebsocket::Type, std::string>
"AdvSceneSwitcher.condition.websocket.type.event"},
};
MacroConditionWebsocket::MacroConditionWebsocket(Macro *m)
: MacroCondition(m, true)
{
_messageBuffer = RegisterForWebsocketMessages();
}
bool MacroConditionWebsocket::CheckCondition()
{
std::vector<WSMessage> *messages;
switch (_type) {
case MacroConditionWebsocket::Type::REQUEST:
messages = &GetWebsocketMessages();
break;
case MacroConditionWebsocket::Type::EVENT: {
auto connection = _connection.lock();
if (!connection) {
return false;
}
messages = &connection->Events();
break;
}
default:
break;
if (!_messageBuffer) {
return false;
}
for (auto &m : *messages) {
while (!_messageBuffer->Empty()) {
auto message = _messageBuffer->ConsumeMessage();
if (!message) {
continue;
}
if (_regex.Enabled()) {
if (_regex.Matches(m.message, _message)) {
SetTempVarValue("message", m.message);
SetVariableValue(m.message);
m.processed = true;
if (_regex.Matches(*message, _message)) {
SetTempVarValue("message", *message);
SetVariableValue(*message);
return true;
}
} else {
if (m.message == std::string(_message)) {
SetTempVarValue("message", m.message);
SetVariableValue(m.message);
m.processed = true;
if (*message == std::string(_message)) {
SetTempVarValue("message", *message);
SetVariableValue(*message);
return true;
}
}
@ -84,6 +79,8 @@ bool MacroConditionWebsocket::Load(obs_data_t *obj)
}
_connection =
GetWeakConnectionByName(obs_data_get_string(obj, "connection"));
SetType(_type);
return true;
}
@ -95,6 +92,41 @@ std::string MacroConditionWebsocket::GetShortDesc() const
return GetWeakConnectionName(_connection);
}
void MacroConditionWebsocket::SetType(Type type)
{
_type = type;
if (_type == Type::REQUEST) {
_messageBuffer = RegisterForWebsocketMessages();
return;
}
auto connection = _connection.lock();
if (!connection) {
return;
}
_messageBuffer = connection->RegisterForEvents();
}
void MacroConditionWebsocket::SetConnection(const std::string &connectionName)
{
_connection = GetWeakConnectionByName(connectionName);
if (_type == Type::REQUEST) {
// This should not really happen, but let's be safe
return;
}
auto connection = _connection.lock();
if (!connection) {
return;
}
_messageBuffer = connection->RegisterForEvents();
}
std::weak_ptr<Connection> MacroConditionWebsocket::GetConnection() const
{
return _connection;
}
void MacroConditionWebsocket::SetupTempVars()
{
MacroCondition::SetupTempVars();
@ -187,12 +219,12 @@ void MacroConditionWebsocketEdit::UpdateEntryData()
return;
}
_conditions->setCurrentIndex(static_cast<int>(_entryData->_type));
_conditions->setCurrentIndex(static_cast<int>(_entryData->GetType()));
_message->setPlainText(_entryData->_message);
_regex->SetRegexConfig(_entryData->_regex);
_connection->SetConnection(_entryData->_connection);
_connection->SetConnection(_entryData->GetConnection());
if (_entryData->_type == MacroConditionWebsocket::Type::REQUEST) {
if (_entryData->GetType() == MacroConditionWebsocket::Type::REQUEST) {
SetupRequestEdit();
} else {
SetupEventEdit();
@ -209,8 +241,8 @@ void MacroConditionWebsocketEdit::ConditionChanged(int index)
}
auto lock = LockContext();
_entryData->_type = static_cast<MacroConditionWebsocket::Type>(index);
if (_entryData->_type == MacroConditionWebsocket::Type::REQUEST) {
_entryData->SetType(static_cast<MacroConditionWebsocket::Type>(index));
if (_entryData->GetType() == MacroConditionWebsocket::Type::REQUEST) {
SetupRequestEdit();
} else {
SetupEventEdit();
@ -240,7 +272,7 @@ void MacroConditionWebsocketEdit::ConnectionSelectionChanged(
}
auto lock = LockContext();
_entryData->_connection = GetWeakConnectionByQString(connection);
_entryData->SetConnection(connection.toStdString());
emit(HeaderInfoChanged(connection));
}

View File

@ -3,6 +3,7 @@
#include "connection-manager.hpp"
#include "variable-text-edit.hpp"
#include "regex-config.hpp"
#include "websocket-helpers.hpp"
#include <QCheckBox>
@ -10,7 +11,7 @@ namespace advss {
class MacroConditionWebsocket : public MacroCondition {
public:
MacroConditionWebsocket(Macro *m) : MacroCondition(m, true) {}
MacroConditionWebsocket(Macro *m);
bool CheckCondition();
bool Save(obs_data_t *obj) const;
bool Load(obs_data_t *obj);
@ -26,14 +27,20 @@ public:
EVENT,
};
Type _type = Type::REQUEST;
void SetType(Type);
Type GetType() const { return _type; }
void SetConnection(const std::string &);
std::weak_ptr<Connection> GetConnection() const;
StringVariable _message = obs_module_text("AdvSceneSwitcher.enterText");
RegexConfig _regex;
std::weak_ptr<Connection> _connection;
private:
void SetupTempVars();
Type _type = Type::REQUEST;
std::weak_ptr<Connection> _connection;
WebsocketMessageBuffer _messageBuffer;
static bool _registered;
static const std::string id;
};

View File

@ -199,6 +199,11 @@ void Connection::Save(obs_data_t *obj) const
obs_data_set_int(obj, "version", 1);
}
WebsocketMessageBuffer Connection::RegisterForEvents()
{
return _client.RegisterForEvents();
}
void Connection::UseOBSWebsocketProtocol(bool useOBSWSProtocol)
{
_useOBSWSProtocol = useOBSWSProtocol;

View File

@ -40,9 +40,9 @@ public:
void SendMsg(const std::string &msg);
void Load(obs_data_t *obj);
void Save(obs_data_t *obj) const;
std::string GetName() { return _name; }
std::vector<WSMessage> &Events() { return _client.Events(); }
bool IsUsingOBSProtocol() { return _useOBSWSProtocol; }
std::string GetName() const { return _name; }
WebsocketMessageBuffer RegisterForEvents();
bool IsUsingOBSProtocol() const { return _useOBSWSProtocol; }
private:
void UseOBSWebsocketProtocol(bool);

View File

@ -14,14 +14,14 @@ using websocketpp::lib::placeholders::_2;
using websocketpp::lib::bind;
#define RPC_VERSION 1
#undef DispatchMessage
constexpr char VendorName[] = "AdvancedSceneSwitcher";
constexpr char VendorRequest[] = "AdvancedSceneSwitcherMessage";
constexpr char VendorEvent[] = "AdvancedSceneSwitcherEvent";
obs_websocket_vendor vendor;
static void clearWebsocketMessages();
static std::vector<WSMessage> websocketMessages;
static WebsocketMessageDispatcher websocketMessageDispatcher;
static void registerWebsocketVendor();
static bool setup();
@ -29,36 +29,13 @@ static bool setupDone = setup();
bool setup()
{
AddIntervalResetStep(clearWebsocketMessages);
AddPluginPostLoadStep(registerWebsocketVendor);
return true;
}
std::vector<WSMessage> &GetWebsocketMessages()
WebsocketMessageBuffer RegisterForWebsocketMessages()
{
return websocketMessages;
}
static void clearWebsocketMessages()
{
websocketMessages.erase(std::remove_if(websocketMessages.begin(),
websocketMessages.end(),
[](const WSMessage &message) {
return message.processed;
}),
websocketMessages.end());
for (auto &connection : GetConnections()) {
auto c = dynamic_cast<Connection *>(connection.get());
if (!c) {
continue;
}
auto &messages = c->Events();
messages.erase(std::remove_if(messages.begin(), messages.end(),
[](const WSMessage &message) {
return message.processed;
}),
messages.end());
}
return websocketMessageDispatcher.RegisterClient();
}
void SendWebsocketEvent(const std::string &eventMsg)
@ -79,8 +56,7 @@ static void receiveWebsocketMessage(obs_data_t *request_data, obs_data_t *,
}
auto msg = obs_data_get_string(request_data, "message");
auto lock = LockContext();
websocketMessages.emplace_back(msg);
websocketMessageDispatcher.DispatchMessage(msg);
vblog(LOG_INFO, "received message: %s", msg);
}
@ -247,6 +223,11 @@ void WSConnection::SendRequest(const std::string &msg)
Send(msg);
}
WebsocketMessageBuffer WSConnection::RegisterForEvents()
{
return _dispatcher.RegisterClient();
}
WSConnection::Status WSConnection::GetStatus() const
{
return _status;
@ -329,8 +310,8 @@ void WSConnection::HandleEvent(obs_data_t *msg)
return;
}
auto eventDataNested = obs_data_get_obj(eventData, "eventData");
auto lock = LockContext();
_messages.emplace_back(obs_data_get_string(eventDataNested, "message"));
_dispatcher.DispatchMessage(
obs_data_get_string(eventDataNested, "message"));
vblog(LOG_INFO, "received event msg \"%s\"",
obs_data_get_string(eventDataNested, "message"));
obs_data_release(eventDataNested);
@ -361,9 +342,8 @@ void WSConnection::OnGenericMessage(connection_hdl, client::message_ptr message)
return;
}
auto lock = LockContext();
const auto payload = message->get_payload();
_messages.emplace_back(payload);
_dispatcher.DispatchMessage(payload);
vblog(LOG_INFO, "received event msg \"%s\"", payload.c_str());
}

View File

@ -1,4 +1,6 @@
#pragma once
#include "message-buffer.hpp"
#include "message-dispatcher.hpp"
#include <set>
#include <QtCore/QObject>
@ -21,16 +23,12 @@
namespace advss {
using websocketpp::connection_hdl;
struct WSMessage {
WSMessage(const std::string &m) : message(m) {}
std::string message = "";
bool processed = false;
};
using WebsocketMessageBuffer = std::shared_ptr<MessageBuffer<std::string>>;
using WebsocketMessageDispatcher = MessageDispatcher<std::string>;
void SendWebsocketEvent(const std::string &);
std::string ConstructVendorRequestMessage(const std::string &message);
std::vector<WSMessage> &GetWebsocketMessages();
[[nodiscard]] WebsocketMessageBuffer RegisterForWebsocketMessages();
class WSConnection : public QObject {
using server = websocketpp::server<websocketpp::config::asio>;
@ -44,7 +42,7 @@ public:
bool _reconnect, int reconnectDelay = 10);
void Disconnect();
void SendRequest(const std::string &msg);
std::vector<WSMessage> &Events() { return _messages; }
[[nodiscard]] WebsocketMessageBuffer RegisterForEvents();
std::string GetFail() { return _failMsg; }
enum class Status {
@ -82,7 +80,7 @@ private:
std::atomic<Status> _status = {Status::DISCONNECTED};
std::atomic_bool _disconnect{false};
std::vector<WSMessage> _messages;
WebsocketMessageDispatcher _dispatcher;
};
} // namespace advss