来看这个代码
#include <stdio.h>
#include <assert.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <string.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <queue>
#include <pthread.h>
#define _PRINTF_(format, ...) printf("[%s:%20s@%20s,%04d] " format "\n", "DEBUG",__FUNCTION__, __FILE__, __LINE__, ##__VA_ARGS__ );
static const int iBufferSize = 2048;
int iUdpServerFd;
static const unsigned short uiUdpServerPort = 6000;
static const int iUdpProcessThreadNumber = 2;
struct InQueueEvent
{
struct sockaddr_in stClientAddr;
socklen_t stClienAddrLen;
char szRecvBuffer[iBufferSize];
int iRecvLen;
};
struct OutQueueEvent
{
struct sockaddr_in stDestAddr;
socklen_t stDestAddrLen;
char szSendBuffer[iBufferSize];
int iSendContentLen;
};
std::queue<struct InQueueEvent> stInQueue;
pthread_mutex_t stInQueueMutex;
std::queue<struct OutQueueEvent> stOutQueue;
pthread_mutex_t stOutQueueMutex;
void* UdpProcessThread(void*)
{
_PRINTF_("udp process thread %ld start", pthread_self());
while (1)
{
if (stInQueue.empty())
{
continue;
}
pthread_mutex_lock(&stInQueueMutex);
struct InQueueEvent stInQueueEvent = stInQueue.front();
stInQueue.pop();
pthread_mutex_unlock(&stInQueueMutex);
_PRINTF_("thread %ld processing the request %s", pthread_self(), stInQueueEvent.szRecvBuffer);
struct OutQueueEvent stOutQueueEvent;
stOutQueueEvent.stDestAddr = stInQueueEvent.stClientAddr;
stOutQueueEvent.stDestAddrLen = stInQueueEvent.stClienAddrLen;
memcpy(stOutQueueEvent.szSendBuffer, stInQueueEvent.szRecvBuffer, stInQueueEvent.iRecvLen);
stOutQueueEvent.iSendContentLen = stInQueueEvent.iRecvLen;
pthread_mutex_lock(&stOutQueueMutex);
stOutQueue.push(stOutQueueEvent);
pthread_mutex_unlock(&stOutQueueMutex);
}
}
void* UdpRecvThread(void*)
{
_PRINTF_("udp recv thread start");
while(1)
{
struct InQueueEvent stInQueueEvent;
memset(&stInQueueEvent, 0, sizeof(stInQueueEvent));
stInQueueEvent.stClienAddrLen = sizeof(stInQueueEvent.stClientAddr);
stInQueueEvent.iRecvLen = recvfrom(iUdpServerFd, &(stInQueueEvent.szRecvBuffer), sizeof(stInQueueEvent.szRecvBuffer), 0, (struct sockaddr*)&(stInQueueEvent.stClientAddr), &(stInQueueEvent.stClienAddrLen));
_PRINTF_("recv %d bytes from ip %s port %d message %s", stInQueueEvent.iRecvLen, inet_ntoa(stInQueueEvent.stClientAddr.sin_addr), ntohs(stInQueueEvent.stClientAddr.sin_port), stInQueueEvent.szRecvBuffer);
pthread_mutex_lock(&stInQueueMutex);
stInQueue.push(stInQueueEvent);
pthread_mutex_unlock(&stInQueueMutex);
}
}
void* UdpSendThread(void*)
{
_PRINTF_("udp send thread start");
while(1)
{
if (stOutQueue.empty())
{
continue;
}
pthread_mutex_lock(&stOutQueueMutex);
struct OutQueueEvent stOutQueueEvent = stOutQueue.front();
stOutQueue.pop();
pthread_mutex_unlock(&stOutQueueMutex);
int iSendLen = sendto(iUdpServerFd, &(stOutQueueEvent.szSendBuffer), stOutQueueEvent.iSendContentLen, 0, (struct sockaddr*)&(stOutQueueEvent.stDestAddr), stOutQueueEvent.stDestAddrLen);
_PRINTF_("send %d bytes to ip %s port %d message %s", iSendLen, inet_ntoa(stOutQueueEvent.stDestAddr.sin_addr), ntohs(stOutQueueEvent.stDestAddr.sin_port), stOutQueueEvent.szSendBuffer);
}
}
int CreateUdpServer(unsigned short uiUdpServerPort)
{
int iListenFd = socket(AF_INET, SOCK_DGRAM, 0);
assert(iListenFd > 0);
struct sockaddr_in stListenAddr;
memset(&stListenAddr, 0, sizeof(stListenAddr));
stListenAddr.sin_family = AF_INET;
stListenAddr.sin_addr.s_addr = htonl(INADDR_ANY);
stListenAddr.sin_port = htons(uiUdpServerPort);
int iReuseFlag = 1;
assert(setsockopt(iListenFd, SOL_SOCKET, SO_REUSEADDR, &iReuseFlag, sizeof(int)) == 0);
assert(bind(iListenFd, (struct sockaddr*)&stListenAddr, sizeof(stListenAddr)) == 0);
return iListenFd;
}
int main()
{
pthread_mutex_init(&stInQueueMutex, NULL);
pthread_mutex_init(&stOutQueueMutex, NULL);
iUdpServerFd = CreateUdpServer(uiUdpServerPort);
pthread_t tid;
assert(pthread_create(&tid, NULL, UdpRecvThread, NULL) == 0);
assert(pthread_create(&tid, NULL, UdpSendThread, NULL) == 0);
for (int i = 0; i < iUdpProcessThreadNumber; i++)
{
assert(pthread_create(&tid, NULL, UdpProcessThread, NULL) == 0);
}
while (1)
{
sleep(10);
}
return 0;
}
用 python 写个客户端测试一下
#!/usr/bin/env python
import socket
HOST = 'xx.xx.xx.xx'
PORT = 6000
BUFSIZ = 1024
ADDR = (HOST, PORT)
udpCliSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
data = raw_input('> ')
if not data:
break
udpCliSock.sendto(data, ADDR)
data, ADDR = udpCliSock.recvfrom(BUFSIZ)
if not data:
break
print ADDR, data
udpCliSock.close()
跑起来似乎没有问题
[DEBUG: UdpRecvThread@ EpollUdp.cpp,0069] udp recv thread start [DEBUG: UdpSendThread@ EpollUdp.cpp,0085] udp send thread start [DEBUG: UdpProcessThread@ EpollUdp.cpp,0044] udp process thread 1107310912 start [DEBUG: UdpProcessThread@ EpollUdp.cpp,0044] udp process thread 1098918208 start [DEBUG: UdpRecvThread@ EpollUdp.cpp,0076] recv 1 bytes from ip 10.31.18.40 port 62272 message 1 [DEBUG: UdpProcessThread@ EpollUdp.cpp,0055] thread 1107310912 processing the request 1 [DEBUG: UdpSendThread@ EpollUdp.cpp,0097] send 1 bytes to ip 10.31.18.40 port 62272 message 1 [DEBUG: UdpRecvThread@ EpollUdp.cpp,0076] recv 1 bytes from ip 10.31.18.40 port 62272 message 2 [DEBUG: UdpProcessThread@ EpollUdp.cpp,0055] thread 1107310912 processing the request 2 [DEBUG: UdpSendThread@ EpollUdp.cpp,0097] send 1 bytes to ip 10.31.18.40 port 62272 message 2
但是多发几次,就 core 了,这是为什么呢,gdb 调试发现 core 在 memcpy 上面,这个也不太能理解,不过,把代码尝试的改成这样就可以了
#include <stdio.h>
#include <assert.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <string.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <queue>
#include <pthread.h>
#define _PRINTF_(format, ...) printf("[%s:%20s@%20s,%04d] " format "\n", "DEBUG",__FUNCTION__, __FILE__, __LINE__, ##__VA_ARGS__ );
static const int iBufferSize = 2048;
int iUdpServerFd;
static const unsigned short uiUdpServerPort = 6000;
static const int iUdpProcessThreadNumber = 2;
struct InQueueEvent
{
struct sockaddr_in stClientAddr;
socklen_t stClienAddrLen;
char szRecvBuffer[iBufferSize];
int iRecvLen;
};
struct OutQueueEvent
{
struct sockaddr_in stDestAddr;
socklen_t stDestAddrLen;
char szSendBuffer[iBufferSize];
int iSendContentLen;
};
std::queue<struct InQueueEvent> stInQueue;
pthread_mutex_t stInQueueMutex;
std::queue<struct OutQueueEvent> stOutQueue;
pthread_mutex_t stOutQueueMutex;
void* UdpProcessThread(void*)
{
_PRINTF_("udp process thread %ld start", pthread_self());
while (1)
{
pthread_mutex_lock(&stInQueueMutex);
if (stInQueue.empty())
{
pthread_mutex_unlock(&stInQueueMutex);
continue;
}
struct InQueueEvent stInQueueEvent = stInQueue.front();
stInQueue.pop();
pthread_mutex_unlock(&stInQueueMutex);
_PRINTF_("thread %ld processing the request %s", pthread_self(), stInQueueEvent.szRecvBuffer);
struct OutQueueEvent stOutQueueEvent;
memset(&stOutQueueEvent, 0, sizeof(stOutQueueEvent));
stOutQueueEvent.stDestAddr = stInQueueEvent.stClientAddr;
stOutQueueEvent.stDestAddrLen = stInQueueEvent.stClienAddrLen;
memcpy(stOutQueueEvent.szSendBuffer, stInQueueEvent.szRecvBuffer, stInQueueEvent.iRecvLen);
stOutQueueEvent.iSendContentLen = stInQueueEvent.iRecvLen;
pthread_mutex_lock(&stOutQueueMutex);
stOutQueue.push(stOutQueueEvent);
pthread_mutex_unlock(&stOutQueueMutex);
}
}
void* UdpRecvThread(void*)
{
_PRINTF_("udp recv thread start");
while(1)
{
struct InQueueEvent stInQueueEvent;
memset(&stInQueueEvent, 0, sizeof(stInQueueEvent));
stInQueueEvent.stClienAddrLen = sizeof(stInQueueEvent.stClientAddr);
stInQueueEvent.iRecvLen = recvfrom(iUdpServerFd, &(stInQueueEvent.szRecvBuffer), sizeof(stInQueueEvent.szRecvBuffer), 0, (struct sockaddr*)&(stInQueueEvent.stClientAddr), &(stInQueueEvent.stClienAddrLen));
_PRINTF_("recv %d bytes from ip %s port %d message %s", stInQueueEvent.iRecvLen, inet_ntoa(stInQueueEvent.stClientAddr.sin_addr), ntohs(stInQueueEvent.stClientAddr.sin_port), stInQueueEvent.szRecvBuffer);
pthread_mutex_lock(&stInQueueMutex);
stInQueue.push(stInQueueEvent);
pthread_mutex_unlock(&stInQueueMutex);
}
}
void* UdpSendThread(void*)
{
_PRINTF_("udp send thread start");
while(1)
{
pthread_mutex_lock(&stOutQueueMutex);
if (stOutQueue.empty())
{
pthread_mutex_unlock(&stOutQueueMutex);
continue;
}
struct OutQueueEvent stOutQueueEvent = stOutQueue.front();
stOutQueue.pop();
pthread_mutex_unlock(&stOutQueueMutex);
int iSendLen = sendto(iUdpServerFd, &(stOutQueueEvent.szSendBuffer), stOutQueueEvent.iSendContentLen, 0, (struct sockaddr*)&(stOutQueueEvent.stDestAddr), stOutQueueEvent.stDestAddrLen);
_PRINTF_("send %d bytes to ip %s port %d message %s", iSendLen, inet_ntoa(stOutQueueEvent.stDestAddr.sin_addr), ntohs(stOutQueueEvent.stDestAddr.sin_port), stOutQueueEvent.szSendBuffer);
}
}
int CreateUdpServer(unsigned short uiUdpServerPort)
{
int iListenFd = socket(AF_INET, SOCK_DGRAM, 0);
assert(iListenFd > 0);
struct sockaddr_in stListenAddr;
memset(&stListenAddr, 0, sizeof(stListenAddr));
stListenAddr.sin_family = AF_INET;
stListenAddr.sin_addr.s_addr = htonl(INADDR_ANY);
stListenAddr.sin_port = htons(uiUdpServerPort);
int iReuseFlag = 1;
assert(setsockopt(iListenFd, SOL_SOCKET, SO_REUSEADDR, &iReuseFlag, sizeof(int)) == 0);
assert(bind(iListenFd, (struct sockaddr*)&stListenAddr, sizeof(stListenAddr)) == 0);
return iListenFd;
}
int main()
{
pthread_mutex_init(&stInQueueMutex, NULL);
pthread_mutex_init(&stOutQueueMutex, NULL);
iUdpServerFd = CreateUdpServer(uiUdpServerPort);
pthread_t tid;
assert(pthread_create(&tid, NULL, UdpRecvThread, NULL) == 0);
assert(pthread_create(&tid, NULL, UdpSendThread, NULL) == 0);
for (int i = 0; i < iUdpProcessThreadNumber; i++)
{
assert(pthread_create(&tid, NULL, UdpProcessThread, NULL) == 0);
}
while (1)
{
sleep(10);
}
return 0;
}
注意高亮的部分,在检查队列是否为空的时候,也要包含在锁的区域里面,这样就可以了
———————————————-
2013-12-10 16:37:06 update 使用这个脚本,可以多线程压测服务
#!/usr/bin/env python
import socket
import random
import threading
HOST = 'xx.xx.xx.xx'
PORT = 6000
BUFSIZ = 1024
ADDR = (HOST, PORT)
class UdpTestThread(threading.Thread):
def run(self):
udpCliSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for i in range(1000):
data = str(random.random())
udpCliSock.sendto(data, ADDR)
data = udpCliSock.recvfrom(BUFSIZ)
#print data
udpCliSock.close()
for i in range(50):
UdpTestThread().start()
—————————-
2013-12-10 17:21:05 update 发现上面的例子,压测结果很差,于是写了个极简的,作为比对
#include <sys/socket.h>
#include <arpa/inet.h>
int main() {
int fd = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(6000);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
bind(fd, (struct sockaddr*)&addr, sizeof(addr));
while (1) {
char buffer[2048] = {0};
struct sockaddr cli;
socklen_t cli_len = sizeof(cli);
int recv_len = recvfrom(fd, buffer, sizeof(buffer), 0, &cli, &cli_len);
sendto(fd, buffer, recv_len, 0, &cli, cli_len);
}
return 0;
}
发现差不多,看来还要找原因
这几天挖出来个GLib的,自带线程池和宏定义好了的锁,感觉再用C就顺手多了
其实用 libevent 基本就够了,不用造轮子
那玩意儿就是个轮子
还附带各种数据结构与常用工具类
反正我现在是觉得能有稳定开源的就用呗