Merge lp:~ansharyan015/drizzle/zeromq_dynamic into lp:drizzle

Proposed by Daniel Nichter
Status: Merged
Approved by: Brian Aker
Approved revision: 2568
Merged at revision: 2572
Proposed branch: lp:~ansharyan015/drizzle/zeromq_dynamic
Merge into: lp:drizzle
Diff against target: 173 lines (+81/-8)
2 files modified
plugin/zeromq/zeromq_log.cc (+57/-6)
plugin/zeromq/zeromq_log.h (+24/-2)
To merge this branch: bzr merge lp:~ansharyan015/drizzle/zeromq_dynamic
Reviewer | Review Type | Date Requested | Status
Daniel Nichter (community) | code review | | Approve
Drizzle Merge Team | | | Pending
Review via email: mp+114037@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Daniel Nichter (daniel-nichter) wrote :

The assignment to _socket needs to be guarded by the publishLock mutex, so that other threads cannot write to the socket while it is being replaced.

review: Needs Fixing (code review)
Revision history for this message
Daniel Nichter (daniel-nichter) :
review: Approve (code review)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'plugin/zeromq/zeromq_log.cc'
2--- plugin/zeromq/zeromq_log.cc 2012-01-16 02:37:54 +0000
3+++ plugin/zeromq/zeromq_log.cc 2012-07-10 08:08:23 +0000
4@@ -29,6 +29,7 @@
5 #include <stdio.h>
6 #include <drizzled/module/registry.h>
7 #include <drizzled/plugin.h>
8+#include <drizzled/item.h>
9 #include <stdint.h>
10 #include <boost/program_options.hpp>
11 #include <drizzled/module/option_map.h>
12@@ -40,11 +41,15 @@
13 using namespace drizzled;
14 using namespace google;
15
16-namespace drizzle_plugin
17-{
18+namespace drizzle_plugin {
19+namespace zeromq {
20+
21+bool updateEndpoint(Session *, set_var* var);
22+
23
24 ZeroMQLog::ZeroMQLog(const string &name, const string &endpoint) :
25- plugin::TransactionApplier(name)
26+ plugin::TransactionApplier(name),
27+ sysvar_endpoint(endpoint)
28 {
29 void *context= zmq_init(1);
30 _socket= zmq_socket (context, ZMQ_PUB);
31@@ -60,6 +65,32 @@
32 pthread_mutex_destroy(&publishLock);
33 }
34
35+std::string& ZeroMQLog::getEndpoint()
36+{
37+ return sysvar_endpoint;
38+}
39+
40+bool ZeroMQLog::setEndpoint(std::string new_endpoint)
41+{
42+ void *tmp_context= zmq_init(1);
43+ void *tmp_socket= zmq_socket(tmp_context, ZMQ_PUB);
44+ if(!tmp_socket)
45+ return false;
46+ int tmp_rc= zmq_bind(tmp_socket, new_endpoint.c_str());
47+ if(tmp_rc!=0)
48+ return false;
49+ // need a mutex around this since other threads can try to write to _socket while we are changing the endpoint
50+ pthread_mutex_lock(&publishLock);
51+
52+ zmq_close(_socket);
53+ _socket= tmp_socket;
54+ sysvar_endpoint= new_endpoint;
55+
56+ //Releasing the mutex lock
57+ pthread_mutex_unlock(&publishLock);
58+ return true;
59+}
60+
61 plugin::ReplicationReturnCode
62 ZeroMQLog::apply(Session &, const message::Transaction &to_apply)
63 {
64@@ -126,6 +157,25 @@
65 static ZeroMQLog *zeromqLogger; ///< the actual plugin
66
67 /**
68+ * This function is called when the value of zeromq_endpoint is updated in the system
69+ *
70+ * @return False on success, True on error.
71+ */
72+bool updateEndpoint(Session *, set_var* var)
73+{
74+ if (not var->value->str_value.empty())
75+ {
76+ std::string new_endpoint(var->value->str_value.data());
77+ if (zeromqLogger->setEndpoint(new_endpoint))
78+ return false; //success
79+ else
80+ return true; // error
81+ }
82+ errmsg_printf(error::ERROR, _("zeromq_endpoint cannot be NULL"));
83+ return true; // error
84+}
85+
86+/**
87 * Initialize the zeromq logger
88 */
89 static int init(drizzled::module::Context &context)
90@@ -134,7 +184,7 @@
91 zeromqLogger= new ZeroMQLog("zeromq_applier", vm["endpoint"].as<string>());
92 context.add(zeromqLogger);
93 ReplicationServices::attachApplier(zeromqLogger, vm["use-replicator"].as<string>());
94- context.registerVariable(new sys_var_const_string_val("endpoint", vm["endpoint"].as<string>()));
95+ context.registerVariable(new sys_var_std_string("endpoint", zeromqLogger->getEndpoint(), NULL, &updateEndpoint));
96 return 0;
97 }
98
99@@ -150,6 +200,7 @@
100
101 }
102
103+} /* namespace zeromq */
104 } /* namespace drizzle_plugin */
105
106 DRIZZLE_DECLARE_PLUGIN
107@@ -160,8 +211,8 @@
108 "Marcus Eriksson",
109 N_("Publishes transactions to ZeroMQ"),
110 PLUGIN_LICENSE_GPL,
111- drizzle_plugin::init,
112+ drizzle_plugin::zeromq::init,
113 NULL,
114- drizzle_plugin::init_options,
115+ drizzle_plugin::zeromq::init_options,
116 }
117 DRIZZLE_DECLARE_PLUGIN_END;
118
119=== modified file 'plugin/zeromq/zeromq_log.h'
120--- plugin/zeromq/zeromq_log.h 2011-08-08 18:51:23 +0000
121+++ plugin/zeromq/zeromq_log.h 2012-07-10 08:08:23 +0000
122@@ -30,8 +30,8 @@
123 #include <string>
124 #include <zmq.h>
125
126-namespace drizzle_plugin
127-{
128+namespace drizzle_plugin {
129+namespace zeromq {
130
131 /**
132 * @brief
133@@ -46,6 +46,7 @@
134 private:
135 void *_socket;
136 pthread_mutex_t publishLock;
137+ std::string sysvar_endpoint;
138 std::string getSchemaName(const drizzled::message::Transaction &txn);
139 public:
140
141@@ -60,6 +61,26 @@
142 */
143 ZeroMQLog(const std::string &name, const std::string &endpoint);
144 ~ZeroMQLog();
145+
146+ /**
147+ * @brief
148+ * Getter for endpoint
149+ *
150+ * @details
151+ * Returns value of sysvar_endpoint
152+ */
153+ std::string& getEndpoint();
154+
155+ /**
156+ * @brief
157+ * Setter for endpoint
158+ *
159+ * @details
160+ * This function is called to change the value of sysvar_endpoint
161+ *
162+ * @param[in] new endpoint string
163+ */
164+ bool setEndpoint(std::string new_endpoint);
165
166 /**
167 * @brief
168@@ -75,5 +96,6 @@
169
170 };
171
172+} /* namespace zeromq */
173 } /* namespace drizzle_plugin */
174

Subscribers

People subscribed via source and target branches

to all changes: