Refactor Twitch event server migration and reconnect handling
Some checks failed
debian-build / build (push) Waiting to run
Push to master / Check Formatting 🔍 (push) Waiting to run
Push to master / Build Project 🧱 (push) Waiting to run
Push to master / Create Release 🛫 (push) Blocked by required conditions
Check locale / ubuntu64 (push) Has been cancelled

This should avoid any events being lost due to server migration.
This commit is contained in:
WarmUpTill 2025-10-21 21:47:32 +02:00 committed by WarmUpTill
parent 00db0cf7c4
commit 6932de866d
2 changed files with 246 additions and 72 deletions

View File

@ -32,28 +32,17 @@ static const int reconnectDelay = 15;
#undef DispatchMessage
EventSub::EventSub() : QObject(nullptr)
EventSub::EventSub()
: QObject(nullptr),
_client(std::make_unique<EventSubWSClient>())
{
_client.get_alog().clear_channels(
websocketpp::log::alevel::frame_header |
websocketpp::log::alevel::frame_payload |
websocketpp::log::alevel::control);
_client.init_asio();
#ifndef _WIN32
_client.set_reuse_addr(true);
#endif
SetupClient(*_client);
_client.set_open_handler(bind(&EventSub::OnOpen, this, _1));
_client.set_message_handler(bind(&EventSub::OnMessage, this, _1, _2));
_client.set_close_handler(bind(&EventSub::OnClose, this, _1));
_client.set_fail_handler(bind(&EventSub::OnFail, this, _1));
_client->set_open_handler(bind(&EventSub::OnOpen, this, _1));
_client->set_message_handler(bind(&EventSub::OnMessage, this, _1, _2));
_client->set_close_handler(bind(&EventSub::OnClose, this, _1));
_client->set_fail_handler(bind(&EventSub::OnFail, this, _1));
#ifndef USE_TWITCH_CLI_MOCK
_client.set_tls_init_handler([](websocketpp::connection_hdl) {
return websocketpp::lib::make_shared<asio::ssl::context>(
asio::ssl::context::sslv23_client);
});
#endif
_url = defaultURL.data();
RegisterInstance();
}
@ -82,28 +71,34 @@ void EventSub::UnregisterInstance()
void EventSub::ConnectThread()
{
while (!_disconnect) {
std::unique_lock<std::mutex> lock(_waitMtx);
_client.reset();
_connected = true;
websocketpp::lib::error_code ec;
EventSubWSClient::connection_ptr con =
_client.get_connection(_url, ec);
if (ec) {
blog(LOG_INFO, "Twitch EventSub failed: %s",
ec.message().c_str());
} else {
_client.connect(con);
_connection = connection_hdl(con);
_client.run();
}
_client->reset();
_connected = true;
websocketpp::lib::error_code ec;
EventSubWSClient::connection_ptr con =
_client->get_connection(_url, ec);
if (ec) {
blog(LOG_INFO, "Twitch EventSub failed: %s",
ec.message().c_str());
} else {
_client->connect(con);
_connection = connection_hdl(con);
_client->run();
}
_connected = false;
}
void EventSub::WaitAndReconnect()
{
auto thread = std::thread([this]() {
std::unique_lock<std::mutex> lock(_waitMtx);
blog(LOG_INFO,
"Twitch EventSub trying to reconnect to in %d seconds.",
reconnectDelay);
_cv.wait_for(lock, std::chrono::seconds(reconnectDelay));
}
_connected = false;
Connect();
});
thread.detach();
}
void EventSub::Connect()
@ -137,8 +132,8 @@ void EventSub::Disconnect()
std::lock_guard<std::mutex> lock(_connectMtx);
_disconnect = true;
websocketpp::lib::error_code ec;
_client.close(_connection, websocketpp::close::status::normal,
"Twitch EventSub stopping", ec);
_client->close(_connection, websocketpp::close::status::normal,
"Twitch EventSub stopping", ec);
{
std::unique_lock<std::mutex> waitLock(_waitMtx);
_cv.notify_all();
@ -146,8 +141,8 @@ void EventSub::Disconnect()
while (_connected) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
_client.close(_connection, websocketpp::close::status::normal,
"Twitch EventSub stopping", ec);
_client->close(_connection, websocketpp::close::status::normal,
"Twitch EventSub stopping", ec);
}
if (_thread.joinable()) {
@ -238,6 +233,24 @@ std::string EventSub::AddEventSubscription(std::shared_ptr<TwitchToken> token,
return subscription.id;
}
void EventSub::SetupClient(EventSubWSClient &client)
{
client.get_alog().clear_channels(
websocketpp::log::alevel::frame_header |
websocketpp::log::alevel::frame_payload |
websocketpp::log::alevel::control);
client.init_asio();
#ifndef _WIN32
client.set_reuse_addr(true);
#endif
#ifndef USE_TWITCH_CLI_MOCK
client.set_tls_init_handler([](websocketpp::connection_hdl) {
return websocketpp::lib::make_shared<asio::ssl::context>(
asio::ssl::context::sslv23_client);
});
#endif
}
void EventSub::OnOpen(connection_hdl)
{
vblog(LOG_INFO, "Twitch EventSub connection opened");
@ -303,13 +316,14 @@ bool EventSub::IsValidID(const std::string &id)
return !_sessionID.empty() && id == _sessionID;
}
void EventSub::OnMessage(connection_hdl, EventSubWSClient::message_ptr message)
std::optional<const EventSub::ParsedMessage>
EventSub::ParseWebSocketMessage(const EventSubWSClient::message_ptr &message)
{
if (!message) {
return;
return {};
}
if (message->get_opcode() != websocketpp::frame::opcode::text) {
return;
return {};
}
std::string payload = message->get_payload();
@ -317,7 +331,7 @@ void EventSub::OnMessage(connection_hdl, EventSubWSClient::message_ptr message)
if (!json) {
blog(LOG_ERROR, "invalid JSON payload received for '%s'",
payload.c_str());
return;
return {};
}
OBSDataAutoRelease metadata = obs_data_get_obj(json, "metadata");
@ -325,31 +339,46 @@ void EventSub::OnMessage(connection_hdl, EventSubWSClient::message_ptr message)
obs_data_get_string(metadata, "message_timestamp");
if (_validateTimestamps && !isValidTimestamp(timestamp)) {
blog(LOG_WARNING,
"Discarding Twitch EventSub with invalid timestamp %s",
"discarding Twitch EventSub with invalid timestamp %s",
timestamp.c_str());
return;
return {};
}
std::string id = obs_data_get_string(metadata, "message_id");
if (!IsValidMessageID(id)) {
blog(LOG_WARNING,
"Discarding Twitch EventSub with invalid message_id");
"discarding Twitch EventSub with invalid message_id");
return {};
}
ParsedMessage parsedMessage{obs_data_get_string(metadata,
"message_type"),
obs_data_get_obj(json, "payload")};
return parsedMessage;
}
void EventSub::OnMessage(connection_hdl, EventSubWSClient::message_ptr message)
{
const auto msg = ParseWebSocketMessage(message);
if (!msg) {
return;
}
std::string messageType = obs_data_get_string(metadata, "message_type");
OBSDataAutoRelease payloadJson = obs_data_get_obj(json, "payload");
if (messageType == "session_welcome") {
HandleWelcome(payloadJson);
} else if (messageType == "session_keepalive") {
const auto &type = msg->type;
const auto &data = msg->payload;
if (type == "session_welcome") {
HandleWelcome(data);
} else if (type == "session_keepalive") {
HandleKeepAlive();
} else if (messageType == "notification") {
HandleNotification(payloadJson);
} else if (messageType == "session_reconnect") {
HandleReconnect(payloadJson);
} else if (messageType == "revocation") {
HandleRevocation(payloadJson);
} else if (type == "notification") {
HandleNotification(data);
} else if (type == "session_reconnect") {
HandleServerMigration(data);
} else if (type == "revocation") {
HandleRevocation(data);
} else {
vblog(LOG_INFO, "ignoring message of unknown type '%s'",
messageType.c_str());
vblog(LOG_INFO,
"Twitch EventSub ignoring message of unknown type '%s'",
type.c_str());
}
}
@ -377,9 +406,96 @@ void EventSub::HandleNotification(obs_data_t *data)
_dispatcher.DispatchMessage(event);
}
void EventSub::HandleReconnect(obs_data_t *data)
void EventSub::OnServerMigrationWelcome(
connection_hdl newHdl, std::unique_ptr<EventSubWSClient> &newClient)
{
std::lock_guard<std::mutex> lock(_connectMtx);
// Disable reconnect handling for old connection which will be closed
_client->set_close_handler([](connection_hdl) {});
_client->set_fail_handler([](connection_hdl) {});
auto connection = _client->get_con_from_hdl(_connection);
connection->set_close_handler([](connection_hdl) {
vblog(LOG_INFO, "previous Twitch EventSub connection closed");
});
connection->set_fail_handler([](connection_hdl) {});
websocketpp::lib::error_code ec;
_client->close(_connection, websocketpp::close::status::normal,
"Switching to new connection", ec);
_client.swap(newClient);
_sessionID = _migrationSessionID;
_connection = newHdl;
_client->set_open_handler(bind(&EventSub::OnOpen, this, _1));
_client->set_message_handler(bind(&EventSub::OnMessage, this, _1, _2));
_client->set_close_handler(bind(&EventSub::OnClose, this, _1));
_client->set_fail_handler(bind(&EventSub::OnFail, this, _1));
auto newConnection = _client->get_con_from_hdl(_connection);
newConnection->set_open_handler(bind(&EventSub::OnOpen, this, _1));
newConnection->set_message_handler(
bind(&EventSub::OnMessage, this, _1, _2));
newConnection->set_close_handler(bind(&EventSub::OnClose, this, _1));
newConnection->set_fail_handler(bind(&EventSub::OnFail, this, _1));
_connected = true;
_migrating = false;
}
void EventSub::StartServerMigrationClient(const std::string &url)
{
auto client = std::make_unique<EventSubWSClient>();
SetupClient(*client);
client->set_open_handler([this](connection_hdl hdl) {
vblog(LOG_INFO, "Twitch EventSub migration client opened");
});
client->set_message_handler([this,
&client](connection_hdl hdl,
EventSubWSClient::message_ptr
message) {
const auto msg = ParseWebSocketMessage(message);
if (!msg) {
return;
}
const auto &type = msg->type;
const auto &data = msg->payload;
if (type == "session_welcome") {
vblog(LOG_INFO,
"Twitch EventSub migration successful - switching to new connection");
OBSDataAutoRelease session =
obs_data_get_obj(data, "session");
_migrationSessionID =
obs_data_get_string(session, "id");
OnServerMigrationWelcome(hdl, client);
} else {
OnMessage(hdl, message);
}
});
websocketpp::lib::error_code ec;
auto con = client->get_connection(url, ec);
if (ec) {
blog(LOG_ERROR,
"Twitch EventSub migration connection failed: %s",
ec.message().c_str());
_migrating = false;
return;
}
_migrationConnection = con;
client->connect(con);
client->run();
}
void EventSub::HandleServerMigration(obs_data_t *data)
{
blog(LOG_INFO, "Twitch EventSub session_reconnect received");
OBSDataAutoRelease session = obs_data_get_obj(data, "session");
auto id = obs_data_get_string(session, "id");
if (!IsValidID(id)) {
@ -388,8 +504,25 @@ void EventSub::HandleReconnect(obs_data_t *data)
return;
}
// TODO:
// Implement proper reconnect handing to avoid dropped events
const std::string newURL =
obs_data_get_string(session, "reconnect_url");
if (newURL.empty()) {
blog(LOG_WARNING, "missing reconnect_url in session_reconnect");
return;
}
if (_migrating.exchange(true)) {
vblog(LOG_INFO,
"ignoring Twitch EventSub session_reconnect - already in progress");
return;
}
vblog(LOG_INFO, "Twitch EventSub reconnect: connecting to %s",
newURL.c_str());
std::thread([this, newURL]() {
StartServerMigrationClient(newURL);
}).detach();
}
void EventSub::HandleRevocation(obs_data_t *data)
@ -425,23 +558,41 @@ void EventSub::HandleRevocation(obs_data_t *data)
void EventSub::OnClose(connection_hdl hdl)
{
EventSubWSClient::connection_ptr con = _client.get_con_from_hdl(hdl);
EventSubWSClient::connection_ptr con = _client->get_con_from_hdl(hdl);
const auto msg = con->get_ec().message();
const auto reason = con->get_remote_close_reason();
const auto code = con->get_remote_close_code();
blog(LOG_INFO, "Twitch EventSub connection closed: %s / %s (%d)",
msg.c_str(), reason.c_str(), code);
ClearActiveSubscriptions();
if (_migrating) {
ClearActiveSubscriptions();
}
_connected = false;
if (_disconnect) {
return;
}
WaitAndReconnect();
}
void EventSub::OnFail(connection_hdl hdl)
{
EventSubWSClient::connection_ptr con = _client.get_con_from_hdl(hdl);
EventSubWSClient::connection_ptr con = _client->get_con_from_hdl(hdl);
auto msg = con->get_ec().message();
blog(LOG_INFO, "Twitch EventSub connection failed: %s", msg.c_str());
ClearActiveSubscriptions();
if (!_migrating) {
ClearActiveSubscriptions();
}
_connected = false;
if (_disconnect) {
return;
}
WaitAndReconnect();
}
bool Subscription::operator<(const Subscription &other) const

