"""Sync one week of Brand Analytics search terms from MySQL to PostgreSQL.

The script builds a SeaTunnel batch-job config, runs it through the SeaTunnel
launcher script, and reports the outcome through ``CommonUtil.send_wx_msg``.

Arguments are read through ``CommonUtil.get_sys_arg``:
    1 -- date_info (passed to ``CommonUtil.split_month_week_date("week", ...)``)
    2 -- site_name (one of ``SUPPORTED_SITES``)
"""
import os
import subprocess
import sys

# The project root must be on sys.path *before* the ``utils`` package is
# imported; appending it afterwards cannot help resolve those imports.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil  # noqa: E402
from utils.db_util import DBUtil  # noqa: E402

# Directory where the temporary SeaTunnel job configs are written.
CONF_DIR = "/home/chenyuanjie/conf/"
# SeaTunnel launcher script.
SEATUNNEL_SH = "/opt/module/seatunnel/bin/seatunnel.sh"
# Sites for which a database / source table / sink table naming is defined.
SUPPORTED_SITES = ("uk", "de", "es", "fr", "it")


def generate_conf_file(db_name, source_tb, sink_tb, date_info, week):
    """Write the SeaTunnel job config for one sync run and return its file name.

    The generated job reads ``id`` and ``search_term`` from the MySQL table
    ``source_tb`` (rows of the given ``week`` whose ``search_term`` is longer
    than 2 characters) and writes them to the PostgreSQL table
    ``public.<sink_tb>``.

    Args:
        db_name: Database name, used for both the MySQL source and the
            PostgreSQL sink.
        source_tb: MySQL source table name.
        sink_tb: PostgreSQL sink table name; also used as the config file stem.
        date_info: Value stored in the ``date_info`` column of every row.
        week: Value the source ``week`` column is filtered on.

    Returns:
        The config file name (``<sink_tb>.conf``), relative to ``CONF_DIR``.
    """
    # NOTE(review): database credentials are hard-coded below and end up in a
    # plain-text file on disk. They are kept as-is to preserve behavior, but
    # should be moved to environment variables or a secret store.
    conf_content = f"""
    env {{
        execution.parallelism = 1
        job.mode = "BATCH"
    }}

    source {{
        Jdbc {{
            url = "jdbc:mysql://rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/{db_name}?characterEncoding=UTF-8&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false&zeroDateTimeBehavior=convertToNull"
            driver = "com.mysql.cj.jdbc.Driver"
            user = "adv_yswg"
            password = "HCL1zcUgQesaaXNLbL37O5KhpSAy0c"
            partition_column="id"
            partition_num=100
            fetch_size=40960
            parallelism=30
            query = "select id,search_term,1 as state,`week`,now() as created_time,now() as updated_time,'{date_info}' as date_info from {source_tb} where `week` = {week} and LENGTH(search_term) > 2"
        }}
    }}

    transform {{
    }}

    sink {{
        jdbc {{
            driver = "org.postgresql.Driver"
            parallelism = 60
            batch_size = 40960
            user = "postgres"
            password = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"
            url = "jdbc:postgresql://192.168.10.223:5432/{db_name}"
            generate_sink_sql = true
            database = "{db_name}"
            table = "public.{sink_tb}"
        }}
    }}
    """

    file_name = f"{sink_tb}.conf"
    # Explicit encoding so the output does not depend on the host locale.
    with open(CONF_DIR + file_name, "w", encoding="utf-8") as conf_file:
        conf_file.write(conf_content)
    return file_name


