Open hhstore opened 3 years ago
core
下, 集成了丰富的组件功能. core/stores/sqlc
core/stores/cache
core/discov
core/breaker
core/fx
, 类似 RxGo.
整体调用链路说明:
CachedConn
模块: core/stores/sqlc/cachedsql.go
sqlx.SqlConn
: core/stores/sqlx/sqlconn.go
cache.Cache
: cacheNode
模块: core/stores/cache/cachenode.go
NewNodeConn()
NewConn
QueryRow()
和 Exec()
方法, 可以快速掌握. core/stores/sqlc/cachedsql.go
QueryRow()
方法: https://github.com/tal-tech/go-zero/blob/v1.1.5/core/stores/sqlc/cachedsql.go#L88
Exec()
: https://github.com/tal-tech/go-zero/blob/f01472c9ea/core/stores/sqlc/cachedsql.go#L69
// CachedConn is a DB connection with cache capability: it pairs a SQL
// connection with a cache.
type CachedConn struct {
	// db is the underlying SQL connection handed to query/exec callbacks.
	db sqlx.SqlConn
	// cache provides the caching support.
	cache cache.Cache
}
// NewNodeConn returns a CachedConn with a redis node cache. func NewNodeConn(db sqlx.SqlConn, rds *redis.Redis, opts ...cache.Option) CachedConn { return CachedConn{ db: db, cache: cache.NewNode(rds, exclusiveCalls, stats, sql.ErrNoRows, opts...), } }
// NewConn returns a CachedConn with a redis cluster cache. func NewConn(db sqlx.SqlConn, c cache.CacheConf, opts ...cache.Option) CachedConn { return CachedConn{ db: db, cache: cache.New(c, exclusiveCalls, stats, sql.ErrNoRows, opts...), } }
// QueryRow unmarshals into v with given key and query func. func (cc CachedConn) QueryRow(v interface{}, key string, query QueryFn) error { // 关键调用: cache.Take() 内实现 cache set 操作, 见下面 return cc.cache.Take(v, key, func(v interface{}) error { return query(cc.db, v) }) }
// Exec runs given exec on given keys, and returns execution result. func (cc CachedConn) Exec(exec ExecFn, keys ...string) (sql.Result, error) { res, err := exec(cc.db) if err != nil { return nil, err }
// 主动删除缓存: insert/update/delete 操作, 会导致数据变更, 主动删除 cache, 简单安全
if err := cc.DelCache(keys...); err != nil {
return nil, err
}
return res, nil
}
> core/stores/cache/cachenode.go
- https://github.com/tal-tech/go-zero/blob/v1.1.5/core/stores/cache/cachenode.go#L28
- https://github.com/tal-tech/go-zero/blob/v1.1.5/core/stores/cache/cachenode.go#L115
- https://github.com/tal-tech/go-zero/blob/v1.1.5/core/stores/cache/cachenode.go#L103
- doTake(): https://github.com/tal-tech/go-zero/blob/v1.1.5/core/stores/cache/cachenode.go#L166
```go
// cacheNode is a cache backed by a single redis client. It carries the
// expiry settings, a shared-call barrier, statistics, and the error value
// treated as "not found".
type cacheNode struct {
// rds is the redis client used to write the cache.
rds *redis.Redis // redis cache writes
// expiry is the base expiry applied by Set (after adjustment by aroundDuration).
expiry time.Duration
// NOTE(review): presumably the expiry for "not found" placeholders — confirm against setCacheWithNotFound.
notFoundExpiry time.Duration
// barrier runs the load function in doTake through DoEx, keyed by cache key.
barrier syncx.SharedCalls
// NOTE(review): r and lock are not used in the visible code; presumably a guarded random source — confirm.
r *rand.Rand
lock *sync.Mutex
// NOTE(review): presumably used to jitter expiry values — confirm against aroundDuration.
unstableExpiry mathx.Unstable
// stat records counters such as total requests, hits, and DB failures.
stat *Stat
// errNotFound is the error value that cache and query results are compared against to detect a missing record.
errNotFound error
}
// Take takes the result from cache first; if not found, it queries from DB,
// sets the cache using c.expiry, and then returns the result in v.
func (c cacheNode) Take(v interface{}, key string, query func(v interface{}) error) error {
	// Key call: store the freshly queried value under key.
	store := func(loaded interface{}) error {
		return c.Set(key, loaded)
	}

	return c.doTake(v, key, query, store)
}
// doTake fills v with the value for key. It tries the cache first, falls
// back to query on a cache miss, and stores the queried value with cacheVal.
// The whole load runs through c.barrier.DoEx keyed by key.
//
// It returns c.errNotFound when the cache holds a placeholder or query
// reports c.errNotFound, and returns any other cache or query error as is.
// Errors from writing the cache are only logged.
func (c cacheNode) doTake(v interface{}, key string, query func(v interface{}) error,
	cacheVal func(v interface{}) error) error {
	load := func() (interface{}, error) {
		cacheErr := c.doGetCache(key, v)
		if cacheErr == nil {
			// Cache hit: v is already populated.
			return jsonx.Marshal(v)
		}

		switch {
		case cacheErr == errPlaceholder:
			return nil, c.errNotFound
		case cacheErr != c.errNotFound:
			// Why we just return the error instead of querying the db:
			// we don't allow the disaster to pass through to the dbs.
			// Fail fast, in case we bring down the dbs.
			return nil, cacheErr
		}

		// Cache miss: fall back to the query func.
		switch queryErr := query(v); {
		case queryErr == c.errNotFound:
			if setErr := c.setCacheWithNotFound(key); setErr != nil {
				logx.Error(setErr)
			}
			return nil, c.errNotFound
		case queryErr != nil:
			c.stat.IncrementDbFails()
			return nil, queryErr
		}

		// A failure to write the cache is logged, not returned.
		if storeErr := cacheVal(v); storeErr != nil {
			logx.Error(storeErr)
		}

		return jsonx.Marshal(v)
	}

	shared, fresh, err := c.barrier.DoEx(key, load)
	if err != nil {
		return err
	}
	if fresh {
		return nil
	}

	// Got the result from a previous ongoing query: count it as a hit and
	// decode the shared bytes into v.
	c.stat.IncrementTotal()
	c.stat.IncrementHit()

	return jsonx.Unmarshal(shared.([]byte), v)
}
// Set sets the cache with key and v, using c.expiry passed through
// c.aroundDuration as the expiry.
func (c cacheNode) Set(key string, v interface{}) error {
	ttl := c.aroundDuration(c.expiry)
	return c.SetWithExpire(key, v, ttl)
}
// SetWithExpire sets the cache with key and v, using the given expire.
//
// v is serialized with jsonx.Marshal; a marshal error is returned without
// touching redis. The expiry is passed to redis as whole seconds, truncated
// by the int conversion.
func (c cacheNode) SetWithExpire(key string, v interface{}, expire time.Duration) error {
	payload, err := jsonx.Marshal(v)
	if err != nil {
		return err
	}

	ttlSeconds := int(expire.Seconds())

	// Write the serialized value to the redis cache.
	return c.rds.Setex(key, string(payload), ttlSeconds)
}
1
1
1
1
xxxxxxxxxxxxxx | xxxxxxxxxxxxxx | xxxx |
---|---|---|
xxxxxxxxxxxxxx | xxxxxxxxxxxxxxxxxx | xxxxxxxxxx |
xxxxxxxxxxxxxx | xxxxxxxxxxxxxxxxxx | xxxxxxxxxx |
xxxxxxxxxxxxxx | xxxxxxxxxxxxxxxxxx | xxxxxxxxxx |
xxxxxxxxxxxxxx | xxxxxxxxxxxxxxxxxx | xxxxxxxxxx |
xxxxxxxxxxxxxx | xxxxxxxxxxxxxxxxxx | xxxxxxxxxx |
xxxxxxxxxxxxxx | xxxxxxxxxxxxxxxxxx | xxxxxxxxxx |
xxxxxxxxxxxxxx | xxxxxxxxxxxxxxxxxx | xxxxxxxxxx |
related: