# pg14_to_pg6.py (10.4 KB)
import os
import subprocess
import sys

from datetime import datetime, time
import time as sleep_time

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

# Make the parent of this script's directory importable so the project-local
# ``utils`` package can be found. This must run BEFORE the ``utils`` imports
# below; appending it afterwards has no effect on them.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.common_util import CommonUtil
from utils.db_util import DBUtil

def generate_conf_file(table_name, db_name, conf_dir="/home/chenyuanjie/conf"):
    """Write a SeaTunnel batch-job config that copies one table between PostgreSQL hosts.

    The job reads ``public.<table_name>`` from the PostgreSQL server at
    192.168.10.223 (presumably "pg14") and writes it into the same-named
    table on 192.168.10.216 (pg6), letting SeaTunnel generate the insert
    SQL. Both sides use the database ``db_name``.

    :param table_name: table in schema ``public`` to copy; also used as the
        config file's base name.
    :param db_name: PostgreSQL database name on both source and sink.
    :param conf_dir: directory the config file is written into.
    :return: the bare file name ``"<table_name>.conf"`` (not the full path).
    """
    # NOTE(review): the source is split on ``id`` (partition_column), so this
    # assumes every copied table has a numeric ``id`` column — TODO confirm.
    # NOTE(review): credentials are hard-coded into the generated file; consider
    # sourcing them from the environment or a secrets store.
    conf_content = f"""
env {{
    execution.parallelism = 1
    job.mode = "BATCH"
}}

source {{
    Jdbc {{
        url = "jdbc:postgresql://192.168.10.223:5432/{db_name}"
        driver = "org.postgresql.Driver"
        connection_check_timeout_sec = 100
        user = "postgres"
        password = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"
        partition_column = "id"
        partition_num = 60
        fetch_size = 40960
        parallelism = 10
        query = "select * from public.{table_name}"
    }}
}}

transform {{

}}

sink {{
    jdbc {{
        driver = "org.postgresql.Driver"
        parallelism = 60
        batch_size = 40960
        user = "postgres"
        password = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"
        url = "jdbc:postgresql://192.168.10.216:5432/{db_name}"
        generate_sink_sql = true
        database = "{db_name}"
        table = "public.{table_name}"
    }}
}}
"""
    file_name = f"{table_name}.conf"

    # Explicit encoding so the output does not depend on the host locale.
    with open(os.path.join(conf_dir, file_name), "w", encoding="utf-8") as conf_file:
        conf_file.write(conf_content)

    return file_name


def execute_cmd(file, conf_dir="/home/chenyuanjie/conf"):
    """Run the SeaTunnel launcher on one config file and echo its output.

    :param file: config file name inside ``conf_dir`` (e.g. the value returned
        by ``generate_conf_file``).
    :param conf_dir: directory that holds ``file``.
    :raises Exception: if the launcher exits with a non-zero status. If ``sh``
        itself cannot be started, ``subprocess.run`` raises ``OSError``
        (also an ``Exception`` subclass).
    """
    import shlex

    # Pass an argument list and no shell: a file name containing spaces,
    # quotes or shell metacharacters is then handed over literally instead of
    # being re-parsed (or executed) by /bin/sh.
    args = [
        "sh",
        "/opt/module/seatunnel/bin/seatunnel.sh",
        "--config",
        os.path.join(conf_dir, file),
    ]
    # Copy-pasteable rendering of the command for logs and error messages.
    cmd = " ".join(shlex.quote(arg) for arg in args)
    print(f"当前执行的命令是:{cmd}")
    result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode == 0:
        print("命令执行成功!")
        print("标准输出:")
        print(result.stdout)
    else:
        print("命令执行失败!")
        print("错误输出:")
        print(result.stderr)
        raise Exception(f"{cmd},命令执行失败!")


