10#include <condition_variable>
30 virtual void newHub(
int hub,
const std::string &name,
int nranks,
const std::string &address,
31 const std::string &logname,
const std::string &realname) = 0;
35 virtual void newModule(
int moduleId,
const boost::uuids::uuid &spawnUuid,
const std::string &moduleName) = 0;
48 virtual void newParameter(
int moduleId,
const std::string &parameterName) = 0;
53 virtual void newPort(
int moduleId,
const std::string &portName) = 0;
54 virtual void deletePort(
int moduleId,
const std::string &portName) = 0;
56 virtual void newConnection(
int fromId,
const std::string &fromName,
int toId,
const std::string &toName) = 0;
58 virtual void deleteConnection(
int fromId,
const std::string &fromName,
int toId,
const std::string &toName) = 0;
60 virtual void info(
const std::string &text, message::SendText::TextType textType,
int senderId,
int senderRank,
message::Type refType,
const message::uuid_t &refUuid) = 0;
63 virtual void status(
int id,
const std::string &text, message::UpdateStatus::Importance importance) = 0;
65 virtual void updateStatus(
int id,
const std::string &text, message::UpdateStatus::Importance importance) = 0;
67 virtual void quitRequested();
69 virtual void resetModificationCount();
70 virtual void incModificationCount();
71 long modificationCount()
const;
73 virtual void loadedWorkflowChanged(
const std::string &filename);
74 virtual void sessionUrlChanged(
const std::string &url);
77 long m_modificationCount;
81 HubData(
int id,
const std::string &name): id(id), name(name), port(0), dataPort(0) {}
95 friend class ClusterManager;
97 friend class DataProxy;
101 StateTracker(
const std::string &name, std::shared_ptr<PortTracker> portTracker = std::shared_ptr<PortTracker>());
110 int getMasterHub()
const;
111 std::vector<int> getHubs()
const;
112 std::vector<int> getSlaveHubs()
const;
113 const std::string &hubName(
int id)
const;
114 std::vector<int> getRunningList()
const;
115 std::vector<int> getBusyList()
const;
116 int getHub(
int id)
const;
117 const HubData &getHubData(
int id)
const;
118 std::string getModuleName(
int id)
const;
119 int getModuleState(
int id)
const;
120 int getMirrorId(
int id)
const;
121 const std::set<int> &getMirrors(
int id)
const;
123 std::vector<std::string> getParameters(
int id)
const;
124 std::shared_ptr<Parameter> getParameter(
int id,
const std::string &name)
const;
129 bool handle(
const message::Message &msg,
const char *payload,
size_t payloadSize,
bool track =
true);
133 std::shared_ptr<PortTracker> portTracker()
const;
137 : message(m), payload(payload)
147 const std::map<AvailableModule::Key, AvailableModule> &availableModules()
const;
152 std::shared_ptr<message::Buffer> waitForReply(
const message::uuid_t &uuid);
154 std::vector<int> waitForSlaveHubs(
size_t count);
155 std::vector<int> waitForSlaveHubs(
const std::vector<std::string> &names);
157 int graphChangeCount()
const;
159 std::string loadedWorkflowFile()
const;
160 std::string sessionUrl()
const;
163 std::shared_ptr<message::Buffer> removeRequest(
const message::uuid_t &uuid);
166 typedef std::map<std::string, std::shared_ptr<Parameter>>
ParameterMap;
170 int mirrorOfId = message::Id::Invalid;
181 message::UpdateStatus::Importance statusImportance = message::UpdateStatus::Bulk;
182 unsigned long statusTime = 0;
189 bool isSink()
const {
return height == 0; }
197 , objectPolicy(message::ObjectReceivePolicy::Local)
198 , schedulingPolicy(message::SchedulingPolicy::Single)
199 , reducePolicy(message::ReducePolicy::Locally)
205 void computeHeights();
209 int m_graphChangeCount = 0;
217 void cleanQueue(
int moduleId);
218 bool m_processingQueue =
false;
220 std::string statusText()
const;
260 std::shared_ptr<PortTracker> m_portTracker;
262 std::set<message::uuid_t> m_alreadySeen;
265 std::condition_variable_any m_replyCondition;
266 std::map<message::uuid_t, std::shared_ptr<message::Buffer>> m_outstandingReplies;
269 std::condition_variable_any m_slaveCondition;
271 message::Type m_traceType;
274 std::vector<HubData> m_hubs;
276 unsigned long m_statusTime = 1;
277 int m_currentStatusId = message::Id::Invalid;
278 std::string m_currentStatus;
279 message::UpdateStatus::Importance m_currentStatusImportance = message::UpdateStatus::Bulk;
281 size_t m_numMessages = 0;
282 size_t m_numObjects = 0;
283 size_t m_aggregatedPayload = 0;
285 mutable mutex m_stateMutex;
287 std::string m_loadedWorkflowFile;
288 std::string m_sessionUrl;
Definition: availablemodule.h:120
Definition: parameter.h:26
Definition: porttracker.h:18
Definition: statetracker.h:25
virtual void parameterChoicesChanged(int moduleId, const std::string &parameterName)=0
virtual void deletePort(int moduleId, const std::string &portName)=0
virtual void parameterValueChanged(int moduleId, const std::string &parameterName)=0
virtual void deleteConnection(int fromId, const std::string &fromName, int toId, const std::string &toName)=0
virtual void deleteHub(int hub)=0
virtual void deleteModule(int moduleId)=0
virtual void newHub(int hub, const std::string &name, int nranks, const std::string &address, const std::string &logname, const std::string &realname)=0
virtual void newParameter(int moduleId, const std::string &parameterName)=0
ModuleStateBits
Definition: statetracker.h:38
virtual void moduleStateChanged(int moduleId, int stateBits)=0
virtual void status(int id, const std::string &text, message::UpdateStatus::Importance importance)=0
a module sends a status update
virtual void moduleAvailable(const AvailableModule &mod)=0
StateObserver()
Definition: statetracker.h:27
virtual void updateStatus(int id, const std::string &text, message::UpdateStatus::Importance importance)=0
the overall status has changed
virtual void deleteParameter(int moduleId, const std::string &parameterName)=0
virtual ~StateObserver()
Definition: statetracker.h:28
virtual void info(const std::string &text, message::SendText::TextType textType, int senderId, int senderRank, message::Type refType, const message::uuid_t &refUuid)=0
virtual void newModule(int moduleId, const boost::uuids::uuid &spawnUuid, const std::string &moduleName)=0
virtual void newConnection(int fromId, const std::string &fromName, int toId, const std::string &toName)=0
virtual void newPort(int moduleId, const std::string &portName)=0
Definition: statetracker.h:94
std::map< std::string, std::shared_ptr< Parameter > > ParameterMap
Definition: statetracker.h:166
std::map< int, std::string > ParameterOrder
Definition: statetracker.h:167
std::set< StateObserver * > m_observers
Definition: statetracker.h:213
RunningMap quitMap
Definition: statetracker.h:206
std::vector< MessageWithPayload > VistleState
Definition: statetracker.h:143
std::map< AvailableModule::Key, AvailableModule > m_availableModules
Definition: statetracker.h:211
std::map< int, Module > RunningMap
Definition: statetracker.h:203
bool dispatch(bool &received)
RunningMap runningMap
Definition: statetracker.h:204
std::set< int > ModuleSet
Definition: statetracker.h:207
VistleState m_queue
Definition: statetracker.h:215
ModuleSet busySet
Definition: statetracker.h:208
std::unique_lock< mutex > mutex_locker
Definition: statetracker.h:105
std::recursive_mutex mutex
Definition: statetracker.h:104
announce that a (slave) hub has connected
Definition: messages.h:77
add an object to the input queue of an input port
Definition: messages.h:359
notification that a module has created a parameter
Definition: messages.h:460
notification that a module has created an input/output port
Definition: messages.h:334
Definition: messages.h:586
Definition: messages.h:581
Definition: message.h:286
indicate that a module has started computing
Definition: messages.h:318
terminate a socket connection
Definition: messages.h:961
connect an output port to an input port of another module
Definition: messages.h:414
disconnect an output port from an input port of another module
Definition: messages.h:437
trigger execution of a module function
Definition: messages.h:271
steer execution stages
Definition: messages.h:736
indicate that a module has finished computing
Definition: messages.h:326
request a module to quit
Definition: messages.h:226
Definition: message.h:157
announce availability of a module to UI
Definition: messages.h:783
notify that a module has quit
Definition: messages.h:260
control where AddObject messages are sent
Definition: messages.h:682
debug: request a reply containing character 'c'
Definition: messages.h:134
debug: reply to pong
Definition: messages.h:145
request all modules to quit for terminating the session
Definition: messages.h:238
control whether/when prepare() and reduce() are called
Definition: messages.h:711
request that a slave hub be deleted
Definition: messages.h:124
notification that a module has removed a parameter
Definition: messages.h:483
notification that a module has destroyed an input/output port
Definition: messages.h:347
Definition: messages.h:601
request hub to listen on TCP port and forward incoming connections
Definition: messages.h:807
Definition: messages.h:693
send text messages to user interfaces
Definition: messages.h:607
set list of choice descriptions for a choice parameter
Definition: messages.h:553
request parameter value update or notify that a parameter value has been changed
Definition: messages.h:501
spawn a module
Definition: messages.h:158
acknowledge that a module has been spawned
Definition: messages.h:214
enable/disable message tracing for a module
Definition: messages.h:755
update status of a module (or other entity)
Definition: messages.h:644
#define V_COREEXPORT
Definition: export.h:9
boost::uuids::uuid uuid_t
Definition: message.h:154
Definition: allobjects.cpp:30
std::vector< char, allocator< char > > buffer
Definition: buffer.h:9
std::set< std::shared_ptr< Parameter > > ParameterSet
Definition: statetracker.h:21
Definition: statetracker.h:80
unsigned short dataPort
Definition: statetracker.h:89
std::string realName
Definition: statetracker.h:86
HubData(int id, const std::string &name)
Definition: statetracker.h:81
unsigned short port
Definition: statetracker.h:88
boost::asio::ip::address address
Definition: statetracker.h:90
std::string logName
Definition: statetracker.h:85
std::string name
Definition: statetracker.h:84
int id
Definition: statetracker.h:83
Definition: statetracker.h:135
MessageWithPayload(const message::Message &m, std::shared_ptr< const buffer > payload)
Definition: statetracker.h:136
message::Buffer message
Definition: statetracker.h:140
std::shared_ptr< const buffer > payload
Definition: statetracker.h:141
Definition: statetracker.h:168
bool busy
Definition: statetracker.h:175
message::SchedulingPolicy::Schedule schedulingPolicy
Definition: statetracker.h:185
ParameterMap parameters
Definition: statetracker.h:177
bool initialized
Definition: statetracker.h:173
message::ObjectReceivePolicy::Policy objectPolicy
Definition: statetracker.h:184
bool isSink() const
Definition: statetracker.h:189
bool killed
Definition: statetracker.h:174
int hub
Definition: statetracker.h:172
message::ReducePolicy::Reduce reducePolicy
Definition: statetracker.h:186
std::string statusText
Definition: statetracker.h:180
ParameterOrder paramOrder
Definition: statetracker.h:178
int height
Definition: statetracker.h:179
std::string name
Definition: statetracker.h:176
Module(int id, int hub)
Definition: statetracker.h:190
int id
Definition: statetracker.h:169
std::set< int > mirrors
Definition: statetracker.h:171