网站流量日志项目Day02

如何得到数据

​ 现在的WEB服务器基本都带有日志记录功能,记录的日志就是我们的数据源,有的时候我们想要的数据在日志中没有我们就可以通过埋点的方法来得到数据,前提要分析的网站时自己部门开发的。本项目就是通过采集WEB服务器记录的日志,来对网站进行分析的。

FLUME如何采集数据

使用flume采集数据前提是要知道我们的源数据存放在哪里,知道数据存放在哪里后我们就可以利用 taildir来监控我们源数据及其存放位置的变化,且其具有断点续传的功能并支持正则匹配。

FLUME按需配置如下:

  • 需求:在使用flume采集数据到hdfs的时候 以文件大小进行控制滚动,大小为:128M

    a1.sinks.k1.hdfs.rollInterval = 0
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
  • 当通过flume上传文件至hdfs当中 如果控制文件滚动的条件不满足怎么办?

    如果不满足 位于hdfs上的文件将会一直处于临时状态 xxx.tmp

    a1.sinks.k1.hdfs.rollInterval = 0     时间间隔
    a1.sinks.k1.hdfs.rollSize = 134217728 文件的大小
    a1.sinks.k1.hdfs.rollCount = 0        event数量
    

    解决:基于文件空闲时间滚动

    hdfs.idleTimeout   默认值是0  如果配置指定的时间  比如30s 
    意味着如果30s之内 文件没有数据写入 即是其他的滚动条件不满足 此时已然进行文件的滚动
    避免文件一致处于所谓的临时状态
    

详细配置

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
##注意:不能往监控目中重复丢同名文件

a1.sources = r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /weblog/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /weblog/example.log
a1.sources.r1.filegroups.f2 = /usr/local/nginx/logs/user_defined.log

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /weblog/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.idleTimeout = 20

a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

源数据预处理

预处理

  1. 为什么要进行源数据预处理
    • 采集到的原始数据往往是杂乱不堪的,这些垃圾数据如果不处理掉会影响我们后续的数据分析,更会影响我们数据分析结果的准确性
  2. 进行源数据预处理目的
    • 把不干净的数据 格式不规则的数据 通过预处理变成格式统一规整的结构化数据
  3. 使用什么技术进行源数据预处理
    • MAPRDUCE

编程思路

  • 分区—->key哈希 % reducetasknums
  • 分组—->key相同分为一组
  • 排序—->按照key的字典序排序

mapreduce技巧

  • 涉及多属性数据传递 通常采用建立javabean携带数据 并且需要实现hadoop的序列化机制 Writable
  • 有意识的重写对象toString方法 并且以\001进行字段 的分割,便于后续的数据入库操作
  • 针对本次分析无效的数据 通过采用建立标记位的形式进行逻辑删除

