# site_search_term.py (4.46 KB)
import os
import subprocess
import sys

# Extend the import path BEFORE the project imports below. sys.path[0] is the
# directory of the running script; its parent is presumably where the `utils`
# package lives (TODO confirm). Appending after the `utils` imports, as was
# done previously, could not influence them.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil  # noqa: E402
from utils.db_util import DBUtil  # noqa: E402

def generate_conf_file(db_name, source_tb, sink_tb, date_info, week,
                       conf_dir="/home/chenyuanjie/conf/"):
    """Render a SeaTunnel batch job config and write it to disk.

    The generated job reads ``id`` and ``search_term`` from ``source_tb`` in
    the MySQL database ``db_name`` (rows matching ``week`` whose
    ``search_term`` is longer than 2) and writes them to ``public.<sink_tb>``
    in the PostgreSQL database of the same name.

    Args:
        db_name: Database name used for both the MySQL source and the
            PostgreSQL sink.
        source_tb: Source table name in MySQL.
        sink_tb: Sink table name in PostgreSQL; also used as the config
            file's base name.
        date_info: Value emitted as the constant ``date_info`` column.
        week: Value compared against the source table's ``week`` column.
        conf_dir: Directory the config file is written to. Defaults to the
            directory the rest of this script expects.

    Returns:
        The config file name (``"<sink_tb>.conf"``), without the directory.
    """
    # NOTE(review): database hosts and credentials are embedded in this
    # template in clear text; consider loading them from a secrets store or
    # environment variables.
    conf_content = f"""
env {{
    execution.parallelism = 1
    job.mode = "BATCH"
}}

source {{
    Jdbc {{
        url = "jdbc:mysql://rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/{db_name}?characterEncoding=UTF-8&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false&zeroDateTimeBehavior=convertToNull"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "adv_yswg"
        password = "HCL1zcUgQesaaXNLbL37O5KhpSAy0c"
        partition_column="id"
        partition_num=100
        fetch_size=40960
        parallelism=30
        query = "select id,search_term,1 as state,`week`,now() as created_time,now() as updated_time,'{date_info}' as date_info from {source_tb} where `week` = {week} and LENGTH(search_term) > 2"
    }}
}}

transform {{

}}

sink {{
    jdbc {{
        driver = "org.postgresql.Driver"
        parallelism = 60
        batch_size = 40960
        user = "postgres"
        password = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"
        url = "jdbc:postgresql://192.168.10.223:5432/{db_name}"
        generate_sink_sql = true
        database = "{db_name}"
        table = "public.{sink_tb}"
    }}
}}
"""
    file_name = f"{sink_tb}.conf"

    # Explicit encoding so the written bytes do not depend on the host locale.
    with open(os.path.join(conf_dir, file_name), "w", encoding="utf-8") as conf_file:
        conf_file.write(conf_content)

    return file_name


def execute_cmd(file):
    """Run SeaTunnel with the given config file and report the outcome.

    Args:
        file: Config file name inside ``/home/chenyuanjie/conf/``.

    Raises:
        Exception: If the SeaTunnel launcher exits with a non-zero status.
            The captured stderr is printed before raising.
    """
    conf_path = f"/home/chenyuanjie/conf/{file}"
    # Display form of the command, used only for logging and the error message.
    cmd = f"sh /opt/module/seatunnel/bin/seatunnel.sh --config '{conf_path}'"
    print(f"当前执行的命令是:{cmd}")
    # Pass an argument list and skip the shell, so the file name is never
    # subject to shell parsing (spaces, quotes, metacharacters).
    args = ["sh", "/opt/module/seatunnel/bin/seatunnel.sh", "--config", conf_path]
    result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        print("命令执行失败!")
        print("错误输出:")
        print(result.stderr)
        raise Exception(f"{cmd},命令执行失败!")
    print("命令执行成功!")
    print("标准输出:")
    print(result.stdout)


