This file implements tests for the functionality of the I/O queue when TCP sockets are used.
#include "test.h"
#if INCLUDE_TCP_IOQUEUE_TEST
#include <pjlib.h>
#if PJ_HAS_TCP
/* First element of the PJ_LOG() message tuples emitted by
 * tcp_ioqueue_test_impl() and tcp_ioqueue_test(). */
#define THIS_FILE "test_tcp"
/* TCP port number; its use site is not visible in this listing. Presumably a
 * port with no listener, used by the failed-connect scenario (TODO confirm). */
#define NON_EXISTANT_PORT 50123
/* Not referenced by the code visible here; presumably an iteration count
 * (TODO confirm). */
#define LOOP 100
/* Buffer size bounds: the visible tests start with bufsize = BUF_MIN_SIZE, and
 * POOL_SIZE below accounts for two BUF_MAX_SIZE buffers. */
#define BUF_MIN_SIZE 32
#define BUF_MAX_SIZE 2048
/* Socket-count bounds; the upper one is derived from PJ_IOQUEUE_MAX_HANDLES.
 * Only SOCK_INACTIVE_MAX is used in the visible code (inside POOL_SIZE). */
#define SOCK_INACTIVE_MIN (4-2)
#define SOCK_INACTIVE_MAX (PJ_IOQUEUE_MAX_HANDLES - 2)
/* Pool size: two maximum-size buffers, 128 bytes per SOCK_INACTIVE_MAX entry,
 * plus 2048 bytes. */
#define POOL_SIZE (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048)
/* File-scope bookkeeping shared between the I/O queue callbacks and the tests:
 * the callbacks store the key, size or status they were invoked with, and the
 * tests compare those values after polling. The first line of each multi-line
 * declaration below (the type and the leading declarator) is not visible in
 * this listing. */
callback_write_size,
callback_accept_status,
callback_connect_status;
/* Incremented by the callbacks each time they record a completion. */
static unsigned callback_call_count;
/* Keys recorded by the write, accept and connect callbacks. */
*callback_write_key,
*callback_accept_key,
*callback_connect_key;
/* Operation keys that the tests compare against their own op keys
 * (&write_op, &accept_op); the statements that store them are not visible
 * in this listing. */
*callback_write_op,
*callback_accept_op;
{
/* Read-completion bookkeeping. The function header is not visible in this
 * listing; judging by its position and the callback table this is presumably
 * on_ioqueue_read(). Remember which key completed and how many bytes were
 * read, then count the callback. */
callback_read_key = key;
callback_read_size = bytes_read;
callback_call_count++;
}
{
/* Write-completion bookkeeping. The function header is not visible in this
 * listing; presumably on_ioqueue_write(). Remember which key completed and
 * how many bytes were written, then count the callback. */
callback_write_key = key;
callback_write_size = bytes_written;
callback_call_count++;
}
int status)
{
/* Accept-completion callback. Only the tail of its parameter list is visible
 * in this listing and several of its conditions are missing, so the branch
 * structure below cannot be fully reconstructed from here. */
app_perror(".....warning: received error in on_ioqueue_accept() callback",
status);
} else {
/* Record the test-specific code -61 as the accept status and log that the
 * callback was handed an invalid socket. */
callback_accept_status = -61;
PJ_LOG(3,(
"",
"..... on_ioqueue_accept() callback was given "
"invalid socket and status is %d", status));
}
} else {
int client_addr_len;
client_addr_len = sizeof(addr);
app_perror("...ERROR in pj_sock_getsockname()", status);
}
/* Publish the result for the test loop: which key completed, with what
 * status, and that one more callback has run. */
