Since we build the project with sbt, all dependencies simply go into libraryDependencies in build.sbt, using the format groupId % artifactId % version (see the Maven documentation for the meaning of each field). Besides spark-core, this code also needs the third-party library spark-streaming-flume, so build.sbt looks roughly like this:
name := "FlumeEventCount"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.4.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.4.0"
Note that build.sbt requires a blank line between settings; this is an sbt syntax requirement!
The Flume stream is created by calling FlumeUtils.createStream(). This test simply counts the number of data lines (events) received in each batch (every 2 seconds). The code is:
package com.huawei.test

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._

/**
 * Produces a count of events received from Flume.
 *
 * This should be used in conjunction with an AvroSink in Flume. It will start
 * an Avro server at the requested host:port address and listen for requests.
 * Your Flume AvroSink should be pointed to this address.
 *
 * Usage: FlumeEventCount <host> <port>
 *   <host> is the host the Flume receiver will be started on - a receiver
 *          creates a server and listens for flume events.
 *   <port> is the port the Flume receiver will listen on.
 *
 * To run this example:
 *   `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port>`
 */
object FlumeEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt
    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
On the Flume side, configure the sink. The configuration below uses a SparkSink, which is the sink for Spark's pull-based approach (FlumeUtils.createPollingStream); since the code above uses the push-based FlumeUtils.createStream, whose receiver itself starts an Avro server, Flume would normally point a plain Avro sink at the receiver's host and port instead (see the sketch after this configuration):
agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = <hostname of the local machine>
agent.sinks.spark.port = <port to listen on for connection from Spark>
agent.sinks.spark.channel = memoryChannel
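For the push-based FlumeUtils.createStream used in this test, a minimal Avro sink configuration might look like the following. This is only a sketch: the agent and channel names are assumptions that must match your own Flume agent, and the hostname/port must be the <host> <port> arguments passed to FlumeEventCount.
agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname = localhost
agent.sinks.avroSink.port = 50000
agent.sinks.avroSink.channel = memoryChannel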
## 4. Package the program
cd $PROJECT_ROOT  # PROJECT_ROOT is the project root directory, i.e. where build.sbt lives
sbt package
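If packaging succeeds, sbt should place the application jar at target/scala-2.10/flumeeventcount_2.10-1.0.jar (derived from the project name, version and scalaVersion above); this is the path that the run script in step 5 references.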
## 5. Run the program
Note: apart from spark-core, which spark-submit pulls in automatically, every other dependency, such as spark-streaming-flume, must be supplied by hand. There are two ways to do this:
- put the dependency jars on the CLASSPATH, or
- add them manually with the --jars option.
This test uses the latter method, i.e. the --jars option. The only package this project is missing is spark-streaming-flume, which sbt already downloaded to the local cache at ~/.ivy2/cache/org.apache.spark/spark-streaming-flume_2.10/jars during compilation; copy all of the jars there into the project's lib directory:
cp ~/.ivy2/cache/org.apache.spark/spark-streaming-flume_2.10/jars/*.jar lib
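Note that --jars expects a comma-separated list of paths, so if lib/ ends up holding more than one jar, relying on shell globbing alone will not work. A minimal sketch for building such a list (the JARS variable name is just an illustration, and it assumes the jar file names contain no spaces):
#!/bin/sh
# Join every jar under lib/ into a single comma-separated string
JARS=$(echo lib/*.jar | tr ' ' ',')
# It can then be passed as: spark-submit --jars "$JARS" ...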
Submit the program with spark-submit. To avoid retyping the command every time, write a small script for submission:
#!/bin/sh
spark-submit --master local[*] --class com.huawei.test.FlumeEventCount \
  --jars lib/*.jar \
  target/scala-2.10/flumeeventcount_2.10-1.0.jar localhost 50000
Here localhost is the hostname that Flume writes to (the address on which the Spark receiver's Avro server listens), and 50000 is the corresponding port.
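Because the receiver starts an Avro server on this host and port, you can also generate test events directly with Flume's avro-client instead of going through a full agent. A quick sketch, assuming Flume is installed locally and /tmp/test.log is an existing file whose lines you want to send:
flume-ng avro-client -H localhost -p 50000 -F /tmp/test.log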
When data flows through Flume, the program captures the events and reports the total event count for each batch:
-------------------------------------------
Time: 1436942874000 ms
-------------------------------------------
Received 1345 flume events.
-------------------------------------------
Time: 1436942876000 ms
-------------------------------------------
Received 2132 flume events.
-------------------------------------------
Time: 1436942878000 ms
-------------------------------------------
Received 0 flume events.