Merge lp://staging/~trond-norbye/libmemcached/mget_execute into lp://staging/~tangent-org/libmemcached/trunk

Proposed by Trond Norbye
Status: Merged
Merged at revision: not available
Proposed branch: lp://staging/~trond-norbye/libmemcached/mget_execute
Merge into: lp://staging/~tangent-org/libmemcached/trunk
Diff against target: 1024 lines
12 files modified
docs/Makefile.am (+8/-0)
docs/memcached_get.pod (+39/-3)
libmemcached/common.h (+2/-1)
libmemcached/memcached.h (+1/-0)
libmemcached/memcached_connect.c (+19/-21)
libmemcached/memcached_fetch.c (+1/-1)
libmemcached/memcached_get.c (+99/-73)
libmemcached/memcached_get.h (+25/-6)
libmemcached/memcached_io.c (+109/-20)
libmemcached/memcached_key.c (+2/-3)
libmemcached/memcached_types.h (+9/-3)
tests/function.c (+174/-0)
To merge this branch: bzr merge lp://staging/~trond-norbye/libmemcached/mget_execute
Reviewer Review Type Date Requested Status
Libmemcached-developers Pending
Review via email: mp+13297@code.staging.launchpad.net
To post a comment you must log in.
Revision history for this message
Trond Norbye (trond-norbye) wrote :

Implemented memcached_mget_execute, the way to do really large mgets with the binary protocol :-)

606. By Trond Norbye

Bug #447237: const-correctness in interface

607. By Trond Norbye

Fix compile warnings generated by gcc 4.4.1

608. By Trond Norbye

Fix problems with multigets and replication

The current code could go out of sync and send multiple "NOOP" to the same
server depending on the replica number to get, and the fetch-code isn't
capable of handling that. The memcached_mget_execute code also complicates this
because it could process some of the messages while receiving them so we
cannot reset the get sequence if we are able to send out the commands but
not the NOOP... To create a sane patch for this I disabled the buffered mode.
It will increase the number of packets sent to the server, but at least we
have a well defined behavior.

609. By Trond Norbye

mget with replication didn't work if you had a cache miss

610. By Trond Norbye

