Structured Streaming in Spark 3.0 Using Kafka
After the previous post wherein we explored Apache Kafka, let us now take a look at Apache Spark. This blog post covers working within Spark’s interactive shell environment, launching applications (including onto a standalone cluster), streaming data and lastly, structured streaming using Kafka. To get started right away, all of the examples will run inside Docker containers.
<h1>Spark</h1>Spark was initially developed at UC Berkeley’s AMPLab in 2009 by Matei Zaharia, and open-sourced in 2010. In 2013 its codebase was donated to the Apache Software Foundation, which released it as Apache Spark in 2014.
<blockquote>“Apache Spark™ is a unified analytics engine for large-scale data processing”
</blockquote>It offers APIs for Java, Scala, Python and R. Furthermore, it provides the following tools:
<ul><li>Spark SQL: used for SQL and structured data processing.
</li><li>MLlib: used for machine learning.
</li><li>GraphX: used for graph processing.
</li><li>Structured Streaming: used for incremental computation and stream processing.
</li></ul>Prerequisites: This project uses Docker and docker-compose. View this link to find out how to install them for your OS.
Clone my git repo:
<pre><code class="has">git clone https://github.com/Wesley-Bos/spark3.0-examples.git</code></pre>Note: <em>depending on your pip and Python version, the commands vary a little:</em>
<ul><li><em>pip becomes pip3</em>
</li><li><em>python becomes python3</em>
</li></ul>Before we begin, create a new environment. I use Anaconda to do this but feel free to use any tool of your liking. Activate the environment and install the required libraries by executing the following commands:
<pre><code class="has">pip install -r requirements.txt</code></pre><em>Be sure to activate the correct environment in every new terminal you open!</em>
<h1>1. Spark interactive shell</h1>Run the following commands to launch Spark:
<pre><code class="has">docker build -t pxl_spark -f Dockerfile .
docker run --rm --network host -it pxl_spark /bin/bash</code></pre>Code can be executed in Spark either within the interactive shell, or by submitting a program file directly to Spark as an application, using the spark-submit command.
To start up the interactive shell, run the command below:
<pre><code class="has">pyspark</code></pre><em>This post centres on working with Python. However, if you desire to work in Scala, use spark-shell instead.</em>
Try out these two examples to get a feel for the shell environment.
Read a .csv file:
<pre><code class="has">>>> file = sc.textFile("supplementary_files/subjects.csv")
>>> file.collect()
>>> file.take(AMOUNT_OF_SAMPLES)
>>> subjects = file.map(lambda row: row.split(",")[0])
>>> subjects.collect()</code></pre>
Read a text file:
<pre><code class="has">>>> file = sc.textFile("supplementary_files/text.txt")
>>> file.collect()
>>> file.take(2)
>>> wordCount = file.flatMap(lambda text: text.lower().strip().split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda sum_occurences, next_occurence: sum_occurences + next_occurence)
>>> wordCount.collect()</code></pre>
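To see what each stage of that chain produces, the same pipeline can be mimicked in plain Python (an illustrative sketch only; the sample lines are made up and Spark is not required):

```python
lines = ["spark processes streams", "streams of data"]

# flatMap: split every line into lowercase words, flattening into one list
words = [w for line in lines for w in line.lower().strip().split(" ")]

# map: pair each word with an initial count of 1
pairs = [(word, 1) for word in words]

# reduceByKey: sum the counts per word
word_count = {}
for word, count in pairs:
    word_count[word] = word_count.get(word, 0) + count

print(word_count)  # {'spark': 1, 'processes': 1, 'streams': 2, 'of': 1, 'data': 1}
```

In Spark the same reduction runs in parallel across partitions, which is why reduceByKey takes an associative two-argument function instead of building a dictionary.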
Press Ctrl+D to exit the shell.
<h1>2. Spark application — launching on a cluster</h1>Spark applications can run on their own or on a cluster. The most straightforward approach is deploying Spark on a private cluster.
Follow the instructions below to execute an application on a cluster.
Initiate the Spark container:
<pre><code class="has">docker run --rm --network host -it pxl_spark /bin/bash</code></pre>Start a master:
<pre><code class="has">start-master.sh</code></pre>Go to the web UI and copy the URL of the Spark Master.
Start a worker:
<pre><code class="has">start-slave.sh URL_MASTER</code></pre><em>Reload the web UI; a worker should be added.</em>
Launch an example onto the cluster:
<pre><code class="has">spark-submit --master URL_MASTER examples/src/main/python/pi.py 10</code></pre><em>View the web UI; an application has now been completed.</em>
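The bundled pi.py example estimates π by Monte Carlo sampling: draw random points in the unit square and count how many land inside the quarter circle. The core idea, sketched here in plain Python rather than as a Spark job (the sample count is arbitrary):

```python
import random

def estimate_pi(samples: int) -> float:
    """Estimate pi by sampling random points in the unit square and
    counting those inside the quarter circle of radius 1."""
    inside = 0
    for _ in range(samples):
        x, y = random.random(), random.random()
        if x * x + y * y <= 1.0:
            inside += 1
    # (area of quarter circle) / (area of square) = pi / 4
    return 4.0 * inside / samples

random.seed(0)
print(estimate_pi(100_000))  # roughly 3.14
```

In the Spark version, the sampling loop is distributed over the cluster with a parallelized map and the hits are summed with a reduce, which is exactly what makes it a convenient smoke test for a new cluster.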
Consult the official documentation for more specific information.
<h1>3. Spark application — streaming data</h1>The above examples solely handled static data. The subsequent cases entail streaming data, along with five DStream transformations to explore.
<em>Note that these transformations are a mere glimpse of the viable options. View the </em><em>official documentation</em><em> for additional information regarding pyspark streaming.</em>
In a separate terminal, run the netcat command on port 8888:
<pre><code class="has">nc -lC 8888</code></pre>In the Spark container, submit one of the cases for DStreams. Beneath is a summary of what each code sample does.
<ul><li>reduce_by_key.py: count the occurrences of the word ‘ERROR’, per batch.
</li><li>update_by_key.py: count the occurrences of all the words throughout a stream of data.
</li><li>count_by_window.py: count the number of lines within a window.
</li><li>reduce_by_window.py: calculate the sum of the values within a window.
</li><li>reduce_by_key_and_window.py: count the occurrences of ERROR messages within a window.
</li></ul>Enter text data inside the netcat terminal. In the Spark terminal, the data is displayed accordingly. An example can be seen in the images below.
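A stateful transformation such as pyspark's updateStateByKey folds each batch's new values for a key into a running state. Its mechanics can be sketched in plain Python (the batch values below are invented for illustration; no Spark required):

```python
def update_count(new_values, running_count):
    # updateStateByKey calls a function like this once per key:
    # new_values holds the counts seen in the current micro-batch,
    # running_count holds the previous state (None on the first batch).
    return sum(new_values) + (running_count or 0)

# Simulate three successive micro-batches of counts for one key, e.g. "error":
state = None
for batch in ([1, 1], [1], []):
    state = update_count(batch, state)

print(state)  # 3
```

Note how an empty batch leaves the state unchanged, which is why the running word counts in update_by_key.py survive quiet periods on the stream.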
<pre><code class="has">spark-submit python_code_samples/update_by_key.py</code></pre>Lastly, there is structured streaming. A concise, to-the-point description of structured streaming reads: <em>“Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.”</em>
The objective of this last section is to ingest data into Kafka, access it in Spark and finally write it back to Kafka.
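A minimal skeleton of such a job might look as follows. This is a hedged sketch, not the repo's kafka_structured_stream.py: the broker address, topic names, and the upper-casing transform are assumptions made for illustration, while the readStream/writeStream Kafka options follow Spark's Kafka integration guide.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-structured-stream").getOrCreate()

# Read a stream from Kafka; broker and topic names are placeholders.
source = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "input_topic")
          .load())

# Kafka rows carry binary key/value columns; cast them to strings and
# apply an (assumed) transformation before writing back.
transformed = source.selectExpr(
    "CAST(key AS STRING) AS key",
    "upper(CAST(value AS STRING)) AS value",
)

# Write the transformed stream back to another Kafka topic; a checkpoint
# location is mandatory for fault tolerance.
query = (transformed.writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("topic", "output_topic")
         .option("checkpointLocation", "/tmp/kafka_stream_checkpoint")
         .start())

query.awaitTermination()
```

Because the Kafka source is not bundled with Spark, the job must be submitted with the spark-sql-kafka package on the classpath, as shown in the spark-submit command below.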
Launch the Kafka environment:
<pre><code class="has">docker-compose -f ./kafka/docker-compose.yml up -d</code></pre>Produce and consume data:
For convenience, open two terminals beside each other.
<pre><code class="has">python kafka/producer.py
python kafka/consumer.py</code></pre>Submit the application to Spark (inside the Spark container):
<pre><code class="has">spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 python_code_samples/kafka_structured_stream.py</code></pre>Open a new terminal and start the new_consumer:
<pre><code class="has">python kafka/new_consumer.py</code></pre>In the producer terminal, enter data; both consumers will display this data. The messages can be seen in the Confluent Control centre as well.
<h1>Recap</h1>Throughout this article, we explored the following topics:
<ul><li>Reading files within the interactive shell.
</li><li>Launching an application; by itself and on a cluster.
</li><li>Working with streaming data.
</li><li>Working with structured streaming.
</li></ul>
An interesting blog post from Databricks gives a more extensive view of structured streaming. That post explains how to utilise Spark to consume and transform data streams from Apache Kafka.
Lastly, I want to thank you for reading to the end! Any feedback on where and how I can improve is much appreciated. Feel free to message me.
Translated from: https://medium.com/swlh/structured-streaming-in-spark-3-0-using-kafka-db44cf871d7a