callback_accept_key = key;
callback_accept_status = status;
callback_call_count++;
}
}
{
/* Connect-completion bookkeeping. The function header is not visible in this
 * listing; presumably on_ioqueue_connect(). Remember which key completed and
 * the status it completed with, then count the callback. */
callback_connect_key = key;
callback_connect_status = status;
callback_call_count++;
}
{
/* Callback table that the tests pass as &test_cb in their socket-registration
 * calls. Entries are the read, write, accept and connect handlers, in that
 * order. The declaration line is not visible in this listing. */
&on_ioqueue_read,
&on_ioqueue_write,
&on_ioqueue_accept,
&on_ioqueue_connect,
};
void *send_buf,
void *recv_buf,
{
/* Send/receive round trip between two registered keys.
 *
 * Only part of the parameter list is visible in this listing; the compliance
 * tests call it as
 *   send_recv_test(ioque, <key>, <key>, send_buf, recv_buf, bufsize, &t_elapsed)
 * and the checks below expect read completions on skey and write completions
 * on ckey.
 *
 * Returns 0 on success or a negative test-specific code on failure.
 *
 * The recv/send/poll calls and several conditions are missing from this
 * listing, so some comments below are hedged accordingly. */
int i, pending_op = 0;
bytes = bufsize;
app_perror("...pj_ioqueue_recv error", status);
return -100;
}
/* One more operation whose completion must be collected by the loop below
 * (presumably the recv returned PJ_EPENDING - condition not visible). */
++pending_op;
else {
return -115;
}
bytes = bufsize;
return -120;
}
++pending_op;
}
/* Clear the callback bookkeeping so the checks below only see completions
 * that belong to this round trip. */
callback_read_size = callback_write_size = 0;
callback_read_key = callback_write_key = NULL;
callback_read_op = callback_write_op = NULL;
status = 0;
/* Collect completions until nothing is outstanding. status is interpreted as
 * an event count (> 0), a timeout (== 0) or an error (< 0); the call that
 * assigns it is not visible in this listing (presumably pj_ioqueue_poll()). */
while (pending_op > 0) {
#ifdef PJ_SYMBIAN
#else
#endif
if (status > 0) {
/* A non-zero read size means the read callback ran: it must report the full
 * buffer size, the receiving key and this function's read op key. */
if (callback_read_size) {
if (callback_read_size != bufsize)
return -160;
if (callback_read_key != skey)
return -161;
if (callback_read_op != &read_op)
return -162;
}
/* Likewise for the write callback: sending key and this function's write
 * op key. */
if (callback_write_size) {
if (callback_write_key != ckey)
return -163;
if (callback_write_op != &write_op)
return -164;
}
pending_op -= status;
}
/* A timeout is only logged; the loop keeps waiting. */
if (status == 0) {
PJ_LOG(3,(
"",
"...error: timed out"));
}
if (status < 0) {
return -170;
}
}
/* Ten further passes, each of which must leave status at 0. Presumably these
 * are extra polls verifying that no stray events remain (the poll call itself
 * is not visible in this listing). */
for (i=0; i<10; ++i) {
#ifdef PJ_SYMBIAN
#else
#endif
if (status != 0)
return -173;
}
/* The received bytes must match what was sent. */
if (
pj_memcmp(send_buf, recv_buf, bufsize) != 0) {
return -180;
}
return 0;
}
{
/* Success scenario. The function header is not visible in this listing; the
 * driver invokes this block as compliance_test_0(cfg) under the banner
 * "compliance test 0 (success scenario)".
 *
 * Visible flow: create sockets, bind and listen on the server side, create
 * the I/O queue and register sockets, start an accept and a connect, poll
 * until both complete while validating what the callbacks recorded, then run
 * send_recv_test() over the connected pair.
 *
 * Returns 0 on success or a negative test-specific code. Every failure leaves
 * through the on_error label so that the cleanup there always runs.
 *
 * Many declarations, conditions and API calls are missing from this listing;
 * the surviving lines are kept exactly as they appear. */
int client_addr_len;
char *send_buf, *recv_buf;
int bufsize = BUF_MIN_SIZE;
int status = -1;
int pending_op = 0;
app_perror("...error creating socket", rc);
status=-1; goto on_error;
}
app_perror("...error creating socket", rc);
status=-1; goto on_error;
}
app_perror("...bind error", rc);
status=-10; goto on_error;
}
client_addr_len = sizeof(addr);
app_perror("...ERROR in pj_sock_getsockname()", rc);
status=-15; goto on_error;
}
app_perror("...ERROR in pj_ioqueue_create()", rc);
status=-20; goto on_error;
}
&ckey1);
else
ckey1 = NULL;
app_perror("...ERROR in pj_ioqueue_register_sock()", rc);
status=-23; goto on_error;
}
app_perror("...ERROR in pj_sock_listen()", rc);
status=-25; goto on_error;
}
&client_addr, &rmt_addr, &client_addr_len);
app_perror("...ERROR in pj_ioqueue_accept()", rc);
status=-30; goto on_error;
}
/* The accept did not finish immediately; its completion is collected by the
 * poll loop below. */
