Add MQTT helpers

This commit is contained in:
WarmUpTill 2025-04-22 15:24:04 +02:00 committed by WarmUpTill
parent 88514e209d
commit cab50e0922
6 changed files with 1162 additions and 1 deletions

View File

@ -275,12 +275,15 @@ static bool containsSensitiveData(obs_data_t *data)
obs_data_get_array(data, "twitchConnections");
OBSDataArrayAutoRelease websocketConnections =
obs_data_get_array(data, "websocketConnections");
OBSDataArrayAutoRelease mqttConnections =
obs_data_get_array(data, "mqttConnections");
auto isNotEmpty = [](obs_data_array *array) {
return obs_data_array_count(array) > 0;
};
return isNotEmpty(twitchTokens) || isNotEmpty(websocketConnections);
return isNotEmpty(twitchTokens) || isNotEmpty(websocketConnections) ||
isNotEmpty(mqttConnections);
}
void AdvSceneSwitcher::on_exportSettings_clicked()

View File

@ -0,0 +1,45 @@
cmake_minimum_required(VERSION 3.14)
project(advanced-scene-switcher-mqtt)
# --- Check paho.mqtt.cpp requirements ---
find_package(PahoMqttCpp)
if(NOT PahoMqttCpp_FOUND)
message(WARNING "PahoMqttCpp not found!\n"
"MQTT support will be disabled!\n\n")
return()
endif()
# --- End of section ---
add_library(${PROJECT_NAME} MODULE)
if(PAHO_MQTT_CPP_VERSION VERSION_GREATER_EQUAL "1.1.0")
target_compile_definitions(${PROJECT_NAME} PRIVATE ENABLE_MQTT5_SUPPORT=1)
endif()
# PDB was not found with paho-mqttpp3-static.lib
if(MSVC)
target_link_options(${PROJECT_NAME} PRIVATE /IGNORE:4099)
endif()
target_sources(
${PROJECT_NAME}
PRIVATE macro-action-mqtt.cpp
macro-action-mqtt.hpp
macro-condition-mqtt.hpp
macro-condition-mqtt.cpp
mqtt-helpers.cpp
mqtt-helpers.hpp
topic-selection.cpp
topic-selection.hpp)
setup_advss_plugin(${PROJECT_NAME})
set_target_properties(${PROJECT_NAME} PROPERTIES PREFIX "")
if(OS_LINUX)
target_link_libraries(${PROJECT_NAME} PRIVATE PahoMqttCpp::paho-mqttpp3)
else()
target_link_libraries(${PROJECT_NAME}
PRIVATE PahoMqttCpp::paho-mqttpp3-static)
endif()
install_advss_plugin(${PROJECT_NAME})

View File

@ -0,0 +1,639 @@
#include "mqtt-helpers.hpp"
#include "layout-helpers.hpp"
#include "log-helper.hpp"
#include "obs-module-helper.hpp"
#include "plugin-state-helpers.hpp"
#include "ui-helpers.hpp"
#include <obs.hpp>
#include <QTimer>
#undef DispatchMessage
namespace advss {
MqttConnection::~MqttConnection()
{
Disconnect();
}
MqttConnection::MqttConnection(const MqttConnection &other)
: Item(other._name),
_uri(other._uri),
_username(other._username),
_password(other._password),
_connectOnStart(other._connectOnStart),
_reconnect(other._reconnect),
_reconnectDelay(other._reconnectDelay)
{
}
MqttConnection::MqttConnection(const std::string &name, const std::string &uri,
const std::string &username,
const std::string &password, bool connectOnStart,
bool reconnect, int reconnectDelay)
: Item(name),
_uri(uri),
_username(username),
_password(password),
_connectOnStart(connectOnStart),
_reconnect(reconnect),
_reconnectDelay(reconnectDelay)
{
}
void MqttConnection::Connect()
{
static std::mutex mutex;
std::scoped_lock<std::mutex> lock(mutex);
if (_connecting) {
return;
}
if (_thread.joinable()) {
_thread.join();
}
_connecting = true;
_disconnect = false;
_thread = std::thread(&MqttConnection::ConnectThread, this);
}
void MqttConnection::Disconnect()
{
_disconnect = true;
{
std::unique_lock<std::mutex> waitLck(_waitMtx);
_cv.notify_all();
}
if (_thread.joinable()) {
_thread.join();
}
}
void MqttConnection::ConnectThread()
{
using namespace std::chrono_literals;
const auto waitBeforeReconnect = [this]() {
std::unique_lock<std::mutex> lck(_waitMtx);
vblog(LOG_INFO,
"trying to reconnect to MQTT server \"%s\" in %d seconds.",
_name.c_str(), _reconnectDelay);
_cv.wait_for(lck, std::chrono::seconds(_reconnectDelay));
};
const auto logConnectionLost = [this](const std::string &cause) {
blog(LOG_INFO, "MQTT connection \"%s\" lost: %s", _name.c_str(),
cause.c_str());
};
const auto dispatchMessage = [this](mqtt::const_message_ptr msg) {
if (!msg) {
return;
}
vblog(LOG_INFO, "MQTT connection \"%s\" received message: %s",
_name.c_str(), msg->to_string().c_str());
_dispatcher.DispatchMessage(msg->to_string());
};
do {
std::unique_lock<std::mutex> clientLock(_clientMtx);
_client = std::make_shared<mqtt::async_client>(
_uri, std::string("advss_") + _name);
#ifdef ENABLE_MQTT5_SUPPORT
auto connOpts = mqtt::connect_options_builder::v5()
#else
auto connOpts = mqtt::connect_options_builder()
#endif
.clean_start(false)
.clean_session(true)
.connect_timeout(5s)
.user_name(_username)
.password(_password)
.finalize();
_client->set_connection_lost_handler(logConnectionLost);
_client->set_message_callback(dispatchMessage);
_client->start_consuming();
try {
vblog(LOG_INFO, "connecting to MQTT server \"%s\" ...",
_name.c_str());
auto tok = _client->connect(connOpts);
auto rsp = tok->get_connect_response();
#ifdef ENABLE_MQTT5_SUPPORT
if (rsp.get_mqtt_version() < MQTTVERSION_5) {
blog(LOG_INFO,
"\"%s\" did not get an MQTT v5 connection",
_name.c_str());
break;
}
#endif
if (!rsp.is_session_present()) {
vblog(LOG_INFO,
"\"%s\" session not present on broker. subscribing...",
_name.c_str());
const auto topics = std::make_shared<
mqtt::string_collection>(_topics);
_client->subscribe(topics, _qos)->wait();
}
blog(LOG_INFO,
"\"%s\" connection established to MQTT server!",
_name.c_str());
_connected = true;
clientLock.unlock();
while (!_disconnect && _client->is_connected()) {
std::unique_lock<std::mutex> lock(_waitMtx);
_cv.wait_for(lock, std::chrono::seconds(
_reconnectDelay));
}
clientLock.lock();
if (_client->is_connected()) {
blog(LOG_INFO,
"Disconnecting from the MQTT server \"%s\"",
_name.c_str());
_client->stop_consuming();
_client->disconnect()->wait();
} else {
blog(LOG_INFO,
"MQTT client %s was disconnected",
_name.c_str());
}
_client.reset();
_connected = false;
if (!_reconnect || _disconnect) {
break;
}
waitBeforeReconnect();
} catch (const std::exception &e) {
_lastError = e.what();
blog(LOG_INFO, "%s %s", __func__, _lastError.c_str());
_client.reset();
_connected = false;
if (!_reconnect || _disconnect) {
break;
}
waitBeforeReconnect();
}
} while (_reconnect && !_disconnect);
_connecting = false;
}
bool MqttConnection::SendMessage(const std::string &topic,
const std::string &payload, int qos,
bool retained)
{
std::scoped_lock<std::mutex> lock(_clientMtx);
if (!_client || !_client->is_connected()) {
blog(LOG_WARNING,
"Cannot send message: MQTT client \"%s\" is not connected",
_name.c_str());
Connect();
return false;
}
try {
auto msg = mqtt::make_message(topic, payload);
msg->set_qos(qos);
msg->set_retained(retained);
_client->publish(msg);
vblog(LOG_INFO, "Sent MQTT message on \"%s\": %s",
topic.c_str(), payload.c_str());
return true;
} catch (const mqtt::exception &e) {
blog(LOG_WARNING, "MQTT send error for \"%s\": %s",
_name.c_str(), e.what());
return false;
}
}
void MqttConnection::Load(obs_data_t *data)
{
Item::Load(data);
_uri = obs_data_get_string(data, "uri");
_username = obs_data_get_string(data, "username");
_password = obs_data_get_string(data, "password");
_connectOnStart = obs_data_get_bool(data, "connectOnStart");
_reconnect = obs_data_get_bool(data, "reconnect");
_reconnectDelay = obs_data_get_int(data, "reconnectDelay");
_topics.clear();
OBSDataArrayAutoRelease array = obs_data_get_array(data, "topics");
size_t count = obs_data_array_count(array);
for (size_t i = 0; i < count; i++) {
OBSDataAutoRelease obj = obs_data_array_item(array, i);
_topics.push_back(obs_data_get_string(obj, "topic"));
}
_qos.clear();
array = obs_data_get_array(data, "qos");
count = obs_data_array_count(array);
for (size_t i = 0; i < count; i++) {
OBSDataAutoRelease obj = obs_data_array_item(array, i);
_qos.push_back(obs_data_get_int(obj, "qos"));
}
if (ConnectOnStartup()) {
Connect();
}
}
void MqttConnection::Save(obs_data_t *data) const
{
Item::Save(data);
obs_data_set_string(data, "uri", _uri.c_str());
obs_data_set_string(data, "username", _username.c_str());
obs_data_set_string(data, "password", _password.c_str());
obs_data_set_bool(data, "connectOnStart", _connectOnStart);
obs_data_set_bool(data, "reconnect", _reconnect);
obs_data_set_int(data, "reconnectDelay", _reconnectDelay);
OBSDataArrayAutoRelease array = obs_data_array_create();
for (const auto &topic : _topics) {
OBSDataAutoRelease obj = obs_data_create();
obs_data_set_string(obj, "topic", topic.c_str());
obs_data_array_push_back(array, obj);
}
obs_data_set_array(data, "topics", array);
array = obs_data_array_create();
for (const int qos : _qos) {
OBSDataAutoRelease obj = obs_data_create();
obs_data_set_int(obj, "qos", qos);
obs_data_array_push_back(array, obj);
}
obs_data_set_array(data, "qos", array);
}
MqttMessageBuffer MqttConnection::RegisterForEvents()
{
return _dispatcher.RegisterClient();
}
QString MqttConnection::GetStatus() const
{
if (_connected) {
return obs_module_text(
"AdvSceneSwitcher.mqttConnection.status.connected");
}
if (_connecting) {
return obs_module_text(
"AdvSceneSwitcher.mqttConnection.status.connecting");
}
if (_lastError.empty()) {
return obs_module_text(
"AdvSceneSwitcher.mqttConnection.status.disconnected");
}
QString status(obs_module_text(
"AdvSceneSwitcher.mqttConnection.status.disconnected"));
status += " (" + QString::fromStdString(_lastError) + ")";
return status;
}
MqttConnectionSettingsDialog::MqttConnectionSettingsDialog(
QWidget *parent, const MqttConnection &connection)
: ItemSettingsDialog(connection, GetMqttConnections(),
"AdvSceneSwitcher.mqttConnection.select",
"AdvSceneSwitcher.mqttConnection.add",
"AdvSceneSwitcher.item.nameNotAvailable", true,
parent),
_uri(new QLineEdit()),
_username(new QLineEdit()),
_password(new QLineEdit()),
_showPassword(new QPushButton()),
_topics(new MqttTopicListWidget(this)),
_connectOnStart(new QCheckBox()),
_reconnect(new QCheckBox()),
_reconnectDelay(new QSpinBox()),
_status(new QLabel()),
_test(new QPushButton(
obs_module_text("AdvSceneSwitcher.mqttConnection.test"))),
_layout(new QGridLayout())
{
_showPassword->setMaximumWidth(22);
_showPassword->setFlat(true);
_showPassword->setStyleSheet(
"QPushButton { background-color: transparent; border: 0px }");
_uri->setText(QString::fromStdString(connection._uri));
_username->setText(QString::fromStdString(connection._username));
_password->setText(QString::fromStdString(connection._password));
_topics->SetValues(connection._topics, connection._qos);
_reconnectDelay->setMaximum(9999);
_reconnectDelay->setSuffix("s");
_connectOnStart->setChecked(connection._connectOnStart);
_reconnect->setChecked(connection._reconnect);
_reconnectDelay->setValue(connection._reconnectDelay);
QWidget::connect(_showPassword, SIGNAL(pressed()), this,
SLOT(ShowPassword()));
QWidget::connect(_showPassword, SIGNAL(released()), this,
SLOT(HidePassword()));
QWidget::connect(_reconnect, SIGNAL(stateChanged(int)), this,
SLOT(ReconnectChanged(int)));
QWidget::connect(_test, SIGNAL(clicked()), this,
SLOT(TestConnection()));
int row = 0;
_layout->addWidget(new QLabel(obs_module_text(
"AdvSceneSwitcher.mqttConnection.name")),
row, 0);
auto nameLayout = new QHBoxLayout();
nameLayout->addWidget(_name);
nameLayout->addWidget(_nameHint);
_layout->addLayout(nameLayout, row, 1);
++row;
_layout->addWidget(new QLabel(obs_module_text(
"AdvSceneSwitcher.mqttConnection.address")),
row, 0);
_layout->addWidget(_uri, row, 1);
++row;
_layout->addWidget(new QLabel(obs_module_text(
"AdvSceneSwitcher.mqttConnection.username")),
row, 0);
_layout->addWidget(_username, row, 1);
++row;
auto passLayout = new QHBoxLayout();
passLayout->addWidget(_password);
passLayout->addWidget(_showPassword);
_layout->addLayout(passLayout, row, 1);
++row;
_layout->addWidget(new QLabel(
obs_module_text("AdvSceneSwitcher.mqttConnection.topics")));
++row;
_layout->addWidget(_topics, row, 0, 1, -1);
++row;
_layout->addWidget(
new QLabel(obs_module_text(
"AdvSceneSwitcher.mqttConnection.connectOnStart")),
row, 0);
_layout->addWidget(_connectOnStart, row, 1);
++row;
_layout->addWidget(
new QLabel(obs_module_text(
"AdvSceneSwitcher.mqttConnection.reconnect")),
row, 0);
_layout->addWidget(_reconnect, row, 1);
++row;
_layout->addWidget(
new QLabel(obs_module_text(
"AdvSceneSwitcher.mqttConnection.reconnectDelay")),
row, 0);
_layout->addWidget(_reconnectDelay, row, 1);
++row;
_layout->addWidget(_test, row, 0);
_layout->addWidget(_status, row, 1);
++row;
_layout->addWidget(_buttonbox, row, 0, 1, -1);
setLayout(_layout);
MinimizeSizeOfColumn(_layout, 0);
ReconnectChanged(_reconnect->isChecked());
HidePassword();
}
bool MqttConnectionSettingsDialog::AskForSettings(QWidget *parent,
MqttConnection &connection)
{
MqttConnectionSettingsDialog dialog(parent, connection);
dialog.setWindowTitle(obs_module_text("AdvSceneSwitcher.windowTitle"));
if (dialog.exec() != DialogCode::Accepted) {
return false;
}
connection._name = dialog._name->text().toStdString();
connection._uri = dialog._uri->text().toStdString();
connection._username = dialog._username->text().toStdString();
connection._password = dialog._password->text().toStdString();
connection._topics = dialog._topics->GetTopics();
connection._qos = dialog._topics->GetQoS();
connection._connectOnStart = dialog._connectOnStart->isChecked();
connection._reconnect = dialog._reconnect->isChecked();
connection._reconnectDelay = dialog._reconnectDelay->value();
if (connection._connecting) {
connection.Disconnect();
}
connection.Connect();
return true;
}
void MqttConnectionSettingsDialog::ShowPassword()
{
SetButtonIcon(_showPassword, GetThemeTypeName() == "Light"
? ":res/images/visible.svg"
: "theme:Dark/visible.svg");
_password->setEchoMode(QLineEdit::Normal);
}
void MqttConnectionSettingsDialog::HidePassword()
{
SetButtonIcon(_showPassword, ":res/images/invisible.svg");
_password->setEchoMode(QLineEdit::PasswordEchoOnEdit);
}
void MqttConnectionSettingsDialog::ReconnectChanged(int state)
{
_reconnectDelay->setEnabled(state);
}
void MqttConnectionSettingsDialog::TestConnection()
{
if (_updateStatusTimer) {
_updateStatusTimer->stop();
_updateStatusTimer->deleteLater();
}
auto connection = std::make_shared<MqttConnection>();
connection->_name = _name->text().toStdString();
connection->_uri = _uri->text().toStdString();
connection->_username = _username->text().toStdString();
connection->_password = _password->text().toStdString();
connection->_topics = _topics->GetTopics();
connection->_qos = _topics->GetQoS();
connection->_connectOnStart = false;
connection->_reconnect = false;
connection->_reconnectDelay = _reconnectDelay->value();
connection->Connect();
_updateStatusTimer = new QTimer(this);
_updateStatusTimer->setInterval(300);
QObject::connect(_updateStatusTimer, &QTimer::timeout,
[this, connection]() {
_status->setText(connection->GetStatus());
});
_updateStatusTimer->start();
}
static bool AskForSettingsWrapper(QWidget *parent, Item &settings)
{
MqttConnection &connection = dynamic_cast<MqttConnection &>(settings);
if (MqttConnectionSettingsDialog::AskForSettings(parent, connection)) {
return true;
}
return false;
}
MqttConnectionSelection::MqttConnectionSelection(QWidget *parent)
: ItemSelection(GetMqttConnections(), MqttConnection::Create,
AskForSettingsWrapper,
"AdvSceneSwitcher.mqttConnection.select",
"AdvSceneSwitcher.mqttConnection.add",
"AdvSceneSwitcher.item.nameNotAvailable",
"AdvSceneSwitcher.mqttConnection.configure", parent)
{
// Connect to slots
QWidget::connect(MqttConnectionSignalManager::Instance(),
SIGNAL(Rename(const QString &, const QString &)), this,
SLOT(RenameItem(const QString &, const QString &)));
QWidget::connect(MqttConnectionSignalManager::Instance(),
SIGNAL(Add(const QString &)), this,
SLOT(AddItem(const QString &)));
QWidget::connect(MqttConnectionSignalManager::Instance(),
SIGNAL(Remove(const QString &)), this,
SLOT(RemoveItem(const QString &)));
// Forward signals
QWidget::connect(this,
SIGNAL(ItemRenamed(const QString &, const QString &)),
MqttConnectionSignalManager::Instance(),
SIGNAL(Rename(const QString &, const QString &)));
QWidget::connect(this, SIGNAL(ItemAdded(const QString &)),
MqttConnectionSignalManager::Instance(),
SIGNAL(Add(const QString &)));
QWidget::connect(this, SIGNAL(ItemRemoved(const QString &)),
MqttConnectionSignalManager::Instance(),
SIGNAL(Remove(const QString &)));
}
void MqttConnectionSelection::SetConnection(
const std::weak_ptr<MqttConnection> &connection_)
{
auto connection = connection_.lock();
if (connection) {
SetItem(connection->Name());
} else {
SetItem("");
}
}
MqttConnectionSignalManager::MqttConnectionSignalManager(QObject *parent)
: QObject(parent)
{
QWidget::connect(this, SIGNAL(Add(const QString &)), this,
SLOT(OpenNewConnection(const QString &)));
}
MqttConnectionSignalManager *MqttConnectionSignalManager::Instance()
{
static MqttConnectionSignalManager manager;
return &manager;
}
void MqttConnectionSignalManager::OpenNewConnection(const QString &name)
{
auto weakConnection = GetWeakMqttConnectionByName(name.toStdString());
auto connection = weakConnection.lock();
if (!connection) {
return;
}
if (connection->ConnectOnStartup()) {
connection->Connect();
}
}
std::deque<std::shared_ptr<Item>> &GetMqttConnections()
{
static std::deque<std::shared_ptr<Item>> connections;
return connections;
}
MqttConnection *GetMqttConnectionByName(const QString &name)
{
return GetMqttConnectionByName(name.toStdString());
}
MqttConnection *GetMqttConnectionByName(const std::string &name)
{
for (const auto &connection : GetMqttConnections()) {
if (connection->Name() == name) {
return dynamic_cast<MqttConnection *>(connection.get());
}
}
return nullptr;
}
std::weak_ptr<MqttConnection>
GetWeakMqttConnectionByName(const std::string &name)
{
for (const auto &connection : GetMqttConnections()) {
if (connection->Name() == name) {
std::weak_ptr<MqttConnection> weak =
std::dynamic_pointer_cast<MqttConnection>(
connection);
return weak;
}
}
return std::weak_ptr<MqttConnection>();
}
std::weak_ptr<MqttConnection>
GetWeakMqttConnectionByQString(const QString &name)
{
return GetWeakMqttConnectionByName(name.toStdString());
}
std::string
GetWeakMqttConnectionName(const std::weak_ptr<MqttConnection> &connection_)
{
auto connection = connection_.lock();
if (!connection) {
return obs_module_text(
"AdvSceneSwitcher.mqttConnection.invalid");
}
return connection->Name();
}
static void SaveMqttConnections(obs_data_t *obj)
{
OBSDataArrayAutoRelease array = obs_data_array_create();
for (const auto &connection : GetMqttConnections()) {
OBSDataAutoRelease obj = obs_data_create();
connection->Save(obj);
obs_data_array_push_back(array, obj);
}
obs_data_set_array(obj, "mqttConnections", array);
}
static void LoadMqttConnections(obs_data_t *obj)
{
GetMqttConnections().clear();
OBSDataArrayAutoRelease array =
obs_data_get_array(obj, "mqttConnections");
size_t count = obs_data_array_count(array);
for (size_t i = 0; i < count; i++) {
OBSDataAutoRelease obj = obs_data_array_item(array, i);
auto connection = MqttConnection::Create();
GetMqttConnections().emplace_back(connection);
GetMqttConnections().back()->Load(obj);
}
}
static bool setup()
{
AddSaveStep(SaveMqttConnections);
AddLoadStep(LoadMqttConnections);
return true;
}
static bool _ = setup();
} // namespace advss