def remove_conf(file, conf_dir="/home/chenyuanjie/conf"):
    """Delete a generated config file on a best-effort basis.

    Like the ``rm -f`` it replaces, this never raises: a missing file is
    ignored silently, and any other OS error is only reported. Unlike a shell
    command, the name is used literally, so a name such as ``*`` cannot
    expand to other files.

    :param file: config file name inside ``conf_dir``.
    :param conf_dir: directory that holds ``file``.
    """
    path = os.path.join(conf_dir, file)
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as err:
        print(f"Failed to remove config file {path}: {err}")


def judge_is_working_hours(work_start_time=time(19, 0), work_end_time=time(6, 0), now=None, sleeper=None):
    """Sleep until ``work_end_time`` if the current time lies in the working window.

    The default window is 19:00 -> 06:00, which wraps past midnight. In that
    case "inside" means at or after the start OR at or before the end. A
    plain ``start <= now <= end`` test can never be true for such a window.

    :param work_start_time: window start (inclusive).
    :param work_end_time: window end (inclusive); the function sleeps until this time.
    :param now: ``datetime`` or ``time`` to evaluate; defaults to ``datetime.now()``.
    :param sleeper: callable taking a number of seconds; defaults to ``time.sleep``.
    :return: the number of seconds slept (0 when outside the window).
    """
    if now is None:
        now = datetime.now()
    current_time = now.time() if isinstance(now, datetime) else now

    if work_start_time <= work_end_time:
        in_window = work_start_time <= current_time <= work_end_time
    else:
        # The window wraps midnight (e.g. 19:00 -> 06:00).
        in_window = current_time >= work_start_time or current_time <= work_end_time

    if not in_window:
        print("当前不是工作时间,程序正常执行")
        return 0

    def _seconds_of_day(t):
        # Whole seconds since midnight; microseconds ignored as before.
        return t.hour * 3600 + t.minute * 60 + t.second

    # Modulo one day handles the wrap: 22:00 -> 06:00 is +8h, not -16h.
    sleep_seconds = (_seconds_of_day(work_end_time) - _seconds_of_day(current_time)) % 86400
    print("当前处于工作时间,程序将睡眠到下班后执行")
    (sleeper or sleep_time.sleep)(sleep_seconds)
    return sleep_seconds


# def update_zero_to_null(table, site_name):
#     sql = f"""
#     BEGIN;
#     UPDATE {table} SET rank = NULL WHERE rank = 0;
#     COMMIT;
#     END;
#     """
#     DBUtil.exec_sql("postgresql", site_name, sql)
#     CommonUtil.send_wx_msg(["chenyuanjie"], f"【检查{table}更新后的rank】")


def get_doris_engine() -> Engine:
    """Build a SQLAlchemy engine for the Doris ``selection`` database.

    The engine uses the ``mysql+pymysql`` dialect against 192.168.10.218:9030,
    logging in as ``root`` with an empty password.
    """
    user = "root"
    password = ""
    host = "192.168.10.218"
    port = 9030
    database = "selection"
    # Resulting URL: mysql+pymysql://root:@192.168.10.218:9030/selection
    url = f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}"
    return create_engine(url)


def get_pg6_engine() -> Engine:
    """Build a SQLAlchemy engine for the ``selection`` database on pg6 (192.168.10.216)."""
    # NOTE(review): the password is embedded in source; consider moving it to config/env.
    return create_engine(
        "postgresql://postgres:fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"
        "@192.168.10.216:5432/selection"
    )


