公众号:雨中散步撒哈拉
个人博客网站:https://liudongdong.top
个人导航网站:https://1024dh.top
编辑者:雨中散步撒哈拉

资源获取
关注公众号: 雨中散步撒哈拉
回复关键词: 离线数仓4.0

该模块的主要功能为调度数据质量监控流程。数据质量监控工作流也采用Azkaban进行调度。数据质量监控工作流必定依赖数据仓库工作流,此处为了解耦,利用Azkaban API主动监视数据仓库工作流的执行状态,进而触发数据质量监控工作流。
以下是所有脚本内容:

一、 Azkaban REST API 封装脚本

该脚本主要是对Azkaban API的封装,主要有三个方法:
1.login函数可以登录Azkanban并返回session_id
2.get_exec_id函数可以获取正在执行的工作流程的Execution ID
3.wait_node可以等待指定Flow中某一结点执行完毕并判断其是否执行成功
在Idea中创建一个文件azclient.py,在文件中编写如下内容:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import urllib
import urllib2
import json

# Azkaban API 接口地址
az_url = "http://hadoop102:8081/"
# Azkaban用户名
az_username = "liudd"
# Azkaban密码
az_password = "liudd"
# 工程名称
project = "gmall"
# flow名称
flow = "gmall"


def post(url, data):
    """
    发送post请求到指定网址

    :param url: 指定网址
    :param data: 请求参数
    :return: 请求结果
    """
    body = urllib.urlencode(data)
    request = urllib2.Request(url, body)
    urlopen = urllib2.urlopen(request).read().decode('utf-8')
    return json.loads(urlopen)


def get(url, data):
    """
    发送get请求到指定网址

    :param url: 指定网址
    :param data: 请求参数
    :return: 请求结果
    """
    body = urllib.urlencode(data)
    urlopen = urllib2.urlopen(url + body).read().decode('utf-8')
    return json.loads(urlopen)


def login():
    """
    使用`Authenticate`API进行azkaban身份认证,获取session ID

    :return: 返回session_id
    """
    data = {
        "action": "login",
        "username": az_username,
        "password": az_password
    }
    auth = post(az_url, data)
    return str(auth.get(u"session.id"))


def get_exec_id(session_id):
    """
    使用`Fetch Running Executions of a Flow`API获取正在执行的Flow的ExecId

    :param session_id: 和azkaban通讯的session_id
    :param project: 项目名称
    :param flow: 工作流名称
    :return: 执行ID
    """
    data = {
        "session.id": session_id,
        "ajax": "getRunning",
        "project": project,
        "flow": flow
    }
    execs = get(az_url + "executor?", data).get(u"execIds")
    if execs:
        return str(execs[0])
    else:
        return None


def wait_node(session_id, exec_id, node_id):
    """
    循环使用`Fetch a Flow Execution`API获取指定Flow中的某个节点(job)的执行状态,直到其执行完成

    :param session_id: 和azkaban通讯的session_id
    :param exec_id: 执行ID
    :param node_id: 指定节点(job)
    :return: 该节点是否成功执行完毕
    """
    data = {
        "session.id": session_id,
        "ajax": "fetchexecflow",
        "execid": exec_id
    }
    status = None

    # 若指定Flow中的指定Node(job)的执行状态是未完成的状态,就一直循环
    while status not in ["SUCCEEDED", "FAILED", "CANCELLED", "SKIPPED", "KILLED"]:
        # 获取指定Flow的当前的执行信息
        flow_exec = get(az_url + "executor?", data)
        # 从该Flow的执行信息中获取nodes字段的值,并遍历寻找特定的节点(job)信息,进而获取该节点(job)的状态
        for node in flow_exec.get(u"nodes"):
            if unicode(node_id) == node.get(u"id"):
                status = str(node.get(u"status"))
        print " ".join([node_id, status])
        # 等待1s,进入下一轮循环判断
        time.sleep(1)
    return status == "SUCCEEDED"

二、 ODS层调度脚本

该脚本用于检查ODS层数据质量。
在Idea中创建一个文件check_ods.py,在文件中编写如下内容:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
from azclient import login,wait_node,get_exec_id
from check_notification import get_yesterday

def check_ods(dt, session_id, exec_id):
    """
    检查ODS层数据质量

    :param dt: 日期
    :param session_id: 和azkaban通讯的session_id
    :param exec_id: 指定的执行ID
    :return: None
    """
    if wait_node(session_id, exec_id, "hdfs_to_ods_db") and wait_node(session_id, exec_id, "hdfs_to_ods_log"):
        os.system("bash check_ods.sh " + dt)

