玩转Golang的channel,二百行代码实现PubSub模式
两百行代码实现生产级PubSub工具类
引言
PubSub(Publish/Subscribe)模式,,意为“发布/订阅”模式,是为了解决一对多的依赖关系,使多个消费者同时监听某一个主题,不仅可以让生产者和消费者解耦,同时也让不同的消费者之间相互解耦(注:有些反模式依赖订阅者执行的先后顺序,使用共享数据来传递状态,是需要避免的,因为这样会使消费者耦合在一起,不能独立变化)。这其中的关键就在于需要有中介来维护订阅关系,并负责把生产的消息,传递给订阅方。
在 Golang 这门语言中,channel 天然就适合来当这个中介,下面就让我们一步步根据 PubSub 模式,实现工具类 EventBus.
定义类型
首先,让我们先定义一些基本类型和核心操作。
//EventID是Event的唯一标识
type EventID int64
//Event
type Event interface {
ID() EventID
}
//EventHandler
type EventHandler interface {
OnEvent(ctx context.Context, event Event) error
CanAutoRetry(err error) bool
}
// JobStatus holds information related to a job status
type JobStatus struct {
RunAt time.Time
FinishedAt time.Time
Err error
}
//EventBus ...
type EventBus struct {}
func (eb *EventBus) Subscribe(eventID EventID, handlers ...EventHandler) { }
func (eb *EventBus) Unsubscribe(eventID EventID, handlers ...EventHandler) { }
func (eb *EventBus) Publish(evt Event) <-chan JobStatus { }
重点拆解
首先,消费者要通过 Subscribe 来订阅相关的主题,这其中的重点就是需要根据 EventID 维护订阅的消费者,很自然的想到 map,我们选择用handlers map[EventID][]EventHandler
来维护,考虑到并发问题,还需要来加个锁。
//Subscribe ...
func (eb *EventBus) Subscribe(eventID EventID, handlers ...EventHandler) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.handlers[eventID] = append(eb.handlers[eventID], handlers...)
}
这里实现的比较简单,没有考虑一个消费者,重复订阅的问题,留给了使用方自己处理。(但同一个消费者为什么要多次调用 subcribe,订阅同一个主题呢,感觉是在写 bug)
下面就是最核心的 Publish 函数了,一方面一定是需要一个 channel(最好是有 buffer 的)来传递 Event 数据,另一方面,为了保证性能,需要有一些常驻协程,来监听消息,并启动相关的消费者。以下是相关代码(在完整版代码里,添加了日志、错误处理等,这里为了展示重点,暂且隐去)
func (eb *EventBus) Start() {
if eb.started {
return
}
for i := 0; i < eb.eventWorkers; i++ {
eb.wg.Add(1)
go eb.eventWorker(eb.eventJobQueue)
}
eb.started = true
}
func (eb *EventBus) eventWorker(jobQueue <-chan EventJob) {
loop:
for {
select {
case job := <-jobQueue:
jobStatus := JobStatus{
RunAt: time.Now(),
}
ctx, cancel := context.WithTimeout(context.Background(), eb.timeout)
g, _ := errgroup.WithContext(ctx)
for index := range job.handlers {
handler := job.handlers[index]
g.Go(func() error {
return eb.runHandler(ctx, handler, job.event)
})
}
jobStatus.Err = g.Wait()
jobStatus.FinishedAt = time.Now()
select {
case job.resultChan <- jobStatus:
default:
}
cancel()
}
}
}
做好上面的准备工作后,以下就是真正的 Publish 代码了。
// EventJob ...
type EventJob struct {
event Event
handlers []EventHandler
resultChan chan JobStatus
}
//Publish ...
func (eb *EventBus) Publish(evt Event) <-chan JobStatus {
eb.mu.RLock()
defer eb.mu.RUnlock()
if ehs, ok := eb.handlers[evt.ID()]; ok {
handlers := make([]EventHandler, len(ehs))
copy(handlers, ehs) //snapshot一份当时的消费者
job := EventJob{
event: evt,
handlers: handlers,
resultChan: make(chan JobStatus, 1),
}
var jobQueue = eb.eventJobQueue
select {
case jobQueue <- job:
default:
}
return job.resultChan
} else {
err := fmt.Errorf("no handlers for event(%d)", evt.ID())
resultChan := make(chan JobStatus, 1)
resultChan <- JobStatus{
Err: err,
}
return resultChan
}
}
这里没有在 eventWorker 中直接从 handlers 中根据 ID 拿到相关的消费者,一方面是为了让 eventWorker 更加通用,另一方面也是为减少因为锁操作引起的阻塞。
至此,我们已经把最核心的代码一一拆解完成,完整代码,请参见 channelx 项目中的event_bus.go
使用示例
没有例子的工具类是不完整的,下面就提供一个例子。
先定义一个 event,这里把id
定义成私有的,然后在构造函数中,强制指定。
const ExampleEventID channelx.EventID = 1
type ExampleEvent struct {
id channelx.EventID
}
func NewExampleEvent() ExampleEvent {
return ExampleEvent{id:ExampleEventID}
}
func (evt ExampleEvent) ID() channelx.EventID {
return evt.id
}
接下来是 event handler,需要根据实际的需要,在OnEvent
中检查接收到的事件是否是订阅的事件,以及接收到事件结构是否能转换成特定的类型。在防御编程后,就可以处理事件逻辑了。
type ExampleHandler struct {
logger channelx.Logger
}
func NewExampleHandler(logger channelx.Logger) *ExampleHandler {
return &ExampleHandler{
logger: logger,
}
}
func (h ExampleHandler) Logger() channelx.Logger{
return h.logger
}
func (h ExampleHandler) CanAutoRetry(err error) bool {
return false
}
func (h ExampleHandler) OnEvent(ctx context.Context, event channelx.Event) error {
if event.ID() != ExampleEventID {
return fmt.Errorf("subscribe wrong event(%d)", event.ID())
}
_, ok := event.(ExampleEvent)
if !ok {
return fmt.Errorf("failed to convert received event to ExampleEvent")
}
// handle the event here
h.Logger().Infof("event handled")
return nil
}
最后,就是EventBus
的启动,事件的订阅和发布了。
eventBus := channelx.NewEventBus(logger, "test", 4,4,2, time.Second, 5 * time.Second)
eventBus.Start()
handler := NewExampleHandler(logger)
eventBus.Subscribe(ExampleEventID, handler)
eventBus.Publish(NewExampleEvent())