def remove_conf(file):
    """Delete a generated config file from ``/home/chenyuanjie/conf/``.

    Best-effort: a missing file is ignored (as ``rm -f`` would), and any
    other OS-level failure is printed rather than raised.

    Args:
        file: Config file name inside the conf directory.
    """
    path = f"/home/chenyuanjie/conf/{file}"
    try:
        # Direct call instead of spawning a shell with an interpolated path.
        os.remove(path)
    except FileNotFoundError:
        # Nothing to delete.
        pass
    except OSError as exc:
        print(f"删除配置文件失败:{path},{exc}")


def remove_old_data(sink_tb, date_info_pre, site_name):
    """Delete rows from ``sink_tb`` whose ``date_info`` is below ``date_info_pre``.

    Args:
        sink_tb: Name of the table to delete from.
        date_info_pre: Cut-off value; rows with ``date_info`` strictly less
            than it are deleted.
        site_name: Site key passed to ``DBUtil.get_db_engine`` to select the
            "postgresql_14" engine.
    """
    engine = DBUtil.get_db_engine("postgresql_14", site_name)
    delete_stmt = f"""
    DELETE FROM {sink_tb} WHERE date_info < '{date_info_pre}';
    """
    DBUtil.engine_exec_sql(engine, delete_stmt)


def get_pre_week(date_info):
    """Return the latest ``year_week`` strictly before ``date_info``.

    Looks the value up in the ``date_20_to_30`` table through the
    "mysql"/"us" engine.

    Args:
        date_info: Value compared against ``year_week``.

    Returns:
        The ``year_week`` value of the closest earlier row.

    Raises:
        ValueError: If the table has no ``year_week`` below ``date_info``.
    """
    engine = DBUtil.get_db_engine("mysql", "us")
    with engine.connect() as connection:
        sql = f"""
        select year_week
        from date_20_to_30
        where year_week < '{date_info}'
        order by year_week desc
        limit 1  """
        result = connection.execute(sql)
        row = result.cursor.fetchone()
    # fetchone() yields None when the query matched nothing; fail with a clear
    # message instead of an opaque "NoneType is not subscriptable" TypeError.
    if row is None:
        raise ValueError(f"date_20_to_30 中未找到早于 {date_info} 的 year_week")
    return row[0]


if __name__ == "__main__":
    # Positional command-line arguments: 1 = date_info, 2 = site_name.
    date_info = CommonUtil.get_sys_arg(1, None)
    site_name = CommonUtil.get_sys_arg(2, None)
    # Explicit checks rather than `assert`, which is stripped under `python -O`.
    if date_info is None:
        raise ValueError("date_info 不能为空!")
    if site_name is None:
        raise ValueError("site_name 不能为空!")

    year, week = CommonUtil.split_month_week_date("week", date_info)
    date_info_pre = get_pre_week(date_info)

    # Per-site database, source table and sink table names. An unsupported
    # site_name raises KeyError at the lookups below.
    sites = ("uk", "de", "es", "fr", "it")
    db_map = {site: f"selection_{site}" for site in sites}
    source_tb_map = {site: f"{site}_brand_analytics_{year}" for site in sites}
    sink_tb_map = {site: f"{site}_search_term" for site in sites}

    db_name = db_map[site_name]
    source_tb = source_tb_map[site_name]
    sink_tb = sink_tb_map[site_name]
    # Delete historical data (rows older than the previous week).
    remove_old_data(sink_tb, date_info_pre, site_name)
    # Generate the SeaTunnel config file.
    file = generate_conf_file(db_name, source_tb, sink_tb, date_info, week)
    # Run SeaTunnel. Exactly one notification is sent: failure or success.
    try:
        execute_cmd(file)
    except Exception as e:
        # NOTE(review): the error is reported but swallowed, so the process
        # still exits with status 0 after a failed sync — confirm that is
        # what the scheduler expects.
        print(str(e))
        CommonUtil.send_wx_msg(["chenyuanjie"], f"【{site_name}搜索词同步失败】", f"{file}执行失败,错误信息:{str(e)}")
    else:
        CommonUtil.send_wx_msg(["chenyuanjie"], f"【{site_name}搜索词同步成功】", "悉知")
    finally:
        # Always delete the generated config file, whatever the outcome.
        remove_conf(file)