mirror of
https://github.com/WarmUpTill/SceneSwitcher.git
synced 2026-03-21 17:34:57 -05:00
If the connection was aborted by the local machine, the active subsciptions were not cleared causing the new connection not attempt to register any new subsciptions. An event sub connection without any active subsciptions will be dropped by Twitch after a certain amount of time. Additionally the reconnection logic was triggering to frequently causing unecessary load.
630 lines
16 KiB
C++
630 lines
16 KiB
C++
#include "event-sub.hpp"
|
|
#include "token.hpp"
|
|
#include "twitch-helpers.hpp"
|
|
|
|
#include <log-helper.hpp>
|
|
|
|
#ifdef VERIFY_TIMESTAMPS
|
|
#include "date/tz.h"
|
|
#endif
|
|
|
|
namespace advss {
|
|
|
|
using websocketpp::lib::placeholders::_1;
|
|
using websocketpp::lib::placeholders::_2;
|
|
using websocketpp::lib::bind;
|
|
|
|
#ifdef USE_TWITCH_CLI_MOCK
|
|
static constexpr std::string_view defaultURL = "ws://127.0.0.1:8080/ws";
|
|
static constexpr std::string_view registerSubscriptionURL =
|
|
"http://127.0.0.1:8080";
|
|
static constexpr std::string_view registerSubscriptionPath =
|
|
"/eventsub/subscriptions";
|
|
#else
|
|
static constexpr std::string_view defaultURL =
|
|
"wss://eventsub.wss.twitch.tv/ws";
|
|
static constexpr std::string_view registerSubscriptionURL =
|
|
"https://api.twitch.tv";
|
|
static constexpr std::string_view registerSubscriptionPath =
|
|
"/helix/eventsub/subscriptions";
|
|
#endif
|
|
static const int reconnectDelay = 15;
|
|
|
|
#undef DispatchMessage
|
|
|
|
EventSub::EventSub()
|
|
: QObject(nullptr),
|
|
_client(std::make_unique<EventSubWSClient>())
|
|
{
|
|
SetupClient(*_client);
|
|
|
|
_client->set_open_handler(bind(&EventSub::OnOpen, this, _1));
|
|
_client->set_message_handler(bind(&EventSub::OnMessage, this, _1, _2));
|
|
_client->set_close_handler(bind(&EventSub::OnClose, this, _1));
|
|
_client->set_fail_handler(bind(&EventSub::OnFail, this, _1));
|
|
|
|
_url = defaultURL.data();
|
|
RegisterInstance();
|
|
}
|
|
|
|
EventSub::~EventSub()
|
|
{
|
|
Disconnect();
|
|
UnregisterInstance();
|
|
}
|
|
|
|
std::mutex EventSub::_instancesMtx;
|
|
std::vector<EventSub *> EventSub::_instances;
|
|
|
|
void EventSub::RegisterInstance()
|
|
{
|
|
std::lock_guard<std::mutex> lock(_instancesMtx);
|
|
_instances.emplace_back(this);
|
|
}
|
|
|
|
void EventSub::UnregisterInstance()
|
|
{
|
|
std::lock_guard<std::mutex> lock(_instancesMtx);
|
|
auto it = std::remove(_instances.begin(), _instances.end(), this);
|
|
_instances.erase(it, _instances.end());
|
|
}
|
|
|
|
void EventSub::ConnectThread()
|
|
{
|
|
_client->reset();
|
|
websocketpp::lib::error_code ec;
|
|
EventSubWSClient::connection_ptr con =
|
|
_client->get_connection(_url, ec);
|
|
if (ec) {
|
|
blog(LOG_INFO, "Twitch EventSub failed: %s",
|
|
ec.message().c_str());
|
|
} else {
|
|
_client->connect(con);
|
|
_connection = connection_hdl(con);
|
|
_client->run();
|
|
}
|
|
|
|
_connected = false;
|
|
}
|
|
|
|
void EventSub::WaitAndReconnect()
|
|
{
|
|
if (_reconnecting) {
|
|
return;
|
|
}
|
|
|
|
_reconnecting = true;
|
|
|
|
auto thread = std::thread([this]() {
|
|
std::unique_lock<std::mutex> lock(_waitMtx);
|
|
blog(LOG_INFO,
|
|
"Twitch EventSub trying to reconnect to in %d seconds.",
|
|
reconnectDelay);
|
|
_cv.wait_for(lock, std::chrono::seconds(reconnectDelay));
|
|
_reconnecting = false;
|
|
|
|
if (_disconnect) {
|
|
return;
|
|
}
|
|
|
|
Connect();
|
|
});
|
|
thread.detach();
|
|
}
|
|
|
|
void EventSub::Connect()
|
|
{
|
|
if (_reconnecting) {
|
|
return;
|
|
}
|
|
|
|
std::lock_guard<std::mutex> lock(_connectMtx);
|
|
if (_connected) {
|
|
vblog(LOG_INFO, "Twitch EventSub connect already in progress");
|
|
return;
|
|
}
|
|
_disconnect = true;
|
|
if (_thread.joinable()) {
|
|
_thread.join();
|
|
}
|
|
_disconnect = false;
|
|
_connected = true;
|
|
_thread = std::thread(&EventSub::ConnectThread, this);
|
|
}
|
|
|
|
void EventSub::ClearActiveSubscriptions()
|
|
{
|
|
std::lock_guard<std::mutex> lock(_subscriptionMtx);
|
|
_activeSubscriptions.clear();
|
|
}
|
|
|
|
void EventSub::EnableTimestampValidation(bool enable)
|
|
{
|
|
_validateTimestamps = enable;
|
|
}
|
|
|
|
void EventSub::Disconnect()
|
|
{
|
|
std::lock_guard<std::mutex> lock(_connectMtx);
|
|
_disconnect = true;
|
|
websocketpp::lib::error_code ec;
|
|
_client->close(_connection, websocketpp::close::status::normal,
|
|
"Twitch EventSub stopping", ec);
|
|
{
|
|
std::unique_lock<std::mutex> waitLock(_waitMtx);
|
|
_cv.notify_all();
|
|
}
|
|
|
|
while (_connected) {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
|
_client->close(_connection, websocketpp::close::status::normal,
|
|
"Twitch EventSub stopping", ec);
|
|
}
|
|
|
|
if (_thread.joinable()) {
|
|
_thread.join();
|
|
}
|
|
_connected = false;
|
|
ClearActiveSubscriptions();
|
|
}
|
|
|
|
EventSubMessageBuffer EventSub::RegisterForEvents()
|
|
{
|
|
return _dispatcher.RegisterClient();
|
|
}
|
|
|
|
bool EventSub::SubscriptionIsActive(const std::string &id)
|
|
{
|
|
std::lock_guard<std::mutex> lock(_subscriptionMtx);
|
|
for (const auto &subscription : _activeSubscriptions) {
|
|
if (subscription.id == id) {
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
static bool isAlreadySubscribed(const std::set<Subscription> &subscriptions,
|
|
const Subscription &newSubsciption)
|
|
{
|
|
return subscriptions.find(newSubsciption) != subscriptions.end();
|
|
}
|
|
|
|
static void setTransportData(const OBSData &data, const std::string &sessionID)
|
|
{
|
|
OBSDataAutoRelease transport = obs_data_create();
|
|
obs_data_set_string(transport, "method", "websocket");
|
|
obs_data_set_string(transport, "session_id", sessionID.c_str());
|
|
obs_data_set_obj(data, "transport", transport);
|
|
}
|
|
|
|
static obs_data_t *copyData(const OBSData &data)
|
|
{
|
|
auto json = obs_data_get_json(data);
|
|
if (!json) {
|
|
return nullptr;
|
|
}
|
|
return obs_data_create_from_json(json);
|
|
}
|
|
|
|
std::string EventSub::AddEventSubscription(std::shared_ptr<TwitchToken> token,
|
|
Subscription subscription)
|
|
{
|
|
auto eventSub = token->GetEventSub();
|
|
if (!eventSub) {
|
|
blog(LOG_WARNING, "failed to get Twitch EventSub from token!");
|
|
return "";
|
|
}
|
|
|
|
std::unique_lock<std::mutex> lock(eventSub->_subscriptionMtx);
|
|
if (!eventSub->_connected) {
|
|
vblog(LOG_INFO, "Twitch EventSub connect started for %s",
|
|
token->GetName().c_str());
|
|
lock.unlock();
|
|
eventSub->Connect();
|
|
return "";
|
|
}
|
|
|
|
if (isAlreadySubscribed(eventSub->_activeSubscriptions, subscription)) {
|
|
return eventSub->_activeSubscriptions.find(subscription)->id;
|
|
}
|
|
|
|
OBSDataAutoRelease postData = copyData(subscription.data);
|
|
setTransportData(postData.Get(), eventSub->_sessionID);
|
|
auto result = SendPostRequest(*token, registerSubscriptionURL.data(),
|
|
registerSubscriptionPath.data(), {},
|
|
postData.Get());
|
|
|
|
if (result.status != 202) {
|
|
vblog(LOG_INFO, "failed to register Twitch EventSub (%d)",
|
|
result.status);
|
|
return "";
|
|
}
|
|
|
|
OBSDataArrayAutoRelease replyArray =
|
|
obs_data_get_array(result.data, "data");
|
|
OBSDataAutoRelease replyData = obs_data_array_item(replyArray, 0);
|
|
subscription.id = obs_data_get_string(replyData, "id");
|
|
eventSub->_activeSubscriptions.emplace(subscription);
|
|
return subscription.id;
|
|
}
|
|
|
|
void EventSub::SetupClient(EventSubWSClient &client)
|
|
{
|
|
client.get_alog().clear_channels(
|
|
websocketpp::log::alevel::frame_header |
|
|
websocketpp::log::alevel::frame_payload |
|
|
websocketpp::log::alevel::control);
|
|
client.init_asio();
|
|
#ifndef _WIN32
|
|
client.set_reuse_addr(true);
|
|
#endif
|
|
#ifndef USE_TWITCH_CLI_MOCK
|
|
client.set_tls_init_handler([](websocketpp::connection_hdl) {
|
|
return websocketpp::lib::make_shared<asio::ssl::context>(
|
|
asio::ssl::context::sslv23_client);
|
|
});
|
|
#endif
|
|
}
|
|
|
|
void EventSub::OnOpen(connection_hdl)
|
|
{
|
|
vblog(LOG_INFO, "Twitch EventSub connection opened");
|
|
_connected = true;
|
|
}
|
|
|
|
static bool isValidTimestamp(const std::string ×tamp)
|
|
{
|
|
#ifdef VERIFY_TIMESTAMPS
|
|
// Example input: 2023-07-19T14:56:51.634234626Z
|
|
try {
|
|
// Discard the nanosecond part
|
|
static constexpr size_t dotPos = 19;
|
|
std::string trimmed = timestamp.substr(0, dotPos);
|
|
auto tzStart = timestamp.find_first_of("Z+-", dotPos);
|
|
trimmed = timestamp.substr(0, dotPos);
|
|
if (tzStart != std::string::npos) {
|
|
trimmed += timestamp.substr(tzStart);
|
|
}
|
|
|
|
std::istringstream in(trimmed);
|
|
date::sys_time<std::chrono::seconds> parsedTime;
|
|
in >> date::parse("%FT%TZ", parsedTime);
|
|
if (in.fail()) {
|
|
blog(LOG_WARNING, "failed to parse timestamp %s",
|
|
timestamp.c_str());
|
|
return false;
|
|
}
|
|
|
|
auto now = date::zoned_time{date::current_zone(),
|
|
std::chrono::system_clock::now()}
|
|
.get_sys_time();
|
|
|
|
auto duration = now - parsedTime;
|
|
// Clocks might be off by a bit, so allow negative values also
|
|
return duration <= std::chrono::minutes(10) &&
|
|
duration >= std::chrono::minutes(-1);
|
|
} catch (const std::exception &e) {
|
|
blog(LOG_WARNING, "%s: %s", __func__, e.what());
|
|
return false;
|
|
}
|
|
#else
|
|
// Just assume timestamps are always valid
|
|
return true;
|
|
#endif
|
|
}
|
|
|
|
bool EventSub::IsValidMessageID(const std::string &id)
|
|
{
|
|
auto it = std::find(_messageIDs.begin(), _messageIDs.end(), id);
|
|
if (it != _messageIDs.end()) {
|
|
return false;
|
|
}
|
|
if (!_messageIDs.empty()) {
|
|
_messageIDs.pop_front();
|
|
}
|
|
_messageIDs.push_back(id);
|
|
return true;
|
|
}
|
|
|
|
bool EventSub::IsValidID(const std::string &id)
|
|
{
|
|
return !_sessionID.empty() && id == _sessionID;
|
|
}
|
|
|
|
std::optional<const EventSub::ParsedMessage>
|
|
EventSub::ParseWebSocketMessage(const EventSubWSClient::message_ptr &message)
|
|
{
|
|
if (!message) {
|
|
return {};
|
|
}
|
|
if (message->get_opcode() != websocketpp::frame::opcode::text) {
|
|
return {};
|
|
}
|
|
|
|
std::string payload = message->get_payload();
|
|
OBSDataAutoRelease json = obs_data_create_from_json(payload.c_str());
|
|
if (!json) {
|
|
blog(LOG_ERROR, "invalid JSON payload received for '%s'",
|
|
payload.c_str());
|
|
return {};
|
|
}
|
|
|
|
OBSDataAutoRelease metadata = obs_data_get_obj(json, "metadata");
|
|
std::string timestamp =
|
|
obs_data_get_string(metadata, "message_timestamp");
|
|
if (_validateTimestamps && !isValidTimestamp(timestamp)) {
|
|
blog(LOG_WARNING,
|
|
"discarding Twitch EventSub with invalid timestamp %s",
|
|
timestamp.c_str());
|
|
return {};
|
|
}
|
|
std::string id = obs_data_get_string(metadata, "message_id");
|
|
if (!IsValidMessageID(id)) {
|
|
blog(LOG_WARNING,
|
|
"discarding Twitch EventSub with invalid message_id");
|
|
return {};
|
|
}
|
|
|
|
ParsedMessage parsedMessage{obs_data_get_string(metadata,
|
|
"message_type"),
|
|
obs_data_get_obj(json, "payload")};
|
|
return parsedMessage;
|
|
}
|
|
|
|
void EventSub::OnMessage(connection_hdl, EventSubWSClient::message_ptr message)
|
|
{
|
|
const auto msg = ParseWebSocketMessage(message);
|
|
if (!msg) {
|
|
return;
|
|
}
|
|
|
|
const auto &type = msg->type;
|
|
const auto &data = msg->payload;
|
|
if (type == "session_welcome") {
|
|
HandleWelcome(data);
|
|
} else if (type == "session_keepalive") {
|
|
HandleKeepAlive();
|
|
} else if (type == "notification") {
|
|
HandleNotification(data);
|
|
} else if (type == "session_reconnect") {
|
|
HandleServerMigration(data);
|
|
} else if (type == "revocation") {
|
|
HandleRevocation(data);
|
|
} else {
|
|
vblog(LOG_INFO,
|
|
"Twitch EventSub ignoring message of unknown type '%s'",
|
|
type.c_str());
|
|
}
|
|
}
|
|
|
|
void EventSub::HandleWelcome(obs_data_t *data)
|
|
{
|
|
OBSDataAutoRelease session = obs_data_get_obj(data, "session");
|
|
_sessionID = obs_data_get_string(session, "id");
|
|
blog(LOG_INFO, "Twitch EventSub connected");
|
|
}
|
|
|
|
void EventSub::HandleKeepAlive() const
|
|
{
|
|
// Nothing to do
|
|
}
|
|
|
|
void EventSub::HandleNotification(obs_data_t *data)
|
|
{
|
|
Event event;
|
|
OBSDataAutoRelease subscription =
|
|
obs_data_get_obj(data, "subscription");
|
|
event.id = obs_data_get_string(subscription, "id");
|
|
event.type = obs_data_get_string(subscription, "type");
|
|
OBSDataAutoRelease eventData = obs_data_get_obj(data, "event");
|
|
event.data = eventData;
|
|
_dispatcher.DispatchMessage(event);
|
|
}
|
|
|
|
void EventSub::OnServerMigrationWelcome(
|
|
connection_hdl newHdl, std::unique_ptr<EventSubWSClient> &newClient)
|
|
{
|
|
std::lock_guard<std::mutex> lock(_connectMtx);
|
|
|
|
// Disable reconnect handling for old connection which will be closed
|
|
_client->set_close_handler([](connection_hdl) {});
|
|
_client->set_fail_handler([](connection_hdl) {});
|
|
auto connection = _client->get_con_from_hdl(_connection);
|
|
connection->set_close_handler([](connection_hdl) {
|
|
vblog(LOG_INFO, "previous Twitch EventSub connection closed");
|
|
});
|
|
connection->set_fail_handler([](connection_hdl) {});
|
|
|
|
websocketpp::lib::error_code ec;
|
|
_client->close(_connection, websocketpp::close::status::normal,
|
|
"Switching to new connection", ec);
|
|
|
|
_client.swap(newClient);
|
|
_sessionID = _migrationSessionID;
|
|
_connection = newHdl;
|
|
|
|
_client->set_open_handler(bind(&EventSub::OnOpen, this, _1));
|
|
_client->set_message_handler(bind(&EventSub::OnMessage, this, _1, _2));
|
|
_client->set_close_handler(bind(&EventSub::OnClose, this, _1));
|
|
_client->set_fail_handler(bind(&EventSub::OnFail, this, _1));
|
|
auto newConnection = _client->get_con_from_hdl(_connection);
|
|
newConnection->set_open_handler(bind(&EventSub::OnOpen, this, _1));
|
|
newConnection->set_message_handler(
|
|
bind(&EventSub::OnMessage, this, _1, _2));
|
|
newConnection->set_close_handler(bind(&EventSub::OnClose, this, _1));
|
|
newConnection->set_fail_handler(bind(&EventSub::OnFail, this, _1));
|
|
|
|
_connected = true;
|
|
_migrating = false;
|
|
}
|
|
|
|
void EventSub::StartServerMigrationClient(const std::string &url)
|
|
{
|
|
auto client = std::make_unique<EventSubWSClient>();
|
|
SetupClient(*client);
|
|
|
|
client->set_open_handler([this](connection_hdl) {
|
|
vblog(LOG_INFO, "Twitch EventSub migration client opened");
|
|
});
|
|
|
|
client->set_message_handler([this,
|
|
&client](connection_hdl hdl,
|
|
EventSubWSClient::message_ptr
|
|
message) {
|
|
const auto msg = ParseWebSocketMessage(message);
|
|
if (!msg) {
|
|
return;
|
|
}
|
|
|
|
const auto &type = msg->type;
|
|
const auto &data = msg->payload;
|
|
|
|
if (type == "session_welcome") {
|
|
blog(LOG_INFO,
|
|
"Twitch EventSub migration successful - switching to new connection");
|
|
OBSDataAutoRelease session =
|
|
obs_data_get_obj(data, "session");
|
|
_migrationSessionID =
|
|
obs_data_get_string(session, "id");
|
|
OnServerMigrationWelcome(hdl, client);
|
|
} else {
|
|
OnMessage(hdl, message);
|
|
}
|
|
});
|
|
|
|
websocketpp::lib::error_code ec;
|
|
auto con = client->get_connection(url, ec);
|
|
if (ec) {
|
|
blog(LOG_ERROR,
|
|
"Twitch EventSub migration connection failed: %s",
|
|
ec.message().c_str());
|
|
_migrating = false;
|
|
return;
|
|
}
|
|
|
|
_migrationConnection = con;
|
|
client->connect(con);
|
|
client->run();
|
|
}
|
|
|
|
void EventSub::HandleServerMigration(obs_data_t *data)
|
|
{
|
|
blog(LOG_INFO, "Twitch EventSub session_reconnect received");
|
|
|
|
OBSDataAutoRelease session = obs_data_get_obj(data, "session");
|
|
auto id = obs_data_get_string(session, "id");
|
|
if (!IsValidID(id)) {
|
|
vblog(LOG_INFO,
|
|
"ignoring Twitch EventSub reconnect message with invalid id");
|
|
return;
|
|
}
|
|
|
|
const std::string newURL =
|
|
obs_data_get_string(session, "reconnect_url");
|
|
if (newURL.empty()) {
|
|
blog(LOG_WARNING, "missing reconnect_url in session_reconnect");
|
|
return;
|
|
}
|
|
|
|
if (_migrating.exchange(true)) {
|
|
vblog(LOG_INFO,
|
|
"ignoring Twitch EventSub session_reconnect - already in progress");
|
|
return;
|
|
}
|
|
|
|
vblog(LOG_INFO, "Twitch EventSub reconnect: connecting to %s",
|
|
newURL.c_str());
|
|
|
|
std::thread([this, newURL]() {
|
|
StartServerMigrationClient(newURL);
|
|
}).detach();
|
|
}
|
|
|
|
void EventSub::HandleRevocation(obs_data_t *data)
|
|
{
|
|
OBSDataAutoRelease subscription =
|
|
obs_data_get_obj(data, "subscription");
|
|
auto id = obs_data_get_string(subscription, "id");
|
|
auto status = obs_data_get_string(subscription, "status");
|
|
auto type = obs_data_get_string(subscription, "type");
|
|
auto version = obs_data_get_string(subscription, "version");
|
|
OBSDataAutoRelease condition =
|
|
obs_data_get_obj(subscription, "condition");
|
|
auto conditionJson = obs_data_get_json(condition);
|
|
blog(LOG_INFO,
|
|
"Twitch EventSub revoked:\n"
|
|
"id: %s\n"
|
|
"status: %s\n"
|
|
"type: %s\n"
|
|
"version: %s\n"
|
|
"condition: %s\n",
|
|
id, status, type, version, conditionJson ? conditionJson : "");
|
|
|
|
std::lock_guard<std::mutex> lock(_subscriptionMtx);
|
|
for (auto it = _activeSubscriptions.begin();
|
|
it != _activeSubscriptions.begin();) {
|
|
if (it->id == id) {
|
|
it = _activeSubscriptions.erase(it);
|
|
} else {
|
|
++it;
|
|
}
|
|
}
|
|
}
|
|
|
|
void EventSub::OnClose(connection_hdl hdl)
|
|
{
|
|
EventSubWSClient::connection_ptr con = _client->get_con_from_hdl(hdl);
|
|
const auto msg = con->get_ec().message();
|
|
const auto reason = con->get_remote_close_reason();
|
|
const auto code = con->get_remote_close_code();
|
|
blog(LOG_INFO, "Twitch EventSub connection closed: %s / %s (%d)",
|
|
msg.c_str(), reason.c_str(), code);
|
|
|
|
if (!_migrating) {
|
|
ClearActiveSubscriptions();
|
|
}
|
|
_connected = false;
|
|
|
|
if (_disconnect) {
|
|
return;
|
|
}
|
|
|
|
WaitAndReconnect();
|
|
}
|
|
|
|
void EventSub::OnFail(connection_hdl hdl)
|
|
{
|
|
EventSubWSClient::connection_ptr con = _client->get_con_from_hdl(hdl);
|
|
auto msg = con->get_ec().message();
|
|
blog(LOG_INFO, "Twitch EventSub connection failed: %s", msg.c_str());
|
|
|
|
if (!_migrating) {
|
|
ClearActiveSubscriptions();
|
|
}
|
|
_connected = false;
|
|
|
|
if (_disconnect) {
|
|
return;
|
|
}
|
|
|
|
WaitAndReconnect();
|
|
}
|
|
|
|
bool Subscription::operator<(const Subscription &other) const
|
|
{
|
|
auto json = obs_data_get_json(data);
|
|
std::string jsonString = json ? json : "";
|
|
auto otherJson = obs_data_get_json(other.data);
|
|
std::string otherJsonString = otherJson ? otherJson : "";
|
|
return jsonString < otherJsonString;
|
|
}
|
|
|
|
std::string Event::ToString() const
|
|
{
|
|
auto json = obs_data_get_json(data);
|
|
return json ? json : "";
|
|
}
|
|
|
|
} // namespace advss
|