Hive数据清除工具

Posted by JD on June 27, 2022

1. 背景

Hive表数据需要进行删除,不同表有不同的保留策略。因此,项目中需要有个统一管理的程序。

2. 流程图

3. 详细设计

3.1 配置参数

以表名粒度来进行配置,主要有

序号 字段名 注释 实例
1 TABLE_NAME 表名(包含SCHEMA) TML.MS_GEN_ACCT_DAY
2 DATE_COLUMN 日期字段名称 BIZ_DATE
3 DATE_FORMAT_TYPE 日期类型 1:yyyy-MM-dd 2:yyyyMMdd 1
4 IS_PARTITIONED 是否分区表 0:非分区 1:分区 1
5 START_DATE 开始时间 yyyy-MM-dd 2020-01-01
6 END_DATE 结束时间 yyyy-MM-dd 2022-06-27
7 MONTH_FIRST_N_DAY 保留月初N天,-1: 无需保留 1
8 MONTH_LAST_N_DAY 保留月底N天,-1: 无需保留 2
9 LASTEST_N_DAY 保留最近N天,-1: 无需保留 40
10 MONTH_N_DAY 保留每月第N天,-1: 无需保留 -1
11 IS_USING 是否使用 0:停用 1:使用 1

数据库建表语句,如下:

 CREATE TABLE T_HIVE_GC_CONF(
 TABLE_NAME          VARCHAR2(128) NOT NULL   
,DATE_COLUMN         VARCHAR2(32)  NOT NULL   
,DATE_FORMAT_TYPE    VARCHAR2(10)  NOT NULL   
,IS_PARTITIONED      VARCHAR2(10)  NOT NULL   
,START_DATE          VARCHAR2(10)  NOT NULL   
,END_DATE            VARCHAR2(10)  NOT NULL   
,MONTH_FIRST_N_DAY   INT           NOT NULL   
,MONTH_LAST_N_DAY    INT           NOT NULL   
,LASTEST_N_DAY       INT           NOT NULL   
,MONTH_N_DAY         INT           NOT NULL   
,CREATE_TIME         DATE          DEFAULT SYSDATE
,IS_USING            VARCHAR2(10)  NOT NULL   
);

COMMENT ON TABLE T_HIVE_GC_CONF IS '按日期删除数据配置表';

COMMENT ON COLUMN T_HIVE_GC_CONF.TABLE_NAME        IS '表名(包含SCHEMA)'                 ;
COMMENT ON COLUMN T_HIVE_GC_CONF.DATE_COLUMN       IS '日期字段名称'                     ;
COMMENT ON COLUMN T_HIVE_GC_CONF.DATE_FORMAT_TYPE  IS '日期类型 1:yyyy-MM-dd 2:yyyyMMdd' ;
COMMENT ON COLUMN T_HIVE_GC_CONF.IS_PARTITIONED    IS '是否分区表 0:非分区 1:分区'       ;
COMMENT ON COLUMN T_HIVE_GC_CONF.START_DATE        IS '开始时间 yyyy-MM-dd'              ;
COMMENT ON COLUMN T_HIVE_GC_CONF.END_DATE          IS '结束时间 yyyy-MM-dd'              ;
COMMENT ON COLUMN T_HIVE_GC_CONF.MONTH_FIRST_N_DAY IS '保留月初N天,-1: 无需保留'         ;
COMMENT ON COLUMN T_HIVE_GC_CONF.MONTH_LAST_N_DAY  IS '保留月底N天,-1: 无需保留'         ;
COMMENT ON COLUMN T_HIVE_GC_CONF.LASTEST_N_DAY     IS '保留最近N天,-1: 无需保留'         ;
COMMENT ON COLUMN T_HIVE_GC_CONF.MONTH_N_DAY       IS '保留每月第N天,-1: 无需保留'       ;
COMMENT ON COLUMN T_HIVE_GC_CONF.CREATE_TIME       IS '创建时间'                         ;
COMMENT ON COLUMN T_HIVE_GC_CONF.IS_USING          IS '是否使用 0:停用 1:使用'           ;

3.2 传入参数

可通过2种方式来传入参数,分别是手动Trigger DAG和其他DAG进行调用。

3.2.1 手动Trigger DAG

在Airflow页面进行搜索garbage_collection_unit,然后点击Trigger DAG w/params,会要求填入Hive表名(TABLE_NAME)。

3.2.2 自动Trigger DAG

通过其他DAG通过TriggerDagRunOperator来调用garbage_collection_unit。主要三个参数,分别是task_id、trigger_dag_id和conf,其中conf要传入我们要删除数据的表名。例子如下

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger = TriggerDagRunOperator(
	task_id="test_trigger_dagrun",
	trigger_dag_id="garbage_collection_unit",
    wait_for_completion=True,
	conf={"trigger_table_name": "TML.MS_GEN_ACCT_DAY"},
	dag=dag
)

3.3 清理数据

清理数据有2种情况,分别是日期维度的分区表和非分区表。

3.3.1 分区表

