1 协程基础

1.1 协程定义(Goroutine)

  • 概念:Go 语言特有的轻量级线程,由 Go 运行时(runtime)管理,相比系统线程(Thread),创建和销毁成本极低,占用内存小(初始 2KB)。协程是 Go 程序中最基本的并发执行单元。
  • 创建方式:使用go关键字启动一个协程

    func main() {
        // 匿名函数直接启动协程
        go func() {
            fmt.Println("Hello from goroutine!")
        }() 
    
        // 调用已定义函数启动协程
        go func1()
        go func2()
    
        time.Sleep(time.Second) // 等待协程执行,否则主协程退出导致所有协程终止
        fmt.Println("主协程退出")
    }
    
    

1.2 协程调度模型GMP

Go 调度器采用 Goroutine-Machine-Processor (GMP) 模型,核心组件包括:

  • G (Goroutine):协程的抽象,包含执行栈、程序计数器等信息。
  • M (Machine):   对应操作系统线程,实际执行代码的实体。
  • P (Processor):逻辑处理器,持有运行队列(Local Queue)和 G 上下文,必须绑定 M 才能执行 G。

    M 是 “执行任务的实体”,是唯一能运行 Go 代码的载体。G(任务)本身只是一段代码逻辑,必须依赖 M(操作系统线程)才能在 CPU 上执行 
    M和P是绑定关系,必须成对出现

1.2.1 协程创建

  • 当调用 go func() 时,创建一个新的 G 对象,放入当前 P 的 Local Queue。
  • 若 Local Queue 已满(默认 256 个 G),将一半 G 转移到全局队列(Global Queue)。

1.2.2 协程执行

  • M 从绑定的 P 的 Local Queue 获取 G 执行。
  • 若 Local Queue 为空,从 Global Queue 批量获取 G(通常为 P 的 GOMAXPROCS/2)。
  • 若 Global Queue 也为空,从其他 P 的 Local Queue 窃取(Work Stealing) 一半 G。

1.2.3 协程阻塞 / 唤醒

  • 当 G 执行系统调用(如 I/O)时,M 与 P 解绑,P 可被其他 M 接管继续执行队列中的 G。

如果 M 因系统调用被阻塞时,P 继续绑定 M,会导致以下问题:

  1. P 无法工作:P 的本地队列中可能有大量就绪的 G,但由于 M 被阻塞,这些 G 无法执行。
  2. CPU 核心浪费:如果 P 对应一个 CPU 核心,该核心将处于闲置状态,即使还有其他任务可执行。
  3. 因此,当 G 执行系统调用时,调度器会 解绑 M 和 P,允许 P 继续工作,避免 CPU 资源浪费
  • 系统调用返回后,G 重新加入某个 P 的队列等待执行。

2 并发模式 

2.1 共享内存并发

多个协程通过共享变量访问数据,需使用同步原语(如sync.Mutexsync.RWMutex)保护临界区

var (
    counter int
    mu      sync.Mutex
)

func increment() {
    mu.Lock()
    defer mu.Unlock()
    counter++
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            increment()
        }()
    }
    wg.Wait()
    fmt.Println("Counter:", counter) // 输出1000,无竞争
}


2.2 CSP 并发(通过通道通信)

使用channel实现协程间通信和同步,遵循 “不要通过共享内存来通信,而要通过通信来共享内存” 原则

