yonght 发表于 2015-10-12 01:24:52

xen网络前后端交互

  前后端交互,依赖于include/xen/interface/io/ring.h, include/xen/interface/io/netif.h里的定义
  

/*
* Calculate size of a shared ring, given the total available space for the
* ring and indexes (_sz), and the name tag of the request/response structure.
* A ring contains as many entries as will fit, rounded down to the nearest
* power of two (so we can mask with (size-1) to loop around).
*/
#define __CONST_RING_SIZE(_s, _sz)            \
(__RD32(((_sz) - offsetof(struct _s##_sring, ring)) /   \
sizeof(((struct _s##_sring *)0)->ring)))
/*
* The same for passing in an actual pointer instead of a name tag.
*/
#define __RING_SIZE(_s, _sz)                        \
(__RD32(((_sz) - (long)&(_s)->ring + (long)(_s)) / sizeof((_s)->ring)))
#define DEFINE_RING_TYPES(__name, __req_t, __rsp_t)         \
\
/* Shared ring entry */                         \
union __name##_sring_entry {                        \
__req_t req;                            \
__rsp_t rsp;                            \
};                                  \
\
/* Shared ring page */                        \
struct __name##_sring {                         \
RING_IDX req_prod, req_event;                   \
RING_IDX rsp_prod, rsp_event;                   \
union {                           \
struct {                            \
uint8_t smartpoll_active;                   \
} netif;                            \
struct {                            \
uint8_t msg;                        \
} tapif_user;                           \
uint8_t pvt_pad;                     \
} private;                              \
uint8_t pad;                            \
union __name##_sring_entry ring; /* variable-length */       \
};                                  \
/* "Front" end's private variables */                   \
struct __name##_front_ring {                        \
RING_IDX req_prod_pvt;                      \
RING_IDX rsp_cons;                        \
unsigned int nr_ents;                     \
struct __name##_sring *sring;                   \
};                                  \
\
/* "Back" end's private variables */                  \
struct __name##_back_ring {                     \
RING_IDX rsp_prod_pvt;                      \
RING_IDX req_cons;                        \
unsigned int nr_ents;                     \
struct __name##_sring *sring;                   \
};对于ring的操作,一些常用的宏如下
  

#define RING_HAS_UNCONSUMED_RESPONSES(_r)               \
((_r)->sring->rsp_prod - (_r)->rsp_cons)
#define RING_HAS_UNCONSUMED_REQUESTS(_r)                \
({                                  \
unsigned int req = (_r)->sring->req_prod - (_r)->req_cons;\
unsigned int rsp = RING_SIZE(_r) -            \
((_r)->req_cons - (_r)->rsp_prod_pvt);   \
req < rsp ? req : rsp;                      \
})
#define RING_GET_REQUEST(_r, _idx)                  \
(&((_r)->sring->ring[((_idx) & (RING_SIZE(_r) - 1))].req))
#define RING_GET_RESPONSE(_r, _idx)               \
(&((_r)->sring->ring[((_idx) & (RING_SIZE(_r) - 1))].rsp))
#define RING_PUSH_REQUESTS(_r) do {               \
wmb(); /* back sees requests /before/ updated producer index */ \
(_r)->sring->req_prod = (_r)->req_prod_pvt;             \
} while (0)
#define RING_PUSH_RESPONSES(_r) do {                  \
wmb(); /* front sees responses /before/ updated producer index */   \
(_r)->sring->rsp_prod = (_r)->rsp_prod_pvt;             \
} while (0)
#define RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(_r, _notify) do {       \
RING_IDX __old = (_r)->sring->req_prod;             \
RING_IDX __new = (_r)->req_prod_pvt;                \
wmb(); /* back sees requests /before/ updated producer index */ \
(_r)->sring->req_prod = __new;                  \
mb(); /* back sees new requests /before/ we check req_event */\
(_notify) = ((RING_IDX)(__new - (_r)->sring->req_event) <       \
(RING_IDX)(__new - __old));                \
} while (0)
#define RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(_r, _notify) do {      \
RING_IDX __old = (_r)->sring->rsp_prod;             \
RING_IDX __new = (_r)->rsp_prod_pvt;                \
wmb(); /* front sees responses /before/ updated producer index */   \
(_r)->sring->rsp_prod = __new;                  \
mb(); /* front sees new responses /before/ we check rsp_event */    \
(_notify) = ((RING_IDX)(__new - (_r)->sring->rsp_event) <       \
(RING_IDX)(__new - __old));                \
} while (0)
#define RING_FINAL_CHECK_FOR_REQUESTS(_r, _work_to_do) do {   \
(_work_to_do) = RING_HAS_UNCONSUMED_REQUESTS(_r);         \
if (_work_to_do) break;                     \
(_r)->sring->req_event = (_r)->req_cons + 1;            \
mb();                               \
(_work_to_do) = RING_HAS_UNCONSUMED_REQUESTS(_r);         \
} while (0)
#define RING_FINAL_CHECK_FOR_RESPONSES(_r, _work_to_do) do {      \
(_work_to_do) = RING_HAS_UNCONSUMED_RESPONSES(_r);          \
if (_work_to_do) break;                     \
(_r)->sring->rsp_event = (_r)->rsp_cons + 1;            \
mb();                               \
(_work_to_do) = RING_HAS_UNCONSUMED_RESPONSES(_r);          \
} while (0)
  
  


  首先来看RX的报文接收,首先由netfront发起,在xennet_open和xennet_connect中通过xennet_alloc_rx_buffers给后端分配rx buffer。xennet_alloc_rx_buffers之前的文章有提过,这里摘抄如下:

static void xennet_alloc_rx_buffers(struct net_device *dev)
{
unsigned short id;
struct netfront_info *np = netdev_priv(dev);
struct sk_buff *skb;
struct page *page;
int i, batch_target, notify;
RING_IDX req_prod = np->rx.req_prod_pvt;
grant_ref_t ref;
unsigned long pfn;
void *vaddr;
struct xen_netif_rx_request *req;
if (unlikely(!netif_carrier_ok(dev)))
return;
/*
* Allocate skbuffs greedily, even though we batch updates to the
* receive ring. This creates a less bursty demand on the memory
* allocator, so should reduce the chance of failed allocation requests
* both for ourself and for other kernel subsystems.
*/
/*
分配若干单个frag页的skb,用于接收
这里生成一批skb并append到netfront_info->rx_batch的list中,如果遇到__netdev_alloc_skb失败或者alloc_page失败,调用mod_timer延迟100ms重试(这个100ms我觉得是一个坑,其次这里的重试其实是重新调用napi_schedule,继而调用xennet_poll尝试接收)
*/
batch_target = np->rx_target - (req_prod - np->rx.rsp_cons);
for (i = skb_queue_len(&np->rx_batch); i < batch_target; i++) {
skb = __netdev_alloc_skb(dev, RX_COPY_THRESHOLD + NET_IP_ALIGN,
GFP_ATOMIC | __GFP_NOWARN);
if (unlikely(!skb))
goto no_skb;
/* Align ip header to a 16 bytes boundary */
skb_reserve(skb, NET_IP_ALIGN);
page = alloc_page(GFP_ATOMIC | __GFP_NOWARN);
if (!page) {
kfree_skb(skb);
no_skb:
/* Any skbuffs queued for refill? Force them out. */
if (i != 0)
goto refill;
/* Could not allocate any skbuffs. Try again later. */
mod_timer(&np->rx_refill_timer,
jiffies + (HZ/10));
break;
}
skb_shinfo(skb)->frags.page = page;
skb_shinfo(skb)->nr_frags = 1;
__skb_queue_tail(&np->rx_batch, skb);
}
/* Is the batch large enough to be worthwhile? */
if (i < (np->rx_target/2)) {
if (req_prod > np->rx.sring->req_prod)
goto push;
return;
}
/* Adjust our fill target if we risked running out of buffers. */
if (((req_prod - np->rx.sring->rsp_prod) < (np->rx_target / 4)) &&
((np->rx_target *= 2) > np->rx_max_target))
np->rx_target = np->rx_max_target;
refill:
/*
这段代码对于netfront_info->rx_batch里的每个skb,计算对应的ring->req_prod值继而计算出该skb在netfront_info->rx_skbs数组中的ring index,把skb插入到rx_skbs数组的相应位置。之后调用gnttab_claim_grant_reference从grant_ref_t数组中取出一个没用的ref,把ref插入到netfront_info->grant_rx_ref数组的相应位置。调用gnttab_grant_foreign_access_ref让后端可以访问这个page。
*/
for (i = 0; ; i++) {
skb = __skb_dequeue(&np->rx_batch);
if (skb == NULL)
break;
skb->dev = dev;
id = xennet_rxidx(req_prod + i);
BUG_ON(np->rx_skbs);
np->rx_skbs = skb;
ref = gnttab_claim_grant_reference(&np->gref_rx_head);
BUG_ON((signed short)ref < 0);
np->grant_rx_ref = ref;
pfn = page_to_pfn(skb_shinfo(skb)->frags.page);
vaddr = page_address(skb_shinfo(skb)->frags.page);
req = RING_GET_REQUEST(&np->rx, req_prod + i);
gnttab_grant_foreign_access_ref(ref,
np->xbdev->otherend_id,
pfn_to_mfn(pfn),
0);
req->id = id;
req->gref = ref;
}
wmb();      /* barrier so backend seens requests */
/* Above is a suitable barrier to ensure backend will see requests. */
np->rx.req_prod_pvt = req_prod + i;
push:
/*
调用RING_PUSH_REQUESTS_AND_CHECK_NOTIFY检查netfront_info->xen_netif_rx_front_ring是否有新请求需要通知后端,
如果是则调用notify_remote_via_irq通过evtchn通知后端
*/
RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&np->rx, notify);
if (notify)
notify_remote_via_irq(np->netdev->irq);
}
  xennet_alloc_rx_buffers有自己的flow control算法,主要在于计算np->rx_target的值,和基于rx_target分配每次要增加到np->rx_batch中的skb资源,np->rx_batch中的skb都有一个特点,就是一个256长度的header和一个page大小的分片。np->rx_batch中的skb会后续存放到np->rx_skbs数组中,下标为req_prod的值,skb第一个分片的page会对应一个grant