Drop Partitions方式来删除数据,语句如下

ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec[, PARTITION partition_spec, ...]
 ``[IGNORE PROTECTION] [PURGE];      -- (Note: PURGE available in Hive ``1.2``.``0` `and later, IGNORE PROTECTION not available ``2.0``.``0` `and later)

3.3.2 非分区表

Insert Overwrite方式来删除数据,语句如下

INSERT OVERWRITE TABLE tablename1 select_statement1 FROM from_statement;

3.4 返回结果

3.4.1 手动执行结果

直接查看Airflow的DAG的运行状态。

3.4.2 自动执行结果

通过设置wait_for_completion=True来获取garbage_collection_unit的运行状态。

4. 代码详解

4.1 start

dummy_steps = [
    {'task_id': 'start','trigger_rule': TriggerRule.ALL_SUCCESS, 'doc': '开始'},
    {'task_id': 'clean_success','trigger_rule': TriggerRule.ALL_SUCCESS, 'doc': '数据清除成功'},
    {'task_id': 'end','trigger_rule': TriggerRule.ONE_SUCCESS, 'doc': '结束'}
]

for dummy_step in dummy_steps:
    globals()[dummy_step['task_id']] = DummyOperator(task_id=dummy_step['task_id'],
                                                     trigger_rule=dummy_step['trigger_rule'],                                                                    doc=dummy_step['doc'],
                                                     dag=dag)

DummyOperator并未做什么,只是衔接任务作用。其中task_id=’end’,上游是BranchPythonOperator,会有skip状态,需要将触发条件改成TriggerRule.ONE_SUCCESS

4.2 receiving_parameter

def receiving_parameter(**kwargs):
    """接收外部DAG或本DAG的传入参数"""
    hive_table_name = kwargs.get('dag_run').conf.get('hive_table_name')
    trigger_table_name = kwargs.get('dag_run').conf.get('trigger_table_name')
    if not hive_table_name and not trigger_table_name:
        raise AirflowFailException('缺少表名,请注意填写参数')
    table_name = trigger_table_name if trigger_table_name else hive_table_name
    kwargs['ti'].xcom_push(key='table_name', value=table_name)

rp = PythonOperator(task_id='receiving_parameter', python_callable=receiving_parameter, dag=dag)
  • 有2种方式来传入参数,hive_table_name是从手动调用DAG的时候传参,trigger_table_name是从其他DAG调用此DAG的时候传参。
  • 若2种参数都为空,则抛出错误异常,AirflowFailException(‘缺少表名,请注意填写参数’)
  • 优先取trigger_table_name,若为空,则取hive_table_name
  • xcom_push推送表名变量

4.3 data_cleaning

class OracleDictHook(OracleHook):

    def get_first(self, sql: str) -> Dict[str, Any]:
        from cx_Oracle import LOB
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute(sql)
            columns = [i[0].lower() for i in cursor.description]
            cursor.rowfactory = lambda *args: dict(
                zip(columns, (i.read() if isinstance(i, LOB) else i for i in args)))
            return cursor.fetchone()

OracleDictHook继承OracleHook类,不过重写了get_first方法。主要是将结果转换成key-value的键值对,便于引用从配置表查询的数据。

ti = kwargs['ti']
table_name = ti.xcom_pull(key='table_name', task_ids='receiving_parameter')

获取从receiving_parameter推送出来的表名参数。

hiveconfs = {
    'hive.exec.dynamic.partition.mode': 'nonstrict',
    'mapred.max.split.size': 134217728,
    'mapred.min.split.size.per.node': 67108864,
    'mapred.min.split.size.per.rack': 67108864,
    'hive.input.format': 'org.apache.hadoop.hive.ql.io.CombineHiveInputFormat',
    'hive.exec.reducers.bytes.per.reducer': 134217728,
    'hive.merge.mapfiles': 'true',
    'hive.merge.mapredfiles': 'true',
    'hive.merge.size.per.task': 134217728,
    'hive.merge.smallfiles.avgsize': 67108864
}

设置hive的参数。

oracle_hook = OracleDictHook(oracle_conn_id='oracle_cognos_ods')

获取连接Oracle的钩子,oracle_conn_id为airflow设置的conn id。

conf_sql = f"""SELECT TABLE_NAME, DATE_COLUMN, DATE_FORMAT_TYPE, IS_PARTITIONED, START_DATE, END_DATE,                             MONTH_FIRST_N_DAY,MONTH_LAST_N_DAY, LASTEST_N_DAY, MONTH_N_DAY, IS_USING 
                 FROM COGNOS_ODS.T_HIVE_GC_CONF WHERE TABLE_NAME='{table_name}' AND IS_USING='1'"""
result = oracle_hook.get_first(sql=conf_sql)

得到T_HIVE_GC_CONF配置表里清除的具体数据,主要有表名、日期字段名称、日期格式、是否分区、删除开始日期,删除结束日期、保留月初N天、保留月底N天、保留最近N天、保留每月第N天。

if not result:
    raise AirflowFailException(f'{table_name}不在删除配置表')
