HomePort
curl_ev_module.c
Go to the documentation of this file.
1 /*
2  * Copyright 2011 Aalborg University. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without modification, are
5  * permitted provided that the following conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above copyright notice, this list of
8  * conditions and the following disclaimer.
9  *
10  * 2. Redistributions in binary form must reproduce the above copyright notice, this list
11  * of conditions and the following disclaimer in the documentation and/or other materials
12  * provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY Aalborg University ''AS IS'' AND ANY EXPRESS OR IMPLIED
15  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
16  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Aalborg University OR
17  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
20  * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
21  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
22  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23  *
24  * The views and conclusions contained in the software and documentation are those of the
25  * authors and should not be interpreted as representing official policies, either expressed
26  */
27 
29 #include "curl_ev_intern.h"
30 #include "hpd/hpd_shared_api.h"
31 #include "hpd/common/hpd_common.h"
32 #include <ev.h>
33 
34 typedef struct curl_ev_io curl_ev_io_t;
35 
36 struct curl_ev_io {
37  TAILQ_ENTRY(curl_ev_io) HPD_TAILQ_FIELD;
38  ev_io watcher;
39 };
40 
41 struct curl_ev {
42  CURLM *mult_handle;
43  ev_timer timer;
44  TAILQ_HEAD(, curl_ev_io) io_watchers;
45  TAILQ_HEAD(curl_ev_handles, hpd_curl_ev_handle) handles;
47  const hpd_module_t *context;
48 };
49 
50 static hpd_error_t curl_ev_on_create(void **data, const hpd_module_t *context);
51 static hpd_error_t curl_ev_on_destroy(void *data);
52 static hpd_error_t curl_ev_on_start(void *data, hpd_t *hpd);
53 static hpd_error_t curl_ev_on_stop(void *data, hpd_t *hpd);
54 static hpd_error_t curl_ev_on_parse_opt(void *data, const char *name, const char *arg);
55 
56 static CURLMcode curl_ev_socket_action(int sockfd);
57 
64 };
65 
66 static curl_ev_t *curl_ev = NULL;
67 
/**
 * Guard for the public entry points: when the file-level curl_ev instance
 * has not been created, log an error against CONTEXT and leave the calling
 * function through HPD_LOG_RETURN with HPD_E_STATE.
 *
 * NOTE(review): presumably HPD_LOG_RETURN logs and then returns its second
 * argument from the enclosing function - confirm against hpd_common.h.
 */
#define CURL_EV_INIT_CHECK(CONTEXT) do {                                       \
        if (!curl_ev) {                                                        \
            HPD_LOG_ERROR((CONTEXT), "Curl not initialised");                  \
            HPD_LOG_RETURN((CONTEXT), HPD_E_STATE,                             \
                           "Did you remember to add the curl module to hpd?"); \
        }                                                                      \
    } while (0)
