Merge lp:~ajayaa/drizzle/event-notify-interface into lp:drizzle

Proposed by Ajaya Agrawal
Status: Needs review
Proposed branch: lp:~ajayaa/drizzle/event-notify-interface
Merge into: lp:drizzle
Diff against target: 819 lines (+672/-1)
18 files modified
drizzled/plugin/event_observer.cc (+16/-0)
drizzled/plugin/event_observer.h (+17/-0)
plugin/event_notify/event_notify.cc (+346/-0)
plugin/event_notify/event_notify.h (+81/-0)
plugin/event_notify/plugin.ini (+3/-0)
plugin/event_notify/tests/r/delete.result (+13/-0)
plugin/event_notify/tests/r/insert.result (+18/-0)
plugin/event_notify/tests/r/sys_replication_log.result (+12/-0)
plugin/event_notify/tests/r/time_out.result (+3/-0)
plugin/event_notify/tests/r/update.result (+13/-0)
plugin/event_notify/tests/t/delete.test (+21/-0)
plugin/event_notify/tests/t/insert.test (+28/-0)
plugin/event_notify/tests/t/master.opt (+1/-0)
plugin/event_notify/tests/t/sys_replication_log.test (+20/-0)
plugin/event_notify/tests/t/time_out.test (+2/-0)
plugin/event_notify/tests/t/update.test (+21/-0)
plugin/slave/queue_producer.cc (+56/-0)
plugin/slave/queue_thread.h (+1/-1)
To merge this branch: bzr merge lp:~ajayaa/drizzle/event-notify-interface
Reviewer Review Type Date Requested Status
Stewart Smith (community) Approve
Review via email: mp+185343@code.launchpad.net

Description of the change

Added event_notify interface. Currently three types of event i.e. INSERT, UPDATE, DELETE in any type of table is observed. Client can issue wait_for() call to wait for certain number of events and then return.

To post a comment you must log in.
2649. By Ajaya Agrawal

changed slave code

Revision history for this message
Stewart Smith (stewart) :
review: Approve

Unmerged revisions

2649. By Ajaya Agrawal

changed slave code

2648. By Ajaya Agrawal

added test cases for all type of events including insert in sys_replication_log and timeout

2647. By Ajaya Agrawal

added test cases for all type of events including insert in sys_replication_log and timeout

2646. By Ajaya Agrawal

fixed memory leak.working correctly. writing more test cases

2645. By Ajaya Agrawal

memory leak need to be fixed

2644. By Ajaya Agrawal

added a new event for replicationlog

2643. By Ajaya Agrawal

Added time_out support and test cases

2642. By Ajaya Agrawal

Added time_out support and test cases

2641. By Ajaya Agrawal

removed all hard coding and print statements and added destructor for event_t class

2640. By Ajaya Agrawal

