Structured Streaming in Spark 3.0 Using Kafka

<article>
<section>
<h2>Introduction to working with(in) Spark</h2>

After the previous post wherein we explored Apache Kafka, let us now take a look at Apache Spark. This blog post covers working within Spark’s interactive shell environment, launching applications (including onto a standalone cluster), streaming data and lastly, structured streaming using Kafka. To get started right away, all of the examples will run inside Docker containers.

<h1>Spark</h1>

Spark was initially developed at UC Berkeley’s AMPLab in 2009 by Matei Zaharia, and open-sourced in 2010. In 2013 its codebase was donated to the Apache Software Foundation which released it as Apache Spark in 2014.

<blockquote>

“Apache Spark™ is a unified analytics engine for large-scale data processing”

</blockquote>

It offers APIs for Java, Scala, Python and R. Furthermore, it provides the following tools:

<ul><li>

Spark SQL: used for SQL and structured data processing (a short example follows this list).

</li><li>

MLlib: used for machine learning.

</li><li>

GraphX: used for graph processing.

</li><li>

Structured Streaming: used for incremental computation and stream processing.

</li></ul>
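
As a quick taste of the first of these, the snippet below registers a small DataFrame as a temporary view and queries it with SQL. It is a minimal sketch, not taken from the repo, and the sample rows are made up purely for illustration.

<pre><code class="has"># Minimal Spark SQL sketch (not from the repo); the sample rows are invented for illustration.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSqlTaste").getOrCreate()

# Build a tiny DataFrame and expose it to SQL as a temporary view.
subjects = spark.createDataFrame(
    [("Big Data", 6), ("Machine Learning", 3)],
    ["subject", "credits"])
subjects.createOrReplaceTempView("subjects")

spark.sql("SELECT subject FROM subjects WHERE credits > 3").show()

spark.stop()</code></pre>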
</section><section>

Prerequisites: This project uses Docker and docker-compose. View this link to find out how to install them for your OS.

Clone my git repo:

<pre><code class="has">git clone https://github.com/Wesley-Bos/spark3.0-examples.git</code></pre>

Note: <em>depending on your pip and Python version, the commands vary a little:</em>

<ul><li>

<em>pip becomes pip3</em>

</li><li>

<em>python becomes python3</em>

</li></ul>

Before we begin, create a new environment. I use Anaconda to do this but feel free to use any tool of your liking. Activate the environment and install the required libraries by executing the following commands:

<pre><code class="has">pip install -r requirements.txt</code></pre>

<em>Be sure to activate the correct environment in every new terminal you open!</em>

<h1>1. Spark interactive shell</h1>

Run the following commands to launch Spark:

<pre><code class="has">docker build -t pxl_spark -f Dockerfile .docker run --rm --network host -it pxl_spark /bin/bash</code></pre>

Code can be executed in Spark either within the interactive shell or by submitting a program file directly to Spark as an application, using the spark-submit command.
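
For instance, the shell snippet further below could also be written as a small stand-alone application and handed to spark-submit. The sketch below assumes it is saved under a hypothetical name such as count_subjects.py; it is an illustration, not a file from the repo.

<pre><code class="has"># Hypothetical count_subjects.py -- an illustrative stand-alone application, not part of the repo.
# Run it with: spark-submit count_subjects.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CountSubjects").getOrCreate()
sc = spark.sparkContext

# Same idea as the interactive example below: read the csv and keep the first column of each row.
subjects = sc.textFile("supplementary_files/subjects.csv").map(lambda row: row.split(",")[0])
print(subjects.collect())

spark.stop()</code></pre>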

To start up the interactive shell, run the command below:

<pre><code class="has">pyspark</code></pre>

<em>This post centres on working with Python. However, if you desire to work in Scala, use spark-shell instead.</em>

Try out these two examples to get a feel for the shell environment.

Read a .csv file:

<pre><code class="has">>>> file = sc.textFile(“supplementary_files/subjects.csv”)
>>> file.collect()
>>> file.take(AMOUNT_OF_SAMPLES)>>> subjects = file.map(lambda row: row.split(“,”)[0])
>>> subjects.collect()</code></pre>

Read a text file:

<pre><code class="has">>>> file = sc.textFile(“supplementary_files/text.txt”)
>>> file.collect()
>>> file.take(2)>>> wordCount = file.flatMap(lambda text: text.lower().strip().split(“ “)).map(lambda word: (word, 1)).reduceByKey(lambda sum_occurences, next_occurence: sum_occurences next_occurence)>>> wordCount.collect()</code></pre>

Press Ctrl+D to exit the shell.

<h1>2. Spark application: launching on a cluster</h1> <figure style="display:block;text-align:center;">
<figcaption>Photo by Mike Bergmann on Unsplash</figcaption></figure>

Spark applications can run on their own or on a cluster. The most straightforward approach is deploying Spark on a private cluster.

Follow the instructions below to execute an application on a cluster.

Initiate the Spark container:

<pre><code class="has">docker run --rm --network host -it pxl_spark /bin/bash</code></pre>

Start a master:

<pre><code class="has">start-master.sh</code></pre>

Go to the web UI (by default at http://localhost:8080) and copy the URL of the Spark Master.

Start a worker:

<pre><code class="has">start-slave.sh URL_MASTER</code></pre>

<em>Reload the web UI; a worker should be added.</em>

Launch an example onto the cluster:

<pre><code class="has">spark-submit --master URL_MASTER examples/src/main/python/pi.py 10</code></pre>

<em>View the web UI; the application should now be listed as completed.</em>

Consult the official documentation for more specific information.

<h1>3. Spark application: streaming data</h1>

The examples above dealt solely with static data. The subsequent cases involve streaming data, along with five DStream transformations to explore.

<em>Note that these transformations are a mere glimpse of the viable options. View the official documentation for additional information regarding pyspark streaming.</em>

In a separate terminal, run the netcat command on port 8888:

<pre><code class="has">nc -lC 8888</code></pre>

In the Spark container, submit one of the DStream cases. Beneath is a summary of what each code sample does; a minimal sketch of the stateful word-count pattern follows the list.

<ul><li>

reduce_by_key.py: count the occurrences of the word ‘ERROR’, per batch.

</li><li>

update_by_key.py: count the occurrences of all the words throughout a stream of data.

</li><li>

count_by_window.py: count the number of lines within a window.

</li><li>

reduce_by_window.py: calculate the sum of the values within a window.

</li><li>

reduce_by_key_and_window.py: count the occurrences of ERROR messages within a window.

</li></ul>
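
As announced above, here is a minimal sketch of the stateful word-count pattern behind update_by_key.py. It is not the repo's script itself; it simply assumes text arriving from the netcat session on port 8888 and keeps a running count per word.

<pre><code class="has"># Minimal sketch of the updateStateByKey pattern (not the repo's update_by_key.py itself).
# It reads lines from netcat on port 8888 and keeps a running word count across all batches.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "UpdateByKeySketch")
ssc = StreamingContext(sc, 5)                 # 5-second micro-batches
ssc.checkpoint("/tmp/spark_checkpoint")       # stateful transformations require a checkpoint dir

def update_count(new_values, running_count):
    # new_values: counts from the current batch; running_count: state so far (None at first).
    return sum(new_values) + (running_count or 0)

lines = ssc.socketTextStream("localhost", 8888)
counts = (lines.flatMap(lambda line: line.lower().strip().split(" "))
               .map(lambda word: (word, 1))
               .updateStateByKey(update_count))
counts.pprint()

ssc.start()
ssc.awaitTermination()</code></pre>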

Enter text data inside the netcat terminal. In the Spark terminal, the data is displayed accordingly. An example can be seen in the images below.

<pre><code class="has">spark-submit python_code_samples/update_by_key.py</code></pre>
<figure style="display:block;text-align:center;">
<figcaption>Example of update_by_key.py</figcaption></figure>
<h1>4. Spark structured streaming</h1>

Lastly, there is structured streaming. A concise, to the point, description of structured streaming reads: <em>“Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.”</em>

The objective of this last section is to ingest data into Kafka, access it in Spark and finally write it back to Kafka.

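Before walking through the steps, here is a minimal sketch of what such a Kafka-to-Spark-to-Kafka pipeline can look like with the structured streaming API. The broker address and the topic names ('input' and 'output') are assumptions for illustration; the repo's kafka_structured_stream.py may use different names and transformations.

<pre><code class="has"># Minimal structured streaming sketch: read from Kafka, transform, write back to Kafka.
# Broker address and topic names are assumptions; the repo's kafka_structured_stream.py may differ.
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper, col

spark = SparkSession.builder.appName("KafkaStructuredStreamingSketch").getOrCreate()

# Subscribe to the 'input' topic on the local broker.
source = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "input")
          .load())

# Kafka records arrive with binary key/value columns; cast the value to a string and transform it.
transformed = (source.selectExpr("CAST(value AS STRING) AS value")
                     .withColumn("value", upper(col("value"))))

# Write the result to the 'output' topic; a checkpoint location is mandatory for the Kafka sink.
query = (transformed.writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("topic", "output")
         .option("checkpointLocation", "/tmp/spark_kafka_checkpoint")
         .start())

query.awaitTermination()</code></pre>

As shown further below, the Kafka source and sink are not bundled with Spark itself, which is why the application is submitted with the --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 option.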


Launch the Kafka environment:

<pre><code class="has">docker-compose -f ./kafka/docker-compose.yml up -d</code></pre>

Produce and consume data:

For convenience, open two terminals beside each other.

<pre><code class="has">python kafka/producer.pypython kafka/consumer.py</code></pre>

Submit the application to Spark (inside the Spark container):

<pre><code class="has">spark-submit --packages org.apache.spark:spark-sql-kafka-0–10_2.12:3.0.0 python_code_samples/kafka_structured_stream.py</code></pre>

Open a new terminal and start the new_consumer:

<pre><code class="has">python kafka/new_consumer.py</code></pre>

In the producer terminal, enter data; both consumers will display this data. The messages can be seen in the Confluent Control centre as well.

<h1>Recap</h1>

Throughout this article, we explored the following topics:

<ul><li>Reading files within the interactive shell.
</li><li>Launching an application, by itself and on a cluster.
</li><li>Working with streaming data.
</li><li>Working with structured streaming.
</li></ul>

An interesting blog post from Databricks gives a more extensive view of structured streaming. That post explains how to utilise Spark to consume and transform data streams from Apache Kafka.

Lastly, I want to thank you for reading until the end! Any feedback on where and how I can improve is much appreciated. Feel free to message me.

</section>
<blockquote>

Translated from: https://medium.com/swlh/structured-streaming-in-spark-3-0-using-kafka-db44cf871d7a

</blockquote> </article>