if not result['date_column'] or not result['date_format_type']:
    raise AirflowFailException(f'时间字段名称或类型未配置')
if result['is_partitioned'] not in ['0', '1']:
    raise AirflowFailException('判断分区枚举值设置有问题')
  • 先检查配置表里是否有传入表的信息,若不存在,则抛出错误异常
  • 日期字段名称和类型必须存在,不然无法删除数据
  • 是否分区字段必须正确,0是非分区,1是分区
keep_days_sql = f"""select day_char{result.get('date_format_type')} as date_col from dim_date 
                where day_char1 between '{result.get('start_date')}' and '{result.get('end_date')}'
                and (day_num<={result.get('month_first_n_day')} 
                or days_of_month-day_num<{result.get('month_last_n_day')} 
                or day_char1>=to_char(to_date('{result.get('end_date')}','YYYY-MM-DD')-{result.get('lastest_n_day')},'yyyy-mm-dd')
                or day_num={result.get('month_n_day')})"""
keep_days_res = oracle_hook.get_records(sql=keep_days_sql)
keep_days = [keep_day[0] for keep_day in keep_days_res]

按照删数策略,查询dim_date表来获取具体应该保留的天数,get_records方法返回需要保留的日期。keep_days是已经转成List[str]。

hive_cli = HiveCliHook(hive_cli_conn_id='hive_cli_default')

通过HiveCliHook类来连接hive。

dates_str = "','".join(keep_days)
overwrite_table = f"""INSERT OVERWRITE TABLE {result.get('table_name')}
                      SELECT * FROM {result.get('table_name')} 
                      WHERE {result.get('date_column')}<'{result.get('start_date')}'
                      OR {result.get('date_column')}>'{result.get('end_date')}'
                      UNION ALL
                      SELECT * FROM {result.get('table_name')} 
                      WHERE {result.get('date_column')} BETWEEN '{result.get('start_date')}' and '{result.get('end_date')}'
                      AND {result.get('date_column')} IN ('{dates_str}');"""
print('overwrite_table: ', overwrite_table)
exec_res = hive_cli.run_cli(hql=overwrite_table, hive_conf=hiveconfs)

删数的表是非分区表,需要两部分通过UNION ALL拼接,一分部是在开始日期和结束日期区间之外的日期,另一部分是在开始日期和结束日期区间内需要保留的日期。然后通过run_cli方法执行hql。

dates_str = ''.join([f""",{result.get('date_column')}<>'{keep_day[0]}'""" for keep_day in keep_days_res])
drop_partition = f"""ALTER TABLE {result.get('table_name')} DROP IF EXISTS PARTITION                                              ({result.get('date_column')}>='{result.get('start_date')}',
                     {result.get('date_column')}<='{result.get('end_date')}' {dates_str});"""
exec_res = hive_cli.run_cli(hql=drop_partition)
print('drop_partition: ', drop_partition)

删数的表是分区表,通过限制日期是在开始日期和结束日期区间内,并且不等于要保留的日期,其他都是要删除日期。

execute_result = 'clean_success' if 'OK' in exec_res else 'clean_fail'

若运行成功,run_cli方法会返回带有OK的字符串。

ti.xcom_push(key='execute_result', value=execute_result)

推送最终运行hql的结果。

4.4 branch_result

def branch_result(**kwargs):
    """判断程序执行结果 失败/成功"""
    return kwargs['ti'].xcom_pull(key='execute_result', task_ids='data_cleaning')

br = BranchPythonOperator(task_id='branch_result', python_callable=branch_result, dag=dag)

拉取data_cleaning中运行删数的hql结果,若成功,则下游到clean_success,否则下游到clean_fail。

4.5 clean_success

可看4.1 start部分。

4.6 clean_fail

def clean_fail_fn(**kwargs):
    raise AirflowFailException

clean_fail = PythonOperator(task_id='clean_fail', python_callable=clean_fail_fn, dag=dag)

直接抛出错误异常,提示执行删数失败。

4.7 end

可看4.1 start部分。

4.8 依赖关系

start >> receiving_parameter >> data_cleaning >> branch_result >> [clean_success, clean_fail] >> end

5. 使用说明

例:

5.1 配置参数表

INSERT INTO T_HIVE_GC_CONF VALUES 
('TML.MS_GEN_ACCT_DAY','BIZ_DATE','1','1','2022-01-01','2022-06-23',1,2,40,-1,SYSDATE,'1');
COMMIT;

5.2 新增任务

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger = TriggerDagRunOperator(
	task_id="test_trigger_dagrun",
	trigger_dag_id="garbage_collection_unit",
    wait_for_completion=True,
	conf={"trigger_table_name": "TML.MS_GEN_ACCT_DAY"},
	dag=dag
)

6. 待优化点

  1. 日期格式单一,可扩展到自适应日期格式
  2. 只针对日期分区,可扩展到适应不同分区
  3. 目前必须配置数据库的配置表(T_HIVE_GC_CONF),只要传入表名。未有修改配置表的记录日志