个人跟着尚硅谷数仓项目做的笔记,仅供学习使用.

数据仓库

数据仓库概念

数据仓库(Data Warehouse):是为企业制定决策,提供数据支持的。可以帮助企业改进业务流程、提高产品质量等。

ODS:数据备份,原封不动地把输入的数据存放起来

DWD:主要做数据清洗

DWS:预聚合,避免大表重复join,消耗性能和资源

ADS:存放最终的指标,并输出到各个系统中

数据仓库并不是数据的最终目的地,二十为数据最终的目的地做好准备。这些准备包括对数据的:备份、清洗、聚合、统计等。

数据仓库需求

离线需求

实时需求

技术选型

技术选型主要考虑因素:数据量大小、业务需求、业内经验、技术成熟度、开发维护成本、总成本预算

数据流程设计

具体版本号选择

用户行为

用户行为概述

用户行为日志的内容,主要包括用户的各项行为信息以及行为所处的环境信息。收集这些信息的主要目的是优化产品和为各项分析统计指标提供数据支撑。收集这些信息的手段通常为埋点

目前主流的埋点方式,有代码埋点(前端/后端)、可视化埋点全埋点等。

代码埋点是通过调用埋点SDK函数,在需要埋点的业务逻辑功能位置调用接口,上报埋点数据。例如,我们对页面中的某个按钮埋点后,当这个按钮被点击时,可以在这个按钮对应的 OnClick 函数里面调用SDK提供的数据发送接口,来发送数据。

可视化埋点只需要研发人员集成采集 SDK,不需要写埋点代码,业务人员就可以通过访问分析平台的“圈选”功能,来“圈”出需要对用户行为进行捕捉的控件,并对该事件进行命名。圈选完毕后,这些配置会同步到各个用户的终端上,由采集 SDK 按照圈选的配置自动进行用户行为数据的采集和发送。

全埋点是通过在产品中嵌入SDK,前端自动采集页面上的全部用户行为事件,上报埋点数据,相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要在系统里面进行分析。

用户行为日志

本项目收集和分析的用户行为信息主要有页面浏览记录、动作记录、曝光记录、启动记录和错误记录。

页面浏览记录

页面浏览记录,记录的是访客对页面的浏览行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及页面信息等。

动作记录

动作记录,记录的是用户的业务操作行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息 及动作目标对象信息等。

曝光记录

曝光记录,记录的是曝光行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及曝光对象信息等。

启动记录

启动记录,记录的是用户启动应用的行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、启动类型及开屏广告信息等。

错误记录

启动记录,记录的是用户在使用应用过程中的报错行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、以及可能与报错相关的页面信息、动作信息、曝光信息和动作信息。

集群软件安装

前提准备

准备三台机器,并配置好免密登录

jdk安装

把安装包上传到/opt/software下

1
2
3
4
5
6
#解压到/opt/module/目录下
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
#进入module目录
cd /opt/module/
#把解压后的名字修改为jdk
/opt/module/

配置环境变量

1
2
#新建sudo vim /etc/profile.d/my_env.sh
sudo vim /etc/profile.d/my_env.sh

添加如下内容:

1
2
3
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk
export PATH=$PATH:$JAVA_HOME/bin

环境变量生效

1
source /etc/profile.d/my_env.sh

三台机器都需要安装

集群查看所有进程脚本

在~/bin目录下创建脚本

1
vim xcall

添加如下内容:

1
2
3
4
5
6
7
#! /bin/bash

for i in hadoop102 hadoop103 hadoop104
do
echo --------- $i ----------
ssh $i "$*"
done

修改执行权限

1
chmod 777 xcall

启动脚本

1
xcall jps

Hadoop部署

集群规划

hadoop133 hadoop134 hadoop135
HDFS NameNode、DataNode DataNode SecondaryNameNode、DataNode
YARN NodeManager ResourceManager、NodeManager NodeManager

上传安装包

将安装包上传到 /opt/software/目录下

解压

1
2
3
4
5
#解压到/opt/moudle目录
tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/

#重命名
mv /opt/module/hadoop-3.1.3 /opt/module/hadoop

添加环境变量

1
2
#打开/etc/profile.d/my_env.sh文件
sudo vim /etc/profile.d/my_env.sh

追加hadoop路径

1
2
3
4
#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

环境变量生效

1
source /etc/profile.d/my_env.sh

配置集群

配置core-site.xml
1
2
cd $HADOOP_HOME/etc/hadoop
vim core-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<!-- 指定NameNode的地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop133:8020</value>
</property>
<!-- 指定hadoop数据的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop/data</value>
</property>

<!-- 配置HDFS网页登录使用的静态用户为atguigu -->
<property>
<name>hadoop.http.staticuser.user</name>
<value>atguigu</value>
</property>