View File

@ -0,0 +1,134 @@
#pragma once
#include "message-dispatcher.hpp"
#include "item-selection-helpers.hpp"
#include "topic-selection.hpp"
#include <condition_variable>
#include <obs-data.h>
#include <QCheckBox>
#include <QComboBox>
#include <QLayout>
#include <QPushButton>
#include <QSpinBox>
#include <string>
#include <thread>
#include <mqtt/async_client.h>
namespace advss {
using MqttMessageBuffer = std::shared_ptr<MessageBuffer<std::string>>;
using MqttMessageDispatcher = MessageDispatcher<std::string>;
class MqttConnection : public Item {
public:
MqttConnection() = default;
~MqttConnection();
MqttConnection(const MqttConnection &other);
MqttConnection(const std::string &name, const std::string &uri,
const std::string &username, const std::string &password,
bool connectOnStart, bool reconnect, int reconnectDelay);
static std::shared_ptr<Item> Create()
{
return std::make_shared<MqttConnection>();
}
void Connect();
bool SendMessage(const std::string &topic, const std::string &payload,
int qos, bool retained);
void Load(obs_data_t *data);
void Save(obs_data_t *data) const;
MqttMessageBuffer RegisterForEvents();
bool ConnectOnStartup() const { return _connectOnStart; }
QString GetStatus() const;
private:
void ConnectThread();
void Disconnect();
std::string _uri = "mqtt://localhost:1883";
std::string _username = "user";
std::string _password = "password";
std::vector<std::string> _topics = {"/#"};
std::vector<int> _qos = {1};
std::thread _thread;
bool _connectOnStart = true;
bool _reconnect = true;
int _reconnectDelay = 3;
std::shared_ptr<mqtt::async_client> _client;
std::atomic_bool _disconnect{false};
std::atomic_bool _connected{false};
std::atomic_bool _connecting{false};
std::mutex _clientMtx;
std::mutex _waitMtx;
std::condition_variable _cv;
std::string _lastError = "";
MqttMessageDispatcher _dispatcher;
friend class MqttConnectionSettingsDialog;
};
class MqttConnectionSettingsDialog : public ItemSettingsDialog {
Q_OBJECT
public:
MqttConnectionSettingsDialog(QWidget *parent, const MqttConnection &);
static bool AskForSettings(QWidget *parent, MqttConnection &settings);
private slots:
void ShowPassword();
void HidePassword();
void ReconnectChanged(int state);
void TestConnection();
private:
QLineEdit *_uri;
QLineEdit *_username;
QLineEdit *_password;
QPushButton *_showPassword;
MqttTopicListWidget *_topics;
QCheckBox *_connectOnStart;
QCheckBox *_reconnect;
QSpinBox *_reconnectDelay;
QLabel *_status;
QPushButton *_test;
QGridLayout *_layout;
QTimer *_updateStatusTimer = nullptr;
};
class MqttConnectionSelection : public ItemSelection {
Q_OBJECT
public:
MqttConnectionSelection(QWidget *parent = 0);
void SetConnection(const std::weak_ptr<MqttConnection> &);
};
class MqttConnectionSignalManager : public QObject {
Q_OBJECT
public:
MqttConnectionSignalManager(QObject *parent = nullptr);
static MqttConnectionSignalManager *Instance();
private slots:
void OpenNewConnection(const QString &);
signals:
void Rename(const QString &, const QString &);
void Add(const QString &);
void Remove(const QString &);
};
std::deque<std::shared_ptr<Item>> &GetMqttConnections();
MqttConnection *GetMqttConnectionByName(const QString &);
MqttConnection *GetMqttConnectionByName(const std::string &);
std::weak_ptr<MqttConnection>
GetWeakMqttConnectionByName(const std::string &name);
std::weak_ptr<MqttConnection>
GetWeakMqttConnectionByQString(const QString &name);
std::string GetWeakMqttConnectionName(const std::weak_ptr<MqttConnection> &);
} // namespace advss

