Configuring Spark Streaming to read Flume data

int32位 posted @ Mar 22, 2016 03:46:23 PM in hadoop/spark, 2243 reads
When reposting, please credit: http://krystism.is-programmer.com/ If you spot errors, corrections are welcome. Thanks!

1. Dependency configuration

Since we build the project with sbt, every dependency goes into `libraryDependencies` in build.sbt, in the format `groupId % artifactId % version`; see the Maven documentation for what each field means.

Besides spark-core, this example needs the third-party library spark-streaming-flume, so build.sbt looks roughly like this:

name := "FlumeEventCount"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.4.0"

libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.4.0"

Note the blank line between settings in build.sbt — older versions of sbt require it as part of the syntax!
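With the dependencies declared, the source file needs to sit where sbt expects it. A minimal sketch of the conventional layout (sbt does not mandate this layout, but it is the default; the package path matches the code in the next section):

```shell
# Conventional sbt source layout; build.sbt sits at the project root.
mkdir -p src/main/scala/com/huawei/test
# FlumeEventCount.scala goes in that directory.
ls -d src/main/scala/com/huawei/test   # prints src/main/scala/com/huawei/test
```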

2. Test code

We create a Flume stream by calling FlumeUtils.createStream(). This test simply counts the number of lines (events) received in each 2-second batch:

package com.huawei.test

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._

/**
 *  Produces a count of events received from Flume.
 *
 *  This should be used in conjunction with an AvroSink in Flume. It will start
 *  an Avro server at the requested host:port address and listen for requests.
 *  Your Flume AvroSink should be pointed to this address.
 *
 *  Usage: FlumeEventCount <host> <port>
 *    <host> is the host the Flume receiver will be started on - a receiver
 *           creates a server and listens for flume events.
 *    <port> is the port the Flume receiver will listen on.
 *
 *  To run this example (see section 5 for the full command):
 *    `$ spark-submit --class com.huawei.test.FlumeEventCount <app jar> <host> <port>`
 */
object FlumeEventCount{
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

3. Configuring Flume

Because the code above creates the stream with the push-based FlumeUtils.createStream(), the Flume agent must push events to the Spark receiver through a plain Avro sink — the SparkSink type shown in some guides is only for the pull-based createPollingStream() API:

agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname = <host where the Spark receiver runs>
agent.sinks.avroSink.port = <port the Spark receiver listens on>
agent.sinks.avroSink.channel = memoryChannel
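For local testing, a complete minimal agent along these lines might look as follows. The netcat source on port 44444 and the agent/source names are assumptions for demonstration; with the push-based createStream() the sink must be a plain avro sink pointed at the Spark receiver:

```properties
# Hypothetical complete agent for local testing
agent.sources = netcatSrc
agent.channels = memoryChannel
agent.sinks = avroSink

agent.sources.netcatSrc.type = netcat
agent.sources.netcatSrc.bind = localhost
agent.sources.netcatSrc.port = 44444
agent.sources.netcatSrc.channels = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000

agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname = localhost
agent.sinks.avroSink.port = 50000
agent.sinks.avroSink.channel = memoryChannel
```

Start it with `flume-ng agent --conf conf --conf-file agent.conf --name agent`; lines written to the netcat source (e.g. via `nc localhost 44444`) then flow to the Spark receiver on port 50000.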

4. Packaging the program

cd $PROJECT_ROOT # PROJECT_ROOT is the project root, i.e. where build.sbt lives
sbt package

5. Running

Note: apart from spark-core, which spark-submit pulls in automatically, other dependencies such as spark-streaming-flume must be supplied manually, either by:

  • setting the CLASSPATH so it includes the dependency jars, or
  • passing them explicitly with the --jars option.

This test uses the latter method, the --jars option. The only missing package is spark-streaming-flume, which sbt already downloaded during compilation to the local cache at ~/.ivy2/cache/org.apache.spark/spark-streaming-flume_2.10/jars. Copy all of its jars into the project's lib directory:

cp ~/.ivy2/cache/org.apache.spark/spark-streaming-flume_2.10/jars/*.jar lib
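One caveat: --jars expects a comma-separated list, while a shell glob expands to space-separated words, so the glob alone only works when lib/ holds a single jar. One way to build the list (a sketch; the jar names here are stand-ins for the copied dependencies):

```shell
# Stand-in jars for demonstration; in the real project these are the copied dependency jars.
mkdir -p lib && touch lib/a.jar lib/b.jar
# Join every jar under lib/ into a comma-separated list suitable for --jars.
JARS=$(ls lib/*.jar | paste -sd, -)
echo "$JARS"   # prints lib/a.jar,lib/b.jar
```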

Submit the program with spark-submit. To avoid retyping the command every time, wrap it in a script:

#!/bin/sh
spark-submit --master local[*] --class com.huawei.test.FlumeEventCount \
  --jars lib/*.jar \
  target/scala-2.10/flumeeventcount_2.10-1.0.jar localhost 50000

Here localhost is the host Flume writes to (i.e. where the Spark receiver listens), and 50000 is the corresponding port.

6. Results

When data flows through Flume, the program captures the events and prints the total received in each batch.

-------------------------------------------
Time: 1436942874000 ms
-------------------------------------------
Received 1345 flume events.

-------------------------------------------
Time: 1436942876000 ms
-------------------------------------------
Received 2132 flume events.

-------------------------------------------
Time: 1436942878000 ms
-------------------------------------------
Received 0 flume events.
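As a quick sanity check on a captured log, the per-batch counts in the output format above can be summed with standard tools (the file name app.log is an assumption):

```shell
# Recreate a sample of FlumeEventCount's printed output.
cat > app.log <<'EOF'
Received 1345 flume events.
Received 2132 flume events.
Received 0 flume events.
EOF
# Sum the count field of every "Received N flume events." line.
awk '/Received [0-9]+ flume events/ {s += $2} END {print s}' app.log   # prints 3477
```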
 