View on GitHub

Vistle

Distributed Data-parallel Scientific Visualization in VR

module.h
Go to the documentation of this file.
1#ifndef MODULE_H
2#define MODULE_H
3
13#if 0
14#ifndef MPICH_IGNORE_CXX_SEEK
15#define MPICH_IGNORE_CXX_SEEK
16#endif
17#include <mpi.h>
18#else
19#include <boost/mpi.hpp>
20#endif
21#include <boost/config.hpp>
22
23#include <iostream>
24#include <list>
25#include <map>
26#include <exception>
27#include <deque>
28#include <mutex>
29#include <future>
30
32#include <vistle/core/object.h>
34#include <vistle/core/port.h>
35#include <vistle/core/grid.h>
36#include <vistle/core/message.h>
40
41#include "objectcache.h"
42#include "export.h"
43
44#ifdef MODULE_THREAD
45#ifdef MODULE_STATIC
46#include "moduleregistry.h"
47#else
48#include <boost/dll/alias.hpp>
49#endif
50#endif
51
52namespace mpi = ::boost::mpi;
53
54namespace vistle {
55
56class StateTracker;
57struct HubData;
58class Module;
59class Renderer;
60
61namespace message {
62class Message;
63class Execute;
64class Buffer;
65class AddParameter;
66class SetParameter;
67class RemoveParameter;
68class MessageQueue;
69} // namespace message
70
71class V_MODULEEXPORT PortTask {
72 friend class Module;
73
74public:
75 explicit PortTask(Module *module);
76 virtual ~PortTask();
77
78 bool hasObject(const Port *p);
79 Object::const_ptr takeObject(const Port *p);
80 template<class Type>
81 typename Type::const_ptr accept(const Port *port);
82 template<class Type>
83 typename Type::const_ptr accept(const std::string &port);
84 template<class Type>
85 typename Type::const_ptr expect(const Port *port);
86 template<class Type>
87 typename Type::const_ptr expect(const std::string &port);
88
89 void addDependency(std::shared_ptr<PortTask> dep);
90 void addObject(Port *port, Object::ptr obj);
91 void addObject(const std::string &port, Object::ptr obj);
92 void passThroughObject(Port *port, Object::const_ptr obj);
93 void passThroughObject(const std::string &port, Object::const_ptr obj);
94
95 void addAllObjects();
96
97 bool isDone();
98 bool dependenciesDone();
99
100 bool wait();
101 bool waitDependencies();
102
103protected:
104 Module *m_module = nullptr;
105 std::map<const Port *, Object::const_ptr> m_input;
106 std::set<Port *> m_ports;
107 std::map<std::string, Port *> m_portsByString;
108 std::set<std::shared_ptr<PortTask>> m_dependencies;
109 std::map<Port *, std::deque<Object::ptr>> m_objects;
110 std::map<Port *, std::deque<bool>> m_passThrough;
111
112 std::mutex m_mutex;
113 std::shared_future<bool> m_future;
114};
115
116class V_MODULEEXPORT Module: public ParameterManager, public MessageSender {
117 friend class Reader;
118 friend class Renderer;
119 friend class PortTask;
120
121public:
122 static bool setup(const std::string &shmname, int moduleID, int rank);
123
124 Module(const std::string &name, const int moduleID, mpi::communicator comm);
125 virtual ~Module();
126 virtual void eventLoop(); // called from MODULE_MAIN
127 void initDone(); // to be called from eventLoop after module ctor has run
128
129 virtual bool dispatch(bool block = true, bool *messageReceived = nullptr);
130
131 Parameter *addParameterGeneric(const std::string &name, std::shared_ptr<Parameter> parameter) override;
132 bool removeParameter(Parameter *param) override;
133
134 const std::string &name() const;
135 const mpi::communicator &comm() const;
136 const mpi::communicator &commShmGroup() const;
137 int rank() const;
138 int size() const;
139 int shmLeader(int rank = -1) const; // return -1 or rank of leader of shm group, if rank is in current shm group
140 int id() const;
141
142 unsigned hardware_concurrency() const;
143
144 ObjectCache::CacheMode setCacheMode(ObjectCache::CacheMode mode, bool update = true);
145 ObjectCache::CacheMode cacheMode() const;
146
147 Port *createInputPort(const std::string &name, const std::string &description = "", const int flags = 0);
148 Port *createOutputPort(const std::string &name, const std::string &description = "", const int flags = 0);
149 bool destroyPort(const std::string &portName);
150 bool destroyPort(const Port *port);
151
152 bool sendObject(const mpi::communicator &comm, vistle::Object::const_ptr object, int destRank) const;
153 bool sendObject(vistle::Object::const_ptr object, int destRank) const;
154 vistle::Object::const_ptr receiveObject(const mpi::communicator &comm, int destRank) const;
155 vistle::Object::const_ptr receiveObject(int destRank) const;
156 bool broadcastObject(const mpi::communicator &comm, vistle::Object::const_ptr &object, int root) const;
157 bool broadcastObject(vistle::Object::const_ptr &object, int root) const;
158 bool broadcastObjectViaShm(vistle::Object::const_ptr &object, const std::string &objName, int root) const;
159
160 bool addObject(Port *port, vistle::Object::ptr object);
161 bool addObject(const std::string &portName, vistle::Object::ptr object);
162 bool passThroughObject(Port *port, vistle::Object::const_ptr object);
163 bool passThroughObject(const std::string &portName, vistle::Object::const_ptr object);
164
165 ObjectList getObjects(const std::string &portName);
166 bool hasObject(const Port *port) const;
167 bool hasObject(const std::string &portName) const;
168 vistle::Object::const_ptr takeFirstObject(Port *port);
169 vistle::Object::const_ptr takeFirstObject(const std::string &portName);
170
171 template<class Type>
172 typename Type::const_ptr accept(Port *port);
173 template<class Type>
174 typename Type::const_ptr accept(const std::string &port);
175 template<class Interface>
176 const Interface *acceptInterface(Port *port);
177 template<class Interface>
178 const Interface *acceptInterface(const std::string &port);
179
180 template<class Type>
181 typename Type::const_ptr expect(Port *port);
182 template<class Type>
183 typename Type::const_ptr expect(const std::string &port);
184
186 void requestPortMapping(unsigned short forwardPort, unsigned short localPort);
188 void removePortMapping(unsigned short forwardPort);
189
190 void sendParameterMessage(const message::Message &message, const buffer *payload) const override;
191 bool sendMessage(const message::Message &message, const buffer *payload = nullptr) const override;
192 bool sendMessage(const message::Message &message, const MessagePayload &payload) const override;
193 template<class Payload>
194 bool sendMessageWithPayload(message::Message &message, Payload &payload) const;
195
197 void sendText(int type, const std::string &msg) const;
198
200 void sendInfo(const char *fmt, ...) const
201#ifdef __GNUC__
202 __attribute__((format(printf, 2, 3)))
203#endif
204 ;
205
207 void sendWarning(const char *fmt, ...) const
208#ifdef __GNUC__
209 __attribute__((format(printf, 2, 3)))
210#endif
211 ;
212
214 void sendError(const char *fmt, ...) const
215#ifdef __GNUC__
216 __attribute__((format(printf, 2, 3)))
217#endif
218 ;
219
221 void sendError(const message::Message &msg, const char *fmt, ...) const
222#ifdef __GNUC__
223 __attribute__((format(printf, 3, 4)))
224#endif
225 ;
226
228 void sendInfo(const std::string &text) const;
230 void sendWarning(const std::string &text) const;
232 void sendError(const std::string &text) const;
234 void sendError(const message::Message &msg, const std::string &text) const;
235
236 int schedulingPolicy() const;
237 void setSchedulingPolicy(int schedulingPolicy /*< really message::SchedulingPolicy::Schedule */);
238
239 int reducePolicy() const;
240 void setReducePolicy(int reduceRequirement /*< really message::ReducePolicy::Reduce */);
241
242 void virtual prepareQuit();
243
244 const HubData &getHub() const;
245
246 bool isConnected(const Port &port) const;
247 bool isConnected(const std::string &portname) const;
248 std::string getModuleName(int id) const;
249 int mirrorId() const;
250 std::set<int> getMirrors() const;
251
252protected:
253 void setObjectReceivePolicy(int pol);
254 int objectReceivePolicy() const;
255 void startIteration(); //< increase iteration counter
256
257 const std::string m_name;
260 const int m_id;
261
262 int m_executionCount, m_iteration;
263 std::set<Port *> m_withOutput;
264
265 void setDefaultCacheMode(ObjectCache::CacheMode mode);
266 void updateMeta(vistle::Object::ptr object) const;
267
270 std::deque<message::Buffer> messageBacklog;
271 virtual bool handleMessage(const message::Message *message, const vistle::MessagePayload &payload);
272 virtual bool handleExecute(const message::Execute *exec);
273 bool cancelRequested(bool collective = false);
274 bool wasCancelRequested() const;
275 virtual void cancelExecuteMessageReceived(const message::Message *msg);
276 virtual bool addInputObject(int sender, const std::string &senderPort, const std::string &portName,
277 Object::const_ptr object);
278 virtual bool
279 objectAdded(int sender, const std::string &senderPort,
280 const Port *port); //< notification when data object has been added - called on each rank individually
281
282 bool syncMessageProcessing() const;
283 void setSyncMessageProcessing(bool sync);
284
285 virtual void connectionAdded(const Port *from, const Port *to);
286 virtual void connectionRemoved(const Port *from, const Port *to);
287
288
289 bool changeParameter(const Parameter *p) override;
290
291 int openmpThreads() const;
292 void setOpenmpThreads(int, bool updateParam = true);
293
294 void enableBenchmark(bool benchmark, bool updateParam = true);
295
296 virtual bool prepare(); //< prepare execution - called on each rank individually
297 virtual bool reduce(int timestep); //< do reduction for timestep (-1: global) - called on all ranks
298 virtual bool cancelExecute(); //< if execution has been canceled early before all objects have been processed
299 int numTimesteps() const;
300
301 void setStatus(const std::string &text, message::UpdateStatus::Importance prio = message::UpdateStatus::Low);
302 void clearStatus();
303
304 bool getNextMessage(message::Buffer &buf, bool block = true);
305
306 bool reduceWrapper(const message::Execute *exec, bool reordered = false);
307 bool prepareWrapper(const message::Execute *exec);
308
309private:
310 std::shared_ptr<StateTracker> m_stateTracker;
311 int m_receivePolicy;
312 int m_schedulingPolicy;
313 int m_reducePolicy;
314
315 bool havePort(const std::string &name); //< check whether a port or parameter already exists
316 Port *findInputPort(const std::string &name);
317 const Port *findInputPort(const std::string &name) const;
318 Port *findOutputPort(const std::string &name);
319 const Port *findOutputPort(const std::string &name) const;
320
321 bool needsSync(const message::Message &m) const;
322
324 virtual bool parameterAdded(const int senderId, const std::string &name, const message::AddParameter &msg,
325 const std::string &moduleName);
327 virtual bool parameterChanged(const int senderId, const std::string &name, const message::SetParameter &msg);
329 virtual bool parameterRemoved(const int senderId, const std::string &name, const message::RemoveParameter &msg);
330
331 virtual bool compute(); //< do processing - called on each rank individually
332 virtual bool compute(std::shared_ptr<PortTask> task) const;
333
334 std::map<std::string, Port> outputPorts;
335 std::map<std::string, Port> inputPorts;
336
337 ObjectCache m_cache;
338 ObjectCache::CacheMode m_defaultCacheMode;
339 bool m_prioritizeVisible;
340 void updateCacheMode();
341 bool m_syncMessageProcessing;
342
343 void updateOutputMode();
344 std::streambuf *m_origStreambuf = nullptr, *m_streambuf = nullptr;
345
346 int m_traceMessages;
347 bool m_benchmark;
348 double m_benchmarkStart;
349 double m_avgComputeTime;
350 mpi::communicator m_comm, m_commShmGroup, m_commShmLeaders;
351 std::vector<int> m_shmLeaders; // leader rank in m_comm of m_commShmGroup for every rank in m_comm
352
353 int m_numTimesteps;
354 bool m_cancelRequested = false, m_cancelExecuteCalled = false, m_executeAfterCancelFound = false;
355 bool m_prepared, m_computed, m_reduced;
356 bool m_readyForQuit;
357
358 IntParameter *m_concurrency = nullptr;
359 void waitAllTasks();
360 std::shared_ptr<PortTask> m_lastTask;
361 std::deque<std::shared_ptr<PortTask>> m_tasks;
362
363 unsigned m_hardware_concurrency = 1;
364};
365
366V_MODULEEXPORT int getTimestep(Object::const_ptr obj);
367V_MODULEEXPORT double getRealTime(Object::const_ptr obj);
368
369template<>
370V_MODULEEXPORT Object::const_ptr Module::expect<Object>(Port *port);
371
372} // namespace vistle
373
374#ifdef MODULE_THREAD
375#ifdef MODULE_STATIC
376#define MODULE_MAIN_THREAD(X, THREAD_MODE) \
377 static std::shared_ptr<vistle::Module> newModuleInstance(const std::string &name, int moduleId, \
378 mpi::communicator comm) \
379 { \
380 vistle::Module::setup("dummy shm", moduleId, comm.rank()); \
381 return std::shared_ptr<X>(new X(name, moduleId, comm)); \
382 } \
383 static vistle::ModuleRegistry::RegisterClass registerModule##X(VISTLE_MODULE_NAME, newModuleInstance);
384#else
385#define MODULE_MAIN_THREAD(X, THREAD_MODE) \
386 static std::shared_ptr<vistle::Module> newModuleInstance(const std::string &name, int moduleId, \
387 mpi::communicator comm) \
388 { \
389 vistle::Module::setup("dummy shm", moduleId, comm.rank()); \
390 return std::shared_ptr<X>(new X(name, moduleId, comm)); \
391 } \
392 BOOST_DLL_ALIAS(newModuleInstance, newModule)
393#endif
394
395#define MODULE_DEBUG(X)
396#else
397// MPI_THREAD_FUNNELED is sufficient, but apparently not provided by the CentOS build of MVAPICH2
398#define MODULE_MAIN_THREAD(X, THREAD_MODE) \
399 int main(int argc, char **argv) \
400 { \
401 if (argc != 4) { \
402 std::cerr << "module requires exactly 4 parameters" << std::endl; \
403 exit(1); \
404 } \
405 int rank = -1, size = -1; \
406 try { \
407 std::string shmname = argv[1]; \
408 const std::string name = argv[2]; \
409 int moduleID = atoi(argv[3]); \
410 mpi::environment mpi_environment(argc, argv, THREAD_MODE, true); \
411 vistle::registerTypes(); \
412 mpi::communicator comm_world; \
413 rank = comm_world.rank(); \
414 size = comm_world.size(); \
415 vistle::Module::setup(shmname, moduleID, rank); \
416 { \
417 X module(name, moduleID, comm_world); \
418 module.eventLoop(); \
419 } \
420 comm_world.barrier(); \
421 } catch (vistle::exception & e) { \
422 std::cerr << "[" << rank << "/" << size << "]: fatal exception: " << e.what() << std::endl; \
423 std::cerr << " info: " << e.info() << std::endl; \
424 std::cerr << e.where() << std::endl; \
425 exit(1); \
426 } catch (std::exception & e) { \
427 std::cerr << "[" << rank << "/" << size << "]: fatal exception: " << e.what() << std::endl; \
428 exit(1); \
429 } \
430 return 0; \
431 }
432
433#ifdef NDEBUG
434#define MODULE_DEBUG(X)
435#else
436#define MODULE_DEBUG(X) \
437 std::cerr << #X << ": PID " << get_process_handle() << std::endl; \
438 std::cerr << " attach debugger within 10 s" << std::endl; \
439 sleep(10); \
440 std::cerr << " continuing..." << std::endl;
441#endif
442#endif
443
444#define MODULE_MAIN(X) MODULE_MAIN_THREAD(X, boost::mpi::threading::funneled)
445
446#include "module_impl.h"
447#endif
Definition: messagesender.h:12
Definition: module.h:116
message::MessageQueue * sendMessageQueue
Definition: module.h:268
int m_executionCount
Definition: module.h:262
const std::string m_name
Definition: module.h:257
const int m_id
Definition: module.h:260
int m_rank
Definition: module.h:258
std::set< Port * > m_withOutput
Definition: module.h:263
const Interface * acceptInterface(Port *port)
std::deque< message::Buffer > messageBacklog
Definition: module.h:270
int m_size
Definition: module.h:259
message::MessageQueue * receiveMessageQueue
Definition: module.h:269
const Interface * acceptInterface(const std::string &port)
std::shared_ptr< const Object > const_ptr
Definition: object.h:68
std::shared_ptr< Object > ptr
Definition: object.h:67
Definition: parametermanager.h:16
Definition: parameter.h:26
Definition: module.h:71
std::mutex m_mutex
Definition: module.h:112
std::map< const Port *, Object::const_ptr > m_input
Definition: module.h:105
std::set< std::shared_ptr< PortTask > > m_dependencies
Definition: module.h:108
std::map< std::string, Port * > m_portsByString
Definition: module.h:107
std::shared_future< bool > m_future
Definition: module.h:113
std::map< Port *, std::deque< Object::ptr > > m_objects
Definition: module.h:109
std::set< Port * > m_ports
Definition: module.h:106
std::map< Port *, std::deque< bool > > m_passThrough
Definition: module.h:110
Definition: port.h:29
base class for Vistle read modules
Definition: reader.h:22
notification that a module has created a parameter
Definition: messages.h:460
Definition: message.h:286
trigger execution of a module function
Definition: messages.h:271
Definition: messagequeue.h:18
Definition: message.h:157
notification that a module has removed a parameter
Definition: messages.h:483
request parameter value update or notify that a parameter value has been changed
Definition: messages.h:501
Definition: shm_reference.h:15
base class for Vistle modules
std::string module(const std::string &prefix)
Definition: directory.cpp:61
Definition: allobjects.cpp:30
std::vector< char, allocator< char > > buffer
Definition: buffer.h:9
std::deque< obj_const_ptr > ObjectList
Definition: port.h:18
V_MODULEEXPORT double getRealTime(Object::const_ptr obj)
Definition: module.cpp:162
V_MODULEEXPORT int getTimestep(Object::const_ptr obj)
Definition: module.cpp:145
Definition: statetracker.h:80