1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import os
import subprocess
import sys
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
sys.path.append(os.path.dirname(sys.path[0]))
def generate_conf_file(db_name, source_tb, sink_tb, date_info, week, conf_dir="/home/chenyuanjie/conf"):
    """Render a SeaTunnel batch-job config and write it to ``<conf_dir>/<sink_tb>.conf``.

    The rendered job declares a MySQL ``Jdbc`` source and a PostgreSQL
    ``jdbc`` sink. The source query selects ``id`` and ``search_term`` from
    ``source_tb`` for the given ``week`` (only rows whose ``search_term`` is
    longer than 2), and adds a constant ``state``, creation/update timestamps
    and the literal ``date_info``.

    Args:
        db_name: Database name, used in the MySQL source URL, the PostgreSQL
            sink URL and the sink ``database`` setting.
        source_tb: MySQL table the source query reads from.
        sink_tb: PostgreSQL table (schema ``public``) to write to; also the
            base name of the generated config file.
        date_info: Value written verbatim into the ``date_info`` column.
        week: Value compared against the source table's ``week`` column.
        conf_dir: Directory the config file is written to. Defaults to the
            location this script has always used.

    Returns:
        The config file name (not the full path), i.e. ``f"{sink_tb}.conf"``.
    """
    # NOTE(review): database credentials are embedded below in plain text;
    # consider loading them from a secrets store or environment variables.
    conf_content = f"""
env {{
execution.parallelism = 1
job.mode = "BATCH"
}}
source {{
Jdbc {{
url = "jdbc:mysql://rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/{db_name}?characterEncoding=UTF-8&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false&zeroDateTimeBehavior=convertToNull"
driver = "com.mysql.cj.jdbc.Driver"
user = "adv_yswg"
password = "HCL1zcUgQesaaXNLbL37O5KhpSAy0c"
partition_column="id"
partition_num=100
fetch_size=40960
parallelism=30
query = "select id,search_term,1 as state,`week`,now() as created_time,now() as updated_time,'{date_info}' as date_info from {source_tb} where `week` = {week} and LENGTH(search_term) > 2"
}}
}}
transform {{
}}
sink {{
jdbc {{
driver = "org.postgresql.Driver"
parallelism = 60
batch_size = 40960
user = "postgres"
password = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"
url = "jdbc:postgresql://192.168.10.223:5432/{db_name}"
generate_sink_sql = true
database = "{db_name}"
table = "public.{sink_tb}"
}}
}}
"""
    file_name = f"{sink_tb}.conf"
    # Create the target directory on first use instead of failing with
    # FileNotFoundError when it does not exist yet.
    os.makedirs(conf_dir, exist_ok=True)
    # Explicit encoding so the output does not depend on the host locale.
    with open(os.path.join(conf_dir, file_name), "w", encoding="utf-8") as conf_file:
        conf_file.write(conf_content)
    return file_name
