基于消息機制的異步架構之消息隊列
消息隊列的頭文件msgqueue.h/*
- msgqueue.h /
ifndef MSGQUEUEH
define MSGQUEUEH
include "conn.h"
define MAX_MSG_LENGTH (1024)
typedef struct MSG { char buf[MAX_MSG_LENGTH]; uint16 buf_len; CONN c; struct MSG next;
} MSG;
typedef struct MSG_QUEUE { MSG head; MSG tail; int size; } MSG_QUEUE;
MSG_QUEUE* create_msg_queue();
extern int push_msg(MSG_QUEUE q,CONN c);
int empty_msg_queue( MSG_QUEUE *q);
int get_msg(MSG_QUEUE q ,MSG temp_msg);
endif / MSGQUEUEH /
消息隊列的實現文件 * msgqueue.c
/*
- msgqueue.c /
include "msgqueue.h"
MSG_QUEUE* create_msg_queue() {
MSG_QUEUE qmsg=malloc(sizeof(MSG_QUEUE)); /申請頭尾指針結點/ if(qmsg==NULL){ fprintf(stderr, "create_msg_queue malloc error, errno: %d %m\n", errno); return NULL; } MSG msg=malloc(sizeof(MSG)); /申請鏈隊頭結點/ if(msg==NULL){ fprintf(stderr, "create_head_msg malloc error, errno: %d %m\n", errno); free(qmsg); return NULL; }
msg->next=NULL; qmsg->head=qmsg->tail=msg; qmsg->size=0; return qmsg; }
int push_msg(MSG_QUEUE q,CONN c) { if(q->size>10240){ fprintf(stderr, "push_msg error,msg_queue size is over %d ,errno: %d %m\n",q->size, errno); return -1; }
MSG msg; msg=malloc(sizeof(MSG)); /申請新結點*/ if(msg==NULL){ fprintf(stderr, "create_new_msg malloc error, errno: %d %m\n", errno); return -1; } memcpy(msg->buf,c->in_buf,c->in_buf_len); msg->buf_len=c->in_buf_len; msg->c=c; msg->next=NULL; q->tail->next=msg; q->tail=msg; q->size++;
return 0; }
int empty_msg_queue( MSG_QUEUE *q) { if (q->head==q->tail) return 0; else return 1; }
int get_msg(MSG_QUEUE q ,MSG temp_msg) { if (q->head==q->tail){ return -1; } MSG *temp=q->head->next; if(q->head->next == q->tail) //如果要出隊的結點為最后一個結點,使q->rear指向頭結點防止出現懸空的指針 q->tail = q->head;
// msg_buf = temp->buf; //將出隊的數據元素存入*e temp_msg->c=temp->c; memcpy(temp_msg->buf,temp->buf,temp->buf_len); temp_msg->buf_len=temp->buf_len;
q->head->next = temp->next; //使下一個結點成為隊頭,如果沒有下一個結點則為NULL
q->size--; //int buf_len=temp->buf_len; free(temp); //刪除要出隊的結點 return 0;
}</pre>