def execute_cmd(file):
    """Run SeaTunnel with the given config file and print its output.

    Args:
        file: Config file name relative to ``CONF_DIR``.

    Raises:
        RuntimeError: If the SeaTunnel process exits with a non-zero status.
    """
    conf_path = f"{CONF_DIR}{file}"
    # Human-readable form, used only for logging and the error message.
    cmd = f"sh {SEATUNNEL_SH} --config '{conf_path}'"
    print(f"当前执行的命令是:{cmd}")

    # An argument list (no shell) keeps the file name from ever being
    # interpreted by a shell.
    result = subprocess.run(
        ["sh", SEATUNNEL_SH, "--config", conf_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    if result.returncode != 0:
        print("命令执行失败!")
        print("错误输出:")
        print(result.stderr)
        raise RuntimeError(f"{cmd},命令执行失败!")

    print("命令执行成功!")
    print("标准输出:")
    print(result.stdout)


def remove_conf(file):
    """Delete a generated config file; best effort, never raises on OS errors.

    Args:
        file: Config file name relative to ``CONF_DIR``.
    """
    try:
        os.remove(os.path.join(CONF_DIR, file))
    except FileNotFoundError:
        # Same outcome as ``rm -f``: a missing file is not an error.
        pass
    except OSError as err:
        # Cleanup must not mask the result of the sync itself.
        print(f"删除配置文件失败:{err}")


def remove_old_data(sink_tb, date_info_pre, site_name):
    """Delete sink rows whose ``date_info`` is older than ``date_info_pre``.

    Args:
        sink_tb: PostgreSQL table to purge.
        date_info_pre: Rows with ``date_info`` strictly below this are deleted.
        site_name: Site passed to ``DBUtil.get_db_engine`` to pick the engine.
    """
    sql = f"""
    DELETE FROM {sink_tb} WHERE date_info < '{date_info_pre}';
    """
    pg14_engine = DBUtil.get_db_engine("postgresql_14", site_name)
    DBUtil.engine_exec_sql(pg14_engine, sql)


def get_pre_week(date_info):
    """Return the latest ``year_week`` in ``date_20_to_30`` before ``date_info``.

    Args:
        date_info: Upper bound (exclusive) compared against ``year_week``.

    Returns:
        The greatest ``year_week`` value strictly below ``date_info``.

    Raises:
        ValueError: If the table holds no ``year_week`` before ``date_info``.
    """
    engine = DBUtil.get_db_engine("mysql", "us")
    with engine.connect() as connection:
        sql = f"""
        select year_week from date_20_to_30 where year_week < '{date_info}' order by year_week desc limit 1
        """
        result = connection.execute(sql)
        row = result.cursor.fetchone()

    if row is None:
        # Previously this surfaced as an opaque "'NoneType' is not subscriptable".
        raise ValueError(f"未找到 {date_info} 之前的周信息")
    return row[0]


def main():
    """Purge outdated rows, then sync the requested week and report the result."""
    date_info = CommonUtil.get_sys_arg(1, None)
    site_name = CommonUtil.get_sys_arg(2, None)
    # Explicit checks instead of ``assert``, which is stripped under ``-O``.
    if date_info is None:
        raise ValueError("date_info 不能为空!")
    if site_name is None:
        raise ValueError("site_name 不能为空!")
    if site_name not in SUPPORTED_SITES:
        # Fail before touching any database.
        raise ValueError(f"不支持的 site_name:{site_name}")

    year, week = CommonUtil.split_month_week_date("week", date_info)
    date_info_pre = get_pre_week(date_info)

    db_name = f"selection_{site_name}"
    source_tb = f"{site_name}_brand_analytics_{year}"
    sink_tb = f"{site_name}_search_term"

    # Drop historical data.
    remove_old_data(sink_tb, date_info_pre, site_name)
    # Generate the job config.
    conf_file = generate_conf_file(db_name, source_tb, sink_tb, date_info, week)

    # Launch SeaTunnel.
    try:
        execute_cmd(conf_file)
    except Exception as e:
        # NOTE(review): the failure is reported but the script still exits
        # with status 0, as before -- confirm whether the scheduler should
        # see a non-zero exit code instead.
        print(str(e))
        CommonUtil.send_wx_msg(
            ["chenyuanjie"],
            f"【{site_name}搜索词同步失败】",
            f"{conf_file}执行失败,错误信息:{str(e)}",
        )
    else:
        # Only announce success when the job actually succeeded.
        CommonUtil.send_wx_msg(["chenyuanjie"], f"【{site_name}搜索词同步成功】", "悉知")
    finally:
        # Always remove the config file, which contains credentials.
        remove_conf(conf_file)


if __name__ == "__main__":
    main()