From 6932de866ddac0c5d92dc2ae117f5d8d09ad2054 Mon Sep 17 00:00:00 2001 From: WarmUpTill <19472752+WarmUpTill@users.noreply.github.com> Date: Tue, 21 Oct 2025 21:47:32 +0200 Subject: [PATCH] Refactor Twitch event server migration and reconnect handling This should avoid any events being lost due to server migration. --- plugins/twitch/event-sub.cpp | 285 +++++++++++++++++++++++++++-------- plugins/twitch/event-sub.hpp | 33 +++- 2 files changed, 246 insertions(+), 72 deletions(-) diff --git a/plugins/twitch/event-sub.cpp b/plugins/twitch/event-sub.cpp index 5233abe4..a5d41c13 100644 --- a/plugins/twitch/event-sub.cpp +++ b/plugins/twitch/event-sub.cpp @@ -32,28 +32,17 @@ static const int reconnectDelay = 15; #undef DispatchMessage -EventSub::EventSub() : QObject(nullptr) +EventSub::EventSub() + : QObject(nullptr), + _client(std::make_unique()) { - _client.get_alog().clear_channels( - websocketpp::log::alevel::frame_header | - websocketpp::log::alevel::frame_payload | - websocketpp::log::alevel::control); - _client.init_asio(); -#ifndef _WIN32 - _client.set_reuse_addr(true); -#endif + SetupClient(*_client); - _client.set_open_handler(bind(&EventSub::OnOpen, this, _1)); - _client.set_message_handler(bind(&EventSub::OnMessage, this, _1, _2)); - _client.set_close_handler(bind(&EventSub::OnClose, this, _1)); - _client.set_fail_handler(bind(&EventSub::OnFail, this, _1)); + _client->set_open_handler(bind(&EventSub::OnOpen, this, _1)); + _client->set_message_handler(bind(&EventSub::OnMessage, this, _1, _2)); + _client->set_close_handler(bind(&EventSub::OnClose, this, _1)); + _client->set_fail_handler(bind(&EventSub::OnFail, this, _1)); -#ifndef USE_TWITCH_CLI_MOCK - _client.set_tls_init_handler([](websocketpp::connection_hdl) { - return websocketpp::lib::make_shared( - asio::ssl::context::sslv23_client); - }); -#endif _url = defaultURL.data(); RegisterInstance(); } @@ -82,28 +71,34 @@ void EventSub::UnregisterInstance() void EventSub::ConnectThread() { - while (!_disconnect) { - std::unique_lock lock(_waitMtx); - _client.reset(); - _connected = true; - websocketpp::lib::error_code ec; - EventSubWSClient::connection_ptr con = - _client.get_connection(_url, ec); - if (ec) { - blog(LOG_INFO, "Twitch EventSub failed: %s", - ec.message().c_str()); - } else { - _client.connect(con); - _connection = connection_hdl(con); - _client.run(); - } + _client->reset(); + _connected = true; + websocketpp::lib::error_code ec; + EventSubWSClient::connection_ptr con = + _client->get_connection(_url, ec); + if (ec) { + blog(LOG_INFO, "Twitch EventSub failed: %s", + ec.message().c_str()); + } else { + _client->connect(con); + _connection = connection_hdl(con); + _client->run(); + } + _connected = false; +} + +void EventSub::WaitAndReconnect() +{ + auto thread = std::thread([this]() { + std::unique_lock lock(_waitMtx); blog(LOG_INFO, "Twitch EventSub trying to reconnect to in %d seconds.", reconnectDelay); _cv.wait_for(lock, std::chrono::seconds(reconnectDelay)); - } - _connected = false; + Connect(); + }); + thread.detach(); } void EventSub::Connect() @@ -137,8 +132,8 @@ void EventSub::Disconnect() std::lock_guard lock(_connectMtx); _disconnect = true; websocketpp::lib::error_code ec; - _client.close(_connection, websocketpp::close::status::normal, - "Twitch EventSub stopping", ec); + _client->close(_connection, websocketpp::close::status::normal, + "Twitch EventSub stopping", ec); { std::unique_lock waitLock(_waitMtx); _cv.notify_all(); @@ -146,8 +141,8 @@ void EventSub::Disconnect() while (_connected) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - _client.close(_connection, websocketpp::close::status::normal, - "Twitch EventSub stopping", ec); + _client->close(_connection, websocketpp::close::status::normal, + "Twitch EventSub stopping", ec); } if (_thread.joinable()) { @@ -238,6 +233,24 @@ std::string EventSub::AddEventSubscription(std::shared_ptr token, return subscription.id; } +void EventSub::SetupClient(EventSubWSClient &client) +{ + client.get_alog().clear_channels( + websocketpp::log::alevel::frame_header | + websocketpp::log::alevel::frame_payload | + websocketpp::log::alevel::control); + client.init_asio(); +#ifndef _WIN32 + client.set_reuse_addr(true); +#endif +#ifndef USE_TWITCH_CLI_MOCK + client.set_tls_init_handler([](websocketpp::connection_hdl) { + return websocketpp::lib::make_shared( + asio::ssl::context::sslv23_client); + }); +#endif +} + void EventSub::OnOpen(connection_hdl) { vblog(LOG_INFO, "Twitch EventSub connection opened"); @@ -303,13 +316,14 @@ bool EventSub::IsValidID(const std::string &id) return !_sessionID.empty() && id == _sessionID; } -void EventSub::OnMessage(connection_hdl, EventSubWSClient::message_ptr message) +std::optional +EventSub::ParseWebSocketMessage(const EventSubWSClient::message_ptr &message) { if (!message) { - return; + return {}; } if (message->get_opcode() != websocketpp::frame::opcode::text) { - return; + return {}; } std::string payload = message->get_payload(); @@ -317,7 +331,7 @@ void EventSub::OnMessage(connection_hdl, EventSubWSClient::message_ptr message) if (!json) { blog(LOG_ERROR, "invalid JSON payload received for '%s'", payload.c_str()); - return; + return {}; } OBSDataAutoRelease metadata = obs_data_get_obj(json, "metadata"); @@ -325,31 +339,46 @@ void EventSub::OnMessage(connection_hdl, EventSubWSClient::message_ptr message) obs_data_get_string(metadata, "message_timestamp"); if (_validateTimestamps && !isValidTimestamp(timestamp)) { blog(LOG_WARNING, - "Discarding Twitch EventSub with invalid timestamp %s", + "discarding Twitch EventSub with invalid timestamp %s", timestamp.c_str()); - return; + return {}; } std::string id = obs_data_get_string(metadata, "message_id"); if (!IsValidMessageID(id)) { blog(LOG_WARNING, - "Discarding Twitch EventSub with invalid message_id"); + "discarding Twitch EventSub with invalid message_id"); + return {}; + } + + ParsedMessage parsedMessage{obs_data_get_string(metadata, + "message_type"), + obs_data_get_obj(json, "payload")}; + return parsedMessage; +} + +void EventSub::OnMessage(connection_hdl, EventSubWSClient::message_ptr message) +{ + const auto msg = ParseWebSocketMessage(message); + if (!msg) { return; } - std::string messageType = obs_data_get_string(metadata, "message_type"); - OBSDataAutoRelease payloadJson = obs_data_get_obj(json, "payload"); - if (messageType == "session_welcome") { - HandleWelcome(payloadJson); - } else if (messageType == "session_keepalive") { + + const auto &type = msg->type; + const auto &data = msg->payload; + if (type == "session_welcome") { + HandleWelcome(data); + } else if (type == "session_keepalive") { HandleKeepAlive(); - } else if (messageType == "notification") { - HandleNotification(payloadJson); - } else if (messageType == "session_reconnect") { - HandleReconnect(payloadJson); - } else if (messageType == "revocation") { - HandleRevocation(payloadJson); + } else if (type == "notification") { + HandleNotification(data); + } else if (type == "session_reconnect") { + HandleServerMigration(data); + } else if (type == "revocation") { + HandleRevocation(data); } else { - vblog(LOG_INFO, "ignoring message of unknown type '%s'", - messageType.c_str()); + vblog(LOG_INFO, + "Twitch EventSub ignoring message of unknown type '%s'", + type.c_str()); } } @@ -377,9 +406,96 @@ void EventSub::HandleNotification(obs_data_t *data) _dispatcher.DispatchMessage(event); } -void EventSub::HandleReconnect(obs_data_t *data) +void EventSub::OnServerMigrationWelcome( + connection_hdl newHdl, std::unique_ptr &newClient) +{ + std::lock_guard lock(_connectMtx); + + // Disable reconnect handling for old connection which will be closed + _client->set_close_handler([](connection_hdl) {}); + _client->set_fail_handler([](connection_hdl) {}); + auto connection = _client->get_con_from_hdl(_connection); + connection->set_close_handler([](connection_hdl) { + vblog(LOG_INFO, "previous Twitch EventSub connection closed"); + }); + connection->set_fail_handler([](connection_hdl) {}); + + websocketpp::lib::error_code ec; + _client->close(_connection, websocketpp::close::status::normal, + "Switching to new connection", ec); + + _client.swap(newClient); + _sessionID = _migrationSessionID; + _connection = newHdl; + + _client->set_open_handler(bind(&EventSub::OnOpen, this, _1)); + _client->set_message_handler(bind(&EventSub::OnMessage, this, _1, _2)); + _client->set_close_handler(bind(&EventSub::OnClose, this, _1)); + _client->set_fail_handler(bind(&EventSub::OnFail, this, _1)); + auto newConnection = _client->get_con_from_hdl(_connection); + newConnection->set_open_handler(bind(&EventSub::OnOpen, this, _1)); + newConnection->set_message_handler( + bind(&EventSub::OnMessage, this, _1, _2)); + newConnection->set_close_handler(bind(&EventSub::OnClose, this, _1)); + newConnection->set_fail_handler(bind(&EventSub::OnFail, this, _1)); + + _connected = true; + _migrating = false; +} + +void EventSub::StartServerMigrationClient(const std::string &url) +{ + auto client = std::make_unique(); + SetupClient(*client); + + client->set_open_handler([this](connection_hdl hdl) { + vblog(LOG_INFO, "Twitch EventSub migration client opened"); + }); + + client->set_message_handler([this, + &client](connection_hdl hdl, + EventSubWSClient::message_ptr + message) { + const auto msg = ParseWebSocketMessage(message); + if (!msg) { + return; + } + + const auto &type = msg->type; + const auto &data = msg->payload; + + if (type == "session_welcome") { + vblog(LOG_INFO, + "Twitch EventSub migration successful - switching to new connection"); + OBSDataAutoRelease session = + obs_data_get_obj(data, "session"); + _migrationSessionID = + obs_data_get_string(session, "id"); + OnServerMigrationWelcome(hdl, client); + } else { + OnMessage(hdl, message); + } + }); + + websocketpp::lib::error_code ec; + auto con = client->get_connection(url, ec); + if (ec) { + blog(LOG_ERROR, + "Twitch EventSub migration connection failed: %s", + ec.message().c_str()); + _migrating = false; + return; + } + + _migrationConnection = con; + client->connect(con); + client->run(); +} + +void EventSub::HandleServerMigration(obs_data_t *data) { blog(LOG_INFO, "Twitch EventSub session_reconnect received"); + OBSDataAutoRelease session = obs_data_get_obj(data, "session"); auto id = obs_data_get_string(session, "id"); if (!IsValidID(id)) { @@ -388,8 +504,25 @@ void EventSub::HandleReconnect(obs_data_t *data) return; } - // TODO: - // Implement proper reconnect handing to avoid dropped events + const std::string newURL = + obs_data_get_string(session, "reconnect_url"); + if (newURL.empty()) { + blog(LOG_WARNING, "missing reconnect_url in session_reconnect"); + return; + } + + if (_migrating.exchange(true)) { + vblog(LOG_INFO, + "ignoring Twitch EventSub session_reconnect - already in progress"); + return; + } + + vblog(LOG_INFO, "Twitch EventSub reconnect: connecting to %s", + newURL.c_str()); + + std::thread([this, newURL]() { + StartServerMigrationClient(newURL); + }).detach(); } void EventSub::HandleRevocation(obs_data_t *data) @@ -425,23 +558,41 @@ void EventSub::HandleRevocation(obs_data_t *data) void EventSub::OnClose(connection_hdl hdl) { - EventSubWSClient::connection_ptr con = _client.get_con_from_hdl(hdl); + EventSubWSClient::connection_ptr con = _client->get_con_from_hdl(hdl); const auto msg = con->get_ec().message(); const auto reason = con->get_remote_close_reason(); const auto code = con->get_remote_close_code(); blog(LOG_INFO, "Twitch EventSub connection closed: %s / %s (%d)", msg.c_str(), reason.c_str(), code); - ClearActiveSubscriptions(); + + if (_migrating) { + ClearActiveSubscriptions(); + } _connected = false; + + if (_disconnect) { + return; + } + + WaitAndReconnect(); } void EventSub::OnFail(connection_hdl hdl) { - EventSubWSClient::connection_ptr con = _client.get_con_from_hdl(hdl); + EventSubWSClient::connection_ptr con = _client->get_con_from_hdl(hdl); auto msg = con->get_ec().message(); blog(LOG_INFO, "Twitch EventSub connection failed: %s", msg.c_str()); - ClearActiveSubscriptions(); + + if (!_migrating) { + ClearActiveSubscriptions(); + } _connected = false; + + if (_disconnect) { + return; + } + + WaitAndReconnect(); } bool Subscription::operator<(const Subscription &other) const diff --git a/plugins/twitch/event-sub.hpp b/plugins/twitch/event-sub.hpp index 710a9663..33d802c8 100644 --- a/plugins/twitch/event-sub.hpp +++ b/plugins/twitch/event-sub.hpp @@ -19,10 +19,10 @@ namespace advss { #ifdef USE_TWITCH_CLI_MOCK -typedef websocketpp::client EventSubWSClient; +using EventSubWSClient = websocketpp::client; #else -typedef websocketpp::client - EventSubWSClient; +using EventSubWSClient = + websocketpp::client; #endif struct Event; @@ -61,12 +61,16 @@ public: void EnableTimestampValidation(bool enable); private: + static void SetupClient(EventSubWSClient &); + void OnOpen(connection_hdl hdl); void OnMessage(connection_hdl hdl, EventSubWSClient::message_ptr message); void OnClose(connection_hdl hdl); void OnFail(connection_hdl hdl); + void ConnectThread(); + void WaitAndReconnect(); bool IsValidMessageID(const std::string &); bool IsValidID(const std::string &); @@ -74,20 +78,39 @@ private: void HandleWelcome(obs_data_t *); void HandleKeepAlive() const; void HandleNotification(obs_data_t *); - void HandleReconnect(obs_data_t *); + void HandleServerMigration(obs_data_t *); void HandleRevocation(obs_data_t *); void RegisterInstance(); void UnregisterInstance(); - EventSubWSClient _client; + void StartServerMigrationClient(const std::string &url); + void OnServerMigrationWelcome(connection_hdl, + std::unique_ptr &); + + struct ParsedMessage { + std::string type; + OBSDataAutoRelease payload; + }; + + std::optional + ParseWebSocketMessage(const EventSubWSClient::message_ptr &); + + std::unique_ptr _client; connection_hdl _connection; + + std::unique_ptr _migrationClient; + connection_hdl _migrationConnection; + std::atomic_bool _migrating{false}; + std::string _migrationSessionID; + std::thread _thread; std::mutex _waitMtx; std::mutex _connectMtx; std::condition_variable _cv; std::atomic_bool _connected{false}; std::atomic_bool _disconnect{false}; + std::string _url; std::string _sessionID; bool _validateTimestamps = true;