Merge lp:~ansharyan015/drizzle/rabbitmq_dynamic into lp:drizzle

Proposed by Daniel Nichter
Status: Merged
Approved by: Brian Aker
Approved revision: 2571
Merged at revision: 2582
Proposed branch: lp:~ansharyan015/drizzle/rabbitmq_dynamic
Merge into: lp:drizzle
Diff against target: 490 lines (+267/-46)
4 files modified
plugin/rabbitmq/rabbitmq_handler.cc (+39/-22)
plugin/rabbitmq/rabbitmq_handler.h (+5/-4)
plugin/rabbitmq/rabbitmq_log.cc (+219/-18)
plugin/rabbitmq/rabbitmq_log.h (+4/-2)
To merge this branch: bzr merge lp:~ansharyan015/drizzle/rabbitmq_dynamic
Reviewer Review Type Date Requested Status
Anshu Kumar (community) Approve
Daniel Nichter (community) code review Approve
Drizzle Merge Team Pending
Review via email: mp+116396@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Daniel Nichter (daniel-nichter) :
review: Approve (code review)
Revision history for this message
Anshu Kumar (ansharyan015) :
review: Approve
Revision history for this message
Anshu Kumar (ansharyan015) wrote :

For assistance, this is demonstrated @ http://anshsays.wordpress.com/2012/08/07/dynamicrabbitmq/

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'plugin/rabbitmq/rabbitmq_handler.cc'
2--- plugin/rabbitmq/rabbitmq_handler.cc 2011-04-01 19:31:01 +0000
3+++ plugin/rabbitmq/rabbitmq_handler.cc 2012-08-07 10:29:21 +0000
4@@ -30,8 +30,10 @@
5
6 using namespace std;
7
8-namespace drizzle_plugin
9-{
10+namespace drizzle_plugin {
11+namespace rabbitmq {
12+
13+extern bool sysvar_logging_enable;
14
15 RabbitMQHandler::RabbitMQHandler(const std::string &rabbitMQHost,
16 const in_port_t rabbitMQPort,
17@@ -48,7 +50,8 @@
18 password(rabbitMQPassword),
19 virtualhost(rabbitMQVirtualhost),
20 exchange(rabbitMQExchange),
21- routingKey(rabbitMQRoutingKey)
22+ routingKey(rabbitMQRoutingKey),
23+ rabbitmq_connection_established(false)
24 {
25 pthread_mutex_init(&publishLock, NULL);
26 connect();
27@@ -64,6 +67,10 @@
28 const int length)
29 throw(rabbitmq_handler_exception)
30 {
31+ // return if query logging is not enabled
32+ //if (sysvar_logging_enable == false)
33+ // return;
34+
35 pthread_mutex_lock(&publishLock);
36 amqp_bytes_t b;
37 b.bytes= message;
38@@ -112,25 +119,34 @@
39 sockfd = amqp_open_socket(hostname.c_str(), port);
40 if(sockfd < 0)
41 {
42- throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
43- }
44- amqp_set_sockfd(rabbitmqConnection, sockfd);
45- /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
46- handleAMQPError(amqp_login(rabbitmqConnection,
47- virtualhost.c_str(),
48- 0,
49- 131072,
50- 0,
51- AMQP_SASL_METHOD_PLAIN,
52- username.c_str(),
53- password.c_str()),
54- "rabbitmq login");
55- /* open the channel */
56- amqp_channel_open(rabbitmqConnection, 1);
57- handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
58- amqp_table_t empty_table = { 0, NULL }; // for users of old librabbitmq users - amqp_empty_table did not exist
59- amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("fanout"), 0, 0, empty_table);
60- handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
61+ rabbitmq_connection_established= false;
62+ return;
63+ }
64+ try
65+ {
66+ amqp_set_sockfd(rabbitmqConnection, sockfd);
67+ /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
68+ handleAMQPError(amqp_login(rabbitmqConnection,
69+ virtualhost.c_str(),
70+ 0,
71+ 131072,
72+ 0,
73+ AMQP_SASL_METHOD_PLAIN,
74+ username.c_str(),
75+ password.c_str()),
76+ "rabbitmq login");
77+ /* open the channel */
78+ amqp_channel_open(rabbitmqConnection, 1);
79+ handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
80+ amqp_table_t empty_table = { 0, NULL }; // for users of old librabbitmq users - amqp_empty_table did not exist
81+ amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("fanout"), 0, 0, empty_table);
82+ handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
83+ rabbitmq_connection_established= true;
84+ }
85+ catch(exception& e)
86+ {
87+ rabbitmq_connection_established= false;
88+ }
89 }
90
91 void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
92@@ -162,4 +178,5 @@
93 }
94 }
95
96+} /* namespace rabbitmq */
97 } /* namespace drizzle_plugin */
98
99=== modified file 'plugin/rabbitmq/rabbitmq_handler.h'
100--- plugin/rabbitmq/rabbitmq_handler.h 2011-03-31 13:02:08 +0000
101+++ plugin/rabbitmq/rabbitmq_handler.h 2012-08-07 10:29:21 +0000
102@@ -30,8 +30,8 @@
103 #include <amqp_framing.h>
104 #include <netinet/in.h>
105
106-namespace drizzle_plugin
107-{
108+namespace drizzle_plugin {
109+namespace rabbitmq {
110
111 /**
112 * exception thrown by the rabbitmq handler
113@@ -70,6 +70,7 @@
114 const std::string &routingKey;
115 pthread_mutex_t publishLock;
116 public:
117+ bool rabbitmq_connection_established;
118 /**
119 * @brief
120 * Constructs a new RabbitMQHandler, purpose is to
121@@ -114,6 +115,7 @@
122
123 void reconnect() throw(rabbitmq_handler_exception);
124 void disconnect() throw(rabbitmq_handler_exception);
125+ void connect() throw(rabbitmq_handler_exception);
126
127 private:
128 /**
129@@ -130,9 +132,8 @@
130 */
131 void handleAMQPError(amqp_rpc_reply_t x, std::string context) throw(rabbitmq_handler_exception);
132
133- void connect() throw(rabbitmq_handler_exception);
134-
135 };
136
137+} /* namespace rabbitmq */
138 } /* namespace drizzle_plugin */
139
140
141=== modified file 'plugin/rabbitmq/rabbitmq_log.cc'
142--- plugin/rabbitmq/rabbitmq_log.cc 2012-04-24 08:42:13 +0000
143+++ plugin/rabbitmq/rabbitmq_log.cc 2012-08-07 10:29:21 +0000
144@@ -33,6 +33,7 @@
145 #include "rabbitmq_handler.h"
146 #include <boost/program_options.hpp>
147 #include <drizzled/module/option_map.h>
148+#include <drizzled/item.h>
149
150 namespace po= boost::program_options;
151
152@@ -40,14 +41,28 @@
153 using namespace drizzled;
154 using namespace google;
155
156-namespace drizzle_plugin
157-{
158+namespace drizzle_plugin {
159+namespace rabbitmq {
160
161 /**
162 * rabbitmq port
163 */
164 static port_constraint sysvar_rabbitmq_port;
165-
166+bool sysvar_logging_enable= true;
167+string sysvar_rabbitmq_host;
168+string sysvar_rabbitmq_username;
169+string sysvar_rabbitmq_password;
170+string sysvar_rabbitmq_virtualhost;
171+string sysvar_rabbitmq_exchange;
172+string sysvar_rabbitmq_routingkey;
173+void updateSysvarLoggingEnable(Session *, sql_var_t);
174+bool updateSysvarRabbitMQHost(Session *, set_var *var);
175+int updateSysvarRabbitMQPort(Session *, set_var *var);
176+bool updateSysvarRabbitMQUserName(Session *, set_var *var);
177+bool updateSysvarRabbitMQPassword(Session *, set_var *var);
178+bool updateSysvarRabbitMQVirtualHost(Session *, set_var *var);
179+bool updateSysvarRabbitMQExchange(Session *, set_var *var);
180+bool updateSysvarRabbitMQRoutingKey(Session *, set_var *var);
181
182 RabbitMQLog::RabbitMQLog(const string &name,
183 RabbitMQHandler* mqHandler) :
184@@ -64,6 +79,9 @@
185 plugin::ReplicationReturnCode
186 RabbitMQLog::apply(Session &, const message::Transaction &to_apply)
187 {
188+ if(not sysvar_logging_enable)
189+ return plugin::SUCCESS;
190+
191 size_t message_byte_length= to_apply.ByteSize();
192 uint8_t* buffer= new uint8_t[message_byte_length];
193 if(buffer == NULL)
194@@ -102,10 +120,184 @@
195 return plugin::UNKNOWN_ERROR;
196 }
197
198+void RabbitMQLog::setRabbitMQHandler(RabbitMQHandler* new_rabbitMQHandler)
199+{
200+ _rabbitMQHandler= new_rabbitMQHandler;
201+}
202+
203 static RabbitMQLog *rabbitmqLogger; ///< the actual plugin
204 static RabbitMQHandler* rabbitmqHandler; ///< the rabbitmq handler
205
206
207+void updateSysvarLoggingEnable(Session *, sql_var_t)
208+{
209+ if(not sysvar_logging_enable)
210+ {
211+ sysvar_logging_enable = false;
212+ delete rabbitmqHandler;
213+ }
214+ else
215+ {
216+ rabbitmqHandler= new RabbitMQHandler(sysvar_rabbitmq_host,
217+ sysvar_rabbitmq_port,
218+ sysvar_rabbitmq_username,
219+ sysvar_rabbitmq_password,
220+ sysvar_rabbitmq_virtualhost,
221+ sysvar_rabbitmq_exchange,
222+ sysvar_rabbitmq_routingkey);
223+ if(rabbitmqHandler->rabbitmq_connection_established)
224+ {
225+ rabbitmqLogger->setRabbitMQHandler(rabbitmqHandler);
226+ sysvar_logging_enable= true;
227+ }
228+ else
229+ {
230+ errmsg_printf(error::ERROR, _("Could not open socket, is rabbitmq running?"));
231+ sysvar_logging_enable= false;
232+ }
233+ }
234+}
235+
236+bool updateSysvarRabbitMQHost(Session *, set_var *var)
237+{
238+ if(sysvar_logging_enable)
239+ {
240+ errmsg_printf(error::ERROR, _("Value of rabbitmq_host cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
241+ return true; // error
242+ }
243+ if (not var->value->str_value.empty())
244+ {
245+ sysvar_rabbitmq_host = var->value->str_value.data();
246+ return false;
247+ }
248+ else
249+ {
250+ errmsg_printf(error::ERROR, _("rabbitmq_host cannot be NULL"));
251+ return true; // error
252+ }
253+ return true; // error
254+}
255+
256+int updateSysvarRabbitMQPort(Session *, set_var *var)
257+{
258+ if(sysvar_logging_enable)
259+ {
260+ errmsg_printf(error::ERROR, _("Value of rabbitmq_port cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
261+ return 1; // error
262+ }
263+ if (var->value->val_int())
264+ {
265+ sysvar_rabbitmq_port = var->value->val_int();
266+ return 0;
267+ }
268+ else
269+ {
270+ errmsg_printf(error::ERROR, _("rabbitmq_port cannot be NULL"));
271+ return 1; // error
272+ }
273+ return 1; // error
274+}
275+
276+bool updateSysvarRabbitMQUserName(Session *, set_var *var)
277+{
278+ if(sysvar_logging_enable)
279+ {
280+ errmsg_printf(error::ERROR, _("Value of rabbitmq_username cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
281+ return true; // error
282+ }
283+ if (not var->value->str_value.empty())
284+ {
285+ sysvar_rabbitmq_username = var->value->str_value.data();
286+ return false;
287+ }
288+ else
289+ {
290+ errmsg_printf(error::ERROR, _("rabbitmq_username cannot be NULL"));
291+ return true; // error
292+ }
293+ return true; // error
294+}
295+
296+bool updateSysvarRabbitMQPassword(Session *, set_var *var)
297+{
298+ if(sysvar_logging_enable)
299+ {
300+ errmsg_printf(error::ERROR, _("Value of rabbitmq_password cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
301+ return true; // error
302+ }
303+ if (not var->value->str_value.empty())
304+ {
305+ sysvar_rabbitmq_password = var->value->str_value.data();
306+ return false;
307+ }
308+ else
309+ {
310+ errmsg_printf(error::ERROR, _("rabbitmq_password cannot be NULL"));
311+ return true; // error
312+ }
313+ return true; // error
314+}
315+
316+bool updateSysvarRabbitMQVirtualHost(Session *, set_var *var)
317+{
318+ if(sysvar_logging_enable)
319+ {
320+ errmsg_printf(error::ERROR, _("Value of rabbitmq_virtualhost cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
321+ return true; // error
322+ }
323+ if (not var->value->str_value.empty())
324+ {
325+ sysvar_rabbitmq_virtualhost = var->value->str_value.data();
326+ return false;
327+ }
328+ else
329+ {
330+ errmsg_printf(error::ERROR, _("rabbitmq_virtualhost cannot be NULL"));
331+ return true; // error
332+ }
333+ return true; // error
334+}
335+
336+bool updateSysvarRabbitMQExchange(Session *, set_var *var)
337+{
338+ if(sysvar_logging_enable)
339+ {
340+ errmsg_printf(error::ERROR, _("Value of rabbitmq_exchange cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
341+ return true; // error
342+ }
343+ if (not var->value->str_value.empty())
344+ {
345+ sysvar_rabbitmq_exchange = var->value->str_value.data();
346+ return false;
347+ }
348+ else
349+ {
350+ errmsg_printf(error::ERROR, _("rabbitmq_exchange cannot be NULL"));
351+ return true; // error
352+ }
353+ return true; // error
354+}
355+
356+bool updateSysvarRabbitMQRoutingKey(Session *, set_var *var)
357+{
358+ if(sysvar_logging_enable)
359+ {
360+ errmsg_printf(error::ERROR, _("Value of rabbitmq_routingkey cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
361+ return true; // error
362+ }
363+ if (not var->value->str_value.empty())
364+ {
365+ sysvar_rabbitmq_routingkey = var->value->str_value.data();
366+ return false;
367+ }
368+ else
369+ {
370+ errmsg_printf(error::ERROR, _("rabbitmq_routingkey cannot be NULL"));
371+ return true; // error
372+ }
373+ return true; // error
374+}
375+
376 /**
377 * Initialize the rabbitmq logger - instanciates the dependencies (the handler)
378 * and creates the log handler with the dependency - makes it easier to swap out
379@@ -124,6 +316,10 @@
380 vm["virtualhost"].as<string>(),
381 vm["exchange"].as<string>(),
382 vm["routingkey"].as<string>());
383+ if(not rabbitmqHandler->rabbitmq_connection_established)
384+ {
385+ throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
386+ }
387 }
388 catch (exception& e)
389 {
390@@ -145,13 +341,14 @@
391 context.add(rabbitmqLogger);
392 ReplicationServices::attachApplier(rabbitmqLogger, vm["use-replicator"].as<string>());
393
394- context.registerVariable(new sys_var_const_string_val("host", vm["host"].as<string>()));
395- context.registerVariable(new sys_var_constrained_value_readonly<in_port_t>("port", sysvar_rabbitmq_port));
396- context.registerVariable(new sys_var_const_string_val("username", vm["username"].as<string>()));
397- context.registerVariable(new sys_var_const_string_val("password", vm["password"].as<string>()));
398- context.registerVariable(new sys_var_const_string_val("virtualhost", vm["virtualhost"].as<string>()));
399- context.registerVariable(new sys_var_const_string_val("exchange", vm["exchange"].as<string>()));
400- context.registerVariable(new sys_var_const_string_val("routingkey", vm["routingkey"].as<string>()));
401+ context.registerVariable(new sys_var_bool_ptr("logging_enable", &sysvar_logging_enable, &updateSysvarLoggingEnable));
402+ context.registerVariable(new sys_var_std_string("host", sysvar_rabbitmq_host, NULL, &updateSysvarRabbitMQHost));
403+ context.registerVariable(new sys_var_constrained_value<in_port_t>("port", sysvar_rabbitmq_port, &updateSysvarRabbitMQPort));
404+ context.registerVariable(new sys_var_std_string("username", sysvar_rabbitmq_username, NULL, &updateSysvarRabbitMQUserName));
405+ context.registerVariable(new sys_var_std_string("password", sysvar_rabbitmq_password, NULL, &updateSysvarRabbitMQPassword));
406+ context.registerVariable(new sys_var_std_string("virtualhost", sysvar_rabbitmq_virtualhost, NULL, &updateSysvarRabbitMQVirtualHost));
407+ context.registerVariable(new sys_var_std_string("exchange", sysvar_rabbitmq_exchange, NULL, &updateSysvarRabbitMQExchange));
408+ context.registerVariable(new sys_var_std_string("routingkey", sysvar_rabbitmq_routingkey, NULL, &updateSysvarRabbitMQRoutingKey));
409
410 return 0;
411 }
412@@ -159,32 +356,36 @@
413
414 static void init_options(drizzled::module::option_context &context)
415 {
416+ context("logging-enable",
417+ po::value<bool>(&sysvar_logging_enable)->default_value(true)->zero_tokens(),
418+ _("Enable logging to rabbitmq server"));
419 context("host",
420- po::value<string>()->default_value("localhost"),
421+ po::value<string>(&sysvar_rabbitmq_host)->default_value("localhost"),
422 _("Host name to connect to"));
423 context("port",
424 po::value<port_constraint>(&sysvar_rabbitmq_port)->default_value(5672),
425 _("Port to connect to"));
426 context("virtualhost",
427- po::value<string>()->default_value("/"),
428+ po::value<string>(&sysvar_rabbitmq_virtualhost)->default_value("/"),
429 _("RabbitMQ virtualhost"));
430 context("username",
431- po::value<string>()->default_value("guest"),
432+ po::value<string>(&sysvar_rabbitmq_username)->default_value("guest"),
433 _("RabbitMQ username"));
434 context("password",
435- po::value<string>()->default_value("guest"),
436+ po::value<string>(&sysvar_rabbitmq_password)->default_value("guest"),
437 _("RabbitMQ password"));
438 context("use-replicator",
439 po::value<string>()->default_value("default_replicator"),
440 _("Name of the replicator plugin to use (default='default_replicator')"));
441 context("exchange",
442- po::value<string>()->default_value("ReplicationExchange"),
443+ po::value<string>(&sysvar_rabbitmq_exchange)->default_value("ReplicationExchange"),
444 _("Name of RabbitMQ exchange to publish to"));
445 context("routingkey",
446- po::value<string>()->default_value("ReplicationRoutingKey"),
447+ po::value<string>(&sysvar_rabbitmq_routingkey)->default_value("ReplicationRoutingKey"),
448 _("Name of RabbitMQ routing key to use"));
449 }
450
451+} /* namespace rabbitmq */
452 } /* namespace drizzle_plugin */
453
454 DRIZZLE_DECLARE_PLUGIN
455@@ -195,8 +396,8 @@
456 "Marcus Eriksson",
457 N_("Publishes transactions to RabbitMQ"),
458 PLUGIN_LICENSE_GPL,
459- drizzle_plugin::init,
460+ drizzle_plugin::rabbitmq::init,
461 NULL,
462- drizzle_plugin::init_options
463+ drizzle_plugin::rabbitmq::init_options
464 }
465 DRIZZLE_DECLARE_PLUGIN_END;
466
467=== modified file 'plugin/rabbitmq/rabbitmq_log.h'
468--- plugin/rabbitmq/rabbitmq_log.h 2011-03-30 19:50:29 +0000
469+++ plugin/rabbitmq/rabbitmq_log.h 2012-08-07 10:29:21 +0000
470@@ -30,8 +30,8 @@
471 #include <string>
472 #include "rabbitmq_handler.h"
473
474-namespace drizzle_plugin
475-{
476+namespace drizzle_plugin {
477+namespace rabbitmq {
478
479 /**
480 * @brief
481@@ -75,8 +75,10 @@
482 */
483 drizzled::plugin::ReplicationReturnCode
484 apply(drizzled::Session &session, const drizzled::message::Transaction &to_apply);
485+ void setRabbitMQHandler(RabbitMQHandler* new_rabbitMQHandler);
486
487 };
488
489+} /* namespace rabbitmq */
490 } /* namespace drizzle_plugin */
491

Subscribers

People subscribed via source and target branches

to all changes: