学习啦 > 学习电脑 > 操作系统 > Linux教程 > 数据包接收详解

数据包接收详解

时间: 春健736 分享

数据包接收详解

  学习啦小编为大家分享了Linux内核数据包处理流程-数据包接收的详细说明,有需要的可以参考下

  数据包接收

  一、从网卡说起

  这并非是一个网卡驱动分析的专门文档,只是对网卡处理数据包的流程进行一个重点的分析。这里以Intel的e100驱动为例进行分析。

  大多数网卡都是一个PCI设备,PCI设备都包含了一个标准的配置寄存器,寄存器中,包含了PCI设备的厂商ID、设备ID等等信息,驱动

  程序使用来描述这些寄存器的标识符。如下:

  CODE:

  struct pci_device_id {

  __u32 vendor, device; /* Vendor and device ID or PCI_ANY_ID*/

  __u32 subvendor, subdevice; /* Subsystem ID's or PCI_ANY_ID */

  __u32 class, class_mask; /* (class,subclass,prog-if) triplet */

  kernel_ulong_t driver_data; /* Data private to the driver */

  };

  这样,在驱动程序中,常常就可以看到定义一个struct pci_device_id 类型的数组,告诉内核支持不同类型的

  PCI设备的列表,以e100驱动为例:

  #define INTEL_8255X_ETHERNET_DEVICE(device_id, ich) {\

  PCI_VENDOR_ID_INTEL, device_id, PCI_ANY_ID, PCI_ANY_ID, \

  PCI_CLASS_NETWORK_ETHERNET << 8, 0xFFFF00, ich }

  static struct pci_device_id e100_id_table[] = {

  INTEL_8255X_ETHERNET_DEVICE(0x1029, 0),

  INTEL_8255X_ETHERNET_DEVICE(0x1030, 0),

  INTEL_8255X_ETHERNET_DEVICE(0x1031, 3),

  ……/*略过一大堆支持的设备*/

  { 0, }

  };

  在内核中,一个PCI设备,使用struct pci_driver结构来描述,

  struct pci_driver {

  struct list_head node;

  char *name;

  struct module *owner;

  const struct pci_device_id *id_table; /* must be non-NULL for probe to be called */

  int (*probe) (struct pci_dev *dev, const struct pci_device_id *id); /* New device inserted */

  void (*remove) (struct pci_dev *dev); /* Device removed (NULL if not a hot-plug capable driver) */

  int (*suspend) (struct pci_dev *dev, pm_message_t state); /* Device suspended */

  int (*resume) (struct pci_dev *dev); /* Device woken up */

  int (*enable_wake) (struct pci_dev *dev, pci_power_t state, int enable); /* Enable wake event */

  void (*shutdown) (struct pci_dev *dev);

  struct device_driver driver;

  struct pci_dynids dynids;

  };

  因为在系统引导的时候,PCI设备已经被识别,当内核发现一个已经检测到的设备同驱动注册的id_table中的信息相匹配时,

  它就会触发驱动的probe函数,以e100为例:

  /*

  * 定义一个名为e100_driver的PCI设备

  * 1、设备的探测函数为e100_probe;

  * 2、设备的id_table表为e100_id_table

  */

  static struct pci_driver e100_driver = {

  .name = DRV_NAME,

  .id_table = e100_id_table,

  .probe = e100_probe,

  .remove = __devexit_p(e100_remove),

  #ifdef CONFIG_PM

  .suspend = e100_suspend,

  .resume = e100_resume,

  #endif

  .driver = {

  .shutdown = e100_shutdown,

  }

  };

  这样,如果系统检测到有与id_table中对应的设备时,就调用驱动的probe函数。

  驱动设备在init函数中,调用pci_module_init函数初始化PCI设备e100_driver:

  static int __init e100_init_module(void)

  {

  if(((1 << debug) - 1) & NETIF_MSG_DRV) {

  printk(KERN_INFO PFX "%s, %s\n", DRV_DESCRIPTION, DRV_VERSION);

  printk(KERN_INFO PFX "%s\n", DRV_COPYRIGHT);

  }

  return pci_module_init(&e100_driver);

  }

  一切顺利的话,注册的e100_probe函数将被内核调用,这个函数完成两个重要的工作:

  1、分配/初始化/注册网络设备;

  2、完成PCI设备的I/O区域的分配和映射,以及完成硬件的其它初始化工作;

  网络设备使用struct net_device结构来描述,这个结构非常之大,许多重要的参考书籍对它都有较为深入的描述,可以参考《Linux设备驱动程序》中网卡驱动设计的相关章节。我会在后面的内容中,对其重要的成员进行注释;

  当probe函数被调用,证明已经发现了我们所支持的网卡,这样,就可以调用register_netdev函数向内核注册网络设备了,注册之前,一般会调用alloc_etherdev为以太网分析一个net_device,然后初始化它的重要成员。

  除了向内核注册网络设备之外,探测函数另一项重要的工作就是需要对硬件进行初始化,比如,要访问其I/O区域,需要为I/O区域分配内存区域,然后进行映射,这一步一般的流程是:

  1、request_mem_region()

  2、ioremap()

  对于一般的PCI设备而言,可以调用:

  1、pci_request_regions()

  2、ioremap()

  pci_request_regions函数对PCI的6个寄存器都会调用资源分配函数进行申请(需要判断是I/O端口还是I/O内存),例如:

  CODE:

  int pci_request_regions(struct pci_dev *pdev, char *res_name)

  {

  int i;

  for (i = 0; i < 6; i++)

  if(pci_request_region(pdev, i, res_name))

  goto err_out;

  return 0;

  CODE:

  int pci_request_region(struct pci_dev *pdev, int bar, char *res_name)

  {

  if (pci_resource_len(pdev, bar) == 0)

  return 0;

  if (pci_resource_flags(pdev, bar) & IORESOURCE_IO) {

  if (!request_region(pci_resource_start(pdev, bar),

  pci_resource_len(pdev, bar), res_name))

  goto err_out;

  }

  else if (pci_resource_flags(pdev, bar) & IORESOURCE_MEM) {

  if (!request_mem_region(pci_resource_start(pdev, bar),

  pci_resource_len(pdev, bar), res_name))

  goto err_out;

  }

  return 0;

  有了这些基础,我们来看设备的探测函数:

  static int __devinit e100_probe(struct pci_dev *pdev,

  const struct pci_device_id *ent)

  {

  struct net_device *netdev;

  struct nic *nic;

  int err;

  /*分配网络设备*/

  if(!(netdev = alloc_etherdev(sizeof(struct nic)))) {

  if(((1 << debug) - 1) & NETIF_MSG_PROBE)

  printk(KERN_ERR PFX "Etherdev alloc failed, abort.\n");

  return -ENOMEM;

  }

  /*设置各成员指针函数*/

  netdev->open = e100_open;

  netdev->stop = e100_close;

  netdev->hard_start_xmit = e100_xmit_frame;

  netdev->get_stats = e100_get_stats;

  netdev->set_multicast_list = e100_set_multicast_list;

  netdev->set_mac_address = e100_set_mac_address;

  netdev->change_mtu = e100_change_mtu;

  netdev->do_ioctl = e100_do_ioctl;

  SET_ETHTOOL_OPS(netdev, &e100_ethtool_ops);

  netdev->tx_timeout = e100_tx_timeout;

  netdev->watchdog_timeo = E100_WATCHDOG_PERIOD;

  netdev->poll = e100_poll;

  netdev->weight = E100_NAPI_WEIGHT;

  #ifdef CONFIG_NET_POLL_CONTROLLER

  netdev->poll_controller = e100_netpoll;

  #endif

  /*设置网络设备名称*/

  strcpy(netdev->name, pci_name(pdev));

  /*取得设备私有数据结构*/

  nic = netdev_priv(netdev);

  /*网络设备指针,指向自己*/

  nic->netdev = netdev;

  /*PCIy设备指针,指向自己*/

  nic->pdev = pdev;

  nic->msg_enable = (1 << debug) - 1;

  /*将PCI设备的私有数据区指向网络设备*/

  pci_set_drvdata(pdev, netdev);

  /*激活PCI设备*/

  if((err = pci_enable_device(pdev))) {

  DPRINTK(PROBE, ERR, "Cannot enable PCI device, aborting.\n");

  goto err_out_free_dev;

  }

  /*判断I/O区域是否是I/O内存,如果不是,则报错退出*/

  if(!(pci_resource_flags(pdev, 0) & IORESOURCE_MEM)) {

  DPRINTK(PROBE, ERR, "Cannot find proper PCI device "

  "base address, aborting.\n");

  err = -ENODEV;

  goto err_out_disable_pdev;

  }

  /*分配I/O内存区域*/

  if((err = pci_request_regions(pdev, DRV_NAME))) {

  DPRINTK(PROBE, ERR, "Cannot obtain PCI resources, aborting.\n");

  goto err_out_disable_pdev;

  }

  /*

  * 告之内核自己的DMA寻址能力,这里不是很明白,因为从0xFFFFFFFF来看,本来就是内核默认的32了

  * 为什么还要调用pci_set_dma_mask来重复设置呢?可能是对ULL而非UL不是很了解吧。

  */

  if((err = pci_set_dma_mask(pdev, 0xFFFFFFFFULL))) {

  DPRINTK(PROBE, ERR, "No usable DMA configuration, aborting.\n");

  goto err_out_free_res;

  }

  SET_MODULE_OWNER(netdev);

  SET_NETDEV_DEV(netdev, &pdev->dev);

  /*分配完成后,映射I/O内存*/

  nic->csr = ioremap(pci_resource_start(pdev, 0), sizeof(struct csr));

  if(!nic->csr) {

  DPRINTK(PROBE, ERR, "Cannot map device registers, aborting.\n");

  err = -ENOMEM;

  goto err_out_free_res;

  }

  if(ent->driver_data)

  nic->flags |= ich;

  else

  nic->flags &= ~ich;

  /*设置设备私有数据结构的大部份默认参数*/

  e100_get_defaults(nic);

  /* 初始化自旋锁,锅的初始化必须在调用 hw_reset 之前执行*/

  spin_lock_init(&nic->cb_lock);

  spin_lock_init(&nic->cmd_lock);

  /* 硬件复位,通过向指定I/O端口设置复位指令实现. */

  e100_hw_reset(nic);

  /*

  * PCI网卡被BIOS配置后,某些特性可能会被屏蔽掉。比如,多数BIOS都会清掉“master”位,

  * 这导致板卡不能随意向主存中拷贝数据。pci_set_master函数数会检查是否需要设置标志位,

  * 如果需要,则会将“master”位置位。

  * PS:什么是PCI master?

  * 不同于ISA总线,PCI总线的地址总线与数据总线是分时复用的。这样做的好处是,一方面

  * 可以节省接插件的管脚数,另一方面便于实现突发数据传输。在做数据传输时,由一个PCI

  * 设备做发起者(主控,Initiator或Master),而另一个PCI设备做目标(从设备,Target或Slave)。

  * 总线上的所有时序的产生与控制,都由Master来发起。PCI总线在同一时刻只能供一对设备完成传输。

  */

  pci_set_master(pdev);

  /*添加两个内核定时器,watchdog和blink_timer*/

  init_timer(&nic->watchdog);

  nic->watchdog.function = e100_watchdog;

  nic->watchdog.data = (unsigned long)nic;

  init_timer(&nic->blink_timer);

  nic->blink_timer.function = e100_blink_led;

  nic->blink_timer.data = (unsigned long)nic;

  INIT_WORK(&nic->tx_timeout_task,

  (void (*)(void *))e100_tx_timeout_task, netdev);

  if((err = e100_alloc(nic))) {

  DPRINTK(PROBE, ERR, "Cannot alloc driver memory, aborting.\n");

  goto err_out_iounmap;

  }

  /*phy寄存器初始化*/

  e100_phy_init(nic);

  if((err = e100_eeprom_load(nic)))

  goto err_out_free;

  memcpy(netdev->dev_addr, nic->eeprom, ETH_ALEN);

  if(!is_valid_ether_addr(netdev->dev_addr)) {

  DPRINTK(PROBE, ERR, "Invalid MAC address from "

  "EEPROM, aborting.\n");

  err = -EAGAIN;

  goto err_out_free;

  }

  /* Wol magic packet can be enabled from eeprom */

  if((nic->mac >= mac_82558_D101_A4) &&

  (nic->eeprom[eeprom_id] & eeprom_id_wol))

  nic->flags |= wol_magic;

  /* ack any pending wake events, disable PME */

  pci_enable_wake(pdev, 0, 0);

  /*注册网络设备*/

  strcpy(netdev->name, "eth%d");

  if((err = register_netdev(netdev))) {

  DPRINTK(PROBE, ERR, "Cannot register net device, aborting.\n");

  goto err_out_free;

  }

  DPRINTK(PROBE, INFO, "addr 0x%lx, irq %d, "

  "MAC addr %02X:%02X:%02X:%02X:%02X:%02X\n",

  pci_resource_start(pdev, 0), pdev->irq,

  netdev->dev_addr[0], netdev->dev_addr[1], netdev->dev_addr[2],

  netdev->dev_addr[3], netdev->dev_addr[4], netdev->dev_addr[5]);

  return 0;

  err_out_free:

  e100_free(nic);

  err_out_iounmap:

  iounmap(nic->csr);

  err_out_free_res:

  pci_release_regions(pdev);

  err_out_disable_pdev:

  pci_disable_device(pdev);

  err_out_free_dev:

  pci_set_drvdata(pdev, NULL);

  free_netdev(netdev);

  return err;

  }

  执行到这里,探测函数的使命就完成了,在对网络设备重要成员初始化时,有:

  netdev->open = e100_open;

  指定了设备的open函数为e100_open,这样,当第一次使用设备,比如使用ifconfig工具的时候,open函数将被调用。

  二、打开设备

  在探测函数中,设置了netdev->open = e100_open; 指定了设备的open函数为e100_open:

  CODE:

  static int e100_open(struct net_device *netdev)

  {

  struct nic *nic = netdev_priv(netdev);

  int err = 0;

  netif_carrier_off(netdev);

  if((err = e100_up(nic)))

  DPRINTK(IFUP, ERR, "Cannot open interface, aborting.\n");

  return err;

  }

  大多数涉及物理设备可以感知信号载波(carrier)的存在,载波的存在意味着设备可以工作

  据个例子来讲:当一个用户拔掉了网线,也就意味着信号载波的消失。

  netif_carrier_off:关闭载波信号;

  netif_carrier_on:打开载波信号;

  netif_carrier_ok:检测载波信号;

  对于探测网卡网线是否连接,这一组函数被使用得较多;

  接着,调用e100_up函数启动网卡,这个“启动”的过程,最重要的步骤有:

  1、调用request_irq向内核注册中断;

  2、调用netif_wake_queue函数来重新启动传输队例;

  CODE:

  static int e100_up(struct nic *nic)

  {

  int err;

  if((err = e100_rx_alloc_list(nic)))

  return err;

  if((err = e100_alloc_cbs(nic)))

  goto err_rx_clean_list;

  if((err = e100_hw_init(nic)))

  goto err_clean_cbs;

  e100_set_multicast_list(nic->netdev);

  e100_start_receiver(nic, 0);

  mod_timer(&nic->watchdog, jiffies);

  if((err = request_irq(nic->pdev->irq, e100_intr, SA_SHIRQ,

  nic->netdev->name, nic->netdev)))

  goto err_no_irq;

  netif_wake_queue(nic->netdev);

  netif_poll_enable(nic->netdev);

  /* enable ints _after_ enabling poll, preventing a race between

  * disable ints+schedule */

  e100_enable_irq(nic);

  return 0;

  err_no_irq:

  del_timer_sync(&nic->watchdog);

  err_clean_cbs:

  e100_clean_cbs(nic);

  err_rx_clean_list:

  e100_rx_clean_list(nic);

  return err;

  }

  这样,中断函数e100_intr将被调用;

  三、网卡中断

  从本质上来讲,中断,是一种电信号,当设备有某种事件发生的时候,它就会产生中断,通过总线把电信号发送给中断控制器,如果中断的线是激活的,中断控制器就把电信号发送给处理器的某个特定引脚。处理器于是立即停止自己正在做的事,跳到内存中内核设置的中断处理程序的入口点,进行中断处理。

  在内核中断处理中,会检测中断与我们刚才注册的中断号匹配,于是,注册的中断处理函数就被调用了。

  当需要发/收数据,出现错误,连接状态变化等,网卡的中断信号会被触发。当接收到中断后,中断函数读取中断状态位,进行合法性判断,如判断中断信号是否是自己的等,然后,应答设备中断——OK,我已经知道了,你回去继续工作吧……

  接着,它就屏蔽此中断,然后netif_rx_schedule函数接收,接收函数 会在未来某一时刻调用设备的poll函数(对这里而言,注册的是e100_poll)实现设备的轮询:

  CODE:

  static irqreturn_t e100_intr(int irq, void *dev_id, struct pt_regs *regs)

  {

  struct net_device *netdev = dev_id;

  struct nic *nic = netdev_priv(netdev);

  u8 stat_ack = readb(&nic->csr->scb.stat_ack);

  DPRINTK(INTR, DEBUG, "stat_ack = 0x%02X\n", stat_ack);

  if(stat_ack == stat_ack_not_ours || /* Not our interrupt */

  stat_ack == stat_ack_not_present) /* Hardware is ejected */

  return IRQ_NONE;

  /* Ack interrupt(s) */

  writeb(stat_ack, &nic->csr->scb.stat_ack);

  /* We hit Receive No Resource (RNR); restart RU after cleaning */

  if(stat_ack & stat_ack_rnr)

  nic->ru_running = RU_SUSPENDED;

  e100_disable_irq(nic);

  netif_rx_schedule(netdev);

  return IRQ_HANDLED;

  }

  对于数据包的接收而言,我们关注的是poll函数中,调用e100_rx_clean进行数据的接收:

  CODE:

  static int e100_poll(struct net_device *netdev, int *budget)

  {

  struct nic *nic = netdev_priv(netdev);

  /*

  * netdev->quota是当前CPU能够从所有接口中接收数据包的最大数目,budget是在

  * 初始化阶段分配给接口的weight值,轮询函数必须接受二者之间的最小值。表示

  * 轮询函数本次要处理的数据包个数。

  */

  unsigned int work_to_do = min(netdev->quota, *budget);

  unsigned int work_done = 0;

  int tx_cleaned;

  /*进行数据包的接收和传输*/

  e100_rx_clean(nic, &work_done, work_to_do);

  tx_cleaned = e100_tx_clean(nic);

  /*接收和传输完成后,就退出poll模块,重启中断*/

  /* If no Rx and Tx cleanup work was done, exit polling mode. */

  if((!tx_cleaned && (work_done == 0)) || !netif_running(netdev)) {

  netif_rx_complete(netdev);

  e100_enable_irq(nic);

  return 0;

  }

  *budget -= work_done;

  netdev->quota -= work_done;

  return 1;

  }

  static inline void e100_rx_clean(struct nic *nic, unsigned int *work_done,

  unsigned int work_to_do)

  {

  struct rx *rx;

  int restart_required = 0;

  struct rx *rx_to_start = NULL;

  /* are we already rnr? then pay attention!!! this ensures that

  * the state machine progression never allows a start with a

  * partially cleaned list, avoiding a race between hardware

  * and rx_to_clean when in NAPI mode */

  if(RU_SUSPENDED == nic->ru_running)

  restart_required = 1;

  /* Indicate newly arrived packets */

  for(rx = nic->rx_to_clean; rx->skb; rx = nic->rx_to_clean = rx->next) {

  int err = e100_rx_indicate(nic, rx, work_done, work_to_do);

  if(-EAGAIN == err) {

  /* hit quota so have more work to do, restart once

  * cleanup is complete */

  restart_required = 0;

  break;

  } else if(-ENODATA == err)

  break; /* No more to clean */

  }

  /* save our starting point as the place we'll restart the receiver */

  if(restart_required)

  rx_to_start = nic->rx_to_clean;

  /* Alloc new skbs to refill list */

  for(rx = nic->rx_to_use; !rx->skb; rx = nic->rx_to_use = rx->next) {

  if(unlikely(e100_rx_alloc_skb(nic, rx)))

  break; /* Better luck next time (see watchdog) */

  }

  if(restart_required) {

  // ack the rnr?

  writeb(stat_ack_rnr, &nic->csr->scb.stat_ack);

  e100_start_receiver(nic, rx_to_start);

  if(work_done)

  (*work_done)++;

  }

  }

  四、网卡的数据接收

  内核如何从网卡接受数据,传统的经典过程:

  1、数据到达网卡;

  2、网卡产生一个中断给内核;

  3、内核使用I/O指令,从网卡I/O区域中去读取数据;

  我们在许多网卡驱动中,都可以在网卡的中断函数中见到这一过程。

  但是,这一种方法,有一种重要的问题,就是大流量的数据来到,网卡会产生大量的中断,内核在中断上下文中,会浪费大量的资源来处理中断本身。所以,一个问题是,“可不可以不使用中断”,这就是轮询技术,所谓NAPI技术,说来也不神秘,就是说,内核屏蔽中断,然后隔一会儿就去问网卡,“你有没有数据啊?”……

  从这个描述本身可以看到,哪果数据量少,轮询同样占用大量的不必要的CPU资源,大家各有所长吧,呵呵……

  OK,另一个问题,就是从网卡的I/O区域,包括I/O寄存器或I/O内存中去读取数据,这都要CPU去读,也要占用CPU资源,“CPU从I/O区域读,然后把它放到内存(这个内存指的是系统本身的物理内存,跟外设的内存不相干,也叫主内存)中”。于是自然地,就想到了DMA技术——让网卡直接从主内存之间读写它们的I/O数据,CPU,这儿不干你事,自己找乐子去:

  1、首先,内核在主内存中为收发数据建立一个环形的缓冲队列(通常叫DMA环形缓冲区)。

  2、内核将这个缓冲区通过DMA映射,把这个队列交给网卡;

  3、网卡收到数据,就直接放进这个环形缓冲区了——也就是直接放进主内存了;然后,向系统产生一个中断;

  4、内核收到这个中断,就取消DMA映射,这样,内核就直接从主内存中读取数据;

  ——呵呵,这一个过程比传统的过程少了不少工作,因为设备直接把数据放进了主内存,不需要CPU的干预,效率是不是提高不少?

  对应以上4步,来看它的具体实现:

  1、分配环形DMA缓冲区

  Linux内核中,用skb来描述一个缓存,所谓分配,就是建立一定数量的skb,然后把它们组织成一个双向链表;

  2、建立DMA映射

  内核通过调用

  dma_map_single(struct device *dev,void *buffer,size_t size,enum dma_data_direction direction)

  建立映射关系。

  struct device *dev,描述一个设备;

  buffer:把哪个地址映射给设备;也就是某一个skb——要映射全部,当然是做一个双向链表的循环即可;

  size:缓存大小;

  direction:映射方向——谁传给谁:一般来说,是“双向”映射,数据在设备和内存之间双向流动;

  对于PCI设备而言(网卡一般是PCI的),通过另一个包裹函数pci_map_single,这样,就把buffer交给设备了!设备可以直接从里边读/取数据。

  3、这一步由硬件完成;

  4、取消映射

  dma_unmap_single,对PCI而言,大多调用它的包裹函数pci_unmap_single,不取消的话,缓存控制权还在设备手里,要调用它,把主动权掌握在CPU手里——因为我们已经接收到数据了,应该由CPU把数据交给上层网络栈;

  当然,不取消之前,通常要读一些状态位信息,诸如此类,一般是调用

  dma_sync_single_for_cpu()

  让CPU在取消映射前,就可以访问DMA缓冲区中的内容。

  关于DMA映射的更多内容,可以参考《Linux设备驱动程序》“内存映射和DMA”章节相关内容!

  OK,有了这些知识,我们就可以来看e100的代码了,它跟上面讲的步骤基本上一样的——绕了这么多圈子,就是想绕到e100上面了,呵呵!

  在e100_open函数中,调用e100_up,我们前面分析它时,略过了一个重要的东东,就是环形缓冲区的建立,这一步,是通过

  e100_rx_alloc_list函数调用完成的:

  CODE:

  static int e100_rx_alloc_list(struct nic *nic)

  {

  struct rx *rx;

  unsigned int i, count = nic->params.rfds.count;

  nic->rx_to_use = nic->rx_to_clean = NULL;

  nic->ru_running = RU_UNINITIALIZED;

  /*结构struct rx用来描述一个缓冲区节点,这里分配了count个*/

  if(!(nic->rxs = kmalloc(sizeof(struct rx) * count, GFP_ATOMIC)))

  return -ENOMEM;

  memset(nic->rxs, 0, sizeof(struct rx) * count);

  /*虽然是连续分配的,不过还是遍历它,建立双向链表,然后为每一个rx的skb指针分员分配空间

  skb用来描述内核中的一个数据包,呵呵,说到重点了*/

  for(rx = nic->rxs, i = 0; i < count; rx++, i++) {

  rx->next = (i + 1 < count) ? rx + 1 : nic->rxs;

  rx->prev = (i == 0) ? nic->rxs + count - 1 : rx - 1;

  if(e100_rx_alloc_skb(nic, rx)) { /*分配缓存*/

  e100_rx_clean_list(nic);

  return -ENOMEM;

  }

  }

  nic->rx_to_use = nic->rx_to_clean = nic->rxs;

  nic->ru_running = RU_SUSPENDED;

  return 0;

  }

  CODE:

  #define RFD_BUF_LEN (sizeof(struct rfd) + VLAN_ETH_FRAME_LEN)

  static inline int e100_rx_alloc_skb(struct nic *nic, struct rx *rx)

  {

  /*skb缓存的分配,是通过调用系统函数dev_alloc_skb来完成的,它同内核栈中通常调用alloc_skb的区别在于,

  它是原子的,所以,通常在中断上下文中使用*/

  if(!(rx->skb = dev_alloc_skb(RFD_BUF_LEN + NET_IP_ALIGN)))

  return -ENOMEM;

  /*初始化必要的成员 */

  rx->skb->dev = nic->netdev;

  skb_reserve(rx->skb, NET_IP_ALIGN);

  /*这里在数据区之前,留了一块sizeof(struct rfd) 这么大的空间,该结构的

  一个重要作用,用来保存一些状态信息,比如,在接收数据之前,可以先通过

  它,来判断是否真有数据到达等,诸如此类*/

  memcpy(rx->skb->data, &nic->blank_rfd, sizeof(struct rfd));

  /*这是最关键的一步,建立DMA映射,把每一个缓冲区rx->skb->data都映射给了设备,缓存区节点

  rx利用dma_addr保存了每一次映射的地址,这个地址后面会被用到*/

  rx->dma_addr = pci_map_single(nic->pdev, rx->skb->data,

  RFD_BUF_LEN, PCI_DMA_BIDIRECTIONAL);

  if(pci_dma_mapping_error(rx->dma_addr)) {

  dev_kfree_skb_any(rx->skb);

  rx->skb = 0;

  rx->dma_addr = 0;

  return -ENOMEM;

  }

  /* Link the RFD to end of RFA by linking previous RFD to

  * this one, and clearing EL bit of previous. */

  if(rx->prev->skb) {

  struct rfd *prev_rfd = (struct rfd *)rx->prev->skb->data;

  /*put_unaligned(val,ptr);用到把var放到ptr指针的地方,它能处理处理内存对齐的问题

  prev_rfd是在缓冲区开始处保存的一点空间,它的link成员,也保存了映射后的地址*/

  put_unaligned(cpu_to_le32(rx->dma_addr),

  (u32 *)&prev_rfd->link);

  wmb();

  prev_rfd->command &= ~cpu_to_le16(cb_el);

  pci_dma_sync_single_for_device(nic->pdev, rx->prev->dma_addr,

  sizeof(struct rfd), PCI_DMA_TODEVICE);

  }

  return 0;

  }

  e100_rx_alloc_list函数在一个循环中,建立了环形缓冲区,并调用e100_rx_alloc_skb为每个缓冲区分配了空间,并做了

  DMA映射。这样,我们就可以来看接收数据的过程了。

  前面我们讲过,中断函数中,调用netif_rx_schedule,表明使用轮询技术,系统会在未来某一时刻,调用设备的poll函数:

  CODE:

  static int e100_poll(struct net_device *netdev, int *budget)

  {

  struct nic *nic = netdev_priv(netdev);

  unsigned int work_to_do = min(netdev->quota, *budget);

  unsigned int work_done = 0;

  int tx_cleaned;

  e100_rx_clean(nic, &work_done, work_to_do);

  tx_cleaned = e100_tx_clean(nic);

  /* If no Rx and Tx cleanup work was done, exit polling mode. */

  if((!tx_cleaned && (work_done == 0)) || !netif_running(netdev)) {

  netif_rx_complete(netdev);

  e100_enable_irq(nic);

  return 0;

  }

  *budget -= work_done;

  netdev->quota -= work_done;

  return 1;

  }

  目前,我们只关心rx,所以,e100_rx_clean函数就成了我们关注的对像,它用来从缓冲队列中接收全部数据(这或许是取名为clean的原因吧!):

  CODE:

  static inline void e100_rx_clean(struct nic *nic, unsigned int *work_done,

  unsigned int work_to_do)

  {

  struct rx *rx;

  int restart_required = 0;

  struct rx *rx_to_start = NULL;

  /* are we already rnr? then pay attention!!! this ensures that

  * the state machine progression never allows a start with a

  * partially cleaned list, avoiding a race between hardware

  * and rx_to_clean when in NAPI mode */

  if(RU_SUSPENDED == nic->ru_running)

  restart_required = 1;

  /* 函数最重要的工作,就是遍历环形缓冲区,接收数据*/

  for(rx = nic->rx_to_clean; rx->skb; rx = nic->rx_to_clean = rx->next) {

  int err = e100_rx_indicate(nic, rx, work_done, work_to_do);

  if(-EAGAIN == err) {

  /* hit quota so have more work to do, restart once

  * cleanup is complete */

  restart_required = 0;

  break;

  } else if(-ENODATA == err)

  break; /* No more to clean */

  }

  /* save our starting point as the place we'll restart the receiver */

  if(restart_required)

  rx_to_start = nic->rx_to_clean;

  /* Alloc new skbs to refill list */

  for(rx = nic->rx_to_use; !rx->skb; rx = nic->rx_to_use = rx->next) {

  if(unlikely(e100_rx_alloc_skb(nic, rx)))

  break; /* Better luck next time (see watchdog) */

  }

  if(restart_required) {

  // ack the rnr?

  writeb(stat_ack_rnr, &nic->csr->scb.stat_ack);

  e100_start_receiver(nic, rx_to_start);

  if(work_done)

  (*work_done)++;

  }

  }

  CODE:

  static inline int e100_rx_indicate(struct nic *nic, struct rx *rx,

  unsigned int *work_done, unsigned int work_to_do)

  {

  struct sk_buff *skb = rx->skb;

  struct rfd *rfd = (struct rfd *)skb->data;

  u16 rfd_status, actual_size;

  if(unlikely(work_done && *work_done >= work_to_do))

  return -EAGAIN;

  /* 读取数据之前,也就是取消DMA映射之前,需要先读取cb_complete 状态位,

  以确定数据是否真的准备好了,并且,rfd的actual_size中,也包含了真实的数据大小

  pci_dma_sync_single_for_cpu函数前面已经介绍过,它让CPU在取消DMA映射之前,具备

  访问DMA缓存的能力*/

  pci_dma_sync_single_for_cpu(nic->pdev, rx->dma_addr,

  sizeof(struct rfd), PCI_DMA_FROMDEVICE);

  rfd_status = le16_to_cpu(rfd->status);

  DPRINTK(RX_STATUS, DEBUG, "status=0x%04X\n", rfd_status);

  /* If data isn't ready, nothing to indicate */

  if(unlikely(!(rfd_status & cb_complete)))

  return -ENODATA;

  /* Get actual data size */

  actual_size = le16_to_cpu(rfd->actual_size) & 0x3FFF;

  if(unlikely(actual_size > RFD_BUF_LEN - sizeof(struct rfd)))

  actual_size = RFD_BUF_LEN - sizeof(struct rfd);

  /* 取消映射,因为通过DMA,网卡已经把数据放在了主内存中,这里一取消,也就意味着,

  CPU可以处理主内存中的数据了 */

  pci_unmap_single(nic->pdev, rx->dma_addr,

  RFD_BUF_LEN, PCI_DMA_FROMDEVICE);

  /* this allows for a fast restart without re-enabling interrupts */

  if(le16_to_cpu(rfd->command) & cb_el)

  nic->ru_running = RU_SUSPENDED;

  /*正确地设置data指针,因为最前面有一个sizeof(struct rfd)大小区域,跳过它*/

  skb_reserve(skb, sizeof(struct rfd));

  /*更新skb的tail和len指针,也是就更新接收到这么多数据的长度*/

  skb_put(skb, actual_size);

  /*设置协议位*/

  skb->protocol = eth_type_trans(skb, nic->netdev);

  if(unlikely(!(rfd_status & cb_ok))) {

  /* Don't indicate if hardware indicates errors */

  nic->net_stats.rx_dropped++;

  dev_kfree_skb_any(skb);

  } else if(actual_size > nic->netdev->mtu + VLAN_ETH_HLEN) {

  /* Don't indicate oversized frames */

  nic->rx_over_length_errors++;

  nic->net_stats.rx_dropped++;

  dev_kfree_skb_any(skb);

  } else {

  /*网卡驱动要做的最后一步,就是统计接收计数器,设置接收时间戳,然后调用netif_receive_skb,

  把数据包交给上层协议栈,自己的光荣始命也就完成了*/

  nic->net_stats.rx_packets++;

  nic->net_stats.rx_bytes += actual_size;

  nic->netdev->last_rx = jiffies;

  netif_receive_skb(skb);

  if(work_done)

  (*work_done)++;

  }

  rx->skb = NULL;

  return 0;

  }

  网卡驱动执行到这里,数据接收的工作,也就处理完成了。但是,使用这一种方法的驱动,省去了网络栈中一个重要的内容,就是

  “队列层”,让我们来看看,传统中断接收数据包模式下,使用netif_rx函数调用,又会发生什么。

  PS:九贱没有去研究过所谓的“零拷贝”技术,不太清楚,它同这种DMA直取方式有何不同?难道是把网卡中的I/O内存直接映射到主内存中,这样CPU就可以像读取主内存一样,读取网卡的内存,但是这要求设备要有好大的I/O内存来做缓冲呀!!^o^,外行了……希望哪位DX提点!

  五、队列层

  1、软中断与下半部

  当用中断处理的时候,为了减少中断处理的工作量,比如,一般中断处理时,需要屏蔽其它中断,如果中断处理时间过长,那么其它中断

  有可能得不到及时处理,也以,有一种机制,就是把“不必马上处理”的工作,推迟一点,让它在中断处理后的某一个时刻得到处理。这就

  是下半部。

  下半部只是一个机制,它在Linux中,有多种实现方式,其中一种对时间要求最严格的实现方式,叫“软中断”,可以使用:

  open_softirq()

  来向内核注册一个软中断,

  然后,在合适的时候,调用

  raise_softirq_irqoff()

  触发它。

  如果采用中断方式接收数据(这一节就是在说中断方式接收,后面,就不用这种假设了),同样也需要软中断,可以调用

  open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);

  向内核注册一个名为NET_RX_SOFTIR的软中断,net_rx_action是软中断的处理函数。

  然后,在驱动中断处理完后的某一个时刻,调用

  raise_softirq_irqoff(NET_RX_SOFTIRQ);

  触发它,这样net_rx_action将得到执行。

  2、队列层

  什么是队列层?通常,在网卡收发数据的时候,需要维护一个缓冲区队列,来缓存可能存在的突发数据,类似于前面的DMA环形缓冲区。

  队列层中,包含了一个叫做struct softnet_data:

  CODE:

  struct softnet_data

  {

  /*throttle 用于拥塞控制,当拥塞发生时,throttle将被设置,后续进入的数据包将被丢弃*/

  int throttle;

  /*netif_rx函数返回的拥塞级别*/

  int cng_level;

  int avg_blog;

  /*softnet_data 结构包含一个指向接收和传输队列的指针,input_pkt_queue成员指向准备传送

  给网络层的sk_buffs包链表的首部的指针,这个队列中的包是由netif_rx函数递交的*/

  struct sk_buff_head input_pkt_queue;

  struct list_head poll_list;

  struct net_device *output_queue;

  struct sk_buff *completion_queue;

  struct net_device backlog_dev; /* Sorry. 8) */

  };

  内核使用了一个同名的变量softnet_data,它是一个Per-CPU变量,每个CPU都有一个。

  net/core/dev.c

  CODE:

  DECLARE_PER_CPU(struct softnet_data,softnet_data);

  CODE:

  /*

  * 网络模块的核心处理模块.

  */

  static int __init net_dev_init(void)

  {

  int i, rc = -ENOMEM;

  BUG_ON(!dev_boot_phase);

  net_random_init();

  if (dev_proc_init()) /*初始化proc文件系统*/

  goto out;

  if (netdev_sysfs_init()) /*初始化sysfs文件系统*/

  goto out;

  /*ptype_all和ptype_base是重点,后面会详细分析,它们都是

  struct list_head类型变量,这里初始化链表成员*/

  INIT_LIST_HEAD(&ptype_all);

  for (i = 0; i < 16; i++)

  INIT_LIST_HEAD(&ptype_base[i]);

  for (i = 0; i < ARRAY_SIZE(dev_name_head); i++)

  INIT_HLIST_HEAD(&dev_name_head[i]);

  for (i = 0; i < ARRAY_SIZE(dev_index_head); i++)

  INIT_HLIST_HEAD(&dev_index_head[i]);

  /*

  * 初始化包接收队列,这里我们的重点了.

  */

  /*遍历每一个CPU,取得它的softnet_data,我们说过,它是一个struct softnet_data的Per-CPU变量*/

  for (i = 0; i < NR_CPUS; i++) {

  struct softnet_data *queue;

  /*取得第i个CPU的softnet_data,因为队列是包含在它里边的,所以,我会直接说,“取得队列”*/

  queue = &per_cpu(softnet_data, i);

  /*初始化队列头*/

  skb_queue_head_init(&queue->input_pkt_queue);

  queue->throttle = 0;

  queue->cng_level = 0;

  queue->avg_blog = 10; /* arbitrary non-zero */

  queue->completion_queue = NULL;

  INIT_LIST_HEAD(&queue->poll_list);

  set_bit(__LINK_STATE_START, &queue->backlog_dev.state);

  queue->backlog_dev.weight = weight_p;

  /*这里,队列中backlog_dev设备,它是一个伪网络设备,不对应任何物理设备,它的poll函数,指向了

  process_backlog,后面我们会详细分析*/

  queue->backlog_dev.poll = process_backlog;

  atomic_set(&queue->backlog_dev.refcnt, 1);

  }

  #ifdef OFFLINE_SAMPLE

  samp_timer.expires = jiffies + (10 * HZ);

  add_timer(&samp_timer);

  #endif

  dev_boot_phase = 0;

  /*注册收/发软中断*/

  open_softirq(NET_TX_SOFTIRQ, net_tx_action, NULL);

  open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);

  hotcpu_notifier(dev_cpu_callback, 0);

  dst_init();

  dev_mcast_init();

  rc = 0;

  out:

  return rc;

  }

  这样,初始化完成后,在驱动程序中,在中断处理函数中,会调用netif_rx将数据交上来,这与采用轮询技术,有本质的不同:

  CODE:

  int netif_rx(struct sk_buff *skb)

  {

  int this_cpu;

  struct softnet_data *queue;

  unsigned long flags;

  /* if netpoll wants it, pretend we never saw it */

  if (netpoll_rx(skb))

  return NET_RX_DROP;

  /*接收时间戳未设置,设置之*/

  if (!skb->stamp.tv_sec)

  net_timestamp(&skb->stamp);

  /*

  * 这里准备将数据包放入接收队列,需要禁止本地中断,在入队操作完成后,再打开中断.

  */

  local_irq_save(flags);

  /*获取当前CPU对应的softnet_data变量*/

  this_cpu = smp_processor_id();

  queue = &__get_cpu_var(softnet_data);

  /*接收计数器累加*/

  __get_cpu_var(netdev_rx_stat).total++;

  /*接收队列是否已满*/

  if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {

  if (queue->input_pkt_queue.qlen) {

  if (queue->throttle) /*拥塞发生了,丢弃数据包*/

  goto drop;

  /*数据包入队操作*/

  enqueue:

  dev_hold(skb->dev); /*累加设备引入计数器*/

  __skb_queue_tail(&queue->input_pkt_queue, skb); /*将数据包加入接收队列*/

  #ifndef OFFLINE_SAMPLE

  get_sample_stats(this_cpu);

  #endif

  local_irq_restore(flags);

  return queue->cng_level;

  }

  /*

  * 驱动程序不断地调用net_rx函数,实现接收数据包的入队操作,当queue->input_pkt_queue.qlen == 0时(?什么情况下设置)

  * 则进入这段代码,这里,如果已经被设置拥塞标志的话,则清除它,因为这里将要调用软中断,开始将数据包交给

  * 上层了,即上层协议的接收函数将执行出队操作,拥塞自然而然也就不存在了。

  */

  if (queue->throttle)

  queue->throttle = 0;

  /*

  * netif_rx_schedule函数完成两件重要的工作:

  * 1、将bakclog_dev设备加入“处理数据包的设备”的链表当中;

  * 2、触发软中断函数,进行数据包接收处理;

  */

  netif_rx_schedule(&queue->backlog_dev);

  goto enqueue;

  }

  /*前面判断了队列是否已满,如果已满而标志未设置,设置之,并累加拥塞计数器*/

  if (!queue->throttle) {

  queue->throttle = 1;

  __get_cpu_var(netdev_rx_stat).throttled++;

  }

  /*拥塞发生,累加丢包计数器,释放数据包*/

  drop:

  __get_cpu_var(netdev_rx_stat).dropped++;

  local_irq_restore(flags);

  kfree_skb(skb);

  return NET_RX_DROP;

  }

  从这段代码的分析中,我们可以看到,当数据被接收后,netif_rx的工作,就是取得当前CPU的队列,然后入队,然后返回,然后中断函数

  现调用它,它再把数据包入队……

  当队列接收完成后,netif_rx就调用netif_rx_schedule进一步处理数据包,我们注意到:

  1、前面讨论过,采用轮询技术时,同样地,也是调用netif_rx_schedule,把设备自己传递了过去;

  2、这里,采用中断方式,传递的是队列中的一个“伪设备”,并且,这个伪设备的poll函数指针,指向了一个叫做process_backlog的函数;

  netif_rx_schedule函数完成两件重要的工作:

  1、将bakclog_dev设备加入“处理数据包的设备”的链表当中;

  2、触发软中断函数,进行数据包接收处理;

  这样,我们可以猜想,在软中断函数中,不论是伪设备bakclog_dev,还是真实的设备(如前面讨论过的e100),都会被软中断函数以:

  dev->poll()

  的形式调用,对于e100来说,poll函数的接收过程已经分析了,而对于其它所有没有采用轮询技术的网络设备来说,它们将统统调用

  process_backlog函数(我觉得把它改名为pseudo-poll是否更合适一些^o^)。

  OK,我想分析到这里,关于中断处理与轮询技术的差异,已经基本分析开了……

  继续来看,netif_rx_schedule进一步调用__netif_rx_schedule:

  CODE:

  /* Try to reschedule poll. Called by irq handler. */

  static inline void netif_rx_schedule(struct net_device *dev)

  {

  if (netif_rx_schedule_prep(dev))

  __netif_rx_schedule(dev);

  }

  CODE:

  /* Add interface to tail of rx poll list. This assumes that _prep has

  * already been called and returned 1.

  */

  static inline void __netif_rx_schedule(struct net_device *dev)

  {

  unsigned long flags;

  local_irq_save(flags);

  dev_hold(dev);

  /*伪设备也好,真实的设备也罢,都被加入了队列层的设备列表*/

  list_add_tail(&dev->poll_list, &__get_cpu_var(softnet_data).poll_list);

  if (dev->quota < 0)

  dev->quota += dev->weight;

  else

  dev->quota = dev->weight;

  /*触发软中断*/

  __raise_softirq_irqoff(NET_RX_SOFTIRQ);

  local_irq_restore(flags);

  }

  软中断被触发,注册的net_rx_action函数将被调用:

  CODE:

  /*接收的软中断处理函数*/

  static void net_rx_action(struct softirq_action *h)

  {

  struct softnet_data *queue = &__get_cpu_var(softnet_data);

  unsigned long start_time = jiffies;

  int budget = netdev_max_backlog;

  local_irq_disable();

  /*

  * 遍历队列的设备链表,如前所述,__netif_rx_schedule已经执行了

  * list_add_tail(&dev->poll_list, &__get_cpu_var(softnet_data).poll_list);

  * 设备bakclog_dev已经被添加进来了

  */

  while (!list_empty(&queue->poll_list)) {

  struct net_device *dev;

  if (budget <= 0 || jiffies - start_time > 1)

  goto softnet_break;

  local_irq_enable();

  /*取得链表中的设备*/

  dev = list_entry(queue->poll_list.next,

  struct net_device, poll_list);

  netpoll_poll_lock(dev);

  /*调用设备的poll函数,处理接收数据包,这样,采用轮询技术的网卡,它的真实的poll函数将被调用,

  这就回到我们上一节讨论的e100_poll函数去了,而对于采用传统中断处理的设备,它们调用的,都将是

  bakclog_dev的process_backlog函数*/

  if (dev->quota <= 0 || dev->poll(dev, &budget)) {

  netpoll_poll_unlock(dev);

  /*处理完成后,把设备从设备链表中删除,又重置于末尾*/

  local_irq_disable();

  list_del(&dev->poll_list);

  list_add_tail(&dev->poll_list, &queue->poll_list);

  if (dev->quota < 0)

  dev->quota += dev->weight;

  else

  dev->quota = dev->weight;

  } else {

  netpoll_poll_unlock(dev);

  dev_put(dev);

  local_irq_disable();

  }

  }

  out:

  local_irq_enable();

  return;

  softnet_break:

  __get_cpu_var(netdev_rx_stat).time_squeeze++;

  __raise_softirq_irqoff(NET_RX_SOFTIRQ);

  goto out;

  }

  对于dev->poll(dev, &budget)的调用,一个真实的poll函数的例子,我们已经分析过了,现在来看process_backlog,

  CODE:

  static int process_backlog(struct net_device *backlog_dev, int *budget)

  {

  int work = 0;

  int quota = min(backlog_dev->quota, *budget);

  struct softnet_data *queue = &__get_cpu_var(softnet_data);

  unsigned long start_time = jiffies;

  backlog_dev->weight = weight_p;

  /*在这个循环中,执行出队操作,把数据从队列中取出来,交给netif_receive_skb,直至队列为空*/

  for (;;) {

  struct sk_buff *skb;

  struct net_device *dev;

  local_irq_disable();

  skb = __skb_dequeue(&queue->input_pkt_queue);

  if (!skb)

  goto job_done;

  local_irq_enable();

  dev = skb->dev;

  netif_receive_skb(skb);

  dev_put(dev);

  work++;

  if (work >= quota || jiffies - start_time > 1)

  break;

  }

  backlog_dev->quota -= work;

  *budget -= work;

  return -1;

  /*当队列中的数据包被全部处理后,将执行到这里*/

  job_done:

  backlog_dev->quota -= work;

  *budget -= work;

  list_del(&backlog_dev->poll_list);

  smp_mb__before_clear_bit();

  netif_poll_enable(backlog_dev);

  if (queue->throttle)

  queue->throttle = 0;

  local_irq_enable();

  return 0;

  }

  这个函数重要的工作,就是出队,然后调用netif_receive_skb()将数据包交给上层,这与上一节讨论的poll是一样的。这也是为什么,

  在网卡驱动的编写中,采用中断技术,要调用netif_rx,而采用轮询技术,要调用netif_receive_skb啦!

  到了这里,就处理完数据包与设备相关的部分了,数据包将进入上层协议栈

653624