非侵入式mongodb async find patch
前阵子为 MongoDB 的 C driver 添加了一个 async find 补丁，不过当时只是简陋地将源码直接插入其中，怎么看怎么别扭。如果官方更新了，就容易悲剧了。所以，为了自己不用老是跟着改动，就单独将 patch 分离出来。没什么技术含量，仅仅作为记录。

mongo_async.h
//base info: create by final/*effect: mongo db async query patch***/#ifndef _MONGO_ASYNC_H_#define _MONGO_ASYNC_H_#include "mongo.h"typedef int (*mongo_async_send_handler)(const char* pbuf, size_t len, void* arg);// async mongo find interface, the param id is the callback param#define MONGO_ASYNC_READ_HEAD_LEN (sizeof(mongo_header) + sizeof(mongo_reply_fields))void mongo_async_init(mongo_async_send_handler pfunc);int mongo_async_find(mongo_connection* conn, const char* ns, const bson* query, bson* fields, int nToReturn, int nToSkip, int options, int id, void* arg);mongo_cursor* mongo_async_response(mongo_connection* conn, const char* ns, mongo_reply* mm);mongo_reply * mongo_read_async_response_head(mongo_connection* conn, const char* buf, int* data_len/*out*/, int* query_id/*out*/);// buf size must be MONGO_ASYNC_READ_HEAD_LEN and return left data lenmongo_reply* mongo_read_async_response_data(mongo_connection* conn, mongo_reply* out, const char* buf);#endif
mongo_async.c
//base info: create by hyz//effect: mongo db async patch#include <stdio.h>#include <stdlib.h>#include <string.h>#include "mongo_async.h"//TODO...extern mongo_message * mongo_message_create( int len , int id , int responseTo , int op );extern char * mongo_data_append( char * start , const void * data , int len );extern char * mongo_data_append32( char * start , const void * data);static mongo_async_send_handler async_handler = NULL; staticint mongo_message_async_send(mongo_connection* conn, mongo_message* mm, void* arg){mongo_header head; /* little endian */bson_little_endian32(&head.len, &mm->head.len);bson_little_endian32(&head.id, &mm->head.id);bson_little_endian32(&head.responseTo, &mm->head.responseTo);bson_little_endian32(&head.op, &mm->head.op);MONGO_TRY{if( async_handler((char*)&head, sizeof(head), arg) ){free(mm);return 1;} if( async_handler((char*)&mm->data, mm->head.len - sizeof(head), arg) ){free(mm);return 1;} }MONGO_CATCH{free(mm);MONGO_RETHROW();}free(mm);return 0;}void mongo_async_init(mongo_async_send_handler pfunc){if( !pfunc ){printf("mongodb async init failed/n");exit(-5);}async_handler = pfunc;}// async mongo find // note: the id must not be zero or mongo_message_create will call rand() to generateint mongo_async_find(mongo_connection* conn, const char* ns, const bson* query, bson* fields, int nToReturn, int nToSkip, int options, int id, void* arg){char * data;mongo_message * mm = mongo_message_create( 16 + /* header */4 + /*options */strlen( ns ) + 1 + /* ns */4 + 4 + /* skip,return */bson_size( query ) +bson_size( fields ) ,id , 0 , mongo_op_query );data = &mm->data;data = mongo_data_append32( data , &options );data = mongo_data_append( data , ns , strlen( ns ) + 1 );data = mongo_data_append32( data , &nToSkip );data = mongo_data_append32( data , &nToReturn );data = mongo_data_append( data , query->data , bson_size( query ) );if ( fields )data = mongo_data_append( data , fields->data , bson_size( fields ) );bson_fatal_msg( (data == ((char*)mm) 
+ mm->head.len), "query building fail!" );return mongo_message_async_send(conn, mm, arg);}// buf len must be >= MONGO_ASYNC_READ_HEAD_LENmongo_reply * mongo_read_async_response_head(mongo_connection* conn, const char* buf, int* data_len/*out*/, int* query_id/*out*/ ){mongo_header head; /* header from network */mongo_reply_fields fields; /* header from network */mongo_reply * out; /* native endian */int len;memcpy(&head, buf, sizeof(head));memcpy(&fields, buf + sizeof(head), sizeof(fields));bson_little_endian32(&len, &head.len);if (len < sizeof(head)+sizeof(fields) || len > 64*1024*1024)MONGO_THROW(MONGO_EXCEPT_NETWORK); /* most likely corruption */out = (mongo_reply*)bson_malloc(len);out->head.len = len;bson_little_endian32(&out->head.id, &head.id);bson_little_endian32(&out->head.responseTo, &head.responseTo);bson_little_endian32(&out->head.op, &head.op);bson_little_endian32(&out->fields.flag, &fields.flag);bson_little_endian64(&out->fields.cursorID, &fields.cursorID);bson_little_endian32(&out->fields.start, &fields.start);bson_little_endian32(&out->fields.num, &fields.num);*data_len = out->head.len - MONGO_ASYNC_READ_HEAD_LEN;*query_id = out->head.responseTo;return out;}// read response datamongo_reply* mongo_read_async_response_data(mongo_connection* conn, mongo_reply* out, const char* buf){int len = out->head.len;MONGO_TRY{memcpy(&out->objs, buf, len - MONGO_ASYNC_READ_HEAD_LEN);}MONGO_CATCH{free(out);MONGO_RETHROW();}return out;}mongo_cursor* mongo_async_response(mongo_connection* conn, const char* ns, mongo_reply* mm){int sl;volatile mongo_cursor * cursor; /* volatile due to longjmp in mongo exception handler */cursor = (mongo_cursor*)bson_malloc(sizeof(mongo_cursor));MONGO_TRY{cursor->mm = mm;}MONGO_CATCH{free((mongo_cursor*)cursor); /* cast away volatile, not changing type */MONGO_RETHROW();}sl = strlen(ns)+1;cursor->ns = bson_malloc(sl);if (!cursor->ns){free(cursor->mm);free((mongo_cursor*)cursor); /* cast away volatile, not changing type */return 
0;}memcpy((void*)cursor->ns, ns, sl); /* cast needed to silence GCC warning */cursor->conn = conn;cursor->current.data = NULL;return (mongo_cursor*)cursor;}
页:
[1]