<!-- 配置该atguigu(superUser)允许通过代理访问的主机节点 -->
<property>
<name>hadoop.proxyuser.atguigu.hosts</name>
<value>*</value>
</property>
<!-- 配置该atguigu(superUser)允许通过代理用户所属组 -->
<property>
<name>hadoop.proxyuser.atguigu.groups</name>
<value>*</value>
</property>
<!-- 配置该atguigu(superUser)允许通过代理的用户-->
<property>
<name>hadoop.proxyuser.atguigu.users</name>
<value>*</value>
</property>
</configuration>
配置hdfs-site.xml
1
vim hdfs-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<!-- nn web端访问地址-->
<property>
<name>dfs.namenode.http-address</name>
<value>hadoop133:9870</value>
</property>

<!-- 2nn web端访问地址-->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop135:9868</value>
</property>

<!-- 测试环境指定HDFS副本的数量1 -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
</configuration>
配置yarn-site.xml
1
vim yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<!-- 指定MR走shuffle -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!-- 指定ResourceManager的地址-->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop134</value>
</property>

<!-- 环境变量的继承 -->
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>

<!--yarn单个容器允许分配的最大最小内存 -->
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>4096</value>
</property>

<!-- yarn容器允许管理的物理内存大小 -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>4096</value>
</property>

<!-- 关闭yarn对物理内存和虚拟内存的限制检查 -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
</configuration>
配置mapred-site.xml
1
vim mapred-site.xml
1
2
3
4
5
6
7
8
9
10
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<!-- 指定MapReduce程序运行在Yarn上 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
配置workers
1
vim /opt/module/hadoop/etc/hadoop/workers

添加集群信息

1
2
3
hadoop133
hadoop134
hadoop135

注意:该文件中添加的内容结尾不允许有空格,文件中不允许有空行。

配置历史服务器

为了查看程序的历史运行情况,需要配置一下历史服务器。具体配置步骤如下:

配置mapred-site.xml

增加如下配置:

1
2
3
4
5
6
7
8
9
10
11
<!-- 历史服务器端地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop133:10020</value>
</property>

<!-- 历史服务器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop133:19888</value>
</property>
配置日志聚集

日志聚集概念:应用运行完成以后,将程序运行日志信息上传到HDFS系统上。

日志聚集功能好处:可以方便的查看到程序运行详情,方便开发调试。

注意:开启日志聚集功能,需要重新启动NodeManager 、ResourceManager和HistoryManager。

配置yarn-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- 开启日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 设置日志聚集服务器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://hadoop133:19888/jobhistory/logs</value>
</property>

<!-- 设置日志保留时间为7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>

启动集群

如果是第一次启动,需要先格式化NameNode(注意格式化之前,一定要先停止上次启动的所有namenode和datanode进程,然后再删除data和log数据)

格式化NameNode
1
2
#$hadoop_home下执行
bin/hdfs namenode -format

出现以上日志表示格式化成功

启动HDFS
1
sbin/start-dfs.sh
启动YARN

ResourceManager是部署在hadoop134上面的

1
sbin/start-yarn.sh

我们在各个服务器上查看一下jps

每个节点都有一个DateNode和NodeManager,然后NameNode、ResourceManager、SecondaryNameNode每个节点一个

集群脚本

1
2
3
cd ~/bin

vim hdp.sh

添加如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi
case $1 in
"start")
echo " =================== 启动 hadoop集群 ==================="

echo " --------------- 启动 hdfs ---------------"
ssh hadoop102 "/opt/module/hadoop/sbin/start-dfs.sh"
echo " --------------- 启动 yarn ---------------"
ssh hadoop103 "/opt/module/hadoop/sbin/start-yarn.sh"
echo " --------------- 启动 historyserver ---------------"
ssh hadoop102 "/opt/module/hadoop/bin/mapred --daemon start historyserver"
;;
"stop")
echo " =================== 关闭 hadoop集群 ==================="

echo " --------------- 关闭 historyserver ---------------"
ssh hadoop102 "/opt/module/hadoop/bin/mapred --daemon stop historyserver"
echo " --------------- 关闭 yarn ---------------"
ssh hadoop103 "/opt/module/hadoop/sbin/stop-yarn.sh"
echo " --------------- 关闭 hdfs ---------------"
ssh hadoop102 "/opt/module/hadoop/sbin/stop-dfs.sh"
;;
*)
echo "Input Args Error..."
;;
esac

修改脚本权限:

1
chmod 777 hdp.sh

Zookeper部署

集群规划

每个节点都部署一个Zookeeper即可

解压安装

把安装包上传到/opt/software目录下

1
2
3
4
5
#解压到/opt/module下
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/

#修改名称为zookeeper
mv apache-zookeeper-3.5.7-bin/ zookeeper

配置服务器编号

1
2
3
4
5
6
7
8
#在zookeeper目录下创建zkData
mkdir zkData

#在zkData目录下创建一个myid的文件
vim myid

#在文件中添加与server对应的编号:
3

