import os
import subprocess
import sys
from datetime import datetime, time
import time as sleep_time

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

# Make the project root importable; this must run before the utils imports below
# (the original appended to sys.path only after importing from utils, which defeats the purpose).
sys.path.append(os.path.dirname(sys.path[0]))

from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.common_util import CommonUtil
from utils.db_util import DBUtil


def generate_conf_file(table_name, db_name):
    """Write a SeaTunnel batch-job config that copies one table from pg14 to pg6."""
    conf_content = f"""
env {{
  execution.parallelism = 1
  job.mode = "BATCH"
}}
source {{
  Jdbc {{
    url = "jdbc:postgresql://192.168.10.223:5432/{db_name}"
    driver = "org.postgresql.Driver"
    connection_check_timeout_sec = 100
    user = "postgres"
    password = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"
    partition_column = "id"
    partition_num = 60
    fetch_size = 40960
    parallelism = 10
    query = "select * from public.{table_name}"
  }}
}}
transform {{
}}
sink {{
  jdbc {{
    driver = "org.postgresql.Driver"
    parallelism = 60
    batch_size = 40960
    user = "postgres"
    password = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"
    url = "jdbc:postgresql://192.168.10.216:5432/{db_name}"
    generate_sink_sql = true
    database = "{db_name}"
    table = "public.{table_name}"
  }}
}}
"""
    file_name = f"{table_name}.conf"
    with open("/home/chenyuanjie/conf/" + file_name, "w") as conf_file:
        conf_file.write(conf_content)
    return file_name


def execute_cmd(file):
    cmd = f"sh /opt/module/seatunnel/bin/seatunnel.sh --config '/home/chenyuanjie/conf/{file}'"
    print(f"Executing command: {cmd}")
    result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode == 0:
        print("Command succeeded!")
        print("stdout:")
        print(result.stdout)
    else:
        print("Command failed!")
        print("stderr:")
        print(result.stderr)
        raise Exception(f"{cmd} failed!")


def remove_conf(file):
    cmd = f"rm -f /home/chenyuanjie/conf/{file}"
    os.system(cmd)


def judge_is_working_hours():
    """If we are inside working hours (06:00-19:00), sleep until the workday ends.

    The original had the two bounds reversed (start 19:00, end 06:00), which made
    the `start <= now <= end` check unreachable; with start < end both the check
    and the sleep computation work as intended.
    """
    work_start_time = time(6, 0)
    work_end_time = time(19, 0)
    # Get the current time
    now = datetime.now()
    current_time = now.time()
    # Is the current time inside the working-hours window?
    if work_start_time <= current_time <= work_end_time:
        # Seconds remaining until the end of the workday
        sleep_seconds = (datetime.combine(now.date(), work_end_time) - now).total_seconds()
        print("Within working hours; sleeping until after the workday")
        sleep_time.sleep(sleep_seconds)
    else:
        print("Outside working hours; running now")


# def update_zero_to_null(table, site_name):
#     sql = f"""
#     BEGIN;
#     UPDATE {table} SET rank = NULL WHERE rank = 0;
#     COMMIT;
#     END;
#     """
#     DBUtil.exec_sql("postgresql", site_name, sql)
#     CommonUtil.send_wx_msg(["chenyuanjie"], f"[Check rank in {table} after the update]")


def get_doris_engine() -> Engine:
    # Doris connection parameters (Doris speaks the MySQL protocol)
    con = {
        "url": "jdbc:mysql://192.168.10.218:9030/selection",
        "username": "root",
        "pwd": "",
        "host": "192.168.10.218",
        "port": 9030,
        "db_name": "selection"
    }
    # Build the Doris engine
    engine = create_engine(
        f"mysql+pymysql://{con['username']}:{con['pwd']}@{con['host']}:{con['port']}/{con['db_name']}")
    return engine


def get_pg6_engine() -> Engine:
    # Connect to pg6
    pg6_conn_str = 'postgresql://postgres:fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS@192.168.10.216:5432/selection'
    return create_engine(pg6_conn_str)


def syn_image(engine):
    # pg14 -> Doris staging table, read through the jdbc_pg14 catalog
    sql1 = """
    INSERT INTO us_asin_image_tmp
    SELECT asin, img_order_by, data_type, img_url, created_at, updated_at
    FROM jdbc_pg14.public.us_asin_image;
    ANALYZE TABLE us_asin_image_tmp;
    """
    print("----------------------------------------------------")
    print(sql1)
    with engine.connect() as connection:
        connection.execute(sql1)
    print("us_asin_image_tmp sync finished")
    # us_asin_image_tmp -> us_asin_image, keeping the most recent record per asin.
    # Note: RANK() can keep several rows per asin when created_at ties;
    # ROW_NUMBER() would guarantee exactly one row per asin if that matters.
    sql2 = """
    TRUNCATE TABLE us_asin_image;
    INSERT INTO us_asin_image
    SELECT asin, img_order_by, data_type, img_url, created_at, updated_at
    FROM (
        SELECT asin, img_order_by, data_type, img_url, created_at, updated_at,
               RANK() OVER (PARTITION BY asin ORDER BY created_at DESC) AS rk
        FROM us_asin_image_tmp
    ) t
    WHERE rk = 1;
    ANALYZE TABLE us_asin_image;
    """
    print("----------------------------------------------------")
    print(sql2)
    # Run the dedup insert
    with engine.connect() as connection:
        connection.execute(sql2)
    print("us_asin_image sync finished")
    # Dispose of the database engine
    engine.dispose()
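
# Optional sanity check, not part of the original flow: a minimal sketch that
# compares row counts between a source and a sink table after a sync. The engine
# pair and the public-schema table name are assumptions, and it gives the
# otherwise unused pandas import a job. Wire it into the main loop only if the
# extra COUNT(*) scans are acceptable on both sides.
def verify_row_counts(src_engine: Engine, dst_engine: Engine, table_name: str) -> bool:
    sql = f"SELECT COUNT(*) AS cnt FROM public.{table_name}"
    src_cnt = int(pd.read_sql(sql, src_engine)["cnt"][0])
    dst_cnt = int(pd.read_sql(sql, dst_engine)["cnt"][0])
    print(f"{table_name}: source={src_cnt}, sink={dst_cnt}")
    return src_cnt == dst_cnt
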
def syn_st_asin(doris_engine, pg6_engine):
    # pg14 -> Doris
    sql1 = """
    INSERT INTO us_all_syn_st_asin
    SELECT asin, bitmap_to_string(BITMAP_HASH(asin)) AS id, state, updated_at
    FROM jdbc_pg14.public.us_all_syn_st_asin;
    ANALYZE TABLE us_all_syn_st_asin;
    """
    print("----------------------------------------------------")
    print(sql1)
    with doris_engine.connect() as connection:
        connection.execute(sql1)
    print("pg14 -> Doris sync finished")

    # Recreate the copy table on pg6
    pg6_conn = pg6_engine.connect()
    sql2 = """
    drop table if exists us_all_syn_st_asin_copy;
    create table if not exists us_all_syn_st_asin_copy (
        like us_all_syn_st_asin_master including ALL
    );
    """
    pg6_conn.execute(sql2)
    print("copy table on pg6 recreated")

    cmd = "sh /opt/module/seatunnel/bin/seatunnel.sh --config '/home/chenyuanjie/conf/us_all_syn_st_asin.conf'"
    print(f"SeaTunnel launch command: {cmd}")
    result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode == 0:
        print("us_all_syn_st_asin_copy sync succeeded")
        print("stdout:")
        print(result.stdout)
    else:
        print("Command failed!")
        print("stderr:")
        print(result.stderr)
        raise Exception(f"{cmd} failed!")

    print("Export finished; creating indexes and swapping table names")
    today = datetime.now().date().strftime("%Y_%m_%d")
    sql3 = f"""
    CREATE UNIQUE INDEX "us_all_syn_st_asin_asin_idx_{today}" ON "public"."us_all_syn_st_asin_copy" USING btree (
        "asin" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
    );
    CREATE INDEX "us_all_syn_st_asin_state_idx_{today}" ON "public"."us_all_syn_st_asin_copy" USING btree (
        "state" "pg_catalog"."int8_ops" ASC NULLS LAST
    );
    ALTER TABLE "public"."us_all_syn_st_asin_copy" ADD CONSTRAINT "us_all_syn_st_asin_pkey_{today}" PRIMARY KEY ("asin");
    alter table us_all_syn_st_asin_copy rename to us_all_syn_st_asin_tmp;
    alter table us_all_syn_st_asin rename to us_all_syn_st_asin_copy;
    alter table us_all_syn_st_asin_tmp rename to us_all_syn_st_asin;
    """
    pg6_conn.execute(sql3)
    print("Indexes created")

    # The original also called connection.close() here, but that connection was
    # already closed by its `with` block above; close only what we still own.
    pg6_conn.close()
    pg6_engine.dispose()
    doris_engine.dispose()
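
# A minimal retry sketch, not part of the original script: SeaTunnel jobs can
# fail transiently, so retrying once or twice before alerting may be worthwhile.
# The retry count and backoff interval are illustrative values, not tuned settings.
def execute_cmd_with_retry(file, max_retries=2, backoff_sec=60):
    for attempt in range(1, max_retries + 1):
        try:
            execute_cmd(file)
            return
        except Exception as e:
            print(f"Attempt {attempt}/{max_retries} failed: {e}")
            if attempt == max_retries:
                raise
            sleep_time.sleep(backoff_sec)
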
if __name__ == "__main__":
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    table_list = CommonUtil.get_sys_arg(3, None)
    flag = CommonUtil.get_sys_arg(4, None)
    assert site_name is not None, "site_name must not be empty!"
    assert date_info is not None, "date_info must not be empty!"
    assert table_list is not None, "table_list must not be empty!"
    assert flag is not None, "flag must not be empty!"

    site_db_map = {
        "us": "selection",
        "uk": "selection_uk",
        "de": "selection_de",
        "es": "selection_es",
        "fr": "selection_fr",
        "it": "selection_it"
    }
    db_name = site_db_map[site_name]
    table_list = table_list.split(",")

    # Wait out working hours if necessary
    judge_is_working_hours()

    # Migrate each table in turn
    for table in table_list:
        if table == "us_asin_image":
            # Image table is synced inside Doris
            doris_engine = get_doris_engine()
            syn_image(doris_engine)
            # OA notification
            CommonUtil.send_wx_msg(["chenyuanjie"], "[us_asin_image sync finished]",
                                   "remember to clean up the pg14 table")
        elif table == "us_all_syn_st_asin":
            # The original used a second bare `if` here, so us_asin_image also fell
            # through to the SeaTunnel branch below; `elif` keeps the branches exclusive.
            doris_engine = get_doris_engine()
            pg6_engine = get_pg6_engine()
            syn_st_asin(doris_engine, pg6_engine)
            # OA notification
            CommonUtil.send_wx_msg(["chenyuanjie"], "[us_all_syn_st_asin sync finished]", "FYI")
        else:
            # All other tables go pg14 -> pg6 through SeaTunnel
            file = generate_conf_file(table, db_name)
            try:
                execute_cmd(file)
            except Exception as e:
                print(str(e))
                CommonUtil.send_wx_msg(["chenyuanjie"], "[pg14 -> pg6 sync error]",
                                       f"{file} failed, error: {str(e)}")
            # Remove the generated config file
            remove_conf(file)

    print("Data migration finished; ready to call the compute API")
    # if flag == "asin":
    #     # Migration done, trigger the big-data compute API
    #     DolphinschedulerHelper.start_process_instance(
    #         "big_data_selection",
    #         process_df_name='us-ABA+反查(旧版)+流量选品(旧版)-api',
    #         startParams={
    #             "site_name": site_name,
    #             "date_type": "week",
    #             "date_info": date_info
    #         }
    #     )
    #     print("asin: pg14 -> pg6 sync finished!")
    #     # print("us-ABA+反查(旧版)+流量选品(旧版)-api started!")
    #     CommonUtil.send_wx_msg(["chenyuanjie"], "[asin: pg14 -> pg6 sync finished]", "FYI")
    # elif flag == "aba":
    #     print("search terms: pg14 -> pg6 sync finished!")
    #     CommonUtil.send_wx_msg(["chenyuanjie"], "[search terms: pg14 -> pg6 sync finished]", "FYI")
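
# Example invocation (a sketch; the script filename, the date_info format, and
# the extra table name are assumptions, not confirmed by the source):
#   python sync_pg14_to_pg6.py us 2023-39 us_asin_image,us_all_syn_st_asin,us_asin_detail asin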