教程集 www.jiaochengji.com
教程集 >  Golang编程  >  golang教程  >  正文 go windows 安装zookeeper ,链接kafka

go windows 安装zookeeper ,链接kafka

发布时间:2022-03-26   编辑:jiaochengji.com
教程集为您提供go windows 安装zookeeper ,链接kafka等资源,欢迎您收藏本站,我们将为您提供最新的go windows 安装zookeeper ,链接kafka资源

1,windows安装 zookeeper

关于zookeeper的详细解释如下 

https://www.cnblogs.com/ultranms/p/9585191.html

参考博客  https://blog.csdn.net/ring300/article/details/80446918

下载地址  https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper

下载了两次3.6版本 windows都无法解压  所以我就只能下载3.4版本的,解压之后正常

启动zookpeeker 闪退  按照众多博客解释 是windows 系统找不到系统变量 java_home 也就是在zkServer.cmd找不到设置的变量

需要现在 zkServer.cmd 末尾添加  pause 让执行命令行暂停 不闪退 查看错误代码  

以下详细说明设置的解决的博客 

 https://www.cnblogs.com/china-baizhuangli/p/8920776.html

如果你没有安装java 最好先下载一个java.sdk安装好之后 再进行之后的步骤

https://www.oracle.com/technetwork/java/javase/downloads/index.html

 

2, windows 安装kafka

关于kafka的解释如下

https://blog.csdn.net/lingbo229/article/details/80761778

下载地址

https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.1/kafka_2.12-2.3.1.tgz

安装启动

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看创建的主题

kafka-topics.bat --list --zookeeper 127.0.0.1:2181

增加分区数

kafka-topics.bat --zookeeper 127.0.0.1:2181  --alter --topic test--partitions 4

创建生产者

kafka-console-producer.bat --broker-list localhost:9092 --topic test

消费者

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test--from-beginning

都创建了之后 在生产者发送的消息 都会发送到消费者   

创建的 主题  topic  不管是消费者还是生产者都是链接这个topic  

 

参考以下博客

https://blog.csdn.net/weixin_42544006/article/details/88887145  

kafka图形管理界面

http://www.kafkatool.com/download.html

 

 go   链接 kafka 

windows 可能需要下载   https://github.com/shopify/sarama

copy以下博客的代码用了  还是能联通

https://www.jianshu.com/p/3102418e5a7d

<pre class="has"><code>package main import ( "bufio" "flag" "fmt" "io/ioutil" "log" "os" "strings" "crypto/tls" "crypto/x509" "github.com/Shopify/sarama" ) var ( command string hosts string topic string partition int saslEnable bool username string password string tlsEnable bool clientcert string clientkey string cacert string ) func main() { flag.StringVar(&command, "command", "consumer", "consumer|producer") //修改此处 可以切换为消费者还是生产者 flag.StringVar(&hosts, "host", "localhost:9092", "Common separated kafka hosts") flag.StringVar(&topic, "topic", "test", "Kafka topic") flag.IntVar(&partition, "partition", 0, "Kafka topic partition") flag.BoolVar(&saslEnable, "sasl", false, "SASL enable") flag.StringVar(&username, "username", "", "SASL Username") flag.StringVar(&password, "password", "", "SASL Password") flag.BoolVar(&tlsEnable, "tls", false, "TLS enable") flag.StringVar(&clientcert, "cert", "cert.pem", "Client Certificate") flag.StringVar(&clientkey, "key", "key.pem", "Client Key") flag.StringVar(&cacert, "ca", "ca.pem", "CA Certificate") flag.Parse() config := sarama.NewConfig() if saslEnable { config.Net.SASL.Enable = true config.Net.SASL.User = username config.Net.SASL.Password = password } if tlsEnable { //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert) if err != nil { log.Fatal(err) } config.Net.TLS.Enable = true config.Net.TLS.Config = tlsConfig } client, err := sarama.NewClient(strings.Split(hosts, ","), config) if err != nil { log.Fatalf("unable to create kafka client: %q", err) } if command == "consumer" { consumer, err := sarama.NewConsumerFromClient(client) if err != nil { log.Fatal(err) } defer consumer.Close() loopConsumer(consumer, topic, partition) } else { producer, err := sarama.NewAsyncProducerFromClient(client) if err != nil { log.Fatal(err) } defer producer.Close() loopProducer(producer, topic, partition) } } func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) { // load client cert clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile) if err != nil { return nil, err } // load ca cert pool cacert, err := ioutil.ReadFile(cacertfile) if err != nil { return nil, err } cacertpool := x509.NewCertPool() cacertpool.AppendCertsFromPEM(cacert) // generate tlcconfig tlsConfig := tls.Config{} tlsConfig.RootCAs = cacertpool tlsConfig.Certificates = []tls.Certificate{clientcert} tlsConfig.BuildNameToCertificate() // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert: return &tlsConfig, err } func loopProducer(producer sarama.AsyncProducer, topic string, partition int) { scanner := bufio.NewScanner(os.Stdin) fmt.Print("> ") for scanner.Scan() { text := scanner.Text() if text == "" { } else if text == "exit" || text == "quit" { break } else { producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)} log.Printf("Produced message: [%s]\n", text) } fmt.Print("> ") } } func loopConsumer(consumer sarama.Consumer, topic string, partition int) { partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) if err != nil { log.Println(err) return } defer partitionConsumer.Close() for { msg := <-partitionConsumer.Messages() log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset) } } </code></pre> <pre class="has"><code>再次启动kafka报错 Error while executing topic command : Replication factor: 1 larger than available brokers: 0. [2019-11-11 09:14:37,012] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.  (kafka.admin.TopicCommand$)</code></pre>

1, 有几个版本

参考博客 https://www.cnblogs.com/QuestionsZhang/p/10288880.html

创建kafka的topic显示,存活的brokers为0

 

原因:

     没有在kafka目录下创建zookeeper ,指定myid

<em id="__mceDel">解决:</em>

<em><em>   cd kafka_2.11-1.1.0</em></em>

<em>  mkdir zookeeper
  cd zookeeper
  touch myid
  echo 0 > myid
重新启动kafka就ok</em>

<em>kafka-server-start.bat ../../config.server.properties</em>

 

<em>2,linux的报错问题</em>

https://www.iteye.com/blog/rayfuxk-2279596

3,方案三 参考博客

https://www.cnblogs.com/thinkingandworkinghard/p/6113069.html?utm_source=itdadao&utm_medium=referral

 

 需要启动broker:

<pre><code>bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties 4)FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.net.UnknownHostException: bogon: bogon: 域名解析暂时失败 2.11的版本用这个版本解决不行,2.8的可以。到centor 目录 /etc/hosts文件,添加 你的ip hostname localhost ,自己的为例: 之后再执行</code>终于启动成功 有毒  </pre>

kafka-topics.bat --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test4

到此这篇关于“go windows 安装zookeeper ,链接kafka”的文章就介绍到这了,更多文章或继续浏览下面的相关文章,希望大家以后多多支持JQ教程网!

您可能感兴趣的文章:
go windows 安装zookeeper ,链接kafka
LogAgent的工作流程
go-zookeeper递归获取接口
零基础学习Go语言
使用kafka在spark 3 0中进行结构化流式传输
系统协调 zookeeper golang入门
php nodeJs thrift协议,实现zookeeper节点数据自动发现
linux下php安装php-kafka和php-rdkafka扩展的方法详解
【跟着我们学Golang】Go语言全平台安装
windows安装golang

[关闭]
~ ~