ref,存放到np->grant_rx_ref数组中,下标和np->rx_skbs的一一对应。在调用gnttab_grant_foreign_access_ref之后,此时该page就可以授权给后端访问了。

总结下来,xennet_alloc_rx_buffers就是为了构造若干个xen_netif_rx_request,req id随着req_prod递增,req gref为skb第一个分片对应的page(需要前端通过gnttab_grant_foreign_access_ref进行授权)。前端做完所有这些事情之后,调用RING_PUSH_REQUESTS_AND_CHECK_NOTIFY通知后端  当后端收到包准备发送给前端时,首先通过xen_netbk_count_skb_slots计算这个skb需要的xen_netif_rx_request个数,并记录在vif->rx_req_cons_peek中,rx_req_cons_peek用来判断ring是不是满了
  后端收包主要在xen_netbk_rx_action中实现,每次xen_netbk_rx_action中都会分配一个struct netrx_pending_operations,使用netbk->grant_copy_op, netbk->meta来保存skb每一个分片对应的gnttab_copy,netbk_rx_meta信息。netbk_gop_skb用来构造与skb分片page个数对应的meta, copy数组项。gnttab_copy用的grant_ref_t就是从xen_netif_rx_request中获取的GR
  后端通过hypercall把skb分片的内容拷贝到GR指定的page,之后调用make_rx_response生成xen_netif_rx_response,调用netbk_add_frag_responsess为每个分片生成xen_netif_rx_responses,通过RING_PUSH_RESPONSES_AND_CHECK_NOTIFY通知前端
  总结下来,后端收包包含如下步骤:取出若干xen_netif_rx_request,设置gnttab_copy,把报文内容拷贝到rx request的相应page中,生成xen_netif_rx_response,最后通过RING_PUSH_RESPONSES_AND_CHECK_NOTIFY通知前端
  最后,前端通过xennet_poll来收包,会调用xennet_get_responses和xennet_fill_frags从ring中每一个xen_netif_rx_response生成skb,更新rsp_cons,最后通过handle_incoming_queue让协议栈接收这个包
  


  再来看TX的报文发送,核心函数是xennet_start_xmit
  

