stormi 框架所原创的代理方案集成了类似 Spring 的容器方案、Spring Boot 的自动配置方案和 Spring Cloud 的微服务方案的功能,并且更容易使用,功能更强大,可扩展性更强。stormi 原生集成了服务注册与发现、分布式锁和分布式事务等功能,还提供了一套进程间通信以及跨进程任务调度的解决方案,让微服务的开发真正实现跨主机联动,使得多人同时开发同一个功能成为现实。基于该方案,本框架实现了 Cooperation 代理,用于替代传统的服务注册与发现模式。该框架已经实现了 Redis 代理、Config 代理、Server 代理、MySQL 代理、Transaction 代理、NSQD 代理、Cooperation 代理和 Sync 代理,其中最强大的是 Redis 代理和 Config 代理,这两个代理是最底层的代理,所有的代理都依赖这两个代理。github 链接:https://github.com/stormi-li/stormi
go get -u github.com/stormi-li/stormi
package main
import "github.com/stormi-li/stormi"
// main invokes Install on stormi's NodeBuilder.
func main() {
	stormi.NodeBuilder.Install()
}
//启动 redis 单例
package main
import "github.com/stormi-li/stormi"
// main starts a standalone Redis node through stormi's NodeBuilder.
func main() {
	const (
		port = 213
		host = "127.0.0.1"
		// Filesystem path handed to the node builder.
		// NOTE(review): presumably the node's working directory — confirm.
		path = "C:\\Users\\lilili\\Desktop\\stormistudy\\redisstandalone"
	)
	stormi.NodeBuilder.CreateRedisNode(port, stormi.NodeType.RedisStandalone, host, path)
}
//启动 Redis 集群
package main
import "github.com/stormi-li/stormi"
// main starts a Redis cluster through stormi's NodeBuilder, passing six
// consecutive ports (2131-2136), the host, and a filesystem path.
func main() {
	const (
		host = "127.0.0.1"
		path = "C:\\Users\\lilili\\Desktop\\stormistudy\\rediscluster"
	)
	stormi.NodeBuilder.CreateRedisCluster(
		2131, 2132, 2133, 2134, 2135, 2136,
		host, path,
	)
}
//启动 Redis 集群节点并将其加入 Redis 集群当中
package main
import "github.com/stormi-li/stormi"
// main starts two additional cluster-mode Redis nodes and adds each of them
// to the existing cluster reachable at 127.0.0.1:2131: node 2137 with the
// RedisMaster node type and node 2138 with the RedisSlave node type.
func main() {
	const (
		host        = "127.0.0.1"
		path        = "C:\\Users\\lilili\\Desktop\\stormistudy\\rediscluster"
		clusterAddr = "127.0.0.1:2131"
	)

	// First node: created, then joined as RedisMaster.
	stormi.NodeBuilder.CreateRedisNode(2137, stormi.NodeType.RedisCluster, host, path)
	stormi.NodeBuilder.AddNodeToRedisCluster("127.0.0.1:2137", clusterAddr, stormi.NodeType.RedisMaster)

	// Second node: created, then joined as RedisSlave.
	stormi.NodeBuilder.CreateRedisNode(2138, stormi.NodeType.RedisCluster, host, path)
	stormi.NodeBuilder.AddNodeToRedisCluster("127.0.0.1:2138", clusterAddr, stormi.NodeType.RedisSlave)
}
package main
import "github.com/stormi-li/stormi"
// main builds a Redis proxy for 127.0.0.1:2131 and calls
// RedisClusterNodesInfo on it.
func main() {
	const addr = "127.0.0.1:2131"
	proxy := stormi.NewRedisProxy(addr)
	proxy.RedisClusterNodesInfo()
}
----------------------------------
package main
import "github.com/stormi-li/stormi"
// main builds a Redis proxy for 127.0.0.1:213 and calls RedisSingleNodeInfo
// on it.
func main() {
	const addr = "127.0.0.1:213"
	proxy := stormi.NewRedisProxy(addr)
	proxy.RedisSingleNodeInfo()
}
package main
import (
"context"
"github.com/stormi-li/stormi"
)
// main obtains the single-node client from the Redis proxy at 127.0.0.1:213
// and sets the key "stormi" to the value "stormi" with an expiration
// argument of 0.
func main() {
	proxy := stormi.NewRedisProxy("127.0.0.1:213")
	// The proxy also exposes RedisClusterClient(); it is not used here.
	client := proxy.RedisClient()
	ctx := context.Background()
	client.Set(ctx, "stormi", "stormi", 0)
}
-----------------------------------------------------------
package main
import (
"context"
"github.com/stormi-li/stormi"
)
// main obtains the cluster client from the Redis proxy at 127.0.0.1:2131 and
// sets the key "stormi" to the value "stormi" with an expiration argument
// of 0.
func main() {
	proxy := stormi.NewRedisProxy("127.0.0.1:2131")
	// The proxy also exposes RedisClient(); it is not used here.
	clusterClient := proxy.RedisClusterClient()
	ctx := context.Background()
	clusterClient.Set(ctx, "stormi", "stormi", 0)
}
//分布式锁,实现了看门狗机制和锁id识别
package main
import (
"fmt"
"time"
"github.com/stormi-li/stormi"
)
// main demonstrates the distributed lock: two goroutines each create their
// own lock object named "lock" from the same Redis proxy and repeatedly
// acquire and release it, printing 1 or 2 while holding it.
func main() {
	proxy := stormi.NewRedisProxy("127.0.0.1:2131")

	// slowHolder keeps the lock for one second per round and pauses 20ms
	// after releasing it.
	slowHolder := func() {
		lock := proxy.NewLock("lock")
		for {
			lock.Lock()
			fmt.Println(1)
			time.Sleep(time.Second)
			lock.UnLock()
			time.Sleep(20 * time.Millisecond)
		}
	}

	// fastHolder releases the lock immediately after printing.
	fastHolder := func() {
		lock := proxy.NewLock("lock")
		for {
			lock.Lock()
			fmt.Println(2)
			lock.UnLock()
		}
	}

	go slowHolder()
	go fastHolder()

	// Block forever so the goroutines keep running.
	select {}
}
package main
import (
"fmt"
"time"
"github.com/stormi-li/stormi"
)
// main calls Wait on "stormi-channel" with a 100-second duration argument
// and prints whatever Wait returns.
func main() {
	const timeout = 100 * time.Second
	proxy := stormi.NewRedisProxy("127.0.0.1:2131")
	received := proxy.Wait("stormi-channel", timeout)
	fmt.Println(received)
}
-----------------------------------------------------------
package main
import (
"github.com/stormi-li/stormi"
)
// main sends a single notification, "hello world!!!", on "stormi-channel".
func main() {
	proxy := stormi.NewRedisProxy("127.0.0.1:2131")
	proxy.Notify("stormi-channel", "hello world!!!")
}
package main
import (
"fmt"
"time"
"github.com/stormi-li/stormi"
)
// main calls CycleWait on "stormi-channel" with a 5-second duration and a
// callback that prints each received message, or "超时" when the callback is
// handed a nil message.
func main() {
	proxy := stormi.NewRedisProxy("127.0.0.1:2131")

	onMessage := func(msg *string) error {
		if msg == nil {
			fmt.Println("超时")
			return nil
		}
		fmt.Println(*msg)
		return nil
	}

	proxy.CycleWait("stormi-channel", 5*time.Second, true, onMessage)
}
-----------------------------------------------------------
package main
import (
"time"
"github.com/stormi-li/stormi"
)
// main sends "stormi-notify" on "stormi-channel" once per second, forever.
func main() {
	const interval = time.Second
	proxy := stormi.NewRedisProxy("127.0.0.1:2131")
	for {
		proxy.Notify("stormi-channel", "stormi-notify")
		time.Sleep(interval)
	}
}
package main
import (
"fmt"
"github.com/stormi-li/stormi"
)
// main subscribes to "stormi-pubsub" and prints every message. The handler
// returns 1 for the message "stop" and 0 otherwise; "订阅关闭" is printed once
// Subscribe returns.
func main() {
	proxy := stormi.NewRedisProxy("127.0.0.1:2131")
	subscription := proxy.GetPubSub("stormi-pubsub")

	handle := func(msg string) int {
		switch msg {
		case "stop":
			return 1
		default:
			fmt.Println(msg)
			return 0
		}
	}

	// Start the subscription.
	proxy.Subscribe(subscription, 0, handle)
	fmt.Println("订阅关闭")
}
-----------------------------------------------------------
package main
import (
"fmt"
"time"
"github.com/google/uuid"
"github.com/stormi-li/stormi"
)
// main publishes ten random UUID strings to "stormi-pubsub", one per second,
// then sends "stop", signals the shutdown channel, and waits for the
// publishing goroutine to report that Publish has returned.
func main() {
	proxy := stormi.NewRedisProxy("127.0.0.1:2131")

	messages := make(chan string, 1)
	shutdown := make(chan struct{}, 1)
	finished := make(chan struct{}, 1)

	go func() {
		// Start publishing; this call returns before "发布关闭" is printed.
		proxy.Publish("stormi-pubsub", messages, shutdown)
		fmt.Println("发布关闭")
		finished <- struct{}{}
	}()

	for sent := 0; sent < 10; sent++ {
		messages <- uuid.NewString()
		time.Sleep(time.Second)
	}

	messages <- "stop"
	shutdown <- struct{}{}
	<-finished
}
package main
import (
"fmt"
"github.com/stormi-li/stormi"
)
// main subscribes to "stormi-chat" and prints every message it receives; the
// handler always returns 0.
func main() {
	proxy := stormi.NewRedisProxy("127.0.0.1:2131")
	chat := proxy.GetPubSub("stormi-chat")

	printMessage := func(msg string) int {
		fmt.Println(msg)
		return 0
	}

	proxy.Subscribe(chat, 0, printMessage)
}
----------------------------------
package main
import (
"bufio"
"os"
"strings"
"github.com/stormi-li/stormi"
)
// main reads lines from standard input and sends each one, with surrounding
// whitespace trimmed, as a notification on "stormi-chat".
//
// The loop ends when reading fails (for example when stdin reaches EOF).
// Without that check ReadString would keep returning immediately with an
// error, and the loop would spin forever sending empty messages.
func main() {
	rp := stormi.NewRedisProxy("127.0.0.1:2131")
	reader := bufio.NewReader(os.Stdin)
	for {
		text, err := reader.ReadString('\n')
		text = strings.TrimSpace(text)
		if err != nil {
			// ReadString returns the data read before the error; forward a
			// final unterminated line if there is one, then stop.
			if text != "" {
				rp.Notify("stormi-chat", text)
			}
			return
		}
		rp.Notify("stormi-chat", text)
	}
}
package main
import (
"github.com/stormi-li/stormi"
)
// main creates a config named "nsqd" with address 127.0.0.1:3131, registers
// it through the config proxy, and then calls ConfigPersistence.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)

	cfg := configProxy.NewConfig()
	cfg.Name = "nsqd"
	cfg.Addr = "127.0.0.1:3131"

	configProxy.Register(cfg)
	configProxy.ConfigPersistence()
}
package main
import (
"fmt"
"github.com/stormi-li/stormi"
)
// main pulls the configs stored under the name "nsqd" and prints each one.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)

	for _, cfg := range configProxy.Pull("nsqd") {
		fmt.Printf("c: %v\n", cfg)
	}
}
package main
import (
"github.com/stormi-li/stormi"
)
// main pulls the configs stored under the name "nsqd", sets Ignore to true on
// each, and writes each one back with Update.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)

	for _, cfg := range configProxy.Pull("nsqd") {
		cfg.Ignore = true
		configProxy.Update(cfg)
	}
}
package main
import (
"github.com/stormi-li/stormi"
)
// main pulls the configs stored under the name "nsqd", removes each one
// (after setting its Ignore field to true), and finally calls RemoveRegister
// for the name "nsqd".
func main() {
	const name = "nsqd"
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)

	for _, cfg := range configProxy.Pull(name) {
		cfg.Ignore = true
		configProxy.Remove(cfg)
	}
	configProxy.RemoveRegister(name)
}
package main
import (
"github.com/stormi-li/stormi"
)
// main builds two "nsqd" configs (addresses 127.0.0.1:3131 and
// 127.0.0.1:3132) and submits both in one Refreshs call.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)

	first := configProxy.NewConfig()
	first.Name = "nsqd"
	first.Addr = "127.0.0.1:3131"

	second := configProxy.NewConfig()
	second.Name = "nsqd"
	second.Addr = "127.0.0.1:3132"

	// A config that does not exist yet is registered by this call.
	batch := []stormi.Config{first, second}
	configProxy.Refreshs(batch)
}
package main
import (
"github.com/stormi-li/stormi"
)
// main pulls the configs stored under the name "nsqd" and passes the whole
// result to Removes.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)

	pulled := configProxy.Pull("nsqd")
	configProxy.Removes(pulled)
}
package main
import (
"time"
"github.com/stormi-li/stormi"
)
// main creates a config proxy, waits three seconds, and then calls Sync on
// it.
func main() {
	const delay = 3 * time.Second
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)

	time.Sleep(delay)
	configProxy.Sync()
}
package main
import "github.com/stormi-li/stormi"
// main creates a config proxy, discards the returned value, and then blocks
// forever.
// NOTE(review): presumably the proxy keeps listening for sync notifications
// in the background while main is blocked — confirm against the library.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	_ = stormi.NewConfigProxy(redisProxy)
	select {}
}
-----------------------------------------------------------
package main
import "github.com/stormi-li/stormi"
// main calls NotifySync on the config proxy with the message "同步信息".
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)
	configProxy.NotifySync("同步信息")
}
package main
import (
"fmt"
"github.com/stormi-li/stormi"
)
// main registers a config handler for the name "nsqd" that prints every
// config in the map it is given.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)

	printAll := func(configs map[string]*stormi.Config) {
		for _, cfg := range configs {
			fmt.Printf("c: %v\n", cfg)
		}
	}

	configProxy.AddConfigHandler("nsqd", printAll)
}
package main
import (
"fmt"
"github.com/stormi-li/stormi"
)
// main registers a sync-notification handler that prints the notification
// message, then blocks forever so the handler can keep being invoked.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)

	// The proxy argument handed to the handler is not needed here.
	printNotification := func(_ stormi.ConfigProxy, msg string) {
		fmt.Println(msg)
	}

	configProxy.AddConfigSyncNotificationHandler(printNotification)
	select {}
}
-----------------------------------------------------------
package main
import (
"github.com/stormi-li/stormi"
)
// main calls NotifySync on the config proxy with the message "同步新配置".
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)
	configProxy.NotifySync("同步新配置")
}
package main
import (
"time"
"github.com/stormi-li/stormi"
)
// main installs a no-op sync-notification handler on the server proxy's
// config proxy, registers the server "stormiserver" at 127.0.0.1:8888 with
// the arguments 3 and 3*time.Second, and then blocks forever.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	serverProxy := stormi.NewServerProxy(stormi.NewConfigProxy(redisProxy))

	noop := func(_ stormi.ConfigProxy, _ string) {}
	serverProxy.ConfigProxy().AddConfigSyncNotificationHandler(noop)

	serverProxy.Register("stormiserver", "127.0.0.1:8888", 3, 3*time.Second)
	select {}
}
-----------------------------------------------------------
package main
import (
"fmt"
"time"
"github.com/stormi-li/stormi"
)
// main calls Discover for "stormiserver" with a 3-second duration and a
// callback that prints the address it is given, then blocks forever.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	serverProxy := stormi.NewServerProxy(stormi.NewConfigProxy(redisProxy))

	printAddr := func(addr string) error {
		fmt.Println(addr)
		return nil
	}

	serverProxy.Discover("stormiserver", 3*time.Second, printAddr)
	select {}
}
package main
import (
"github.com/stormi-li/stormi"
)
// main registers a MySQL entry under node id 33061 with the address
// 127.0.0.1:3306 and three further string arguments ("root", "123456",
// "stormi").
// NOTE(review): those look like user, password, and database name — confirm
// against MysqlProxy.Register.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	mysqlProxy := stormi.NewMysqlProxy(stormi.NewConfigProxy(redisProxy))
	mysqlProxy.Register(33061, "127.0.0.1:3306", "root", "123456", "stormi")
}
package main
import (
"github.com/stormi-li/stormi"
)
// main calls ConnectByNodeId with node id 33061 on the MySQL proxy.
func main() {
	const nodeID = 33061
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	mysqlProxy := stormi.NewMysqlProxy(stormi.NewConfigProxy(redisProxy))
	mysqlProxy.ConnectByNodeId(nodeID)
}
-----------------------------------------------------------------
//动态修改数据库连接
package main
import (
"github.com/stormi-li/stormi"
)
// main connects to MySQL node 33061 and registers a handler for "mysql"
// configs. When the handler runs, it reconnects with the first config it
// finds whose NodeId is 33061. main then blocks forever.
func main() {
	configProxy := stormi.NewConfigProxy(stormi.NewRedisProxy("127.0.0.1:2131"))
	mysqlProxy := stormi.NewMysqlProxy(configProxy)
	mysqlProxy.ConnectByNodeId(33061)

	reconnect := func(configs map[string]*stormi.Config) {
		for _, cfg := range configs {
			if cfg.NodeId != 33061 {
				continue
			}
			// Only the first matching config is used.
			mysqlProxy.ConnectByConfig(*cfg)
			return
		}
	}

	configProxy.AddConfigHandler("mysql", reconnect)
	select {}
}
-----------------------------------------------------------------
package main
import (
"github.com/stormi-li/stormi"
)
// main takes one config from those pulled under the name "mysql", sets its
// NodeId to 33061 and its Addr to 192.168.37.139:3306, updates it, and sends
// a sync notification. Only the first config yielded by the iteration is
// touched.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	configProxy := stormi.NewConfigProxy(redisProxy)

	for _, cfg := range configProxy.Pull("mysql") {
		cfg.NodeId = 33061
		cfg.Addr = "192.168.37.139:3306"
		configProxy.Update(cfg)
		configProxy.NotifySync("33061节点数据库更新, 请重新连接数据库")
		// Stop after the first entry.
		break
	}
}
package main
import (
"github.com/stormi-li/stormi"
)
// main connects to MySQL node 33061, calls AutoMigrate for ConfigTable, and
// then creates one row with Name "nsqd" and Addr "127.0.0.1:3131".
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	mysqlProxy := stormi.NewMysqlProxy(stormi.NewConfigProxy(redisProxy))
	mysqlProxy.ConnectByNodeId(33061)

	// AutoMigrate is called while the row still holds its zero value.
	var row ConfigTable
	mysqlProxy.DB().AutoMigrate(&row)

	row.Name, row.Addr = "nsqd", "127.0.0.1:3131"
	mysqlProxy.DB().Create(&row)
}
// ConfigTable is the record type this example passes to AutoMigrate and
// Create on the MySQL proxy's DB handle.
type ConfigTable struct {
	// Name is set to "nsqd" by the example.
	Name string
	// Addr is set to a host:port string by the example.
	Addr string
}
package main
import (
"fmt"
"math/rand"
"strconv"
"github.com/stormi-li/stormi"
)
// main obtains three distributed-transaction ids, hands one to each of
// server1, server2, and server3 in that order, and then calls DCommit with
// all the ids. The callback given to DCommit prints "分布式事务失败" followed
// by the statements it receives. main then blocks forever.
func main() {
	txProxy := stormi.NewTransactionProxy(stormi.NewRedisProxy("127.0.0.1:2131"))
	ids := txProxy.NewDTxIds(3)

	participants := []func(string){server1, server2, server3}
	for i, run := range participants {
		run(ids[i])
	}

	onFailure := func(statement [][2]string) {
		fmt.Println("分布式事务失败")
		fmt.Println(statement)
	}
	txProxy.DCommit(ids, onFailure)

	select {}
}
// server1 connects to MySQL node 33061, opens a distributed transaction for
// id, creates one ConfigTable row with a random port in Addr, and then calls
// Rollback on the transaction.
func server1(id string) {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	mysqlProxy := stormi.NewMysqlProxy(stormi.NewConfigProxy(redisProxy))
	mysqlProxy.ConnectByNodeId(33061)

	row := ConfigTable{
		Name: "nsqd",
		Addr: "127.0.0.1:" + strconv.Itoa(rand.Intn(10000)),
	}

	tx := mysqlProxy.NewDTx(id)
	tx.DB().Create(&row)
	tx.Rollback()
}
// server2 connects to MySQL node 33061, opens a distributed transaction for
// id, creates one ConfigTable row with a random port in Addr, and then calls
// Commit on the transaction.
func server2(id string) {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	mysqlProxy := stormi.NewMysqlProxy(stormi.NewConfigProxy(redisProxy))
	mysqlProxy.ConnectByNodeId(33061)

	row := ConfigTable{
		Name: "nsqd",
		Addr: "127.0.0.1:" + strconv.Itoa(rand.Intn(10000)),
	}

	tx := mysqlProxy.NewDTx(id)
	tx.DB().Create(&row)
	tx.Commit()
}
// server3 connects to MySQL node 33061, opens a distributed transaction for
// id, creates one ConfigTable row with a random port in Addr, and then calls
// Commit on the transaction.
func server3(id string) {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	mysqlProxy := stormi.NewMysqlProxy(stormi.NewConfigProxy(redisProxy))
	mysqlProxy.ConnectByNodeId(33061)

	row := ConfigTable{
		Name: "nsqd",
		Addr: "127.0.0.1:" + strconv.Itoa(rand.Intn(10000)),
	}

	tx := mysqlProxy.NewDTx(id)
	tx.DB().Create(&row)
	tx.Commit()
}
// ConfigTable is the record type that server1, server2, and server3 each
// insert inside their distributed transaction.
type ConfigTable struct {
	// Name is set to "nsqd" by the example.
	Name string
	// Addr is set to "127.0.0.1:" followed by a random port number.
	Addr string
}
package main
import (
"github.com/stormi-li/stormi"
)
// main creates four nsqd nodes, each given a pair of consecutive ports
// (4441/4442 through 4447/4448) and the same filesystem path, and then
// registers the first port of each pair with the nsqd proxy.
func main() {
	nsqdProxy := stormi.NewNsqdProxy(stormi.NewConfigProxy(stormi.NewRedisProxy("127.0.0.1:2131")))

	const path = "C:\\Users\\lilili\\Desktop\\nsqd"

	// All nodes are created before any of them is registered.
	for _, port := range [...]int{4441, 4443, 4445, 4447} {
		stormi.NodeBuilder.CreateNsqdNode(port, port+1, path)
	}

	addrs := [...]string{
		"127.0.0.1:4441",
		"127.0.0.1:4443",
		"127.0.0.1:4445",
		"127.0.0.1:4447",
	}
	for _, addr := range addrs {
		nsqdProxy.Register(addr)
	}
}
--------------------------------------
//启动 60 个 nsqd 节点
package main
import (
"strconv"
"github.com/stormi-li/stormi"
)
// main starts 60 nsqd nodes. Node i is given the consecutive port pair
// 4440+2i and 4440+2i+1, and the first port of the pair is registered with
// the nsqd proxy right after the node is created.
func main() {
	const (
		nodeCount = 60
		basePort  = 4440
		path      = "C:\\Users\\lilili\\Desktop\\nsqd"
	)
	nsqdProxy := stormi.NewNsqdProxy(stormi.NewConfigProxy(stormi.NewRedisProxy("127.0.0.1:2131")))

	for i := 0; i < nodeCount; i++ {
		port := basePort + 2*i
		stormi.NodeBuilder.CreateNsqdNode(port, port+1, path)
		nsqdProxy.Register("127.0.0.1:" + strconv.Itoa(port))
	}
}
package main
import (
"fmt"
"time"
"github.com/google/uuid"
"github.com/nsqio/go-nsq"
"github.com/stormi-li/stormi"
)
// main adds a consume handler for topic "stormi-nsqd" / channel "channel1"
// that prints each message body, then publishes a random UUID to the same
// topic every 200 milliseconds, forever.
func main() {
	const (
		topic    = "stormi-nsqd"
		interval = 200 * time.Millisecond
	)
	nsqdProxy := stormi.NewNsqdProxy(stormi.NewConfigProxy(stormi.NewRedisProxy("127.0.0.1:2131")))

	printBody := func(message *nsq.Message) error {
		fmt.Println(string(message.Body))
		return nil
	}
	nsqdProxy.AddConsumeHandler(topic, "channel1", printBody)

	for {
		payload := []byte(uuid.NewString())
		nsqdProxy.Publish(topic, payload)
		time.Sleep(interval)
	}
}
package main
import (
"github.com/stormi-li/stormi"
)
// main builds a cooperation proxy named "UserServer" and calls
// CreateCoprotocol on it.
func main() {
	configProxy := stormi.NewConfigProxy(stormi.NewRedisProxy("127.0.0.1:2131"))
	cooperation := stormi.NewCooperationProxy(configProxy, "UserServer")
	cooperation.CreateCoprotocol()
}
package UserServer
// Protocol codes. The values are assigned sequentially from zero.
const (
	// Insert has the value 0.
	Insert = iota
	// Update has the value 1.
	Update
)
// UserServerDto is the data transfer struct for the UserServer protocol.
type UserServerDto struct {
	// Id is an integer identifier.
	Id int
	// UserName is the user's name.
	UserName string
}
package main
import (
"github.com/stormi-li/stormi"
)
// main builds a cooperation proxy named "UserServer" and calls
// PushCoprotocol on it.
func main() {
	configProxy := stormi.NewConfigProxy(stormi.NewRedisProxy("127.0.0.1:2131"))
	cooperation := stormi.NewCooperationProxy(configProxy, "UserServer")
	cooperation.PushCoprotocol()
}
package main
import (
"encoding/json"
"time"
"github.com/stormi-li/stormi"
UserServer "github.com/stormi-li/stormi/coprotocol/UserServer"
)
// main sets up both handlers (handler1 first, then handler2) and then blocks
// forever.
func main() {
	for _, setup := range []func(){handler1, handler2} {
		setup()
	}
	select {}
}
// handler1 registers a handler for the UserServer.Insert protocol code with
// buffer size 10 and concurrency 2. The handler decodes the payload as JSON,
// overwrites Id with 1 and UserName with "handler1", sleeps 100ms, and
// returns the resulting DTO.
func handler1() {
	configProxy := stormi.NewConfigProxy(stormi.NewRedisProxy("127.0.0.1:2131"))
	h := stormi.NewCooperationProxy(configProxy, "UserServer").NewHandler()
	h.SetBufferSize(10)
	h.SetConcurrency(2)

	h.Handle(UserServer.Insert, func(payload []byte) any {
		var user UserServer.UserServerDto
		// The decode error is not inspected; both fields are overwritten below.
		json.Unmarshal(payload, &user)
		user.Id, user.UserName = 1, "handler1"
		time.Sleep(100 * time.Millisecond)
		return user
	})
}
// handler2 registers a handler for the UserServer.Insert protocol code with
// buffer size 10 and concurrency 2. The handler decodes the payload as JSON,
// overwrites Id with 2 and UserName with "handler2", sleeps 200ms, and
// returns the resulting DTO.
func handler2() {
	configProxy := stormi.NewConfigProxy(stormi.NewRedisProxy("127.0.0.1:2131"))
	h := stormi.NewCooperationProxy(configProxy, "UserServer").NewHandler()
	h.SetBufferSize(10)
	h.SetConcurrency(2)

	h.Handle(UserServer.Insert, func(payload []byte) any {
		var user UserServer.UserServerDto
		// The decode error is not inspected; both fields are overwritten below.
		json.Unmarshal(payload, &user)
		user.Id, user.UserName = 2, "handler2"
		time.Sleep(200 * time.Millisecond)
		return user
	})
}
package main
import (
"fmt"
"time"
"github.com/stormi-li/stormi"
UserServer "github.com/stormi-li/stormi/coprotocol/UserServer"
)
// main creates a caller with a 10-second timeout and concurrency 100, then
// starts 50 goroutines. Each goroutine loops forever: it calls
// UserServer.Insert with a fixed request, prints the DTO that Call filled
// in, and sleeps one second.
func main() {
	configProxy := stormi.NewConfigProxy(stormi.NewRedisProxy("127.0.0.1:2131"))
	caller := stormi.NewCooperationProxy(configProxy, "UserServer").NewCaller()
	caller.SetTimeout(10 * time.Second)
	caller.SetConcurrency(100)

	worker := func() {
		for {
			var reply UserServer.UserServerDto
			request := UserServer.UserServerDto{Id: 1, UserName: "stormi"}
			caller.Call(UserServer.Insert, request, &reply)
			fmt.Println(reply)
			time.Sleep(time.Second)
		}
	}

	for started := 0; started < 50; started++ {
		go worker()
	}

	select {}
}
package main
import (
"fmt"
"time"
"github.com/stormi-li/stormi"
)
// main exercises the stormi timer: it prints Stamp after 100ms, prints
// StampAndReset after a further 100ms, and prints Stamp again after a final
// 100ms.
func main() {
	const pause = 100 * time.Millisecond
	timer := stormi.Utils.NewTimer()

	time.Sleep(pause)
	fmt.Println(timer.Stamp())

	time.Sleep(pause)
	fmt.Println(timer.StampAndReset())

	time.Sleep(pause)
	fmt.Println(timer.Stamp())
}
package main
import (
"github.com/stormi-li/stormi"
)
// main creates a StormiChat on the Redis proxy and calls StartSub.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	chat := stormi.NewStormiChat(redisProxy)
	chat.StartSub()
}
-----------------------------------------------------------
package main
import (
"github.com/stormi-li/stormi"
)
// main creates a StormiChat on the Redis proxy and calls StartPub.
func main() {
	redisProxy := stormi.NewRedisProxy("127.0.0.1:2131")
	chat := stormi.NewStormiChat(redisProxy)
	chat.StartPub()
}
package main
import (
"fmt"
"time"
"github.com/stormi-li/stormi"
)
// main takes the read side of the RWLock named "myrwlock", prints "read",
// holds the lock for ten seconds, and then releases it.
func main() {
	const hold = 10 * time.Second
	syncProxy := stormi.NewSyncProxy(stormi.NewRedisProxy("127.0.0.1:213"))
	rw := syncProxy.NewRWLock("myrwlock")

	rw.RLock()
	fmt.Println("read")
	time.Sleep(hold)
	rw.RUnlock()
}
-----------------------------------------------------------
package main
import (
"fmt"
"time"
"github.com/stormi-li/stormi"
)
// main takes the write side of the RWLock named "myrwlock", prints "write",
// holds the lock for five seconds, and then releases it.
func main() {
	const hold = 5 * time.Second
	syncProxy := stormi.NewSyncProxy(stormi.NewRedisProxy("127.0.0.1:213"))
	rw := syncProxy.NewRWLock("myrwlock")

	rw.WLock()
	fmt.Println("write")
	time.Sleep(hold)
	rw.WUnlock()
}
package main
import (
"fmt"
"github.com/stormi-li/stormi"
)
// main adds 3 to the wait group named "waitgroup", waits on it, and prints
// "done" once Wait returns.
func main() {
	syncProxy := stormi.NewSyncProxy(stormi.NewRedisProxy("127.0.0.1:213"))
	group := syncProxy.NewWaitGroup("waitgroup")

	group.Add(3)
	group.Wait()
	fmt.Println("done")
}
-----------------------------------------------------------
package main
import "github.com/stormi-li/stormi"
// main calls Done once on the wait group named "waitgroup".
func main() {
	syncProxy := stormi.NewSyncProxy(stormi.NewRedisProxy("127.0.0.1:213"))
	group := syncProxy.NewWaitGroup("waitgroup")
	group.Done()
}
package main
import (
"fmt"
"sync"
"github.com/stormi-li/stormi"
)
// main repeatedly starts three goroutines that each wait on the condition
// named "cond" and then print their label ("wait1", "wait2", "wait3"). A
// local sync.WaitGroup holds each round until all three have printed.
func main() {
	syncProxy := stormi.NewSyncProxy(stormi.NewRedisProxy("127.0.0.1:213"))
	cond := syncProxy.NewCond("cond")

	labels := [...]string{"wait1", "wait2", "wait3"}
	var round sync.WaitGroup
	for {
		round.Add(len(labels))
		for _, label := range labels {
			// The label is passed as an argument so each goroutine prints its own.
			go func(label string) {
				cond.Wait()
				fmt.Println(label)
				round.Done()
			}(label)
		}
		round.Wait()
	}
}
-----------------------------------------------------------
package main
import (
"time"
"github.com/stormi-li/stormi"
)
// main calls Singal on the condition named "cond", waits two seconds, and
// then calls Broadcast. "Singal" is the method name as spelled in this
// example.
func main() {
	const gap = 2 * time.Second
	syncProxy := stormi.NewSyncProxy(stormi.NewRedisProxy("127.0.0.1:213"))
	cond := syncProxy.NewCond("cond")

	cond.Singal()
	time.Sleep(gap)
	cond.Broadcast()
}