++pending_op;
}
app_perror("...ERROR in pj_ioqueue_connect()", rc);
status=-40; goto on_error;
}
/* Same for the connect. */
++pending_op;
}
/* Reset the callback bookkeeping. -2 is the "callback has not fired yet"
 * marker that the poll loop tests for. */
callback_read_size = callback_write_size = 0;
callback_accept_status = callback_connect_status = -2;
callback_call_count = 0;
callback_read_key = callback_write_key =
callback_accept_key = callback_connect_key = NULL;
callback_accept_op = callback_read_op = callback_write_op = NULL;
/* Poll until the accept and the connect have both completed. */
while (pending_op) {
#ifdef PJ_SYMBIAN
callback_call_count = 0;
status = callback_call_count;
#else
#endif
if (status > 0) {
/* Accept callback fired: it must report success, the listening key and this
 * function's accept op key. Re-arm the marker afterwards. */
if (callback_accept_status != -2) {
if (callback_accept_status != 0) {
status=-41; goto on_error;
}
if (callback_accept_key != skey) {
status=-42; goto on_error;
}
if (callback_accept_op != &accept_op) {
status=-43; goto on_error;
}
callback_accept_status = -2;
}
/* Connect callback fired: it must report success on the connecting key. */
if (callback_connect_status != -2) {
if (callback_connect_status != 0) {
status=-50; goto on_error;
}
if (callback_connect_key != ckey1) {
status=-51; goto on_error;
}
callback_connect_status = -2;
}
/* More events than outstanding operations is a failure. */
if (status > pending_op) {
"...error: pj_ioqueue_poll() returned %d "
"(only expecting %d)",
status, pending_op));
/* Leave through on_error, like every other failure in this function, so the
 * keys and the I/O queue are still cleaned up (previously a bare return). */
status=-52; goto on_error;
}
pending_op -= status;
if (pending_op == 0) {
status = 0;
}
}
}
/* With nothing outstanding, ten further passes must each leave status at 0
 * (presumably extra polls; the call is not visible in this listing). */
if (pending_op == 0) {
unsigned i;
for (i=0; i<10; ++i) {
#ifdef PJ_SYMBIAN
#else
#endif
if (status != 0) {
status=-60; goto on_error;
}
}
}
status = -69;
goto on_error;
}
&test_cb, &ckey0);
app_perror("...ERROR in pj_ioqueue_register_sock", rc);
status = -70;
goto on_error;
}
/* Exchange data over the freshly connected pair. */
status = send_recv_test(ioque, ckey0, ckey1, send_buf,
recv_buf, bufsize, &t_elapsed);
if (status != 0) {
goto on_error;
}
status = 0;
/* Common exit. The cleanup statements guarded by the checks below are not
 * visible in this listing. */
on_error:
if (skey != NULL)
if (ckey1 != NULL)
if (ckey0 != NULL)
if (ioque != NULL)
return status;
}
{
/* Failure scenario. The function header is not visible in this listing; the
 * driver invokes this block as compliance_test_1(cfg) under the banner
 * "compliance test 1 (failed scenario)".
 *
 * Visible flow: obtain an I/O queue, create and register a client socket,
 * start a connect, then poll and treat a *successful* connect completion as
 * the test failure.
 *
 * Returns 0 on success or a negative test-specific code. Every failure leaves
 * through the on_error label so that the cleanup there always runs.
 *
 * Many declarations, conditions and API calls are missing from this listing;
 * the surviving lines are kept exactly as they appear. */
int status = -1;
int pending_op = 0;
if (!ioque) {
status=-20; goto on_error;
}
app_perror("...ERROR in pj_sock_socket()", rc);
status=-1; goto on_error;
}
&test_cb, &ckey1);
app_perror("...ERROR in pj_ioqueue_register_sock()", rc);
status=-23; goto on_error;
}
status = -30;
goto on_error;
}
} else {
/* The connect is still in progress; its completion is collected below. */
++pending_op;
}
/* -2 marks "connect callback has not fired yet". */
callback_connect_status = -2;
callback_connect_key = NULL;
while (pending_op) {
#ifdef PJ_SYMBIAN
callback_call_count = 0;
status = callback_call_count;
#else
#endif
if (status > 0) {
/* The connect callback ran for our key: a status of 0 (success) is the
 * error here, since this scenario expects the connect to fail. */
if (callback_connect_key==ckey1) {
if (callback_connect_status == 0) {
status = -50;
goto on_error;
}
}
/* More events than outstanding operations is a failure. */
if (status > pending_op) {
"...error: pj_ioqueue_poll() returned %d "
"(only expecting %d)",
status, pending_op));
/* Leave through on_error, like every other failure in this function, so the
 * key and the I/O queue are still cleaned up (previously a bare return). */
status=-552; goto on_error;
}
pending_op -= status;
if (pending_op == 0) {
status = 0;
}
}
}
/* With nothing outstanding, ten further passes must each leave status at 0
 * (presumably extra polls; the call is not visible in this listing). */
if (pending_op == 0) {
unsigned i;
for (i=0; i<10; ++i) {
#ifdef PJ_SYMBIAN
#else
#endif
if (status != 0) {
status=-60; goto on_error;
}
}
}
status = 0;
/* Common exit. The cleanup statements guarded by the checks below are not
 * visible in this listing. */
on_error:
if (ckey1 != NULL)
if (ioque != NULL)
return status;
}
{
#if defined(PJ_SYMBIAN) && PJ_SYMBIAN!=0
enum { MAX_PAIR = 1, TEST_LOOP = 2 };
#else
enum { MAX_PAIR = 4, TEST_LOOP = 2 };
#endif
struct listener
{
int addr_len;
} listener;
struct server
{
int rem_addr_len;
} server[MAX_PAIR];
struct client
{
} client[MAX_PAIR];
char *send_buf, *recv_buf;
unsigned i, bufsize = BUF_MIN_SIZE;
int status;
int test_loop, pending_op = 0;
listener.key = NULL;
for (i=0; i<MAX_PAIR; ++i) {
server[i].key = NULL;
}
for (i=0; i<MAX_PAIR; ++i) {
client[i].key = NULL;
}
app_perror("...ERROR in pj_ioqueue_create()", rc);
return -10;
}
app_perror("...error creating socket", rc);
status=-20; goto on_error;
}
if ((rc=
pj_sock_bind(listener.sock, &listener.addr,
sizeof(listener.addr))) != 0 ) {
app_perror("...bind error", rc);
status=-30; goto on_error;
}
listener.addr_len = sizeof(listener.addr);
app_perror("...ERROR in pj_sock_getsockname()", rc);
status=-40; goto on_error;
}
&listener.key);
app_perror("...ERROR", rc);
status=-50; goto on_error;
}
app_perror("...ERROR in pj_sock_listen()", rc);
status=-60; goto on_error;
}
for (test_loop=0; test_loop < TEST_LOOP; ++test_loop) {
for (i=0; i<MAX_PAIR; ++i) {
app_perror("...error creating socket", rc);
status=-70; goto on_error;
}
&test_cb, &client[i].key);
app_perror("...error ", rc);
status=-80; goto on_error;
}
sizeof(server[i].accept_op));
&server[i].sock, &server[i].local_addr,
&server[i].rem_addr,
&server[i].rem_addr_len);
app_perror("...ERROR in pj_ioqueue_accept()", rc);
status=-90; goto on_error;
}
++pending_op;
}
sizeof(listener.addr));
app_perror("...ERROR in pj_ioqueue_connect()", rc);
status=-100; goto on_error;
}
++pending_op;
}
while (pending_op) {
#ifdef PJ_SYMBIAN
#else
#endif
if (status > 0) {
if (status > pending_op) {
"...error: pj_ioqueue_poll() returned %d "
"(only expecting %d)",
status, pending_op));
return -110;
}
pending_op -= status;
if (pending_op == 0) {
status = 0;
}
}
}
}
if (pending_op == 0) {
for (i=0; i<10; ++i) {
#ifdef PJ_SYMBIAN
#else
#endif
if (status != 0) {
status=-120; goto on_error;
}
}
}
for (i=0; i<MAX_PAIR; ++i) {
status = -130;
goto on_error;
}
if (server[i].local_addr.sin_family !=
pj_AF_INET() ||
server[i].local_addr.sin_addr.s_addr == 0 ||
server[i].local_addr.sin_port == 0)
{
app_perror("...ERROR address not set", rc);
status = -140;
goto on_error;
}
if (server[i].rem_addr.sin_family !=
pj_AF_INET() ||
server[i].rem_addr.sin_addr.s_addr == 0 ||
server[i].rem_addr.sin_port == 0)
{
app_perror("...ERROR address not set", rc);
status = -150;
goto on_error;
}
&test_cb, &server[i].key);
app_perror("...ERROR in pj_ioqueue_register_sock", rc);
status = -160;
goto on_error;
}
status = send_recv_test(ioque, server[i].key, client[i].key,
send_buf, recv_buf, bufsize, &t_elapsed);
if (status != 0) {
goto on_error;
}
}
status = 0;
for (i=0; i<MAX_PAIR; ++i) {
if (server[i].key != NULL) {
server[i].key = NULL;
}
if (client[i].key != NULL) {
client[i].key = NULL;
}
}
}
status = 0;
on_error:
for (i=0; i<MAX_PAIR; ++i) {
if (server[i].key != NULL) {
server[i].key = NULL;
}
if (client[i].key != NULL) {
client[i].key = NULL;
}
}
if (listener.key) {
listener.key = NULL;
}
if (ioque != NULL)
return status;
}
{
/* Run the three TCP compliance scenarios in order for one I/O queue
 * configuration, logging a banner before each one and stopping at the first
 * failure. Returns 0 when all three pass, otherwise the failing scenario's
 * status.
 *
 * The function header is not visible in this listing; tcp_ioqueue_test()
 * invokes this block as tcp_ioqueue_test_impl(&cfg). */
int status;
char title[64];
/* Banner prefix reused by the three log lines below. The argument list of
 * this call is cut off in this listing. */
pj_ansi_snprintf(title, sizeof(title), "%s (concur:%d, epoll_flags:0x%x)",
PJ_LOG(3, (THIS_FILE,
"..%s compliance test 0 (success scenario)",
title));
if ((status=compliance_test_0(cfg)) != 0) {
PJ_LOG(1, (THIS_FILE,
"....FAILED (status=%d)\n", status));
return status;
}
PJ_LOG(3, (THIS_FILE,
"..%s compliance test 1 (failed scenario)",
title));
if ((status=compliance_test_1(cfg)) != 0) {
PJ_LOG(1, (THIS_FILE,
"....FAILED (status=%d)\n", status));
return status;
}
PJ_LOG(3, (THIS_FILE,
"..%s compliance test 2 (repeated accept)",
title));
if ((status=compliance_test_2(cfg)) != 0) {
PJ_LOG(1, (THIS_FILE,
"....FAILED (status=%d)\n", status));
return status;
}
return 0;
}
/* Entry point of the TCP I/O queue test suite.
 *
 * Calls tcp_ioqueue_test_impl() for a series of configurations and returns
 * the first non-zero result, or 0 if every run succeeds. The log banners
 * indicate one series reporting epoll_flags and one reporting concurrency;
 * the loops, the configuration setup and the log arguments are not visible
 * in this listing. */
int tcp_ioqueue_test()
{
#if PJ_HAS_LINUX_EPOLL
0
#endif
};
int i, rc;
PJ_LOG(3, (THIS_FILE,
"..%s TCP compliance test, epoll_flags=0x%x",
rc = tcp_ioqueue_test_impl(&cfg);
/* Stop at the first failing configuration. */
if (rc != 0)
return rc;
}
PJ_LOG(3, (THIS_FILE,
"..%s TCP compliance test, concurrency=%d",
rc = tcp_ioqueue_test_impl(&cfg);
if (rc != 0)
return rc;
}
return 0;
}
#endif
#else
int dummy_uiq_tcp;
#endif
long pj_ssize_t
Definition: types.h:64
int pj_bool_t
Definition: types.h:71
struct pj_ioqueue_t pj_ioqueue_t
Definition: types.h:210
long pj_sock_t
Definition: types.h:263
struct pj_ioqueue_key_t pj_ioqueue_key_t
Definition: types.h:216
int pj_status_t
Definition: types.h:68
#define PJ_ARRAY_SIZE(a)
Definition: types.h:281
@ PJ_SUCCESS
Definition: types.h:93
@ PJ_TRUE
Definition: types.h:96
@ PJ_FALSE
Definition: types.h:99
pj_ioqueue_epoll_flag
Definition: ioqueue.h:337
void pj_ioqueue_op_key_init(pj_ioqueue_op_key_t *op_key, pj_size_t size)
pj_status_t pj_ioqueue_create2(pj_pool_t *pool, pj_size_t max_fd, const pj_ioqueue_cfg *cfg, pj_ioqueue_t **ioqueue)
pj_status_t pj_ioqueue_destroy(pj_ioqueue_t *ioque)
pj_status_t pj_ioqueue_recv(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, void *buffer, pj_ssize_t *length, pj_uint32_t flags)
pj_status_t pj_ioqueue_unregister(pj_ioqueue_key_t *key)
void pj_ioqueue_cfg_default(pj_ioqueue_cfg *cfg)
pj_status_t pj_ioqueue_send(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, const void *data, pj_ssize_t *length, pj_uint32_t flags)
int pj_ioqueue_poll(pj_ioqueue_t *ioque, const pj_time_val *timeout)
pj_status_t pj_ioqueue_register_sock(pj_pool_t *pool, pj_ioqueue_t *ioque, pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **key)
const char * pj_ioqueue_name(void)
pj_status_t pj_ioqueue_connect(pj_ioqueue_key_t *key, const pj_sockaddr_t *addr, int addrlen)
pj_status_t pj_ioqueue_accept(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_sock_t *new_sock, pj_sockaddr_t *local, pj_sockaddr_t *remote, int *addrlen)
@ PJ_IOQUEUE_EPOLL_AUTO
Definition: ioqueue.h:351
@ PJ_IOQUEUE_EPOLL_ONESHOT
Definition: ioqueue.h:344
@ PJ_IOQUEUE_EPOLL_EXCLUSIVE
Definition: ioqueue.h:340
#define PJ_LOG(level, arg)
Definition: log.h:106
void * pj_pool_alloc(pj_pool_t *pool, pj_size_t size)
pj_pool_t * pj_pool_create(pj_pool_factory *factory, const char *name, pj_size_t initial_size, pj_size_t increment_size, pj_pool_callback *callback)
void pj_pool_release(pj_pool_t *pool)
int pj_memcmp(const void *buf1, const void *buf2, pj_size_t size)
Definition: string.h:823
char * pj_create_random_string(char *str, pj_size_t length)
const pj_str_t * pj_cstr(pj_str_t *str, const char *s)
Definition: string.h:100
pj_status_t pj_sock_bind(pj_sock_t sockfd, const pj_sockaddr_t *my_addr, int addrlen)
pj_status_t pj_sock_close(pj_sock_t sockfd)
pj_status_t pj_sock_listen(pj_sock_t sockfd, int backlog)
#define pj_AF_INET()
Definition: sock.h:113
#define PJ_INVALID_SOCKET
Definition: sock.h:485
pj_in_addr pj_inet_addr(const pj_str_t *cp)
pj_status_t pj_sock_getsockname(pj_sock_t sockfd, pj_sockaddr_t *addr, int *namelen)
#define pj_SOCK_STREAM()
Definition: sock.h:160
pj_status_t pj_sockaddr_in_init(pj_sockaddr_in *addr, const pj_str_t *cp, pj_uint16_t port)
pj_status_t pj_sock_socket(int family, int type, int protocol, pj_sock_t *sock)
pj_bool_t pj_symbianos_poll(int priority, int ms_timeout)
pj_status_t pj_get_timestamp(pj_timestamp *ts)
#define PJ_TIME_VAL_MSEC(t)
Definition: types.h:421
#define PJ_IOQUEUE_MAX_HANDLES
Definition: config.h:681
#define PJ_UNUSED_ARG(arg)
Definition: config.h:1343
pj_status_t pj_get_os_error(void)
#define PJ_EPENDING
Definition: errno.h:322
Definition: udp_echo_srv_ioqueue.c:27
Definition: ioqueue.h:219
Definition: ioqueue.h:362
unsigned epoll_flags
Definition: ioqueue.h:370
pj_bool_t default_concurrency
Definition: ioqueue.h:379
Definition: ioqueue.h:208
pj_in_addr sin_addr
Definition: sock.h:542
long msec
Definition: types.h:402
long sec
Definition: types.h:399
pj_uint32_t lo
Definition: types.h:142
struct pj_timestamp::@9 u32