View File

@ -0,0 +1,284 @@
#include "topic-selection.hpp"
#include "layout-helpers.hpp"
#include "log-helper.hpp"
#include "obs-module-helper.hpp"
#include "ui-helpers.hpp"
#include <QHeaderView>
#include <QMessageBox>
namespace advss {
MqttTopicListWidget::MqttTopicListWidget(QWidget *parent)
: QWidget(parent),
_table(new QTableWidget(0, 2, this)),
_controls(new ListControls(this, false))
{
_table->setHorizontalHeaderLabels(
QStringList()
<< obs_module_text("AdvSceneSwitcher.mqttConnection.topic")
<< obs_module_text("AdvSceneSwitcher.mqttConnection.qos"));
_table->horizontalHeader()->setStretchLastSection(true);
_table->setSelectionBehavior(QAbstractItemView::SelectRows);
_table->setSelectionMode(QAbstractItemView::SingleSelection);
_table->setEditTriggers(QAbstractItemView::DoubleClicked |
QAbstractItemView::SelectedClicked);
_table->verticalHeader()->hide();
_table->horizontalHeader()->setSectionResizeMode(0,
QHeaderView::Stretch);
_table->horizontalHeader()->setSectionResizeMode(
1, QHeaderView::ResizeToContents);
connect(_table, &QTableWidget::itemChanged, this,
&MqttTopicListWidget::ModifyRow);
connect(_controls, &ListControls::Add, this,
&MqttTopicListWidget::AddTopic);
connect(_controls, &ListControls::Remove, this,
&MqttTopicListWidget::RemoveSelectedRow);
auto mainLayout = new QVBoxLayout(this);
mainLayout->addWidget(_table);
mainLayout->addWidget(_controls);
setLayout(mainLayout);
}
void MqttTopicListWidget::SetValues(const std::vector<std::string> &topics,
const std::vector<int> &qos)
{
assert(topics.size() == qos.size());
if (topics.size() != qos.size()) {
blog(LOG_WARNING, "%s topics and qos size mismatch", __func__);
return;
}
_table->setRowCount(0);
_topicSet.clear();
std::vector<std::pair<QString, int>> entries;
for (size_t i = 0; i < topics.size(); ++i) {
auto topic = QString::fromStdString(topics[i]).trimmed();
int qos_ = qos[i];
if (topic.isEmpty()) {
blog(LOG_INFO, "%s ignoring empty topic entry",
__func__);
continue;
}
if (_topicSet.contains(topic)) {
blog(LOG_INFO, "%s discarding duplicate topic \"%s\"",
__func__, topics[i].c_str());
continue;
}
if (qos_ < 0 || qos_ > 2) {
blog(LOG_INFO,
"%s discard invalid QoS level \"%d\" and set to \"1\"",
__func__, qos[i]);
qos_ = 1;
}
entries.emplace_back(topic, qos_);
_topicSet.insert(topic);
}
for (const auto &entry : entries) {
int row = _table->rowCount();
_table->insertRow(row);
_table->setItem(row, 0, new QTableWidgetItem(entry.first));
_table->setItem(
row, 1,
new QTableWidgetItem(QString::number(entry.second)));
}
SortTable();
}
std::vector<std::string> MqttTopicListWidget::GetTopics()
{
std::vector<std::string> result;
for (int i = 0; i < _table->rowCount(); ++i) {
auto topic = _table->item(i, 0)->text().trimmed();
result.push_back(topic.toStdString());
}
return result;
}
std::vector<int> MqttTopicListWidget::GetQoS()
{
std::vector<int> result;
for (int i = 0; i < _table->rowCount(); ++i) {
bool isValidInt = false;
int qos = _table->item(i, 1)->text().toInt(&isValidInt);
result.push_back(isValidInt ? qos : 1);
}
return result;
}
void MqttTopicListWidget::AddTopic()
{
AddMqttTopicDialog dialog(this);
if (dialog.exec() != QDialog::Accepted) {
return;
}
auto topic = dialog.Topic();
int qos = dialog.QoS();
if (topic.isEmpty()) {
ShowTopicEmptyWarning();
return;
}
if (_topicSet.contains(topic)) {
ShowTopicDuplicateWarning();
return;
}
InsertSorted(topic, qos);
_topicSet.insert(topic);
}
void MqttTopicListWidget::InsertSorted(const QString &topic, int qos)
{
int row = 0;
while (row < _table->rowCount()) {
QString current = _table->item(row, 0)->text();
if (topic.compare(current, Qt::CaseInsensitive) < 0)
break;
++row;
}
_table->insertRow(row);
_table->setItem(row, 0, new QTableWidgetItem(topic));
_table->setItem(row, 1, new QTableWidgetItem(QString::number(qos)));
}
void MqttTopicListWidget::SortTable()
{
_table->sortItems(0, Qt::AscendingOrder);
}
void MqttTopicListWidget::ShowTopicDuplicateWarning()
{
QMessageBox::warning(
this,
obs_module_text(
"AdvSceneSwitcher.mqttConnection.inputWarning.duplicateTopic.title"),
obs_module_text(
"AdvSceneSwitcher.mqttConnection.inputWarning.duplicateTopic"));
}
void MqttTopicListWidget::ShowTopicEmptyWarning()
{
QMessageBox::warning(
this,
obs_module_text(
"AdvSceneSwitcher.mqttConnection.inputWarning.emptyTopic.title"),
obs_module_text(
"AdvSceneSwitcher.mqttConnection.inputWarning.emptyTopic"));
}
void MqttTopicListWidget::ShowQoSRangeWarning()
{
QMessageBox::warning(
this,
obs_module_text(
"AdvSceneSwitcher.mqttConnection.inputWarning.qosRange.title"),
obs_module_text(
"AdvSceneSwitcher.mqttConnection.inputWarning.qosRange"));
}
void MqttTopicListWidget::RemoveSelectedRow()
{
auto selected = _table->currentRow();
if (selected >= 0) {
QString topic = _table->item(selected, 0)->text();
_topicSet.remove(topic);
_table->removeRow(selected);
}
}
void MqttTopicListWidget::ModifyRow(QTableWidgetItem *item)
{
if (!item) {
return;
}
int row = item->row();
auto tableItemTopic = _table->item(row, 0);
auto tableItemQos = _table->item(row, 1);
if (!tableItemTopic || !tableItemQos) {
return;
}
QString topic = tableItemTopic->text().trimmed();
QString qosStr = tableItemQos->text().trimmed();
// Validate topic column
if (item->column() == 0) {
QString oldTopic = _topicSet.values().value(row);
if (topic.isEmpty()) {
ShowTopicEmptyWarning();
item->setText(oldTopic);
return;
}
if (_topicSet.contains(topic) && topic != oldTopic) {
ShowTopicDuplicateWarning();
item->setText(oldTopic);
return;
}
_topicSet.remove(oldTopic);
_topicSet.insert(topic);
SortTable();
}
// Validate QoS column
if (item->column() == 1) {
bool isValidInt;
int qos = qosStr.toInt(&isValidInt);
if (!isValidInt || qos < 0 || qos > 2) {
ShowQoSRangeWarning();
item->setText("1");
}
}
SortTable();
}
AddMqttTopicDialog::AddMqttTopicDialog(QWidget *parent)
: QDialog(parent),
_topicEdit(new QLineEdit(this)),
_qosSpin(new QSpinBox(this))
{
setWindowTitle(
obs_module_text("AdvSceneSwitcher.mqttConnection.add.title"));
_topicEdit->setText("/#");
_qosSpin->setRange(0, 2);
_qosSpin->setValue(1);
auto buttonbox = new QDialogButtonBox(QDialogButtonBox::Ok |
QDialogButtonBox::Cancel);
buttonbox->setCenterButtons(true);
connect(buttonbox, &QDialogButtonBox::accepted, this, &QDialog::accept);
connect(buttonbox, &QDialogButtonBox::rejected, this, &QDialog::reject);
auto inputLayout = new QHBoxLayout();
PlaceWidgets(
obs_module_text("AdvSceneSwitcher.mqttConnection.add.layout"),
inputLayout, {{"{{topic}}", _topicEdit}, {"{{QoS}}", _qosSpin}},
false);
auto layout = new QVBoxLayout();
layout->addLayout(inputLayout);
layout->addWidget(buttonbox);
setLayout(layout);
}
QString AddMqttTopicDialog::Topic() const
{
return _topicEdit->text().trimmed();
}
int AddMqttTopicDialog::QoS() const
{
return _qosSpin->value();
}
} // namespace advss

