View on GitHub

Vistle

Distributed Data-parallel Scientific Visualization in VR

message.h
Go to the documentation of this file.
1#ifndef MESSAGE_H
2#define MESSAGE_H
3
4#include <array>
5#include <cassert>
6
7#include <vistle/util/enum.h>
8#include <vistle/util/directory.h> // ModuleNameLength
11#include "uuid.h"
12#include "export.h"
13#include "shmname.h"
14
15#pragma pack(push)
16#pragma pack(1)
17
18namespace vistle {
19
20namespace message {
21
23 (CompressionNone)(CompressionLz4)(CompressionZstd)(CompressionSnappy))
24
25// clang-format off
27 Type,
28 (INVALID) // keep 1st
29 (ANY) //< for Trace: enables tracing of all message types -- keep 2nd
30 (IDENTIFY)
31 (CLOSECONNECTION)
32 (ADDHUB)
33 (REMOVEHUB)
34 (SETID)
35 (TRACE)
36 (SPAWN)
37 (SPAWNPREPARED)
38 (KILL)
39 (DEBUG)
40 (QUIT)
41 (STARTED)
42 (MODULEEXIT)
43 (BUSY)
44 (IDLE)
45 (EXECUTIONPROGRESS)
46 (EXECUTE)
47 (CANCELEXECUTE)
48 (ADDOBJECT)
49 (ADDOBJECTCOMPLETED)
50 (DATATRANSFERSTATE)
51 (ADDPORT)
52 (REMOVEPORT)
53 (CONNECT)
54 (DISCONNECT)
55 (ADDPARAMETER)
56 (REMOVEPARAMETER)
57 (SETPARAMETER)
58 (SETPARAMETERCHOICES)
59 (PING)
60 (PONG)
61 (BARRIER)
62 (BARRIERREACHED)
63 (SENDTEXT)
64 (UPDATESTATUS)
65 (OBJECTRECEIVEPOLICY)
66 (SCHEDULINGPOLICY)
67 (REDUCEPOLICY)
68 (MODULEAVAILABLE)
69 (CREATEMODULECOMPOUND)
70 (LOCKUI)
71 (REPLAYFINISHED)
72 (REQUESTTUNNEL)
73 (REQUESTOBJECT)
74 (SENDOBJECT)
75 (REMOTERENDERING)
76 (FILEQUERY)
77 (FILEQUERYRESULT)
78 (COVER)
79 (INSITU)
80 (NumMessageTypes) // keep last
81)
83// clang-format on
84
85struct V_COREEXPORT Id {
86 enum Reserved {
87 ModuleBase = 1, //< >= ModuleBase: modules
88 Invalid = 0,
89 Vistle = -1, //< session parameters
90 Broadcast = -2, //< master is broadcasting to all modules/hubs
91 ForBroadcast = -3, //< to master for broadcasting
92 NextHop = -4,
93 UI = -5,
94 LocalManager = -6,
95 LocalHub = -7,
96 MasterHub = -8, //< < MasterHub: slave hubs
97 };
98
99 static bool isHub(int id);
100 static bool isModule(int id);
101 static std::string toString(Reserved id);
102 static std::string toString(int id);
103};
104
106public:
108 static void init(int id, int rank);
109 static const DefaultSender &instance();
110 static int id();
111 static int rank();
112
113private:
114 int m_id;
115 int m_rank;
116 static DefaultSender s_instance;
117};
118
120public:
121 MessageFactory(int id = Id::Invalid, int rank = -1);
122
123 int id() const;
124 void setId(int id);
125 int rank() const;
126 void setRank(int rank);
127
128 template<class M, class... P>
129 M message(P &&...p) const
130 {
131 M m(std::forward<P>(p)...);
132 m.setSenderId(m_id);
133 m.setRank(m_rank);
134 return m;
135 }
136
137private:
138 int m_id = Id::Invalid;
139 int m_rank = -1;
140};
141
142const int ModuleNameLength = 50;
143
144typedef std::array<char, ModuleNameLength> module_name_t;
145typedef std::array<char, 32> port_name_t;
146typedef std::array<char, 32> param_name_t;
147typedef std::array<char, 256> param_value_t;
148typedef std::array<char, 50> param_choice_t;
149typedef std::array<char, 300> shmsegname_t;
150typedef std::array<char, 350> description_t;
151typedef std::array<char, 200> address_t;
152typedef std::array<char, 500> path_t;
153
154typedef boost::uuids::uuid uuid_t;
155
156
158 // this is POD
159
160public:
161 static const size_t MESSAGE_SIZE = 1024; // fixed message size is imposed by boost::interprocess::message_queue
162
163 Message(const Type type, const unsigned int size);
164 // Message (or its subclasses) may not require destructors
165
167 unsigned long typeFlags() const;
168
170 void setUuid(const uuid_t &uuid);
172 const uuid_t &uuid() const;
174 void setReferrer(const uuid_t &ref);
176 const uuid_t &referrer() const;
178 Type type() const;
180 int senderId() const;
182 void setSenderId(int id);
184 int rank() const;
186 void setRank(int rank);
188 int uiId() const;
190 size_t size() const;
192 bool isForBroadcast() const;
194 void setForBroadcast(bool enable = true);
196 bool wasBroadcast() const;
198 void setWasBroadcast(bool enable = true);
200 bool isNotification() const;
202 void setNotify(bool enable);
203
205 int destId() const;
207 void setDestId(int id);
208
210 int destRank() const;
212 void setDestRank(int r);
214 int destUiId() const;
216 void setDestUiId(int id);
218 size_t payloadSize() const;
220 void setPayloadSize(size_t size);
222 std::string payloadName() const;
224 void setPayloadName(const shm_name_t &name);
226 CompressionMode payloadCompression() const;
228 void setPayloadCompression(CompressionMode mode);
230 size_t payloadRawSize() const;
232 void setPayloadRawSize(size_t size);
233
234 template<class SomeMessage>
235 SomeMessage &as()
236 {
237 SomeMessage *m = static_cast<SomeMessage *>(this);
238 assert(m->type() == SomeMessage::s_type);
239 return *m;
240 }
241 template<class SomeMessage>
242 SomeMessage const &as() const
243 {
244 const SomeMessage *m = static_cast<const SomeMessage *>(this);
245 assert(m->type() == SomeMessage::s_type);
246 return *m;
247 }
248
249private:
251 Type m_type;
253 unsigned int m_size;
255 int m_senderId;
257 int m_rank;
259 int m_destId;
261 int m_destRank;
263 uuid_t m_uuid;
265 uuid_t m_referrer;
266
267protected:
277 bool m_forBroadcast, m_wasBroadcast;
281 char m_pad[1] = {};
282};
283// ensure alignment
284static_assert(sizeof(Message) % sizeof(double) == 0, "not padded to ensure double alignment");
285
287public:
288 Buffer(): Message(ANY, Message::MESSAGE_SIZE) { memset(payload.data(), 0, payload.size()); }
289 Buffer(const Message &message): Message(message)
290 {
291 memset(payload.data(), 0, payload.size());
292 memcpy(payload.data(), (char *)&message + sizeof(Message), message.size() - sizeof(Message));
293 }
294 const Buffer &operator=(const Buffer &rhs)
295 {
296 *static_cast<Message *>(this) = rhs;
297 memcpy(payload.data(), rhs.payload.data(), payload.size());
298 return *this;
299 }
300
301 template<class SomeMessage>
302 SomeMessage &as()
303 {
304 SomeMessage *m = static_cast<SomeMessage *>(static_cast<Message *>(this));
305 assert(m->type() == SomeMessage::s_type);
306 return *m;
307 }
308 template<class SomeMessage>
309 SomeMessage const &as() const
310 {
311 const SomeMessage *m = static_cast<const SomeMessage *>(static_cast<const Message *>(this));
312 assert(m->type() == SomeMessage::s_type);
313 return *m;
314 }
315
316 size_t bufferSize() const { return Message::MESSAGE_SIZE; }
317 size_t size() const { return Message::size(); }
318 char *data() { return static_cast<char *>(static_cast<void *>(this)); }
319 const char *data() const { return static_cast<const char *>(static_cast<const void *>(this)); }
320
321private:
322 std::array<char, Message::MESSAGE_SIZE - sizeof(Message)> payload;
323};
324static_assert(sizeof(Buffer) == Message::MESSAGE_SIZE, "message too large");
325
326template<class MessageClass, Type MessageType>
327class MessageBase: public Message {
328public:
329 static const Type s_type = MessageType;
330
331protected:
332 MessageBase(): Message(MessageType, sizeof(MessageClass))
333 {
334 static_assert(sizeof(MessageClass) <= Message::MESSAGE_SIZE, "message too large");
335 }
336};
337
338V_COREEXPORT buffer compressPayload(vistle::message::CompressionMode &mode, const char *raw, size_t size,
339 int speed = -1 /* algorithm default */);
340V_COREEXPORT buffer compressPayload(vistle::message::CompressionMode &mode, const buffer &raw,
341 int speed = -1 /* algorithm default */);
342V_COREEXPORT buffer compressPayload(vistle::message::CompressionMode mode, Message &msg, buffer &raw,
343 int speed = -1 /* algorithm default */);
344V_COREEXPORT buffer decompressPayload(CompressionMode mode, size_t size, size_t rawsize, const char *compressed);
345V_COREEXPORT buffer decompressPayload(vistle::message::CompressionMode mode, size_t size, size_t rawsize,
346 buffer &compressed);
347V_COREEXPORT buffer decompressPayload(const Message &msg, buffer &compressed);
348
349V_COREEXPORT std::ostream &operator<<(std::ostream &s, const Message &msg);
350
352public:
353 codec_error(const std::string &what = "unsupported codec");
354};
355
356} // namespace message
357} // namespace vistle
358
359#pragma pack(pop)
360#endif
Definition: exception.h:13
Definition: message.h:286
size_t size() const
Definition: message.h:317
char * data()
Definition: message.h:318
const Buffer & operator=(const Buffer &rhs)
Definition: message.h:294
Buffer(const Message &message)
Definition: message.h:289
Buffer()
Definition: message.h:288
SomeMessage & as()
Definition: message.h:302
SomeMessage const & as() const
Definition: message.h:309
const char * data() const
Definition: message.h:319
size_t bufferSize() const
Definition: message.h:316
Definition: message.h:105
Definition: message.h:327
MessageBase()
Definition: message.h:332
static const Type s_type
Definition: message.h:329
Definition: message.h:119
M message(P &&...p) const
Definition: message.h:129
Definition: message.h:157
static const size_t MESSAGE_SIZE
Definition: message.h:161
uint64_t m_payloadRawSize
raw (uncompressed) payload size
Definition: message.h:271
bool m_forBroadcast
broadcast to all ranks?
Definition: message.h:277
uint64_t m_payloadSize
payload size
Definition: message.h:269
int m_payloadCompression
payload compression method
Definition: message.h:273
SomeMessage & as()
Definition: message.h:235
shm_name_t m_payloadName
name of payload in shared memory
Definition: message.h:275
bool m_notification
message is not a request for action
Definition: message.h:279
SomeMessage const & as() const
Definition: message.h:242
size_t size() const
message size
Definition: message.cpp:251
Definition: message.h:351
#define V_COREEXPORT
Definition: export.h:9
#define V_ENUM_OUTPUT_OP(name, scope)
Definition: enum.h:72
Definition: message.cpp:29
std::array< char, 350 > description_t
Definition: message.h:150
std::array< char, 500 > path_t
Definition: message.h:152
const int ModuleNameLength
Definition: message.h:142
INVALID(ANY)(IDENTIFY)(CLOSECONNECTION)(ADDHUB)(REMOVEHUB)(SETID)(TRACE)(SPAWN)(SPAWNPREPARED)(KILL)(DEBUG)(QUIT)(STARTED)(MODULEEXIT)(BUSY)(IDLE)(EXECUTIONPROGRESS)(EXECUTE)(CANCELEXECUTE)(ADDOBJECT)(ADDOBJECTCOMPLETED)(DATATRANSFERSTATE)(ADDPORT)(REMOVEPORT)(CONNECT)(DISCONNECT)(ADDPARAMETER)(REMOVEPARAMETER)(SETPARAMETER)(SETPARAMETERCHOICES)(PING)(PONG)(BARRIER)(BARRIERREACHED)(SENDTEXT)(UPDATESTATUS)(OBJECTRECEIVEPOLICY)(SCHEDULINGPOLICY)(REDUCEPOLICY)(MODULEAVAILABLE)(CREATEMODULECOMPOUND)(LOCKUI)(REPLAYFINISHED)(REQUESTTUNNEL)(REQUESTOBJECT)(SENDOBJECT)(REMOTERENDERING)(FILEQUERY)(FILEQUERYRESULT)(COVER)(INSITU)(NumMessageTypes)) struct V_COREEXPORT Id
Definition: message.h:29
std::array< char, 200 > address_t
Definition: message.h:151
@ Broadcast
Definition: messagerouter.h:36
V_COREEXPORT std::ostream & operator<<(std::ostream &s, const Message &msg)
Definition: messages.cpp:1718
buffer compressPayload(CompressionMode &mode, const buffer &raw, int speed)
Definition: message.cpp:326
boost::uuids::uuid uuid_t
Definition: message.h:154
std::array< char, 32 > param_name_t
Definition: message.h:146
std::array< char, 256 > param_value_t
Definition: message.h:147
std::array< char, ModuleNameLength > module_name_t
Definition: message.h:144
std::array< char, 300 > shmsegname_t
Definition: message.h:149
std::array< char, 32 > port_name_t
Definition: message.h:145
DEFINE_ENUM_WITH_STRING_CONVERSIONS(CompressionMode,(CompressionNone)(CompressionLz4)(CompressionZstd)(CompressionSnappy)) DEFINE_ENUM_WITH_STRING_CONVERSIONS(Type
std::array< char, 50 > param_choice_t
Definition: message.h:148
buffer decompressPayload(CompressionMode mode, size_t size, size_t rawsize, buffer &compressed)
Definition: message.cpp:409
Definition: allobjects.cpp:30
std::vector< char, allocator< char > > buffer
Definition: buffer.h:9
Definition: shmname.h:11