Chatterino
PubSubManager.hpp
Go to the documentation of this file.
1 #pragma once
2 
11 
12 #include <QJsonObject>
13 #include <QString>
14 #include <pajlada/signals/signal.hpp>
15 #include <websocketpp/client.hpp>
16 
17 #include <atomic>
18 #include <chrono>
19 #include <map>
20 #include <memory>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24 
25 namespace chatterino {
26 
27 class PubSub
28 {
29  using WebsocketMessagePtr =
30  websocketpp::config::asio_tls_client::message_type::ptr;
31  using WebsocketContextPtr =
32  websocketpp::lib::shared_ptr<boost::asio::ssl::context>;
33 
34  template <typename T>
35  using Signal =
36  pajlada::Signals::Signal<T>; // type-id is vector<T, Alloc<T>>
37 
38  struct NonceInfo {
39  std::weak_ptr<PubSubClient> client;
40  QString messageType; // e.g. LISTEN or UNLISTEN
41  std::vector<QString> topics;
42  std::vector<QString>::size_type topicCount;
43  };
44 
45  WebsocketClient websocketClient;
46  std::unique_ptr<std::thread> mainThread;
47 
48  // Account credentials
49  // Set from setAccount or setAccountData
50  QString token_;
51  QString userID_;
52 
53 public:
54  // The max amount of connections we may open
55  static constexpr int maxConnections = 10;
56 
57  PubSub(const QString &host,
58  std::chrono::seconds pingInterval = std::chrono::seconds(15));
59 
60  void setAccount(std::shared_ptr<TwitchAccount> account)
61  {
62  this->token_ = account->getOAuthToken();
63  this->userID_ = account->getUserId();
64  }
65 
66  void setAccountData(QString token, QString userID)
67  {
68  this->token_ = token;
69  this->userID_ = userID;
70  }
71 
72  ~PubSub() = delete;
73 
74  enum class State {
75  Connected,
76  Disconnected,
77  };
78 
79  void start();
80  void stop();
81 
82  bool isConnected() const
83  {
84  return this->state == State::Connected;
85  }
86 
87  struct {
88  struct {
89  Signal<ClearChatAction> chatCleared;
90  Signal<DeleteAction> messageDeleted;
91  Signal<ModeChangedAction> modeChanged;
92  Signal<ModerationStateAction> moderationStateChanged;
93 
94  Signal<BanAction> userBanned;
95  Signal<UnbanAction> userUnbanned;
96 
97  // Message caught by automod
98  // channelID
99  pajlada::Signals::Signal<PubSubAutoModQueueMessage, QString>
101 
102  // Message blocked by moderator
103  Signal<AutomodAction> autoModMessageBlocked;
104 
105  Signal<AutomodUserAction> automodUserMessage;
106  Signal<AutomodInfoAction> automodInfoMessage;
107  } moderation;
108 
109  struct {
110  // Parsing should be done in PubSubManager as well,
111  // but for now we just send the raw data
112  Signal<const PubSubWhisperMessage &> received;
113  Signal<const PubSubWhisperMessage &> sent;
114  } whisper;
115 
116  struct {
117  Signal<const QJsonObject &> redeemed;
118  } pointReward;
119  } signals_;
120 
122  void unlistenAutomod();
123  void unlistenWhispers();
124 
125  bool listenToWhispers();
126  void listenToChannelModerationActions(const QString &channelID);
127  void listenToAutomod(const QString &channelID);
128 
129  void listenToChannelPointRewards(const QString &channelID);
130 
131  std::vector<QString> requests;
132 
133  struct {
134  std::atomic<uint32_t> connectionsClosed{0};
135  std::atomic<uint32_t> connectionsOpened{0};
136  std::atomic<uint32_t> connectionsFailed{0};
137  std::atomic<uint32_t> messagesReceived{0};
138  std::atomic<uint32_t> messagesFailedToParse{0};
139  std::atomic<uint32_t> failedListenResponses{0};
140  std::atomic<uint32_t> listenResponses{0};
141  std::atomic<uint32_t> unlistenResponses{0};
142  } diag;
143 
144  void listenToTopic(const QString &topic);
145 
146 private:
147  void listen(PubSubListenMessage msg);
148  bool tryListen(PubSubListenMessage msg);
149 
150  bool isListeningToTopic(const QString &topic);
151 
152  void addClient();
153  std::atomic<bool> addingClient{false};
154  ExponentialBackoff<5> connectBackoff{std::chrono::milliseconds(1000)};
155 
156  State state = State::Connected;
157 
158  std::map<WebsocketHandle, std::shared_ptr<PubSubClient>,
159  std::owner_less<WebsocketHandle>>
160  clients;
161 
162  std::unordered_map<
163  QString, std::function<void(const QJsonObject &, const QString &)>>
164  moderationActionHandlers;
165 
166  std::unordered_map<
167  QString, std::function<void(const QJsonObject &, const QString &)>>
168  channelTermsActionHandlers;
169 
170  void onMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg);
171  void onConnectionOpen(websocketpp::connection_hdl hdl);
172  void onConnectionFail(websocketpp::connection_hdl hdl);
173  void onConnectionClose(websocketpp::connection_hdl hdl);
174  WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl);
175 
176  void handleResponse(const PubSubMessage &message);
177  void handleListenResponse(const NonceInfo &info, bool failed);
178  void handleUnlistenResponse(const NonceInfo &info, bool failed);
179  void handleMessageResponse(const PubSubMessageMessage &message);
180 
181  // Register a nonce for a specific client
182  void registerNonce(QString nonce, NonceInfo nonceInfo);
183 
184  // Find client associated with a nonce
185  boost::optional<NonceInfo> findNonceInfo(QString nonce);
186 
187  std::unordered_map<QString, NonceInfo> nonces_;
188 
189  void runThread();
190 
191  std::shared_ptr<boost::asio::io_service::work> work{nullptr};
192 
193  const QString host_;
194  const PubSubClientOptions clientOptions_;
195 
196  bool stopping_{false};
197 };
198 
199 } // namespace chatterino
websocketpp::client< chatterinoconfig > WebsocketClient
Definition: PubSubWebsocket.hpp:28
std::atomic< uint32_t > connectionsOpened
Definition: PubSubManager.hpp:135
struct chatterino::PubSub::@6::@8 moderation
Signal< ModerationStateAction > moderationStateChanged
Definition: PubSubManager.hpp:92
std::vector< QString > requests
Definition: PubSubManager.hpp:131
std::atomic< uint32_t > messagesFailedToParse
Definition: PubSubManager.hpp:138
std::atomic< uint32_t > failedListenResponses
Definition: PubSubManager.hpp:139
void stop()
Definition: PubSubManager.cpp:512
struct chatterino::PubSub::@6::@9 whisper
void listenToTopic(const QString &topic)
Definition: PubSubManager.cpp:1167
Definition: Message.hpp:13
void setAccount(std::shared_ptr< TwitchAccount > account)
Definition: PubSubManager.hpp:60
Signal< BanAction > userBanned
Definition: PubSubManager.hpp:94
bool isConnected() const
Definition: PubSubManager.hpp:82
Definition: Application.cpp:48
Signal< const PubSubWhisperMessage & > sent
Definition: PubSubManager.hpp:113
void unlistenAutomod()
Definition: PubSubManager.cpp:550
State
Definition: PubSubManager.hpp:74
std::atomic< uint32_t > messagesReceived
Definition: PubSubManager.hpp:137
static constexpr int maxConnections
Definition: PubSubManager.hpp:55
struct chatterino::PubSub::@7 diag
bool listenToWhispers()
Definition: PubSubManager.cpp:587
std::atomic< uint32_t > unlistenResponses
Definition: PubSubManager.hpp:141
struct chatterino::PubSub::@6::@10 pointReward
PubSub(const QString &host, std::chrono::seconds pingInterval=std::chrono::seconds(15))
Definition: PubSubManager.cpp:22
Definition: PubSubManager.hpp:27
Signal< AutomodUserAction > automodUserMessage
Definition: PubSubManager.hpp:105
std::atomic< uint32_t > connectionsFailed
Definition: PubSubManager.hpp:136
Signal< ModeChangedAction > modeChanged
Definition: PubSubManager.hpp:91
std::atomic< uint32_t > listenResponses
Definition: PubSubManager.hpp:140
void setAccountData(QString token, QString userID)
Definition: PubSubManager.hpp:66
void listenToAutomod(const QString &channelID)
Definition: PubSubManager.cpp:630
void listenToChannelModerationActions(const QString &channelID)
Definition: PubSubManager.cpp:606
Signal< UnbanAction > userUnbanned
Definition: PubSubManager.hpp:95
void start()
Definition: PubSubManager.cpp:504
Signal< const QJsonObject & > redeemed
Definition: PubSubManager.hpp:117
Options to change the behaviour of the underlying websocket clients.
Definition: PubSubClientOptions.hpp:10
Signal< const PubSubWhisperMessage & > received
Definition: PubSubManager.hpp:112
Signal< AutomodInfoAction > automodInfoMessage
Definition: PubSubManager.hpp:106
void unlistenAllModerationActions()
Definition: PubSubManager.cpp:531
Signal< ClearChatAction > chatCleared
Definition: PubSubManager.hpp:89
Signal< DeleteAction > messageDeleted
Definition: PubSubManager.hpp:90
void unlistenWhispers()
Definition: PubSubManager.cpp:569
Definition: Listen.hpp:10
std::atomic< uint32_t > connectionsClosed
Definition: PubSubManager.hpp:134
struct chatterino::PubSub::@6 signals_
Definition: Base.hpp:13
void listenToChannelPointRewards(const QString &channelID)
Definition: PubSubManager.cpp:654
Signal< AutomodAction > autoModMessageBlocked
Definition: PubSubManager.hpp:103
pajlada::Signals::Signal< PubSubAutoModQueueMessage, QString > autoModMessageCaught
Definition: PubSubManager.hpp:100