View File

@ -0,0 +1,56 @@
#pragma once
#include "string-list.hpp"
#include "list-controls.hpp"
#include "variable-line-edit.hpp"
#include <QDialog>
#include <QLineEdit>
#include <QSet>
#include <QSpinBox>
#include <QTableWidget>
namespace advss {
class MqttTopicListWidget : public QWidget {
Q_OBJECT
public:
MqttTopicListWidget(QWidget *parent = nullptr);
void SetValues(const std::vector<std::string> &topics,
const std::vector<int> &qos);
std::vector<std::string> GetTopics();
std::vector<int> GetQoS();
private slots:
void AddTopic();
void RemoveSelectedRow();
void ModifyRow(QTableWidgetItem *);
private:
void InsertSorted(const QString &topic, int qos);
void SortTable();
void ShowTopicDuplicateWarning();
void ShowTopicEmptyWarning();
void ShowQoSRangeWarning();
QTableWidget *_table;
ListControls *_controls;
QSet<QString> _topicSet;
};
class AddMqttTopicDialog : public QDialog {
Q_OBJECT
public:
AddMqttTopicDialog(QWidget *parent = nullptr);
QString Topic() const;
int QoS() const;
private:
QLineEdit *_topicEdit;
QSpinBox *_qosSpin;
};
} // namespace advss