Redis、Kafka或RabbitMQ:哪个作为微服务消息代理最合适?
将异步通信用于微服务的场合,通常使用消息代理(Message Broker)。消息代理确保不同微服务之间的通信可靠稳定,保证消息在系统内得到管理和监视,并且消息不会被丢失。
开发者可以选择的一些消息代理有很多,它们的规模和数据功能各不相同。本篇文章将比较三种最受欢迎的消息代理:RabbitMQ,Kafka与Redis。
首先让我们了解微服务通信。
在微服务之间有常见的两种通信方式:同步与异步。
在同步通信中,调用方在发送下一条消息之前等待响应,并且它作为HTTP之上的REST协议运行。相反,在异步通信中,无需等待响应即可发送消息。这适用于分布式系统,通常需要消息代理来管理消息。
你选择的通信类型应考虑不同的参数,例如微服务的结构方式,适当的基础架构,延迟,规模,依赖关系以及通信目的。异步通信的建立可能会更加复杂,并且需要添加更多组件才能堆叠,但是将异步通信用于微服务的好处远大于缺点。
首先根据定义,异步通信是非阻塞的;第二,它也比同步操作支持更好的缩放;第三,在微服务崩溃的情况下,异步通信机制提供了各种恢复技术,通常更擅长处理与崩溃有关的错误。
另外,当使用代理而不是REST协议时,接收通信的服务实际上并不需要彼此了解。在旧的服务运行了很长时间之后,甚至可以引入新的服尺誉务,即能做到更好的解耦服务。
最后,在选择异步操作时,您将增强将来创建集中发现,监视,负载平衡甚至策略执行器的能力。这将为您提供在代码和系统构建中具有灵活性,可伸缩性和更多功能的功能。
异步通信通常通过消息代理进行管理。改掘也有其他方法,例如aysncio,但它们更加稀少和有限。
在选择代理执行异步操作时,应考虑以下几点:
一对一
一对多
我们检查了那里最新和最出色的服务,以找出这三个类别中最强的提供商。
RabbitMQ(AMQP)
规模:根据配置和资源,这里的运行速度约为每秒50K msg。
持久性:支持持久性消息和瞬时消息。
一对一与一对多的消费者:两者都有。
RabbitMQ于2007年发布,是最早创建的常见消息代理之一。它是一个开放源代码,通过实现高级消息队列协议(AMQP)通过点对点和pub-sub方法传递消息。它旨在支持复杂的路由逻辑。
有一些托管服务可让您将其用作SaaS,但它不是本机主要云提供商堆栈的一部分。RabbitMQ支持所有主要语言,包括Python,Java,.NET,PHP,Ruby,JavaScript,Go,Swift等。
在持久模式下,可能会遇到一些性能问题。
kafka
规模:每秒最多可以发送一百万条消息。
持久性:是的。
一对一vs一对多的消费者:只有一对多陵歼段(乍一看似乎很奇怪,对吧?!)。
Kafka曾在Azure,AWS和Confluent上管理SaaS。他们都是Kafka项目的创建者和主要贡献者。Kafka支持所有主要语言,包括Python,Java,C C ++,Clojure,.NET,PHP,Ruby,JavaScript,Go,Swift等。
Redis
规模:每秒最多可以发送一百万条消息。
持久性:基本上不是,它是内存中的数据存储。
一对一与一对多的消费者:两者都有。
Redis与其他消息代理有点不同。Redis的核心是一个内存中的数据存储,可以用作高性能键值存储或消息代理。另一个区别是Redis没有持久性,而是将其内存转储到Disk DB中。它还非常适合实时数据处理。
最初,Redis不是一对一和一对多的。但是,由于Redis 5.0引入了pub-sub,因此功能得到了增强,一对多成为真正的选择。
我们介绍了RabbitMQ,Kafka和Redis的一些特征。这三种动物都是它们的类别,但是如上所述,它们的运行方式大不相同。这是我们建议正确的消息代理根据不同用例使用的建议。
短命消息:Redis
Redis的内存数据库几乎适用于不需要持久性的消息短暂的用例。因为Redis提供了非常快速的服务和内存功能,所以它是短保留消息的理想选择,在这些消息中持久性不是很重要,您可以容忍一些丢失。随着5.0中Redis流的发布,它也成为了一对多用例的候选者,由于局限性和旧的pub-sub功能,绝对需要使用它。
大量数据:Kafka
Kafka是一个高吞吐量的分布式队列,用于长时间存储大量数据。对于需要持久性的一对多用例,Kafka是理想的选择。
复杂路由:RabbitMQ
RabbitMQ是一个较老但很成熟的代理,具有许多支持复杂路由的功能。当所需速率不高(超过数万msg sec)时,它甚至将支持复杂的路由通信。
考虑您的软件堆栈
当然,最后要考虑的是你当前的软件堆栈。如果你正在寻找一个相对简单的集成过程,并且不想在堆栈中维护其他代理,那么你可能更倾向于使用已由堆栈支持的代理。
例如,如果你在RabbitMQ之上的系统中使用Celery for Task Queue,那么您会获得与RabbitMQ或Redis一起使用的动力,而不是不支持Kafka且需要进行一些重写的Kafka。
我们通过平台的发展和壮大使用了以上所有内容,然后再进行一些使用!重要的是要记住,每种工具都有自己的优点和缺点,这与了解它们并为工作以及特定的时机,情况和要求选择合适的工具有关。
Golang kafka简述和操作(sarama同步异步和消费组)
一、Kafka简述
1. 为什么需要用到消息队列
异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;
解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。
缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。
2.为什么选择kafka呢?
这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文
kafka的优点:
1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群
kafka的缺点:
1.由于是批量发送,所以数据达简并不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高
3. Golang 操作kafka
3.1. kafka的环境
网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件
3.2. 第三尘带方库拦兄迹
github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组
3.3. 消费者
单个消费者
funcconsumer(){varwg sync.WaitGroup consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{ fmt.Println("Failed to start consumer: %s", err)return} partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{ fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList { pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{ fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return} wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) }deferpc.AsyncClose() wg.Done() }(pc) } wg.Wait()}funcmain(){ consumer()}
消费组
funcconsumerCluster(){ groupID :="group-1"config := cluster.NewConfig() config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{ glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){ errors := c.Errors() noti := c.Notifications()for{select{caseerr := -errors: glog.Errorln(err)case-noti: } } }(c)formsg :=rangec.Messages() { fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生产者
同步生产者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){ config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{} msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!") client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{ fmt.Println("producer close err, ", err)return}deferclient.Close() pid, offset, err := client.SendMessage(msg)iferr !=nil{ fmt.Println("send message failed, ", err)return} fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}
异步生产者
funcasyncProducer(){ config := sarama.NewConfig() config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){ errors := p.Errors() success := p.Successes()for{select{caseerr := -errors:iferr !=nil{ glog.Errorln(err) }case-success: } } }(p)for{ v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000)) fmt.Fprintln(os.Stdout, v) msg := sarama.ProducerMessage{ Topic: topics, Value: sarama.ByteEncoder(v), } p.Input() - msg time.Sleep(time.Second *1) }}funcmain(){goasyncProducer()select{ }}
3.5. 结果展示-
同步生产打印:
分区ID:0,offset:90
消费打印:
Partition:0,Offset:90,key:,value:Hello World!
异步生产打印:
async:7272async:7616async:998
消费打印:
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998
docker 配置 kafka+zookeeper,golang接入示例
配置zookeeper 使用kafka/bin/下自带的zk
运行 报错 卒。配置低了
docker-compose.yml
报错
换云搬瓦工的机器试一下滑颂粗
但是docker ps -a 发现只有zookeeper启动了,kafka失败, 检查日志 发现kafka运行需要java环境,而且对内存有要求,搬瓦工的vps不足够
因此修改docker-compose.yml 加入以下
stop 再启动
完美
测试
进入容器
查看已信镇经建好的topic (docker-compose.yml)
发送樱碧消息
接收消息
接下来是golang接入kafka了
运行
聊聊golang的zap的ZapKafkaWriter
本文主要研羡芹究一下golang的zap的ZapKafkaWriter
WriteSyncer内嵌了io.Writer接兄李毕口,定义了Sync方法;Sink接口内嵌了zapcore.WriteSyncer及io.Closer接口;扰搏ZapKafkaWriter实现Sink接口及zapcore.WriteSyncer接口,其Write方法直接将data通过kafka发送出去。
一次golang sarama kafka内存占用大的排查经历
环境:
现象:golang微服务内存占用超过镇信物1G,查看日坦明志发现大量kafka相关错误日志,继而查看kafka集群,其中一个kafka节点容器挂掉了。
疑问 为什么kafka集群只有一个broker挂了,客户端就大量报错呢
通过beego admin页面获取 mem-1.memprof
可以看到调用栈为 withRecover backgroundMetadataUpdataer refreshMeaatdata RefreshMetada tryRefreshMetadata ...
sarama-cluster: NewClient
为什么kafka集群只有一个broker,但是NewClient确失败了?
在kafka容器里查看topic, 发现Replicas和Isr只有一个,找御液到kafka官方配置说明,自动生成的topic需要配置default.replication.factor这个参数,才会生成3副本。