Since we build the project with sbt, all dependencies simply go into the libraryDependencies setting of build.sbt, in the form groupId % artifactId % version (see the Maven documentation for what each field means). Besides spark-core, this code also needs the third-party library spark-streaming-flume, so build.sbt looks roughly like this:
name := "FlumeEventCount"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.4.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.4.0"
Note that build.sbt requires a blank line between settings; this is a syntax requirement!
The Flume stream is created by calling FlumeUtils.createStream(). This test simply counts the number of data lines (events) received in each batch (every 2 seconds). The code:
package com.huawei.test
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
/**
* Produces a count of events received from Flume.
*
* This should be used in conjunction with an AvroSink in Flume. It will start
* an Avro server at the requested host:port address and listen for requests.
* Your Flume AvroSink should be pointed to this address.
*
* Usage: FlumeEventCount <host> <port>
* <host> is the host the Flume receiver will be started on - a receiver
* creates a server and listens for flume events.
* <port> is the port the Flume receiver will listen on.
*
* To run this example:
* `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
*/
object FlumeEventCount{
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println(
"Usage: FlumeEventCount <host> <port>")
System.exit(1)
}
val host = args(0)
val port = args(1).toInt
val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
val sparkConf = new SparkConf().setAppName("FlumeEventCount")
val ssc = new StreamingContext(sparkConf, batchInterval)
// Create a flume stream
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
// Print out the count of events received from this server in each batch
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
ssc.start()
ssc.awaitTermination()
}
}
On the Flume side, since FlumeUtils.createStream() is the push-based approach, configure an Avro sink that points at the host:port where the Spark receiver listens:
agent.sinks = spark
agent.sinks.spark.type = avro
agent.sinks.spark.hostname = <host the Spark receiver is started on>
agent.sinks.spark.port = <port the Spark receiver listens on>
agent.sinks.spark.channel = memoryChannel
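The SparkSink type (org.apache.spark.streaming.flume.sink.SparkSink) seen in some examples belongs to Spark's pull-based approach instead: Flume buffers events in the sink and Spark polls them. If you go that route, the Flume agent needs the spark-streaming-flume-sink_2.10 jar (plus scala-library) on its classpath, and the Spark side replaces the createStream call above with FlumeUtils.createPollingStream. A minimal sketch, where the host and port are placeholders for wherever the SparkSink is configured to listen:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
// Pull-based alternative: Spark polls the Flume SparkSink instead of running
// its own Avro server; "localhost" and 50000 are placeholder values.
val stream = FlumeUtils.createPollingStream(ssc, "localhost", 50000, StorageLevel.MEMORY_ONLY_SER_2)
The rest of the program (count, print, start) stays the same.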
## 4. Package the program
cd $PROJECT_ROOT # PROJECT_ROOT is the project root, i.e. the directory containing build.sbt
sbt package
## 5. Run
Note that apart from spark-core, which spark-submit pulls in automatically, every other dependency, such as spark-streaming-flume, has to be supplied by hand. There are two ways to do that: add the dependency jars to the CLASSPATH, or pass them explicitly with the --jars option.
This test uses the latter, i.e. the --jars option. The only dependency this project is missing is the spark-streaming-flume package, which sbt already downloaded to the local ivy cache at ~/.ivy2/cache/org.apache.spark/spark-streaming-flume_2.10/jars. Copy all of those jars into the project's lib directory:
cp ~/.ivy2/cache/org.apache.spark/spark-streaming-flume_2.10/jars/*.jar lib
Submit the program with spark-submit. To avoid retyping the command every time, put it in a script:
#!/bin/sh
# tr turns the space-separated glob into the comma-separated list that --jars expects
spark-submit --master local[*] --class com.huawei.test.FlumeEventCount \
  --jars "$(echo lib/*.jar | tr ' ' ',')" \
  target/scala-2.10/flumeeventcount_2.10-1.0.jar localhost 50000
Here localhost is the host Flume writes to (where the receiver runs) and 50000 is the port Flume writes to. When data flows through Flume, the program captures the events and prints the total number of events in each batch:
-------------------------------------------
Time: 1436942874000 ms
-------------------------------------------
Received 1345 flume events.
-------------------------------------------
Time: 1436942876000 ms
-------------------------------------------
Received 2132 flume events.
-------------------------------------------
Time: 1436942878000 ms
-------------------------------------------
Received 0 flume events.
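The count shown above is the simplest thing to do with the stream. To look at the payloads themselves you can map over the events before printing; a small sketch added to the program above, assuming the event bodies are UTF-8 text:
// Decode each Flume event body to a string and print a sample for every batch
stream.map(event => new String(event.event.getBody.array(), "UTF-8")).print()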
Nutch is built with ant, and ant uses ivy to download dependencies from the Maven repositories, so if your network requires a proxy to reach the internet you have to configure one; if no proxy is needed, skip this step entirely.
A few pitfalls I ran into while setting up the proxy: the initial failures were caused by putting the protocol into proxyHost, which must contain only the hostname. The build finally succeeded after editing build.xml to set the proxy, as follows:
<target name="myproxy">
<setproxy proxyhost="proxy.xxx.com" proxyport="913" />
</target>
and changing the ivy-init target to depend on it, as follows:
<target name="ivy-init" depends="ivy-probe-antlib, ivy-init-antlib,myproxy" description="--> initialise Ivy settings">
<ivy:settings file="${ivy.dir}/ivysettings.xml" />
</target>
If you are only building for standalone (local) mode, skip this step. Otherwise set and export the HADOOP_HOME environment variable:
export HADOOP_HOME=${HADOOP_HOME:-/opt/hadoop}
echo $HADOOP_HOME
Nutch has to be configured before compiling, and every time a configuration file changes it has to be recompiled. The configuration files live under $NUTCH_HOME/conf. Be sure to set http.agent.name, otherwise the compiled crawler will not run. Edit conf/nutch-site.xml with the following content:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>http.agent.name</name>
<value>My Spider</value>
</property>
</configuration>
If Nutch needs a proxy at crawl time, edit $NUTCH_HOME/conf/nutch-default.xml and set the http.proxy.host and http.proxy.port values, for example:
<property>
<name>http.proxy.host</name>
<value>proxy.xxx.com</value>
<description>The proxy hostname. If empty, no proxy is used.</description>
</property>
<property>
<name>http.proxy.port</name>
<value>xxx</value>
<description>The proxy port.</description>
</property>
Before compiling, make sure all of the steps above were completed correctly, otherwise the build or the crawl will fail. Compile with:
ant -v runtime
The build takes roughly 30 minutes.
The current Solr release line is 5.x, but 5.x differs considerably and Nutch does not yet integrate with it, so we use the 4.x line; the latest 4.x release at the moment is 4.10.4. Download and unpack it, then copy the core collection1 under example/solr/collection1 to a new core named nutch:
cp -rf collection1 nutch
Then edit the file $SOLR_HOME/example/solr/nutch/core.properties and set the core name to nutch:
name=nutch
Copy $NUTCH_HOME/conf/schema-solr4.xml into $SOLR_HOME/example/solr/nutch/conf and rename it to schema.xml:
cp $NUTCH_HOME/conf/schema-solr4.xml $SOLR_HOME/example/solr/nutch/conf/schema.xml
Starting Solr at this point produces the following error:
org.apache.solr.common.SolrException:org.apache.solr.common.SolrException: copyField dest :'location' is not an explicit field and doesn't match a dynamicField
This appears to be a bug in the shipped schema.xml. The fix is to add a location field inside the <fields> section:
<field name="location" type="location" stored="false" indexed="true" multiValued="true"/>
If there is no _version_ field, add one as well:
<field name="_version_" type="long" indexed="true" stored="true"/>
First download IKAnalyzer from Google Code; the version used here is IK Analyzer 2012FF_hf1.zip (use a mirror if Google Code is blocked). Unzip it, copy IKAnalyzer2012FF_u1.jar into $SOLR_HOME/example/solr-webapp/webapp/WEB-INF/lib, and copy IKAnalyzer.cfg.xml and stopword.dic into $SOLR_HOME/example/solr/nutch/conf, the same directory as schema.xml:
cp IKAnalyzer2012FF_u1.jar $SOLR_HOME/example/solr-webapp/webapp/WEB-INF/lib
cp IKAnalyzer.cfg.xml stopword.dic $SOLR_HOME/example/solr/nutch/conf
Edit the core's schema.xml and add the following fieldType definition inside the <types> section:
<fieldType name="text_cn" class="solr.TextField">
<analyzer class="org.wltea.analyzer.lucene.IKAnalyzer"/>
</fieldType>
Fields in this core's schema.xml can then use text_cn as their type, for example:
<field name="name" type="text_cn" indexed="true" stored="true" multiValued="true" />
Start the Solr server:
cd $SOLR_HOME/example
java -jar start.jar
Open http://172.16.182.23:8983/solr/#/nutch in a browser, click the Analysis link at the lower left, set Analyse Fieldname / FieldType to text_cn, and enter the following under Field Value (Index):
我喜欢solr
Then click the blue Analyse Values button and check whether the text is segmented correctly.
The full procedure is described in the official tutorial; in short:
mkdir urls
cd urls
touch seeds.txt
echo "http://nutch.apache.org/" >> seeds.txt # 每行一个URL
Run the crawl script:
bin/crawl -i -D solr.server.url=http://localhost:8983/solr/nutch urls/ TestCrawl/ 2
Open http://localhost:8983/solr/, click solr admin, enter nutch in the Query String field, and click Search to check the results.
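Instead of the admin UI you can also query the core programmatically; a sketch using the SolrJ 4.x client (assumes the solr-solrj jar and its dependencies are on the classpath; the core URL and the url field come from the setup above):
import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.impl.HttpSolrServer

object QueryNutch {
  def main(args: Array[String]): Unit = {
    // URL of the "nutch" core configured earlier
    val server = new HttpSolrServer("http://localhost:8983/solr/nutch")
    val query = new SolrQuery("nutch")   // the search term
    query.setRows(10)
    val response = server.query(query)
    val results = response.getResults
    println(s"Found ${results.getNumFound} documents")
    // the Nutch schema indexes a "url" field for every document
    for (i <- 0 until results.size()) {
      println(results.get(i).getFieldValue("url"))
    }
  }
}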
Similar to the standalone procedure, first create a urls directory holding the seed URLs, then upload it to HDFS:
mkdir urls
cd urls
echo "http://apache.org" >> seeds.txt
cd ..
hdfs dfs -put urls
Assuming Solr is already set up and reachable at localhost:8983, run:
cd $NUTCH_HOME/runtime/deploy/
bin/crawl -i -D solr.server.url=http://localhost:8983/solr/nutch urls/ TestCrawl/ 2
The version installed here is 1.99.6.
# Decompress Sqoop distribution tarball
tar -xvf sqoop-<version>-bin-hadoop<hadoop-version>.tar.gz
# Link the extracted directory (not the tarball) so SQOOP_HOME points at it
ln -s sqoop-<version>-bin-hadoop<hadoop-version> sqoop
export SQOOP_HOME=`pwd`/sqoop
# Change working directory
cd $SQOOP_HOME
The configuration files live under $SQOOP_HOME/server/conf and most defaults can be kept. The setting that needs attention is common.loader in catalina.properties: it must point at all of the Hadoop library jars, including hdfs, mapreduce, hive and so on. My configuration is:
common.loader=${catalina.base}/lib,\
${catalina.base}/lib/*.jar,\
${catalina.home}/lib,\
${catalina.home}/lib/*.jar,\
${catalina.home}/../lib/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop/lib/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-hdfs/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-hdfs/lib/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-yarn/*.jar,\
/opt/cloudera/parcels/CDH/lib/hadoop-yarn/lib/*.jar,\
/opt/cloudera/parcels/CDH/lib/hive/lib/*.jar
In sqoop.properties, the property org.apache.sqoop.submission.engine.mapreduce.configuration.directory=/etc/hadoop/conf/ sets the path to the Hadoop configuration files; the default is /etc/hadoop/conf.
Run:
sqoop2-tool verify
If everything is correct, the output is:
Verification was successful.
Tool class org.apache.sqoop.tools.tool.VerifyTool has finished correctly
The bundled Derby version is too old, and the incompatibility surfaces at runtime as the following error:
org.apache.sqoop.common.SqoopException: JDBCREPO_0007:Unable to lease link
at org.apache.sqoop.repository.JdbcRepositoryTransaction.begin(JdbcRepositoryTransaction.java:63)
at org.apache.sqoop.repository.JdbcRepository.doWithConnection(JdbcRepository.java:85)
at org.apache.sqoop.repository.JdbcRepository.doWithConnection(JdbcRepository.java:61)
at org.apache.sqoop.repository.JdbcRepository.createOrUpgradeRepository(JdbcRepository.java:127)
at org.apache.sqoop.repository.RepositoryManager.initialize(RepositoryManager.java:123)
at org.apache.sqoop.tools.tool.UpgradeTool.runToolWithConfiguration(UpgradeTool.java:39)
at org.apache.sqoop.tools.ConfiguredTool.runTool(ConfiguredTool.java:35)
at org.apache.sqoop.tools.ToolRunner.main(ToolRunner.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.sqoop.tomcat.TomcatToolRunner.main(TomcatToolRunner.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.catalina.startup.Tool.main(Tool.java:225)
Caused by: java.sql.SQLException: No suitable driver found for
The fix is to download the latest Derby release, delete the old derby jar under sqoop-1.99.6-bin-hadoop200/server/webapps/sqoop/WEB-INF/lib, and then copy the jars from the new Derby distribution's lib directory into that same WEB-INF/lib directory.
Run bin/sqoop2-shell to enter the Sqoop shell and execute show version --all; if the server version is reported correctly, the installation succeeded:
sqoop:000> show version --all
client version:
Sqoop 1.99.6 source revision 07244c3915975f26f03d9e1edf09ab7d06619bb8
Compiled by root on Wed Apr 29 10:40:43 CST 2015
server version:
Sqoop 1.99.6 source revision 07244c3915975f26f03d9e1edf09ab7d06619bb8
Compiled by root on Wed Apr 29 10:40:43 CST 2015
API versions:
[v1]
Kafka setup is simple, but there are a number of odd pitfalls for which solutions are hard to find online. First download the Kafka package, unpack it, and edit config/server.properties. The essential settings are as follows (some settings left at their defaults are omitted):
broker.id=0
advertised.host.name=master
zookeeper.connect=master:2181,node1:2181,node2:2181
Note that advertised.host.name must be set to the hostname, otherwise all sorts of problems follow. broker.id must be different on every host. zookeeper.connect must list the address and port of every ZooKeeper server, and the hostnames used above (master, node1, node2, ...) must agree with /etc/hosts and be resolvable and pingable from outside the cluster as well. Inside the cluster you can use internal IPs and outside it external IPs, but every hostname must map to exactly the same machine everywhere; otherwise you get the Leader not local for partition error, one of the pitfalls that took a long time to figure out.
After changing the configuration, create a topic (once created, a topic apparently cannot be deleted, only marked as deleted?):
bin/kafka-topics.sh --create --partitions 5 --replication-factor 3 --topic test3 --zookeeper master,node1,node2
Get the topic details:
bin/kafka-topics.sh --describe --topic test3 --zookeeper master,node1
Output:
Topic:test3 PartitionCount:5 ReplicationFactor:3 Configs:
Topic: test3 Partition: 0 Leader: 4 Replicas: 4,2,3 Isr: 4,2,3
Topic: test3 Partition: 1 Leader: 5 Replicas: 5,3,4 Isr: 5,3,4
Topic: test3 Partition: 2 Leader: 6 Replicas: 6,4,5 Isr: 6,4,5
Topic: test3 Partition: 3 Leader: 7 Replicas: 7,5,6 Isr: 7,5,6
Topic: test3 Partition: 4 Leader: 0 Replicas: 0,6,7 Isr: 0,6,7
The Replicas and Isr columns above must match; if they differ, the corresponding broker is down.
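To sanity-check the whole setup, a small Scala producer can write a few messages to the topic; a sketch assuming the kafka-clients new-producer API (0.8.2+) is on the classpath and the brokers listen on the default port 9092:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // broker list; hostnames match the cluster described above, port 9092 is the default
    props.put("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    for (i <- 1 to 100) {
      // write to the "test3" topic created above
      producer.send(new ProducerRecord[String, String]("test3", s"message-$i"))
    }
    producer.close()
  }
}
The messages can then be read back with the console consumer shipped in Kafka's bin directory to confirm that producers and brokers can reach each other.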
The prebuilt Spark binaries currently cover only Hadoop 2.4 and CDH4, so if the cluster runs Hadoop 2.5.x you have to build Spark from source. Download the source from the official site (set Choose a package type to Source Code), then unpack it:
tar xvf spark-1.1.1.tgz
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
Note that Hadoop 2.x.x needs a matching profile. The top-level pom.xml only ships a profile for 2.4.0, so to build against hadoop-2.5.x you have to add a hadoop-2.5 profile by hand, i.e. add the following to pom.xml:
<profile>
<id>hadoop-2.5</id>
<properties>
<hadoop.version>2.5.2</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
<jets3t.version>0.9.0</jets3t.version>
</properties>
</profile>
Otherwise the build picks up the wrong protobuf version and cannot read files from HDFS, throwing java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$CreateSnapshotRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;
Then run:
mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.2 -Phive -DskipTests clean package
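Once the build finishes, a quick way to confirm the protobuf problem is gone is to read something from HDFS in spark-shell; a minimal check where the namenode address and file path are placeholders for your cluster:
// If the protobuf versions were mismatched, this read is where the VerifyError would appear
val lines = sc.textFile("hdfs://namenode:8020/tmp/test.txt")
println(lines.count())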
Download the latest tarball from the Hive website; at the time of writing that is apache-hive-0.14.0-bin.tar.gz. Assuming the install path is /opt/hive:
sudo mv apache-hive-0.14.0-bin.tar.gz /opt
cd /opt
sudo tar xvf apache-hive-0.14.0-bin.tar.gz
sudo ln -s apache-hive-0.14.0-bin hive
sudo mv mysql-connector-java-5.1.25.jar /opt/hive/lib  # adjust the source path to wherever the connector jar was downloaded
cd /opt/hive/conf
sudo rename 's/\.template//' *
sudo touch hive-site.xml
Then fill hive-site.xml with the following content:
<?xml version="1.0"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=true</value>
<description>the URL of the MySQL database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>HIVE_DBPASS</value>
</property>
<property>
<name>datanucleus.autoCreateSchema</name>
<value>true</value>
</property>
<property>
<name>datanucleus.fixedDatastore</name>
<value>false</value>
</property>
<property>
<name>datanucleus.autoCreateTables</name>
<value>true</value>
</property>
<property>
<name>datanucleus.autoCreateColumns</name>
<value>true</value>
</property>
<property>
<name>datanucleus.autoStartMechanism</name>
<value>SchemaTable</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<!--
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
<description>IP address (or fully-qualified domain name) and port of the metastore host</description>
</property>
<property>
<name>hive.aux.jars.path</name>
<value>file:///opt/hive/lib/zookeeper-3.4.5.jar,file:///opt/hive/lib/hive-hbase-handler-0.14.0.jar,file:///opt/hive/lib/guava-11.0.2.jar</value>
</property>
-->
<property>
<name>hbase.zookeeper.quorum</name>
<value>localhost</value>
</property>
<property>
<name>hive.support.concurrency</name>
<description>Enable Hive's Table Lock Manager Service</description>
<value>true</value>
</property>
</configuration>
Finally, create the directories Hive needs on HDFS and set their permissions:
$HADOOP_HOME/bin/hdfs dfs -mkdir /tmp
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/hive
$HADOOP_HOME/bin/hdfs dfs -chown hive /user/hive
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/hive/warehouse
$HADOOP_HOME/bin/hdfs dfs -chmod g+w /tmp
$HADOOP_HOME/bin/hdfs dfs -chmod 777 /user/hive/warehouse
$HADOOP_HOME/bin/hdfs dfs -chmod a+t /user/hive/warehouse
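With the metastore configured and the warehouse directories in place, the Spark build from the earlier section (compiled with -Phive) can talk to this Hive installation. A minimal check from spark-shell, assuming hive-site.xml has been copied into Spark's conf directory:
// Requires hive-site.xml on Spark's classpath (e.g. copied to $SPARK_HOME/conf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SHOW TABLES").collect().foreach(println)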