回顾
上一讲中,我们知道了broker是做什么的,以及go-micro默认的实现方案,这个http broker作为默认的实现方案,供我们学习使用,一般情况下在生产环境中,大家会根据业务诉求,以及消息是否允许丢失,都会自定义一个实现方案,根据不同的业务需求,选择比较稳定的开源方案,比如Kafka,Nsq,RabbitMq等,甚至Redis中的发布订阅管道也可以作为一个实现方案来选择,就看大家的需要了。
上一讲的最后,我们也提到了,go-micro可插拔的实现一个broker,原理就是定义了一组接口方法,只要你的实现方案中,实现了这组方法,就可以在go-micro中去使用你自定义的broker。
原文参考:从go-micro的broker开始学习go-micro
Nsq
我再另外一篇文章中讲解了Nsq,想要了解的可以去翻翻看。
我们简单回顾一下,这个工具是一个分布式的实时消息平台。具有默认情况下,消息不持久,至少被传递一次,消息是无序的等特点。
如何部署一个分布式的集群也很简单,大家可以使用docker来部署一个Nsq集群,然后我们就可以使用Nsq在go-micro的架构下实现我们的异步消息系统了
Broker接口
我们再来回顾一下,Broker接口的组成
//Brokerisaninterfaceusedforasynchronousmessaging.typeBrokerinterface{Init(...Option)errorOptions()OptionsAddress()stringConnect()errorDisconnect()errorPublish(topicstring,m*Message,opts...PublishOption)errorSubscribe(topicstring,hHandler,opts...SubscribeOption)(Subscriber,error)String()string}
主要方法就是初始化Init,连接Connect以及Disconnect方法,和发布Publish以及订阅Subscribe方法。
实现方案
定义结构体
typensqBrokerstruct{lookupdAddrs[]stringaddrs[]stringoptsbroker.Optionsconfig*nsq.Configsync.Mutexrunningboolp[]*nsq.Producerc[]*subscriber}typepublicationstruct{topicstringm*broker.Messagenm*nsq.Messageoptsbroker.PublishOptionserrerror}typesubscriberstruct{topicstringoptsbroker.SubscribeOptionsc*nsq.Consumer//handlersowecanresubcribehnsq.HandlerFunc//concurrencynint}
如何实例化我们的NsqBroker实例的呢
funcNewBroker(opts...broker.Option)broker.Broker{options:=broker.Options{//DefaultcodecCodec:json.Marshaler{},//DefaultcontextContext:context.Background(),}for_,o:=rangeopts{o(&options)}varaddrs[]stringfor_,addr:=rangeoptions.Addrs{iflen(addr)>0{addrs=append(addrs,addr)}}iflen(addrs)==0{addrs=[]string{"127.0.0.1:4150"}}n:=&nsqBroker{addrs:addrs,opts:options,config:nsq.NewConfig(),}n.configure(n.opts.Context)returnn}
可以看到在nsqBroker实例中的config是我们Nsq的连接配置
然后就是我们Broker接口中的Init方法,做初始化处理
func(n*nsqBroker)Init(opts...broker.Option)error{for_,o:=rangeopts{o(&n.opts)}varaddrs[]stringfor_,addr:=rangen.opts.Addrs{iflen(addr)>0{addrs=append(addrs,addr)}}iflen(addrs)==0{addrs=[]string{"127.0.0.1:4150"}}n.addrs=addrsn.configure(n.opts.Context)returnnil}
初始化结束后,再来看看Connect方法
func(n*nsqBroker)Connect()error{n.Lock()defern.Unlock()ifn.running{returnnil}producers:=make([]*nsq.Producer,0,len(n.addrs))//createproducersfor_,addr:=rangen.addrs{p,err:=nsq.NewProducer(addr,n.config)iferr!=nil{returnerr}iferr=p.Ping();err!=nil{returnerr}producers=append(producers,p)}//createconsumersfor_,c:=rangen.c{channel:=c.opts.Queueiflen(channel)==0{channel=uuid.New().String()+"#ephemeral"}cm,err:=nsq.NewConsumer(c.topic,channel,n.config)iferr!=nil{returnerr}cm.AddConcurrentHandlers(c.h,c.n)c.c=cmiflen(n.lookupdAddrs)>0{c.c.ConnectToNSQLookupds(n.lookupdAddrs)}else{err=c.c.ConnectToNSQDs(n.addrs)iferr!=nil{returnerr}}}n.p=producersn.running=truereturnnil}
根据我们nsqBroker结构体中的addrs,来创建发布者列表,根据nsqBroker中的消费者列表,创建nsq的Consumer.然后根据是否配置了nsq lookupaddr来连接到相关组件,如果没配就直接连到指定地址的nsqd.
重点来了,看看如何实现消息的发布和订阅
func(n*nsqBroker)Publish(topicstring,message*broker.Message,opts...broker.PublishOption)error{p:=n.p[rand.Intn(len(n.p))]options:=broker.PublishOptions{}for_,o:=rangeopts{o(&options)}var(doneChanchan*nsq.ProducerTransactiondelaytime.Duration)ifoptions.Context!=nil{ifv,ok:=options.Context.Value(asyncPublishKey{}).(chan*nsq.ProducerTransaction);ok{doneChan=v}ifv,ok:=options.Context.Value(deferredPublishKey{}).(time.Duration);ok{delay=v}}b,err:=n.opts.Codec.Marshal(message)iferr!=nil{returnerr}ifdoneChan!=nil{ifdelay>0{returnp.DeferredPublishAsync(topic,delay,b,doneChan)}returnp.PublishAsync(topic,b,doneChan)}else{ifdelay>0{returnp.DeferredPublish(topic,delay,b)}returnp.Publish(topic,b)}}
从发布在列表中取出一个发布者,然后将编码后的消息发布到指定topic中。
最后就是订阅者方法
func(n*nsqBroker)Subscribe(topicstring,handlerbroker.Handler,opts...broker.SubscribeOption)(broker.Subscriber,error){options:=broker.SubscribeOptions{AutoAck:true,}for_,o:=rangeopts{o(&options)}concurrency,maxInFlight:=DefaultConcurrentHandlers,DefaultConcurrentHandlersifoptions.Context!=nil{ifv,ok:=options.Context.Value(concurrentHandlerKey{}).(int);ok{maxInFlight,concurrency=v,v}ifv,ok:=options.Context.Value(maxInFlightKey{}).(int);ok{maxInFlight=v}}channel:=options.Queueiflen(channel)==0{channel=uuid.New().String()+"#ephemeral"}config:=*n.configconfig.MaxInFlight=maxInFlightc,err:=nsq.NewConsumer(topic,channel,&config)iferr!=nil{returnnil,err}h:=nsq.HandlerFunc(func(nm*nsq.Message)error{if!options.AutoAck{nm.DisableAutoResponse()}varmbroker.Messageiferr:=n.opts.Codec.Unmarshal(nm.Body,&m);err!=nil{returnerr}p:=&publication{topic:topic,m:&m}p.err=handler(p)returnp.err})c.AddConcurrentHandlers(h,concurrency)iflen(n.lookupdAddrs)>0{err=c.ConnectToNSQLookupds(n.lookupdAddrs)}else{err=c.ConnectToNSQDs(n.addrs)}iferr!=nil{returnnil,err}sub:=&subscriber{c:c,opts:options,topic:topic,h:h,n:concurrency,}n.c=append(n.c,sub)returnsub,nil}
在这个方法中,我们通过Nsq config来创建nsq消费者。然后定义一个nsq HandlerFunc来处理消费到的消息。然后就可以连接到nsq服务上进行监听消息了。最后创建的订阅者实例放到nsqBroker实例中的订阅者列表中。想要了解nsq Consumer 是如何连接Nsqd服务进程的,可以去go-nsq代码看具体的流程这个已经不属于我们今天的主题了,大家只要知道,Nsq建议我们使用lookupd来进行topic发布者的服务发现。
//ConnectToNSQDstakesmultiplensqdaddressestoconnectdirectlyto.////ItisrecommendedtouseConnectToNSQLookupdsothattopicsarediscovered//automatically.Thismethodisusefulwhenyouwanttoconnecttolocalinstance.func(r*Consumer)ConnectToNSQDs(addresses[]string)error{for_,addr:=rangeaddresses{err:=r.ConnectToNSQD(addr)iferr!=nil{returnerr}}returnnil}
至此一个自定义的broker实现方案就出来了,你可以通过kafka来实现自己的broker。