Chatterino
BasicPubSubClient.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <atomic>
4 #include <chrono>
5 #include <pajlada/signals/signal.hpp>
6 #include <unordered_set>
7 
8 #include "common/QLogging.hpp"
10 #include "singletons/Settings.hpp"
11 #include "util/DebugCount.hpp"
12 #include "util/Helpers.hpp"
13 
14 namespace chatterino {
15 
27 template <typename Subscription>
29  : public std::enable_shared_from_this<BasicPubSubClient<Subscription>>
30 {
31 public:
32  // The maximum amount of subscriptions this connections can handle
33  const size_t maxSubscriptions;
34 
37  size_t maxSubscriptions = 100)
38  : maxSubscriptions(maxSubscriptions)
39  , websocketClient_(websocketClient)
40  , handle_(std::move(handle))
41  {
42  }
43 
44  virtual ~BasicPubSubClient() = default;
45 
46  BasicPubSubClient(const BasicPubSubClient &) = delete;
47  BasicPubSubClient(const BasicPubSubClient &&) = delete;
49  BasicPubSubClient &operator=(const BasicPubSubClient &&) = delete;
50 
51 protected:
52  virtual void onConnectionEstablished()
53  {
54  }
55 
56  bool send(const char *payload)
57  {
59  this->websocketClient_.send(this->handle_, payload,
60  websocketpp::frame::opcode::text, ec);
61 
62  if (ec)
63  {
64  qCDebug(chatterinoLiveupdates) << "Error sending message" << payload
65  << ":" << ec.message().c_str();
66  return false;
67  }
68 
69  return true;
70  }
71 
80  bool subscribe(const Subscription &subscription)
81  {
82  if (this->subscriptions_.size() >= this->maxSubscriptions)
83  {
84  return false;
85  }
86 
87  if (!this->subscriptions_.emplace(subscription).second)
88  {
89  qCWarning(chatterinoLiveupdates)
90  << "Tried subscribing to" << subscription
91  << "but we're already subscribed!";
92  return true; // true because the subscription already exists
93  }
94 
95  qCDebug(chatterinoLiveupdates) << "Subscribing to" << subscription;
96  DebugCount::increase("LiveUpdates subscriptions");
97 
98  QByteArray encoded = subscription.encodeSubscribe();
99  this->send(encoded);
100 
101  return true;
102  }
103 
108  bool unsubscribe(const Subscription &subscription)
109  {
110  if (this->subscriptions_.erase(subscription) <= 0)
111  {
112  return false;
113  }
114 
115  qCDebug(chatterinoLiveupdates) << "Unsubscribing from" << subscription;
116  DebugCount::decrease("LiveUpdates subscriptions");
117 
118  QByteArray encoded = subscription.encodeUnsubscribe();
119  this->send(encoded);
120 
121  return true;
122  }
123 
124  void close(const std::string &reason,
125  websocketpp::close::status::value code =
126  websocketpp::close::status::normal)
127  {
129 
130  auto conn = this->websocketClient_.get_con_from_hdl(this->handle_, ec);
131  if (ec)
132  {
133  qCDebug(chatterinoLiveupdates)
134  << "Error getting connection:" << ec.message().c_str();
135  return;
136  }
137 
138  conn->close(code, reason, ec);
139  if (ec)
140  {
141  qCDebug(chatterinoLiveupdates)
142  << "Error closing:" << ec.message().c_str();
143  return;
144  }
145  }
146 
147  bool isStarted() const
148  {
149  return this->started_.load(std::memory_order_acquire);
150  }
151 
153 
154 private:
155  void start()
156  {
157  assert(!this->isStarted());
158  this->started_.store(true, std::memory_order_release);
159  this->onConnectionEstablished();
160  }
161 
162  void stop()
163  {
164  assert(this->isStarted());
165  this->started_.store(false, std::memory_order_release);
166  }
167 
169  std::unordered_set<Subscription> subscriptions_;
170 
171  std::atomic<bool> started_{false};
172 
173  template <typename ManagerSubscription>
174  friend class BasicPubSubManager;
175 };
176 
177 } // namespace chatterino
BasicPubSubClient & operator=(const BasicPubSubClient &)=delete
void close(const std::string &reason, websocketpp::close::status::value code=websocketpp::close::status::normal)
Definition: BasicPubSubClient.hpp:124
websocketpp::lib::error_code WebsocketErrorCode
Definition: BasicPubSubWebsocket.hpp:33
BasicPubSubClient(liveupdates::WebsocketClient &websocketClient, liveupdates::WebsocketHandle handle, size_t maxSubscriptions=100)
Definition: BasicPubSubClient.hpp:35
Definition: BasicPubSubClient.hpp:28
bool subscribe(const Subscription &subscription)
Definition: BasicPubSubClient.hpp:80
Definition: SeventvEventAPISubscription.hpp:67
Definition: Application.cpp:48
bool isStarted() const
Definition: BasicPubSubClient.hpp:147
websocketpp::client< chatterino::BasicPubSubConfig > WebsocketClient
Definition: BasicPubSubWebsocket.hpp:31
static void increase(const QString &name)
Definition: DebugCount.hpp:16
virtual void onConnectionEstablished()
Definition: BasicPubSubClient.hpp:52
const size_t maxSubscriptions
Definition: BasicPubSubClient.hpp:33
static void decrease(const QString &name)
Definition: DebugCount.hpp:45
bool send(const char *payload)
Definition: BasicPubSubClient.hpp:56
Definition: BasicPubSubManager.hpp:58
websocketpp::connection_hdl WebsocketHandle
Definition: BasicPubSubWebsocket.hpp:32
liveupdates::WebsocketClient & websocketClient_
Definition: BasicPubSubClient.hpp:152
virtual ~BasicPubSubClient()=default
bool unsubscribe(const Subscription &subscription)
Definition: BasicPubSubClient.hpp:108