Configuring Spark Streaming to read data from Flume

1. Dependency configuration

Since we build the project with sbt, every dependency just goes into the libraryDependencies setting in build.sbt, in the form groupId % artifactId % version; see the Maven documentation for what each field means.
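For reference, with scalaVersion set to 2.10.x the %% operator simply appends the Scala binary version to the artifact name, so the following two settings resolve to the same artifact:

libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.4.0"

libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.4.0"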

Besides spark-core, this code also needs the third-party library spark-streaming-flume, so build.sbt looks roughly like this:

name := "FlumeEventCount"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.4.0"

libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.4.0"

Note that build.sbt requires a blank line between settings; this is a syntax requirement!

2. Test code

The Flume stream is created by calling FlumeUtils.createStream(). This test simply counts the number of data lines (events) received in each batch (every 2 seconds). The code:

package com.huawei.test

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._

/**
 *  Produces a count of events received from Flume.
 *
 *  This should be used in conjunction with an AvroSink in Flume. It will start
 *  an Avro server at the requested host:port address and listen for requests.
 *  Your Flume AvroSink should be pointed to this address.
 *
 *  Usage: FlumeEventCount <host> <port>
 *    <host> is the host the Flume receiver will be started on - a receiver
 *           creates a server and listens for flume events.
 *    <port> is the port the Flume receiver will listen on.
 *
 *  To run this example:
 *    `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
 */
object FlumeEventCount{
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

3. Configure Flume

Point the agent's sink at Spark. The sample code above uses the push-based FlumeUtils.createStream(), so its sink should be a plain Avro sink (type = avro) whose hostname and port match the <host> <port> that the Spark receiver listens on. The SparkSink configuration below is the pull-based alternative; it pairs with FlumeUtils.createPollingStream() rather than createStream():

agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = <hostname of the local machine>
agent.sinks.spark.port = <port to listen on for connection from Spark>
agent.sinks.spark.channel = memoryChannel
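For reference, a minimal sketch of the pull-based variant that actually matches a SparkSink configuration (the SparkSink approach also requires the spark-streaming-flume-sink jar and its dependencies on the Flume agent's classpath). It is a one-line change in the program above:

// Pull-based: Spark polls the events buffered in Flume's SparkSink.
val stream = FlumeUtils.createPollingStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)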

4. Package the program

cd $PROJECT_ROOT   # PROJECT_ROOT is the project root, i.e. the directory containing build.sbt
sbt package

5. Run

Note: apart from spark-core, which spark-submit provides automatically, all other dependencies such as spark-streaming-flume must be supplied manually, either by:

  • putting the dependency jars on the CLASSPATH, or
  • passing them in with the --jars option.

This test uses the latter method, i.e. the --jars option. The only missing dependency is the spark-streaming-flume jar, which sbt has already downloaded to the local cache at ~/.ivy2/cache/org.apache.spark/spark-streaming-flume_2.10/jars; copy the jars from there into the project's lib directory:

cp ~/.ivy2/cache/org.apache.spark/spark-streaming-flume_2.10/jars/*.jar lib

Submit the program with spark-submit. To avoid retyping the command every time, wrap it in a script (note that --jars expects a comma-separated list, so the glob below assumes lib holds a single jar):

#!/bin/sh
spark-submit --master local[*] --class com.huawei.test.FlumeEventCount \
  --jars lib/*.jar \
  target/scala-2.10/flumeeventcount_2.10-1.0.jar localhost 50000

Here localhost is the host that Flume writes to and 50000 is the port Flume writes to.

6. Results

When data flows through Flume, the program captures the events and reports the total event count for each batch.


-------------------------------------------
Time: 1436942874000 ms
-------------------------------------------
Received 1345 flume events.

-------------------------------------------
Time: 1436942876000 ms
-------------------------------------------
Received 2132 flume events.

-------------------------------------------
Time: 1436942878000 ms
-------------------------------------------
Received 0 flume events.
 

Integrating nutch with solr and Chinese word segmentation

I. Building the nutch environment

1. Set up a proxy

nutch is built with ant, and ant calls ivy to download dependencies from the Maven repository. If your company requires a proxy to reach the Internet, you need to configure one; if no proxy is needed, skip this step.

A summary of the proxy pitfalls encountered:

  • Forcing the system proxy with ant -autoproxy: failed!
  • Modifying build.xml to add a setproxy task: failed!
  • Setting ANT_OPTS with the options -Dhttp.proxyHost=http://proxy.xxx.com -Dhttp.proxyPort=xxx: failed!
  • Passing http_proxy: failed!

Reason for the failures: proxyHost must contain only the host name, not the protocol.

What finally worked was modifying build.xml to set the proxy, as follows:

<target name="myproxy">
      <setproxy proxyhost="proxy.xxx.com" proxyport="913" />
</target>

and changing the ivy-init target to depend on it:

<target name="ivy-init" depends="ivy-probe-antlib, ivy-init-antlib,myproxy" description="--> initialise Ivy settings">
    <ivy:settings file="${ivy.dir}/ivysettings.xml" />
</target>

2. Set up the hadoop environment

If you are only building for standalone (local) mode, skip this step!

Otherwise, set and export the HADOOP_HOME environment variable:

export HADOOP_HOME=${HADOOP_HOME:-/opt/hadoop}
echo $HADOOP_HOME

3. Configure nutch

nutch must be configured before compiling, and every configuration change requires a recompile! The configuration files live under $NUTCH_HOME/conf. Be sure to set http.agent.name, otherwise the compiled build will refuse to run. Edit conf/nutch-site.xml with the following content:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <property>
        <name>http.agent.name</name>
        <value>My Spider</value>
    </property>
</configuration>

If nutch needs a proxy at run time, configure it by editing $NUTCH_HOME/conf/nutch-default.xml and setting the http.proxy.host and http.proxy.port values:

<property>
  <name>http.proxy.host</name>
  <value>proxy.xxx.com</value>
  <description>The proxy hostname.  If empty, no proxy is used.</description>
</property>

<property>
  <name>http.proxy.port</name>
  <value>xxx</value>
  <description>The proxy port.</description>
</property>

4. Compile and build nutch

Before this step, make sure everything above has been completed correctly, otherwise the build or the resulting runtime will fail. Compile with:

ant -v runtime

The build takes roughly 30 minutes.

II. Integrating solr

The current solr release line is 5.x, but 5.x seems to differ considerably and nutch does not yet support it, so we use the 4.x line; the latest 4.x release at the moment is 4.10.4, so download that.

1. Initialization

Unpack the archive and copy the collection1 core under example/solr to a new core named nutch:

cp -rf collection1 nutch

Then edit $SOLR_HOME/example/solr/nutch/core.properties and set name to nutch:

name=nutch

Copy $NUTCH_HOME/conf/schema-solr4.xml into $SOLR_HOME/example/solr/nutch/conf and rename it schema.xml:

cp $NUTCH_HOME/conf/schema-solr4.xml  $SOLR_HOME/example/solr/nutch/conf/schema.xml

2. Fix the configuration

Starting solr at this point produces the following error:

org.apache.solr.common.SolrException:org.apache.solr.common.SolrException: copyField dest :'location' is not an explicit field and doesn't match a dynamicField

This appears to be a bug in the schema.xml configuration file; the fix is to add a location field under <fields>:

  <field name="location" type="location" stored="false" indexed="true" multiValued="true"/>

If the _version_ field is missing, add it as well:

<field name="_version_" type="long" indexed="true" stored="true"/>

3. Add Chinese word segmentation

First download IKAnalyzer from Google Code; the version used here is IK Analyzer 2012FF_hf1.zip (if Google Code is blocked, download it from a mirror). Unpack it, copy IKAnalyzer2012FF_u1.jar into $SOLR_HOME/example/solr-webapp/webapp/WEB-INF/lib, and copy IKAnalyzer.cfg.xml and stopword.dic into $SOLR_HOME/example/solr/nutch/conf, the same directory as schema.xml:

cp IKAnalyzer2012FF_u1.jar $SOLR_HOME/example/solr-webapp/webapp/WEB-INF/lib
cp IKAnalyzer.cfg.xml stopword.dic $SOLR_HOME/example/solr/nutch/conf

Edit the core's schema.xml and add the following fieldType inside the <types> section:

<fieldType name="text_cn" class="solr.TextField">   
     <analyzer class="org.wltea.analyzer.lucene.IKAnalyzer"/>   
</fieldType>

Fields in this core's schema.xml can then use the text_cn type, for example:

<field name="name"      type="text_cn"   indexed="true"  stored="true"  multiValued="true" /> 

Start the solr service:

cd $SOLR_HOME/example
java -jar start.jar

Open http://172.16.182.23:8983/solr/#/nutch in a browser, click the Analysis link at the lower left, choose text_cn under Analyse Fieldname / FieldType, and enter the following under Field Value (Index):

我喜欢solr

Then click the blue Analyse Values button and check whether the text is segmented correctly.
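The same check can be run from the command line through Solr's field analysis handler (a sketch; it assumes the /analysis/field request handler from the example solrconfig.xml is still registered, and the host should be adjusted to your solr server):

curl -G "http://localhost:8983/solr/nutch/analysis/field" --data-urlencode "analysis.fieldtype=text_cn" --data-urlencode "analysis.fieldvalue=我喜欢solr" --data-urlencode "wt=json"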

III. Standalone test run

See the official tutorial for the full procedure.

In summary:

1. Create the seed list

mkdir urls
cd urls
touch seeds.txt
echo "http://nutch.apache.org/" >> seeds.txt # one URL per line

2. Run the crawl script

bin/crawl -i -D solr.server.url=http://localhost:8983/solr/nutch urls/ TestCrawl/  2

3. Verify the results

Open http://localhost:8983/solr/, click solr admin, enter nutch in the Query String box, and click Search to check the results.

IV. Distributed run

Similar to the standalone procedure, first create a urls directory holding the seed URLs, then upload it to hdfs:

mkdir urls
cd urls
echo "http://apache.org" >> seeds.txt
cd ..
hdfs dfs -put urls

Assuming solr is already configured and reachable at localhost:8983,

run the crawl with the following commands:

cd $NUTCH_HOME/runtime/deploy/
bin/crawl -i -D solr.server.url=http://localhost:8983/solr/nutch urls/ TestCrawl/  2

 


Installing sqoop2

 

1. Download and unpack

The version installed here is 1.99.6.

# Decompress Sqoop distribution tarball
tar -xvf sqoop-<version>-bin-hadoop<hadoop-version>.tar.gz


ln -s sqoop-<version>-bin-hadoop<hadoop-version> sqoop

export SQOOP_HOME=`pwd`/sqoop
# Change working directory
cd $SQOOP_HOME

2. Configure the server

The configuration files are under $SQOOP_HOME/server/conf, and most of the defaults are fine. The setting that needs attention is common.loader in catalina.properties, which must correctly list the hadoop libraries: all of the hdfs, mapreduce, hive, etc. jars. My configuration is:

common.loader=${catalina.base}/lib,\
${catalina.base}/lib/*.jar,\
${catalina.home}/lib,\
${catalina.home}/lib/*.jar,\
${catalina.home}/../lib/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop/lib/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-hdfs/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-hdfs/lib/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-yarn/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-yarn/lib/*.jar,\
/opt/cloudera/parcels/CDH/lib/hive/lib/*.jar

In sqoop.properties, the org.apache.sqoop.submission.engine.mapreduce.configuration.directory property points at the hadoop configuration directory; the default is /etc/hadoop/conf.
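For example, the relevant line in sqoop.properties:

org.apache.sqoop.submission.engine.mapreduce.configuration.directory=/etc/hadoop/conf/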

3. Verify the configuration

Run:

sqoop2-tool verify

If everything is correct, the output is:

Verification was successful.
Tool class org.apache.sqoop.tools.tool.VerifyTool has finished correctly

4. Download a newer derby package

The bundled derby version is too old; at run time the incompatibility produces the following error:

org.apache.sqoop.common.SqoopException: JDBCREPO_0007:Unable to lease link
    at org.apache.sqoop.repository.JdbcRepositoryTransaction.begin(JdbcRepositoryTransaction.java:63)
    at org.apache.sqoop.repository.JdbcRepository.doWithConnection(JdbcRepository.java:85)
    at org.apache.sqoop.repository.JdbcRepository.doWithConnection(JdbcRepository.java:61)
    at org.apache.sqoop.repository.JdbcRepository.createOrUpgradeRepository(JdbcRepository.java:127)
    at org.apache.sqoop.repository.RepositoryManager.initialize(RepositoryManager.java:123)
    at org.apache.sqoop.tools.tool.UpgradeTool.runToolWithConfiguration(UpgradeTool.java:39)
    at org.apache.sqoop.tools.ConfiguredTool.runTool(ConfiguredTool.java:35)
    at org.apache.sqoop.tools.ToolRunner.main(ToolRunner.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.sqoop.tomcat.TomcatToolRunner.main(TomcatToolRunner.java:77)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.catalina.startup.Tool.main(Tool.java:225)
Caused by: java.sql.SQLException: No suitable driver found for 

The fix is to download the latest derby release, delete the old derby jar under /sqoop-1.99.6-bin-hadoop200/server/webapps/sqoop/WEB-INF/lib, and then copy the jars from the new derby distribution's lib directory into /sqoop-1.99.6-bin-hadoop200/server/webapps/sqoop/WEB-INF/lib.
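A sketch of those steps (the derby version and download path are examples; adjust them to what you actually downloaded):

cd $SQOOP_HOME/server/webapps/sqoop/WEB-INF/lib
rm derby-*.jar                                   # remove the old bundled derby jar (file name may differ)
cp /path/to/db-derby-10.11.1.1-bin/lib/*.jar .   # jars from the newly downloaded derby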

5. Verify

Run bin/sqoop2-shell to enter the sqoop shell, then run show version --all. If the server version is printed correctly, the installation succeeded:

sqoop:000> show version --all
client version:
  Sqoop 1.99.6 source revision 07244c3915975f26f03d9e1edf09ab7d06619bb8
  Compiled by root on Wed Apr 29 10:40:43 CST 2015
server version:
  Sqoop 1.99.6 source revision 07244c3915975f26f03d9e1edf09ab7d06619bb8
  Compiled by root on Wed Apr 29 10:40:43 CST 2015
API versions:
  [v1]

Configuring a kafka cluster

 

Although the setup is simple, you can hit many strange pitfalls whose solutions are hard to find online. First download the kafka package and unpack it, then edit config/server.properties; the essential settings are as follows (some default settings are omitted):

broker.id=0
advertised.host.name=master
zookeeper.connect=master:2181,node1:2181,node2:2181

Note that advertised.host.name must be set to the host name, otherwise many problems follow. Every host's broker.id must be different. zookeeper.connect must list the address and port of every zookeeper server, and the host names used above (master, node1, node2, ...) must agree with /etc/hosts and be reachable (ping-able) from outside the cluster. Inside the cluster you can use internal IPs and outside it external IPs, but each host name must map to exactly one machine; otherwise you get the "Leader not local for partition" error, one of the pits that took a long time to track down.
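For example, the corresponding server.properties entries on the second broker (a sketch; the id and host name follow the cluster layout above) would be:

broker.id=1
advertised.host.name=node1
zookeeper.connect=master:2181,node1:2181,node2:2181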

After changing the configuration, create a topic (once created, a topic apparently cannot be deleted, only marked as deleted?):

bin/kafka-topics.sh --create --partitions 5 --replication-factor 3 --topic test3 --zookeeper master,node1,node2

Get the topic details:

bin/kafka-topics.sh --describe --topic test3 --zookeeper master,node1

Output:

Topic:test3     PartitionCount:5        ReplicationFactor:3     Configs:
        Topic: test3    Partition: 0    Leader: 4       Replicas: 4,2,3 Isr: 4,2,3
        Topic: test3    Partition: 1    Leader: 5       Replicas: 5,3,4 Isr: 5,3,4
        Topic: test3    Partition: 2    Leader: 6       Replicas: 6,4,5 Isr: 6,4,5
        Topic: test3    Partition: 3    Leader: 7       Replicas: 7,5,6 Isr: 7,5,6
        Topic: test3    Partition: 4    Leader: 0       Replicas: 0,6,7 Isr: 0,6,7

The Replicas and Isr lists above must match; if they do not, the corresponding broker is down.
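As a quick end-to-end check (a sketch assuming the brokers listen on the default port 9092), produce a few messages and read them back with the console tools:

bin/kafka-console-producer.sh --broker-list master:9092 --topic test3
bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test3 --from-beginning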


Building spark 1.1.1 for hadoop 2.5.2

The prebuilt spark binaries currently cover only hadoop 2.4 and cdh4, so if your cluster runs hadoop 2.5.x you have to build spark from source yourself.

Download the source

Download the source from the official site: under "Choose a package type" select Source Code, then unpack it after downloading:

tar xvf spark-1.1.1.tgz

Build

Step 1: Set the maven memory limits

export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"

Step 2: Add a hadoop-2.5 profile

Note that hadoop 2.x.x versions need a matching profile. The root pom.xml only contains a profile for 2.4.0, so to build against hadoop 2.5.x you have to add a hadoop-2.5 profile to pom.xml by hand:

<profile>
  <id>hadoop-2.5</id>
  <properties>
    <hadoop.version>2.5.2</hadoop.version>
    <protobuf.version>2.5.0</protobuf.version>
    <jets3t.version>0.9.0</jets3t.version>
  </properties>
</profile>

Otherwise the build picks up the wrong protobuf version and the resulting spark cannot read files on hdfs, throwing java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$CreateSnapshotRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;

Step 3: Compile

Run:

mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.2 -Phive -DskipTests clean package
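If the build succeeds, the assembly jar should end up under assembly/target/scala-2.10/ (a sketch; the exact file name is assumed from Spark's usual naming scheme):

ls assembly/target/scala-2.10/spark-assembly-*-hadoop2.5.2.jar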

 


Installing hive on a single machine

Requirements

  1. java 1.7+
  2. hadoop 2.x
  3. mysql 5.5+ (optional, but recommended for storing the metastore)

Environment before installation

  1. JAVA_HOME: the java installation directory
  2. HADOOP_HOME: the hadoop installation directory
  3. CLASSPATH: besides the required hadoop and hive jars, it must also include the mysql java driver; here mysql-connector-java-5.1.25.jar is used and placed into lib (see the sketch after this list)
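A minimal sketch of these settings (the paths are placeholders for your own installation locations):

export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64   # placeholder: your JDK directory
export HADOOP_HOME=/opt/hadoop                        # placeholder: your hadoop directory
export HIVE_HOME=/opt/hive
export CLASSPATH=$CLASSPATH:$HIVE_HOME/lib/mysql-connector-java-5.1.25.jar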

Installation

Step 1: Download the tarball

Download the latest tarball from the hive website; at the time of writing it is apache-hive-0.14.0-bin.tar.gz.

Step 2: Unpack

Assuming the installation path is /opt/hive:

sudo mv apache-hive-0.14.0-bin.tar.gz /opt
cd /opt
sudo tar xvf apache-hive-0.14.0-bin.tar.gz
sudo ln -s apache-hive-0.14.0-bin hive
sudo mv mysql-connector-java-5.1.25.jar /opt/hive/lib

Step 3: Configure

  • Create the configuration files directly from the templates (run this inside /opt/hive/conf):
sudo rename 's/\.template//' *
sudo touch hive-site.xml
  • Edit hive-env.sh and set HADOOP_HOME=${HADOOP_HOME:-/opt/hadoop}
  • Fill in the hive-site.xml file with the following content:
<configuration>

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=true</value>
  <description>the URL of the MySQL database</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>HIVE_DBPASS</value>
</property>

<property>
  <name>datanucleus.autoCreateSchema</name>
  <value>true</value>
</property>

<property>
  <name>datanucleus.fixedDatastore</name>
  <value>false</value>
</property>

<property>
  <name>datanucleus.autoCreateTables</name>
  <value>true</value>
</property>

<property>
  <name>datanucleus.autoCreateColumns</name>
  <value>true</value>
</property>

<property>
  <name>datanucleus.autoStartMechanism</name> 
  <value>SchemaTable</value>
</property> 

<property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
</property>

<!--
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://localhost:9083</value>
  <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
</property>

<property>
  <name>hive.aux.jars.path</name>
  <value>file:///opt/hive/lib/zookeeper-3.4.5.jar,file:///opt/hive/lib/hive-hbase-handler-0.14.0.jar,file:///opt/hive/lib/guava-11.0.2.jar</value>
</property>
-->

<property>
 <name>hbase.zookeeper.quorum</name>
<value>localhost</value>
</property>

<property>
  <name>hive.support.concurrency</name>
  <description>Enable Hive's Table Lock Manager Service</description>
  <value>true</value>
</property>

</configuration>
  • MySQL setup: edit /etc/mysql/my.cnf, change bind-address to 0.0.0.0, and restart the mysql service.
  • Create the required directories on hdfs:
$HADOOP_HOME/bin/hdfs dfs -mkdir /tmp
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/hive
$HADOOP_HOME/bin/hdfs dfs -chown hive /user/hive
$HADOOP_HOME/bin/hdfs dfs -mkdir  /user/hive/warehouse
$HADOOP_HOME/bin/hdfs dfs -chmod g+w   /tmp
$HADOOP_HOME/bin/hdfs dfs -chmod 777   /user/hive/warehouse
$HADOOP_HOME/bin/hdfs dfs -chmod a+t /user/hive/warehouse
  • Run $HIVE_HOME/bin/hive, and you are done! (A quick smoke test is sketched after this list.)
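As a quick smoke test (a sketch; -e runs a single statement and exits), check that the CLI can reach the metastore:

$HIVE_HOME/bin/hive -e 'show databases;'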