removed all hard coding\n added destructor for event_t class

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'drizzled/plugin/event_observer.cc'
--- drizzled/plugin/event_observer.cc 2011-03-22 12:01:19 +0000
+++ drizzled/plugin/event_observer.cc 2013-09-18 20:41:29 +0000
@@ -421,6 +421,13 @@
421 return (in_table.getMutableShare()->getTableObservers() != NULL);421 return (in_table.getMutableShare()->getTableObservers() != NULL);
422 }422 }
423423
424 // bool InsertToSysReplicationLogEventData::callEventObservers()
425 // {
426 // EventObserverList *observers = new EventObserverList();
427 // observers->addObserver(this, AFTER_INSERT_TO_SYS_REPLICATION_LOG);
428
429 // }
430
424 /*==========================================================*/431 /*==========================================================*/
425 /* Static meathods called by drizzle to notify interested plugins 432 /* Static meathods called by drizzle to notify interested plugins
426 * of a schema event.433 * of a schema event.
@@ -599,6 +606,15 @@
599 return eventData.callEventObservers();606 return eventData.callEventObservers();
600 }607 }
601608
609 bool EventObserver::afterInsertToSysReplicationLog(Session &session)
610 {
611 if (all_event_plugins.empty())
612 return false;
613 InsertToSysReplicationLogEventData eventData(session);
614 return eventData.callEventObservers();
615 // std::cout << "event happening on sys_replication_log\n";
616 //return true;
617 }
602618
603} /* namespace plugin */619} /* namespace plugin */
604} /* namespace drizzled */620} /* namespace drizzled */
605621
=== modified file 'drizzled/plugin/event_observer.h'
--- drizzled/plugin/event_observer.h 2011-08-14 17:04:01 +0000
+++ drizzled/plugin/event_observer.h 2013-09-18 20:41:29 +0000
@@ -79,6 +79,8 @@
79 BEFORE_UPDATE_RECORD, AFTER_UPDATE_RECORD, 79 BEFORE_UPDATE_RECORD, AFTER_UPDATE_RECORD,
80 BEFORE_DELETE_RECORD, AFTER_DELETE_RECORD,80 BEFORE_DELETE_RECORD, AFTER_DELETE_RECORD,
8181
82 AFTER_INSERT_TO_SYS_REPLICATION_LOG,
83
82 /* The max event ID marker. */84 /* The max event ID marker. */
83 MAX_EVENT_COUNT85 MAX_EVENT_COUNT
84 };86 };
@@ -87,6 +89,10 @@
87 {89 {
88 switch(event) 90 switch(event)
89 {91 {
92
93 case AFTER_INSERT_TO_SYS_REPLICATION_LOG:
94 return "AFTER_INSERT_TO_SYS_REPLICATION_LOG";
95
90 case BEFORE_DROP_TABLE:96 case BEFORE_DROP_TABLE:
91 return "BEFORE_DROP_TABLE";97 return "BEFORE_DROP_TABLE";
9298
@@ -236,6 +242,8 @@
236 static bool beforeDropDatabase(Session &session, const std::string &db);242 static bool beforeDropDatabase(Session &session, const std::string &db);
237 static bool afterDropDatabase(Session &session, const std::string &db, int err);243 static bool afterDropDatabase(Session &session, const std::string &db, int err);
238244
245 static bool afterInsertToSysReplicationLog(Session &session);
246
239 static const EventObserverVector &getEventObservers(void);247 static const EventObserverVector &getEventObservers(void);
240248
241};249};
@@ -553,6 +561,15 @@
553 {} 561 {}
554};562};
555563
564class InsertToSysReplicationLogEventData: public SessionEventData
565{
566public:
567 InsertToSysReplicationLogEventData(Session &session_arg):
568 SessionEventData(EventObserver::AFTER_INSERT_TO_SYS_REPLICATION_LOG, session_arg)
569 {}
570 // virtual bool callEventObservers();
571};
572
556//=======================================================573//=======================================================
557574
558} /* namespace plugin */575} /* namespace plugin */
559576
=== added directory 'plugin/event_notify'
=== added file 'plugin/event_notify/event_notify.cc'
--- plugin/event_notify/event_notify.cc 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/event_notify.cc 2013-09-18 20:41:29 +0000
@@ -0,0 +1,346 @@
1 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 *
4 * Copyright (C) Ajaya Agrawal
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; version 2 of the License.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20 #include <config.h>
21 #include <fcntl.h>
22 #include <drizzled/function/str/strfunc.h>
23 #include <drizzled/item/func.h>
24 #include <drizzled/plugin.h>
25 #include <drizzled/plugin/function.h>
26 #include <drizzled/item.h>
27 #include <drizzled/module/option_map.h>
28 #include <drizzled/session.h>
29 #include <drizzled/session/times.h>
30 #include <drizzled/table/instance/base.h>
31 #include <drizzled/table.h>
32
33 #include "event_notify.h"
34
35 namespace po= boost::program_options;
36 using namespace std;
37 using namespace drizzled;
38 using namespace plugin;
39
40 static EventNotify *event_notify = NULL;
41
42 struct AddressIs {
43 event_t *ptr;
44 AddressIs(event_t *ptr) : ptr(ptr) {}
45 bool operator()(const event_t *object) const {
46 return ptr == object;
47 }
48 };
49
50 // void show()
51 // {
52 // for (std::map<std::string, list<event_t*> >::iterator map_it=event_notify->all_events.begin(); map_it!=event_notify->all_events.end(); ++map_it){
53 // std::cout << map_it->first << " ";
54 // for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it)
55 // std::cout << (*list_it)->event_type << " " << (*list_it)->time_out << " " << (*list_it)->num_of_events_threshold<< " " << (*list_it)->num_of_events_so_far << '\n';
56 // }
57 // }
58
59 event_t::event_t(String *_table_name, String *_event_type, String *_time_out, String *_num_of_events_threshold)
60 {
61 this->table_name = _table_name->c_str();
62 this->event_type = _event_type->c_str();
63 time_out = atof(_time_out->c_str());
64 num_of_events_threshold = atoi(_num_of_events_threshold->c_str());
65 pthread_mutex_init(&this->lock, NULL);
66 pthread_cond_init(&this->cond, NULL);
67 num_of_events_so_far = 0;
68 }
69
70 event_t::~event_t()
71 {
72 pthread_mutex_destroy(&lock);
73 pthread_cond_destroy(&cond);
74 }
75
76 EventNotify::EventNotify(bool enabled):
77 drizzled::plugin::EventObserver("event_notify_interface"),
78 sysvar_enabled(enabled)
79 {
80 }
81
82 EventNotify::~EventNotify()
83 {
84 }
85
86 void EventNotify::registerTableEventsDo(TableShare &table_share, EventObserverList &observers)
87 {
88 boost::mutex::scoped_lock scoped(all_events_mutex);
89
90 string str(table_share.getTableName());
91 //cout << "in registertableeventsdo table name is " << str <<endl;
92 //show();
93 if (sysvar_enabled == false || (all_events.find(str) == all_events.end())){
94 return;
95 }
96 //cout << "yes event registered on this table\n";
97 registerEvent(observers, AFTER_INSERT_RECORD);
98 registerEvent(observers, AFTER_UPDATE_RECORD);
99 registerEvent(observers, AFTER_DELETE_RECORD);
100 }
101
102 void EventNotify::registerSessionEventsDo(Session &, EventObserverList &observers)
103 {
104 //here need to check whether drizzled was started with option --innodb.replication-log.
105 //cout << "registered for sys_replication_log\n";
106 registerEvent(observers, AFTER_INSERT_TO_SYS_REPLICATION_LOG);
107 }
108
109 bool EventNotify::observeEventDo(EventData &data)
110 {
111 if (not sysvar_enabled)
112 return false;
113
114 switch (data.event) {
115
116 case AFTER_INSERT_RECORD:
117 afterInsertRecord((AfterInsertRecordEventData &)data);
118 break;
119
120 case AFTER_UPDATE_RECORD:
121 afterUpdateRecord((AfterUpdateRecordEventData &)data);
122 break;
123
124 case AFTER_DELETE_RECORD:
125 afterDeleteRecord((AfterDeleteRecordEventData &)data);
126 break;
127
128 case AFTER_INSERT_TO_SYS_REPLICATION_LOG:
129 afterInsertToSysReplicationLog((InsertToSysReplicationLogEventData&) data);
130 break;
131
132 default:
133 fprintf(stderr, "event_notify: Unexpected event '%s'\n",
134 EventObserver::eventName(data.event));
135 }
136
137 return false;
138 }
139
140 bool EventNotify::afterInsertRecord(AfterInsertRecordEventData &data)
141 {
142 std::string str(data.table.getShare()->getTableName());
143 std::map<std::string, list<event_t*> >::iterator map_it;
144 boost::mutex::scoped_lock scoped(all_events_mutex);
145 map_it = all_events.find(str);
146
147 if(map_it != all_events.end()){
148 for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){
149 if((*list_it)->event_type.compare("INSERT") == 0){
150 pthread_mutex_lock(&(*list_it)->lock);
151 (*list_it)->num_of_events_so_far++;
152 if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){
153 pthread_cond_broadcast(&(*list_it)->cond);
154 fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond);
155 }
156 pthread_mutex_unlock(&(*list_it)->lock);
157 }
158 }
159 }
160 return false;
161 }
162
163 bool EventNotify::afterUpdateRecord(AfterUpdateRecordEventData &data)
164 {
165 std::string str(data.table.getShare()->getTableName());
166 std::map<std::string, list<event_t*> >::iterator map_it;
167 boost::mutex::scoped_lock scoped(all_events_mutex);
168 map_it = all_events.find(str);
169
170 if(map_it != all_events.end()){
171 for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){
172 if((*list_it)->event_type.compare("UPDATE") == 0){
173 pthread_mutex_lock(&(*list_it)->lock);
174 (*list_it)->num_of_events_so_far++;
175 if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){
176 pthread_cond_broadcast(&(*list_it)->cond);
177 fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond);
178 }
179 pthread_mutex_unlock(&(*list_it)->lock);
180 }
181 }
182 }
183 return false;
184 }
185
186 bool EventNotify::afterDeleteRecord(AfterDeleteRecordEventData &data)
187 {
188 std::string str(data.table.getShare()->getTableName());
189 std::map<std::string, list<event_t*> >::iterator map_it;
190 boost::mutex::scoped_lock scoped(all_events_mutex);
191 map_it = all_events.find(str);
192
193 if(map_it != all_events.end()){
194 for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){
195 if((*list_it)->event_type.compare("DELETE") == 0){
196 pthread_mutex_lock(&(*list_it)->lock);
197 (*list_it)->num_of_events_so_far++;
198 if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){
199 pthread_cond_broadcast(&(*list_it)->cond);
200 fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond);
201 }
202 pthread_mutex_unlock(&(*list_it)->lock);
203 }
204 }
205 }
206 return false;
207 }
208
209
210 bool EventNotify::afterInsertToSysReplicationLog(InsertToSysReplicationLogEventData &data)
211 {
212 std::string str = "sys_replication_log";
213 std::map<std::string, list<event_t*> >::iterator map_it;
214 boost::mutex::scoped_lock scoped(all_events_mutex);
215 map_it = all_events.find(str);
216
217 if(map_it != all_events.end()){
218 for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){
219 if((*list_it)->event_type.compare("INSERT") == 0){
220 pthread_mutex_lock(&(*list_it)->lock);
221 (*list_it)->num_of_events_so_far++;
222 if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){
223 pthread_cond_broadcast(&(*list_it)->cond);
224 fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond);
225 }
226 pthread_mutex_unlock(&(*list_it)->lock);
227 }
228 }
229 }
230 return false;
231 }
232
233
234 class Item_func_event_notify : public Item_str_func
235 {
236 public:
237 Item_func_event_notify() : Item_str_func() {}
238 const char *func_name() const { return "wait_for"; }
239
240 String *val_str(String* s)
241 {
242 String *table_name = args[0]->val_str(s);
243 event_t *new_event = new event_t(args[0]->val_str(s),(args[1]->val_str(s)), (args[2]->val_str(s)), (args[3]->val_str(s)));
244
245 map<std::string, list<event_t*> >::iterator map_it;
246 list<event_t*>::iterator list_it;
247
248 {
249 boost::mutex::scoped_lock scoped(event_notify->all_events_mutex);
250 map_it = event_notify->all_events.find(new_event->table_name);
251
252 if(map_it == event_notify->all_events.end()){
253 list<event_t*> my_list;
254 my_list.push_back(new_event);
255 event_notify->all_events[new_event->table_name] = my_list;
256 }
257 else{
258 for (list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){
259 if((*list_it)->event_type.compare(new_event->event_type) == 0 ) {
260 if( new_event->num_of_events_threshold <= (*list_it)->num_of_events_so_far){
261 //int retval = new_event->num_of_events_threshold;
262 delete new_event;
263 return table_name;
264 }
265 new_event->num_of_events_so_far = (*list_it)->num_of_events_so_far;
266 break;
267 }
268 }
269 map_it->second.push_back(new_event);
270 }
271 }
272 /*
273 * waiting for number of events reach its threshold.
274 */
275 struct timespec timeToWait;
276 struct timeval now;
277 gettimeofday(&now,NULL);
278 timeToWait.tv_sec = now.tv_sec+new_event->time_out;
279 pthread_mutex_lock(&new_event->lock);
280 //while(new_event->num_of_events_so_far < new_event->num_of_events_threshold )
281 fprintf(stderr, "wait %p\n", (void*)&new_event->cond);
282 pthread_cond_timedwait(&new_event->cond, &new_event->lock, &timeToWait);
283 fprintf(stderr, "SIGNALLED %p\n", (void*)&new_event->cond);
284
285 pthread_mutex_unlock(&new_event->lock);
286 int count_events = 0;
287 {
288 boost::mutex::scoped_lock scoped(event_notify->all_events_mutex);
289 map<std::string, list<event_t*> >::iterator map_iter;
290 list<event_t*>::iterator list_iter;
291 map_iter = event_notify->all_events.find(new_event->table_name);
292 for (list_iter=map_iter->second.begin(); list_iter!=map_iter->second.end(); ++list_iter){
293 if((*list_iter)->event_type.compare(new_event->event_type) == 0 ){
294 count_events++;
295 }
296 }
297 if(count_events > 1){
298 map_iter->second.remove_if(AddressIs(new_event));
299 delete new_event;
300 }
301 }
302 return table_name;
303 }
304
305 void fix_length_and_dec() {
306 max_length=strlen("wait_for");
307 }
308
309 bool check_argument_count(int n)
310 {
311 return (n == 4);
312 }
313 };
314
315 plugin::Create_function<Item_func_event_notify> *event_notify_udf= NULL;
316
317 static void init_options(drizzled::module::option_context &)
318 {
319 event_notify = new EventNotify(true);
320 }
321
322 static int event_notifier_plugin_init(drizzled::module::Context &context)
323 {
324
325 event_notify_udf=
326 new plugin::Create_function<Item_func_event_notify>("wait_for");
327 context.add(event_notify_udf);
328 context.add(event_notify);
329 context.registerVariable(new sys_var_bool_ptr("enabled", &event_notify->sysvar_enabled));
330 return 0;
331 }
332
333
334 DRIZZLE_DECLARE_PLUGIN
335 {
336 DRIZZLE_VERSION_ID, /* DRIZZLE_VERSION_ID */
337 "Event_Notifier", /* module name */
338 "1.0", /* module version */
339 "Ajaya Agrawal", /* author(s) */
340 N_("indicates an event happening on certain table/database/session"), /* description */
341 PLUGIN_LICENSE_BSD, /* license */
342 event_notifier_plugin_init, /* init module function */
343 NULL, /* module dependencies */
344 init_options /* init options function */
345 }
346 DRIZZLE_DECLARE_PLUGIN_END;
0\ No newline at end of file347\ No newline at end of file
1348
=== added file 'plugin/event_notify/event_notify.h'
--- plugin/event_notify/event_notify.h 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/event_notify.h 2013-09-18 20:41:29 +0000
@@ -0,0 +1,81 @@
1/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 *
4 * Copyright 2013 Ajaya Agrawal
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#pragma once
21
22#include <drizzled/plugin/event_observer.h>
23#include <boost/thread/mutex.hpp>
24
25#include <map>
26#include <list>
27#include <pthread.h>
28#include <string>
29#include <cstring>
30
31namespace drizzled {
32namespace plugin {
33
34struct cmp_str
35{
36 bool operator()(std::string a, std::string b)
37 {
38 return (a.compare(b) < 0);
39 }
40};
41
42class event_t {
43
44public:
45 event_t(String *_session_time, String *_event_type, String *_table_name, String *_num_of_events);
46 ~event_t();
47 std::string table_name;
48 std::string event_type;
49 pthread_mutex_t lock;
50 pthread_cond_t cond;
51 int num_of_events_threshold;
52 double time_out;
53 int num_of_events_so_far;
54};
55
56class EventNotify : public drizzled::plugin::EventObserver
57{
58public:
59 EventNotify(bool enabled);
60 ~EventNotify();
61
62 void registerTableEventsDo(TableShare &table_share, EventObserverList &observers);
63 void registerSessionEventsDo(Session &session, EventObserverList &observers);
64 bool observeEventDo(EventData &);
65 bool afterStatement(AfterStatementEventData &data);
66 bool afterInsertRecord(AfterInsertRecordEventData &data);
67 bool afterUpdateRecord(AfterUpdateRecordEventData &data);
68 bool afterDeleteRecord(AfterDeleteRecordEventData &data);
69 bool afterInsertToSysReplicationLog(InsertToSysReplicationLogEventData &data);
70 /**
71 * These are the event_notify system variables. So sysvar_enabled is
72 * event_notify_enabled in SHOW VARIABLES, etc. They are all global and dynamic.
73 */
74 bool sysvar_enabled;
75 boost::mutex all_events_mutex;
76 boost::mutex counter_mutex;
77 std::map <std::string, std::list<event_t*>, cmp_str > all_events;
78};
79
80} /* namespace plugin */
81} /* namespace drizzled */
082
=== added file 'plugin/event_notify/plugin.ini'
--- plugin/event_notify/plugin.ini 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/plugin.ini 2013-09-18 20:41:29 +0000
@@ -0,0 +1,3 @@
1[plugin]
2sources=event_notify.cc
3headers=event_notify.h
04
=== added directory 'plugin/event_notify/tests'
=== added directory 'plugin/event_notify/tests/r'
=== added file 'plugin/event_notify/tests/r/delete.result'
--- plugin/event_notify/tests/r/delete.result 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/tests/r/delete.result 2013-09-18 20:41:29 +0000
@@ -0,0 +1,13 @@
1drop table if exists t1;
2create table t1 (a int primary key);
3select wait_for("t1", "DELETE", "200.9", "2");
4insert into t1 values(1),(2),(3);
5delete from t1 where a=1;
6delete from t1 where a=2;
7delete from t1 where a=3;
8wait_for("t1", "DELETE", "200.9", "2")
9t1
10select wait_for("t1", "DELETE", "1000", "3");
11wait_for("t1", "DELETE", "1000", "3")
12t1
13drop table t1;
014
=== added file 'plugin/event_notify/tests/r/insert.result'
--- plugin/event_notify/tests/r/insert.result 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/tests/r/insert.result 2013-09-18 20:41:29 +0000
@@ -0,0 +1,18 @@
1drop table if exists t1;
2create table t1 (a int primary key);
3select wait_for("t1", "INSERT", "100", "4");
4insert into t1 values(1);
5insert into t1 values(2);
6insert into t1 values(3);
7insert into t1 values(4);
8wait_for("t1", "INSERT", "100", "4")
9t1
10select wait_for("t1", "INSERT", "100", "5");
11insert into t1 values(5);
12wait_for("t1", "INSERT", "100", "5")
13t1
14insert into t1 values(60);
15select wait_for("t1", "INSERT", "100", "6");
16wait_for("t1", "INSERT", "100", "6")
17t1
18drop table t1;
019
=== added file 'plugin/event_notify/tests/r/sys_replication_log.result'
--- plugin/event_notify/tests/r/sys_replication_log.result 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/tests/r/sys_replication_log.result 2013-09-18 20:41:29 +0000
@@ -0,0 +1,12 @@
1drop table if exists t1;
2create table t1 (a int primary key);
3select wait_for("sys_replication_log", "INSERT", "200.9", "1");
4insert into t1 values(1);
5insert into t1 values(2);
6insert into t1 values(3);
7wait_for("sys_replication_log", "INSERT", "200.9", "1")
8sys_replication_log
9select wait_for("sys_replication_log", "INSERT", "100", "3");
10wait_for("sys_replication_log", "INSERT", "100", "3")
11sys_replication_log
12drop table t1;
013
=== added file 'plugin/event_notify/tests/r/time_out.result'
--- plugin/event_notify/tests/r/time_out.result 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/tests/r/time_out.result 2013-09-18 20:41:29 +0000
@@ -0,0 +1,3 @@
1select wait_for("t1", "INSERT", "5", "20");
2wait_for("t1", "INSERT", "5", "20")
3t1
04
=== added file 'plugin/event_notify/tests/r/update.result'
--- plugin/event_notify/tests/r/update.result 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/tests/r/update.result 2013-09-18 20:41:29 +0000
@@ -0,0 +1,13 @@
1drop table if exists t1;
2create table t1 (a int primary key);
3select wait_for("t1", "UPDATE", "200.9", "1");
4insert into t1 values(1),(2),(3);
5update t1 set a=10 where a=1;
6update t1 set a=20 where a=2;
7update t1 set a=30 where a=3;
8wait_for("t1", "UPDATE", "200.9", "1")
9t1
10select wait_for("t1", "UPDATE", "1000", "2");
11wait_for("t1", "UPDATE", "1000", "2")
12t1
13drop table t1;
014
=== added directory 'plugin/event_notify/tests/t'
=== added file 'plugin/event_notify/tests/t/delete.test'
--- plugin/event_notify/tests/t/delete.test 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/tests/t/delete.test 2013-09-18 20:41:29 +0000
@@ -0,0 +1,21 @@
1connect (con1,localhost,root,,);
2--disable_warnings
3drop table if exists t1;
4--enable_warnings
5create table t1 (a int primary key);
6connection con1;
7send select wait_for("t1", "DELETE", "200.9", "2");
8connection default;
9insert into t1 values(1),(2),(3);
10delete from t1 where a=1;
11delete from t1 where a=2;
12delete from t1 where a=3;
13connection con1;
14reap;
15send select wait_for("t1", "DELETE", "1000", "3");
16connection default;
17connection con1;
18reap;
19disconnect con1;
20connection default;
21drop table t1;
022
=== added file 'plugin/event_notify/tests/t/insert.test'
--- plugin/event_notify/tests/t/insert.test 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/tests/t/insert.test 2013-09-18 20:41:29 +0000
@@ -0,0 +1,28 @@
1connect (con1,localhost,root,,);
2connect (con2,localhost,root,,);
3--disable_warnings
4drop table if exists t1;
5--enable_warnings
6create table t1 (a int primary key);
7connection con1;
8send select wait_for("t1", "INSERT", "100", "4");
9connection default;
10insert into t1 values(1);
11insert into t1 values(2);
12insert into t1 values(3);
13insert into t1 values(4);
14connection con1;
15reap;
16connection con2;
17send select wait_for("t1", "INSERT", "100", "5");
18connection default;
19insert into t1 values(5);
20connection con2;
21reap;
22insert into t1 values(60);
23send select wait_for("t1", "INSERT", "100", "6");
24reap;
25disconnect con1;
26disconnect con2;
27connection default;
28drop table t1;
029
=== added file 'plugin/event_notify/tests/t/master.opt'
--- plugin/event_notify/tests/t/master.opt 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/tests/t/master.opt 2013-09-18 20:41:29 +0000
@@ -0,0 +1,1 @@
1--plugin-add=event_notify --innodb.replication-log
02
=== added file 'plugin/event_notify/tests/t/sys_replication_log.test'
--- plugin/event_notify/tests/t/sys_replication_log.test 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/tests/t/sys_replication_log.test 2013-09-18 20:41:29 +0000
@@ -0,0 +1,20 @@
1connect (con1,localhost,root,,);
2--disable_warnings
3drop table if exists t1;
4--enable_warnings
5create table t1 (a int primary key);
6connection con1;
7send select wait_for("sys_replication_log", "INSERT", "200.9", "1");
8connection default;
9insert into t1 values(1);
10insert into t1 values(2);
11insert into t1 values(3);
12connection con1;
13reap;
14send select wait_for("sys_replication_log", "INSERT", "100", "3");
15connection default;
16connection con1;
17reap;
18disconnect con1;
19connection default;
20drop table t1;
021
=== added file 'plugin/event_notify/tests/t/time_out.test'
--- plugin/event_notify/tests/t/time_out.test 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/tests/t/time_out.test 2013-09-18 20:41:29 +0000
@@ -0,0 +1,2 @@
1send select wait_for("t1", "INSERT", "5", "20");
2reap;
03
=== added file 'plugin/event_notify/tests/t/update.test'
--- plugin/event_notify/tests/t/update.test 1970-01-01 00:00:00 +0000
+++ plugin/event_notify/tests/t/update.test 2013-09-18 20:41:29 +0000
@@ -0,0 +1,21 @@
1connect (con1,localhost,root,,);
2--disable_warnings
3drop table if exists t1;
4--enable_warnings
5create table t1 (a int primary key);
6connection con1;
7send select wait_for("t1", "UPDATE", "200.9", "1");
8connection default;
9insert into t1 values(1),(2),(3);
10update t1 set a=10 where a=1;
11update t1 set a=20 where a=2;
12update t1 set a=30 where a=3;
13connection con1;
14reap;
15send select wait_for("t1", "UPDATE", "1000", "2");
16connection default;
17connection con1;
18reap;
19disconnect con1;
20connection default;
21drop table t1;
022
=== modified file 'plugin/slave/queue_producer.cc'
--- plugin/slave/queue_producer.cc 2012-04-14 20:43:20 +0000
+++ plugin/slave/queue_producer.cc 2013-09-18 20:41:29 +0000
@@ -49,6 +49,62 @@
49 return reconnect(true);49 return reconnect(true);
50}50}
5151
52void QueueProducer::run()
53{
54 boost::posix_time::seconds duration(getSleepInterval());
55
56 /* thread setup needed to do things like create a Session */
57 internal::my_thread_init();
58
59 if (not init())
60 return;
61
62 while (1)
63 {
64 {
65 /* This uninterruptable block processes the message queue */
66 boost::this_thread::disable_interruption di;
67
68 if (not process())
69 {
70 shutdown();
71 return;
72 }
73 }
74
75 /* Interruptable only when not doing work (aka, sleeping) */
76 try
77 {
78 /* waiting for num_of_events through a wait_for() call*/
79 int num_of_events = 2;
80 std::string event_sql("select wait_for(\"sys_replication_log\", \"INSERT\", \"2000\", \"");
81 event_sql.append(boost::lexical_cast<string>(num_of_events));
82 event_sql.append("\")");
83 drizzle_return_t event_ret;
84 drizzle_result_st event_result;
85 drizzle_query_str(_connection, &event_result, event_sql.c_str(), &event_ret);
86
87 if (event_ret != DRIZZLE_RETURN_OK){
88 drizzle_result_free(&event_result);
89 boost::this_thread::sleep(duration);
90 string error = "";
91 error.append(drizzle_error(_drizzle));
92 cout << error <<": producer thread sleeping\n";
93 }
94 else{
95 num_of_events += 1;
96 cout << "wait_for returned\n";
97 drizzle_result_free(&event_result);
98 }
99 }
100 catch (boost::thread_interrupted &)
101 {
102 return;
103 }
104 }
105}
106
107
52bool QueueProducer::process()108bool QueueProducer::process()
53{109{
54 if (_saved_max_commit_id == 0)110 if (_saved_max_commit_id == 0)
55111
=== modified file 'plugin/slave/queue_thread.h'
--- plugin/slave/queue_thread.h 2011-03-14 05:40:28 +0000
+++ plugin/slave/queue_thread.h 2013-09-18 20:41:29 +0000
@@ -40,7 +40,7 @@
40 virtual ~QueueThread()40 virtual ~QueueThread()
41 {}41 {}
42 42
43 void run(void);43 virtual void run(void);
4444
45 /**45 /**
46 * Do any initialization work.46 * Do any initialization work.

Subscribers

People subscribed via source and target branches