Refactor: memcached_mget_execute -> memcached_mget_execute_by_key

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'docs/Makefile.am'
2--- docs/Makefile.am 2009-09-26 07:47:01 +0000
3+++ docs/Makefile.am 2009-10-14 18:55:19 +0000
4@@ -74,6 +74,8 @@
5 memcached_increment_with_initial.3\
6 memcached_mget.3\
7 memcached_mget_by_key.3\
8+ memcached_mget_execute.3 \
9+ memcached_mget_execute_by_key.3 \
10 memcached_prepend.3\
11 memcached_prepend_by_key.3\
12 memcached_replace.3\
13@@ -212,6 +214,12 @@
14 memcached_mget_by_key.3: memcached_get.pod
15 ${POD2MAN} -c "libmemcached" -r "" -s 3 ${top_srcdir}/docs/memcached_get.pod > memcached_mget_by_key.3
16
17+memcached_mget_execute.3: memcached_get.pod
18+ ${POD2MAN} -c "libmemcached" -r "" -s 3 ${top_srcdir}/docs/memcached_get.pod > memcached_mget_execute.3
19+
20+memcached_mget_execute_by_key.3: memcached_get.pod
21+ ${POD2MAN} -c "libmemcached" -r "" -s 3 ${top_srcdir}/docs/memcached_get.pod > memcached_mget_execute_by_key.3
22+
23 memcached_fetch.3: memcached_get.pod
24 ${POD2MAN} -c "libmemcached" -r "" -s 3 ${top_srcdir}/docs/memcached_get.pod > memcached_fetch.3
25
26
27=== modified file 'docs/memcached_get.pod'
28--- docs/memcached_get.pod 2009-07-15 19:31:14 +0000
29+++ docs/memcached_get.pod 2009-10-14 18:55:19 +0000
30@@ -1,6 +1,7 @@
31 =head1 NAME
32
33-memcached_get, memcached_mget, memcached_fetch - Get a value
34+memcached_get, memcached_mget, memcached_fetch, memcached_mget_execute,
35+memcached_mget_execute_by_key - Get a value
36
37 =head1 LIBRARY
38
39@@ -23,7 +24,8 @@
40
41 memcached_return
42 memcached_mget (memcached_st *ptr,
43- char **keys, size_t *key_length,
44+ const char * const *keys,
45+ const size_t *key_length,
46 size_t number_of_keys);
47 char *
48 memcached_get_by_key(memcached_st *ptr,
49@@ -36,7 +38,8 @@
50 memcached_return
51 memcached_mget_by_key(memcached_st *ptr,
52 const char *master_key, size_t master_key_length,
53- char **keys, size_t *key_length,
54+ const char * const *keys,
55+ const size_t *key_length,
56 size_t number_of_keys);
57
58 char *memcached_fetch (memcached_st *ptr,
59@@ -44,12 +47,35 @@
60 size_t *value_length,
61 uint32_t *flags,
62 memcached_return *error);
63+
64 memcached_return
65 memcached_fetch_execute(memcached_st *ptr,
66 memcached_return (*callback[])(memcached_st *ptr, memcached_result_st *result, void *context),
67 void *context,
68 unsigned int number_of_callbacks);
69
70+
71+ memcached_return
72+ memcached_mget_execute(memcached_st *ptr,
73+ const char * const *keys,
74+ const size_t *key_length,
75+ size_t number_of_keys,
76+ memcached_execute_function *callback,
77+ void *context,
78+ unsigned int number_of_callbacks);
79+
80+ memcached_return
81+ memcached_mget_execute_by_key(memcached_st *ptr,
82+ const char *master_key,
83+ size_t master_key_length,
84+ const char * const *keys,
85+ const size_t *key_length,
86+ size_t number_of_keys,
87+ memcached_execute_function *callback,
88+ void *context,
89+ unsigned int number_of_callbacks);
90+
91+
92 =head1 DESCRIPTION
93
94 memcached_get() is used to fetch an individual value from the server. You
95@@ -99,6 +125,16 @@
96 to each function call. In the future there will be an option to allow this
97 to be an array.
98
99+memcached_mget_execute() and memcached_mget_execute_by_key() is
100+similar to memcached_mget(), but it may trigger the supplied callbacks
101+with result sets while sending out the queries. If you try to perform
102+a really large multiget with memcached_mget() you may encounter a
103+deadlock in the OS kernel (we fail to write data to the socket because
104+the input buffer is full). memcached_mget_execute() solves this
105+problem by processing some of the results before continuing sending
106+out requests. Please note that this function is only available in the
107+binary protocol.
108+
109 memcached_get_by_key() and memcached_mget_by_key() behave in a similar nature
110 as memcached_get() and memcached_mget(). The difference is that they take
111 a master key that is used for determining which server an object was stored
112
113=== modified file 'libmemcached/common.h'
114--- libmemcached/common.h 2009-09-17 12:39:18 +0000
115+++ libmemcached/common.h 2009-10-14 18:55:19 +0000
116@@ -136,7 +136,8 @@
117 void server_list_free(memcached_st *ptr, memcached_server_st *servers);
118
119 LIBMEMCACHED_LOCAL
120-memcached_return memcached_key_test(const char **keys, size_t *key_length,
121+memcached_return memcached_key_test(const char * const *keys,
122+ const size_t *key_length,
123 size_t number_of_keys);
124
125
126
127=== modified file 'libmemcached/memcached.h'
128--- libmemcached/memcached.h 2009-10-13 15:24:14 +0000
129+++ libmemcached/memcached.h 2009-10-14 18:55:19 +0000
130@@ -113,6 +113,7 @@
131 memcached_trigger_delete_key delete_trigger;
132 char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE];
133 uint32_t number_of_replicas;
134+ memcached_callback_st *callbacks;
135 };
136
137 LIBMEMCACHED_API
138
139=== modified file 'libmemcached/memcached_connect.c'
140--- libmemcached/memcached_connect.c 2009-10-12 15:54:00 +0000
141+++ libmemcached/memcached_connect.c 2009-10-14 18:55:19 +0000
142@@ -121,16 +121,25 @@
143 WATCHPOINT_ASSERT(error == 0);
144 }
145
146- /* For the moment, not getting a nonblocking mode will not be fatal */
147- if ((ptr->root->flags & MEM_NO_BLOCK) || ptr->root->connect_timeout)
148- {
149- int flags;
150+ /* libmemcached will always use nonblocking IO to avoid write deadlocks */
151+ int flags;
152
153+ do
154 flags= fcntl(ptr->fd, F_GETFL, 0);
155- unlikely (flags != -1)
156- {
157- (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
158- }
159+ while (flags == -1 && (errno == EINTR || errno == EAGAIN));
160+
161+ unlikely (flags == -1)
162+ return MEMCACHED_CONNECTION_FAILURE;
163+ else if ((flags & O_NONBLOCK) == 0)
164+ {
165+ int rval;
166+
167+ do
168+ rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
169+ while (rval == -1 && (errno == EINTR || errno == EAGAIN));
170+
171+ unlikely (rval == -1)
172+ return MEMCACHED_CONNECTION_FAILURE;
173 }
174
175 return MEMCACHED_SUCCESS;
176@@ -219,14 +228,6 @@
177
178 (void)set_socket_options(ptr);
179
180- int flags= 0;
181- if (ptr->root->connect_timeout)
182- {
183- flags= fcntl(ptr->fd, F_GETFL, 0);
184- if (flags != -1 && !(flags & O_NONBLOCK))
185- (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
186- }
187-
188 /* connect to server */
189 while (ptr->fd != -1 &&
190 connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0)
191@@ -268,10 +269,6 @@
192
193 if (ptr->fd != -1)
194 {
195- /* restore flags */
196- if (ptr->root->connect_timeout && (ptr->root->flags & MEM_NO_BLOCK) == 0)
197- (void)fcntl(ptr->fd, F_SETFL, flags & ~O_NONBLOCK);
198-
199 WATCHPOINT_ASSERT(ptr->cursor_active == 0);
200 ptr->server_failure_counter= 0;
201 return MEMCACHED_SUCCESS;
202@@ -290,9 +287,10 @@
203 if (gettimeofday(&next_time, NULL) == 0)
204 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
205 }
206- ptr->server_failure_counter+= 1;
207+ ptr->server_failure_counter++;
208 if (ptr->cached_errno == 0)
209 return MEMCACHED_TIMEOUT;
210+
211 return MEMCACHED_ERRNO; /* The last error should be from connect() */
212 }
213
214
215=== modified file 'libmemcached/memcached_fetch.c'
216--- libmemcached/memcached_fetch.c 2009-05-19 20:32:02 +0000
217+++ libmemcached/memcached_fetch.c 2009-10-14 18:55:19 +0000
218@@ -64,7 +64,7 @@
219 return result;
220 else if (*error == MEMCACHED_END)
221 memcached_server_response_reset(server);
222- else
223+ else if (*error != MEMCACHED_NOTFOUND)
224 break;
225 }
226
227
228=== modified file 'libmemcached/memcached_get.c'
229--- libmemcached/memcached_get.c 2009-10-06 10:48:57 +0000
230+++ libmemcached/memcached_get.c 2009-10-14 18:55:19 +0000
231@@ -15,12 +15,12 @@
232 }
233
234 static memcached_return memcached_mget_by_key_real(memcached_st *ptr,
235- const char *master_key,
236- size_t master_key_length,
237- const char **keys,
238- size_t *key_length,
239- size_t number_of_keys,
240- bool mget_mode);
241+ const char *master_key,
242+ size_t master_key_length,
243+ const char * const *keys,
244+ const size_t *key_length,
245+ size_t number_of_keys,
246+ bool mget_mode);
247
248 char *memcached_get_by_key(memcached_st *ptr,
249 const char *master_key,
250@@ -42,10 +42,9 @@
251 }
252
253 /* Request the key */
254- *error= memcached_mget_by_key_real(ptr,
255- master_key,
256- master_key_length,
257- (const char **)&key, &key_length, 1, false);
258+ *error= memcached_mget_by_key_real(ptr, master_key, master_key_length,
259+ (const char * const *)&key,
260+ &key_length, 1, false);
261
262 value= memcached_fetch(ptr, NULL, NULL,
263 value_length, flags, error);
264@@ -110,7 +109,8 @@
265 }
266
267 memcached_return memcached_mget(memcached_st *ptr,
268- const char **keys, size_t *key_length,
269+ const char * const *keys,
270+ const size_t *key_length,
271 size_t number_of_keys)
272 {
273 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
274@@ -119,17 +119,18 @@
275 static memcached_return binary_mget_by_key(memcached_st *ptr,
276 unsigned int master_server_key,
277 bool is_master_key_set,
278- const char **keys, size_t *key_length,
279+ const char * const *keys,
280+ const size_t *key_length,
281 size_t number_of_keys,
282 bool mget_mode);
283
284 static memcached_return memcached_mget_by_key_real(memcached_st *ptr,
285- const char *master_key,
286- size_t master_key_length,
287- const char **keys,
288- size_t *key_length,
289- size_t number_of_keys,
290- bool mget_mode)
291+ const char *master_key,
292+ size_t master_key_length,
293+ const char * const *keys,
294+ const size_t *key_length,
295+ size_t number_of_keys,
296+ bool mget_mode)
297 {
298 unsigned int x;
299 memcached_return rc= MEMCACHED_NOTFOUND;
300@@ -155,7 +156,7 @@
301
302 if (master_key && master_key_length)
303 {
304- if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((const char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
305+ if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((const char * const *)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
306 return MEMCACHED_BAD_KEY_PROVIDED;
307 master_server_key= memcached_generate_hash(ptr, master_key, master_key_length);
308 is_master_key_set= true;
309@@ -269,18 +270,60 @@
310 memcached_return memcached_mget_by_key(memcached_st *ptr,
311 const char *master_key,
312 size_t master_key_length,
313- const char **keys,
314- size_t *key_length,
315+ const char * const *keys,
316+ const size_t *key_length,
317 size_t number_of_keys)
318 {
319 return memcached_mget_by_key_real(ptr, master_key, master_key_length, keys,
320 key_length, number_of_keys, true);
321 }
322
323+memcached_return memcached_mget_execute(memcached_st *ptr,
324+ const char * const *keys,
325+ const size_t *key_length,
326+ size_t number_of_keys,
327+ memcached_execute_function *callback,
328+ void *context,
329+ unsigned int number_of_callbacks)
330+{
331+ return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
332+ number_of_keys, callback,
333+ context, number_of_callbacks);
334+}
335+
336+memcached_return memcached_mget_execute_by_key(memcached_st *ptr,
337+ const char *master_key,
338+ size_t master_key_length,
339+ const char * const *keys,
340+ const size_t *key_length,
341+ size_t number_of_keys,
342+ memcached_execute_function *callback,
343+ void *context,
344+ unsigned int number_of_callbacks)
345+{
346+ if ((ptr->flags & MEM_BINARY_PROTOCOL) == 0)
347+ return MEMCACHED_NOT_SUPPORTED;
348+
349+ memcached_return rc;
350+ memcached_callback_st *original_callbacks= ptr->callbacks;
351+ memcached_callback_st cb= {
352+ .callback= callback,
353+ .context= context,
354+ .number_of_callback= number_of_callbacks
355+ };
356+
357+ ptr->callbacks= &cb;
358+ rc= memcached_mget_by_key(ptr, master_key, master_key_length, keys,
359+ key_length, number_of_keys);
360+ ptr->callbacks= original_callbacks;
361+ return rc;
362+}
363+
364 static memcached_return simple_binary_mget(memcached_st *ptr,
365 unsigned int master_server_key,
366 bool is_master_key_set,
367- const char **keys, size_t *key_length,
368+ const char * const *keys,
369+ const size_t *key_length,
370 size_t number_of_keys, bool mget_mode)
371 {
372 memcached_return rc= MEMCACHED_NOTFOUND;
373@@ -382,19 +425,19 @@
374 }
375
376 static memcached_return replication_binary_mget(memcached_st *ptr,
377- uint32_t* hash, bool* dead_servers,
378- const char **keys, size_t *key_length,
379- size_t number_of_keys, bool mget_mode)
380+ uint32_t* hash,
381+ bool* dead_servers,
382+ const char *const *keys,
383+ const size_t *key_length,
384+ size_t number_of_keys)
385 {
386 memcached_return rc= MEMCACHED_NOTFOUND;
387 uint32_t x;
388
389- int flush= number_of_keys == 1;
390-
391 for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
392 {
393 bool success= true;
394-
395+
396 for (x= 0; x < number_of_keys; ++x)
397 {
398 if (hash[x] == ptr->number_of_hosts)
399@@ -419,58 +462,39 @@
400 }
401 }
402
403- protocol_binary_request_getk request= {.bytes= {0}};
404- request.message.header.request.magic= PROTOCOL_BINARY_REQ;
405- if (mget_mode)
406- request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
407- else
408- request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
409-
410- request.message.header.request.keylen= htons((uint16_t)key_length[x]);
411- request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
412- request.message.header.request.bodylen= htonl((uint32_t) key_length[x]);
413-
414+ protocol_binary_request_getk request= {
415+ .message.header.request= {
416+ .magic= PROTOCOL_BINARY_REQ,
417+ .opcode= PROTOCOL_BINARY_CMD_GETK,
418+ .keylen= htons((uint16_t)key_length[x]),
419+ .datatype= PROTOCOL_BINARY_RAW_BYTES,
420+ .bodylen= htonl((uint32_t)key_length[x])
421+ }
422+ };
423+
424+ /*
425+ * We need to disable buffering to actually know that the request was
426+ * successfully sent to the server (so that we should expect a result
427+ * back). It would be nice to do this in buffered mode, but then it
428+ * would be complex to handle all error situations if we got to send
429+ * some of the messages, and then we failed on writing out some others
430+ * and we used the callback interface from memcached_mget_execute so
431+ * that we might have processed some of the responses etc. For now,
432+ * just make sure we work _correctly_
433+ */
434 if ((memcached_io_write(&ptr->hosts[server], request.bytes,
435 sizeof(request.bytes), 0) == -1) ||
436 (memcached_io_write(&ptr->hosts[server], keys[x],
437- key_length[x], (char) flush) == -1))
438+ key_length[x], 1) == -1))
439 {
440 memcached_io_reset(&ptr->hosts[server]);
441 dead_servers[server]= true;
442 success= false;
443 continue;
444 }
445- /* we just want one pending response per server */
446- memcached_server_response_reset(&ptr->hosts[server]);
447+
448 memcached_server_response_increment(&ptr->hosts[server]);
449- }
450-
451- if (mget_mode)
452- {
453- /*
454- * Send a noop command to flush the buffers
455- */
456- protocol_binary_request_noop request= {.bytes= {0}};
457- request.message.header.request.magic= PROTOCOL_BINARY_REQ;
458- request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
459- request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
460-
461- for (x= 0; x < ptr->number_of_hosts; x++)
462- if (memcached_server_response_count(&ptr->hosts[x]))
463- {
464- if (memcached_io_write(&ptr->hosts[x], request.bytes,
465- sizeof(request.bytes), 1) == -1)
466- {
467- memcached_io_reset(&ptr->hosts[x]);
468- dead_servers[x]= true;
469- success= false;
470- }
471-
472- /* mark all of the messages bound for this server as sent! */
473- for (x= 0; x < number_of_keys; ++x)
474- if (hash[x] == x)
475- hash[x]= ptr->number_of_hosts;
476- }
477+ hash[x]= ptr->number_of_hosts;
478 }
479
480 if (success)
481@@ -483,8 +507,10 @@
482 static memcached_return binary_mget_by_key(memcached_st *ptr,
483 unsigned int master_server_key,
484 bool is_master_key_set,
485- const char **keys, size_t *key_length,
486- size_t number_of_keys, bool mget_mode)
487+ const char * const *keys,
488+ const size_t *key_length,
489+ size_t number_of_keys,
490+ bool mget_mode)
491 {
492 memcached_return rc;
493
494@@ -516,7 +542,7 @@
495 hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]);
496
497 rc= replication_binary_mget(ptr, hash, dead_servers, keys,
498- key_length, number_of_keys, mget_mode);
499+ key_length, number_of_keys);
500
501 ptr->call_free(ptr, hash);
502 ptr->call_free(ptr, dead_servers);
503
504=== modified file 'libmemcached/memcached_get.h'
505--- libmemcached/memcached_get.h 2009-09-17 19:14:07 +0000
506+++ libmemcached/memcached_get.h 2009-10-14 18:55:19 +0000
507@@ -6,8 +6,8 @@
508 * Author: Brian Aker
509 */
510
511-#ifndef __MEMCACHED_GET_H__
512-#define __MEMCACHED_GET_H__
513+#ifndef LIBMEMCACHED_MEMCACHED_GET_H
514+#define LIBMEMCACHED_MEMCACHED_GET_H
515
516 #ifdef __cplusplus
517 extern "C" {
518@@ -23,7 +23,8 @@
519
520 LIBMEMCACHED_API
521 memcached_return memcached_mget(memcached_st *ptr,
522- const char **keys, size_t *key_length,
523+ const char * const *keys,
524+ const size_t *key_length,
525 size_t number_of_keys);
526
527 LIBMEMCACHED_API
528@@ -38,8 +39,8 @@
529 memcached_return memcached_mget_by_key(memcached_st *ptr,
530 const char *master_key, size_t
531 master_key_length,
532- const char **keys,
533- size_t *key_length,
534+ const char * const *keys,
535+ const size_t *key_length,
536 size_t number_of_keys);
537
538 LIBMEMCACHED_API
539@@ -53,10 +54,28 @@
540 memcached_result_st *result,
541 memcached_return *error);
542
543+LIBMEMCACHED_API
544+memcached_return memcached_mget_execute(memcached_st *ptr,
545+ const char * const *keys,
546+ const size_t *key_length,
547+ size_t number_of_keys,
548+ memcached_execute_function *callback,
549+ void *context,
550+ unsigned int number_of_callbacks);
551
552+LIBMEMCACHED_API
553+memcached_return memcached_mget_execute_by_key(memcached_st *ptr,
554+ const char *master_key,
555+ size_t master_key_length,
556+ const char * const *keys,
557+ const size_t *key_length,
558+ size_t number_of_keys,
559+ memcached_execute_function *callback,
560+ void *context,
561+ unsigned int number_of_callbacks);
562
563 #ifdef __cplusplus
564 }
565 #endif
566
567-#endif /* __MEMCACHED_GET_H__ */
568+#endif /* LIBMEMCACHED_MEMCACHED_GET_H */
569
570=== modified file 'libmemcached/memcached_io.c'
571--- libmemcached/memcached_io.c 2009-07-18 17:37:40 +0000
572+++ libmemcached/memcached_io.c 2009-10-14 18:55:19 +0000
573@@ -9,7 +9,7 @@
574
575 typedef enum {
576 MEM_READ,
577- MEM_WRITE,
578+ MEM_WRITE
579 } memc_read_or_write;
580
581 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
582@@ -18,18 +18,14 @@
583 static memcached_return io_wait(memcached_server_st *ptr,
584 memc_read_or_write read_or_write)
585 {
586- struct pollfd fds[1];
587- short flags= 0;
588+ struct pollfd fds= {
589+ .fd= ptr->fd,
590+ .events = POLLIN
591+ };
592 int error;
593
594- if (read_or_write == MEM_WRITE) /* write */
595- flags= POLLOUT;
596- else
597- flags= POLLIN;
598-
599- memset(&fds, 0, sizeof(struct pollfd));
600- fds[0].fd= ptr->fd;
601- fds[0].events= flags;
602+ unlikely (read_or_write == MEM_WRITE) /* write */
603+ fds.events= POLLOUT;
604
605 /*
606 ** We are going to block on write, but at least on Solaris we might block
607@@ -41,26 +37,109 @@
608 */
609 if (read_or_write == MEM_WRITE)
610 {
611- memcached_return rc=memcached_purge(ptr);
612+ memcached_return rc= memcached_purge(ptr);
613 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
614- return MEMCACHED_FAILURE;
615+ return MEMCACHED_FAILURE;
616 }
617
618- error= poll(fds, 1, ptr->root->poll_timeout);
619+ int timeout= ptr->root->poll_timeout;
620+ if ((ptr->root->flags & MEM_NO_BLOCK) == 0)
621+ timeout= -1;
622+
623+ error= poll(&fds, 1, timeout);
624
625 if (error == 1)
626 return MEMCACHED_SUCCESS;
627 else if (error == 0)
628- {
629 return MEMCACHED_TIMEOUT;
630- }
631
632 /* Imposssible for anything other then -1 */
633 WATCHPOINT_ASSERT(error == -1);
634 memcached_quit_server(ptr, 1);
635
636 return MEMCACHED_FAILURE;
637-
638+}
639+
640+/**
641+ * Try to fill the input buffer for a server with as much
642+ * data as possible.
643+ *
644+ * @param ptr the server to pack
645+ */
646+static bool repack_input_buffer(memcached_server_st *ptr)
647+{
648+ if (ptr->read_ptr != ptr->read_buffer)
649+ {
650+ /* Move all of the data to the beginning of the buffer so
651+ ** that we can fit more data into the buffer...
652+ */
653+ memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
654+ ptr->read_ptr= ptr->read_buffer;
655+ ptr->read_data_length= ptr->read_buffer_length;
656+ }
657+
658+ /* There is room in the buffer, try to fill it! */
659+ if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
660+ {
661+ /* Just try a single read to grab what's available */
662+ ssize_t nr= read(ptr->fd,
663+ ptr->read_ptr + ptr->read_data_length,
664+ MEMCACHED_MAX_BUFFER - ptr->read_data_length);
665+
666+ if (nr > 0)
667+ {
668+ ptr->read_data_length+= (size_t)nr;
669+ ptr->read_buffer_length+= (size_t)nr;
670+ return true;
671+ }
672+ }
673+ return false;
674+}
675+
676+/**
677+ * If the we have callbacks connected to this server structure
678+ * we may start process the input queue and fire the callbacks
679+ * for the incomming messages. This function is _only_ called
680+ * when the input buffer is full, so that we _know_ that we have
681+ * at least _one_ message to process.
682+ *
683+ * @param ptr the server to star processing iput messages for
684+ * @return true if we processed anything, false otherwise
685+ */
686+static bool process_input_buffer(memcached_server_st *ptr)
687+{
688+ /*
689+ ** We might be able to process some of the response messages if we
690+ ** have a callback set up
691+ */
692+ if (ptr->root->callbacks != NULL && (ptr->root->flags & MEM_USE_UDP) == 0)
693+ {
694+ /*
695+ * We might have responses... try to read them out and fire
696+ * callbacks
697+ */
698+ memcached_callback_st cb= *ptr->root->callbacks;
699+
700+ char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
701+ memcached_return error;
702+ error= memcached_response(ptr, buffer, sizeof(buffer),
703+ &ptr->root->result);
704+ if (error == MEMCACHED_SUCCESS)
705+ {
706+ for (unsigned int x= 0; x < cb.number_of_callback; x++)
707+ {
708+ error= (*cb.callback[x])(ptr->root, &ptr->root->result, cb.context);
709+ if (error != MEMCACHED_SUCCESS)
710+ break;
711+ }
712+
713+ /* @todo what should I do with the error message??? */
714+ }
715+ /* @todo what should I do with other error messages?? */
716+ return true;
717+ }
718+
719+ return false;
720 }
721
722 #ifdef UNUSED
723@@ -385,6 +464,16 @@
724 continue;
725 case EAGAIN:
726 {
727+ /*
728+ * We may be blocked on write because the input buffer
729+ * is full. Let's check if we have room in our input
730+ * buffer for more data and retry the write before
731+ * waiting..
732+ */
733+ if (repack_input_buffer(ptr) ||
734+ process_input_buffer(ptr))
735+ continue;
736+
737 memcached_return rc;
738 rc= io_wait(ptr, MEM_WRITE);
739
740@@ -429,7 +518,7 @@
741 return (ssize_t) return_length;
742 }
743
744-/*
745+/*
746 Eventually we will just kill off the server with the problem.
747 */
748 void memcached_io_reset(memcached_server_st *ptr)
749@@ -439,7 +528,7 @@
750
751 /**
752 * Read a given number of bytes from the server and place it into a specific
753- * buffer. Reset the IO channel on this server if an error occurs.
754+ * buffer. Reset the IO channel on this server if an error occurs.
755 */
756 memcached_return memcached_safe_read(memcached_server_st *ptr,
757 void *dta,
758@@ -526,7 +615,7 @@
759 uint16_t cur_req= get_udp_datagram_request_id(header);
760 int msg_num= get_msg_num_from_request_id(cur_req);
761 int thread_id= get_thread_id_from_request_id(cur_req);
762-
763+
764 if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
765 msg_num= 0;
766
767
768=== modified file 'libmemcached/memcached_key.c'
769--- libmemcached/memcached_key.c 2009-07-15 18:26:30 +0000
770+++ libmemcached/memcached_key.c 2009-10-14 18:55:19 +0000
771@@ -1,6 +1,7 @@
772 #include "common.h"
773
774-memcached_return memcached_key_test(const char **keys, size_t *key_length,
775+memcached_return memcached_key_test(const char * const *keys,
776+ const size_t *key_length,
777 size_t number_of_keys)
778 {
779 uint32_t x;
780@@ -14,8 +15,6 @@
781 if (rc != MEMCACHED_SUCCESS)
782 return rc;
783
784-
785-
786 for (y= 0; y < *(key_length + x); y++)
787 {
788 if ((isgraph(keys[x][y])) == 0)
789
790=== modified file 'libmemcached/memcached_types.h'
791--- libmemcached/memcached_types.h 2009-09-17 19:14:07 +0000
792+++ libmemcached/memcached_types.h 2009-10-14 18:55:19 +0000
793@@ -6,8 +6,8 @@
794 * Author: Brian Aker
795 */
796
797-#ifndef __MEMCACHED_TYPES_H__
798-#define __MEMCACHED_TYPES_H__
799+#ifndef LIBMEMCACHED_MEMCACHED_TYPES_H
800+#define LIBMEMCACHED_MEMCACHED_TYPES_H
801
802 #ifdef __cplusplus
803 extern "C" {
804@@ -37,8 +37,14 @@
805 typedef memcached_return (*memcached_dump_func)(memcached_st *ptr,
806 const char *key, size_t key_length, void *context);
807
808+typedef struct {
809+ memcached_execute_function *callback;
810+ void *context;
811+ unsigned int number_of_callback;
812+} memcached_callback_st;
813+
814 #ifdef __cplusplus
815 }
816 #endif
817
818-#endif /* __MEMCACHED_TYPES_H__ */
819+#endif /* LIBMEMCACHED_MEMCACHED_TYPES_H */
820
821=== modified file 'tests/function.c'
822--- tests/function.c 2009-10-12 21:31:25 +0000
823+++ tests/function.c 2009-10-14 18:55:19 +0000
824@@ -1449,6 +1449,70 @@
825 return TEST_SUCCESS;
826 }
827
828+static test_return_t mget_execute(memcached_st *memc)
829+{
830+ bool binary= false;
831+ if (memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) != 0)
832+ binary= true;
833+
834+ /*
835+ * I only want to hit _one_ server so I know the number of requests I'm
836+ * sending in the pipeline.
837+ */
838+ uint32_t number_of_hosts= memc->number_of_hosts;
839+ memc->number_of_hosts= 1;
840+
841+ int max_keys= binary ? 20480 : 1;
842+
843+
844+ char **keys= calloc((size_t)max_keys, sizeof(char*));
845+ size_t *key_length=calloc((size_t)max_keys, sizeof(size_t));
846+
847+ /* First add all of the items.. */
848+ char blob[1024] = {0};
849+ memcached_return rc;
850+ for (int x= 0; x < max_keys; ++x)
851+ {
852+ char k[251];
853+ key_length[x]= (size_t)snprintf(k, sizeof(k), "0200%u", x);
854+ keys[x]= strdup(k);
855+ assert(keys[x] != NULL);
856+ rc= memcached_add(memc, keys[x], key_length[x], blob, sizeof(blob), 0, 0);
857+ assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
858+ }
859+
860+ /* Try to get all of them with a large multiget */
861+ unsigned int counter= 0;
862+ memcached_execute_function callbacks[1]= { [0]= &callback_counter };
863+ rc= memcached_mget_execute(memc, (const char**)keys, key_length,
864+ (size_t)max_keys, callbacks, &counter, 1);
865+
866+ if (binary)
867+ {
868+ assert(rc == MEMCACHED_SUCCESS);
869+
870+ rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
871+ assert(rc == MEMCACHED_END);
872+
873+ /* Verify that we got all of the items */
874+ assert(counter == (unsigned int)max_keys);
875+ }
876+ else
877+ {
878+ assert(rc == MEMCACHED_NOT_SUPPORTED);
879+ assert(counter == 0);
880+ }
881+
882+ /* Release all allocated resources */
883+ for (int x= 0; x < max_keys; ++x)
884+ free(keys[x]);
885+ free(keys);
886+ free(key_length);
887+
888+ memc->number_of_hosts= number_of_hosts;
889+ return TEST_SUCCESS;
890+}
891+
892 static test_return_t get_stats_keys(memcached_st *memc)
893 {
894 char **list;
895@@ -4732,6 +4796,114 @@
896 return TEST_SUCCESS;
897 }
898
899+static test_return_t regression_bug_447342(memcached_st *memc)
900+{
901+ if (memc->number_of_hosts < 3 || pre_replication(memc) != MEMCACHED_SUCCESS)
902+ return TEST_SKIPPED;
903+
904+ memcached_return rc;
905+
906+ rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 2);
907+ assert(rc == MEMCACHED_SUCCESS);
908+
909+ const size_t max_keys= 100;
910+ char **keys= calloc(max_keys, sizeof(char*));
911+ size_t *key_length=calloc(max_keys, sizeof(size_t));
912+
913+ for (int x= 0; x < (int)max_keys; ++x)
914+ {
915+ char k[251];
916+ key_length[x]= (size_t)snprintf(k, sizeof(k), "0200%u", x);
917+ keys[x]= strdup(k);
918+ assert(keys[x] != NULL);
919+ rc= memcached_set(memc, k, key_length[x], k, key_length[x], 0, 0);
920+ assert(rc == MEMCACHED_SUCCESS);
921+ }
922+
923+ /*
924+ ** We are using the quiet commands to store the replicas, so we need
925+ ** to ensure that all of them are processed before we can continue.
926+ ** In the test we go directly from storing the object to trying to
927+ ** receive the object from all of the different servers, so we
928+ ** could end up in a race condition (the memcached server hasn't yet
929+ ** processed the quiet command from the replication set when it process
930+ ** the request from the other client (created by the clone)). As a
931+ ** workaround for that we call memcached_quit to send the quit command
932+ ** to the server and wait for the response ;-) If you use the test code
933+ ** as an example for your own code, please note that you shouldn't need
934+ ** to do this ;-)
935+ */
936+ memcached_quit(memc);
937+
938+ /* Verify that all messages are stored, and we didn't stuff too much
939+ * into the servers
940+ */
941+ rc= memcached_mget(memc, (const char* const *)keys, key_length, max_keys);
942+ assert(rc == MEMCACHED_SUCCESS);
943+
944+ unsigned int counter= 0;
945+ memcached_execute_function callbacks[1]= { [0]= &callback_counter };
946+ rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
947+ /* Verify that we received all of the key/value pairs */
948+ assert(counter == (unsigned int)max_keys);
949+
950+ memcached_quit(memc);
951+ /*
952+ * Don't do the following in your code. I am abusing the internal details
953+ * within the library, and this is not a supported interface.
954+ * This is to verify correct behavior in the library. Fake that two servers
955+ * are dead..
956+ */
957+ unsigned int port0= memc->hosts[0].port;
958+ unsigned int port2= memc->hosts[2].port;
959+ memc->hosts[0].port= 0;
960+ memc->hosts[2].port= 0;
961+
962+ rc= memcached_mget(memc, (const char* const *)keys, key_length, max_keys);
963+ assert(rc == MEMCACHED_SUCCESS);
964+
965+ counter= 0;
966+ rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
967+ assert(counter == (unsigned int)max_keys);
968+
969+ /* restore the memc handle */
970+ memc->hosts[0].port= port0;
971+ memc->hosts[2].port= port2;
972+
973+ memcached_quit(memc);
974+
975+ /* Remove half of the objects */
976+ for (int x= 0; x < (int)max_keys; ++x)
977+ if (x & 1)
978+ {
979+ rc= memcached_delete(memc, keys[x], key_length[x], 0);
980+ assert(rc == MEMCACHED_SUCCESS);
981+ }
982+
983+ memcached_quit(memc);
984+ memc->hosts[0].port= 0;
985+ memc->hosts[2].port= 0;
986+
987+ /* now retry the command, this time we should have cache misses */
988+ rc= memcached_mget(memc, (const char* const *)keys, key_length, max_keys);
989+ assert(rc == MEMCACHED_SUCCESS);
990+
991+ counter= 0;
992+ rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
993+ assert(counter == (unsigned int)(max_keys >> 1));
994+
995+ /* Release allocated resources */
996+ for (size_t x= 0; x < max_keys; ++x)
997+ free(keys[x]);
998+ free(keys);
999+ free(key_length);
1000+
1001+ /* restore the memc handle */
1002+ memc->hosts[0].port= port0;
1003+ memc->hosts[2].port= port2;
1004+ return TEST_SUCCESS;
1005+}
1006+
1007 /* Test memcached_server_get_last_disconnect
1008 * For a working server set, shall be NULL
1009 * For a set of non existing server, shall not be NULL
1010@@ -4892,6 +5064,7 @@
1011 {"mget_result", 1, mget_result_test },
1012 {"mget_result_alloc", 1, mget_result_alloc_test },
1013 {"mget_result_function", 1, mget_result_function },
1014+ {"mget_execute", 1, mget_execute },
1015 {"mget_end", 0, mget_end },
1016 {"get_stats", 0, get_stats },
1017 {"add_host_test", 0, add_host_test },
1018@@ -4998,6 +5171,7 @@
1019 {"lp:434843 buffered", 1, regression_bug_434843_buffered },
1020 {"lp:421108", 1, regression_bug_421108 },
1021 {"lp:442914", 1, regression_bug_442914 },
1022+ {"lp:447342", 1, regression_bug_447342 },
1023 {0, 0, 0}
1024 };
1025

Subscribers

People subscribed via source and target branches

to all changes: