Merge lp:~ajayaa/drizzle/event-notify-interface into lp:drizzle
- event-notify-interface
- Merge into 7.2
Status: | Needs review |
---|---|
Proposed branch: | lp:~ajayaa/drizzle/event-notify-interface |
Merge into: | lp:drizzle |
Diff against target: |
819 lines (+672/-1) 18 files modified
drizzled/plugin/event_observer.cc (+16/-0) drizzled/plugin/event_observer.h (+17/-0) plugin/event_notify/event_notify.cc (+346/-0) plugin/event_notify/event_notify.h (+81/-0) plugin/event_notify/plugin.ini (+3/-0) plugin/event_notify/tests/r/delete.result (+13/-0) plugin/event_notify/tests/r/insert.result (+18/-0) plugin/event_notify/tests/r/sys_replication_log.result (+12/-0) plugin/event_notify/tests/r/time_out.result (+3/-0) plugin/event_notify/tests/r/update.result (+13/-0) plugin/event_notify/tests/t/delete.test (+21/-0) plugin/event_notify/tests/t/insert.test (+28/-0) plugin/event_notify/tests/t/master.opt (+1/-0) plugin/event_notify/tests/t/sys_replication_log.test (+20/-0) plugin/event_notify/tests/t/time_out.test (+2/-0) plugin/event_notify/tests/t/update.test (+21/-0) plugin/slave/queue_producer.cc (+56/-0) plugin/slave/queue_thread.h (+1/-1) |
To merge this branch: | bzr merge lp:~ajayaa/drizzle/event-notify-interface |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Stewart Smith (community) | Approve | ||
Review via email: mp+185343@code.launchpad.net |
Commit message
Description of the change
Added event_notify interface. Currently three types of event i.e. INSERT, UPDATE, DELETE in any type of table is observed. Client can issue wait_for() call to wait for certain number of events and then return.
- 2649. By Ajaya Agrawal
-
changed slave code
Unmerged revisions
- 2649. By Ajaya Agrawal
-
changed slave code
- 2648. By Ajaya Agrawal
-
added test cases for all type of events including insert in sys_replication_log and timeout
- 2647. By Ajaya Agrawal
-
added test cases for all type of events including insert in sys_replication_log and timeout
- 2646. By Ajaya Agrawal
-
fixed memory leak.working correctly. writing more test cases
- 2645. By Ajaya Agrawal
-
memory leak need to be fixed
- 2644. By Ajaya Agrawal
-
added a new event for replicationlog
- 2643. By Ajaya Agrawal
-
Added time_out support and test cases
- 2642. By Ajaya Agrawal
-
Added time_out support and test cases
- 2641. By Ajaya Agrawal
-
removed all hard coding and print statements and added destructor for event_t class
- 2640. By Ajaya Agrawal
-
removed all hard coding\n added destructor for event_t class
Preview Diff
1 | === modified file 'drizzled/plugin/event_observer.cc' | |||
2 | --- drizzled/plugin/event_observer.cc 2011-03-22 12:01:19 +0000 | |||
3 | +++ drizzled/plugin/event_observer.cc 2013-09-18 20:41:29 +0000 | |||
4 | @@ -421,6 +421,13 @@ | |||
5 | 421 | return (in_table.getMutableShare()->getTableObservers() != NULL); | 421 | return (in_table.getMutableShare()->getTableObservers() != NULL); |
6 | 422 | } | 422 | } |
7 | 423 | 423 | ||
8 | 424 | // bool InsertToSysReplicationLogEventData::callEventObservers() | ||
9 | 425 | // { | ||
10 | 426 | // EventObserverList *observers = new EventObserverList(); | ||
11 | 427 | // observers->addObserver(this, AFTER_INSERT_TO_SYS_REPLICATION_LOG); | ||
12 | 428 | |||
13 | 429 | // } | ||
14 | 430 | |||
15 | 424 | /*==========================================================*/ | 431 | /*==========================================================*/ |
16 | 425 | /* Static meathods called by drizzle to notify interested plugins | 432 | /* Static meathods called by drizzle to notify interested plugins |
17 | 426 | * of a schema event. | 433 | * of a schema event. |
18 | @@ -599,6 +606,15 @@ | |||
19 | 599 | return eventData.callEventObservers(); | 606 | return eventData.callEventObservers(); |
20 | 600 | } | 607 | } |
21 | 601 | 608 | ||
22 | 609 | bool EventObserver::afterInsertToSysReplicationLog(Session &session) | ||
23 | 610 | { | ||
24 | 611 | if (all_event_plugins.empty()) | ||
25 | 612 | return false; | ||
26 | 613 | InsertToSysReplicationLogEventData eventData(session); | ||
27 | 614 | return eventData.callEventObservers(); | ||
28 | 615 | // std::cout << "event happening on sys_replication_log\n"; | ||
29 | 616 | //return true; | ||
30 | 617 | } | ||
31 | 602 | 618 | ||
32 | 603 | } /* namespace plugin */ | 619 | } /* namespace plugin */ |
33 | 604 | } /* namespace drizzled */ | 620 | } /* namespace drizzled */ |
34 | 605 | 621 | ||
35 | === modified file 'drizzled/plugin/event_observer.h' | |||
36 | --- drizzled/plugin/event_observer.h 2011-08-14 17:04:01 +0000 | |||
37 | +++ drizzled/plugin/event_observer.h 2013-09-18 20:41:29 +0000 | |||
38 | @@ -79,6 +79,8 @@ | |||
39 | 79 | BEFORE_UPDATE_RECORD, AFTER_UPDATE_RECORD, | 79 | BEFORE_UPDATE_RECORD, AFTER_UPDATE_RECORD, |
40 | 80 | BEFORE_DELETE_RECORD, AFTER_DELETE_RECORD, | 80 | BEFORE_DELETE_RECORD, AFTER_DELETE_RECORD, |
41 | 81 | 81 | ||
42 | 82 | AFTER_INSERT_TO_SYS_REPLICATION_LOG, | ||
43 | 83 | |||
44 | 82 | /* The max event ID marker. */ | 84 | /* The max event ID marker. */ |
45 | 83 | MAX_EVENT_COUNT | 85 | MAX_EVENT_COUNT |
46 | 84 | }; | 86 | }; |
47 | @@ -87,6 +89,10 @@ | |||
48 | 87 | { | 89 | { |
49 | 88 | switch(event) | 90 | switch(event) |
50 | 89 | { | 91 | { |
51 | 92 | |||
52 | 93 | case AFTER_INSERT_TO_SYS_REPLICATION_LOG: | ||
53 | 94 | return "AFTER_INSERT_TO_SYS_REPLICATION_LOG"; | ||
54 | 95 | |||
55 | 90 | case BEFORE_DROP_TABLE: | 96 | case BEFORE_DROP_TABLE: |
56 | 91 | return "BEFORE_DROP_TABLE"; | 97 | return "BEFORE_DROP_TABLE"; |
57 | 92 | 98 | ||
58 | @@ -236,6 +242,8 @@ | |||
59 | 236 | static bool beforeDropDatabase(Session &session, const std::string &db); | 242 | static bool beforeDropDatabase(Session &session, const std::string &db); |
60 | 237 | static bool afterDropDatabase(Session &session, const std::string &db, int err); | 243 | static bool afterDropDatabase(Session &session, const std::string &db, int err); |
61 | 238 | 244 | ||
62 | 245 | static bool afterInsertToSysReplicationLog(Session &session); | ||
63 | 246 | |||
64 | 239 | static const EventObserverVector &getEventObservers(void); | 247 | static const EventObserverVector &getEventObservers(void); |
65 | 240 | 248 | ||
66 | 241 | }; | 249 | }; |
67 | @@ -553,6 +561,15 @@ | |||
68 | 553 | {} | 561 | {} |
69 | 554 | }; | 562 | }; |
70 | 555 | 563 | ||
71 | 564 | class InsertToSysReplicationLogEventData: public SessionEventData | ||
72 | 565 | { | ||
73 | 566 | public: | ||
74 | 567 | InsertToSysReplicationLogEventData(Session &session_arg): | ||
75 | 568 | SessionEventData(EventObserver::AFTER_INSERT_TO_SYS_REPLICATION_LOG, session_arg) | ||
76 | 569 | {} | ||
77 | 570 | // virtual bool callEventObservers(); | ||
78 | 571 | }; | ||
79 | 572 | |||
80 | 556 | //======================================================= | 573 | //======================================================= |
81 | 557 | 574 | ||
82 | 558 | } /* namespace plugin */ | 575 | } /* namespace plugin */ |
83 | 559 | 576 | ||
84 | === added directory 'plugin/event_notify' | |||
85 | === added file 'plugin/event_notify/event_notify.cc' | |||
86 | --- plugin/event_notify/event_notify.cc 1970-01-01 00:00:00 +0000 | |||
87 | +++ plugin/event_notify/event_notify.cc 2013-09-18 20:41:29 +0000 | |||
88 | @@ -0,0 +1,346 @@ | |||
89 | 1 | /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*- | ||
90 | 2 | * vim:expandtab:shiftwidth=2:tabstop=2:smarttab: | ||
91 | 3 | * | ||
92 | 4 | * Copyright (C) Ajaya Agrawal | ||
93 | 5 | * | ||
94 | 6 | * This program is free software; you can redistribute it and/or modify | ||
95 | 7 | * it under the terms of the GNU General Public License as published by | ||
96 | 8 | * the Free Software Foundation; version 2 of the License. | ||
97 | 9 | * | ||
98 | 10 | * This program is distributed in the hope that it will be useful, | ||
99 | 11 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
100 | 12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
101 | 13 | * GNU General Public License for more details. | ||
102 | 14 | * | ||
103 | 15 | * You should have received a copy of the GNU General Public License | ||
104 | 16 | * along with this program; if not, write to the Free Software | ||
105 | 17 | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA | ||
106 | 18 | */ | ||
107 | 19 | |||
108 | 20 | #include <config.h> | ||
109 | 21 | #include <fcntl.h> | ||
110 | 22 | #include <drizzled/function/str/strfunc.h> | ||
111 | 23 | #include <drizzled/item/func.h> | ||
112 | 24 | #include <drizzled/plugin.h> | ||
113 | 25 | #include <drizzled/plugin/function.h> | ||
114 | 26 | #include <drizzled/item.h> | ||
115 | 27 | #include <drizzled/module/option_map.h> | ||
116 | 28 | #include <drizzled/session.h> | ||
117 | 29 | #include <drizzled/session/times.h> | ||
118 | 30 | #include <drizzled/table/instance/base.h> | ||
119 | 31 | #include <drizzled/table.h> | ||
120 | 32 | |||
121 | 33 | #include "event_notify.h" | ||
122 | 34 | |||
123 | 35 | namespace po= boost::program_options; | ||
124 | 36 | using namespace std; | ||
125 | 37 | using namespace drizzled; | ||
126 | 38 | using namespace plugin; | ||
127 | 39 | |||
128 | 40 | static EventNotify *event_notify = NULL; | ||
129 | 41 | |||
130 | 42 | struct AddressIs { | ||
131 | 43 | event_t *ptr; | ||
132 | 44 | AddressIs(event_t *ptr) : ptr(ptr) {} | ||
133 | 45 | bool operator()(const event_t *object) const { | ||
134 | 46 | return ptr == object; | ||
135 | 47 | } | ||
136 | 48 | }; | ||
137 | 49 | |||
138 | 50 | // void show() | ||
139 | 51 | // { | ||
140 | 52 | // for (std::map<std::string, list<event_t*> >::iterator map_it=event_notify->all_events.begin(); map_it!=event_notify->all_events.end(); ++map_it){ | ||
141 | 53 | // std::cout << map_it->first << " "; | ||
142 | 54 | // for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it) | ||
143 | 55 | // std::cout << (*list_it)->event_type << " " << (*list_it)->time_out << " " << (*list_it)->num_of_events_threshold<< " " << (*list_it)->num_of_events_so_far << '\n'; | ||
144 | 56 | // } | ||
145 | 57 | // } | ||
146 | 58 | |||
147 | 59 | event_t::event_t(String *_table_name, String *_event_type, String *_time_out, String *_num_of_events_threshold) | ||
148 | 60 | { | ||
149 | 61 | this->table_name = _table_name->c_str(); | ||
150 | 62 | this->event_type = _event_type->c_str(); | ||
151 | 63 | time_out = atof(_time_out->c_str()); | ||
152 | 64 | num_of_events_threshold = atoi(_num_of_events_threshold->c_str()); | ||
153 | 65 | pthread_mutex_init(&this->lock, NULL); | ||
154 | 66 | pthread_cond_init(&this->cond, NULL); | ||
155 | 67 | num_of_events_so_far = 0; | ||
156 | 68 | } | ||
157 | 69 | |||
158 | 70 | event_t::~event_t() | ||
159 | 71 | { | ||
160 | 72 | pthread_mutex_destroy(&lock); | ||
161 | 73 | pthread_cond_destroy(&cond); | ||
162 | 74 | } | ||
163 | 75 | |||
164 | 76 | EventNotify::EventNotify(bool enabled): | ||
165 | 77 | drizzled::plugin::EventObserver("event_notify_interface"), | ||
166 | 78 | sysvar_enabled(enabled) | ||
167 | 79 | { | ||
168 | 80 | } | ||
169 | 81 | |||
170 | 82 | EventNotify::~EventNotify() | ||
171 | 83 | { | ||
172 | 84 | } | ||
173 | 85 | |||
174 | 86 | void EventNotify::registerTableEventsDo(TableShare &table_share, EventObserverList &observers) | ||
175 | 87 | { | ||
176 | 88 | boost::mutex::scoped_lock scoped(all_events_mutex); | ||
177 | 89 | |||
178 | 90 | string str(table_share.getTableName()); | ||
179 | 91 | //cout << "in registertableeventsdo table name is " << str <<endl; | ||
180 | 92 | //show(); | ||
181 | 93 | if (sysvar_enabled == false || (all_events.find(str) == all_events.end())){ | ||
182 | 94 | return; | ||
183 | 95 | } | ||
184 | 96 | //cout << "yes event registered on this table\n"; | ||
185 | 97 | registerEvent(observers, AFTER_INSERT_RECORD); | ||
186 | 98 | registerEvent(observers, AFTER_UPDATE_RECORD); | ||
187 | 99 | registerEvent(observers, AFTER_DELETE_RECORD); | ||
188 | 100 | } | ||
189 | 101 | |||
190 | 102 | void EventNotify::registerSessionEventsDo(Session &, EventObserverList &observers) | ||
191 | 103 | { | ||
192 | 104 | //here need to check whether drizzled was started with option --innodb.replication-log. | ||
193 | 105 | //cout << "registered for sys_replication_log\n"; | ||
194 | 106 | registerEvent(observers, AFTER_INSERT_TO_SYS_REPLICATION_LOG); | ||
195 | 107 | } | ||
196 | 108 | |||
197 | 109 | bool EventNotify::observeEventDo(EventData &data) | ||
198 | 110 | { | ||
199 | 111 | if (not sysvar_enabled) | ||
200 | 112 | return false; | ||
201 | 113 | |||
202 | 114 | switch (data.event) { | ||
203 | 115 | |||
204 | 116 | case AFTER_INSERT_RECORD: | ||
205 | 117 | afterInsertRecord((AfterInsertRecordEventData &)data); | ||
206 | 118 | break; | ||
207 | 119 | |||
208 | 120 | case AFTER_UPDATE_RECORD: | ||
209 | 121 | afterUpdateRecord((AfterUpdateRecordEventData &)data); | ||
210 | 122 | break; | ||
211 | 123 | |||
212 | 124 | case AFTER_DELETE_RECORD: | ||
213 | 125 | afterDeleteRecord((AfterDeleteRecordEventData &)data); | ||
214 | 126 | break; | ||
215 | 127 | |||
216 | 128 | case AFTER_INSERT_TO_SYS_REPLICATION_LOG: | ||
217 | 129 | afterInsertToSysReplicationLog((InsertToSysReplicationLogEventData&) data); | ||
218 | 130 | break; | ||
219 | 131 | |||
220 | 132 | default: | ||
221 | 133 | fprintf(stderr, "event_notify: Unexpected event '%s'\n", | ||
222 | 134 | EventObserver::eventName(data.event)); | ||
223 | 135 | } | ||
224 | 136 | |||
225 | 137 | return false; | ||
226 | 138 | } | ||
227 | 139 | |||
228 | 140 | bool EventNotify::afterInsertRecord(AfterInsertRecordEventData &data) | ||
229 | 141 | { | ||
230 | 142 | std::string str(data.table.getShare()->getTableName()); | ||
231 | 143 | std::map<std::string, list<event_t*> >::iterator map_it; | ||
232 | 144 | boost::mutex::scoped_lock scoped(all_events_mutex); | ||
233 | 145 | map_it = all_events.find(str); | ||
234 | 146 | |||
235 | 147 | if(map_it != all_events.end()){ | ||
236 | 148 | for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){ | ||
237 | 149 | if((*list_it)->event_type.compare("INSERT") == 0){ | ||
238 | 150 | pthread_mutex_lock(&(*list_it)->lock); | ||
239 | 151 | (*list_it)->num_of_events_so_far++; | ||
240 | 152 | if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){ | ||
241 | 153 | pthread_cond_broadcast(&(*list_it)->cond); | ||
242 | 154 | fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond); | ||
243 | 155 | } | ||
244 | 156 | pthread_mutex_unlock(&(*list_it)->lock); | ||
245 | 157 | } | ||
246 | 158 | } | ||
247 | 159 | } | ||
248 | 160 | return false; | ||
249 | 161 | } | ||
250 | 162 | |||
251 | 163 | bool EventNotify::afterUpdateRecord(AfterUpdateRecordEventData &data) | ||
252 | 164 | { | ||
253 | 165 | std::string str(data.table.getShare()->getTableName()); | ||
254 | 166 | std::map<std::string, list<event_t*> >::iterator map_it; | ||
255 | 167 | boost::mutex::scoped_lock scoped(all_events_mutex); | ||
256 | 168 | map_it = all_events.find(str); | ||
257 | 169 | |||
258 | 170 | if(map_it != all_events.end()){ | ||
259 | 171 | for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){ | ||
260 | 172 | if((*list_it)->event_type.compare("UPDATE") == 0){ | ||
261 | 173 | pthread_mutex_lock(&(*list_it)->lock); | ||
262 | 174 | (*list_it)->num_of_events_so_far++; | ||
263 | 175 | if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){ | ||
264 | 176 | pthread_cond_broadcast(&(*list_it)->cond); | ||
265 | 177 | fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond); | ||
266 | 178 | } | ||
267 | 179 | pthread_mutex_unlock(&(*list_it)->lock); | ||
268 | 180 | } | ||
269 | 181 | } | ||
270 | 182 | } | ||
271 | 183 | return false; | ||
272 | 184 | } | ||
273 | 185 | |||
274 | 186 | bool EventNotify::afterDeleteRecord(AfterDeleteRecordEventData &data) | ||
275 | 187 | { | ||
276 | 188 | std::string str(data.table.getShare()->getTableName()); | ||
277 | 189 | std::map<std::string, list<event_t*> >::iterator map_it; | ||
278 | 190 | boost::mutex::scoped_lock scoped(all_events_mutex); | ||
279 | 191 | map_it = all_events.find(str); | ||
280 | 192 | |||
281 | 193 | if(map_it != all_events.end()){ | ||
282 | 194 | for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){ | ||
283 | 195 | if((*list_it)->event_type.compare("DELETE") == 0){ | ||
284 | 196 | pthread_mutex_lock(&(*list_it)->lock); | ||
285 | 197 | (*list_it)->num_of_events_so_far++; | ||
286 | 198 | if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){ | ||
287 | 199 | pthread_cond_broadcast(&(*list_it)->cond); | ||
288 | 200 | fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond); | ||
289 | 201 | } | ||
290 | 202 | pthread_mutex_unlock(&(*list_it)->lock); | ||
291 | 203 | } | ||
292 | 204 | } | ||
293 | 205 | } | ||
294 | 206 | return false; | ||
295 | 207 | } | ||
296 | 208 | |||
297 | 209 | |||
298 | 210 | bool EventNotify::afterInsertToSysReplicationLog(InsertToSysReplicationLogEventData &data) | ||
299 | 211 | { | ||
300 | 212 | std::string str = "sys_replication_log"; | ||
301 | 213 | std::map<std::string, list<event_t*> >::iterator map_it; | ||
302 | 214 | boost::mutex::scoped_lock scoped(all_events_mutex); | ||
303 | 215 | map_it = all_events.find(str); | ||
304 | 216 | |||
305 | 217 | if(map_it != all_events.end()){ | ||
306 | 218 | for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){ | ||
307 | 219 | if((*list_it)->event_type.compare("INSERT") == 0){ | ||
308 | 220 | pthread_mutex_lock(&(*list_it)->lock); | ||
309 | 221 | (*list_it)->num_of_events_so_far++; | ||
310 | 222 | if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){ | ||
311 | 223 | pthread_cond_broadcast(&(*list_it)->cond); | ||
312 | 224 | fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond); | ||
313 | 225 | } | ||
314 | 226 | pthread_mutex_unlock(&(*list_it)->lock); | ||
315 | 227 | } | ||
316 | 228 | } | ||
317 | 229 | } | ||
318 | 230 | return false; | ||
319 | 231 | } | ||
320 | 232 | |||
321 | 233 | |||
322 | 234 | class Item_func_event_notify : public Item_str_func | ||
323 | 235 | { | ||
324 | 236 | public: | ||
325 | 237 | Item_func_event_notify() : Item_str_func() {} | ||
326 | 238 | const char *func_name() const { return "wait_for"; } | ||
327 | 239 | |||
328 | 240 | String *val_str(String* s) | ||
329 | 241 | { | ||
330 | 242 | String *table_name = args[0]->val_str(s); | ||
331 | 243 | event_t *new_event = new event_t(args[0]->val_str(s),(args[1]->val_str(s)), (args[2]->val_str(s)), (args[3]->val_str(s))); | ||
332 | 244 | |||
333 | 245 | map<std::string, list<event_t*> >::iterator map_it; | ||
334 | 246 | list<event_t*>::iterator list_it; | ||
335 | 247 | |||
336 | 248 | { | ||
337 | 249 | boost::mutex::scoped_lock scoped(event_notify->all_events_mutex); | ||
338 | 250 | map_it = event_notify->all_events.find(new_event->table_name); | ||
339 | 251 | |||
340 | 252 | if(map_it == event_notify->all_events.end()){ | ||
341 | 253 | list<event_t*> my_list; | ||
342 | 254 | my_list.push_back(new_event); | ||
343 | 255 | event_notify->all_events[new_event->table_name] = my_list; | ||
344 | 256 | } | ||
345 | 257 | else{ | ||
346 | 258 | for (list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){ | ||
347 | 259 | if((*list_it)->event_type.compare(new_event->event_type) == 0 ) { | ||
348 | 260 | if( new_event->num_of_events_threshold <= (*list_it)->num_of_events_so_far){ | ||
349 | 261 | //int retval = new_event->num_of_events_threshold; | ||
350 | 262 | delete new_event; | ||
351 | 263 | return table_name; | ||
352 | 264 | } | ||
353 | 265 | new_event->num_of_events_so_far = (*list_it)->num_of_events_so_far; | ||
354 | 266 | break; | ||
355 | 267 | } | ||
356 | 268 | } | ||
357 | 269 | map_it->second.push_back(new_event); | ||
358 | 270 | } | ||
359 | 271 | } | ||
360 | 272 | /* | ||
361 | 273 | * waiting for number of events reach its threshold. | ||
362 | 274 | */ | ||
363 | 275 | struct timespec timeToWait; | ||
364 | 276 | struct timeval now; | ||
365 | 277 | gettimeofday(&now,NULL); | ||
366 | 278 | timeToWait.tv_sec = now.tv_sec+new_event->time_out; | ||
367 | 279 | pthread_mutex_lock(&new_event->lock); | ||
368 | 280 | //while(new_event->num_of_events_so_far < new_event->num_of_events_threshold ) | ||
369 | 281 | fprintf(stderr, "wait %p\n", (void*)&new_event->cond); | ||
370 | 282 | pthread_cond_timedwait(&new_event->cond, &new_event->lock, &timeToWait); | ||
371 | 283 | fprintf(stderr, "SIGNALLED %p\n", (void*)&new_event->cond); | ||
372 | 284 | |||
373 | 285 | pthread_mutex_unlock(&new_event->lock); | ||
374 | 286 | int count_events = 0; | ||
375 | 287 | { | ||
376 | 288 | boost::mutex::scoped_lock scoped(event_notify->all_events_mutex); | ||
377 | 289 | map<std::string, list<event_t*> >::iterator map_iter; | ||
378 | 290 | list<event_t*>::iterator list_iter; | ||
379 | 291 | map_iter = event_notify->all_events.find(new_event->table_name); | ||
380 | 292 | for (list_iter=map_iter->second.begin(); list_iter!=map_iter->second.end(); ++list_iter){ | ||
381 | 293 | if((*list_iter)->event_type.compare(new_event->event_type) == 0 ){ | ||
382 | 294 | count_events++; | ||
383 | 295 | } | ||
384 | 296 | } | ||
385 | 297 | if(count_events > 1){ | ||
386 | 298 | map_iter->second.remove_if(AddressIs(new_event)); | ||
387 | 299 | delete new_event; | ||
388 | 300 | } | ||
389 | 301 | } | ||
390 | 302 | return table_name; | ||
391 | 303 | } | ||
392 | 304 | |||
393 | 305 | void fix_length_and_dec() { | ||
394 | 306 | max_length=strlen("wait_for"); | ||
395 | 307 | } | ||
396 | 308 | |||
397 | 309 | bool check_argument_count(int n) | ||
398 | 310 | { | ||
399 | 311 | return (n == 4); | ||
400 | 312 | } | ||
401 | 313 | }; | ||
402 | 314 | |||
403 | 315 | plugin::Create_function<Item_func_event_notify> *event_notify_udf= NULL; | ||
404 | 316 | |||
405 | 317 | static void init_options(drizzled::module::option_context &) | ||
406 | 318 | { | ||
407 | 319 | event_notify = new EventNotify(true); | ||
408 | 320 | } | ||
409 | 321 | |||
410 | 322 | static int event_notifier_plugin_init(drizzled::module::Context &context) | ||
411 | 323 | { | ||
412 | 324 | |||
413 | 325 | event_notify_udf= | ||
414 | 326 | new plugin::Create_function<Item_func_event_notify>("wait_for"); | ||
415 | 327 | context.add(event_notify_udf); | ||
416 | 328 | context.add(event_notify); | ||
417 | 329 | context.registerVariable(new sys_var_bool_ptr("enabled", &event_notify->sysvar_enabled)); | ||
418 | 330 | return 0; | ||
419 | 331 | } | ||
420 | 332 | |||
421 | 333 | |||
422 | 334 | DRIZZLE_DECLARE_PLUGIN | ||
423 | 335 | { | ||
424 | 336 | DRIZZLE_VERSION_ID, /* DRIZZLE_VERSION_ID */ | ||
425 | 337 | "Event_Notifier", /* module name */ | ||
426 | 338 | "1.0", /* module version */ | ||
427 | 339 | "Ajaya Agrawal", /* author(s) */ | ||
428 | 340 | N_("indicates an event happening on certain table/database/session"), /* description */ | ||
429 | 341 | PLUGIN_LICENSE_BSD, /* license */ | ||
430 | 342 | event_notifier_plugin_init, /* init module function */ | ||
431 | 343 | NULL, /* module dependencies */ | ||
432 | 344 | init_options /* init options function */ | ||
433 | 345 | } | ||
434 | 346 | DRIZZLE_DECLARE_PLUGIN_END; | ||
435 | 0 | \ No newline at end of file | 347 | \ No newline at end of file |
436 | 1 | 348 | ||
437 | === added file 'plugin/event_notify/event_notify.h' | |||
438 | --- plugin/event_notify/event_notify.h 1970-01-01 00:00:00 +0000 | |||
439 | +++ plugin/event_notify/event_notify.h 2013-09-18 20:41:29 +0000 | |||
440 | @@ -0,0 +1,81 @@ | |||
441 | 1 | /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*- | ||
442 | 2 | * vim:expandtab:shiftwidth=2:tabstop=2:smarttab: | ||
443 | 3 | * | ||
444 | 4 | * Copyright 2013 Ajaya Agrawal | ||
445 | 5 | * | ||
446 | 6 | * This program is free software: you can redistribute it and/or modify | ||
447 | 7 | * it under the terms of the GNU General Public License as published by | ||
448 | 8 | * the Free Software Foundation, either version 3 of the License, or | ||
449 | 9 | * (at your option) any later version. | ||
450 | 10 | * | ||
451 | 11 | * This program is distributed in the hope that it will be useful, | ||
452 | 12 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
453 | 13 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
454 | 14 | * GNU General Public License for more details. | ||
455 | 15 | * | ||
456 | 16 | * You should have received a copy of the GNU General Public License | ||
457 | 17 | * along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
458 | 18 | */ | ||
459 | 19 | |||
460 | 20 | #pragma once | ||
461 | 21 | |||
462 | 22 | #include <drizzled/plugin/event_observer.h> | ||
463 | 23 | #include <boost/thread/mutex.hpp> | ||
464 | 24 | |||
465 | 25 | #include <map> | ||
466 | 26 | #include <list> | ||
467 | 27 | #include <pthread.h> | ||
468 | 28 | #include <string> | ||
469 | 29 | #include <cstring> | ||
470 | 30 | |||
471 | 31 | namespace drizzled { | ||
472 | 32 | namespace plugin { | ||
473 | 33 | |||
474 | 34 | struct cmp_str | ||
475 | 35 | { | ||
476 | 36 | bool operator()(std::string a, std::string b) | ||
477 | 37 | { | ||
478 | 38 | return (a.compare(b) < 0); | ||
479 | 39 | } | ||
480 | 40 | }; | ||
481 | 41 | |||
482 | 42 | class event_t { | ||
483 | 43 | |||
484 | 44 | public: | ||
485 | 45 | event_t(String *_session_time, String *_event_type, String *_table_name, String *_num_of_events); | ||
486 | 46 | ~event_t(); | ||
487 | 47 | std::string table_name; | ||
488 | 48 | std::string event_type; | ||
489 | 49 | pthread_mutex_t lock; | ||
490 | 50 | pthread_cond_t cond; | ||
491 | 51 | int num_of_events_threshold; | ||
492 | 52 | double time_out; | ||
493 | 53 | int num_of_events_so_far; | ||
494 | 54 | }; | ||
495 | 55 | |||
496 | 56 | class EventNotify : public drizzled::plugin::EventObserver | ||
497 | 57 | { | ||
498 | 58 | public: | ||
499 | 59 | EventNotify(bool enabled); | ||
500 | 60 | ~EventNotify(); | ||
501 | 61 | |||
502 | 62 | void registerTableEventsDo(TableShare &table_share, EventObserverList &observers); | ||
503 | 63 | void registerSessionEventsDo(Session &session, EventObserverList &observers); | ||
504 | 64 | bool observeEventDo(EventData &); | ||
505 | 65 | bool afterStatement(AfterStatementEventData &data); | ||
506 | 66 | bool afterInsertRecord(AfterInsertRecordEventData &data); | ||
507 | 67 | bool afterUpdateRecord(AfterUpdateRecordEventData &data); | ||
508 | 68 | bool afterDeleteRecord(AfterDeleteRecordEventData &data); | ||
509 | 69 | bool afterInsertToSysReplicationLog(InsertToSysReplicationLogEventData &data); | ||
510 | 70 | /** | ||
511 | 71 | * These are the event_notify system variables. So sysvar_enabled is | ||
512 | 72 | * event_notify_enabled in SHOW VARIABLES, etc. They are all global and dynamic. | ||
513 | 73 | */ | ||
514 | 74 | bool sysvar_enabled; | ||
515 | 75 | boost::mutex all_events_mutex; | ||
516 | 76 | boost::mutex counter_mutex; | ||
517 | 77 | std::map <std::string, std::list<event_t*>, cmp_str > all_events; | ||
518 | 78 | }; | ||
519 | 79 | |||
520 | 80 | } /* namespace plugin */ | ||
521 | 81 | } /* namespace drizzled */ | ||
522 | 0 | 82 | ||
523 | === added file 'plugin/event_notify/plugin.ini' | |||
524 | --- plugin/event_notify/plugin.ini 1970-01-01 00:00:00 +0000 | |||
525 | +++ plugin/event_notify/plugin.ini 2013-09-18 20:41:29 +0000 | |||
526 | @@ -0,0 +1,3 @@ | |||
527 | 1 | [plugin] | ||
528 | 2 | sources=event_notify.cc | ||
529 | 3 | headers=event_notify.h | ||
530 | 0 | 4 | ||
531 | === added directory 'plugin/event_notify/tests' | |||
532 | === added directory 'plugin/event_notify/tests/r' | |||
533 | === added file 'plugin/event_notify/tests/r/delete.result' | |||
534 | --- plugin/event_notify/tests/r/delete.result 1970-01-01 00:00:00 +0000 | |||
535 | +++ plugin/event_notify/tests/r/delete.result 2013-09-18 20:41:29 +0000 | |||
536 | @@ -0,0 +1,13 @@ | |||
537 | 1 | drop table if exists t1; | ||
538 | 2 | create table t1 (a int primary key); | ||
539 | 3 | select wait_for("t1", "DELETE", "200.9", "2"); | ||
540 | 4 | insert into t1 values(1),(2),(3); | ||
541 | 5 | delete from t1 where a=1; | ||
542 | 6 | delete from t1 where a=2; | ||
543 | 7 | delete from t1 where a=3; | ||
544 | 8 | wait_for("t1", "DELETE", "200.9", "2") | ||
545 | 9 | t1 | ||
546 | 10 | select wait_for("t1", "DELETE", "1000", "3"); | ||
547 | 11 | wait_for("t1", "DELETE", "1000", "3") | ||
548 | 12 | t1 | ||
549 | 13 | drop table t1; | ||
550 | 0 | 14 | ||
551 | === added file 'plugin/event_notify/tests/r/insert.result' | |||
552 | --- plugin/event_notify/tests/r/insert.result 1970-01-01 00:00:00 +0000 | |||
553 | +++ plugin/event_notify/tests/r/insert.result 2013-09-18 20:41:29 +0000 | |||
554 | @@ -0,0 +1,18 @@ | |||
555 | 1 | drop table if exists t1; | ||
556 | 2 | create table t1 (a int primary key); | ||
557 | 3 | select wait_for("t1", "INSERT", "100", "4"); | ||
558 | 4 | insert into t1 values(1); | ||
559 | 5 | insert into t1 values(2); | ||
560 | 6 | insert into t1 values(3); | ||
561 | 7 | insert into t1 values(4); | ||
562 | 8 | wait_for("t1", "INSERT", "100", "4") | ||
563 | 9 | t1 | ||
564 | 10 | select wait_for("t1", "INSERT", "100", "5"); | ||
565 | 11 | insert into t1 values(5); | ||
566 | 12 | wait_for("t1", "INSERT", "100", "5") | ||
567 | 13 | t1 | ||
568 | 14 | insert into t1 values(60); | ||
569 | 15 | select wait_for("t1", "INSERT", "100", "6"); | ||
570 | 16 | wait_for("t1", "INSERT", "100", "6") | ||
571 | 17 | t1 | ||
572 | 18 | drop table t1; | ||
573 | 0 | 19 | ||
574 | === added file 'plugin/event_notify/tests/r/sys_replication_log.result' | |||
575 | --- plugin/event_notify/tests/r/sys_replication_log.result 1970-01-01 00:00:00 +0000 | |||
576 | +++ plugin/event_notify/tests/r/sys_replication_log.result 2013-09-18 20:41:29 +0000 | |||
577 | @@ -0,0 +1,12 @@ | |||
578 | 1 | drop table if exists t1; | ||
579 | 2 | create table t1 (a int primary key); | ||
580 | 3 | select wait_for("sys_replication_log", "INSERT", "200.9", "1"); | ||
581 | 4 | insert into t1 values(1); | ||
582 | 5 | insert into t1 values(2); | ||
583 | 6 | insert into t1 values(3); | ||
584 | 7 | wait_for("sys_replication_log", "INSERT", "200.9", "1") | ||
585 | 8 | sys_replication_log | ||
586 | 9 | select wait_for("sys_replication_log", "INSERT", "100", "3"); | ||
587 | 10 | wait_for("sys_replication_log", "INSERT", "100", "3") | ||
588 | 11 | sys_replication_log | ||
589 | 12 | drop table t1; | ||
590 | 0 | 13 | ||
591 | === added file 'plugin/event_notify/tests/r/time_out.result' | |||
592 | --- plugin/event_notify/tests/r/time_out.result 1970-01-01 00:00:00 +0000 | |||
593 | +++ plugin/event_notify/tests/r/time_out.result 2013-09-18 20:41:29 +0000 | |||
594 | @@ -0,0 +1,3 @@ | |||
595 | 1 | select wait_for("t1", "INSERT", "5", "20"); | ||
596 | 2 | wait_for("t1", "INSERT", "5", "20") | ||
597 | 3 | t1 | ||
598 | 0 | 4 | ||
599 | === added file 'plugin/event_notify/tests/r/update.result' | |||
600 | --- plugin/event_notify/tests/r/update.result 1970-01-01 00:00:00 +0000 | |||
601 | +++ plugin/event_notify/tests/r/update.result 2013-09-18 20:41:29 +0000 | |||
602 | @@ -0,0 +1,13 @@ | |||
603 | 1 | drop table if exists t1; | ||
604 | 2 | create table t1 (a int primary key); | ||
605 | 3 | select wait_for("t1", "UPDATE", "200.9", "1"); | ||
606 | 4 | insert into t1 values(1),(2),(3); | ||
607 | 5 | update t1 set a=10 where a=1; | ||
608 | 6 | update t1 set a=20 where a=2; | ||
609 | 7 | update t1 set a=30 where a=3; | ||
610 | 8 | wait_for("t1", "UPDATE", "200.9", "1") | ||
611 | 9 | t1 | ||
612 | 10 | select wait_for("t1", "UPDATE", "1000", "2"); | ||
613 | 11 | wait_for("t1", "UPDATE", "1000", "2") | ||
614 | 12 | t1 | ||
615 | 13 | drop table t1; | ||
616 | 0 | 14 | ||
617 | === added directory 'plugin/event_notify/tests/t' | |||
618 | === added file 'plugin/event_notify/tests/t/delete.test' | |||
619 | --- plugin/event_notify/tests/t/delete.test 1970-01-01 00:00:00 +0000 | |||
620 | +++ plugin/event_notify/tests/t/delete.test 2013-09-18 20:41:29 +0000 | |||
621 | @@ -0,0 +1,21 @@ | |||
622 | 1 | connect (con1,localhost,root,,); | ||
623 | 2 | --disable_warnings | ||
624 | 3 | drop table if exists t1; | ||
625 | 4 | --enable_warnings | ||
626 | 5 | create table t1 (a int primary key); | ||
627 | 6 | connection con1; | ||
628 | 7 | send select wait_for("t1", "DELETE", "200.9", "2"); | ||
629 | 8 | connection default; | ||
630 | 9 | insert into t1 values(1),(2),(3); | ||
631 | 10 | delete from t1 where a=1; | ||
632 | 11 | delete from t1 where a=2; | ||
633 | 12 | delete from t1 where a=3; | ||
634 | 13 | connection con1; | ||
635 | 14 | reap; | ||
636 | 15 | send select wait_for("t1", "DELETE", "1000", "3"); | ||
637 | 16 | connection default; | ||
638 | 17 | connection con1; | ||
639 | 18 | reap; | ||
640 | 19 | disconnect con1; | ||
641 | 20 | connection default; | ||
642 | 21 | drop table t1; | ||
643 | 0 | 22 | ||
644 | === added file 'plugin/event_notify/tests/t/insert.test' | |||
645 | --- plugin/event_notify/tests/t/insert.test 1970-01-01 00:00:00 +0000 | |||
646 | +++ plugin/event_notify/tests/t/insert.test 2013-09-18 20:41:29 +0000 | |||
647 | @@ -0,0 +1,28 @@ | |||
648 | 1 | connect (con1,localhost,root,,); | ||
649 | 2 | connect (con2,localhost,root,,); | ||
650 | 3 | --disable_warnings | ||
651 | 4 | drop table if exists t1; | ||
652 | 5 | --enable_warnings | ||
653 | 6 | create table t1 (a int primary key); | ||
654 | 7 | connection con1; | ||
655 | 8 | send select wait_for("t1", "INSERT", "100", "4"); | ||
656 | 9 | connection default; | ||
657 | 10 | insert into t1 values(1); | ||
658 | 11 | insert into t1 values(2); | ||
659 | 12 | insert into t1 values(3); | ||
660 | 13 | insert into t1 values(4); | ||
661 | 14 | connection con1; | ||
662 | 15 | reap; | ||
663 | 16 | connection con2; | ||
664 | 17 | send select wait_for("t1", "INSERT", "100", "5"); | ||
665 | 18 | connection default; | ||
666 | 19 | insert into t1 values(5); | ||
667 | 20 | connection con2; | ||
668 | 21 | reap; | ||
669 | 22 | insert into t1 values(60); | ||
670 | 23 | send select wait_for("t1", "INSERT", "100", "6"); | ||
671 | 24 | reap; | ||
672 | 25 | disconnect con1; | ||
673 | 26 | disconnect con2; | ||
674 | 27 | connection default; | ||
675 | 28 | drop table t1; | ||
676 | 0 | 29 | ||
677 | === added file 'plugin/event_notify/tests/t/master.opt' | |||
678 | --- plugin/event_notify/tests/t/master.opt 1970-01-01 00:00:00 +0000 | |||
679 | +++ plugin/event_notify/tests/t/master.opt 2013-09-18 20:41:29 +0000 | |||
680 | @@ -0,0 +1,1 @@ | |||
681 | 1 | --plugin-add=event_notify --innodb.replication-log | ||
682 | 0 | 2 | ||
683 | === added file 'plugin/event_notify/tests/t/sys_replication_log.test' | |||
684 | --- plugin/event_notify/tests/t/sys_replication_log.test 1970-01-01 00:00:00 +0000 | |||
685 | +++ plugin/event_notify/tests/t/sys_replication_log.test 2013-09-18 20:41:29 +0000 | |||
686 | @@ -0,0 +1,20 @@ | |||
687 | 1 | connect (con1,localhost,root,,); | ||
688 | 2 | --disable_warnings | ||
689 | 3 | drop table if exists t1; | ||
690 | 4 | --enable_warnings | ||
691 | 5 | create table t1 (a int primary key); | ||
692 | 6 | connection con1; | ||
693 | 7 | send select wait_for("sys_replication_log", "INSERT", "200.9", "1"); | ||
694 | 8 | connection default; | ||
695 | 9 | insert into t1 values(1); | ||
696 | 10 | insert into t1 values(2); | ||
697 | 11 | insert into t1 values(3); | ||
698 | 12 | connection con1; | ||
699 | 13 | reap; | ||
700 | 14 | send select wait_for("sys_replication_log", "INSERT", "100", "3"); | ||
701 | 15 | connection default; | ||
702 | 16 | connection con1; | ||
703 | 17 | reap; | ||
704 | 18 | disconnect con1; | ||
705 | 19 | connection default; | ||
706 | 20 | drop table t1; | ||
707 | 0 | 21 | ||
708 | === added file 'plugin/event_notify/tests/t/time_out.test' | |||
709 | --- plugin/event_notify/tests/t/time_out.test 1970-01-01 00:00:00 +0000 | |||
710 | +++ plugin/event_notify/tests/t/time_out.test 2013-09-18 20:41:29 +0000 | |||
711 | @@ -0,0 +1,2 @@ | |||
712 | 1 | send select wait_for("t1", "INSERT", "5", "20"); | ||
713 | 2 | reap; | ||
714 | 0 | 3 | ||
715 | === added file 'plugin/event_notify/tests/t/update.test' | |||
716 | --- plugin/event_notify/tests/t/update.test 1970-01-01 00:00:00 +0000 | |||
717 | +++ plugin/event_notify/tests/t/update.test 2013-09-18 20:41:29 +0000 | |||
718 | @@ -0,0 +1,21 @@ | |||
719 | 1 | connect (con1,localhost,root,,); | ||
720 | 2 | --disable_warnings | ||
721 | 3 | drop table if exists t1; | ||
722 | 4 | --enable_warnings | ||
723 | 5 | create table t1 (a int primary key); | ||
724 | 6 | connection con1; | ||
725 | 7 | send select wait_for("t1", "UPDATE", "200.9", "1"); | ||
726 | 8 | connection default; | ||
727 | 9 | insert into t1 values(1),(2),(3); | ||
728 | 10 | update t1 set a=10 where a=1; | ||
729 | 11 | update t1 set a=20 where a=2; | ||
730 | 12 | update t1 set a=30 where a=3; | ||
731 | 13 | connection con1; | ||
732 | 14 | reap; | ||
733 | 15 | send select wait_for("t1", "UPDATE", "1000", "2"); | ||
734 | 16 | connection default; | ||
735 | 17 | connection con1; | ||
736 | 18 | reap; | ||
737 | 19 | disconnect con1; | ||
738 | 20 | connection default; | ||
739 | 21 | drop table t1; | ||
740 | 0 | 22 | ||
741 | === modified file 'plugin/slave/queue_producer.cc' | |||
742 | --- plugin/slave/queue_producer.cc 2012-04-14 20:43:20 +0000 | |||
743 | +++ plugin/slave/queue_producer.cc 2013-09-18 20:41:29 +0000 | |||
744 | @@ -49,6 +49,62 @@ | |||
745 | 49 | return reconnect(true); | 49 | return reconnect(true); |
746 | 50 | } | 50 | } |
747 | 51 | 51 | ||
748 | 52 | void QueueProducer::run() | ||
749 | 53 | { | ||
750 | 54 | boost::posix_time::seconds duration(getSleepInterval()); | ||
751 | 55 | |||
752 | 56 | /* thread setup needed to do things like create a Session */ | ||
753 | 57 | internal::my_thread_init(); | ||
754 | 58 | |||
755 | 59 | if (not init()) | ||
756 | 60 | return; | ||
757 | 61 | |||
758 | 62 | while (1) | ||
759 | 63 | { | ||
760 | 64 | { | ||
761 | 65 | /* This uninterruptable block processes the message queue */ | ||
762 | 66 | boost::this_thread::disable_interruption di; | ||
763 | 67 | |||
764 | 68 | if (not process()) | ||
765 | 69 | { | ||
766 | 70 | shutdown(); | ||
767 | 71 | return; | ||
768 | 72 | } | ||
769 | 73 | } | ||
770 | 74 | |||
771 | 75 | /* Interruptable only when not doing work (aka, sleeping) */ | ||
772 | 76 | try | ||
773 | 77 | { | ||
774 | 78 | /* waiting for num_of_events through a wait_for() call*/ | ||
775 | 79 | int num_of_events = 2; | ||
776 | 80 | std::string event_sql("select wait_for(\"sys_replication_log\", \"INSERT\", \"2000\", \""); | ||
777 | 81 | event_sql.append(boost::lexical_cast<string>(num_of_events)); | ||
778 | 82 | event_sql.append("\")"); | ||
779 | 83 | drizzle_return_t event_ret; | ||
780 | 84 | drizzle_result_st event_result; | ||
781 | 85 | drizzle_query_str(_connection, &event_result, event_sql.c_str(), &event_ret); | ||
782 | 86 | |||
783 | 87 | if (event_ret != DRIZZLE_RETURN_OK){ | ||
784 | 88 | drizzle_result_free(&event_result); | ||
785 | 89 | boost::this_thread::sleep(duration); | ||
786 | 90 | string error = ""; | ||
787 | 91 | error.append(drizzle_error(_drizzle)); | ||
788 | 92 | cout << error <<": producer thread sleeping\n"; | ||
789 | 93 | } | ||
790 | 94 | else{ | ||
791 | 95 | num_of_events += 1; | ||
792 | 96 | cout << "wait_for returned\n"; | ||
793 | 97 | drizzle_result_free(&event_result); | ||
794 | 98 | } | ||
795 | 99 | } | ||
796 | 100 | catch (boost::thread_interrupted &) | ||
797 | 101 | { | ||
798 | 102 | return; | ||
799 | 103 | } | ||
800 | 104 | } | ||
801 | 105 | } | ||
802 | 106 | |||
803 | 107 | |||
804 | 52 | bool QueueProducer::process() | 108 | bool QueueProducer::process() |
805 | 53 | { | 109 | { |
806 | 54 | if (_saved_max_commit_id == 0) | 110 | if (_saved_max_commit_id == 0) |
807 | 55 | 111 | ||
808 | === modified file 'plugin/slave/queue_thread.h' | |||
809 | --- plugin/slave/queue_thread.h 2011-03-14 05:40:28 +0000 | |||
810 | +++ plugin/slave/queue_thread.h 2013-09-18 20:41:29 +0000 | |||
811 | @@ -40,7 +40,7 @@ | |||
812 | 40 | virtual ~QueueThread() | 40 | virtual ~QueueThread() |
813 | 41 | {} | 41 | {} |
814 | 42 | 42 | ||
816 | 43 | void run(void); | 43 | virtual void run(void); |
817 | 44 | 44 | ||
818 | 45 | /** | 45 | /** |
819 | 46 | * Do any initialization work. | 46 | * Do any initialization work. |