site_search_term.py 4.46 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
import os
import subprocess
import sys

# Extend the import path BEFORE the project-local imports below are resolved.
# When run as a script, sys.path[0] is the script's directory, so this adds
# its parent directory (presumably where the `utils` package lives - confirm).
sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil  # noqa: E402
from utils.db_util import DBUtil  # noqa: E402

def generate_conf_file(db_name, source_tb, sink_tb, date_info, week,
                       conf_dir="/home/chenyuanjie/conf/"):
    """Write a SeaTunnel batch-job config that copies one week of search terms.

    The generated job reads from a MySQL table over JDBC and writes to a
    PostgreSQL table over JDBC. The source query selects ``id``,
    ``search_term``, a constant ``state`` of 1, ``week``, two ``now()``
    timestamps and ``date_info`` as a literal column, keeping only rows of the
    requested week whose ``search_term`` is longer than 2 characters.

    NOTE(security): the database hosts, users and passwords are embedded in
    the generated text exactly as before; they should be moved to a secret
    store or environment variables.
    NOTE(review): the source JDBC URL contains ``&&`` between two parameters;
    it is reproduced unchanged - confirm whether that is intentional.

    Args:
        db_name: Database name used in both the source and the sink URL.
        source_tb: Source table name (interpolated into the query).
        sink_tb: Sink table name; also determines the config file name.
        date_info: Value emitted as the ``date_info`` column of every row.
        week: Week value used in the ``WHERE `week` = ...`` filter.
        conf_dir: Directory the config file is written to. Defaults to the
            location that was previously hard-coded.

    Returns:
        The config file name (``"<sink_tb>.conf"``), without the directory.
    """
    conf_content = f"""
env {{
    execution.parallelism = 1
    job.mode = "BATCH"
}}

source {{
    Jdbc {{
        url = "jdbc:mysql://rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/{db_name}?characterEncoding=UTF-8&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false&zeroDateTimeBehavior=convertToNull"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "adv_yswg"
        password = "HCL1zcUgQesaaXNLbL37O5KhpSAy0c"
        partition_column="id"
        partition_num=100
        fetch_size=40960
        parallelism=30
        query = "select id,search_term,1 as state,`week`,now() as created_time,now() as updated_time,'{date_info}' as date_info from {source_tb} where `week` = {week} and LENGTH(search_term) > 2"
    }}
}}

transform {{

}}

sink {{
    jdbc {{
        driver = "org.postgresql.Driver"
        parallelism = 60
        batch_size = 40960
        user = "postgres"
        password = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"
        url = "jdbc:postgresql://192.168.10.223:5432/{db_name}"
        generate_sink_sql = true
        database = "{db_name}"
        table = "public.{sink_tb}"
    }}
}}
"""
    file_name = f"{sink_tb}.conf"

    # Explicit encoding so the output does not depend on the process locale.
    with open(os.path.join(conf_dir, file_name), "w", encoding="utf-8") as conf_file:
        conf_file.write(conf_content)

    return file_name


def execute_cmd(file):
    """Run the SeaTunnel launcher script against a generated config file.

    The command's stdout is printed on success; on failure its stderr is
    printed and an exception is raised.

    Args:
        file: Config file name inside ``/home/chenyuanjie/conf/``.

    Raises:
        RuntimeError: If the launcher exits with a non-zero return code.
    """
    # Display form of the command, kept identical to the historical log and
    # exception text.
    cmd = f"sh /opt/module/seatunnel/bin/seatunnel.sh --config '/home/chenyuanjie/conf/{file}'"
    # Execute as an argument vector without a shell, so a file name containing
    # spaces, quotes or shell metacharacters cannot alter the command.
    argv = [
        "sh",
        "/opt/module/seatunnel/bin/seatunnel.sh",
        "--config",
        f"/home/chenyuanjie/conf/{file}",
    ]
    print(f"当前执行的命令是:{cmd}")
    result = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        print("命令执行失败!")
        print("错误输出:")
        print(result.stderr)
        raise RuntimeError(f"{cmd},命令执行失败!")
    print("命令执行成功!")
    print("标准输出:")
    print(result.stdout)


