Golang实现简单爬虫框架(3)——简单并发版

  • 时间: 2019-05-22 12:10:23

在上篇文章 Golang实现简单爬虫框架(2)——单任务版爬虫 中我们实现了一个简单的单任务版爬虫,对于单任务版爬虫,每次都要请求页面,然后解析数据,然后才能请求下一个页面。整个过程中,获取网页数据速度比较慢,那么我们就把获取数据模块做成并发执行。在项目的基础上,实现多任务并发版爬虫。

项目github地址: https://github.com/NovemberCh... , 回滚到相应记录食用,效果更佳。

1、项目架构

首先我们把但任务版爬虫架构中的 Fetcher 模块和 Parser 模块合并成一个 Worker 模块,然后并发执行 Worker 模块

然后得到并发版的架构图:

  • 在并发版爬虫中,会同时执行多个 Worker ,每个 Worker 任务接受一个 Request 请求,然后请求页面解析数据,输出解析出的 RequestsItem
  • 因为又很多 RequestWorker ,所以还需要 Scheduler 模块,负责对请求任务的调度处理
  • Engine 模块接受 Worker 发送的 RequestsItems ,当前我们先把 Items 打印出,把解析出的 Request 发送给调度器
  • 其中 EngineScheduler 是一个 goruntineWorker 包含多个 goruntine ,各个模块之间都是用 channel

    进行连接

    先放上重构后的项目文件结构:

2、Worker实现

我们从engine.go中提取下面功能作为Worker模块,同时把engine.go 更名为simple.go。修改后的simple.go文件请自行调整,或者去github项目 源代码 回滚查看。

engine/worker.go

package engineimport (    "crawler/fetcher"    "log")// 输入 Request, 返回 ParseResultfunc worker(request Request) (ParseResult, error) {    log.Printf("Fetching %s\n", request.Url)    content, err := fetcher.Fetch(request.Url)    if err != nil {        log.Printf("Fetch error, Url: %s %v\n", request.Url, err)        return ParseResult{}, err    }    return request.ParseFunc(content), nil}

对于每一个Worker接受一个请求,然后返回解析出的内容

3、并发引擎Concurrent实现

请大家根据架构图来看,效果会更好。

package engineimport "log"// 并发引擎type ConcurrendEngine struct {   Scheduler   Scheduler    // 任务调度器   WorkerCount int            // 任务并发数量}// 任务调度器type Scheduler interface {   Submit(request Request) // 提交任务   ConfigMasterWorkerChan(chan Request)    // 配置初始请求任务}func (e *ConcurrendEngine) Run(seeds ...Request) {   in := make(chan Request)            // scheduler的输入   out := make(chan ParseResult)    // worker的输出   e.Scheduler.ConfigMasterWorkerChan(in)    // 把初始请求提交给scheduler   // 创建 goruntine   for i := 0; i < e.WorkerCount; i++ {      createWorker(in, out)   }   // engine把请求任务提交给 Scheduler   for _, request := range seeds {      e.Scheduler.Submit(request)   }   itemCount := 0   for {      // 接受 Worker 的解析结果      result := <-out      for _, item := range result.Items {         log.Printf("Got item: #%d: %v\n", itemCount, item)         itemCount++      }      // 然后把 Worker 解析出的 Request 送给 Scheduler      for _, request := range result.Requests {         e.Scheduler.Submit(request)      }   }}// 创建任务,调用worker,分发goruntinefunc createWorker(in chan Request, out chan ParseResult) {   go func() {      for {         request := <-in         result, err := worker(request)         if err != nil {            continue         }         out <- result      }   }()}

4、任务调度器Scheduler实现

scheduler/scheduler.go

package schedulerimport "crawler/engine"type SimpleScheduler struct {    workerChan chan engine.Request}func (s *SimpleScheduler) Submit(request engine.Request) {    // send request down to worker chan    go func() {        s.workerChan <- request    }()}// 把初始请求发送给 Schedulerfunc (s *SimpleScheduler) ConfigMasterWorkerChan(in chan engine.Request) {    s.workerChan = in}

5、main函数

package mainimport (    "crawler/engine"    "crawler/scheduler"    "crawler/zhenai/parser")func main() {    e := engine.ConcurrendEngine{    // 配置爬虫引擎        Scheduler:   &scheduler.SimpleScheduler{},        WorkerCount: 50,    }    e.Run(engine.Request{        // 配置爬虫目标信息        Url:       "http://www.zhenai.com/zhenghun",        ParseFunc: parser.ParseCityList,    })}

6、小结

本次博客我们实现一个最简单的并发版爬虫,调度器源源不断的接受任务,一旦有一个worker空闲,就给其分配任务。这样子有一个缺点,就是我们不知道我们分发出那么多worker的工作情况,对worker的控制力比较弱,所以在下次博客中会用队列来实现任务调度。

如果想获取 Google工程师深度讲解go语言 视频资源的,可以在评论区留言。

项目的 源代码 已经托管到Github上,对于各个版本都有记录,欢迎大家查看,记得给个star,在此先谢谢大家了