-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.cpp
More file actions
156 lines (122 loc) · 4.28 KB
/
Copy pathmain.cpp
File metadata and controls
156 lines (122 loc) · 4.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// reflector-cpp-MQTT--asio-client : bridges an MQTT message environment with a TCP (asio) reflector server.
//
#include <cassert>
#include <signal.h>
#include <string>
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#endif
#include <chrono>
using namespace std::chrono_literals;
#include <cstdio>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
#ifdef _WIN32
#pragma comment(lib, "wininet.lib")
#endif
#include "msg_cache-dedup.h"
#include "IPSME/IPSME_Bridge.hpp"
#include "cpp-EventLog.git/InMemory_EventLog.h"
#if defined(ROLE_SERVER)
#include "cpp-asio.git/asio_server.h"
using transport_t = asio_server;
#else
#include "cpp-asio.git/asio_client.h"
using transport_t = asio_client;
#endif
// The asio TCP transport; created in main(), reached by the MQTT-side callback below.
// Non-owning pointer: it starts as nullptr and App::on_MsgEnv_msg() skips the write while it is null.
// In the visible code only the ROLE_SERVER build of main() assigns it.
transport_t* g_ptr_asio = nullptr;
// De-duplication cache shared by both directions of the bridge:
// App::on_MsgEnv_msg() caches outgoing messages, App::on_transport_read() checks incoming ones against it.
duplicate g_duplicate;
// Build-time labels. Each one can be supplied by the build (e.g. -DBUILD_NAME=...);
// the defaults below apply only when the build did not define them.
// BUILD_NAME      : participant name passed to the Referer in main().
#ifndef BUILD_NAME
#define BUILD_NAME "reflector"
#endif
// BUILD_TRANSPORT : label for the TCP side in the log lines written by App.
#ifndef BUILD_TRANSPORT
#define BUILD_TRANSPORT "transport"
#endif
// BUILD_MSGENV    : label for the messaging-environment side in the log lines written by App.
#ifndef BUILD_MSGENV
#define BUILD_MSGENV "MsgEnv"
#endif
static constexpr const char* kpsz_PARTICIPANT_ = BUILD_NAME; // == build NAME (-DNAME)
static const std::string ks_INSTANCE_ = JSON::gen_uuid(); // fresh random GUID per run
// reflector server we bridge MQTT <-> TCP with
// NOTE(review): kpsz_REFLECTOR_ADDRESS is not referenced anywhere in the visible code;
// kus_REFLECTOR_PORT is the port the ROLE_SERVER build passes to the transport in main().
static constexpr const char* kpsz_REFLECTOR_ADDRESS = "127.0.0.1";
static constexpr unsigned short kus_REFLECTOR_PORT = 4999;
//----------------------------------------------------------------------------------------------------------------
// Application glue between the two sides of the bridge.
// Messages read from the TCP transport are published to the messaging environment,
// and messages received from the messaging environment are written to the TCP transport.
// The global g_duplicate cache is used to drop messages that come back from the transport
// after this process has already forwarded them.
class App : public Interface_App {
public:
    // Forwards the referer (participant name + instance id) to the Interface_App base.
    App(const JSON::JSON_Msg::Referer& referer)
        : Interface_App(referer)
    {
    }

    //-------------
    // transport -> MsgEnv
    // Called with one complete message received from the peer.
    // A message already present in g_duplicate is logged as *DUP and dropped;
    // anything else is logged and published through the bridge's IPSME object.
    void on_transport_read(std::string str_msg)
    {
        auto ipsme_bridge = IPSME_Bridge::get_instance();

        const bool already_forwarded = (true == g_duplicate.exists(str_msg));
        if (!already_forwarded) {
            std::cerr << BUILD_TRANSPORT " -> " BUILD_MSGENV " -- [" << str_msg << "]" << std::endl;
            ipsme_bridge->get_IPSME()->publish(str_msg.c_str());
            return;
        }

        std::cerr << BUILD_TRANSPORT " ->| *DUP -- [" << str_msg << "]" << std::endl;
    }

    //-------------
    // MsgEnv -> transport
    // Logs the message, records it in g_duplicate, then hands it to the transport
    // (which does the framing) when a transport has been published in g_ptr_asio.
    // psz_msg is not used by this implementation. Always returns true.
    bool on_MsgEnv_msg(const char* psz_msg, std::string str_msg)
    {
        std::cerr << BUILD_MSGENV " -> " BUILD_TRANSPORT " -- [" << str_msg << "]" << std::endl;

        // Entry context is built from 30s -- presumably the cache entry's lifetime;
        // confirm against msg_cache-dedup.h.
        g_duplicate.cache(str_msg, t_entry_context(30s));

        // No transport published yet: nothing to write to.
        if (nullptr == g_ptr_asio) {
            return true;
        }

        g_ptr_asio->write(str_msg);
        return true;
    }
};
//----------------------------------------------------------------------------------------------------------------
// mark main()
// Shutdown-request flag: raised by handler_sigint_(), polled by the loop in main().
// `volatile sig_atomic_t` is the object type the C and C++ standards guarantee can be
// written from a signal handler and then read by the interrupted code.
volatile sig_atomic_t gb_quit_ = 0;

// Signal handler (main() registers it for SIGINT and SIGTERM on POSIX_SIGNAL builds).
//   s : the signal number that was delivered.
// First delivery : requests a graceful shutdown by raising gb_quit_.
// Second delivery: terminates the process immediately, using s as the exit status.
void handler_sigint_(int s)
{
    // NOTE(review): printf is not async-signal-safe; it is kept so the console output
    // stays exactly as before.
    printf("\nCaught SIG[%d]\n", s);

    // if the user presses ^C twice, then just exit.
    if (gb_quit_) {
        // std::_Exit does not flush stdio, so push the message out first.
        std::fflush(stdout);
        // std::_Exit instead of exit(): exit() runs atexit handlers and static destructors
        // from inside the signal handler, which is undefined behaviour and can hang exactly
        // when the user is trying to force the process to stop.
        std::_Exit(s);
    }

    gb_quit_ = 1;
}
int main()
{
#ifdef POSIX_SIGNAL
struct sigaction sa;
sa.sa_handler = handler_sigint_;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGINT, &sa, NULL);
sigaction(SIGTERM, &sa, NULL); // so `docker stop` shuts the server down cleanly
#else
//signal(SIGINT, handler_sigint_);
#endif
try {
std::cerr << __func__ << ": " << "Connecting ..." << std::endl;
std::unique_ptr<App> uptr_app = std::make_unique<App>(JSON::JSON_Msg::Referer(kpsz_PARTICIPANT_, ks_INSTANCE_));
std::unique_ptr<InMemoryMsg> _uptr_eventLog = std::make_unique<InMemoryMsg>();
auto bridge= IPSME_Bridge::get_instance(uptr_app.get(), _uptr_eventLog.get());
// TCP side
#if defined(ROLE_SERVER)
transport_t transport(kus_REFLECTOR_PORT,
[app = uptr_app.get()](std::string m){ app->on_transport_read(std::move(m)); }); // listen + accept
g_ptr_asio = &transport;
transport.start();
std::cout << __func__ << ": " << kpsz_PARTICIPANT_ << " listening on [" << kus_REFLECTOR_PORT << "]" << std::endl;
std::cerr << __func__ << ": " << "Running ..." << std::endl;
#else
// client: start with no connections (configured later)
#endif
while (!gb_quit_)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep for 100 milliseconds
bridge->process_msgs();
}
}
catch (...) {
std::cout << "main(): unknown exception" << std::endl;
}
printf("Exit!\n");
return 0;
}