def syn_image(engine):
    """Rebuild Doris ``us_asin_image`` from pg14, keeping each ASIN's newest images.

    Step 1 copies ``jdbc_pg14.public.us_asin_image`` into the staging table
    ``us_asin_image_tmp``. ``jdbc_pg14`` is presumably a Doris catalog that
    points at pg14 — confirm.

    Step 2 truncates ``us_asin_image`` and refills it from staging. For each
    ``asin`` it keeps the rows with the latest ``created_at``; ``RANK()`` keeps
    ties, so every row of the newest batch survives.

    Each step ends with ``ANALYZE TABLE``.

    :param engine: SQLAlchemy engine connected to Doris. It is disposed before
        returning, also when a statement fails.
    """
    # pg14 -> Doris staging table
    sql1 = """
    INSERT INTO us_asin_image_tmp SELECT asin,img_order_by,data_type,img_url,created_at,updated_at FROM jdbc_pg14.public.us_asin_image;

    ANALYZE TABLE us_asin_image_tmp;
    """

    # Staging -> final table, latest created_at per asin.
    # NOTE(review): TRUNCATE + INSERT is not atomic; if the INSERT fails the
    # final table is left empty.
    sql2 = """
    TRUNCATE TABLE us_asin_image;

    INSERT INTO us_asin_image 
    SELECT
        asin,
        img_order_by,
        data_type,
        img_url,
        created_at,
        updated_at 
    FROM
        (
        SELECT
            asin,
            img_order_by,
            data_type,
            img_url,
            created_at,
            updated_at,
            RANK() over ( PARTITION BY asin ORDER BY created_at DESC ) AS rk 
        FROM
            us_asin_image_tmp
        ) t 
    WHERE
        rk = 1;

    ANALYZE TABLE us_asin_image;
    """

    try:
        print("----------------------------------------------------")
        print(sql1)
        with engine.connect() as connection:
            connection.execute(sql1)
            print("us_asin_image_tmp同步完成")

        print("----------------------------------------------------")
        print(sql2)
        with engine.connect() as connection:
            connection.execute(sql2)
            print("us_asin_image同步完成")
    finally:
        # Release the pool even when one of the statements raised.
        engine.dispose()


def syn_st_asin(doris_engine, pg6_engine):
    # pg14 to Doris
    sql1 = """
    INSERT INTO us_all_syn_st_asin SELECT asin,bitmap_to_string(BITMAP_HASH(asin)) AS id,state,updated_at FROM jdbc_pg14.public.us_all_syn_st_asin;

    ANALYZE TABLE us_all_syn_st_asin;
    """
    print("----------------------------------------------------")
    print(sql1)
    with doris_engine.connect() as connection:
        connection.execute(sql1)
        print("pg14到Doris同步完成")

    pg6_conn = pg6_engine.connect()
    sql2 = """
    drop table if exists us_all_syn_st_asin_copy;
    create table if not exists us_all_syn_st_asin_copy 
    (
    like us_all_syn_st_asin_master including ALL       
    );
    """
    pg6_conn.execute(sql2)
    print("pg6已清除copy表")

    cmd = f"sh /opt/module/seatunnel/bin/seatunnel.sh --config '/home/chenyuanjie/conf/us_all_syn_st_asin.conf'"
    print(f"小水滴启动命令:{cmd}")
    result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode == 0:
        print("us_all_syn_st_asin_copy同步成功")
        print("标准输出:")
        print(result.stdout)
    else:
        print("执行失败!")
        print("错误输出:")
        print(result.stderr)
        raise Exception(f"{cmd},命令执行失败!")

    print("导出完成,准备创建索引和交换表名")

    sql3 = f"""
    CREATE UNIQUE INDEX "us_all_syn_st_asin_asin_idx_{datetime.now().date().strftime("%Y_%m_%d")}" ON "public"."us_all_syn_st_asin_copy" USING btree (
      "asin" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
    );
    CREATE INDEX "us_all_syn_st_asin_state_idx_{datetime.now().date().strftime("%Y_%m_%d")}" ON "public"."us_all_syn_st_asin_copy" USING btree (
      "state" "pg_catalog"."int8_ops" ASC NULLS LAST
    );  
    ALTER TABLE "public"."us_all_syn_st_asin_copy" ADD CONSTRAINT "us_all_syn_st_asin_pkey_{datetime.now().date().strftime("%Y_%m_%d")}" PRIMARY KEY ("asin");
    alter table us_all_syn_st_asin_copy rename to us_all_syn_st_asin_tmp;
    alter table us_all_syn_st_asin rename to us_all_syn_st_asin_copy;
    alter table us_all_syn_st_asin_tmp rename to us_all_syn_st_asin;
    """
    pg6_conn.execute(sql3)
    print("创建索引完成")

    connection.close()
    pg6_conn.close()
    pg6_engine.dispose()
    doris_engine.dispose()


if __name__ == "__main__":
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    table_list = CommonUtil.get_sys_arg(3, None)
    flag = CommonUtil.get_sys_arg(4, None)
    # Explicit checks instead of ``assert`` so validation survives ``python -O``.
    for arg_name, arg_value in (
        ("site_name", site_name),
        ("date_info", date_info),
        ("table_list", table_list),
        ("flag", flag),
    ):
        if arg_value is None:
            raise ValueError(f"{arg_name} 不能为空!")

    # Site code -> database name (the same name is used on source and sink).
    site_db_map = {
        "us": "selection",
        "uk": "selection_uk",
        "de": "selection_de",
        "es": "selection_es",
        "fr": "selection_fr",
        "it": "selection_it"
    }
    db_name = site_db_map[site_name]

    # Comma-separated table names; ignore surrounding spaces and empty items.
    table_list = [name.strip() for name in table_list.split(",") if name.strip()]

    # Sleep through the configured working-hours window if we are inside it.
    judge_is_working_hours()

    # Migrate the tables one at a time.
    for table in table_list:
        if table == "us_asin_image":
            # Image table: rebuilt inside Doris.
            doris_engine = get_doris_engine()
            syn_image(doris_engine)
            # OA notification
            CommonUtil.send_wx_msg(["chenyuanjie"], "【us_asin_image同步完成】", "记得清理pg14的表")
        elif table == "us_all_syn_st_asin":
            # Goes through Doris, then SeaTunnel into pg6 with a table swap.
            doris_engine = get_doris_engine()
            pg6_engine = get_pg6_engine()
            syn_st_asin(doris_engine, pg6_engine)
            # OA notification
            CommonUtil.send_wx_msg(["chenyuanjie"], "【us_all_syn_st_asin同步完成】", "悉知")
        else:
            # Any other table: direct pg14 -> pg6 copy with a generated SeaTunnel config.
            file = generate_conf_file(table, db_name)
            try:
                execute_cmd(file)
            except Exception as e:
                # Best effort: report the failure and carry on with the next table.
                print(str(e))
                CommonUtil.send_wx_msg(["chenyuanjie"], "【pg14到pg6数据同步异常】", f"{file}执行失败,错误信息:{str(e)}")
            finally:
                # Always delete the generated config (it embeds credentials).
                remove_conf(file)
    print("数据迁移完成,准备调用计算api")

    # Disabled follow-up: after migration, trigger the downstream
    # DolphinScheduler computation (flag == "asin") or only notify (flag == "aba").
    # NOTE(review): date_info and flag are currently used only by this disabled code.
    # if flag == "asin":
    #     # 迁移完成,调用大数据计算api
    #     DolphinschedulerHelper.start_process_instance(
    #         "big_data_selection",
    #         process_df_name='us-ABA+反查(旧版)+流量选品(旧版)-api',
    #         startParams={
    #             "site_name": site_name,
    #             "date_type": "week",
    #             "date_info": date_info
    #         }
    #     )
    #     print("asin-pg14同步至pg6完成!")
    #     # print("us-ABA+反查(旧版)+流量选品(旧版)-api 启动成功!")
    #     CommonUtil.send_wx_msg(["chenyuanjie"], "【asin-pg14同步至pg6完成】", "悉知")
    # elif flag == "aba":
    #     print("搜索词-pg14同步至pg6完成!")
    #     CommonUtil.send_wx_msg(["chenyuanjie"], "【搜索词-pg14同步至pg6完成】", "悉知")