预处理结果处理

  • 源数据处理代码

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            LogsBean logsBean = LogsParser.parser(line);//使用封装好的过滤方法
            if (logsBean != null) {
                // 过滤js/图片/css等静态资源
                LogsParser.filtStaticResource(logsBean, pages);
                /* if (!webLogBean.isValid()) return; */
                k.set(logsBean.toString());
                context.write(k, v);
            }
        }
    
  • 点击流模型的概述(基于mapreduce处理后的原始数据)

    点击流模式是业务模型 客观并不存在 其模式是由一堆业务指标堆积而成。

    点击流模式所描述的是用户在网站持续访问的一条轨迹 是一个线的概念。

  • 点击流模式和原始日志数据区别

    • 原始访问日志是站在网站的角度看待用户访问行为 数据按照时间追加的 是散点状的数据
    • 点击流模型是站在用户的角度看待用户的访问行为 数据一条持续的轨迹线
    • 点击流模型数据可以通过原始日志数据梳理而来
      @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      
                String line = value.toString();
      
                String[] fields = line.split("\001");
                if (fields.length < 9) {
                    return;
                }
                //将切分出来的各字段set到weblogbean中
                //fields[0].equals("true")
                v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
                //只有有效记录才进入后续处理
                if (v.isValid()) {
                    //此处用ip地址来标识用户
                    k.set(v.getRemote_addr());
                    context.write(k, v);
                }
            }
        }
                @Override
            protected void reduce(Text key, Iterable<LogsBean> values, Context context) throws IOException, InterruptedException {
                ArrayList<LogsBean> beans = new ArrayList<LogsBean>();
      
    //			for (WebLogBean b : values) {
    //				beans.add(b);
    //			}
      
                // 先将一个用户的所有访问记录中的时间拿出来排序
                try {
                    for (LogsBean bean : values) {
                        LogsBean logsBean = new LogsBean();
                        logsBean.set(bean.isValid(), bean.getRemote_addr(), bean.getRemote_user(), bean.getTime_local(), bean.getRequest(), bean.getStatus(), bean.getBody_bytes_sent(), bean.getHttp_referer(), bean.getHttp_user_agent());
                        beans.add(logsBean);
                    }
      
      
                    //将bean按时间先后顺序排序
                    Collections.sort(beans, new Comparator<LogsBean>() {
      
                        @Override
                        public int compare(LogsBean o1, LogsBean o2) {
                            try {
                                Date d1 = toDate(o1.getTime_local());
                                Date d2 = toDate(o2.getTime_local());
                                if (d1 == null || d2 == null) {
                                    return 0;
                                }
                                return d1.compareTo(d2);
                            } catch (Exception e) {
                                e.printStackTrace();
                                return 0;
                            }
                        }
      
                    });
      
                    /**
                     * 以下逻辑为:从有序bean中分辨出各次visit,并对一次visit中所访问的page按顺序标号step
                     * 核心思想:
                     * 就是比较相邻两条记录中的时间差,如果时间差<30分钟,则该两条记录属于同一个session
                     * 否则,就属于不同的session
                     *
                     */
      
                    int step = 1;
                    String session = UUID.randomUUID().toString();
                    for (int i = 0; i < beans.size(); i++) {
                        LogsBean bean = beans.get(i);
                        // 如果仅有1条数据,则直接输出
                        if (1 == beans.size()) {
      
                            // 设置默认停留时长为60s
                            v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001"
                                    + bean.getStatus());
                            context.write(NullWritable.get(), v);
                            session = UUID.randomUUID().toString();
                            break;
                        }
      
                        // 如果不止1条数据,则将第一条跳过不输出,遍历第二条时再输出
                        if (i == 0) {
                            continue;
                        }
      
                        // 求近两次时间差
                        long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
                        // 如果本次-上次时间差<30分钟,则输出前一次的页面访问信息
      
                        if (timeDiff < 30 * 60 * 1000) {
      
                            v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                                    + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
                            context.write(NullWritable.get(), v);
                            step++;
                        } else {
      
                            // 如果本次-上次时间差>30分钟,则输出前一次的页面访问信息且将step重置,以分隔为新的visit
                            v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                                    + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
                            context.write(NullWritable.get(), v);
                            // 输出完上一条之后,重置step编号
                            step = 1;
                            session = UUID.randomUUID().toString();
                        }
      
                        // 如果此次遍历的是最后一条,则将本条直接输出
                        if (i == beans.size() - 1) {
                            // 设置默认停留市场为60s
                            v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
                            context.write(NullWritable.get(), v);
                        }
                    }
      
                } catch (ParseException e) {
                    e.printStackTrace();
      
                }
      
            }
    
  • 会话(session)(基于点击流模型处理后的数据)

    通常业界以前后两条的记录的时间差是否在30分钟以内作为会话判断的标准

    如果小于30分钟 就属于同一个会话

    如果大于30分钟 就是一个新的会话开始

    所谓点击流模型指的是在一个会话内的持续访问轨迹线。

  @Override
 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = line.split("\001");
            int step = Integer.parseInt(fields[5]);
            pvBean.set(fields[0], fields[1], fields[2], fields[3],fields[4], step, fields[6], fields[7], fields[8], fields[9]);
            k.set(pvBean.getSession());
            context.write(k, pvBean);

        }

    }
 
        @Override
        protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException {

            // 将pvBeans按照step排序
            ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();
            for (PageViewsBean pvBean : pvBeans) {
                PageViewsBean bean = new PageViewsBean();
                try {
                    BeanUtils.copyProperties(bean, pvBean);
                    pvBeansList.add(bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {

                @Override
                public int compare(PageViewsBean o1, PageViewsBean o2) {

                    return o1.getStep() > o2.getStep() ? 1 : -1;
                }
            });

            // 取这次visit的首尾pageview记录,将数据放入VisitBean中
            VisitBean visitBean = new VisitBean();
            // 取visit的首记录
            visitBean.setInPage(pvBeansList.get(0).getRequest());
            visitBean.setInTime(pvBeansList.get(0).getTimestr());
            // 取visit的尾记录
            visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());
            visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());
            // visit访问的页面数
            visitBean.setPageVisits(pvBeansList.size());
            // 来访者的ip
            visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());
            // 本次visit的referal
            visitBean.setReferal(pvBeansList.get(0).getReferal());
            visitBean.setSession(session.toString());

            context.write(NullWritable.get(), visitBean);

        }