Chatterino
BasicPubSubManager.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include "common/QLogging.hpp"
4 #include "common/Version.hpp"
8 #include "util/DebugCount.hpp"
10 
11 #include <QJsonObject>
12 #include <QString>
13 #include <pajlada/signals/signal.hpp>
14 #include <websocketpp/client.hpp>
15 
16 #include <algorithm>
17 #include <atomic>
18 #include <chrono>
19 #include <exception>
20 #include <map>
21 #include <memory>
22 #include <thread>
23 #include <unordered_map>
24 #include <utility>
25 #include <vector>
26 
27 namespace chatterino {
28 
57 template <typename Subscription>
59 {
60 public:
61  BasicPubSubManager(QString host)
62  : host_(std::move(host))
63  {
64  this->websocketClient_.set_access_channels(
65  websocketpp::log::alevel::all);
66  this->websocketClient_.clear_access_channels(
67  websocketpp::log::alevel::frame_payload |
68  websocketpp::log::alevel::frame_header);
69 
70  this->websocketClient_.init_asio();
71 
72  // SSL Handshake
73  this->websocketClient_.set_tls_init_handler([this](auto hdl) {
74  return this->onTLSInit(hdl);
75  });
76 
77  this->websocketClient_.set_message_handler([this](auto hdl, auto msg) {
78  this->onMessage(hdl, msg);
79  });
80  this->websocketClient_.set_open_handler([this](auto hdl) {
81  this->onConnectionOpen(hdl);
82  });
83  this->websocketClient_.set_close_handler([this](auto hdl) {
84  this->onConnectionClose(hdl);
85  });
86  this->websocketClient_.set_fail_handler([this](auto hdl) {
87  this->onConnectionFail(hdl);
88  });
89  this->websocketClient_.set_user_agent("Chatterino/" CHATTERINO_VERSION
90  " (" CHATTERINO_GIT_HASH ")");
91  }
92 
93  virtual ~BasicPubSubManager() = default;
94 
95  BasicPubSubManager(const BasicPubSubManager &) = delete;
96  BasicPubSubManager(const BasicPubSubManager &&) = delete;
99 
101  struct {
102  std::atomic<uint32_t> connectionsClosed{0};
103  std::atomic<uint32_t> connectionsOpened{0};
104  std::atomic<uint32_t> connectionsFailed{0};
105  } diag;
106 
107  void start()
108  {
109  this->work_ = std::make_shared<boost::asio::io_service::work>(
110  this->websocketClient_.get_io_service());
111  this->mainThread_.reset(new std::thread([this] {
112  runThread();
113  }));
114  }
115 
116  void stop()
117  {
118  this->stopping_ = true;
119 
120  for (const auto &client : this->clients_)
121  {
122  client.second->close("Shutting down");
123  }
124 
125  this->work_.reset();
126 
127  if (this->mainThread_->joinable())
128  {
129  this->mainThread_->join();
130  }
131 
132  assert(this->clients_.empty());
133  }
134 
135 protected:
136  using WebsocketMessagePtr =
137  websocketpp::config::asio_tls_client::message_type::ptr;
138  using WebsocketContextPtr =
139  websocketpp::lib::shared_ptr<boost::asio::ssl::context>;
140 
141  virtual void onMessage(websocketpp::connection_hdl hdl,
142  WebsocketMessagePtr msg) = 0;
143 
144  virtual std::shared_ptr<BasicPubSubClient<Subscription>> createClient(
145  liveupdates::WebsocketClient &client, websocketpp::connection_hdl hdl)
146  {
147  return std::make_shared<BasicPubSubClient<Subscription>>(client, hdl);
148  }
149 
154  std::shared_ptr<BasicPubSubClient<Subscription>> findClient(
155  websocketpp::connection_hdl hdl)
156  {
157  auto clientIt = this->clients_.find(hdl);
158 
159  if (clientIt == this->clients_.end())
160  {
161  return {};
162  }
163 
164  return clientIt->second;
165  }
166 
167  void unsubscribe(const Subscription &subscription)
168  {
169  for (auto &client : this->clients_)
170  {
171  if (client.second->unsubscribe(subscription))
172  {
173  return;
174  }
175  }
176  }
177 
178  void subscribe(const Subscription &subscription)
179  {
180  if (this->trySubscribe(subscription))
181  {
182  return;
183  }
184 
185  this->addClient();
186  this->pendingSubscriptions_.emplace_back(subscription);
187  DebugCount::increase("LiveUpdates subscription backlog");
188  }
189 
190 private:
191  void onConnectionOpen(websocketpp::connection_hdl hdl)
192  {
193  DebugCount::increase("LiveUpdates connections");
194  this->addingClient_ = false;
195  this->diag.connectionsOpened.fetch_add(1, std::memory_order_acq_rel);
196 
197  this->connectBackoff_.reset();
198 
199  auto client = this->createClient(this->websocketClient_, hdl);
200 
201  // We separate the starting from the constructor because we will want to use
202  // shared_from_this
203  client->start();
204 
205  this->clients_.emplace(hdl, client);
206 
207  auto pendingSubsToTake = (std::min)(this->pendingSubscriptions_.size(),
208  client->maxSubscriptions);
209 
210  qCDebug(chatterinoLiveupdates)
211  << "LiveUpdate connection opened, subscribing to"
212  << pendingSubsToTake << "subscriptions!";
213 
214  while (pendingSubsToTake > 0 && !this->pendingSubscriptions_.empty())
215  {
216  const auto last = std::move(this->pendingSubscriptions_.back());
217  this->pendingSubscriptions_.pop_back();
218  if (!client->subscribe(last))
219  {
220  qCDebug(chatterinoLiveupdates)
221  << "Failed to subscribe to" << last << "on new client.";
222  // TODO: should we try to add a new client here?
223  return;
224  }
225  DebugCount::decrease("LiveUpdates subscription backlog");
226  pendingSubsToTake--;
227  }
228 
229  if (!this->pendingSubscriptions_.empty())
230  {
231  this->addClient();
232  }
233  }
234 
235  void onConnectionFail(websocketpp::connection_hdl hdl)
236  {
237  DebugCount::increase("LiveUpdates failed connections");
238  this->diag.connectionsFailed.fetch_add(1, std::memory_order_acq_rel);
239 
240  if (auto conn = this->websocketClient_.get_con_from_hdl(std::move(hdl)))
241  {
242  qCDebug(chatterinoLiveupdates)
243  << "LiveUpdates connection attempt failed (error: "
244  << conn->get_ec().message().c_str() << ")";
245  }
246  else
247  {
248  qCDebug(chatterinoLiveupdates)
249  << "LiveUpdates connection attempt failed but we can't get the "
250  "connection from a handle.";
251  }
252  this->addingClient_ = false;
253  if (!this->pendingSubscriptions_.empty())
254  {
255  runAfter(this->websocketClient_.get_io_service(),
256  this->connectBackoff_.next(), [this](auto /*timer*/) {
257  this->addClient();
258  });
259  }
260  }
261 
262  void onConnectionClose(websocketpp::connection_hdl hdl)
263  {
264  qCDebug(chatterinoLiveupdates) << "Connection closed";
265  DebugCount::decrease("LiveUpdates connections");
266  this->diag.connectionsClosed.fetch_add(1, std::memory_order_acq_rel);
267 
268  auto clientIt = this->clients_.find(hdl);
269 
270  // If this assert goes off, there's something wrong with the connection
271  // creation/preserving code KKona
272  assert(clientIt != this->clients_.end());
273 
274  auto client = clientIt->second;
275 
276  this->clients_.erase(clientIt);
277 
278  client->stop();
279 
280  if (!this->stopping_)
281  {
282  for (const auto &sub : client->subscriptions_)
283  {
284  this->subscribe(sub);
285  }
286  }
287  }
288 
289  WebsocketContextPtr onTLSInit(const websocketpp::connection_hdl & /*hdl*/)
290  {
292  new boost::asio::ssl::context(boost::asio::ssl::context::tlsv12));
293 
294  try
295  {
296  ctx->set_options(boost::asio::ssl::context::default_workarounds |
297  boost::asio::ssl::context::no_sslv2 |
298  boost::asio::ssl::context::single_dh_use);
299  }
300  catch (const std::exception &e)
301  {
302  qCDebug(chatterinoLiveupdates)
303  << "Exception caught in OnTLSInit:" << e.what();
304  }
305 
306  return ctx;
307  }
308 
309  void runThread()
310  {
311  qCDebug(chatterinoLiveupdates) << "Start LiveUpdates manager thread";
312  this->websocketClient_.run();
313  qCDebug(chatterinoLiveupdates)
314  << "Done with LiveUpdates manager thread";
315  }
316 
317  void addClient()
318  {
319  if (this->addingClient_)
320  {
321  return;
322  }
323 
324  qCDebug(chatterinoLiveupdates) << "Adding an additional client";
325 
326  this->addingClient_ = true;
327 
328  websocketpp::lib::error_code ec;
329  auto con = this->websocketClient_.get_connection(
330  this->host_.toStdString(), ec);
331 
332  if (ec)
333  {
334  qCDebug(chatterinoLiveupdates)
335  << "Unable to establish connection:" << ec.message().c_str();
336  return;
337  }
338 
339  this->websocketClient_.connect(con);
340  }
341 
342  bool trySubscribe(const Subscription &subscription)
343  {
344  for (auto &client : this->clients_)
345  {
346  if (client.second->subscribe(subscription))
347  {
348  return true;
349  }
350  }
351  return false;
352  }
353 
355  std::shared_ptr<BasicPubSubClient<Subscription>>,
356  std::owner_less<liveupdates::WebsocketHandle>>
357  clients_;
358 
359  std::vector<Subscription> pendingSubscriptions_;
360  std::atomic<bool> addingClient_{false};
361  ExponentialBackoff<5> connectBackoff_{std::chrono::milliseconds(1000)};
362 
363  std::shared_ptr<boost::asio::io_service::work> work_{nullptr};
364 
365  liveupdates::WebsocketClient websocketClient_;
366  std::unique_ptr<std::thread> mainThread_;
367 
368  const QString host_;
369 
370  bool stopping_{false};
371 };
372 
373 } // namespace chatterino
void subscribe(const Subscription &subscription)
Definition: BasicPubSubManager.hpp:178
std::chrono::milliseconds next()
Definition: ExponentialBackoff.hpp:33
virtual std::shared_ptr< BasicPubSubClient< Subscription > > createClient(liveupdates::WebsocketClient &client, websocketpp::connection_hdl hdl)
Definition: BasicPubSubManager.hpp:144
void reset()
Definition: ExponentialBackoff.hpp:50
BasicPubSubManager(QString host)
Definition: BasicPubSubManager.hpp:61
void start()
Definition: BasicPubSubManager.hpp:107
BasicPubSubManager & operator=(const BasicPubSubManager &)=delete
virtual void onMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg)=0
Definition: SeventvEventAPISubscription.hpp:67
Definition: Application.cpp:48
websocketpp::lib::shared_ptr< boost::asio::ssl::context > WebsocketContextPtr
Definition: BasicPubSubManager.hpp:139
void unsubscribe(const Subscription &subscription)
Definition: BasicPubSubManager.hpp:167
websocketpp::config::asio_tls_client::message_type::ptr WebsocketMessagePtr
Definition: BasicPubSubManager.hpp:137
std::atomic< uint32_t > connectionsOpened
Definition: BasicPubSubManager.hpp:103
void runAfter(boost::asio::io_service &ioService, Duration duration, Callback cb)
Definition: PubSubHelpers.hpp:17
websocketpp::client< chatterino::BasicPubSubConfig > WebsocketClient
Definition: BasicPubSubWebsocket.hpp:31
static void increase(const QString &name)
Definition: DebugCount.hpp:16
std::atomic< uint32_t > connectionsClosed
Definition: BasicPubSubManager.hpp:102
virtual ~BasicPubSubManager()=default
std::atomic< uint32_t > connectionsFailed
Definition: BasicPubSubManager.hpp:104
static void decrease(const QString &name)
Definition: DebugCount.hpp:45
Definition: BasicPubSubManager.hpp:58
std::shared_ptr< BasicPubSubClient< Subscription > > findClient(websocketpp::connection_hdl hdl)
Definition: BasicPubSubManager.hpp:154
websocketpp::connection_hdl WebsocketHandle
Definition: BasicPubSubWebsocket.hpp:32
#define CHATTERINO_VERSION
Definition: Version.hpp:6
void stop()
Definition: BasicPubSubManager.hpp:116
struct chatterino::BasicPubSubManager::@0 diag