74 
75 static void curl_ev_on_io(hpd_ev_loop_t *loop, ev_io *w, int revents)
76 {
77  CURLMcode cmc;
78  if ((cmc = curl_ev_socket_action(w->fd)))
79  HPD_LOG_ERROR(curl_ev->context, "curl_ev_socket_action() failed [code: %i]", cmc);
80 }
81 
82 static void curl_ev_on_timeout(hpd_ev_loop_t *loop, ev_timer *w, int revents)
83 {
84  ev_timer_stop(loop, w);
85  CURLMcode cmc;
86  if ((cmc = curl_ev_socket_action(CURL_SOCKET_TIMEOUT)))
87  HPD_LOG_ERROR(curl_ev->context, "curl_ev_socket_action() failed [code: %i]", cmc);
88 }
89 
90 static CURLMcode curl_ev_get_stopped_watcher(curl_ev_io_t **w, curl_socket_t s, const hpd_module_t *context)
91 {
92  CURLMcode cmc;
93 
94  if ((*w)) {
95  ev_io_stop(curl_ev->loop, &(*w)->watcher);
96  } else {
97  HPD_CALLOC((*w), 1, curl_ev_io_t);
98  if ((cmc = curl_multi_assign(curl_ev->mult_handle, s, (*w)))) {
99  HPD_LOG_ERROR(context, "Curl multi return an error [code: %i]", cmc);
100  free((*w));
101  return cmc;
102  }
103  TAILQ_INSERT_TAIL(&curl_ev->io_watchers, (*w), HPD_TAILQ_FIELD);
104  ev_init(&(*w)->watcher, curl_ev_on_io);
105  }
106  return CURLM_OK;
107 
108  alloc_error:
109  return CURLM_OUT_OF_MEMORY;
110 }
111 
112 static CURLMcode curl_ev_start_watcher(curl_ev_io_t *w, curl_socket_t s, int what, const hpd_module_t *context)
113 {
114  switch (what) {
115  case CURL_POLL_IN:
116  ev_io_set(&w->watcher, s, EV_READ);
117  break;
118  case CURL_POLL_OUT:
119  ev_io_set(&w->watcher, s, EV_WRITE);
120  break;
121  case CURL_POLL_INOUT:
122  ev_io_set(&w->watcher, s, EV_READ | EV_WRITE);
123  break;
124  default:
125  HPD_LOG_ERROR(context, "Should not be here");
126  return CURLM_INTERNAL_ERROR;
127  }
128 
129  ev_io_start(curl_ev->loop, &w->watcher);
130  return CURLM_OK;
131 }
132 
134 {
135  ev_io_stop(curl_ev->loop, &w->watcher);
136  TAILQ_REMOVE(&curl_ev->io_watchers, w, HPD_TAILQ_FIELD);
137  free(w);
138 }
139 
140 static CURLMcode curl_ev_on_update_socket(CURL *easy, curl_socket_t s, int what, void *userp, void *socketp)
141 {
142  CURLMcode cmc;
143  const hpd_module_t *context = curl_ev->context;
144 
145  switch (what) {
146  case CURL_POLL_IN:
147  case CURL_POLL_OUT:
148  case CURL_POLL_INOUT: {
149  curl_ev_io_t *w = socketp;
150  if ((cmc = curl_ev_get_stopped_watcher(&w, s, context)) != CURLM_OK) return cmc;
151  if ((cmc = curl_ev_start_watcher(w, s, what, context)) != CURLM_OK) return cmc;
152  return CURLM_OK;
153  }
154  case CURL_POLL_REMOVE: {
155  curl_ev_stop_watcher(socketp);
156  return CURLM_OK;
157  }
158  default: {
159  HPD_LOG_ERROR(context, "Should not be here");
160  return CURLM_INTERNAL_ERROR;
161  }
162  }
163 }
164 
165 static CURLMcode curl_ev_on_update_timer(CURLM *multi, long timeout_ms, void *userp)
166 {
167  CURLMcode cmc;
168 
169  if (timeout_ms == 0) {
170  if ((cmc = curl_ev_socket_action(CURL_SOCKET_TIMEOUT))) {
171  HPD_LOG_ERROR(curl_ev->context, "curl_ev_socket_action() failed [code: %i]", cmc);
172  return cmc;
173  }
174  } else if (timeout_ms > 0) {
175  curl_ev->timer.repeat = timeout_ms / 1000.0;
176  ev_timer_again(curl_ev->loop, &curl_ev->timer);
177  } else {
178  ev_timer_stop(curl_ev->loop, &curl_ev->timer);
179  }
180 
181  return CURLM_OK;
182 }
183 
185 {
186  CURLMcode cmc, cmc2;
187  const hpd_module_t *context = curl_ev->context;
188 
189  if (!curl_ev->mult_handle) {
190  HPD_LOG_VERBOSE(context, "Not started, saving handles for later...");
191  return HPD_E_SUCCESS;
192  }
193 
194  hpd_curl_ev_handle_t *handle = TAILQ_FIRST(&curl_ev->handles);
195  if (handle) {
196  // TODO On failures we can move along in the queue and send an error to the failed one
197  if ((cmc = curl_multi_add_handle(curl_ev->mult_handle, handle->handle))) goto add_error;
198  if ((cmc = curl_ev_socket_action(CURL_SOCKET_TIMEOUT))) goto action_error;
199  }
200 
201  return HPD_E_SUCCESS;
202 
203  action_error:
204  if ((cmc2 = curl_multi_remove_handle(curl_ev->mult_handle, handle->handle)))
205  HPD_LOG_ERROR(context, "Curl remove handle return an error [code: %i]", cmc2);
206 
207  add_error:
208  HPD_LOG_RETURN(context, HPD_E_UNKNOWN, "Curl multi return an error [code: %i]", cmc);
209 }
210 
211 static CURLMcode curl_ev_socket_action(int sockfd)
212 {
213  int unused;
214  CURLMcode cmc;
215  hpd_error_t rc;
216  const hpd_module_t *context = curl_ev->context;
217 
218  if ((cmc = curl_multi_socket_action(curl_ev->mult_handle, sockfd, 0, &unused)))
219  HPD_LOG_RETURN(context, cmc, "Curl multi return an error [code: %i]", cmc);
220 
221  CURLMsg *m;
222  while ((m = curl_multi_info_read(curl_ev->mult_handle, &unused))) {
223  switch (m->msg) {
224  case CURLMSG_NONE: {
225  break;
226  }
227  case CURLMSG_DONE: {
228  CURL *easy_handle = m->easy_handle;
229  hpd_curl_ev_handle_t *handle;
230  TAILQ_FOREACH(handle, &curl_ev->handles, HPD_TAILQ_FIELD) {
231  if (handle->handle == easy_handle) {
232  CURLcode cc = m->data.result;
233  if (cc != CURLE_OK)
234  HPD_LOG_WARN(context, "Curl handle error: %s [code: %i]", curl_easy_strerror(cc), cc);
235  if (handle->on_done)
236  handle->on_done(handle->data, cc);
237  if ((rc = hpd_curl_ev_remove_handle(handle))) {
238  HPD_LOG_ERROR(context, "Failed to remove handle [code: %i]", rc);
239  return CURLM_INTERNAL_ERROR;
240  }
241  if ((rc = hpd_curl_ev_cleanup(handle))) {
242  HPD_LOG_ERROR(context, "Failed to remove handle [code: %i]", rc);
243  return CURLM_INTERNAL_ERROR;
244  }
245  break;
246  }
247  }
248  break;
249  }
250  case CURLMSG_LAST: {
251  HPD_LOG_ERROR(context, "Should not be here");
252  return CURLM_INTERNAL_ERROR;
253  }
254  }
255  }
256 
257  return CURLM_OK;
258 }
259 
261 {
262  CURL_EV_INIT_CHECK(handle->context);
263 
264  hpd_error_t rc;
265 
267  TAILQ_FOREACH(h, &curl_ev->handles, HPD_TAILQ_FIELD)
268  if (h == handle)
269  HPD_LOG_RETURN(curl_ev->context, HPD_E_ARGUMENT, "Cannot add handle more than once");
270 
271  if (TAILQ_EMPTY(&curl_ev->handles)) {
272  TAILQ_INSERT_TAIL(&curl_ev->handles, handle, HPD_TAILQ_FIELD);
273  if ((rc = curl_ev_add_next())) {
274  TAILQ_REMOVE(&curl_ev->handles, handle, HPD_TAILQ_FIELD);
275  return rc;
276  }
277  } else {
278  TAILQ_INSERT_TAIL(&curl_ev->handles, handle, HPD_TAILQ_FIELD); // Line duplication due to if condition
279  }
280  handle->curl_ev = curl_ev;
281  return HPD_E_SUCCESS;
282 }
283 
285 {
286  CURL_EV_INIT_CHECK(handle->context);
287 
288  hpd_error_t rc;
289  CURLMcode cmc;
290 
291  if (TAILQ_FIRST(&curl_ev->handles) == handle) {
292  if ((cmc = curl_multi_remove_handle(curl_ev->mult_handle, handle->handle)))
293  HPD_LOG_RETURN(curl_ev->context, HPD_E_UNKNOWN, "Curl multi return an error [code: %i]", cmc);
294  TAILQ_REMOVE(&curl_ev->handles, handle, HPD_TAILQ_FIELD);
295  if ((rc = curl_ev_add_next()))
296  HPD_LOG_RETURN(curl_ev->context, HPD_E_SUCCESS, "Curl add next failed [code: %i]", rc);
297  } else {
298  TAILQ_REMOVE(&curl_ev->handles, handle, HPD_TAILQ_FIELD);
299  }
300 
301  handle->curl_ev = NULL;
302  return HPD_E_SUCCESS;
303 }
304 
305 static hpd_error_t curl_ev_on_create(void **data, const hpd_module_t *context)
306 {
307  if (!context) return HPD_E_NULL;
308  if (curl_ev)
309  HPD_LOG_RETURN(context, HPD_E_STATE, "Only one ínstance of curl_ev module allowed");
310 
311  HPD_CALLOC(curl_ev, 1, curl_ev_t);
312  curl_ev->context = context;
313 
314  TAILQ_INIT(&curl_ev->handles);
315  TAILQ_INIT(&curl_ev->io_watchers);
316 
317  ev_init(&curl_ev->timer, curl_ev_on_timeout);
318 
319  return HPD_E_SUCCESS;
320 
321  alloc_error:
322  curl_global_cleanup();
323  HPD_LOG_RETURN_E_ALLOC(context);
324 }
325 
327 {
328  if (!curl_ev) return HPD_E_NULL;
329 
330  hpd_error_t rc;
331 
332  while (!TAILQ_EMPTY(&curl_ev->handles)) {
333  hpd_curl_ev_handle_t *handle = TAILQ_LAST(&curl_ev->handles, curl_ev_handles);
334  TAILQ_REMOVE(&curl_ev->handles, handle, HPD_TAILQ_FIELD);
335  handle->curl_ev = NULL;
336  if ((rc = hpd_curl_ev_cleanup(handle))) {
337  HPD_LOG_ERROR(curl_ev->context, "Failed to remove handle [code: %i]", rc);
338  return rc;
339  }
340  }
341 
342  free(curl_ev);
343  curl_ev = NULL;
344 
345  return HPD_E_SUCCESS;
346 }
347 
349 {
350  if (!curl_ev) return HPD_E_NULL;
351  if (!hpd) HPD_LOG_RETURN_E_NULL(curl_ev->context);
352 
353  hpd_error_t rc;
354  const hpd_module_t *context = curl_ev->context;
355 
356  CURLcode cc;
357  if ((cc = curl_global_init(CURL_GLOBAL_ALL)))
358  HPD_LOG_RETURN(context, HPD_E_UNKNOWN, "Curl failed to initialise [code: %i]", cc);
359  // TODO Check supported features in curl_version_info
360 
362  if ((rc = hpd_get_loop(hpd, &loop))) goto hpd_error;
363  curl_ev->loop = loop;
364 
365  CURLMcode cmc;
366  curl_ev->mult_handle = curl_multi_init();
367  if ((cmc = curl_multi_setopt(curl_ev->mult_handle, CURLMOPT_SOCKETFUNCTION, curl_ev_on_update_socket))) goto curl_m_error;
368  if ((cmc = curl_multi_setopt(curl_ev->mult_handle, CURLMOPT_SOCKETDATA, curl_ev))) goto curl_m_error;
369  if ((cmc = curl_multi_setopt(curl_ev->mult_handle, CURLMOPT_TIMERFUNCTION, curl_ev_on_update_timer))) goto curl_m_error;
370  if ((cmc = curl_multi_setopt(curl_ev->mult_handle, CURLMOPT_TIMERDATA, curl_ev))) goto curl_m_error;
371 
372  if ((rc = curl_ev_add_next())) goto next_error;
373 
374  return HPD_E_SUCCESS;
375 
376  hpd_error:
377  curl_global_cleanup();
378  return rc;
379  curl_m_error:
380  curl_ev->loop = NULL;
381  curl_global_cleanup();
382  HPD_LOG_RETURN(context, HPD_E_UNKNOWN, "Curl multi return an error [code: %i]", cmc);
383  next_error:
384  curl_ev->loop = NULL;
385  curl_global_cleanup();
386  return rc;
387 }
388 
390 {
391  if (!curl_ev) return HPD_E_NULL;
392 
393  CURLMcode cmc;
394  const hpd_module_t *context = curl_ev->context;
395 
396  // Stop current handle
397  hpd_curl_ev_handle_t *handle = TAILQ_FIRST(&curl_ev->handles);
398  if (handle && (cmc = curl_multi_remove_handle(curl_ev->mult_handle, handle->handle)))
399  HPD_LOG_RETURN(context, HPD_E_UNKNOWN, "Curl multi return an error [code: %i]", cmc);
400 
401  // Kill watchers
402  ev_timer_stop(curl_ev->loop, &curl_ev->timer);
403  curl_ev_io_t *io, *io_tmp;
404  TAILQ_FOREACH_SAFE(io, &curl_ev->io_watchers, HPD_TAILQ_FIELD, io_tmp) {
406  }
407 
408  // Stop curl multi
409  if ((cmc = curl_multi_cleanup(curl_ev->mult_handle)))
410  HPD_LOG_RETURN(context, HPD_E_UNKNOWN, "Curl multi return an error [code: %i]", cmc);
411 
412  return HPD_E_SUCCESS;
413 }
414 
415 static hpd_error_t curl_ev_on_parse_opt(void *data, const char *name, const char *arg)
416 {
417  if (!curl_ev) return HPD_E_NULL;
418 
419  return HPD_E_ARGUMENT;
420 }
hpd_error_t hpd_curl_ev_remove_handle(hpd_curl_ev_handle_t *handle)
static hpd_error_t curl_ev_on_destroy(void *data)
hpd_error_t hpd_curl_ev_cleanup(hpd_curl_ev_handle_t *handle)
Definition: curl_ev.c:90
#define HPD_LOG_RETURN(CONTEXT, E, FMT,...)
static CURLMcode curl_ev_start_watcher(curl_ev_io_t *w, curl_socket_t s, int what, const hpd_module_t *context)
static hpd_error_t curl_ev_on_create(void **data, const hpd_module_t *context)
ev_timer timer
static CURLMcode curl_ev_on_update_timer(CURLM *multi, long timeout_ms, void *userp)
static void curl_ev_on_timeout(hpd_ev_loop_t *loop, ev_timer *w, int revents)
const hpd_module_t * context
static void curl_ev_stop_watcher(curl_ev_io_t *w)
hpd_curl_ev_done_f on_done
free(data.url)
#define HPD_TAILQ_FIELD
Definition: hpd_queue.h:37
#define HPD_CALLOC(PTR, NUM, CAST)
Allocates and zeros a structure.
Definition: hpd_common.h:44
hpd_error
[HPD_NULL_TERMINATED]
Definition: hpd_types.h:69
TAILQ_HEAD(hpd_map, hpd_pair)
hpd_error_t hpd_curl_ev_add_handle(hpd_curl_ev_handle_t *handle)
CURLM * mult_handle
static hpd_error_t curl_ev_on_parse_opt(void *data, const char *name, const char *arg)
static CURLMcode curl_ev_on_update_socket(CURL *easy, curl_socket_t s, int what, void *userp, void *socketp)
#define HPD_LOG_VERBOSE(CONTEXT, FMT,...)
static curl_ev_t * curl_ev
enum hpd_error hpd_error_t
Definition: hpd_types.h:167
#define HPD_LOG_RETURN_E_ALLOC(CONTEXT)
[Application API Callbacks]
Definition: hpd_types.h:200
#define HPD_LOG_RETURN_E_NULL(CONTEXT)
static hpd_error_t curl_ev_on_start(void *data, hpd_t *hpd)
Definition: daemon.h:50
#define CURL_EV_INIT_CHECK(CONTEXT)
static struct ev_loop * loop
static CURLMcode curl_ev_socket_action(int sockfd)
static void curl_ev_on_io(hpd_ev_loop_t *loop, ev_io *w, int revents)
static hpd_error_t curl_ev_on_stop(void *data, hpd_t *hpd)
static hpd_error_t curl_ev_add_next()
static CURLMcode curl_ev_get_stopped_watcher(curl_ev_io_t **w, curl_socket_t s, const hpd_module_t *context)
#define HPD_LOG_WARN(CONTEXT, FMT,...)
curl_ev_t * curl_ev
struct ev_loop hpd_ev_loop_t
Definition: hpd_types.h:51
hpd_module_def_t hpd_curl_ev
#define HPD_LOG_ERROR(CONTEXT, FMT,...)
hpd_error_t hpd_get_loop(hpd_t *hpd, hpd_ev_loop_t **loop)
Definition: daemon_api.c:44