11 #include <QJsonObject> 13 #include <pajlada/signals/signal.hpp> 14 #include <websocketpp/client.hpp> 23 #include <unordered_map> 57 template <
typename Subscription>
62 : host_(
std::move(host))
64 this->websocketClient_.set_access_channels(
65 websocketpp::log::alevel::all);
66 this->websocketClient_.clear_access_channels(
67 websocketpp::log::alevel::frame_payload |
68 websocketpp::log::alevel::frame_header);
70 this->websocketClient_.init_asio();
73 this->websocketClient_.set_tls_init_handler([
this](
auto hdl) {
74 return this->onTLSInit(hdl);
77 this->websocketClient_.set_message_handler([
this](
auto hdl,
auto msg) {
80 this->websocketClient_.set_open_handler([
this](
auto hdl) {
81 this->onConnectionOpen(hdl);
83 this->websocketClient_.set_close_handler([
this](
auto hdl) {
84 this->onConnectionClose(hdl);
86 this->websocketClient_.set_fail_handler([
this](
auto hdl) {
87 this->onConnectionFail(hdl);
90 " (" CHATTERINO_GIT_HASH
")");
109 this->work_ = std::make_shared<boost::asio::io_service::work>(
110 this->websocketClient_.get_io_service());
111 this->mainThread_.reset(
new std::thread([
this] {
118 this->stopping_ =
true;
120 for (
const auto &client : this->clients_)
122 client.second->close(
"Shutting down");
127 if (this->mainThread_->joinable())
129 this->mainThread_->join();
132 assert(this->clients_.empty());
137 websocketpp::config::asio_tls_client::message_type::ptr;
139 websocketpp::lib::shared_ptr<boost::asio::ssl::context>;
141 virtual void onMessage(websocketpp::connection_hdl hdl,
147 return std::make_shared<BasicPubSubClient<Subscription>>(client, hdl);
155 websocketpp::connection_hdl hdl)
157 auto clientIt = this->clients_.find(hdl);
159 if (clientIt == this->clients_.end())
164 return clientIt->second;
169 for (
auto &client : this->clients_)
171 if (client.second->unsubscribe(subscription))
180 if (this->trySubscribe(subscription))
186 this->pendingSubscriptions_.emplace_back(subscription);
191 void onConnectionOpen(websocketpp::connection_hdl hdl)
194 this->addingClient_ =
false;
195 this->
diag.connectionsOpened.fetch_add(1, std::memory_order_acq_rel);
197 this->connectBackoff_.
reset();
199 auto client = this->
createClient(this->websocketClient_, hdl);
205 this->clients_.emplace(hdl, client);
207 auto pendingSubsToTake = (std::min)(this->pendingSubscriptions_.size(),
208 client->maxSubscriptions);
210 qCDebug(chatterinoLiveupdates)
211 <<
"LiveUpdate connection opened, subscribing to" 212 << pendingSubsToTake <<
"subscriptions!";
214 while (pendingSubsToTake > 0 && !this->pendingSubscriptions_.empty())
216 const auto last = std::move(this->pendingSubscriptions_.back());
217 this->pendingSubscriptions_.pop_back();
218 if (!client->subscribe(last))
220 qCDebug(chatterinoLiveupdates)
221 <<
"Failed to subscribe to" << last <<
"on new client.";
229 if (!this->pendingSubscriptions_.empty())
235 void onConnectionFail(websocketpp::connection_hdl hdl)
238 this->
diag.connectionsFailed.fetch_add(1, std::memory_order_acq_rel);
240 if (
auto conn = this->websocketClient_.get_con_from_hdl(std::move(hdl)))
242 qCDebug(chatterinoLiveupdates)
243 <<
"LiveUpdates connection attempt failed (error: " 244 << conn->get_ec().message().c_str() <<
")";
248 qCDebug(chatterinoLiveupdates)
249 <<
"LiveUpdates connection attempt failed but we can't get the " 250 "connection from a handle.";
252 this->addingClient_ =
false;
253 if (!this->pendingSubscriptions_.empty())
255 runAfter(this->websocketClient_.get_io_service(),
256 this->connectBackoff_.
next(), [
this](
auto ) {
262 void onConnectionClose(websocketpp::connection_hdl hdl)
264 qCDebug(chatterinoLiveupdates) <<
"Connection closed";
266 this->
diag.connectionsClosed.fetch_add(1, std::memory_order_acq_rel);
268 auto clientIt = this->clients_.find(hdl);
272 assert(clientIt != this->clients_.end());
274 auto client = clientIt->second;
276 this->clients_.erase(clientIt);
280 if (!this->stopping_)
282 for (
const auto &sub : client->subscriptions_)
292 new boost::asio::ssl::context(boost::asio::ssl::context::tlsv12));
296 ctx->set_options(boost::asio::ssl::context::default_workarounds |
297 boost::asio::ssl::context::no_sslv2 |
298 boost::asio::ssl::context::single_dh_use);
300 catch (
const std::exception &e)
302 qCDebug(chatterinoLiveupdates)
303 <<
"Exception caught in OnTLSInit:" << e.what();
311 qCDebug(chatterinoLiveupdates) <<
"Start LiveUpdates manager thread";
312 this->websocketClient_.run();
313 qCDebug(chatterinoLiveupdates)
314 <<
"Done with LiveUpdates manager thread";
319 if (this->addingClient_)
324 qCDebug(chatterinoLiveupdates) <<
"Adding an additional client";
326 this->addingClient_ =
true;
328 websocketpp::lib::error_code ec;
329 auto con = this->websocketClient_.get_connection(
330 this->host_.toStdString(), ec);
334 qCDebug(chatterinoLiveupdates)
335 <<
"Unable to establish connection:" << ec.message().c_str();
339 this->websocketClient_.connect(con);
344 for (
auto &client : this->clients_)
346 if (client.second->subscribe(subscription))
355 std::shared_ptr<BasicPubSubClient<Subscription>>,
356 std::owner_less<liveupdates::WebsocketHandle>>
359 std::vector<Subscription> pendingSubscriptions_;
360 std::atomic<bool> addingClient_{
false};
363 std::shared_ptr<boost::asio::io_service::work> work_{
nullptr};
366 std::unique_ptr<std::thread> mainThread_;
370 bool stopping_{
false};
void subscribe(const Subscription &subscription)
Definition: BasicPubSubManager.hpp:178
std::chrono::milliseconds next()
Definition: ExponentialBackoff.hpp:33
virtual std::shared_ptr< BasicPubSubClient< Subscription > > createClient(liveupdates::WebsocketClient &client, websocketpp::connection_hdl hdl)
Definition: BasicPubSubManager.hpp:144
void reset()
Definition: ExponentialBackoff.hpp:50
BasicPubSubManager(QString host)
Definition: BasicPubSubManager.hpp:61
void start()
Definition: BasicPubSubManager.hpp:107
BasicPubSubManager & operator=(const BasicPubSubManager &)=delete
virtual void onMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg)=0
Definition: SeventvEventAPISubscription.hpp:67
Definition: Application.cpp:48
websocketpp::lib::shared_ptr< boost::asio::ssl::context > WebsocketContextPtr
Definition: BasicPubSubManager.hpp:139
void unsubscribe(const Subscription &subscription)
Definition: BasicPubSubManager.hpp:167
websocketpp::config::asio_tls_client::message_type::ptr WebsocketMessagePtr
Definition: BasicPubSubManager.hpp:137
std::atomic< uint32_t > connectionsOpened
Definition: BasicPubSubManager.hpp:103
void runAfter(boost::asio::io_service &ioService, Duration duration, Callback cb)
Definition: PubSubHelpers.hpp:17
websocketpp::client< chatterino::BasicPubSubConfig > WebsocketClient
Definition: BasicPubSubWebsocket.hpp:31
static void increase(const QString &name)
Definition: DebugCount.hpp:16
std::atomic< uint32_t > connectionsClosed
Definition: BasicPubSubManager.hpp:102
virtual ~BasicPubSubManager()=default
std::atomic< uint32_t > connectionsFailed
Definition: BasicPubSubManager.hpp:104
static void decrease(const QString &name)
Definition: DebugCount.hpp:45
Definition: BasicPubSubManager.hpp:58
std::shared_ptr< BasicPubSubClient< Subscription > > findClient(websocketpp::connection_hdl hdl)
Definition: BasicPubSubManager.hpp:154
websocketpp::connection_hdl WebsocketHandle
Definition: BasicPubSubWebsocket.hpp:32
#define CHATTERINO_VERSION
Definition: Version.hpp:6
void stop()
Definition: BasicPubSubManager.hpp:116
struct chatterino::BasicPubSubManager::@0 diag