eastany / eastany.github.com

我的博客
4 stars 3 forks source link

go源码阅读sync #36

Open eastany opened 9 years ago

eastany commented 9 years ago

once.go

type Once struct {
    m    Mutex
    done uint32       //记录是否执行
}

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 1 {    //如果已经执行过f,则直接返回
        return
    }
    // Slow-path.
    o.m.Lock()                                         //加锁 确保只执行一次
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)         //执行f之后设置 状态
        f()
    }
}
eastany commented 9 years ago
type Mutex struct {
    state int32
    sema  uint32
}

// Mutex互斥锁 // Mutexes可以作为结构体成员实例化 //零值代表未加锁

type Locker interface {
    Lock()
    Unlock()
}

锁接口

//互斥锁的加锁方法
// 如果已加锁则调用该方法的协程阻塞,直到解锁
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if raceenabled {
            raceAcquire(unsafe.Pointer(m))
        }
        return
    }

    awoke := false
    iter := 0
    for {
        old := m.state
        new := old | mutexLocked
        if old&mutexLocked != 0 {
            if runtime_canSpin(iter) {
                // Active spinning makes sense.
                // Try to set mutexWoken flag to inform Unlock
                // to not wake other blocked goroutines.
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                continue
            }
            new = old + 1<<mutexWaiterShift
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                panic("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                break
            }
            runtime_Semacquire(&m.sema)
            awoke = true
            iter = 0
        }
    }

    if raceenabled {
        raceAcquire(unsafe.Pointer(m))
    }
}
eastany commented 9 years ago

pool.go

type Pool struct {
    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    localSize uintptr        // size of the local array
    New func() interface{}
}

// Local per-P Pool appendix.
type poolLocal struct {
    private interface{}   // Can be used only by the respective P.
    shared  []interface{} // Can be used by any P.
    Mutex                 // Protects shared.
    pad     [128]byte     // Prevents false sharing.
}

P则为每个线程,也就是物理并行执行的单元 Pool设计为临时数据的缓存,其中的数据有可能丢失。 提供了PUT/GET接口 PUT:抢占式下时no-op的,首先从全局pool中找到P自己的pool,对shared加锁 append GET:从P自己的pool中获取,如果没有,则会向其他P的pool中获取

func poolCleanup() 该函数会在GC之前执行,并且对pool变量全部置0/nil

eastany commented 9 years ago

rwmutex.go

type RWMutex struct {
    w           Mutex  // 写互斥锁
    writerSem   uint32 // 读信号
    readerSem   uint32 // 写信号
    readerCount int32  // 读数量
    readerWait  int32  // 写数量
}

RLock: readerSem+1 RUnlock:readerSem-1,最后一个的话,则release

Lock:则是互斥锁来实现,把reader当作writer来处理。。。 Unlock:release所有读信号,互斥锁解开