View File

@ -19,10 +19,10 @@
namespace advss {
#ifdef USE_TWITCH_CLI_MOCK
typedef websocketpp::client<websocketpp::config::asio_client> EventSubWSClient;
using EventSubWSClient = websocketpp::client<websocketpp::config::asio_client>;
#else
typedef websocketpp::client<websocketpp::config::asio_tls_client>
EventSubWSClient;
using EventSubWSClient =
websocketpp::client<websocketpp::config::asio_tls_client>;
#endif
struct Event;
@ -61,12 +61,16 @@ public:
void EnableTimestampValidation(bool enable);
private:
static void SetupClient(EventSubWSClient &);
void OnOpen(connection_hdl hdl);
void OnMessage(connection_hdl hdl,
EventSubWSClient::message_ptr message);
void OnClose(connection_hdl hdl);
void OnFail(connection_hdl hdl);
void ConnectThread();
void WaitAndReconnect();
bool IsValidMessageID(const std::string &);
bool IsValidID(const std::string &);
@ -74,20 +78,39 @@ private:
void HandleWelcome(obs_data_t *);
void HandleKeepAlive() const;
void HandleNotification(obs_data_t *);
void HandleReconnect(obs_data_t *);
void HandleServerMigration(obs_data_t *);
void HandleRevocation(obs_data_t *);
void RegisterInstance();
void UnregisterInstance();
EventSubWSClient _client;
void StartServerMigrationClient(const std::string &url);
void OnServerMigrationWelcome(connection_hdl,
std::unique_ptr<EventSubWSClient> &);
struct ParsedMessage {
std::string type;
OBSDataAutoRelease payload;
};
std::optional<const ParsedMessage>
ParseWebSocketMessage(const EventSubWSClient::message_ptr &);
std::unique_ptr<EventSubWSClient> _client;
connection_hdl _connection;
std::unique_ptr<EventSubWSClient> _migrationClient;
connection_hdl _migrationConnection;
std::atomic_bool _migrating{false};
std::string _migrationSessionID;
std::thread _thread;
std::mutex _waitMtx;
std::mutex _connectMtx;
std::condition_variable _cv;
std::atomic_bool _connected{false};
std::atomic_bool _disconnect{false};
std::string _url;
std::string _sessionID;
bool _validateTimestamps = true;