Fix subscription handling and improve logging

* Revocations were not handled properly
* Active subscriptions were not cleared on reconnect
* Migration client handling could cause crash
* Add error logging
This commit is contained in:
WarmUpTill 2026-03-14 10:55:54 +01:00 committed by WarmUpTill
parent 2405b6dbbf
commit bf216e0917
3 changed files with 72 additions and 20 deletions

View File

@ -207,6 +207,26 @@ static obs_data_t *copyData(const OBSData &data)
return obs_data_create_from_json(json);
}
void EventSub::LogActiveSubscriptions() const
{
if (!VerboseLoggingEnabled()) {
return;
}
if (_activeSubscriptions.empty()) {
blog(LOG_INFO, "Twitch EventSub active subscriptions: none");
return;
}
blog(LOG_INFO, "Twitch EventSub active subscriptions: %zu",
_activeSubscriptions.size());
for (const auto &sub : _activeSubscriptions) {
const char *type = obs_data_get_string(sub.data, "type");
blog(LOG_INFO, " id=%s type=%s", sub.id.c_str(),
type ? type : "");
}
}
std::string EventSub::AddEventSubscription(std::shared_ptr<TwitchToken> token,
Subscription subscription)
{
@ -218,7 +238,12 @@ std::string EventSub::AddEventSubscription(std::shared_ptr<TwitchToken> token,
std::unique_lock<std::mutex> lock(eventSub->_subscriptionMtx);
if (!eventSub->_connected) {
vblog(LOG_INFO, "Twitch EventSub connect started for %s",
if (eventSub->_reconnecting) {
return "";
}
vblog(LOG_INFO,
"new Twitch EventSub connect attempt started for %s",
token->GetName().c_str());
lock.unlock();
eventSub->Connect();
@ -226,9 +251,17 @@ std::string EventSub::AddEventSubscription(std::shared_ptr<TwitchToken> token,
}
if (isAlreadySubscribed(eventSub->_activeSubscriptions, subscription)) {
eventSub->LogActiveSubscriptions();
return eventSub->_activeSubscriptions.find(subscription)->id;
}
if (!eventSub->IsValidID(eventSub->_sessionID)) {
vblog(LOG_INFO,
"session ID of Twitch event sub invalid - skip %s",
__func__);
return "";
}
OBSDataAutoRelease postData = copyData(subscription.data);
setTransportData(postData.Get(), eventSub->_sessionID);
auto result = SendPostRequest(*token, registerSubscriptionURL.data(),
@ -236,8 +269,14 @@ std::string EventSub::AddEventSubscription(std::shared_ptr<TwitchToken> token,
postData.Get());
if (result.status != 202) {
vblog(LOG_INFO, "failed to register Twitch EventSub (%d)",
result.status);
const char *error = obs_data_get_string(result.data, "error");
const char *message =
obs_data_get_string(result.data, "message");
vblog(LOG_INFO,
"failed to register Twitch EventSub (%d): %s - %s",
result.status, error ? error : "no error",
message ? message : "no message");
eventSub->LogActiveSubscriptions();
return "";
}
@ -246,6 +285,7 @@ std::string EventSub::AddEventSubscription(std::shared_ptr<TwitchToken> token,
OBSDataAutoRelease replyData = obs_data_array_item(replyArray, 0);
subscription.id = obs_data_get_string(replyData, "id");
eventSub->_activeSubscriptions.emplace(subscription);
eventSub->LogActiveSubscriptions();
return subscription.id;
}
@ -402,7 +442,9 @@ void EventSub::HandleWelcome(obs_data_t *data)
{
OBSDataAutoRelease session = obs_data_get_obj(data, "session");
_sessionID = obs_data_get_string(session, "id");
ClearActiveSubscriptions();
blog(LOG_INFO, "Twitch EventSub connected");
vblog(LOG_INFO, "Twitch EventSub session id: %s", _sessionID.c_str());
}
void EventSub::HandleKeepAlive() const
@ -430,11 +472,14 @@ void EventSub::OnServerMigrationWelcome(
// Disable reconnect handling for old connection which will be closed
_client->set_close_handler([](connection_hdl) {});
_client->set_fail_handler([](connection_hdl) {});
auto connection = _client->get_con_from_hdl(_connection);
connection->set_close_handler([](connection_hdl) {
vblog(LOG_INFO, "previous Twitch EventSub connection closed");
});
connection->set_fail_handler([](connection_hdl) {});
if (!_connection.expired()) {
auto connection = _client->get_con_from_hdl(_connection);
connection->set_close_handler([](connection_hdl) {
vblog(LOG_INFO,
"previous Twitch EventSub connection closed");
});
connection->set_fail_handler([](connection_hdl) {});
}
websocketpp::lib::error_code ec;
_client->close(_connection, websocketpp::close::status::normal,
@ -442,6 +487,7 @@ void EventSub::OnServerMigrationWelcome(
_client.swap(newClient);
_sessionID = _migrationSessionID;
vblog(LOG_INFO, "Twitch EventSub session id: %s", _sessionID.c_str());
_connection = newHdl;
_client->set_open_handler(bind(&EventSub::OnOpen, this, _1));
@ -461,17 +507,16 @@ void EventSub::OnServerMigrationWelcome(
void EventSub::StartServerMigrationClient(const std::string &url)
{
auto client = std::make_unique<EventSubWSClient>();
SetupClient(*client);
_migrationClient = std::make_unique<EventSubWSClient>();
SetupClient(*_migrationClient);
client->set_open_handler([this](connection_hdl) {
_migrationClient->set_open_handler([this](connection_hdl) {
vblog(LOG_INFO, "Twitch EventSub migration client opened");
});
client->set_message_handler([this,
&client](connection_hdl hdl,
EventSubWSClient::message_ptr
message) {
_migrationClient->set_message_handler([this](connection_hdl hdl,
EventSubWSClient::message_ptr
message) {
const auto msg = ParseWebSocketMessage(message);
if (!msg) {
return;
@ -487,14 +532,14 @@ void EventSub::StartServerMigrationClient(const std::string &url)
obs_data_get_obj(data, "session");
_migrationSessionID =
obs_data_get_string(session, "id");
OnServerMigrationWelcome(hdl, client);
OnServerMigrationWelcome(hdl, _migrationClient);
} else {
OnMessage(hdl, message);
}
});
websocketpp::lib::error_code ec;
auto con = client->get_connection(url, ec);
auto con = _migrationClient->get_connection(url, ec);
if (ec) {
blog(LOG_ERROR,
"Twitch EventSub migration connection failed: %s",
@ -504,8 +549,8 @@ void EventSub::StartServerMigrationClient(const std::string &url)
}
_migrationConnection = con;
client->connect(con);
client->run();
_migrationClient->connect(con);
_migrationClient->run();
}
void EventSub::HandleServerMigration(obs_data_t *data)
@ -563,7 +608,7 @@ void EventSub::HandleRevocation(obs_data_t *data)
std::lock_guard<std::mutex> lock(_subscriptionMtx);
for (auto it = _activeSubscriptions.begin();
it != _activeSubscriptions.begin();) {
it != _activeSubscriptions.end();) {
if (it->id == id) {
it = _activeSubscriptions.erase(it);
} else {

View File

@ -81,6 +81,8 @@ private:
void HandleServerMigration(obs_data_t *);
void HandleRevocation(obs_data_t *);
void LogActiveSubscriptions() const;
void RegisterInstance();
void UnregisterInstance();

View File

@ -185,6 +185,11 @@ static RequestResult processResult(const httplib::Result &response,
return result;
}
if (response->status < 200 || response->status >= 300) {
vblog(LOG_INFO, "Twitch API error response: %s",
response->body.c_str());
}
OBSDataAutoRelease replyData =
obs_data_create_from_json(response->body.c_str());
result.data = replyData;