来看这个代码
#include <stdio.h> #include <assert.h> #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <fcntl.h> #include <string.h> #include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h> #include <queue> #include <pthread.h> #define _PRINTF_(format, ...) printf("[%s:%20s@%20s,%04d] " format "\n", "DEBUG",__FUNCTION__, __FILE__, __LINE__, ##__VA_ARGS__ ); static const int iBufferSize = 2048; int iUdpServerFd; static const unsigned short uiUdpServerPort = 6000; static const int iUdpProcessThreadNumber = 2; struct InQueueEvent { struct sockaddr_in stClientAddr; socklen_t stClienAddrLen; char szRecvBuffer[iBufferSize]; int iRecvLen; }; struct OutQueueEvent { struct sockaddr_in stDestAddr; socklen_t stDestAddrLen; char szSendBuffer[iBufferSize]; int iSendContentLen; }; std::queue<struct InQueueEvent> stInQueue; pthread_mutex_t stInQueueMutex; std::queue<struct OutQueueEvent> stOutQueue; pthread_mutex_t stOutQueueMutex; void* UdpProcessThread(void*) { _PRINTF_("udp process thread %ld start", pthread_self()); while (1) { if (stInQueue.empty()) { continue; } pthread_mutex_lock(&stInQueueMutex); struct InQueueEvent stInQueueEvent = stInQueue.front(); stInQueue.pop(); pthread_mutex_unlock(&stInQueueMutex); _PRINTF_("thread %ld processing the request %s", pthread_self(), stInQueueEvent.szRecvBuffer); struct OutQueueEvent stOutQueueEvent; stOutQueueEvent.stDestAddr = stInQueueEvent.stClientAddr; stOutQueueEvent.stDestAddrLen = stInQueueEvent.stClienAddrLen; memcpy(stOutQueueEvent.szSendBuffer, stInQueueEvent.szRecvBuffer, stInQueueEvent.iRecvLen); stOutQueueEvent.iSendContentLen = stInQueueEvent.iRecvLen; pthread_mutex_lock(&stOutQueueMutex); stOutQueue.push(stOutQueueEvent); pthread_mutex_unlock(&stOutQueueMutex); } } void* UdpRecvThread(void*) { _PRINTF_("udp recv thread start"); while(1) { struct InQueueEvent stInQueueEvent; memset(&stInQueueEvent, 0, sizeof(stInQueueEvent)); stInQueueEvent.stClienAddrLen = sizeof(stInQueueEvent.stClientAddr); stInQueueEvent.iRecvLen = recvfrom(iUdpServerFd, &(stInQueueEvent.szRecvBuffer), sizeof(stInQueueEvent.szRecvBuffer), 0, (struct sockaddr*)&(stInQueueEvent.stClientAddr), &(stInQueueEvent.stClienAddrLen)); _PRINTF_("recv %d bytes from ip %s port %d message %s", stInQueueEvent.iRecvLen, inet_ntoa(stInQueueEvent.stClientAddr.sin_addr), ntohs(stInQueueEvent.stClientAddr.sin_port), stInQueueEvent.szRecvBuffer); pthread_mutex_lock(&stInQueueMutex); stInQueue.push(stInQueueEvent); pthread_mutex_unlock(&stInQueueMutex); } } void* UdpSendThread(void*) { _PRINTF_("udp send thread start"); while(1) { if (stOutQueue.empty()) { continue; } pthread_mutex_lock(&stOutQueueMutex); struct OutQueueEvent stOutQueueEvent = stOutQueue.front(); stOutQueue.pop(); pthread_mutex_unlock(&stOutQueueMutex); int iSendLen = sendto(iUdpServerFd, &(stOutQueueEvent.szSendBuffer), stOutQueueEvent.iSendContentLen, 0, (struct sockaddr*)&(stOutQueueEvent.stDestAddr), stOutQueueEvent.stDestAddrLen); _PRINTF_("send %d bytes to ip %s port %d message %s", iSendLen, inet_ntoa(stOutQueueEvent.stDestAddr.sin_addr), ntohs(stOutQueueEvent.stDestAddr.sin_port), stOutQueueEvent.szSendBuffer); } } int CreateUdpServer(unsigned short uiUdpServerPort) { int iListenFd = socket(AF_INET, SOCK_DGRAM, 0); assert(iListenFd > 0); struct sockaddr_in stListenAddr; memset(&stListenAddr, 0, sizeof(stListenAddr)); stListenAddr.sin_family = AF_INET; stListenAddr.sin_addr.s_addr = htonl(INADDR_ANY); stListenAddr.sin_port = htons(uiUdpServerPort); int iReuseFlag = 1; assert(setsockopt(iListenFd, SOL_SOCKET, SO_REUSEADDR, &iReuseFlag, sizeof(int)) == 0); assert(bind(iListenFd, (struct sockaddr*)&stListenAddr, sizeof(stListenAddr)) == 0); return iListenFd; } int main() { pthread_mutex_init(&stInQueueMutex, NULL); pthread_mutex_init(&stOutQueueMutex, NULL); iUdpServerFd = CreateUdpServer(uiUdpServerPort); pthread_t tid; assert(pthread_create(&tid, NULL, UdpRecvThread, NULL) == 0); assert(pthread_create(&tid, NULL, UdpSendThread, NULL) == 0); for (int i = 0; i < iUdpProcessThreadNumber; i++) { assert(pthread_create(&tid, NULL, UdpProcessThread, NULL) == 0); } while (1) { sleep(10); } return 0; }
用 python 写个客户端测试一下
#!/usr/bin/env python import socket HOST = 'xx.xx.xx.xx' PORT = 6000 BUFSIZ = 1024 ADDR = (HOST, PORT) udpCliSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) while True: data = raw_input('> ') if not data: break udpCliSock.sendto(data, ADDR) data, ADDR = udpCliSock.recvfrom(BUFSIZ) if not data: break print ADDR, data udpCliSock.close()
跑起来似乎没有问题
[DEBUG: UdpRecvThread@ EpollUdp.cpp,0069] udp recv thread start [DEBUG: UdpSendThread@ EpollUdp.cpp,0085] udp send thread start [DEBUG: UdpProcessThread@ EpollUdp.cpp,0044] udp process thread 1107310912 start [DEBUG: UdpProcessThread@ EpollUdp.cpp,0044] udp process thread 1098918208 start [DEBUG: UdpRecvThread@ EpollUdp.cpp,0076] recv 1 bytes from ip 10.31.18.40 port 62272 message 1 [DEBUG: UdpProcessThread@ EpollUdp.cpp,0055] thread 1107310912 processing the request 1 [DEBUG: UdpSendThread@ EpollUdp.cpp,0097] send 1 bytes to ip 10.31.18.40 port 62272 message 1 [DEBUG: UdpRecvThread@ EpollUdp.cpp,0076] recv 1 bytes from ip 10.31.18.40 port 62272 message 2 [DEBUG: UdpProcessThread@ EpollUdp.cpp,0055] thread 1107310912 processing the request 2 [DEBUG: UdpSendThread@ EpollUdp.cpp,0097] send 1 bytes to ip 10.31.18.40 port 62272 message 2
但是多发几次,就 core 了,这是为什么呢,gdb 调试发现 core 在 memcpy 上面,这个也不太能理解,不过,把代码尝试的改成这样就可以了
#include <stdio.h> #include <assert.h> #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <fcntl.h> #include <string.h> #include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h> #include <queue> #include <pthread.h> #define _PRINTF_(format, ...) printf("[%s:%20s@%20s,%04d] " format "\n", "DEBUG",__FUNCTION__, __FILE__, __LINE__, ##__VA_ARGS__ ); static const int iBufferSize = 2048; int iUdpServerFd; static const unsigned short uiUdpServerPort = 6000; static const int iUdpProcessThreadNumber = 2; struct InQueueEvent { struct sockaddr_in stClientAddr; socklen_t stClienAddrLen; char szRecvBuffer[iBufferSize]; int iRecvLen; }; struct OutQueueEvent { struct sockaddr_in stDestAddr; socklen_t stDestAddrLen; char szSendBuffer[iBufferSize]; int iSendContentLen; }; std::queue<struct InQueueEvent> stInQueue; pthread_mutex_t stInQueueMutex; std::queue<struct OutQueueEvent> stOutQueue; pthread_mutex_t stOutQueueMutex; void* UdpProcessThread(void*) { _PRINTF_("udp process thread %ld start", pthread_self()); while (1) { pthread_mutex_lock(&stInQueueMutex); if (stInQueue.empty()) { pthread_mutex_unlock(&stInQueueMutex); continue; } struct InQueueEvent stInQueueEvent = stInQueue.front(); stInQueue.pop(); pthread_mutex_unlock(&stInQueueMutex); _PRINTF_("thread %ld processing the request %s", pthread_self(), stInQueueEvent.szRecvBuffer); struct OutQueueEvent stOutQueueEvent; memset(&stOutQueueEvent, 0, sizeof(stOutQueueEvent)); stOutQueueEvent.stDestAddr = stInQueueEvent.stClientAddr; stOutQueueEvent.stDestAddrLen = stInQueueEvent.stClienAddrLen; memcpy(stOutQueueEvent.szSendBuffer, stInQueueEvent.szRecvBuffer, stInQueueEvent.iRecvLen); stOutQueueEvent.iSendContentLen = stInQueueEvent.iRecvLen; pthread_mutex_lock(&stOutQueueMutex); stOutQueue.push(stOutQueueEvent); pthread_mutex_unlock(&stOutQueueMutex); } } void* UdpRecvThread(void*) { _PRINTF_("udp recv thread start"); while(1) { struct InQueueEvent stInQueueEvent; memset(&stInQueueEvent, 0, sizeof(stInQueueEvent)); stInQueueEvent.stClienAddrLen = sizeof(stInQueueEvent.stClientAddr); stInQueueEvent.iRecvLen = recvfrom(iUdpServerFd, &(stInQueueEvent.szRecvBuffer), sizeof(stInQueueEvent.szRecvBuffer), 0, (struct sockaddr*)&(stInQueueEvent.stClientAddr), &(stInQueueEvent.stClienAddrLen)); _PRINTF_("recv %d bytes from ip %s port %d message %s", stInQueueEvent.iRecvLen, inet_ntoa(stInQueueEvent.stClientAddr.sin_addr), ntohs(stInQueueEvent.stClientAddr.sin_port), stInQueueEvent.szRecvBuffer); pthread_mutex_lock(&stInQueueMutex); stInQueue.push(stInQueueEvent); pthread_mutex_unlock(&stInQueueMutex); } } void* UdpSendThread(void*) { _PRINTF_("udp send thread start"); while(1) { pthread_mutex_lock(&stOutQueueMutex); if (stOutQueue.empty()) { pthread_mutex_unlock(&stOutQueueMutex); continue; } struct OutQueueEvent stOutQueueEvent = stOutQueue.front(); stOutQueue.pop(); pthread_mutex_unlock(&stOutQueueMutex); int iSendLen = sendto(iUdpServerFd, &(stOutQueueEvent.szSendBuffer), stOutQueueEvent.iSendContentLen, 0, (struct sockaddr*)&(stOutQueueEvent.stDestAddr), stOutQueueEvent.stDestAddrLen); _PRINTF_("send %d bytes to ip %s port %d message %s", iSendLen, inet_ntoa(stOutQueueEvent.stDestAddr.sin_addr), ntohs(stOutQueueEvent.stDestAddr.sin_port), stOutQueueEvent.szSendBuffer); } } int CreateUdpServer(unsigned short uiUdpServerPort) { int iListenFd = socket(AF_INET, SOCK_DGRAM, 0); assert(iListenFd > 0); struct sockaddr_in stListenAddr; memset(&stListenAddr, 0, sizeof(stListenAddr)); stListenAddr.sin_family = AF_INET; stListenAddr.sin_addr.s_addr = htonl(INADDR_ANY); stListenAddr.sin_port = htons(uiUdpServerPort); int iReuseFlag = 1; assert(setsockopt(iListenFd, SOL_SOCKET, SO_REUSEADDR, &iReuseFlag, sizeof(int)) == 0); assert(bind(iListenFd, (struct sockaddr*)&stListenAddr, sizeof(stListenAddr)) == 0); return iListenFd; } int main() { pthread_mutex_init(&stInQueueMutex, NULL); pthread_mutex_init(&stOutQueueMutex, NULL); iUdpServerFd = CreateUdpServer(uiUdpServerPort); pthread_t tid; assert(pthread_create(&tid, NULL, UdpRecvThread, NULL) == 0); assert(pthread_create(&tid, NULL, UdpSendThread, NULL) == 0); for (int i = 0; i < iUdpProcessThreadNumber; i++) { assert(pthread_create(&tid, NULL, UdpProcessThread, NULL) == 0); } while (1) { sleep(10); } return 0; }
注意高亮的部分,在检查队列是否为空的时候,也要包含在锁的区域里面,这样就可以了
———————————————-
2013-12-10 16:37:06 update 使用这个脚本,可以多线程压测服务
#!/usr/bin/env python import socket import random import threading HOST = 'xx.xx.xx.xx' PORT = 6000 BUFSIZ = 1024 ADDR = (HOST, PORT) class UdpTestThread(threading.Thread): def run(self): udpCliSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for i in range(1000): data = str(random.random()) udpCliSock.sendto(data, ADDR) data = udpCliSock.recvfrom(BUFSIZ) #print data udpCliSock.close() for i in range(50): UdpTestThread().start()
—————————-
2013-12-10 17:21:05 update 发现上面的例子,压测结果很差,于是写了个极简的,作为比对
#include <sys/socket.h> #include <arpa/inet.h> int main() { int fd = socket(AF_INET, SOCK_DGRAM, 0); struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(6000); addr.sin_addr.s_addr = htonl(INADDR_ANY); bind(fd, (struct sockaddr*)&addr, sizeof(addr)); while (1) { char buffer[2048] = {0}; struct sockaddr cli; socklen_t cli_len = sizeof(cli); int recv_len = recvfrom(fd, buffer, sizeof(buffer), 0, &cli, &cli_len); sendto(fd, buffer, recv_len, 0, &cli, cli_len); } return 0; }
发现差不多,看来还要找原因
这几天挖出来个GLib的,自带线程池和宏定义好了的锁,感觉再用C就顺手多了
其实用 libevent 基本就够了,不用造轮子
那玩意儿就是个轮子
还附带各种数据结构与常用工具类
反正我现在是觉得能有稳定开源的就用呗