Test the performance of the I/O queue, using typical producer consumer test. The test should examine the effect of using multiple threads on the performance.
#include "test.h"
#include <pjlib.h>
#include <pj/compat/high_precision.h>
#if INCLUDE_IOQUEUE_PERF_TEST
#ifdef _MSC_VER
# pragma warning ( disable: 4204)
#endif
#define THIS_FILE "ioq_perf"
#define TRACE_(expr)
static unsigned last_error_counter;
#define LIMIT_TRANSFER 0
#define IS_ERROR_SILENCED(e) ((e)==PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
typedef struct test_item
{
const char *type_name;
client_fd;
*client_key;
send_op;
int has_pending_send;
char *outgoing_buffer;
char *incoming_buffer;
bytes_recv;
} test_item;
{
int data_is_available = 1;
do {
if (thread_quit_flag)
return;
if (bytes_read < 0) {
if (rc != last_error) {
if (!IS_ERROR_SILENCED(rc)) {
PJ_LOG(3,(THIS_FILE,
"...error: read error, bytes_read=%d (%s)",
bytes_read, errmsg));
".....additional info: type=%s, total read=%u, "
"total sent=%u",
item->type_name, item->bytes_recv,
item->bytes_sent));
}
} else {
last_error_counter++;
}
bytes_read = 0;
} else if (bytes_read == 0) {
PJ_LOG(3,(THIS_FILE,
"...socket has closed!"));
}
item->bytes_recv += bytes_read;
if (LIMIT_TRANSFER && item->bytes_recv>item->buffer_size*LIMIT_TRANSFER)
thread_quit_flag = 1;
bytes_read = item->buffer_size;
item->incoming_buffer, &bytes_read, 0 );
data_is_available = 1;
data_is_available = 0;
} else {
data_is_available = 0;
if (rc != last_error) {
last_error = rc;
app_perror("...error: read error(1)", rc);
} else {
last_error_counter++;
}
}
if (!item->has_pending_send) {
item->outgoing_buffer, &sent, 0);
app_perror("...error: write error", rc);
item->bytes_sent += sent;
}
}
} while (data_is_available);
}
{
if (thread_quit_flag)
return;
if (bytes_sent <= 0) {
if (!IS_ERROR_SILENCED(-bytes_sent)) {
"...error: sending stopped. bytes_sent=%d",
-bytes_sent));
}
item->has_pending_send = 0;
}
else if (!item->has_pending_send) {
item->bytes_sent += bytes_sent;
bytes_sent = item->buffer_size;
item->outgoing_buffer, &bytes_sent, 0);
app_perror("...error: write error", rc);
item->bytes_sent += bytes_sent;
}
}
}
struct thread_arg
{
int id;
unsigned loop_cnt,
err_cnt,
event_cnt;
};
static int worker_thread(void *p)
{
struct thread_arg *arg = (struct thread_arg*) p;
int rc;
while (!thread_quit_flag) {
++arg->loop_cnt;
if (rc < 0) {
"...error in pj_ioqueue_poll() in thread %d "
"after %d loop: %s [pj_status_t=%d]",
arg->id, arg->loop_cnt, errmsg, -rc));
++arg->err_cnt;
} else if (rc > 0) {
++arg->event_cnt;
}
}
return 0;
}
int sock_type, const char *type_name,
unsigned thread_cnt, unsigned sockpair_cnt,
{
enum { MSEC_DURATION = 5000 };
test_item *items;
struct thread_arg *args;
pj_size_t total_elapsed_usec, total_received;
pj_highprec_t bandwidth;
unsigned i;
TRACE_((THIS_FILE, " starting test.."));
thread_quit_flag = 0;
if (!pool)
return -10;
items = (test_item*)
pj_pool_calloc(pool, sockpair_cnt,
sizeof(test_item));
TRACE_((THIS_FILE, " creating ioqueue.."));
app_perror("...error: unable to create ioqueue", rc);
return -15;
}
for (i=0; i<sockpair_cnt; ++i) {
items[i].type_name = type_name;
items[i].ioqueue = ioqueue;
items[i].buffer_size = buffer_size;
items[i].outgoing_buffer = (
char*)
pj_pool_alloc(pool, buffer_size);
items[i].incoming_buffer = (
char*)
pj_pool_alloc(pool, buffer_size);
items[i].bytes_recv = items[i].bytes_sent = 0;
TRACE_((THIS_FILE, " calling socketpair.."));
&items[i].server_fd, &items[i].client_fd);
app_perror("...error: unable to create socket pair", rc);
return -20;
}
TRACE_((THIS_FILE, " register(1).."));
items[i].server_fd,
&items[i], &ioqueue_callback,
&items[i].server_key);
app_perror("...error: registering server socket to ioqueue", rc);
return -60;
}
TRACE_((THIS_FILE, " register(2).."));
items[i].client_fd,
&items[i], &ioqueue_callback,
&items[i].client_key);
app_perror("...error: registering server socket to ioqueue", rc);
return -70;
}
TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
bytes = items[i].buffer_size;
items[i].incoming_buffer, &bytes,
0);
app_perror("...error: pj_ioqueue_recv", rc);
return -73;
}
TRACE_((THIS_FILE, " pj_ioqueue_write.."));
bytes = items[i].buffer_size;
items[i].outgoing_buffer, &bytes, 0);
app_perror("...error: pj_ioqueue_write", rc);
return -76;
items[i].bytes_sent += bytes;
}
}
sizeof(struct thread_arg));
for (i=0; i<thread_cnt; ++i) {
struct thread_arg *arg = &args[i];
arg->id = i;
arg->ioqueue = ioqueue;
&worker_thread,
arg,
PJ_THREAD_SUSPENDED, &thread[i] );
app_perror("...error: unable to create thread", rc);
return -80;
}
}
return -90;
TRACE_((THIS_FILE, " resuming all threads.."));
for (i=0; i<thread_cnt; ++i) {
if (rc != 0)
return -100;
}
TRACE_((THIS_FILE, " wait for few seconds.."));
do {
if (thread_quit_flag) {
TRACE_((THIS_FILE, " transfer limit reached.."));
break;
}
TRACE_((THIS_FILE, " time limit reached.."));
break;
}
} while (1);
TRACE_((THIS_FILE, " terminating all threads.."));
thread_quit_flag = 1;
for (i=0; i<thread_cnt; ++i) {
TRACE_((THIS_FILE, " join thread %d..", i));
}
TRACE_((THIS_FILE, " closing all sockets.."));
for (i=0; i<sockpair_cnt; ++i) {
}
for (i=0; i<thread_cnt; ++i) {
}
TRACE_((THIS_FILE, " destroying ioqueue.."));
total_received = 0;
for (i=0; i<sockpair_cnt; ++i) {
total_received += items[i].bytes_recv;
}
bandwidth = (pj_highprec_t)total_received;
pj_highprec_mul(bandwidth, 1000);
pj_highprec_div(bandwidth, total_elapsed_usec);
if (display_report) {
PJ_LOG(3,(THIS_FILE,
" %s %d threads, %d pairs", type_name,
thread_cnt, sockpair_cnt));
PJ_LOG(3,(THIS_FILE,
" Elapsed : %u msec", total_elapsed_usec/1000));
PJ_LOG(3,(THIS_FILE,
" Bandwidth: %d KB/s", *p_bandwidth));
PJ_LOG(3,(THIS_FILE,
" Threads statistics:"));
PJ_LOG(3,(THIS_FILE,
" ============================="));
PJ_LOG(3,(THIS_FILE,
" Thread Loops Events Errors"));
PJ_LOG(3,(THIS_FILE,
" ============================="));
for (i=0; i<thread_cnt; ++i) {
struct thread_arg *arg = &args[i];
PJ_LOG(3,(THIS_FILE,
" %6d %6d %6d %6d",
arg->id, arg->loop_cnt, arg->event_cnt, arg->err_cnt));
}
PJ_LOG(3,(THIS_FILE,
" ============================="));
PJ_LOG(3,(THIS_FILE,
" Socket-pair statistics:"));
PJ_LOG(3,(THIS_FILE,
" ==================================="));
PJ_LOG(3,(THIS_FILE,
" Pair Sent Recv Pct total"));
PJ_LOG(3,(THIS_FILE,
" ==================================="));
for (i=0; i<sockpair_cnt; ++i) {
test_item *item = &items[i];
PJ_LOG(3,(THIS_FILE,
" %4d %5.1f MB %5.1f MB %5.1f%%",
i, item->bytes_sent/1000000.0,
item->bytes_recv/1000000.0,
item->bytes_recv*100.0/total_received));
}
} else {
PJ_LOG(3,(THIS_FILE,
" %.4s %2d %2d %8d KB/s",
type_name, thread_cnt, sockpair_cnt,
*p_bandwidth));
}
TRACE_((THIS_FILE, " done.."));
return 0;
}
{
enum { BUF_SIZE = 512 };
int i, rc;
struct {
int type;
const char *type_name;
int thread_cnt;
int sockpair_cnt;
} test_param[] =
{
};
int best_index = 0;
PJ_LOG(3,(THIS_FILE,
" Testing with concurency=%d, epoll_flags=0x%x",
PJ_LOG(3,(THIS_FILE,
" ======================================="));
PJ_LOG(3,(THIS_FILE,
" Type Threads Skt.Pairs Bandwidth"));
PJ_LOG(3,(THIS_FILE,
" ======================================="));
best_bandwidth = 0;
for (i=0; i<(int)(sizeof(test_param)/sizeof(test_param[0])); ++i) {
rc = perform_test(cfg,
test_param[i].type,
test_param[i].type_name,
test_param[i].thread_cnt,
test_param[i].sockpair_cnt,
BUF_SIZE,
&bandwidth);
if (rc != 0)
return rc;
if (bandwidth > best_bandwidth)
best_bandwidth = bandwidth, best_index = i;
}
" Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",
test_param[best_index].type_name,
test_param[best_index].thread_cnt,
test_param[best_index].sockpair_cnt,
best_bandwidth));
PJ_LOG(3,(THIS_FILE,
" (Note: packet size=%d, total errors=%u)",
BUF_SIZE, last_error_counter));
return 0;
}
int ioqueue_perf_test(void)
{
#if PJ_HAS_LINUX_EPOLL
0,
#else
#endif
};
int i, rc;
PJ_LOG(3,(THIS_FILE,
" Detailed perf (concurrency=%d, epoll_flags=0x%x):",
rc = perform_test(&cfg,
"udp",
8,
8,
512,
&bandwidth);
if (rc != 0)
return rc;
}
PJ_LOG(3,(THIS_FILE,
" Detailed perf (concurrency=%d, epoll_flags=0x%x):",
rc = perform_test(&cfg,
"udp",
8,
8,
512,
&bandwidth);
if (rc != 0)
return rc;
int concur;
for (concur=0; concur<2; ++concur) {
rc = ioqueue_perf_test_imp(&cfg);
if (rc != 0)
return rc;
}
}
return 0;
}
#else
int dummy_uiq_perf_test;
#endif
long pj_ssize_t
Definition: types.h:64
int pj_bool_t
Definition: types.h:71
struct pj_ioqueue_t pj_ioqueue_t
Definition: types.h:210
long pj_sock_t
Definition: types.h:263
struct pj_ioqueue_key_t pj_ioqueue_key_t
Definition: types.h:216
size_t pj_size_t
Definition: types.h:58
int pj_status_t
Definition: types.h:68
struct pj_thread_t pj_thread_t
Definition: types.h:236
#define PJ_ARRAY_SIZE(a)
Definition: types.h:281
unsigned int pj_uint32_t
Definition: types.h:43
@ PJ_SUCCESS
Definition: types.h:93
@ PJ_TRUE
Definition: types.h:96
@ PJ_FALSE
Definition: types.h:99
pj_ioqueue_epoll_flag
Definition: ioqueue.h:337
void pj_ioqueue_op_key_init(pj_ioqueue_op_key_t *op_key, pj_size_t size)
pj_status_t pj_ioqueue_create2(pj_pool_t *pool, pj_size_t max_fd, const pj_ioqueue_cfg *cfg, pj_ioqueue_t **ioqueue)
pj_status_t pj_ioqueue_destroy(pj_ioqueue_t *ioque)
pj_status_t pj_ioqueue_recv(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, void *buffer, pj_ssize_t *length, pj_uint32_t flags)
void * pj_ioqueue_get_user_data(pj_ioqueue_key_t *key)
pj_status_t pj_ioqueue_unregister(pj_ioqueue_key_t *key)
void pj_ioqueue_cfg_default(pj_ioqueue_cfg *cfg)
pj_status_t pj_ioqueue_send(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, const void *data, pj_ssize_t *length, pj_uint32_t flags)
int pj_ioqueue_poll(pj_ioqueue_t *ioque, const pj_time_val *timeout)
pj_status_t pj_ioqueue_register_sock(pj_pool_t *pool, pj_ioqueue_t *ioque, pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **key)
const char * pj_ioqueue_name(void)
@ PJ_IOQUEUE_EPOLL_AUTO
Definition: ioqueue.h:351
@ PJ_IOQUEUE_EPOLL_ONESHOT
Definition: ioqueue.h:344
@ PJ_IOQUEUE_EPOLL_EXCLUSIVE
Definition: ioqueue.h:340
#define PJ_LOG(level, arg)
Definition: log.h:106
void * pj_pool_alloc(pj_pool_t *pool, pj_size_t size)
pj_pool_t * pj_pool_create(pj_pool_factory *factory, const char *name, pj_size_t initial_size, pj_size_t increment_size, pj_pool_callback *callback)
void * pj_pool_calloc(pj_pool_t *pool, pj_size_t count, pj_size_t elem)
void pj_pool_release(pj_pool_t *pool)
char * pj_create_random_string(char *str, pj_size_t length)
#define pj_AF_INET()
Definition: sock.h:113
#define pj_SOCK_DGRAM()
Definition: sock.h:162
#define pj_SOCK_STREAM()
Definition: sock.h:160
pj_status_t pj_thread_resume(pj_thread_t *thread)
pj_status_t pj_thread_destroy(pj_thread_t *thread)
pj_status_t pj_thread_join(pj_thread_t *thread)
pj_status_t pj_thread_create(pj_pool_t *pool, const char *thread_name, pj_thread_proc *proc, void *arg, pj_size_t stack_size, unsigned flags, pj_thread_t **thread)
pj_status_t pj_thread_sleep(unsigned msec)
pj_status_t pj_get_timestamp(pj_timestamp *ts)
pj_uint32_t pj_elapsed_usec(const pj_timestamp *start, const pj_timestamp *stop)
#define PJ_THREAD_DEFAULT_STACK_SIZE
Definition: config.h:607
#define PJ_ERR_MSG_SIZE
Definition: errno.h:84
pj_str_t pj_strerror(pj_status_t statcode, char *buf, pj_size_t bufsize)
#define PJ_PERROR(level, arg)
Definition: errno.h:175
#define PJ_EPENDING
Definition: errno.h:322
Definition: udp_echo_srv_ioqueue.c:27
Definition: ioqueue.h:219
void(* on_write_complete)(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent)
Definition: ioqueue.h:246
void(* on_read_complete)(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read)
Definition: ioqueue.h:231
Definition: ioqueue.h:362
unsigned epoll_flags
Definition: ioqueue.h:370
pj_bool_t default_concurrency
Definition: ioqueue.h:379
Definition: ioqueue.h:208