个人跟着尚硅谷数仓项目做的笔记,仅供学习使用.
数据仓库
数据仓库概念
数据仓库(Data Warehouse) :是为企业制定决策,提供数据支持的。可以帮助企业改进业务流程、提高产品质量等。
ODS :数据备份,原封不动地把输入的数据存放起来
DWD :主要做数据清洗
DWS :预聚合,避免大表重复join,消耗性能和资源
ADS :存放最终的指标,并输出到各个系统中
数据仓库并不是数据的最终目的地,二十为数据最终的目的地做好准备。这些准备包括对数据的:备份、清洗、聚合、统计 等。
数据仓库需求
离线需求
实时需求
技术选型
技术选型主要考虑因素:数据量大小、业务需求、业内经验、技术成熟度、开发维护成本、总成本预算
数据流程设计
具体版本号选择
用户行为
用户行为概述
用户行为日志的内容,主要包括用户的各项行为信息 以及行为所处的环境信息 。收集这些信息的主要目的是优化产品和为各项分析统计指标提供数据支撑。收集这些信息的手段通常为埋点 。
目前主流的埋点方式,有代码埋点 (前端/后端)、可视化埋点 、全埋点 等。
代码埋点 是通过调用埋点SDK函数,在需要埋点的业务逻辑功能位置调用接口,上报埋点数据。例如,我们对页面中的某个按钮埋点后,当这个按钮被点击时,可以在这个按钮对应的 OnClick 函数里面调用SDK提供的数据发送接口,来发送数据。
可视化埋点 只需要研发人员集成采集 SDK,不需要写埋点代码,业务人员就可以通过访问分析平台的“圈选”功能,来“圈”出需要对用户行为进行捕捉的控件,并对该事件进行命名。圈选完毕后,这些配置会同步到各个用户的终端上,由采集 SDK 按照圈选的配置自动进行用户行为数据的采集和发送。
全埋点 是通过在产品中嵌入SDK,前端自动采集页面上的全部用户行为事件,上报埋点数据,相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要在系统里面进行分析。
用户行为日志
本项目收集和分析的用户行为信息主要有页面浏览记录、动作记录、曝光记录、启动记录和错误记录。
页面浏览记录
页面浏览记录,记录的是访客对页面的浏览行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及页面信息等。
动作记录
动作记录,记录的是用户的业务操作行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息 及动作目标对象信息等。
曝光记录
曝光记录,记录的是曝光行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及曝光对象信息等。
启动记录
启动记录,记录的是用户启动应用的行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、启动类型及开屏广告信息等。
错误记录
启动记录,记录的是用户在使用应用过程中的报错行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、以及可能与报错相关的页面信息、动作信息、曝光信息和动作信息。
集群软件安装
前提准备
准备三台机器,并配置好免密登录
jdk安装
把安装包上传到/opt/software下
1 2 3 4 5 6 # 解压到/opt/module/目录下 tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/ # 进入module目录 cd /opt/module/ # 把解压后的名字修改为jdk /opt/module/
配置环境变量
1 2 # 新建sudo vim /etc/profile.d/my_env.sh sudo vim /etc/profile.d/my_env.sh
添加如下内容:
1 2 3 # JAVA_HOME export JAVA_HOME=/opt/module/jdk export PATH=$PATH:$JAVA_HOME/bin
环境变量生效
1 source /etc/profile.d/my_env.sh
三台机器都需要安装
集群查看所有进程脚本
在~/bin目录下创建脚本
添加如下内容:
1 2 3 4 5 6 7 # ! /bin/bash for i in hadoop102 hadoop103 hadoop104 do echo --------- $i ---------- ssh $i "$*" done
修改执行权限
启动脚本
Hadoop部署
集群规划
hadoop133
hadoop134
hadoop135
HDFS
NameNode、DataNode
DataNode
SecondaryNameNode、DataNode
YARN
NodeManager
ResourceManager、NodeManager
NodeManager
上传安装包
将安装包上传到 /opt/software/目录下
解压
1 2 3 4 5 # 解压到/opt/moudle目录 tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/ # 重命名 mv /opt/module/hadoop-3.1.3 /opt/module/hadoop
添加环境变量
1 2 # 打开/etc/profile.d/my_env.sh文件 sudo vim /etc/profile.d/my_env.sh
追加hadoop路径
1 2 3 4 # HADOOP_HOME export HADOOP_HOME=/opt/module/hadoop export PATH=$PATH:$HADOOP_HOME/bin export PATH=$PATH:$HADOOP_HOME/sbin
环境变量生效
1 source /etc/profile.d/my_env.sh
配置集群
配置core-site.xml
1 2 cd $HADOOP_HOME/etc/hadoop vim core-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 <?xml version="1.0" encoding="UTF-8" ?> <?xml-stylesheet type="text/xsl" href="configuration.xsl" ?> <configuration > <property > <name > fs.defaultFS</name > <value > hdfs://hadoop133:8020</value > </property > <property > <name > hadoop.tmp.dir</name > <value > /opt/module/hadoop/data</value > </property > <property > <name > hadoop.http.staticuser.user</name > <value > atguigu</value > </property > <property > <name > hadoop.proxyuser.atguigu.hosts</name > <value > *</value > </property > <property > <name > hadoop.proxyuser.atguigu.groups</name > <value > *</value > </property > <property > <name > hadoop.proxyuser.atguigu.users</name > <value > *</value > </property > </configuration >
配置hdfs-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 <?xml version="1.0" encoding="UTF-8" ?> <?xml-stylesheet type="text/xsl" href="configuration.xsl" ?> <configuration > <property > <name > dfs.namenode.http-address</name > <value > hadoop133:9870</value > </property > <property > <name > dfs.namenode.secondary.http-address</name > <value > hadoop135:9868</value > </property > <property > <name > dfs.replication</name > <value > 3</value > </property > </configuration >
配置yarn-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 <?xml version="1.0" encoding="UTF-8" ?> <?xml-stylesheet type="text/xsl" href="configuration.xsl" ?> <configuration > <property > <name > yarn.nodemanager.aux-services</name > <value > mapreduce_shuffle</value > </property > <property > <name > yarn.resourcemanager.hostname</name > <value > hadoop134</value > </property > <property > <name > yarn.nodemanager.env-whitelist</name > <value > JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value > </property > <property > <name > yarn.scheduler.minimum-allocation-mb</name > <value > 512</value > </property > <property > <name > yarn.scheduler.maximum-allocation-mb</name > <value > 4096</value > </property > <property > <name > yarn.nodemanager.resource.memory-mb</name > <value > 4096</value > </property > <property > <name > yarn.nodemanager.pmem-check-enabled</name > <value > true</value > </property > <property > <name > yarn.nodemanager.vmem-check-enabled</name > <value > false</value > </property > </configuration >
配置mapred-site.xml
1 2 3 4 5 6 7 8 9 10 <?xml version="1.0" encoding="UTF-8" ?> <?xml-stylesheet type="text/xsl" href="configuration.xsl" ?> <configuration > <property > <name > mapreduce.framework.name</name > <value > yarn</value > </property > </configuration >
配置workers
1 vim /opt/module/hadoop/etc/hadoop/workers
添加集群信息
1 2 3 hadoop133 hadoop134 hadoop135
注意:该文件中添加的内容结尾不允许有空格,文件中不允许有空行。
配置历史服务器
为了查看程序的历史运行情况,需要配置一下历史服务器。具体配置步骤如下:
配置mapred-site.xml
增加如下配置:
1 2 3 4 5 6 7 8 9 10 11 <property > <name > mapreduce.jobhistory.address</name > <value > hadoop133:10020</value > </property > <property > <name > mapreduce.jobhistory.webapp.address</name > <value > hadoop133:19888</value > </property >
配置日志聚集
日志聚集概念:应用运行完成以后,将程序运行日志信息上传到HDFS系统上。
日志聚集功能好处:可以方便的查看到程序运行详情,方便开发调试。
注意:开启日志聚集功能,需要重新启动NodeManager 、ResourceManager和HistoryManager。
配置yarn-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <property > <name > yarn.log-aggregation-enable</name > <value > true</value > </property > <property > <name > yarn.log.server.url</name > <value > http://hadoop133:19888/jobhistory/logs</value > </property > <property > <name > yarn.log-aggregation.retain-seconds</name > <value > 604800</value > </property >
启动集群
如果是第一次启动,需要先格式化NameNode(注意格式化之前,一定要先停止上次启动的所有namenode和datanode进程,然后再删除data和log数据)
格式化NameNode
1 2 # $hadoop_home 下执行bin/hdfs namenode -format
出现以上日志表示格式化成功
启动HDFS
启动YARN
ResourceManager是部署在hadoop134上面的
我们在各个服务器上查看一下jps
每个节点都有一个DateNode和NodeManager,然后NameNode、ResourceManager、SecondaryNameNode每个节点一个
集群脚本
添加如下内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 # !/bin/bash if [ $# -lt 1 ] then echo "No Args Input..." exit ; fi case $1 in "start") echo " =================== 启动 hadoop集群 ===================" echo " --------------- 启动 hdfs ---------------" ssh hadoop102 "/opt/module/hadoop/sbin/start-dfs.sh" echo " --------------- 启动 yarn ---------------" ssh hadoop103 "/opt/module/hadoop/sbin/start-yarn.sh" echo " --------------- 启动 historyserver ---------------" ssh hadoop102 "/opt/module/hadoop/bin/mapred --daemon start historyserver" ;; "stop") echo " =================== 关闭 hadoop集群 ===================" echo " --------------- 关闭 historyserver ---------------" ssh hadoop102 "/opt/module/hadoop/bin/mapred --daemon stop historyserver" echo " --------------- 关闭 yarn ---------------" ssh hadoop103 "/opt/module/hadoop/sbin/stop-yarn.sh" echo " --------------- 关闭 hdfs ---------------" ssh hadoop102 "/opt/module/hadoop/sbin/stop-dfs.sh" ;; *) echo "Input Args Error..." ;; esac
修改脚本权限:
Zookeper部署
集群规划
每个节点都部署一个Zookeeper即可
解压安装
把安装包上传到/opt/software目录下
1 2 3 4 5 # 解压到/opt/module下 tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/ # 修改名称为zookeeper mv apache-zookeeper-3.5.7-bin/ zookeeper
配置服务器编号
1 2 3 4 5 6 7 8 # 在zookeeper目录下创建zkData mkdir zkData # 在zkData目录下创建一个myid的文件 vim myid # 在文件中添加与server对应的编号: 3
配置zoo.cfg
1 2 3 4 5 # 重命名/opt/module/zookeeper-3.5.7/conf这个目录下的zoo_sample.cfg为zoo.cfg mv zoo_sample.cfg zoo.cfg # 修改zoo.cfg文件 vim zoo.cfg
zoo.cfg修改内容:
1 2 3 4 5 6 7 # 修改数据存储路径配置 /opt/module/zookeeper/zkData # server.3=hadoop133:2888:3888 server.4=hadoop134:2888:3888 server.5=hadoop135:2888:3888
集群启停脚本
在hadoop133机器上创建脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 # 进入~/bin cd ~/bin # 创建zk.sh vim zk.sh # 添加如下内容 # !/bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 hadoop104 do echo ---------- zookeeper $i 启动 ------------ ssh $i "/opt/module/zookeeper/bin/zkServer.sh start" done };; "stop"){ for i in hadoop102 hadoop103 hadoop104 do echo ---------- zookeeper $i 停止 ------------ ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop" done };; "status"){ for i in hadoop102 hadoop103 hadoop104 do echo ---------- zookeeper $i 状态 ------------ ssh $i "/opt/module/zookeeper/bin/zkServer.sh status" done };; esac # 增加脚本执行权限 chmod 777 zk.sh
Kafka部署
集群规划
每个节点一个即可
解压安装
把安装包上传到/opt/software目录下
1 2 3 4 5 # 解压到/opt/module tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/ # 重命名 mv kafka_2.12-3.0.0/ kafka
修改配置
1 2 3 4 5 # 进入kafka目录 cd config/ # 修改server.properties文件 vim server.properties
修改内容:
1 2 3 broker.id=0 log.dirs=/opt/module/kafka/datas zookeeper.connect=hadoop133:2181,hadoop134:2181,hadoop135:2181/kafka
配置环境变量
在/etc/profile.d/my_env.sh文件中增加kafka环境变量配置
1 2 3 4 5 6 7 8 9 10 sudo vim /etc/profile.d/my_env.sh # 增加如下内容 # KAFKA_HOME export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin # 然后应用生效 source /etc/profile.d/my_env.sh
集群启停脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 cd ~/bin vim kf.sh # 脚本如下: # ! /bin/bash case $1 in "start"){ for i in hadoop133 hadoop134 hadoop135 do echo " --------启动 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties" done };; "stop"){ for i in hadoop133 hadoop134 hadoop135 do echo " --------停止 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh " done };; esac # 添加执行权限 chmod +x kf.sh
Flume部署
解压安装
1 2 3 4 5 # 解压 tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/ # 修改名称 mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
解决jar包冲突
1 rm /opt/module/flume/lib/guava-11.0.2.jar
日志采集Flume
按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。
此处可以选择TailDirSource和KafkaChannel,并配置日志校验拦截器。
选择TailSource和KafkaChannel的原因如下:
①TailDirSource
TailDirSource相比ExecSource、SpoolingDirectorySource的优势
TailDirSource:断电续传、多目录。Flume1.6以前需要自己定义Source记录每次读取文件位置,实现断点续传。
EXECSource:可以实时搜集数据,但是再Flume不运行或者Shell命令出错的情况下,数据会丢失
SpoolingDirectorySource:监控目录,支持断点续传
②KafkaChannel
采用KafkaChannel,省去了Sink,提高了效率
日志采集Flume配置:
日志采集配置实操
①创建Flume配置文件
在flume目录下创建job文件夹,并创建配置文件
1 2 3 mkdir job vim job/file_to_kafka.conf
②配置文件内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #定义组件 a1.sources = r1 a1.channels = c1 #配置source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.* a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder #配置channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092 a1.channels.c1.kafka.topic = topic_log a1.channels.c1.parseAsFlumeEvent = false #组装 a1.sources.r1.channels = c1
③编写Flume拦截器
pom依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 <dependencies > <dependency > <groupId > org.apache.flume</groupId > <artifactId > flume-ng-core</artifactId > <version > 1.9.0</version > <scope > provided</scope > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.62</version > </dependency > </dependencies > <build > <plugins > <plugin > <artifactId > maven-compiler-plugin</artifactId > <version > 2.3.2</version > <configuration > <source > 1.8</source > <target > 1.8</target > </configuration > </plugin > <plugin > <artifactId > maven-assembly-plugin</artifactId > <configuration > <descriptorRefs > <descriptorRef > jar-with-dependencies</descriptorRef > </descriptorRefs > </configuration > <executions > <execution > <id > make-assembly</id > <phase > package</phase > <goals > <goal > single</goal > </goals > </execution > </executions > </plugin > </plugins > </build >
④JSONUtil类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package com.atguigu.gmall.flume.utils;import com.alibaba.fastjson.JSONObject;import com.alibaba.fastjson.JSONException;public class JSONUtil { public static boolean isJSONValidate (String log) { try { JSONObject.parseObject(log); return true ; }catch (JSONException e){ return false ; } } }
⑤ETLInterceptor类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package com.atguigu.gmall.flume.interceptor;import com.atguigu.gmall.flume.utils.JSONUtil;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;import java.util.Iterator;import java.util.List;public class ETLInterceptor implements Interceptor { @Override public void initialize () { } @Override public Event intercept (Event event) { byte [] body = event.getBody(); String log = new String (body, StandardCharsets.UTF_8); if (JSONUtil.isJSONValidate(log)) { return event; } else { return null ; } } @Override public List<Event> intercept (List<Event> list) { Iterator<Event> iterator = list.iterator(); while (iterator.hasNext()){ Event next = iterator.next(); if (intercept(next)==null ){ iterator.remove(); } } return list; } public static class Builder implements Interceptor .Builder{ @Override public Interceptor build () { return new ETLInterceptor (); } @Override public void configure (Context context) { } } @Override public void close () { } }
⑥打包
将打好的包放到flume/lib文件夹下面
日志采集测试
①启动Flume
1 bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
②启动KafkaConsumer
1 bin/kafka-console-consumer.sh --bootstrap-server hadoop133:9092 --topic topic_log
Flume启停脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # !/bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 do echo " --------启动 $i 采集flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &" done };; "stop"){ for i in hadoop102 hadoop103 do echo " --------停止 $i 采集flume-------" ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 " done };; esac
1 2 # 增加脚本执行权限 chmod 777 f1.sh