View on GitHub

Vistle

Distributed Data-parallel Scientific Visualization in VR

statetracker.h
Go to the documentation of this file.
1#ifndef STATETRACKER_H
2#define STATETRACKER_H
3
4#include <vector>
5#include <map>
6#include <set>
7#include <string>
8
9#include <mutex>
10#include <condition_variable>
11
12#include <vistle/util/buffer.h>
13
14#include "export.h"
15#include "message.h"
16#include "messages.h"
17#include "availablemodule.h"
18
19namespace vistle {
20
21class Parameter;
22typedef std::set<std::shared_ptr<Parameter>> ParameterSet;
23class PortTracker;
24
// Public interface of StateObserver (the class's opening line, 25, is missing from this listing).
26public:
// Starts with a modification count of zero.
27 StateObserver(): m_modificationCount(0) {}
// Virtual and empty, so implementations can be destroyed through a StateObserver pointer.
28 virtual ~StateObserver() {}
29
// Hub and module-availability notifications. They are pure virtual, so every
// concrete observer has to implement them.
// NOTE(review): when and from which thread these are invoked is decided outside this header - confirm there.
30 virtual void newHub(int hub, const std::string &name, int nranks, const std::string &address,
31 const std::string &logname, const std::string &realname) = 0;
32 virtual void deleteHub(int hub) = 0;
33 virtual void moduleAvailable(const AvailableModule &mod) = 0;
34
// Module creation/removal notifications, keyed by module id; newModule also
// receives a uuid (named spawnUuid) and the module's name.
35 virtual void newModule(int moduleId, const boost::uuids::uuid &spawnUuid, const std::string &moduleName) = 0;
36 virtual void deleteModule(int moduleId) = 0;
37
// Enumerators of ModuleStateBits (the enum's opening line, 38, is missing from this listing).
// Unknown is zero; every other enumerator is a distinct power of two.
39 Unknown = 0,
40 Known = 1,
41 Initialized = 2,
42 Killed = 4,
43 Quit = 8,
44 Busy = 16,
45 };
// Pure virtual notification carrying a module id and an int of state bits.
// NOTE(review): stateBits is presumably a bitwise OR of ModuleStateBits values - confirm against the caller.
46 virtual void moduleStateChanged(int moduleId, int stateBits) = 0;
47
// Parameter notifications: each one identifies the parameter by owning module id
// and parameter name only; no value is passed to the observer.
48 virtual void newParameter(int moduleId, const std::string &parameterName) = 0;
49 virtual void parameterValueChanged(int moduleId, const std::string &parameterName) = 0;
50 virtual void parameterChoicesChanged(int moduleId, const std::string &parameterName) = 0;
51 virtual void deleteParameter(int moduleId, const std::string &parameterName) = 0;
52
// Port notifications, identified by module id and port name.
53 virtual void newPort(int moduleId, const std::string &portName) = 0;
54 virtual void deletePort(int moduleId, const std::string &portName) = 0;
55
// Connection notifications: both endpoints are given as (module id, name) pairs.
// NOTE(review): "from" is presumably the output side and "to" the input side - confirm.
56 virtual void newConnection(int fromId, const std::string &fromName, int toId, const std::string &toName) = 0;
57
58 virtual void deleteConnection(int fromId, const std::string &fromName, int toId, const std::string &toName) = 0;
59
// Text notification: passes the text, its SendText text type, the sender's id and
// rank, and the type and uuid of a referenced message.
60 virtual void info(const std::string &text, message::SendText::TextType textType, int senderId, int senderRank,
61 message::Type refType, const message::uuid_t &refUuid) = 0;
// A module sends a status update.
63 virtual void status(int id, const std::string &text, message::UpdateStatus::Importance importance) = 0;
// The overall status has changed.
65 virtual void updateStatus(int id, const std::string &text, message::UpdateStatus::Importance importance) = 0;
66
// Unlike the callbacks declared with "= 0", the virtual functions below are not
// pure: their definitions live outside this header and observers may override them.
67 virtual void quitRequested();
68
// Modification counter interface; the counter itself is the private
// m_modificationCount below. modificationCount() is the only non-virtual member here.
69 virtual void resetModificationCount();
70 virtual void incModificationCount();
71 long modificationCount() const;
72
73 virtual void loadedWorkflowChanged(const std::string &filename);
74 virtual void sessionUrlChanged(const std::string &url);
75
76private:
// Initialised to 0 by the constructor.
77 long m_modificationCount;
78};
79
// Members of HubData (the struct's opening line, 80, is missing from this listing).
// The constructor stores id and name and sets both port numbers to 0.
81 HubData(int id, const std::string &name): id(id), name(name), port(0), dataPort(0) {}
82
83 int id;
84 std::string name;
85 std::string logName;
86 std::string realName;
// Defaults to 0 until assigned.
87 int numRanks = 0;
// Both ports are 0 after construction.
88 unsigned short port;
89 unsigned short dataPort;
90 boost::asio::ip::address address;
// Defaults to false.
91 bool hasUi = false;
92};
93
// Start of StateTracker (the class's opening line, 94, is missing from this listing).
// These friends get access to the protected and private members declared further down.
95 friend class ClusterManager;
96 friend class Hub;
97 friend class DataProxy;
98 friend class PortTracker;
99
100public:
// portTracker is optional: it defaults to an empty shared_ptr.
// NOTE(review): what the tracker does when no PortTracker is supplied is not visible in this header.
101 StateTracker(const std::string &name, std::shared_ptr<PortTracker> portTracker = std::shared_ptr<PortTracker>());
103
// The lock type used by this class is recursive, i.e. the same thread may lock it repeatedly.
104 typedef std::recursive_mutex mutex;
105 typedef std::unique_lock<mutex> mutex_locker;
// NOTE(review): which of the mutex members this returns is not visible here - confirm in the implementation.
106 mutex &getMutex();
107
// 'received' is an in/out reference parameter; its exact meaning is defined by the implementation.
108 bool dispatch(bool &received);
109
// Read-only queries; every function in this group is const.
// hubName(), getHubData() and getMirrors() return references rather than copies.
// NOTE(review): how long those references stay valid, and what is returned for an
// unknown id, is not visible in this header - confirm in the implementation.
110 int getMasterHub() const;
111 std::vector<int> getHubs() const;
112 std::vector<int> getSlaveHubs() const;
113 const std::string &hubName(int id) const;
114 std::vector<int> getRunningList() const;
115 std::vector<int> getBusyList() const;
116 int getHub(int id) const;
117 const HubData &getHubData(int id) const;
118 std::string getModuleName(int id) const;
119 int getModuleState(int id) const;
120 int getMirrorId(int id) const;
121 const std::set<int> &getMirrors(int id) const;
122
// Parameter lookup by module id (and parameter name).
123 std::vector<std::string> getParameters(int id) const;
124 std::shared_ptr<Parameter> getParameter(int id, const std::string &name) const;
125
126 ParameterSet getConnectedParameters(const Parameter &param) const;
127
// Message entry points. The two handle() overloads differ only in how the payload
// is passed: as a pointer to a buffer, or as a raw char pointer plus size.
// 'track' defaults to true in both.
// NOTE(review): the effect of track == false is not visible in this header.
128 bool handle(const message::Message &msg, const buffer *payload, bool track = true);
129 bool handle(const message::Message &msg, const char *payload, size_t payloadSize, bool track = true);
130 bool handleConnect(const message::Connect &conn);
131 bool handleDisconnect(const message::Disconnect &disc);
132
// Returns a shared_ptr to a PortTracker by value.
133 std::shared_ptr<PortTracker> portTracker() const;
134
// Body of MessageWithPayload: a message together with a shared, read-only payload.
// The struct's opening line (135) and the 'message' member (140) are missing from this listing.
// The constructor copies both arguments into the members of the same name.
136 MessageWithPayload(const message::Message &m, std::shared_ptr<const buffer> payload)
137 : message(m), payload(payload)
138 {}
139
141 std::shared_ptr<const buffer> payload;
142 };
// VistleState is a vector of such message/payload pairs.
143 typedef std::vector<MessageWithPayload> VistleState;
144
// Returns a VistleState by value.
145 VistleState getState() const;
146
// Returns a const reference to a map from AvailableModule::Key to AvailableModule.
147 const std::map<AvailableModule::Key, AvailableModule> &availableModules() const;
148
// Takes a raw, non-owning-looking pointer.
// NOTE(review): the observer presumably must outlive the tracker, and no
// unregister function is declared in this listing - confirm.
149 void registerObserver(StateObserver *observer);
150
// Request/reply bookkeeping keyed by message uuid.
// NOTE(review): waitForReply presumably blocks until a reply for uuid arrives - confirm.
151 bool registerRequest(const message::uuid_t &uuid);
152 std::shared_ptr<message::Buffer> waitForReply(const message::uuid_t &uuid);
153
// Two overloads: by number of hubs, or by a list of hub names.
154 std::vector<int> waitForSlaveHubs(size_t count);
155 std::vector<int> waitForSlaveHubs(const std::vector<std::string> &names);
156
157 int graphChangeCount() const;
158
// Both return std::string by value.
159 std::string loadedWorkflowFile() const;
160 std::string sessionUrl() const;
161
// Protected section: available to subclasses and to the friend classes.
162protected:
// Counterparts of the public request/reply API, keyed by message uuid.
163 std::shared_ptr<message::Buffer> removeRequest(const message::uuid_t &uuid);
164 bool registerReply(const message::uuid_t &uuid, const message::Message &msg);
165
// ParameterMap: parameter name -> shared Parameter.
166 typedef std::map<std::string, std::shared_ptr<Parameter>> ParameterMap;
// ParameterOrder: int key -> parameter name.
// NOTE(review): the int key is presumably a display/creation order index - confirm.
167 typedef std::map<int, std::string> ParameterOrder;
// Per-module bookkeeping record.
// Listing lines 173, 177 and 178 (the members 'initialized', 'parameters' and
// 'paramOrder') are missing here, although the constructor below initialises 'initialized'.
168 struct Module {
169 int id;
// Invalid by default.
170 int mirrorOfId = message::Id::Invalid;
171 std::set<int> mirrors;
172 int hub;
174 bool killed;
175 bool busy;
176 std::string name;
179 int height; //< length of shortest path to a sink
180 std::string statusText;
181 message::UpdateStatus::Importance statusImportance = message::UpdateStatus::Bulk;
182 unsigned long statusTime = 0;
183
184 message::ObjectReceivePolicy::Policy objectPolicy;
185 message::SchedulingPolicy::Schedule schedulingPolicy;
186 message::ReducePolicy::Reduce reducePolicy;
187
// NOTE(review): presumably combines the boolean flags into StateObserver state bits - confirm in the implementation.
188 int state() const;
// A module counts as a sink exactly when its height is 0.
189 bool isSink() const { return height == 0; }
// A new module starts not initialized, not killed, not busy and with height 0
// (so isSink() is true until height is changed), and with the Local, Single and
// Locally policies.
190 Module(int id, int hub)
191 : id(id)
192 , hub(hub)
193 , initialized(false)
194 , killed(false)
195 , busy(false)
196 , height(0)
197 , objectPolicy(message::ObjectReceivePolicy::Local)
198 , schedulingPolicy(message::SchedulingPolicy::Single)
199 , reducePolicy(message::ReducePolicy::Locally)
200 {}
201 };
202
// Module bookkeeping containers: RunningMap maps an int key to a Module record.
// NOTE(review): the key is presumably the module id - confirm.
203 typedef std::map<int, Module> RunningMap;
204 RunningMap runningMap; //< currently running modules on all connected clusters
205 void computeHeights(); //< compute heights for all modules in runningMap
206 RunningMap quitMap; //< history of already terminated modules - for module -> hub mapping
// Listing line 208 (the 'busySet' member of type ModuleSet) is missing here.
207 typedef std::set<int> ModuleSet;
// Starts at 0.
209 int m_graphChangeCount = 0;
210
211 std::map<AvailableModule::Key, AvailableModule> m_availableModules;
212
// Observers are held as raw pointers.
213 std::set<StateObserver *> m_observers;
214
// Queue handling; listing line 215 (the 'm_queue' member of type VistleState) is missing here.
216 void processQueue();
217 void cleanQueue(int moduleId);
// NOTE(review): presumably a re-entrancy guard for processQueue() - confirm.
218 bool m_processingQueue = false;
219
220 std::string statusText() const;
221
222private:
223 void updateStatus();
224
// One private handler overload per concrete message type; overload resolution
// selects the handler from the static type of the argument. Each returns bool.
// Only the SetParameterChoices, SendText and ModuleAvailable handlers also take a payload buffer.
// NOTE(review): these are presumably dispatched from the public handle() - confirm in the implementation.
225 bool handlePriv(const message::AddHub &slave);
226 bool handlePriv(const message::RemoveHub &slave);
227 bool handlePriv(const message::Ping &ping);
228 bool handlePriv(const message::Pong &pong);
229 bool handlePriv(const message::Trace &trace);
230 bool handlePriv(const message::Spawn &spawn);
231 bool handlePriv(const message::Started &started);
232 bool handlePriv(const message::Connect &connect);
233 bool handlePriv(const message::Disconnect &disc);
234 bool handlePriv(const message::ModuleExit &moduleExit);
235 bool handlePriv(const message::Execute &execute);
236 bool handlePriv(const message::ExecutionProgress &prog);
237 bool handlePriv(const message::Busy &busy);
238 bool handlePriv(const message::Idle &idle);
239 bool handlePriv(const message::AddPort &createPort);
240 bool handlePriv(const message::RemovePort &destroyPort);
241 bool handlePriv(const message::AddParameter &addParam);
242 bool handlePriv(const message::RemoveParameter &removeParam);
243 bool handlePriv(const message::SetParameter &setParam);
244 bool handlePriv(const message::SetParameterChoices &choices, const buffer &payload);
245 bool handlePriv(const message::Kill &kill);
246 bool handlePriv(const message::AddObject &addObj);
247 bool handlePriv(const message::Barrier &barrier);
248 bool handlePriv(const message::BarrierReached &barrierReached);
249 bool handlePriv(const message::SendText &info, const buffer &payload);
250 bool handlePriv(const message::UpdateStatus &status);
251 bool handlePriv(const message::ReplayFinished &reset);
252 bool handlePriv(const message::Quit &quit);
253 bool handlePriv(const message::ModuleAvailable &mod, const buffer &payload);
254 bool handlePriv(const message::ObjectReceivePolicy &pol);
255 bool handlePriv(const message::ReducePolicy &pol);
256 bool handlePriv(const message::SchedulingPolicy &pol);
257 bool handlePriv(const message::RequestTunnel &tunnel);
258 bool handlePriv(const message::CloseConnection &close);
259
// Private state of StateTracker.
260 std::shared_ptr<PortTracker> m_portTracker;
261
// NOTE(review): presumably the uuids of messages already processed, used to skip duplicates - confirm.
262 std::set<message::uuid_t> m_alreadySeen;
263
// Reply bookkeeping: uuid -> reply buffer. 'mutex' is the class's recursive mutex
// typedef; std::condition_variable only works with std::mutex, which explains the
// use of condition_variable_any.
// NOTE(review): m_replyCondition is presumably waited on with m_replyMutex - confirm.
264 mutex m_replyMutex;
265 std::condition_variable_any m_replyCondition;
266 std::map<message::uuid_t, std::shared_ptr<message::Buffer>> m_outstandingReplies;
267
268 mutex m_slaveMutex;
269 std::condition_variable_any m_slaveCondition;
270
// m_traceType and m_traceId have no in-class initialiser; they rely on the
// constructor, whose definition is not part of this header.
271 message::Type m_traceType;
272 int m_traceId;
273 std::string m_name;
274 std::vector<HubData> m_hubs;
275
// Status bookkeeping; note that m_statusTime starts at 1, not 0.
276 unsigned long m_statusTime = 1;
277 int m_currentStatusId = message::Id::Invalid;
278 std::string m_currentStatus;
279 message::UpdateStatus::Importance m_currentStatusImportance = message::UpdateStatus::Bulk;
280
// Counters, all starting at 0.
281 size_t m_numMessages = 0;
282 size_t m_numObjects = 0;
283 size_t m_aggregatedPayload = 0;
284
// 'mutable' so that const member functions can lock it.
285 mutable mutex m_stateMutex;
286
287 std::string m_loadedWorkflowFile;
288 std::string m_sessionUrl;
289};
290
291} // namespace vistle
292
293#endif
Definition: availablemodule.h:120
Definition: parameter.h:26
Definition: porttracker.h:18
Definition: statetracker.h:25
virtual void parameterChoicesChanged(int moduleId, const std::string &parameterName)=0
virtual void deletePort(int moduleId, const std::string &portName)=0
virtual void parameterValueChanged(int moduleId, const std::string &parameterName)=0
virtual void deleteConnection(int fromId, const std::string &fromName, int toId, const std::string &toName)=0
virtual void deleteHub(int hub)=0
virtual void deleteModule(int moduleId)=0
virtual void newHub(int hub, const std::string &name, int nranks, const std::string &address, const std::string &logname, const std::string &realname)=0
virtual void newParameter(int moduleId, const std::string &parameterName)=0
ModuleStateBits
Definition: statetracker.h:38
virtual void moduleStateChanged(int moduleId, int stateBits)=0
virtual void status(int id, const std::string &text, message::UpdateStatus::Importance importance)=0
a module sends a status update
virtual void moduleAvailable(const AvailableModule &mod)=0
StateObserver()
Definition: statetracker.h:27
virtual void updateStatus(int id, const std::string &text, message::UpdateStatus::Importance importance)=0
the overall status has changed
virtual void deleteParameter(int moduleId, const std::string &parameterName)=0
virtual ~StateObserver()
Definition: statetracker.h:28
virtual void info(const std::string &text, message::SendText::TextType textType, int senderId, int senderRank, message::Type refType, const message::uuid_t &refUuid)=0
virtual void newModule(int moduleId, const boost::uuids::uuid &spawnUuid, const std::string &moduleName)=0
virtual void newConnection(int fromId, const std::string &fromName, int toId, const std::string &toName)=0
virtual void newPort(int moduleId, const std::string &portName)=0
Definition: statetracker.h:94
std::map< std::string, std::shared_ptr< Parameter > > ParameterMap
Definition: statetracker.h:166
std::map< int, std::string > ParameterOrder
Definition: statetracker.h:167
std::set< StateObserver * > m_observers
Definition: statetracker.h:213
RunningMap quitMap
Definition: statetracker.h:206
std::vector< MessageWithPayload > VistleState
Definition: statetracker.h:143
std::map< AvailableModule::Key, AvailableModule > m_availableModules
Definition: statetracker.h:211
std::map< int, Module > RunningMap
Definition: statetracker.h:203
bool dispatch(bool &received)
RunningMap runningMap
Definition: statetracker.h:204
std::set< int > ModuleSet
Definition: statetracker.h:207
VistleState m_queue
Definition: statetracker.h:215
ModuleSet busySet
Definition: statetracker.h:208
std::unique_lock< mutex > mutex_locker
Definition: statetracker.h:105
std::recursive_mutex mutex
Definition: statetracker.h:104
announce that a (slave) hub has connected
Definition: messages.h:77
add an object to the input queue of an input port
Definition: messages.h:359
notification that a module has created a parameter
Definition: messages.h:460
notification that a module has created an input/output port
Definition: messages.h:334
Definition: messages.h:586
Definition: messages.h:581
Definition: message.h:286
indicate that a module has started computing
Definition: messages.h:318
terminate a socket connection
Definition: messages.h:961
connect an output port to an input port of another module
Definition: messages.h:414
disconnect an output port from an input port of another module
Definition: messages.h:437
trigger execution of a module function
Definition: messages.h:271
steer execution stages
Definition: messages.h:736
indicate that a module has finished computing
Definition: messages.h:326
request a module to quit
Definition: messages.h:226
Definition: message.h:157
announce availability of a module to UI
Definition: messages.h:783
notify that a module has quit
Definition: messages.h:260
control where AddObject messages are sent
Definition: messages.h:682
debug: request a reply containing character 'c'
Definition: messages.h:134
debug: reply to ping
Definition: messages.h:145
request all modules to quit for terminating the session
Definition: messages.h:238
control whether/when prepare() and reduce() are called
Definition: messages.h:711
request that a slave hub be deleted
Definition: messages.h:124
notification that a module has removed a parameter
Definition: messages.h:483
notification that a module has destroyed an input/output port
Definition: messages.h:347
Definition: messages.h:601
request hub to listen on TCP port and forward incoming connections
Definition: messages.h:807
Definition: messages.h:693
send text messages to user interfaces
Definition: messages.h:607
set list of choice descriptions for a choice parameter
Definition: messages.h:553
request parameter value update or notify that a parameter value has been changed
Definition: messages.h:501
spawn a module
Definition: messages.h:158
acknowledge that a module has been spawned
Definition: messages.h:214
enable/disable message tracing for a module
Definition: messages.h:755
update status of a module (or other entity)
Definition: messages.h:644
#define V_COREEXPORT
Definition: export.h:9
boost::uuids::uuid uuid_t
Definition: message.h:154
Definition: allobjects.cpp:30
std::vector< char, allocator< char > > buffer
Definition: buffer.h:9
std::set< std::shared_ptr< Parameter > > ParameterSet
Definition: statetracker.h:21
Definition: statetracker.h:80
unsigned short dataPort
Definition: statetracker.h:89
std::string realName
Definition: statetracker.h:86
HubData(int id, const std::string &name)
Definition: statetracker.h:81
unsigned short port
Definition: statetracker.h:88
boost::asio::ip::address address
Definition: statetracker.h:90
std::string logName
Definition: statetracker.h:85
std::string name
Definition: statetracker.h:84
int id
Definition: statetracker.h:83
Definition: statetracker.h:135
MessageWithPayload(const message::Message &m, std::shared_ptr< const buffer > payload)
Definition: statetracker.h:136
message::Buffer message
Definition: statetracker.h:140
std::shared_ptr< const buffer > payload
Definition: statetracker.h:141
Definition: statetracker.h:168
bool busy
Definition: statetracker.h:175
message::SchedulingPolicy::Schedule schedulingPolicy
Definition: statetracker.h:185
ParameterMap parameters
Definition: statetracker.h:177
bool initialized
Definition: statetracker.h:173
message::ObjectReceivePolicy::Policy objectPolicy
Definition: statetracker.h:184
bool isSink() const
Definition: statetracker.h:189
bool killed
Definition: statetracker.h:174
int hub
Definition: statetracker.h:172
message::ReducePolicy::Reduce reducePolicy
Definition: statetracker.h:186
std::string statusText
Definition: statetracker.h:180
ParameterOrder paramOrder
Definition: statetracker.h:178
int height
Definition: statetracker.h:179
std::string name
Definition: statetracker.h:176
Module(int id, int hub)
Definition: statetracker.h:190
int id
Definition: statetracker.h:169
std::set< int > mirrors
Definition: statetracker.h:171