def remove_conf(file, conf_dir="/home/chenyuanjie/conf/"):
    """Delete a generated config file, ignoring the case where it is missing.

    Uses ``os.remove`` instead of spawning ``rm -f`` through a shell, so file
    names with spaces or shell metacharacters are handled literally.

    Args:
        file: Config file name inside ``conf_dir``.
        conf_dir: Directory holding the config file. Defaults to the location
            that was previously hard-coded.
    """
    conf_path = os.path.join(conf_dir, file)
    try:
        os.remove(conf_path)
    except FileNotFoundError:
        # Same outcome as `rm -f`: a missing file is not an error.
        pass
    except OSError as err:
        # Cleanup stays best-effort (the old `os.system` call ignored the
        # exit status), but the problem is now visible in the job output.
        print(f"failed to remove {conf_path}: {err}")


def remove_old_data(sink_tb, date_info_pre, site_name):
    """Purge rows older than ``date_info_pre`` from the sink table.

    Obtains the "postgresql_14" engine for ``site_name`` from ``DBUtil`` and
    executes a ``DELETE`` on ``sink_tb`` for every row whose ``date_info``
    compares less than ``date_info_pre``.

    Args:
        sink_tb: Name of the table to purge.
        date_info_pre: Cut-off value; rows strictly below it are deleted.
        site_name: Site key passed to ``DBUtil.get_db_engine``.
    """
    engine = DBUtil.get_db_engine("postgresql_14", site_name)
    purge_sql = f"""
    DELETE FROM {sink_tb} WHERE date_info < '{date_info_pre}';
    """
    DBUtil.engine_exec_sql(engine, purge_sql)


def get_pre_week(date_info):
    """Return the latest ``year_week`` that sorts before ``date_info``.

    Queries table ``date_20_to_30`` through the "mysql"/"us" engine from
    ``DBUtil`` and takes the first row when ordering ``year_week`` descending
    among values less than ``date_info``.

    Args:
        date_info: Reference week value compared against ``year_week``.

    Returns:
        The ``year_week`` value of the closest earlier row.

    Raises:
        ValueError: If the table holds no ``year_week`` earlier than
            ``date_info`` (previously this surfaced as an opaque ``TypeError``
            from subscripting ``None``).
    """
    engine = DBUtil.get_db_engine("mysql", "us")
    with engine.connect() as connection:
        sql = f"""
        select year_week
        from date_20_to_30
        where year_week < '{date_info}'
        order by year_week desc
        limit 1  """
        result = connection.execute(sql)
        row = result.cursor.fetchone()
    # fetchone() yields None when the query matched nothing.
    if row is None:
        raise ValueError(f"no year_week earlier than {date_info!r} found in date_20_to_30")
    return row[0]


if __name__ == "__main__":
    # Positional arguments: 1 = date_info, 2 = site_name.
    date_info = CommonUtil.get_sys_arg(1, None)
    site_name = CommonUtil.get_sys_arg(2, None)
    # Explicit checks instead of `assert`, which is stripped under `python -O`.
    if date_info is None:
        raise ValueError("date_info 不能为空!")
    if site_name is None:
        raise ValueError("site_name 不能为空!")

    # Reject unknown sites before touching any database.
    supported_sites = ("uk", "de", "es", "fr", "it")
    if site_name not in supported_sites:
        raise ValueError(
            f"unsupported site_name: {site_name!r}; expected one of {supported_sites}"
        )

    year, week = CommonUtil.split_month_week_date("week", date_info)
    date_info_pre = get_pre_week(date_info)

    # Same names the per-site lookup tables used to produce.
    db_name = f"selection_{site_name}"
    source_tb = f"{site_name}_brand_analytics_{year}"
    sink_tb = f"{site_name}_search_term"

    # Delete historical data
    remove_old_data(sink_tb, date_info_pre, site_name)
    # Generate the config file
    file = generate_conf_file(db_name, source_tb, sink_tb, date_info, week)
    # Run the seatunnel launch command
    try:
        execute_cmd(file)
    except Exception as e:
        print(str(e))
        CommonUtil.send_wx_msg(["chenyuanjie"], f"【{site_name}搜索词同步失败】", f"{file}执行失败,错误信息:{str(e)}")
        # NOTE(review): as before, the failure is reported but not re-raised,
        # so the script does not terminate with this error - confirm whether
        # the scheduler should see a failing exit status instead.
    else:
        # Only announce success when the sync command actually succeeded;
        # previously this message was sent even after a failure.
        CommonUtil.send_wx_msg(["chenyuanjie"], f"【{site_name}搜索词同步成功】", "悉知")
    finally:
        # Delete the config file regardless of the outcome
        remove_conf(file)