func producer(ch chan<- int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(ch <-chan int) {
    for num := range ch {
        fmt.Println("Received:", num)
    }
}

func main() {
    ch := make(chan int)
    go producer(ch)
    consumer(ch)
}


2.3 并发任务控制

// 普通的协程创建方法:
go func() {
  // your code1
}()
go func() {
  // your code2
}()
// go on

这段 Go 代码的执行顺序如下:

  1. 启动 goroutine 1:主协程创建并启动第一个匿名函数(// your code1),该函数在后台异步执行。

  2. 启动 goroutine 2:主协程紧接着创建并启动第二个匿名函数(// your code2),同样在后台异步执行。

  3. 主协程继续执行:主协程不会等待这两个 goroutine 完成,而是立即继续执行// go on之后的代码。

  4. 并行执行 goroutine// your code1// your code2的执行顺序取决于调度器,可能并行或交替执行,但它们的完成顺序不确定。由于主协程未等待它们,若主协程提前结束(例如程序退出),这两个 goroutine 可能被强制终止

2.3.1 sync.WaitGroup

wg.Wait()会阻塞直到2个协程执行完后

go func() { 
// func1
  wg.Done()
}()
go func() {
// func2
 wg.Done()
}()
wg.Wait()
// go on

这段 Go 代码的执行顺序如下:

  1. 启动 goroutine 1:主协程创建并启动第一个匿名函数(func1),该函数在后台异步执行。

  2. 启动 goroutine 2:主协程紧接着创建并启动第二个匿名函数(func2),同样在后台异步执行。

  3. 主协程阻塞:主协程执行wg.Wait(),进入阻塞状态,等待所有被等待的 goroutine 完成。

  4. 并行执行 goroutinefunc1func2的执行顺序取决于调度器,可能并行或交替执行,但它们的完成顺序不确定。每个 goroutine 在完成任务后调用wg.Done()通知等待组。

  5. 恢复主协程:当所有被等待的 goroutine(即func1func2)都调用了wg.Done()后,wg.Wait()返回,主协程继续执行后续代码(// go on)。

   主协程         |    goroutine 1      |    goroutine 2

---------------------------------------------------------------

wg.Add(2)      |                             |

启动func1       |  开始执行func1   |

启动func2       |                            | 开始执行func2

wg.Wait()阻塞 |          ...               |         ...                

                        |      执行完毕       |

                        | wg.Done()          |

                        |                            |      执行完毕

                        |                            |      wg.Done()

wg.Wait()返回  |                            |

继续执行后续代码

2.3.2 errgroup.Group

var g errgroup.Group
g.Go(func() error {
    // 任务1:可能返回错误
    return nil
})
g.Go(func() error {
    // 任务2:可能返回错误
    return errors.New("task failed")
})
if err := g.Wait(); err != nil {
    // 处理首个错误(如任务2失败)
}

执行顺序

  1. 主协程启动两个 goroutine 并行执行

  2. 若其中一个 goroutine 返回非 nil 错误:

    • 自动调用内置的context.CancelFunc

    • 向其他 goroutine 发送取消信号(通过 context)

    • g.Wait()立即返回首个错误

  3. 所有 goroutine(包括未出错的)需主动检查 context 状态并提前退出

2.3.3 对比

特性 errgroup.Group sync.WaitGroup
错误处理 自动捕获首个非 nil 错误并终止所有 goroutine 不处理错误
执行控制 首个错误发生后,其他 goroutine 会被 CancelFunc 终止 所有 goroutine 独立运行至完成
结果聚合 可返回首个错误,用于统一错误处理 无内置错误传递机制
取消机制 支持通过 context 传播取消信号 无内置取消机制


3. 并发panic处理

协程中发生 panic 若未被捕获,仅会导致该协程崩溃,不会影响其他协程和主程序,但可能导致资源泄漏 

3.1 普通goroutine的panic处理

对于普通的goroutine,可以在协程函数内部使用defer和recover组合来捕获panic。defer语句会将函数推迟到外层函数返回之前执行,而recover函数用于捕获panic,它只能在defer修饰的函数中有效

func worker() {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("Recovered in worker:", r)
        }
    }()
    // 可能触发panic的代码
    var data map[string]int
    data["key"] = 1 // 触发panic: assignment to entry in nil map
}

func main() {
    go worker()
    time.Sleep(time.Second)
    fmt.Println("Main continues")
}

3.2 使用 sync.WaitGroup 时的 panic 处理

sync.WaitGroup常用于等待一组goroutine完成任务。在这种场景下,每个goroutine内部仍需使用defer和recover捕获panic,并且可以通过额外的机制将panic信息传递给主协程。 

import (
    "fmt"
    "sync"
)

type Result struct {
    Err  error
    Data interface{}
}

func worker(id int, wg *sync.WaitGroup, resultChan chan<- Result) {
    defer func() {
        if r := recover(); r != nil {
            resultChan <- Result{Err: fmt.Errorf("panic in worker %d: %v", id, r)}
        }
    }()
    // 模拟可能触发panic的任务
    if id == 2 {
        panic("simulated panic")
    }
    resultChan <- Result{Data: fmt.Sprintf("Worker %d finished", id)}
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    resultChan := make(chan Result)
    numWorkers := 3

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg, resultChan)
    }

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    for result := range resultChan {
        if result.Err != nil {
            fmt.Println(result.Err)
        } else {
            fmt.Println(result.Data)
        }
    }
}

        worker函数通过defer和recover捕获panic,并将错误信息封装成Result结构体发送到resultChan通道。主协程从通道中接收结果,判断是否存在错误并进行相应处理,确保即使有goroutine发生panic,也能及时获取信息并继续执行后续逻辑。 

3.3 使用 errgroup.Group 时的 panic 处理

        errgroup.Group可以方便地并行执行多个任务,并在其中一个任务出错时快速返回错误。然而,它只能处理函数返回的错误,无法自动捕获goroutine内部的panic。因此,需要手动在每个任务函数中添加panic捕获逻辑,并将panic转换为错误返回给errgroup.Group。​

3.3.1 方法一:手动封装panic捕获

import (
    "fmt"
    "golang.org/x/sync/errgroup"
)

func safeGo(g *errgroup.Group, fn func() error) {
    g.Go(func() error {
        defer func() {
            if r := recover(); r != nil {
                return fmt.Errorf("panic occurred: %v", r)
            }
        }()
        return fn()
    })
}

func main() {
    var g errgroup.Group
    safeGo(&g, func() error {
        // 可能触发panic的任务
        panic("unexpected error")
        return nil
    })

    if err := g.Wait(); err != nil {
        fmt.Println("Error:", err) // 输出 panic occurred: unexpected error
    }
}

3.3.2 封装增强版errgroup

import (
    "fmt"
    "golang.org/x/sync/errgroup"
    "sync"
)

type SafeGroup struct {
    g       errgroup.Group
    mu      sync.Mutex
    panics  []interface{}
}

func (sg *SafeGroup) Go(fn func() error) {
    sg.g.Go(func() error {
        defer func() {
            if r := recover(); r != nil {
                sg.mu.Lock()
                sg.panics = append(sg.panics, r)
                sg.mu.Unlock()
            }
        }()
        return fn()
    })
}

func (sg *SafeGroup) Wait() error {
    if err := sg.g.Wait(); err != nil {
        return err
    }
    if len(sg.panics) > 0 {
        return fmt.Errorf("panics occurred: %v", sg.panics)
    }
    return nil
}

func main() {
    var sg SafeGroup
    sg.Go(func() error {
        panic("panic in goroutine")
        return nil
    })

    if err := sg.Wait(); err != nil {
        fmt.Println("Error:", err) // 输出: panics occurred: [panic in goroutine]
    }
}

        这两种方案都能有效地在errgroup.Group中处理panic,方案一通过简单的函数封装,在每个任务中添加panic捕获;方案二则通过自定义结构体,将panic信息集中管理,在Wait方法中统一返回错误,方便在复杂场景下对panic进行更灵活的处理。​

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