1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil, DbTypes
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
def export_detail(engine, type, site_name, day):
    """Export one month of bsr/nsr asin_detail data to PG if not already exported.

    Checks ``time_partitions`` for an existing ``{site}_{type}_asin_detail_{Y_m}``
    partition; if absent, shells out to the sqoop export script.

    :param engine: DB engine/connection accepted by ``DBUtil.engine_exec_sql``
                   (presumably the postgresql cluster engine — see ``__main__``).
    :param type: ranking type string, ``'bsr'`` or ``'nsr'`` (name shadows the
                 builtin ``type``; kept unchanged for caller compatibility).
    :param site_name: site identifier, e.g. ``'us'``.
    :param day: date string ``YYYY-MM-DD``; only the year/month part is used
                to build the partition suffix.
    """
    # Partition suffix like '2023_02' derived from the day string.
    suffix = CommonUtil.reformat_date(day, "%Y-%m-%d", "%Y_%m")
    # NOTE(review): SQL is built via f-string interpolation. Safe only because
    # site_name/type/day come from hard-coded lists upstream; parameterize the
    # query if these values ever originate from external input.
    export_flag = len(list(DBUtil.engine_exec_sql(engine,
        f"select * from time_partitions where partition::varchar like '{site_name}_{type}_asin_detail_{suffix}';"))) > 0
    # Not yet exported -> run the export script in a child interpreter.
    if not export_flag:
        print(f"{type}:{site_name}:{day} asin_detail 导出中")
        # NOTE(review): os.system with an interpolated command string; consider
        # subprocess.run([...]) with an argv list if inputs become untrusted.
        os.system(
            f"/opt/module/anaconda3/envs/pyspark/bin/python3.8 /tmp/wjc_py/sqoop_export/dwt_bsr_nsr_asin_detail_new.py {site_name} {day} {type}")
if __name__ == '__main__':
    # Driver: for each ranking type and site, walk the HDFS day partitions of
    # dwd_{type}_asin_rank, export any day not yet present in PG, export the
    # monthly detail table on month boundaries, and record workflow status
    # rows in MySQL.
    engine = DBUtil.get_db_engine(DbTypes.postgresql_cluster.name, "us")
    # NOTE(review): `type` shadows the builtin; kept because the f-strings and
    # the export_detail signature depend on this exact name.
    for type in ['bsr', 'nsr']:
        for site_name in ['us']:
            # HDFS partition dirs look like 'date_info=YYYY-MM-DD'; strip the
            # 'key=' prefix to get the day strings, newest first after reverse.
            parts = HdfsUtils.read_list(f"/home/big_data_selection/dwd/dwd_{type}_asin_rank/site_name={site_name}/date_type=last30day")
            existDays = [item[item.find("=") + 1:] for item in parts]
            existDays.reverse()
            for day in existDays:
                # Skip historical days on/before the cutoff (string compare is
                # safe for zero-padded ISO dates).
                if day <= '2023-02-01':
                    continue
                # Check whether this day's rank partition was already exported.
                suffix = CommonUtil.reformat_date(day, "%Y-%m-%d", "%Y_%m_%d")
                # NOTE(review): f-string-built SQL — acceptable only because all
                # interpolated values are hard-coded above.
                export_flag = len(list(DBUtil.engine_exec_sql(engine,
                    f"select * from time_partitions where partition::varchar like '{site_name}_{type}_asin_rank_{suffix}';"))) > 0
                # Not yet exported -> run the day-level export script.
                if not export_flag:
                    print(f"{type}:{site_name}:{day} asin_rank 导出中")
                    os.system(
                        f"/opt/module/anaconda3/envs/pyspark/bin/python3.8 /tmp/wjc_py/sqoop_export/dwd_bsr_nsr_asin_rank_day.py {site_name} {day} {type}")
                # On the first day of a month also export the monthly detail.
                if day.endswith("-01"):
                    export_detail(engine, type, site_name, day)
                # Record/refresh the workflow status row in MySQL.
                update_sql = f"""
insert ignore into workflow_everyday (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type,freeze_flag)
values ('{site_name}', '{day}', '导出PG数据库', 14, '{site_name}_{type}_asin_rank', 'day', '{str(type).upper()}榜单', '是', '{str(type).upper()}榜单对应的TOP100ASIN','postgresql_cluster','enable')
"""
                print(update_sql)
                DBUtil.exec_sql(DbTypes.mysql.name, "us", update_sql, dispose_flag=True)
                # Ensure pre-existing rows also carry the cluster/freeze flags
                # (insert ignore above leaves already-present rows untouched).
                update_sql = f"""
update workflow_everyday set export_db_type ='postgresql_cluster', freeze_flag='enable'
where table_name = '{site_name}_{type}_asin_rank' and report_date in ('{day}');
"""
                DBUtil.exec_sql(DbTypes.mysql.name, "us", update_sql, dispose_flag=True)
                print(update_sql)
                print(f"{type}:{site_name}:{day} success")