配置zoo.cfg

1
2
3
4
5
#重命名/opt/module/zookeeper-3.5.7/conf这个目录下的zoo_sample.cfg为zoo.cfg
mv zoo_sample.cfg zoo.cfg

#修改zoo.cfg文件
vim zoo.cfg

zoo.cfg修改内容:

1
2
3
4
5
6
7
#修改数据存储路径配置
/opt/module/zookeeper/zkData

#######################cluster##########################
server.3=hadoop133:2888:3888
server.4=hadoop134:2888:3888
server.5=hadoop135:2888:3888

集群启停脚本

在hadoop133机器上创建脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#进入~/bin
cd ~/bin

#创建zk.sh
vim zk.sh

#添加如下内容
#!/bin/bash

case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- zookeeper $i 启动 ------------
ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- zookeeper $i 停止 ------------
ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"
done
};;
"status"){
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- zookeeper $i 状态 ------------
ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"
done
};;
esac

#增加脚本执行权限
chmod 777 zk.sh

Kafka部署

集群规划

每个节点一个即可

解压安装

把安装包上传到/opt/software目录下

1
2
3
4
5
#解压到/opt/module
tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

#重命名
mv kafka_2.12-3.0.0/ kafka

修改配置

1
2
3
4
5
#进入kafka目录
cd config/

#修改server.properties文件
vim server.properties

修改内容:

1
2
3
broker.id=0
log.dirs=/opt/module/kafka/datas
zookeeper.connect=hadoop133:2181,hadoop134:2181,hadoop135:2181/kafka

配置环境变量

在/etc/profile.d/my_env.sh文件中增加kafka环境变量配置

1
2
3
4
5
6
7
8
9
10
sudo vim /etc/profile.d/my_env.sh

#增加如下内容

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

#然后应用生效
source /etc/profile.d/my_env.sh

集群启停脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
cd ~/bin

vim kf.sh

#脚本如下:

#! /bin/bash

case $1 in
"start"){
for i in hadoop133 hadoop134 hadoop135
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in hadoop133 hadoop134 hadoop135
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
done
};;
esac

#添加执行权限
chmod +x kf.sh

Flume部署

解压安装

1
2
3
4
5
#解压
tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

#修改名称
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume

解决jar包冲突

1
rm /opt/module/flume/lib/guava-11.0.2.jar

日志采集Flume

按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。
此处可以选择TailDirSource和KafkaChannel,并配置日志校验拦截器。
选择TailSource和KafkaChannel的原因如下:

①TailDirSource
TailDirSource相比ExecSource、SpoolingDirectorySource的优势

TailDirSource:断电续传、多目录。Flume1.6以前需要自己定义Source记录每次读取文件位置,实现断点续传。

EXECSource:可以实时搜集数据,但是再Flume不运行或者Shell命令出错的情况下,数据会丢失

SpoolingDirectorySource:监控目录,支持断点续传

②KafkaChannel

采用KafkaChannel,省去了Sink,提高了效率

日志采集Flume配置:

日志采集配置实操

①创建Flume配置文件

在flume目录下创建job文件夹,并创建配置文件

1
2
3
mkdir job

vim job/file_to_kafka.conf

②配置文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装
a1.sources.r1.channels = c1

③编写Flume拦截器

pom依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

④JSONUtil类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.atguigu.gmall.flume.utils;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;

public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是:返回true 不是:返回false
* */
public static boolean isJSONValidate(String log){
try {
JSONObject.parseObject(log);
return true;
}catch (JSONException e){
return false;
}
}
}

⑤ETLInterceptor类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.atguigu.gmall.flume.interceptor;

import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;


import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

@Override
public void initialize() {

}

@Override
public Event intercept(Event event) {

//1、获取body当中的数据并转成字符串
byte[] body = event.getBody();
String log = new String(body, StandardCharsets.UTF_8);
//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
if (JSONUtil.isJSONValidate(log)) {
return event;
} else {
return null;
}
}

@Override
public List<Event> intercept(List<Event> list) {

Iterator<Event> iterator = list.iterator();

while (iterator.hasNext()){
Event next = iterator.next();
if(intercept(next)==null){
iterator.remove();
}
}

return list;
}

public static class Builder implements Interceptor.Builder{

@Override
public Interceptor build() {
return new ETLInterceptor();
}
@Override
public void configure(Context context) {

}

}

@Override
public void close() {

}
}

⑥打包
将打好的包放到flume/lib文件夹下面

日志采集测试

①启动Flume

1
bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console

②启动KafkaConsumer

1
bin/kafka-console-consumer.sh --bootstrap-server hadoop133:9092 --topic topic_log

Flume启停脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/bin/bash

case $1 in
"start"){
for i in hadoop102 hadoop103
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
done
};;
"stop"){
for i in hadoop102 hadoop103
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
done

};;
esac
1
2
#增加脚本执行权限
chmod 777 f1.sh