static int xennet_start_xmit(struct sk_buff *skb, struct net_device *dev)
{
unsigned short id;
struct netfront_info *np = netdev_priv(dev);
struct xen_netif_tx_request *tx;
struct xen_netif_extra_info *extra;
char *data = skb->data;
RING_IDX i;
grant_ref_t ref;
unsigned long mfn;
int notify;
int frags = skb_shinfo(skb)->nr_frags;
unsigned int offset = offset_in_page(data);
unsigned int len = skb_headlen(skb);
frags += DIV_ROUND_UP(offset + len, PAGE_SIZE);
if (unlikely(frags > MAX_SKB_FRAGS + 1)) {
printk(KERN_ALERT &quot;xennet: skb rides the rocket: %d frags\n&quot;,
frags);
dump_stack();
goto drop;
}
spin_lock_irq(&np->tx_lock);
if (unlikely(!netif_carrier_ok(dev) ||
(frags > 1 && !xennet_can_sg(dev)) ||
netif_needs_gso(dev, skb))) {
spin_unlock_irq(&np->tx_lock);
goto drop;
}
i = np->tx.req_prod_pvt;
/*
* tx_skb_freelist代表了tx_skbs中空闲entry的头部索引, skb_entry->link指向下一个空闲entry
* tx_skbs中空闲entry被用来存放待发送的skb头部及分片
*/
id = get_id_from_freelist(&np->tx_skb_freelist, np->tx_skbs);
np->tx_skbs.skb = skb;
/* 从req_prod_pvt中得到下一个xen_netif_tx_request,从tx_skb_freelist中得到下一个空闲的tx_skb项,把待发送的skb放到该entry中
* tx->id等于skb在tx_skbs中的下标
*/
tx = RING_GET_REQUEST(&np->tx, i);
tx->id   = id;
ref = gnttab_claim_grant_reference(&np->gref_tx_head);
BUG_ON((signed short)ref < 0);
mfn = virt_to_mfn(data);
gnttab_grant_foreign_access_ref(
ref, np->xbdev->otherend_id, mfn, GNTMAP_readonly);
tx->gref = np->grant_tx_ref = ref;
tx->offset = offset;
tx->size = len;
extra = NULL;
tx->flags = 0;
if (skb->ip_summed == CHECKSUM_PARTIAL)
/* local packet? */
tx->flags |= XEN_NETTXF_csum_blank | XEN_NETTXF_data_validated;
else if (skb->ip_summed == CHECKSUM_UNNECESSARY)
/* remote but checksummed. */
tx->flags |= XEN_NETTXF_data_validated;
if (skb_shinfo(skb)->gso_size) {
struct xen_netif_extra_info *gso;
gso = (struct xen_netif_extra_info *)
RING_GET_REQUEST(&np->tx, ++i);
if (extra)
extra->flags |= XEN_NETIF_EXTRA_FLAG_MORE;
else
tx->flags |= XEN_NETTXF_extra_info;
gso->u.gso.size = skb_shinfo(skb)->gso_size;
gso->u.gso.type = XEN_NETIF_GSO_TYPE_TCPV4;
gso->u.gso.pad = 0;
gso->u.gso.features = 0;
gso->type = XEN_NETIF_EXTRA_TYPE_GSO;
gso->flags = 0;
extra = gso;
}
np->tx.req_prod_pvt = i + 1;
/* xennet_make_frags首先解决skb头部跨多个page的问题,对于每一个跨的page,都会增加一个xen_netif_tx_request,过程和第一个skb头部相同
* 之后对skb的每一个分片,执行同样的操作,分为如下几步
* 1. tx->flags |= XEN_NETTXF_more_data,设置上一个xen_netif_tx_request,表示下面还有更多分片。首个skb头部不需要
* 2. get_id_from_freelist得到空闲entry的id,这个id同样也是xen_netif_tx_request的id
* 3. 通过req_prod_pvt取出下一个xen_netif_tx_request
* 4. 把skb头部的page或者skb分片的page授权给后端访问
* 5. 设置xen_netif_tx_request的gref, offset, size, flags
* 每一个tx_skbs的entry的skb指针都指向要发送的skb,可以认为每个分片(包括头部分片)都会增加一个skb的引用
*/
xennet_make_frags(skb, dev, tx);
tx->size = skb->len;
RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&np->tx, notify);
if (notify)
notify_remote_via_irq(np->netdev->irq);
dev->stats.tx_bytes += skb->len;
dev->stats.tx_packets++;
/* Note: It is not safe to access skb after xennet_tx_buf_gc()! */
/* xennet_tx_buf_gc用来回收rsp_cons到rsp_prod之间的xen_netif_tx_response
* xen_netif_tx_response的id,就是tx_skbs中对应的id,grant_tx_ref中对应的id
* 因此需要解除grant_tx_ref的授权,把id重新放回tx_skb_freelist数组
* skb头部和每一个分片都会释放一个skb引用计数,直到最后skb被释放
*/
xennet_tx_buf_gc(dev);
if (!netfront_tx_slot_available(np))
netif_stop_queue(dev);
spin_unlock_irq(&np->tx_lock);
return NETDEV_TX_OK;
drop:
dev->stats.tx_dropped++;
dev_kfree_skb(skb);
return NETDEV_TX_OK;
}

前端准备好xen_netif_tx_request之后,通过event channel通知到后端,后端的xenvif_interrupt会唤醒kthread线程来处理TX/RX包,后端处理TX报文的函数为static void xen_netbk_tx_action(struct xen_netbk *netbk)
{
unsigned nr_gops;
int ret;
nr_gops = xen_netbk_tx_build_gops(netbk);
if (nr_gops == 0)
return;
ret = HYPERVISOR_grant_table_op(GNTTABOP_copy,
netbk->tx_copy_ops, nr_gops);
BUG_ON(ret);
xen_netbk_tx_submit(netbk);
}

xen_netbk_tx_build_gops用来把前端的若干xen_netif_tx_request生成一个可以发送的skb,追加到xen_netbk->tx_queue中,并且准备好pending_tx_info和tx_copy_ops数组的对应entry,后续通过GNTTABOP_copy的hypercall完成从前端到后端页的拷贝。
  
  xen_netbk_tx_submit用来获得skb头部以及分片的pending_idx(xen_netif_tx_request的信息都存在xen_netbk的pending_tx_info中,通过pending_ring的pending_idx索引),对于skb头部,直接把数据拷贝过去,对于skb分片,调用get_page增加一个引用计数,在真正发送skb之前,把pending_tx_info中所有page通过netbk_idx_release释放给前端
  



版权声明:本文为博主原创文章,未经博主允许不得转载。
页: [1]
查看完整版本: xen网络前后端交互