def execute_cmd(file, conf_dir="/home/chenyuanjie/conf"):
    """Run the SeaTunnel launcher script against a generated config file.

    Args:
        file: Config file name inside ``conf_dir``.
        conf_dir: Directory holding the config file. Defaults to the location
            this script has always used.

    Raises:
        RuntimeError: If the launcher exits with a non-zero status. The
            captured stderr is printed before raising.
    """
    conf_path = os.path.join(conf_dir, file)
    # Human-readable form of the command; used only for logging and for the
    # error message, never handed to a shell.
    cmd = f"sh /opt/module/seatunnel/bin/seatunnel.sh --config '{conf_path}'"
    print(f"当前执行的命令是:{cmd}")
    # Pass an argument vector rather than a shell string: a file name that
    # contains quotes or shell metacharacters can then neither break the
    # command line nor run extra commands that would mask a failed launcher.
    result = subprocess.run(
        ["sh", "/opt/module/seatunnel/bin/seatunnel.sh", "--config", conf_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if result.returncode != 0:
        print("命令执行失败!")
        print("错误输出:")
        print(result.stderr)
        raise RuntimeError(f"{cmd},命令执行失败!")
    print("命令执行成功!")
    print("标准输出:")
    print(result.stdout)
def remove_conf(file, conf_dir="/home/chenyuanjie/conf"):
    """Delete a generated config file on a best-effort basis.

    A file that is already gone is ignored, matching the former ``rm -f``
    behaviour. Any other OS-level failure is reported on stdout but not
    raised, so cleanup never aborts the calling script.

    Args:
        file: Config file name inside ``conf_dir``.
        conf_dir: Directory holding the config file. Defaults to the location
            this script has always used.
    """
    conf_path = os.path.join(conf_dir, file)
    try:
        # Remove the file directly instead of spawning a shell for `rm`, so
        # names with spaces or shell metacharacters are handled correctly.
        os.remove(conf_path)
    except FileNotFoundError:
        pass
    except OSError as err:
        print(f"failed to remove {conf_path}: {err}")
def remove_old_data(sink_tb, date_info_pre, site_name):
    """Delete rows of ``sink_tb`` whose ``date_info`` is less than ``date_info_pre``.

    The statement is executed through ``DBUtil`` on the ``postgresql_14``
    engine obtained for ``site_name``.

    Args:
        sink_tb: Table to purge.
        date_info_pre: Cut-off value; rows with a smaller ``date_info`` are deleted.
        site_name: Site key passed to ``DBUtil.get_db_engine``.
    """
    purge_stmt = "\nDELETE FROM {} WHERE date_info < '{}';\n".format(sink_tb, date_info_pre)
    DBUtil.engine_exec_sql(
        DBUtil.get_db_engine("postgresql_14", site_name),
        purge_stmt,
    )
def get_pre_week(date_info):
    """Return the greatest ``year_week`` in ``date_20_to_30`` that is less than ``date_info``.

    The lookup runs on the ``mysql`` engine that ``DBUtil`` provides for site ``us``.

    Args:
        date_info: Value compared against the ``year_week`` column.

    Returns:
        The ``year_week`` value of the closest earlier row.

    Raises:
        ValueError: If the table has no ``year_week`` smaller than ``date_info``.
    """
    engine = DBUtil.get_db_engine("mysql", "us")
    with engine.connect() as connection:
        # NOTE(review): date_info is interpolated directly into the SQL text;
        # it should only ever come from a trusted caller.
        sql = f"""
select year_week
from date_20_to_30
where year_week < '{date_info}'
order by year_week desc
limit 1 """
        result = connection.execute(sql)
        row = result.cursor.fetchone()
    # fetchone() yields None when nothing matches; fail with a clear message
    # instead of "'NoneType' object is not subscriptable".
    if row is None:
        raise ValueError(f"no year_week earlier than {date_info!r} found in date_20_to_30")
    return row[0]
if __name__ == "__main__":
    # Script entry point.
    # Usage: <script> <date_info> <site_name>
    # Purges older rows from the site's search-term table, then copies the
    # given week's search terms from MySQL to PostgreSQL through SeaTunnel and
    # reports the outcome over WeChat.
    date_info = CommonUtil.get_sys_arg(1, None)
    site_name = CommonUtil.get_sys_arg(2, None)
    # Explicit checks rather than `assert`, which is stripped under `python -O`.
    if date_info is None:
        raise ValueError("date_info 不能为空!")
    if site_name is None:
        raise ValueError("site_name 不能为空!")

    # Reject unsupported sites before touching any database.
    supported_sites = ("uk", "de", "es", "fr", "it")
    if site_name not in supported_sites:
        raise ValueError(f"unsupported site_name {site_name!r}; expected one of {supported_sites}")

    year, week = CommonUtil.split_month_week_date("week", date_info)
    date_info_pre = get_pre_week(date_info)

    # Every supported site follows the same naming scheme.
    db_name = f"selection_{site_name}"
    source_tb = f"{site_name}_brand_analytics_{year}"
    sink_tb = f"{site_name}_search_term"

    # Delete historical data.
    remove_old_data(sink_tb, date_info_pre, site_name)
    # Generate the SeaTunnel config file.
    file = generate_conf_file(db_name, source_tb, sink_tb, date_info, week)
    # Launch SeaTunnel.
    try:
        execute_cmd(file)
    except Exception as e:
        print(str(e))
        CommonUtil.send_wx_msg(["chenyuanjie"], f"【{site_name}搜索词同步失败】", f"{file}执行失败,错误信息:{str(e)}")
    else:
        # Report success only when the sync actually succeeded; previously
        # this message was also sent right after the failure message.
        CommonUtil.send_wx_msg(["chenyuanjie"], f"【{site_name}搜索词同步成功】", "悉知")
    finally:
        # Remove the config file whether or not the sync succeeded.
        remove_conf(file)