if __name__ == '__main__':
    argv = sys.argv
    # 获取session_id
    session_id = login()

    # 获取执行ID。只有在原Flow正在执行时才能获取
    exec_id = get_exec_id(session_id)

    # 获取日期,如果不存在取昨天
    if len(argv) >= 2:
        dt = argv[1]
    else:
        dt = get_yesterday()

    # 检查各层数据质量
    if exec_id:
        check_ods(dt, session_id, exec_id)

三、 DWD层调度脚本

该脚本用于检查DWD层数据质量。
在Idea中创建一个文件check_dwd.py,在文件中编写如下内容:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
from azclient import login, wait_node, get_exec_id
from check_notification import get_yesterday


def check_dwd(dt, session_id, exec_id):
    """
    检查DWD层数据质量

    :param dt: 日期
    :param session_id: 和azkaban通讯的session_id
    :param exec_id: 指定的执行ID
    :return: None
    """
    if wait_node(session_id, exec_id, "ods_to_dwd_db") and wait_node(session_id, exec_id, "ods_to_dwd_log"):
        os.system("bash check_dwd.sh " + dt)


if __name__ == '__main__':
    argv = sys.argv
    # 获取session_id
    session_id = login()

    # 获取执行ID。只有在原Flow正在执行时才能获取
    exec_id = get_exec_id(session_id)

    # 获取日期,如果不存在取昨天
    if len(argv) >= 2:
        dt = argv[1]
    else:
        dt = get_yesterday()

    # 检查各层数据质量
    if exec_id:
        check_dwd(dt, session_id, exec_id)

四、 DIM层调度脚本

该脚本用于检查DIM层数据质量。
在Idea中创建一个文件check_dim.py,在文件中编写如下内容:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
from azclient import login, wait_node, get_exec_id
from check_notification import get_yesterday


def check_dim(dt, session_id, exec_id):
    """
    检查DIM层数据质量

    :param dt: 日期
    :param session_id: 和azkaban通讯的session_id
    :param exec_id: 指定的执行ID
    :return: None
    """
    if wait_node(session_id, exec_id, "ods_to_dim_db"):
        os.system("bash check_dim.sh " + dt)


if __name__ == '__main__':
    argv = sys.argv
    # 获取session_id
    session_id = login()

    # 获取执行ID。只有在原Flow正在执行时才能获取
    exec_id = get_exec_id(session_id)

    # 获取日期,如果不存在取昨天
    if len(argv) >= 2:
        dt = argv[1]
    else:
        dt = get_yesterday()

    # 检查各层数据质量
    if exec_id:
        check_dim(dt, session_id, exec_id)

五、Azkaban工作流配置文件

1.在Idea中创建一个文件azkaban.project,在文件中编写如下内容:

azkaban-flow-version: 2.0

2.在Idea中创建一个文件data_supervisor.flow,在文件中编写如下内容:

nodes:
  - name: check_ods
    type: command
    config:
     command: python check_ods.py ${dt}

  - name: check_dwd
    type: command
    config:
     command: python check_dwd.py ${dt}

  - name: check_dim
    type: command
    config:
     command: python check_dim.py ${dt}
    
  - name: check_notification
    type: command
    dependsOn:
         - check_ods
         - check_dwd
         - check_dim
    config:
     command: python check_notification.py ${alert} ${dt}

3.将所有文件打包成data_supervisor.zip文件
image.png

4.在Azkaban框架中新建项目并上传该文件,可看到如下图所示工作流。
image.png

5.先启动数仓工作流,在执行过程中,启动质量监控工作流,并传入如下参数
image.png

等待任务执行完毕,观察邮箱是否有告警邮件

六、可视化模块

该模块的主要作用是对数据质量监控结果进行可视化展示。
检测结果可以采用Superset进行可视化展示。具体配置步骤如下:
1.在Superset中新建数据库连接
image.png

image.png

注:mysql://root:000000@hadoop102:3306/data_supervisor?charset=utf8
2.点击“Datasets”,然后点击“+Dataset”,将所有数据表格都导入为dataset
image.png

image.png

image.png

五张表格全部添加后,如下图所示。
image.png

3.新建一个dashboard,并命名,如下图所示。
image.png

image.png

4.新建一张图表并保存到dashboard,在chart页面中选择新建chart,如下图所示。
image.png

image.png

配置chart内容,如下图所示。
image.png

在Metrics中添加value,value_min, value_max三列,然后点击“run”,就完成了chart的配置流程,如下图所示。
image.png

点击“save”,保存chart到dashboard,如下图所示。
image.png

5.为所有监控的指标创建图表,并保存到dashboard,如下图所示。
image.png

上一篇 下一篇