//ServerNBTHR.c #include #include #include #include #include #include #include #include #include #include #include #define NUM_THREADS 2 pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER; void die(const char *func, int err) { fprintf(stderr,"%s: %s\n",func, strerror(err)); abort(); } void bark(const char *func, int err) { fprintf(stderr,"%s: %s\n",func, strerror(err)); } //Описание поцедуры ведущего потока , которая возвращает //дескрипторы пассивного сокета, привязанного к адресу сервера. int getServerSocket(unsigned short int port) { int listenSocket; struct sockaddr_in listenSockaddr; if((listenSocket=socket(AF_INET,SOCK_STREAM,0))<0) die("socket()",errno); memset(&listenSockaddr, 0, sizeof(listenSockaddr)); listenSockaddr.sin_family = AF_INET; listenSockaddr.sin_port = htons(port); listenSockaddr.sin_addr.s_addr = INADDR_ANY; if(bind(listenSocket,(struct sockaddr*)&listenSockaddr, sizeof(listenSockaddr)) < 0) die("bind()",errno); if(listen(listenSocket,5)<0) die("listen()",errno); return listenSocket; } //Описание процедуры выполняемой всеми ведомыми потоками void *serv_request(void *data) { struct connection_cb { int dataSocket; char data[256]; int dataSent; int dataToSend; int isReading; struct connection_cb *next; }; struct connection_cb *connections = NULL; int listenSocket = (int)data; if(fcntl(listenSocket,F_SETFL,O_NONBLOCK)<0) die("fcntl()",errno); while(1) { fd_set readFdSet; fd_set writeFdSet; struct connection_cb *currentConn, **currentConnPtr, *tempConn; int maxFdNum; FD_ZERO(&readFdSet); FD_ZERO(&writeFdSet); //Добавление дескриптора к множеству readFdSet FD_SET(listenSocket,&readFdSet); maxFdNum = listenSocket; for(currentConn = connections;currentConn!=NULL;currentConn = currentConn->next) { if(currentConn->isReading) FD_SET(currentConn->dataSocket,&readFdSet); else FD_SET(currentConn->dataSocket,&writeFdSet); maxFdNum = currentConn->dataSocket > maxFdNum ?currentConn->dataSocket : maxFdNum; } //Получение множества дескрипторов сокетов для обработки if(select(maxFdNum+1,&readFdSet,&writeFdSet,NULL,NULL) < 0) { if(errno == EINTR) continue; else die("select()",errno); } currentConnPtr=&connections; while(*currentConnPtr!=NULL) { //Проверка принадлежности дескриптора //(*currentConnPtr)->dataSocket к множеству readFdSet if((*currentConnPtr)->isReading && FD_ISSET((*currentConnPtr)->dataSocket,&readFdSet)) { int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data, sizeof((*currentConnPtr)->data),0); if(result < 0) { if(errno!=EINTR && errno!=EAGAIN && errno!=EWOULDBLOCK) { bark("recv()",errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else if(result==0) { close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } else { (*currentConnPtr)->dataToSend = result; (*currentConnPtr)->dataSent = 0; (*currentConnPtr)->isReading = 0; (*currentConnPtr)->data[result-1]=0; printf("Recieving %d bytes %s as Slave Thread id = '%lu' \n",result-1,(*currentConnPtr)->data,pthread_self()); } } else /* Проверка принадлежности дескриптора (*currentConnPtr)->dataSocket к множеству writedFdSet */ if(FD_ISSET((*currentConnPtr)->dataSocket,&writeFdSet)) { int result = send((*currentConnPtr)->dataSocket, (*currentConnPtr)->data+(*currentConnPtr)->dataSent, (*currentConnPtr) ->dataToSend-(*currentConnPtr)->dataSent, 0); if(result < 0) { if(errno!=EINTR && errno!=EAGAIN) { bark("write()",errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else { (*currentConnPtr)->dataSent +=result; if((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend) (*currentConnPtr)->isReading = 1; } } currentConnPtr = &((*currentConnPtr)->next); printf("Sending as Slave Thread id = '%lu' \n",pthread_self()); } /* Проверка принадлежности дескриптора listenSocket к множеству readFdSet,т.е. необходимости обработать вызов connect( ) от нового клиента. */ if(FD_ISSET(listenSocket,&readFdSet)) { while(1) { /* Вызовы pthread_mutex_lock, pthread_mutex_unlock Не нужны в среде Linux */ pthread_mutex_lock(&request_mutex); int result = accept(listenSocket,(struct sockaddr*)NULL,NULL); pthread_mutex_unlock(&request_mutex); if(result < 0) { if(errno==EAGAIN || errno == EWOULDBLOCK) break; die("accept()",errno); } else { *currentConnPtr = malloc(sizeof(struct connection_cb)); if(*currentConnPtr==NULL) die("malloc()",0); if(fcntl(result,F_SETFL,O_NONBLOCK)<0) die("fcntl()",errno); (*currentConnPtr)->dataSocket = result; (*currentConnPtr)->isReading = 1; (*currentConnPtr)->next = 0; currentConnPtr = &((*currentConnPtr)->next); printf("Accepting as Master Thread id = '%lu' \n",pthread_self()); } } } } } int main(int argc,char *argv[]) { int k; int descSock; char *service="8001"; switch(argc) { case 1: break; case 2: service = argv[1]; break; default: printf ("Usage: ./ServerBNTH [port]\n"); exit(1); } size_t stacksize; pthread_t p_thread[NUM_THREADS]; /* Установка размера стека для ведомых потоков */ pthread_attr_t attr; pthread_attr_init(&attr); stacksize = 500000; pthread_attr_setstacksize (&attr, stacksize); pthread_attr_getstacksize (&attr, &stacksize); /* Получение значения дескриптора пассивного сокета */ descSock = getServerSocket(atoi(service)); /* Запуск ведомых потоков */ for(k=0; k