基于队列的 UDP 多线程收发 demo

来看这个代码

#include <stdio.h>
#include <assert.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <string.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <queue>
#include <pthread.h>

/*
 * Debug trace helper: prints "[DEBUG:<function>@<file>,<line>] <message>" and a
 * newline to stdout (function and file are right-aligned in 20 columns).
 * `format` must be a string literal because it is pasted between the prefix
 * and the trailing "\n"; the remaining arguments are forwarded to printf.
 * The expansion carries no trailing semicolon: the caller's own `;` ends the
 * statement, so the macro also works as an expression and in an un-braced
 * if/else.
 */
#define _PRINTF_(format, ...) printf("[%s:%20s@%20s,%04d] " format "\n", "DEBUG", __FUNCTION__, __FILE__, __LINE__, ##__VA_ARGS__)

/* Size in bytes of the payload buffer carried by every queued event. */
static const int iBufferSize = 2048;
/* Descriptor of the UDP server socket; assigned in main() and used by the receive and send threads. */
int iUdpServerFd;
/* UDP port passed to CreateUdpServer() by main(). */
static const unsigned short uiUdpServerPort = 6000;
/* Number of UdpProcessThread workers started by main(). */
static const int iUdpProcessThreadNumber = 2;

/* One received datagram, queued by UdpRecvThread for the worker threads. */
struct InQueueEvent
{
    struct sockaddr_in stClientAddr;   /* sender address filled in by recvfrom() */
    socklen_t stClienAddrLen;          /* in/out address length passed to recvfrom() */
    char szRecvBuffer[iBufferSize];    /* datagram payload */
    int iRecvLen;                      /* return value of recvfrom() */
};

/* One reply datagram, queued by UdpProcessThread for UdpSendThread. */
struct OutQueueEvent
{
    struct sockaddr_in stDestAddr;     /* destination address passed to sendto() */
    socklen_t stDestAddrLen;           /* length of stDestAddr passed to sendto() */
    char szSendBuffer[iBufferSize];    /* reply payload */
    int iSendContentLen;               /* number of payload bytes to send */
};

/* Datagrams waiting to be processed: pushed by UdpRecvThread, popped by UdpProcessThread. */
std::queue<struct InQueueEvent> stInQueue;
/* Mutex locked around stInQueue push/pop; initialised in main(). */
pthread_mutex_t stInQueueMutex;
/* Replies waiting to be sent: pushed by UdpProcessThread, popped by UdpSendThread. */
std::queue<struct OutQueueEvent> stOutQueue;
/* Mutex locked around stOutQueue push/pop; initialised in main(). */
pthread_mutex_t stOutQueueMutex;

/*
 * Worker thread: takes one received datagram at a time from stInQueue, copies
 * it into an echo reply addressed to the original sender, and appends the
 * reply to stOutQueue. The argument is unused and the loop never exits.
 *
 * NOTE: this version contains a data race. stInQueue.empty() is evaluated
 * without holding stInQueueMutex, and main() starts iUdpProcessThreadNumber (2)
 * of these threads. Both can see a non-empty queue; one then locks and pops
 * the only element, and the other calls front()/pop() on an empty queue, which
 * is undefined behaviour and yields a garbage iRecvLen for the memcpy below.
 */
void* UdpProcessThread(void*)
{
    _PRINTF_("udp process thread %ld start", pthread_self());
    while (1)
    {
        /* Unsynchronised check: the queue may be emptied by another worker
           between this test and the lock below. */
        if (stInQueue.empty())
        {
            continue;
        }
        pthread_mutex_lock(&stInQueueMutex);
        struct InQueueEvent stInQueueEvent = stInQueue.front();
        stInQueue.pop();
        pthread_mutex_unlock(&stInQueueMutex);
        _PRINTF_("thread %ld processing the request %s", pthread_self(), stInQueueEvent.szRecvBuffer);
        /* The reply is not zero-initialised: bytes of szSendBuffer beyond
           iRecvLen stay uninitialised. */
        struct OutQueueEvent stOutQueueEvent;
        stOutQueueEvent.stDestAddr = stInQueueEvent.stClientAddr;
        stOutQueueEvent.stDestAddrLen = stInQueueEvent.stClienAddrLen;
        /* iRecvLen is used as the copy size without any range check. */
        memcpy(stOutQueueEvent.szSendBuffer, stInQueueEvent.szRecvBuffer, stInQueueEvent.iRecvLen);
        stOutQueueEvent.iSendContentLen = stInQueueEvent.iRecvLen;
        pthread_mutex_lock(&stOutQueueMutex);
        stOutQueue.push(stOutQueueEvent);
        pthread_mutex_unlock(&stOutQueueMutex);        
    }
}

/*
 * Receiver thread: blocks in recvfrom() on iUdpServerFd and appends every
 * datagram, together with the sender's address, to stInQueue.
 * The argument is unused and the loop never exits.
 */
void* UdpRecvThread(void*)
{
    _PRINTF_("udp recv thread start");
    while(1)
    {
        struct InQueueEvent stInQueueEvent;
        memset(&stInQueueEvent, 0, sizeof(stInQueueEvent));
        stInQueueEvent.stClienAddrLen = sizeof(stInQueueEvent.stClientAddr);
        /* Receive at most size-1 bytes: together with the memset above the
           payload is then always NUL-terminated for the "%s" log lines. */
        stInQueueEvent.iRecvLen = recvfrom(iUdpServerFd, stInQueueEvent.szRecvBuffer, sizeof(stInQueueEvent.szRecvBuffer) - 1, 0, (struct sockaddr*)&(stInQueueEvent.stClientAddr), &(stInQueueEvent.stClienAddrLen));
        if (stInQueueEvent.iRecvLen < 0)
        {
            /* Never enqueue a failed receive: the consumer uses iRecvLen as a
               memcpy size, so -1 would turn into a huge copy. */
            perror("recvfrom");
            continue;
        }
        /* inet_ntoa() returns a shared static buffer and the send thread
           formats addresses too, so use inet_ntop() with a local buffer. */
        char szClientIp[INET_ADDRSTRLEN] = {0};
        inet_ntop(AF_INET, &(stInQueueEvent.stClientAddr.sin_addr), szClientIp, sizeof(szClientIp));
        _PRINTF_("recv %d bytes from ip %s port %d message %s", stInQueueEvent.iRecvLen, szClientIp, ntohs(stInQueueEvent.stClientAddr.sin_port), stInQueueEvent.szRecvBuffer);
        pthread_mutex_lock(&stInQueueMutex);
        stInQueue.push(stInQueueEvent);
        pthread_mutex_unlock(&stInQueueMutex);
    }
}

/*
 * Sender thread: takes one reply at a time from stOutQueue and transmits it
 * with sendto() on iUdpServerFd. The argument is unused and the loop never
 * exits.
 *
 * NOTE: stOutQueue.empty() is evaluated without holding stOutQueueMutex while
 * the worker threads push to the same queue under that mutex, so the check
 * reads the container concurrently with a writer.
 */
void* UdpSendThread(void*)
{
    _PRINTF_("udp send thread start");
    while(1)
    {
        /* Unsynchronised check; see the note in the function header. */
        if (stOutQueue.empty())
        {
            continue;
        }
        pthread_mutex_lock(&stOutQueueMutex);
        struct OutQueueEvent stOutQueueEvent = stOutQueue.front();
        stOutQueue.pop();
        pthread_mutex_unlock(&stOutQueueMutex);
        /* The sendto() result is only logged; a failure (-1) is not handled further. */
        int iSendLen = sendto(iUdpServerFd, &(stOutQueueEvent.szSendBuffer), stOutQueueEvent.iSendContentLen, 0, (struct sockaddr*)&(stOutQueueEvent.stDestAddr), stOutQueueEvent.stDestAddrLen);
        /* "%s" relies on szSendBuffer containing a NUL terminator. */
        _PRINTF_("send %d bytes to ip %s port %d message %s", iSendLen, inet_ntoa(stOutQueueEvent.stDestAddr.sin_addr), ntohs(stOutQueueEvent.stDestAddr.sin_port), stOutQueueEvent.szSendBuffer);
    }
}

/*
 * Creates an IPv4 UDP socket with SO_REUSEADDR set and binds it to
 * INADDR_ANY:uiUdpServerPort.
 *
 * uiUdpServerPort: port number in host byte order.
 * Returns the bound socket descriptor.
 *
 * Every system call runs outside assert() and only its result is asserted, so
 * the socket is still configured and bound when the file is built with NDEBUG.
 * In a normal build any failure aborts the process, as before.
 */
int CreateUdpServer(unsigned short uiUdpServerPort)
{
    int iListenFd = socket(AF_INET, SOCK_DGRAM, 0);
    /* 0 is a valid descriptor; only a negative value signals failure. */
    assert(iListenFd >= 0);

    struct sockaddr_in stListenAddr;
    memset(&stListenAddr, 0, sizeof(stListenAddr));
    stListenAddr.sin_family = AF_INET;
    stListenAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    stListenAddr.sin_port = htons(uiUdpServerPort);

    int iReuseFlag = 1;
    int iRet = setsockopt(iListenFd, SOL_SOCKET, SO_REUSEADDR, &iReuseFlag, sizeof(iReuseFlag));
    assert(iRet == 0);
    iRet = bind(iListenFd, (struct sockaddr*)&stListenAddr, sizeof(stListenAddr));
    assert(iRet == 0);
    (void)iRet; /* avoids an unused-variable warning when assert() is compiled out */
    return iListenFd;
}

int main()
{
    pthread_mutex_init(&stInQueueMutex, NULL);
    pthread_mutex_init(&stOutQueueMutex, NULL);
    iUdpServerFd = CreateUdpServer(uiUdpServerPort);
    pthread_t tid;
    assert(pthread_create(&tid, NULL, UdpRecvThread, NULL) == 0);
    assert(pthread_create(&tid, NULL, UdpSendThread, NULL) == 0);
    for (int i = 0; i < iUdpProcessThreadNumber; i++)
    {
        assert(pthread_create(&tid, NULL, UdpProcessThread, NULL) == 0);
    }    
    while (1)
    {
        sleep(10);
    }
    return 0;
}

用 python 写个客户端测试一下

#!/usr/bin/env python
# Interactive UDP echo client (Python 2): sends each line typed at the prompt
# to ADDR and prints the reply together with the address it came from.
# An empty input line ends the session.

import socket

HOST = 'xx.xx.xx.xx'
PORT = 6000
BUFSIZ = 1024
ADDR = (HOST, PORT)
# Seconds to wait for a reply before giving up on that message.
RECV_TIMEOUT = 5.0

udpCliSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# UDP gives no delivery guarantee; without a timeout one lost datagram would
# block the client in recvfrom() forever.
udpCliSock.settimeout(RECV_TIMEOUT)

try:
    while True:
        data = raw_input('> ')
        if not data:
            break
        udpCliSock.sendto(data, ADDR)
        try:
            # Keep the reply's source address in its own name so the
            # destination ADDR is not overwritten.
            reply, peer = udpCliSock.recvfrom(BUFSIZ)
        except socket.timeout:
            print('no reply within %.1f seconds' % RECV_TIMEOUT)
            continue
        if not reply:
            break
        print('%s %s' % (peer, reply))
finally:
    # Release the socket even when the loop ends with an exception (e.g. Ctrl-C).
    udpCliSock.close()

跑起来似乎没有问题

[DEBUG:       UdpRecvThread@        EpollUdp.cpp,0069] udp recv thread start
[DEBUG:       UdpSendThread@        EpollUdp.cpp,0085] udp send thread start
[DEBUG:    UdpProcessThread@        EpollUdp.cpp,0044] udp process thread 1107310912 start
[DEBUG:    UdpProcessThread@        EpollUdp.cpp,0044] udp process thread 1098918208 start
[DEBUG:       UdpRecvThread@        EpollUdp.cpp,0076] recv 1 bytes from ip 10.31.18.40 port 62272 message 1
[DEBUG:    UdpProcessThread@        EpollUdp.cpp,0055] thread 1107310912 processing the request 1
[DEBUG:       UdpSendThread@        EpollUdp.cpp,0097] send 1 bytes to ip 10.31.18.40 port 62272 message 1
[DEBUG:       UdpRecvThread@        EpollUdp.cpp,0076] recv 1 bytes from ip 10.31.18.40 port 62272 message 2
[DEBUG:    UdpProcessThread@        EpollUdp.cpp,0055] thread 1107310912 processing the request 2
[DEBUG:       UdpSendThread@        EpollUdp.cpp,0097] send 1 bytes to ip 10.31.18.40 port 62272 message 2

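但是多发几次,就 core 了。gdb 调试发现 core 在 memcpy 上面,一开始不太能理解。原因在于:上面的代码是在没有持有锁的情况下检查队列是否为空的,两个处理线程可能同时看到队列非空,其中一个先加锁取走了唯一的元素,另一个随后加锁,对已经为空的队列调用 front()/pop(),拿到的 iRecvLen 是无效值,memcpy 按这个长度拷贝就越界崩溃了。把代码尝试地改成下面这样就可以了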

#include <stdio.h>
#include <assert.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <string.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <queue>
#include <pthread.h>

/*
 * Debug trace helper: prints "[DEBUG:<function>@<file>,<line>] <message>" and a
 * newline to stdout (function and file are right-aligned in 20 columns).
 * `format` must be a string literal because it is pasted between the prefix
 * and the trailing "\n"; the remaining arguments are forwarded to printf.
 * The expansion carries no trailing semicolon: the caller's own `;` ends the
 * statement, so the macro also works as an expression and in an un-braced
 * if/else.
 */
#define _PRINTF_(format, ...) printf("[%s:%20s@%20s,%04d] " format "\n", "DEBUG", __FUNCTION__, __FILE__, __LINE__, ##__VA_ARGS__)

/* Size in bytes of the payload buffer carried by every queued event. */
static const int iBufferSize = 2048;
/* Descriptor of the UDP server socket; assigned in main() and used by the receive and send threads. */
int iUdpServerFd;
/* UDP port passed to CreateUdpServer() by main(). */
static const unsigned short uiUdpServerPort = 6000;
/* Number of UdpProcessThread workers started by main(). */
static const int iUdpProcessThreadNumber = 2;

/* One received datagram, queued by UdpRecvThread for the worker threads. */
struct InQueueEvent
{
    struct sockaddr_in stClientAddr;   /* sender address filled in by recvfrom() */
    socklen_t stClienAddrLen;          /* in/out address length passed to recvfrom() */
    char szRecvBuffer[iBufferSize];    /* datagram payload */
    int iRecvLen;                      /* return value of recvfrom() */
};

/* One reply datagram, queued by UdpProcessThread for UdpSendThread. */
struct OutQueueEvent
{
    struct sockaddr_in stDestAddr;     /* destination address passed to sendto() */
    socklen_t stDestAddrLen;           /* length of stDestAddr passed to sendto() */
    char szSendBuffer[iBufferSize];    /* reply payload */
    int iSendContentLen;               /* number of payload bytes to send */
};

/* Datagrams waiting to be processed: pushed by UdpRecvThread, popped by UdpProcessThread. */
std::queue<struct InQueueEvent> stInQueue;
/* Mutex locked around every stInQueue access; initialised in main(). */
pthread_mutex_t stInQueueMutex;
/* Replies waiting to be sent: pushed by UdpProcessThread, popped by UdpSendThread. */
std::queue<struct OutQueueEvent> stOutQueue;
/* Mutex locked around every stOutQueue access; initialised in main(). */
pthread_mutex_t stOutQueueMutex;

/*
 * Worker thread: takes one received datagram at a time from stInQueue, copies
 * it into an echo reply addressed to the original sender, and appends the
 * reply to stOutQueue. The argument is unused and the loop never exits.
 */
void* UdpProcessThread(void*)
{
    /* pthread_t is an opaque type; cast so the argument matches "%ld". */
    _PRINTF_("udp process thread %ld start", (long)pthread_self());
    while (1)
    {
        /* The empty() test and front()/pop() must happen under one lock,
           because several workers consume this queue concurrently. */
        pthread_mutex_lock(&stInQueueMutex);
        if (stInQueue.empty())
        {
            pthread_mutex_unlock(&stInQueueMutex);
            /* Nothing to do: give up the CPU instead of immediately competing
               with the receiver thread for the mutex again. */
            sched_yield();
            continue;
        }
        struct InQueueEvent stInQueueEvent = stInQueue.front();
        stInQueue.pop();
        pthread_mutex_unlock(&stInQueueMutex);

        /* Drop events whose length cannot be copied safely, e.g. the -1 that
           a failed recvfrom() leaves in iRecvLen. */
        if (stInQueueEvent.iRecvLen < 0 || stInQueueEvent.iRecvLen > (int)sizeof(stInQueueEvent.szRecvBuffer))
        {
            _PRINTF_("drop request with invalid length %d", stInQueueEvent.iRecvLen);
            continue;
        }
        /* "%.*s" limits the print to the received bytes, so a payload that
           fills the whole buffer is not read past its end. */
        _PRINTF_("thread %ld processing the request %.*s", (long)pthread_self(), stInQueueEvent.iRecvLen, stInQueueEvent.szRecvBuffer);

        struct OutQueueEvent stOutQueueEvent;
        memset(&stOutQueueEvent, 0, sizeof(stOutQueueEvent));
        stOutQueueEvent.stDestAddr = stInQueueEvent.stClientAddr;
        stOutQueueEvent.stDestAddrLen = stInQueueEvent.stClienAddrLen;
        memcpy(stOutQueueEvent.szSendBuffer, stInQueueEvent.szRecvBuffer, stInQueueEvent.iRecvLen);
        stOutQueueEvent.iSendContentLen = stInQueueEvent.iRecvLen;

        pthread_mutex_lock(&stOutQueueMutex);
        stOutQueue.push(stOutQueueEvent);
        pthread_mutex_unlock(&stOutQueueMutex);
    }
}

/*
 * Receiver thread: blocks in recvfrom() on iUdpServerFd and appends every
 * datagram, together with the sender's address, to stInQueue.
 * The argument is unused and the loop never exits.
 */
void* UdpRecvThread(void*)
{
    _PRINTF_("udp recv thread start");
    while(1)
    {
        struct InQueueEvent stInQueueEvent;
        memset(&stInQueueEvent, 0, sizeof(stInQueueEvent));
        stInQueueEvent.stClienAddrLen = sizeof(stInQueueEvent.stClientAddr);
        /* Receive at most size-1 bytes: together with the memset above the
           payload is then always NUL-terminated for the "%s" log lines. */
        stInQueueEvent.iRecvLen = recvfrom(iUdpServerFd, stInQueueEvent.szRecvBuffer, sizeof(stInQueueEvent.szRecvBuffer) - 1, 0, (struct sockaddr*)&(stInQueueEvent.stClientAddr), &(stInQueueEvent.stClienAddrLen));
        if (stInQueueEvent.iRecvLen < 0)
        {
            /* Never enqueue a failed receive: the consumer uses iRecvLen as a
               memcpy size, so -1 would turn into a huge copy. */
            perror("recvfrom");
            continue;
        }
        /* inet_ntoa() returns a shared static buffer and the send thread
           formats addresses too, so use inet_ntop() with a local buffer. */
        char szClientIp[INET_ADDRSTRLEN] = {0};
        inet_ntop(AF_INET, &(stInQueueEvent.stClientAddr.sin_addr), szClientIp, sizeof(szClientIp));
        _PRINTF_("recv %d bytes from ip %s port %d message %s", stInQueueEvent.iRecvLen, szClientIp, ntohs(stInQueueEvent.stClientAddr.sin_port), stInQueueEvent.szRecvBuffer);
        pthread_mutex_lock(&stInQueueMutex);
        stInQueue.push(stInQueueEvent);
        pthread_mutex_unlock(&stInQueueMutex);
    }
}

/*
 * Sender thread: takes one reply at a time from stOutQueue and transmits it
 * with sendto() on iUdpServerFd. The argument is unused and the loop never
 * exits.
 */
void* UdpSendThread(void*)
{
    _PRINTF_("udp send thread start");
    while(1)
    {
        /* The empty() test and front()/pop() happen under one lock. */
        pthread_mutex_lock(&stOutQueueMutex);
        if (stOutQueue.empty())
        {
            pthread_mutex_unlock(&stOutQueueMutex);
            /* Nothing to send: give up the CPU instead of immediately
               competing with the worker threads for the mutex again. */
            sched_yield();
            continue;
        }
        struct OutQueueEvent stOutQueueEvent = stOutQueue.front();
        stOutQueue.pop();
        pthread_mutex_unlock(&stOutQueueMutex);

        /* A negative length would be converted to a huge size_t by sendto(). */
        const int iContentLen = stOutQueueEvent.iSendContentLen;
        if (iContentLen < 0 || iContentLen > (int)sizeof(stOutQueueEvent.szSendBuffer))
        {
            _PRINTF_("drop reply with invalid length %d", iContentLen);
            continue;
        }

        int iSendLen = sendto(iUdpServerFd, stOutQueueEvent.szSendBuffer, iContentLen, 0, (struct sockaddr*)&(stOutQueueEvent.stDestAddr), stOutQueueEvent.stDestAddrLen);
        if (iSendLen < 0)
        {
            perror("sendto");
        }
        /* inet_ntoa() returns a shared static buffer and the receive thread
           formats addresses too, so use inet_ntop() with a local buffer. */
        char szDestIp[INET_ADDRSTRLEN] = {0};
        inet_ntop(AF_INET, &(stOutQueueEvent.stDestAddr.sin_addr), szDestIp, sizeof(szDestIp));
        /* "%.*s" limits the print to the payload, which need not be NUL-terminated. */
        _PRINTF_("send %d bytes to ip %s port %d message %.*s", iSendLen, szDestIp, ntohs(stOutQueueEvent.stDestAddr.sin_port), iContentLen, stOutQueueEvent.szSendBuffer);
    }
}

/*
 * Creates an IPv4 UDP socket with SO_REUSEADDR set and binds it to
 * INADDR_ANY:uiUdpServerPort.
 *
 * uiUdpServerPort: port number in host byte order.
 * Returns the bound socket descriptor.
 *
 * Every system call runs outside assert() and only its result is asserted, so
 * the socket is still configured and bound when the file is built with NDEBUG.
 * In a normal build any failure aborts the process, as before.
 */
int CreateUdpServer(unsigned short uiUdpServerPort)
{
    int iListenFd = socket(AF_INET, SOCK_DGRAM, 0);
    /* 0 is a valid descriptor; only a negative value signals failure. */
    assert(iListenFd >= 0);

    struct sockaddr_in stListenAddr;
    memset(&stListenAddr, 0, sizeof(stListenAddr));
    stListenAddr.sin_family = AF_INET;
    stListenAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    stListenAddr.sin_port = htons(uiUdpServerPort);

    int iReuseFlag = 1;
    int iRet = setsockopt(iListenFd, SOL_SOCKET, SO_REUSEADDR, &iReuseFlag, sizeof(iReuseFlag));
    assert(iRet == 0);
    iRet = bind(iListenFd, (struct sockaddr*)&stListenAddr, sizeof(stListenAddr));
    assert(iRet == 0);
    (void)iRet; /* avoids an unused-variable warning when assert() is compiled out */
    return iListenFd;
}

int main()
{
    pthread_mutex_init(&stInQueueMutex, NULL);
    pthread_mutex_init(&stOutQueueMutex, NULL);
    iUdpServerFd = CreateUdpServer(uiUdpServerPort);
    pthread_t tid;
    assert(pthread_create(&tid, NULL, UdpRecvThread, NULL) == 0);
    assert(pthread_create(&tid, NULL, UdpSendThread, NULL) == 0);
    for (int i = 0; i < iUdpProcessThreadNumber; i++)
    {
        assert(pthread_create(&tid, NULL, UdpProcessThread, NULL) == 0);
    }    
    while (1)
    {
        sleep(10);
    }
    return 0;
}

注意高亮的部分(UdpProcessThread 和 UdpSendThread 中的判空逻辑):检查队列是否为空的时候,也要包含在锁的区域里面,即先 pthread_mutex_lock,再判断 empty(),队列为空时先解锁再 continue,这样就可以了

———————————————-

2013-12-10 16:37:06 update 使用这个脚本,可以多线程压测服务

#!/usr/bin/env python
# Multi-threaded UDP stress client (Python 2): starts 50 threads, each of
# which sends 1000 random payloads to ADDR and waits for the echo of each one.

import socket
import random
import threading

HOST = 'xx.xx.xx.xx'
PORT = 6000
BUFSIZ = 1024
ADDR = (HOST, PORT)
# Seconds to wait for each echo before treating the datagram as lost.
RECV_TIMEOUT = 2.0

class UdpTestThread(threading.Thread):
    """One stress client: 1000 send/receive round trips on its own socket."""

    def run(self):
        udpCliSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # UDP gives no delivery guarantee: without a timeout a single lost
        # datagram would block this thread in recvfrom() forever and the
        # test would never finish.
        udpCliSock.settimeout(RECV_TIMEOUT)
        try:
            for i in range(1000):
                data = str(random.random())
                udpCliSock.sendto(data, ADDR)
                try:
                    data = udpCliSock.recvfrom(BUFSIZ)
                except socket.timeout:
                    # Echo lost or too late: move on to the next payload.
                    continue
                #print data
        finally:
            # Release the socket even if a send or receive raised.
            udpCliSock.close()

for i in range(50):
    UdpTestThread().start()

—————————-

2013-12-10 17:21:05 update 发现上面的例子,压测结果很差,于是写了个极简的,作为比对

#include <sys/socket.h>
#include <arpa/inet.h>

/*
 * Minimal single-threaded UDP echo server used as a baseline: binds port 6000
 * on all interfaces and sends every received datagram back to its sender.
 * Returns 1 if the socket cannot be created or bound; otherwise never returns.
 */
int main() {
    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        return 1;
    }
    /* Zero the whole structure so no field (e.g. sin_zero) is left uninitialised. */
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(6000);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
        return 1;
    }
    while (1) {
        char buffer[2048] = {0};
        struct sockaddr cli;
        socklen_t cli_len = sizeof(cli);
        ssize_t recv_len = recvfrom(fd, buffer, sizeof(buffer), 0, &cli, &cli_len);
        if (recv_len < 0) {
            /* Skip failed receives: -1 must not reach sendto() as a length. */
            continue;
        }
        sendto(fd, buffer, (size_t)recv_len, 0, &cli, cli_len);
    }
    return 0;
}

发现差不多,看来还要找原因

4 thoughts on “基于队列的 UDP 多线程收发 demo”

Leave a Reply to ZRJ Cancel reply

Your email address will not be published. Required fields are marked *