go windows 安装zookeeper ,链接kafka
1,windows安装 zookeeper
关于zookeeper的详细解释如下
https://www.cnblogs.com/ultranms/p/9585191.html
参考博客 https://blog.csdn.net/ring300/article/details/80446918
下载地址 https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper
下载了两次3.6版本 windows都无法解压 所以我就只能下载3.4版本的,解压之后正常
启动zookpeeker 闪退 按照众多博客解释 是windows 系统找不到系统变量 java_home 也就是在zkServer.cmd找不到设置的变量
需要现在 zkServer.cmd 末尾添加 pause 让执行命令行暂停 不闪退 查看错误代码
以下详细说明设置的解决的博客
https://www.cnblogs.com/china-baizhuangli/p/8920776.html
如果你没有安装java 最好先下载一个java.sdk安装好之后 再进行之后的步骤
https://www.oracle.com/technetwork/java/javase/downloads/index.html
2, windows 安装kafka
关于kafka的解释如下
https://blog.csdn.net/lingbo229/article/details/80761778
下载地址
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.1/kafka_2.12-2.3.1.tgz
安装启动
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看创建的主题
kafka-topics.bat --list --zookeeper 127.0.0.1:2181
增加分区数
kafka-topics.bat --zookeeper 127.0.0.1:2181 --alter --topic test--partitions 4
创建生产者
kafka-console-producer.bat --broker-list localhost:9092 --topic test
消费者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test--from-beginning
都创建了之后 在生产者发送的消息 都会发送到消费者
创建的 主题 topic 不管是消费者还是生产者都是链接这个topic
参考以下博客
https://blog.csdn.net/weixin_42544006/article/details/88887145
kafka图形管理界面
http://www.kafkatool.com/download.html
go 链接 kafka
windows 可能需要下载 https://github.com/shopify/sarama
copy以下博客的代码用了 还是能联通
https://www.jianshu.com/p/3102418e5a7d
<pre class="has"><code>package main import ( "bufio" "flag" "fmt" "io/ioutil" "log" "os" "strings" "crypto/tls" "crypto/x509" "github.com/Shopify/sarama" ) var ( command string hosts string topic string partition int saslEnable bool username string password string tlsEnable bool clientcert string clientkey string cacert string ) func main() { flag.StringVar(&command, "command", "consumer", "consumer|producer") //修改此处 可以切换为消费者还是生产者 flag.StringVar(&hosts, "host", "localhost:9092", "Common separated kafka hosts") flag.StringVar(&topic, "topic", "test", "Kafka topic") flag.IntVar(&partition, "partition", 0, "Kafka topic partition") flag.BoolVar(&saslEnable, "sasl", false, "SASL enable") flag.StringVar(&username, "username", "", "SASL Username") flag.StringVar(&password, "password", "", "SASL Password") flag.BoolVar(&tlsEnable, "tls", false, "TLS enable") flag.StringVar(&clientcert, "cert", "cert.pem", "Client Certificate") flag.StringVar(&clientkey, "key", "key.pem", "Client Key") flag.StringVar(&cacert, "ca", "ca.pem", "CA Certificate") flag.Parse() config := sarama.NewConfig() if saslEnable { config.Net.SASL.Enable = true config.Net.SASL.User = username config.Net.SASL.Password = password } if tlsEnable { //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert) if err != nil { log.Fatal(err) } config.Net.TLS.Enable = true config.Net.TLS.Config = tlsConfig } client, err := sarama.NewClient(strings.Split(hosts, ","), config) if err != nil { log.Fatalf("unable to create kafka client: %q", err) } if command == "consumer" { consumer, err := sarama.NewConsumerFromClient(client) if err != nil { log.Fatal(err) } defer consumer.Close() loopConsumer(consumer, topic, partition) } else { producer, err := sarama.NewAsyncProducerFromClient(client) if err != nil { log.Fatal(err) } defer producer.Close() loopProducer(producer, topic, partition) } } func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) { // load client cert clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile) if err != nil { return nil, err } // load ca cert pool cacert, err := ioutil.ReadFile(cacertfile) if err != nil { return nil, err } cacertpool := x509.NewCertPool() cacertpool.AppendCertsFromPEM(cacert) // generate tlcconfig tlsConfig := tls.Config{} tlsConfig.RootCAs = cacertpool tlsConfig.Certificates = []tls.Certificate{clientcert} tlsConfig.BuildNameToCertificate() // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert: return &tlsConfig, err } func loopProducer(producer sarama.AsyncProducer, topic string, partition int) { scanner := bufio.NewScanner(os.Stdin) fmt.Print("> ") for scanner.Scan() { text := scanner.Text() if text == "" { } else if text == "exit" || text == "quit" { break } else { producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)} log.Printf("Produced message: [%s]\n", text) } fmt.Print("> ") } } func loopConsumer(consumer sarama.Consumer, topic string, partition int) { partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) if err != nil { log.Println(err) return } defer partitionConsumer.Close() for { msg := <-partitionConsumer.Messages() log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset) } } </code></pre> <pre class="has"><code>再次启动kafka报错 Error while executing topic command : Replication factor: 1 larger than available brokers: 0. [2019-11-11 09:14:37,012] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0. (kafka.admin.TopicCommand$)</code></pre>1, 有几个版本
参考博客 https://www.cnblogs.com/QuestionsZhang/p/10288880.html
创建kafka的topic显示,存活的brokers为0
原因:
没有在kafka目录下创建zookeeper ,指定myid
<em id="__mceDel">解决:</em>
<em><em> cd kafka_2.11-1.1.0</em></em>
<em> mkdir zookeeper
cd zookeeper
touch myid
echo 0 > myid
重新启动kafka就ok</em>
<em>kafka-server-start.bat ../../config.server.properties</em>
<em>2,linux的报错问题</em>
https://www.iteye.com/blog/rayfuxk-2279596
3,方案三 参考博客
https://www.cnblogs.com/thinkingandworkinghard/p/6113069.html?utm_source=itdadao&utm_medium=referral
需要启动broker:
<pre><code>bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties 4)FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.net.UnknownHostException: bogon: bogon: 域名解析暂时失败 2.11的版本用这个版本解决不行,2.8的可以。到centor 目录 /etc/hosts文件,添加 你的ip hostname localhost ,自己的为例: 之后再执行</code>终于启动成功 有毒 </pre>kafka-topics.bat --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test4
到此这篇关于“go windows 安装zookeeper ,链接kafka”的文章就介绍到这了,更多文章或继续浏览下面的相关文章,希望大家以后多多支持JQ教程网!您可能感兴趣的文章:
go windows 安装zookeeper ,链接kafka
LogAgent的工作流程
go-zookeeper递归获取接口
零基础学习Go语言
使用kafka在spark 3 0中进行结构化流式传输
系统协调 zookeeper golang入门
php nodeJs thrift协议,实现zookeeper节点数据自动发现
linux下php安装php-kafka和php-rdkafka扩展的方法详解
【跟着我们学Golang】Go语言全平台安装
windows安装golang