From 7e1f38dcf45f57912af85217eb6551153582a9ef Mon Sep 17 00:00:00 2001
From: chenyuanjie <chenyuanjie0703@foxmail.com>
Date: Wed, 5 Mar 2025 10:53:03 +0800
Subject: [PATCH] no message

---
 Pyspark_job/sqoop_ct/__init__.py | 0
 Pyspark_job/sqoop_ct/asin_state.py | 85 -------------------------------------------------------------------------------------
 Pyspark_job/sqoop_ct/de_st_month_2022_9_old.py | 59 ------------------------------------------------------------
 Pyspark_job/sqoop_ct/export_dwt_st_top_asin_info.py | 79 --------------------------------------------------------------------------------
 Pyspark_job/sqoop_ct/st_2110_2208_in.py | 93 ---------------------------------------------------------------------------------------------
 Pyspark_job/sqoop_ct/st_2110_2208_out.py | 65 -----------------------------------------------------------------
 Pyspark_job/sqoop_ct/st_2209_2303_in.py | 106 ----------------------------------------------------------------------------------------------------------
 Pyspark_job/sqoop_ct/st_2209_2303_out.py | 78 ------------------------------------------------------------------------------
 Pyspark_job/sqoop_ct/test.py | 5 -----
 Pyspark_job/sqoop_ct/test_import.py | 91 -------------------------------------------------------------------------------------------
 Pyspark_job/sqoop_ct/test_utils.py | 20 --------------------
 Pyspark_job/sqoop_ct/us_asin_image_pg14.py | 91 -------------------------------------------------------------------------------------------
 Pyspark_job/sqoop_ct/us_asin_image_to_pg14.py | 40 ----------------------------------------
 Pyspark_job/sqoop_ct/z_asin_b09_mysql_to_pg.py | 107 -----------------------------------------------------------------------------------------------------------
 Pyspark_job/sqoop_ct/z_asin_detail_trend_month_mysql_to_pg.py | 122 --------------------------------------------------------------------------------------------------------------------------
 Pyspark_job/sqoop_ct/z_asin_state_copy.py | 53 -----------------------------------------------------
 Pyspark_job/sqoop_ct/z_de_asin_image.py | 61 -------------------------------------------------------------
 Pyspark_job/sqoop_ct/z_us_asin_image.py | 64 ----------------------------------------------------------------
 Pyspark_job/sqoop_ct/z_us_asin_image_mysql_to_pg.py | 89 -----------------------------------------------------------------------------------------
 Pyspark_job/sqoop_ct/z_us_bs_category_asin_mysql_to_pg.py | 87 ---------------------------------------------------------------------------------------
 Pyspark_job/sqoop_ct/z_us_st_year_week.py | 35 -----------------------------------
 21 files changed, 1430 deletions(-)
 delete mode 100644 Pyspark_job/sqoop_ct/__init__.py
 delete mode 100644 Pyspark_job/sqoop_ct/asin_state.py
 delete mode 100644 Pyspark_job/sqoop_ct/de_st_month_2022_9_old.py
 delete mode 100644 Pyspark_job/sqoop_ct/export_dwt_st_top_asin_info.py
 delete mode 100644 Pyspark_job/sqoop_ct/st_2110_2208_in.py
 delete mode 100644 Pyspark_job/sqoop_ct/st_2110_2208_out.py
 delete mode 100644 Pyspark_job/sqoop_ct/st_2209_2303_in.py
 delete mode 100644 Pyspark_job/sqoop_ct/st_2209_2303_out.py
 delete mode 100644 Pyspark_job/sqoop_ct/test.py
 delete mode 100644 Pyspark_job/sqoop_ct/test_import.py
 delete mode 100644 Pyspark_job/sqoop_ct/test_utils.py
 delete mode 100644 Pyspark_job/sqoop_ct/us_asin_image_pg14.py
 delete mode 100644 Pyspark_job/sqoop_ct/us_asin_image_to_pg14.py
 delete mode 100644 Pyspark_job/sqoop_ct/z_asin_b09_mysql_to_pg.py
 delete mode 100644 Pyspark_job/sqoop_ct/z_asin_detail_trend_month_mysql_to_pg.py
 delete mode 100644 Pyspark_job/sqoop_ct/z_asin_state_copy.py
 delete mode 100644 Pyspark_job/sqoop_ct/z_de_asin_image.py
 delete mode 100644 Pyspark_job/sqoop_ct/z_us_asin_image.py
 delete mode 100644 Pyspark_job/sqoop_ct/z_us_asin_image_mysql_to_pg.py
 delete mode 100644 Pyspark_job/sqoop_ct/z_us_bs_category_asin_mysql_to_pg.py
 delete mode 100644 Pyspark_job/sqoop_ct/z_us_st_year_week.py

diff --git a/Pyspark_job/sqoop_ct/__init__.py b/Pyspark_job/sqoop_ct/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/Pyspark_job/sqoop_ct/__init__.py
+++ /dev/null
diff --git a/Pyspark_job/sqoop_ct/asin_state.py b/Pyspark_job/sqoop_ct/asin_state.py
deleted file mode 100644
index 11e57fc..0000000
--- a/Pyspark_job/sqoop_ct/asin_state.py
+++ /dev/null
@@ -1,85 +0,0 @@
-import os
-import sys
-
-sys.path.append(os.path.dirname(sys.path[0]))
-from utils.ssh_util import SSHUtil
-from utils.common_util import CommonUtil
-
-
-if __name__ == '__main__':
-    site_name = CommonUtil.get_sys_arg(1, None)
-    assert site_name is not None, "site_name 不能为空!"
-
-    # hive_tb = "tmp_asin_state"
-    #
-    # partition_dict = {
-    #     "site_name": site_name
-    # }
-    #
-    # hdfs_path = CommonUtil.build_hdfs_path(hive_tb , partition_dict)
-    # print(f"hdfs_path is {hdfs_path}")
-    #
-    # query = f"""
-    #     select
-    #     asin,
-    #     state,
-    #     created_at,
-    #     updated_at,
-    #     3 as flag
-    #     from us_all_syn_st_history_2022
-    #     where 1 = 1
-    #     and \$CONDITIONS
-    # """
-    # print(query)
-    # db_type = "mysql"
-    # empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
-    #                                                                site_name=site_name,
-    #                                                                query=query,
-    #                                                                hive_tb_name=hive_tb,
-    #                                                                msg_usr=['chenyuanjie']
-    #                                                                )
-    #
-    # if not empty_flag:
-    #     sh = CommonUtil.build_import_sh_v2(site_name=site_name,
-    #                                        db_type=db_type,
-    #                                        query=query,
-    #                                        hdfs_path=hdfs_path,
-    #                                        map_num=15,
-    #                                        key="state"
-    #                                        )
-    #
-    #     client = SSHUtil.get_ssh_client()
-    #     SSHUtil.exec_command_async(client, sh, ignore_err=False)
-    #     CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
-    #     client.close()
-    #
-    #     # 导入后检测--检测数据一致性
-    #     CommonUtil.check_import_sync_num(db_type=db_type,
-    #                                      partition_dict=partition_dict,
-    #                                      import_query=query,
-    #                                      hive_tb_name=hive_tb,
-    #                                      msg_usr=['chenyuanjie']
-    #                                      )
-
-    # 导出到pg数据库
-    db_type = "postgresql"
-    export_tb = "us_all_syn_st_asin"
-
-    sh = CommonUtil.build_export_sh(
-        site_name=site_name,
-        db_type=db_type,
-        hive_tb="tmp_asin_state_copy",
-        export_tb=export_tb,
-        col=[
-            "asin",
-            "state"
-        ],
-        partition_dict={
-            "site_name": site_name
-        }
-    )
-    client = SSHUtil.get_ssh_client()
-    SSHUtil.exec_command_async(client, sh, ignore_err=False)
-    client.close()
-
-    pass
\ No newline at end of file
diff --git a/Pyspark_job/sqoop_ct/de_st_month_2022_9_old.py b/Pyspark_job/sqoop_ct/de_st_month_2022_9_old.py
deleted file mode 100644
index 5290ea5..0000000
--- a/Pyspark_job/sqoop_ct/de_st_month_2022_9_old.py
+++ /dev/null
@@ -1,59 +0,0 @@
-import os
-import sys
-
-sys.path.append(os.path.dirname(sys.path[0]))
-from utils.ssh_util import SSHUtil
-from utils.common_util import CommonUtil
-
-
-if __name__ == '__main__':
-
-    # 导出到pg数据库
-    db_type = "postgresql_cluster"
-    export_tb = "de_st_month_2022_9_old"
-
-    sh = CommonUtil.build_export_sh(
-        site_name="de",
-        db_type=db_type,
-        hive_tb="tmp_st_month_2110_2208",
-        export_tb=export_tb,
-        col=[
-            "week",
-            "asin",
-            "search_term",
-            "ao_val",
-            "orders",
-            "orders_sum",
-            "flow",
-            "order_flow",
-            "search_num",
-            "search_rank",
-            "quantity_being_sold",
- "adv_compet", - "zr_page_rank", - "zr_page", - "zr_page_row", - "sp_page", - "sp_page_rank", - "sp_page_row", - "sb1_page", - "sb2_page", - "sb3_page", - "ac_page", - "bs_page", - "er_page", - "tr_page", - "search_term_type", - "created_at", - "updated_at" - ], - partition_dict={ - "site_name": "de", - "year_month": "2022-9-old" - } - ) - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - client.close() - - pass \ No newline at end of file diff --git a/Pyspark_job/sqoop_ct/export_dwt_st_top_asin_info.py b/Pyspark_job/sqoop_ct/export_dwt_st_top_asin_info.py deleted file mode 100644 index 24dc69c..0000000 --- a/Pyspark_job/sqoop_ct/export_dwt_st_top_asin_info.py +++ /dev/null @@ -1,79 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil -from utils.db_util import DBUtil - -if __name__ == '__main__': - site_name = CommonUtil.get_sys_arg(1, None) - date_type = CommonUtil.get_sys_arg(2, None) - date_info = CommonUtil.get_sys_arg(3, None) - - print(f"执行参数为{sys.argv}") - - db_type = "postgresql" - print("导出到PG库中") - - year_str = CommonUtil.safeIndex(date_info.split("-"), 0, None) - suffix = str(date_info).replace("-", "_") - base_tb = f"{site_name}_aba_last_top_asin" - export_master_tb = f"{base_tb}_{year_str}" - export_tb = f"{base_tb}_{suffix}" - next_month = CommonUtil.get_month_offset(date_info, 1) - - engine = DBUtil.get_db_engine(db_type, site_name) - with engine.connect() as connection: - sql = f""" - drop table if exists {export_tb}; - create table if not exists {export_tb} - ( - like {export_master_tb} including comments - ); - """ - print("================================执行sql================================") - print(sql) - connection.execute(sql) - - # 导出表名 - sh = CommonUtil.build_export_sh( - site_name=site_name, - db_type=db_type, - hive_tb="dwt_st_top_asin_info", - export_tb=export_tb, - col=[ - "site_name", - "search_term_id", - "search_term", - "asin", - "date_info", - "data_type", - "zr_rank", - "created_time", - "updated_time" - ], - partition_dict={ - "site_name": site_name, - "date_type": date_type, - "date_info": date_info - } - ) - - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - client.close() - # 创建索引并交换分区 - DBUtil.add_pg_part( - engine, - source_tb_name=export_tb, - part_master_tb=export_master_tb, - part_val={ - "from": [date_info], - "to": [next_month] - }, - cp_index_flag=True, - ) - print("success") - - print("success") diff --git a/Pyspark_job/sqoop_ct/st_2110_2208_in.py b/Pyspark_job/sqoop_ct/st_2110_2208_in.py deleted file mode 100644 index f0382b2..0000000 --- a/Pyspark_job/sqoop_ct/st_2110_2208_in.py +++ /dev/null @@ -1,93 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil - - -if __name__ == '__main__': - site_name = CommonUtil.get_sys_arg(1, None) - year_month = CommonUtil.get_sys_arg(2, None) - assert site_name is not None, "site_name 不能为空!" - assert year_month is not None, "year_month 不能为空!" 
- - year,month = year_month.split("-") - - hive_tb = "tmp_st_month_2110_2208" - - partition_dict = { - "site_name": site_name, - "year_month": year_month - } - - hdfs_path = CommonUtil.build_hdfs_path(hive_tb,partition_dict) - print(f"hdfs_path is {hdfs_path}") - - query = f""" - select - week, - asin, - search_term, - ao_val, - orders, - orders_sum, - flow, - order_flow, - search_num, - search_rank, - quantity_being_sold, - adv_compet, - zr_page_rank, - zr_page, - zr_page_row, - sp_page, - sp_page_rank, - sp_page_row, - sb1_page, - sb2_page, - sb3_page, - ac_page, - bs_page, - er_page, - tr_page, - search_term_type, - created_at, - updated_at, - id - from {site_name}_st_month_{year}_{month} - where 1 = 1 - and \$CONDITIONS - """ - print(query) - db_type = "mysql" - empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type, - site_name=site_name, - query=query, - hive_tb_name=hive_tb, - msg_usr=['chenyuanjie'] - ) - - if not empty_flag: - sh = CommonUtil.build_import_sh_v2(site_name=site_name, - db_type=db_type, - query=query, - hdfs_path=hdfs_path, - map_num=10, - key="id" - ) - - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb) - client.close() - - # 导入后检测--检测数据一致性 - CommonUtil.check_import_sync_num(db_type=db_type, - partition_dict=partition_dict, - import_query=query, - hive_tb_name=hive_tb, - msg_usr=['chenyuanjie'] - ) - - pass \ No newline at end of file diff --git a/Pyspark_job/sqoop_ct/st_2110_2208_out.py b/Pyspark_job/sqoop_ct/st_2110_2208_out.py deleted file mode 100644 index 8da9584..0000000 --- a/Pyspark_job/sqoop_ct/st_2110_2208_out.py +++ /dev/null @@ -1,65 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil - - -if __name__ == '__main__': - site_name = CommonUtil.get_sys_arg(1, None) - year_month = CommonUtil.get_sys_arg(2, None) - assert site_name is not None, "site_name 不能为空!" - assert year_month is not None, "year_month 不能为空!" 
- - year,month = year_month.split("-") - - # 导出到pg数据库 - db_type = "postgresql_cluster" - export_tb = f"{site_name}_st_month_{year}_{month}" - - sh = CommonUtil.build_export_sh( - site_name=site_name, - db_type=db_type, - hive_tb="tmp_st_month_2110_2208", - export_tb=export_tb, - col=[ - "week", - "asin", - "search_term", - "ao_val", - "orders", - "orders_sum", - "flow", - "order_flow", - "search_num", - "search_rank", - "quantity_being_sold", - "adv_compet", - "zr_page_rank", - "zr_page", - "zr_page_row", - "sp_page", - "sp_page_rank", - "sp_page_row", - "sb1_page", - "sb2_page", - "sb3_page", - "ac_page", - "bs_page", - "er_page", - "tr_page", - "search_term_type", - "created_at", - "updated_at" - ], - partition_dict={ - "site_name": site_name, - "year_month": year_month - } - ) - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - client.close() - - pass \ No newline at end of file diff --git a/Pyspark_job/sqoop_ct/st_2209_2303_in.py b/Pyspark_job/sqoop_ct/st_2209_2303_in.py deleted file mode 100644 index 3eb3714..0000000 --- a/Pyspark_job/sqoop_ct/st_2209_2303_in.py +++ /dev/null @@ -1,106 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil - - -if __name__ == '__main__': - site_name = CommonUtil.get_sys_arg(1, None) - year_month = CommonUtil.get_sys_arg(2, None) - assert site_name is not None, "site_name 不能为空!" - assert year_month is not None, "year_month 不能为空!" - - year,month = year_month.split("-") - - hive_tb = "tmp_st_month_2209_2303" - - partition_dict = { - "site_name": site_name, - "year_month": year_month - } - - hdfs_path = CommonUtil.build_hdfs_path(hive_tb,partition_dict) - print(f"hdfs_path is {hdfs_path}") - - query = f""" - select - id, - search_term, - st_ao_val, - st_type, - st_rank, - st_rank_avg, - st_search_num, - st_search_rate, - st_search_sum, - st_adv_counts, - st_quantity_being_sold, - asin, - asin_st_zr_orders, - asin_st_zr_orders_sum, - asin_st_zr_flow, - asin_st_sp_orders, - asin_st_sp_orders_sum, - asin_st_sp_flow, - st_asin_zr_page, - st_asin_zr_page_row, - st_asin_zr_page_rank, - st_asin_zr_updated_at, - st_asin_sp_page, - st_asin_sp_page_rank, - st_asin_sp_page_row, - st_asin_sp_updated_at, - st_asin_sb1_page, - st_asin_sb1_updated_at, - st_asin_sb2_page, - st_asin_sb2_updated_at, - st_asin_sb3_page, - st_asin_sb3_updated_at, - st_asin_ac_page, - st_asin_ac_updated_at, - st_asin_bs_page, - st_asin_bs_updated_at, - st_asin_er_page, - st_asin_er_updated_at, - st_asin_tr_page, - st_asin_tr_updated_at, - created_at, - updated_at - from {site_name}_st_month_{year}_{month} - where 1 = 1 - and \$CONDITIONS - """ - print(query) - db_type = "mysql" - empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type, - site_name=site_name, - query=query, - hive_tb_name=hive_tb, - msg_usr=['chenyuanjie'] - ) - - if not empty_flag: - sh = CommonUtil.build_import_sh_v2(site_name=site_name, - db_type=db_type, - query=query, - hdfs_path=hdfs_path, - map_num=10, - key="id" - ) - - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb) - client.close() - - # 导入后检测--检测数据一致性 - CommonUtil.check_import_sync_num(db_type=db_type, - partition_dict=partition_dict, - import_query=query, - hive_tb_name=hive_tb, - msg_usr=['chenyuanjie'] - ) - - pass \ No newline at end of file diff --git 
a/Pyspark_job/sqoop_ct/st_2209_2303_out.py b/Pyspark_job/sqoop_ct/st_2209_2303_out.py deleted file mode 100644 index 93c7b86..0000000 --- a/Pyspark_job/sqoop_ct/st_2209_2303_out.py +++ /dev/null @@ -1,78 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil - - -if __name__ == '__main__': - site_name = CommonUtil.get_sys_arg(1, None) - year_month = CommonUtil.get_sys_arg(2, None) - assert site_name is not None, "site_name 不能为空!" - assert year_month is not None, "year_month 不能为空!" - - year,month = year_month.split("-") - - # 导出到pg数据库 - db_type = "postgresql_cluster" - export_tb = f"{site_name}_st_month_{year}_{month}" - - sh = CommonUtil.build_export_sh( - site_name=site_name, - db_type=db_type, - hive_tb="tmp_st_month_2209_2303", - export_tb=export_tb, - col=[ - "search_term", - "st_ao_val", - "st_type", - "st_rank", - "st_rank_avg", - "st_search_num", - "st_search_rate", - "st_search_sum", - "st_adv_counts", - "st_quantity_being_sold", - "asin", - "asin_st_zr_orders", - "asin_st_zr_orders_sum", - "asin_st_zr_flow", - "asin_st_sp_orders", - "asin_st_sp_orders_sum", - "asin_st_sp_flow", - "st_asin_zr_page", - "st_asin_zr_page_row", - "st_asin_zr_page_rank", - "st_asin_zr_updated_at", - "st_asin_sp_page", - "st_asin_sp_page_rank", - "st_asin_sp_page_row", - "st_asin_sp_updated_at", - "st_asin_sb1_page", - "st_asin_sb1_updated_at", - "st_asin_sb2_page", - "st_asin_sb2_updated_at", - "st_asin_sb3_page", - "st_asin_sb3_updated_at", - "st_asin_ac_page", - "st_asin_ac_updated_at", - "st_asin_bs_page", - "st_asin_bs_updated_at", - "st_asin_er_page", - "st_asin_er_updated_at", - "st_asin_tr_page", - "st_asin_tr_updated_at", - "created_at", - "updated_at" - ], - partition_dict={ - "site_name": site_name, - "year_month": year_month - } - ) - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - client.close() - - pass \ No newline at end of file diff --git a/Pyspark_job/sqoop_ct/test.py b/Pyspark_job/sqoop_ct/test.py deleted file mode 100644 index 9b54440..0000000 --- a/Pyspark_job/sqoop_ct/test.py +++ /dev/null @@ -1,5 +0,0 @@ - - -if __name__ == '__main__': - num = int('2024-05'.split('-')[-1]) - print(num) diff --git a/Pyspark_job/sqoop_ct/test_import.py b/Pyspark_job/sqoop_ct/test_import.py deleted file mode 100644 index 309be38..0000000 --- a/Pyspark_job/sqoop_ct/test_import.py +++ /dev/null @@ -1,91 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil - - -if __name__ == '__main__': - site_name = CommonUtil.get_sys_arg(1, None) - year_month = CommonUtil.get_sys_arg(2, None) - assert site_name is not None, "site_name 不能为空!" - assert year_month is not None, "year_month 不能为空!" 
- - year,month = year_month.split("-") - - hive_tb = "tmp_st_month_2110_2208" - - partition_dict = { - "site_name": site_name, - "year_month": year_month - } - - hdfs_path = CommonUtil.build_hdfs_path(hive_tb,partition_dict) - print(f"hdfs_path is {hdfs_path}") - - query = f""" - select - week, - asin, - search_term, - ao_val, - orders, - orders_sum, - flow, - order_flow, - search_num, - search_rank, - quantity_being_sold, - adv_compet, - zr_page_rank, - zr_page, - zr_page_row, - sp_page, - sp_page_rank, - sp_page_row, - sb1_page, - sb2_page, - sb3_page, - ac_page, - bs_page, - er_page, - tr_page, - search_term_type, - created_at, - updated_at, - id - from {site_name}_st_month_{year}_{month} - where 1 = 1 - and \$CONDITIONS - """ - print(query) - db_type = "mysql" - empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type, - site_name=site_name, - query=query, - hive_tb_name=hive_tb, - msg_usr=['chenyuanjie'] - ) - - if not empty_flag: - sh = CommonUtil.build_import_sh_v2(site_name=site_name, - db_type=db_type, - query=query, - hdfs_path=hdfs_path, - ) - - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb) - client.close() - - # 导入后检测--检测数据一致性 - CommonUtil.check_import_sync_num(db_type=db_type, - partition_dict=partition_dict, - import_query=query, - hive_tb_name=hive_tb, - msg_usr=['chenyuanjie'] - ) - - pass \ No newline at end of file diff --git a/Pyspark_job/sqoop_ct/test_utils.py b/Pyspark_job/sqoop_ct/test_utils.py deleted file mode 100644 index 810e67d..0000000 --- a/Pyspark_job/sqoop_ct/test_utils.py +++ /dev/null @@ -1,20 +0,0 @@ -import json -import subprocess -from datetime import datetime, time -import sys -from pyspark.sql import SparkSession -from Pyspark_job.utils import common_util -from Pyspark_job.utils import DolphinschedulerHelper -from yswg_utils.common_df import get_asin_unlanuch_df -from Pyspark_job.utils.spark_util import SparkUtil -import script.pg14_to_pg6 as sc -from Pyspark_job.script import post_to_dolphin -import subprocess - -if __name__ == '__main__': - # date_info = '2023_34' - # table_names = f"us_search_term_rank_er_{date_info}," \ - # f"us_search_term_rank_hr_{date_info},us_search_term_rank_tr_{date_info},us_other_search_term_{date_info}," \ - # f"us_brand_analytics_{date_info}" - # post_to_dolphin.DolphinschedulerHelper.start_process_instance('us', '2023-34', table_names, 'aba') - str.upper("seatunnel") \ No newline at end of file diff --git a/Pyspark_job/sqoop_ct/us_asin_image_pg14.py b/Pyspark_job/sqoop_ct/us_asin_image_pg14.py deleted file mode 100644 index 7b57fa1..0000000 --- a/Pyspark_job/sqoop_ct/us_asin_image_pg14.py +++ /dev/null @@ -1,91 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil -from utils.hdfs_utils import HdfsUtils -from utils.db_util import DBUtil - - -if __name__ == '__main__': - site_name = "us" - - hive_tb = f"tmp_asin_image" - - partition_dict = { - "site_name": "us14", - } - hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict) - print(f"hdfs_path is {hdfs_path}") - - query = f""" - select - asin, - img_url, - img_order_by, - created_at, - updated_at, - data_type - from {site_name}_asin_image_pyb_copy - where 1 = 1 - and \$CONDITIONS - """ - print(query) - db_type = "postgresql_14" - empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type, - 
site_name=site_name, - query=query, - hive_tb_name=hive_tb, - msg_usr=['chenyuanjie'] - ) - - if not empty_flag: - sh = CommonUtil.build_import_sh(site_name=site_name, - db_type=db_type, - query=query, - hdfs_path=hdfs_path, - map_num=10, - key='id') - - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb) - client.close() - - #导入后检测--检测数据一致性 - CommonUtil.check_import_sync_num(db_type=db_type, - partition_dict=partition_dict, - import_query=query, - hive_tb_name=hive_tb, - msg_usr=['chenyuanjie'] - ) - - - # # 导出到pg数据库 - # db_type = "postgresql" - # export_tb = f"{site_name}_asin_image_copy" - # - # # 导出表名 - # sh = CommonUtil.build_export_sh( - # site_name=site_name, - # db_type=db_type, - # hive_tb="tmp_asin_image_copy", - # export_tb=export_tb, - # col=[ - # "asin", - # "img_url", - # "img_order_by", - # "created_at", - # "updated_at", - # "data_type" - # ], - # partition_dict={ - # "site_name": site_name - # } - # ) - # client = SSHUtil.get_ssh_client() - # SSHUtil.exec_command_async(client, sh, ignore_err=False) - # client.close() - - pass diff --git a/Pyspark_job/sqoop_ct/us_asin_image_to_pg14.py b/Pyspark_job/sqoop_ct/us_asin_image_to_pg14.py deleted file mode 100644 index ebf1bd8..0000000 --- a/Pyspark_job/sqoop_ct/us_asin_image_to_pg14.py +++ /dev/null @@ -1,40 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil -from utils.hdfs_utils import HdfsUtils -from utils.db_util import DBUtil - - -if __name__ == '__main__': - site_name = "de" - - # 导出到pg数据库 - db_type = "postgresql" - export_tb = "de_asin_image_copy" - - # 导出表名 - sh = CommonUtil.build_export_sh( - site_name="de", - db_type=db_type, - hive_tb="tmp_asin_image_lzo", - export_tb=export_tb, - col=[ - "asin", - "img_url", - "img_order_by", - "created_at", - "updated_at", - "data_type" - ], - partition_dict={ - "site_name": "de" - } - ) - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - client.close() - - pass diff --git a/Pyspark_job/sqoop_ct/z_asin_b09_mysql_to_pg.py b/Pyspark_job/sqoop_ct/z_asin_b09_mysql_to_pg.py deleted file mode 100644 index 87757ff..0000000 --- a/Pyspark_job/sqoop_ct/z_asin_b09_mysql_to_pg.py +++ /dev/null @@ -1,107 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil -from utils.hdfs_utils import HdfsUtils -from utils.db_util import DBUtil - - -if __name__ == '__main__': - site_name = CommonUtil.get_sys_arg(1, None) - assert site_name is not None, "site_name 不能为空!" 
- - # hive_tb = f"tmp_asin_b09" - - # partition_dict = { - # "site_name": site_name, - # } - # hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict) - # print(f"hdfs_path is {hdfs_path}") - # - # query = f""" - # select - # asin, - # price, - # rating, - # total_comments, - # page_inventory, - # `rank`, - # img_num, - # ao_val, - # bsr_orders, - # sales, - # data_at, - # created_at, - # updated_at, - # year_week, - # '{site_name}' as site_name - # from {site_name}_asin_b09 - # where 1 = 1 - # and \$CONDITIONS - # """ - # print(query) - # db_type = "mysql" - # empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type, - # site_name=site_name, - # query=query, - # hive_tb_name=hive_tb, - # msg_usr=['chenyuanjie'] - # ) - # - # if not empty_flag: - # sh = CommonUtil.build_import_sh(site_name=site_name, - # db_type=db_type, - # query=query, - # hdfs_path=hdfs_path) - # # 导入前先删除 - # HdfsUtils.delete_hdfs_file(hdfs_path) - # client = SSHUtil.get_ssh_client() - # SSHUtil.exec_command_async(client, sh, ignore_err=False) - # CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb) - # client.close() - # - # # 导入后检测--检测数据一致性 - # CommonUtil.check_import_sync_num(db_type=db_type, - # partition_dict=partition_dict, - # import_query=query, - # hive_tb_name=hive_tb, - # msg_usr=['chenyuanjie'] - # ) - - - # 导出到pg数据库 - db_type = "postgresql" - export_tb = f"{site_name}_asin_b09" - - sh = CommonUtil.build_export_sh( - site_name=site_name, - db_type=db_type, - hive_tb="tmp_asin_b09", - export_tb=export_tb, - col=[ - "asin", - "price", - "rating", - "total_comments", - "page_inventory", - "rank", - "img_num", - "ao_val", - "bsr_orders", - "sales", - "data_at", - "created_at", - "updated_at", - "year_week" - ], - partition_dict={ - "site_name": site_name, - } - ) - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - client.close() - - pass \ No newline at end of file diff --git a/Pyspark_job/sqoop_ct/z_asin_detail_trend_month_mysql_to_pg.py b/Pyspark_job/sqoop_ct/z_asin_detail_trend_month_mysql_to_pg.py deleted file mode 100644 index cf783bb..0000000 --- a/Pyspark_job/sqoop_ct/z_asin_detail_trend_month_mysql_to_pg.py +++ /dev/null @@ -1,122 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil -from utils.hdfs_utils import HdfsUtils -from utils.db_util import DBUtil - - -if __name__ == '__main__': - site_name = CommonUtil.get_sys_arg(1, None) - assert site_name is not None, "site_name 不能为空!" 
- - # hive_tb = f"tmp_asin_detail_trend_month" - # - # partition_dict = { - # "site_name": site_name, - # } - # hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict) - # print(f"hdfs_path is {hdfs_path}") - # - # query = f""" - # select - # asin, - # ym, - # rank_rise, - # rank_change, - # ao_rise, - # ao_change, - # price_rise, - # price_change, - # orders_rise, - # orders_change, - # rating_rise, - # rating_change, - # comments_rise, - # comments_change, - # bsr_orders_rise, - # bsr_orders_change, - # sales_rise, - # sales_change, - # variation_num, - # variation_rise, - # variation_change, - # created_at, - # updated_at - # from {site_name}_asin_detail_trend_month - # where 1 = 1 - # and \$CONDITIONS - # """ - # print(query) - # db_type = "mysql" - # empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type, - # site_name=site_name, - # query=query, - # hive_tb_name=hive_tb, - # msg_usr=['chenyuanjie'] - # ) - # - # if not empty_flag: - # sh = CommonUtil.build_import_sh(site_name=site_name, - # db_type=db_type, - # query=query, - # hdfs_path=hdfs_path) - # - # client = SSHUtil.get_ssh_client() - # SSHUtil.exec_command_async(client, sh, ignore_err=False) - # CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb) - # client.close() - # - # # 导入后检测--检测数据一致性 - # CommonUtil.check_import_sync_num(db_type=db_type, - # partition_dict=partition_dict, - # import_query=query, - # hive_tb_name=hive_tb, - # msg_usr=['chenyuanjie'] - # ) - - # 导出到pg数据库 - db_type = "postgresql" - export_tb = f"{site_name}_asin_detail_trend_month" - - sh = CommonUtil.build_export_sh( - site_name=site_name, - db_type=db_type, - hive_tb="tmp_asin_detail_trend_month", - export_tb=export_tb, - col=[ - "asin", - "ym", - "rank_rise", - "rank_change", - "ao_rise", - "ao_change", - "price_rise", - "price_change", - "orders_rise", - "orders_change", - "rating_rise", - "rating_change", - "comments_rise", - "comments_change", - "bsr_orders_rise", - "bsr_orders_change", - "sales_rise", - "sales_change", - "variation_num", - "variation_rise", - "variation_change", - "created_at", - "updated_at" - ], - partition_dict={ - "site_name": site_name, - } - ) - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - client.close() - - pass \ No newline at end of file diff --git a/Pyspark_job/sqoop_ct/z_asin_state_copy.py b/Pyspark_job/sqoop_ct/z_asin_state_copy.py deleted file mode 100644 index a664135..0000000 --- a/Pyspark_job/sqoop_ct/z_asin_state_copy.py +++ /dev/null @@ -1,53 +0,0 @@ -import os -import sys - -from pyspark.storagelevel import StorageLevel - -sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 -from utils.templates import Templates -# from ..utils.templates import Templates -# from AmazonSpider.pyspark_job.utils.templates import Templates -from pyspark.sql.types import StringType -# 分组排序的udf窗口函数 -from pyspark.sql.window import Window -from pyspark.sql import functions as F - - -class AsinState(Templates): - - def __init__(self): - super().__init__() - self.site_name = "us" - self.db_save = f"tmp_asin_state_copy" - self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}") - self.df_save = self.spark.sql(f"select 1+1;") - self.df = self.spark.sql(f"select 1+1;") - self.partitions_by = ['site_name'] - self.reset_partitions(partitions_num=1) - - def read_data(self): - sql = f""" - select - asin, - state, - updated_at, - flag, - site_name - from - tmp_asin_state - where - site_name = 'us'; - """ - self.df = 
self.spark.sql(sqlQuery=sql).cache() - - def handle_data(self): - df_window = Window.partitionBy(["asin"]).orderBy(self.df.flag.asc(), self.df.updated_at.desc()) - - self.df = self.df.withColumn("rk", F.row_number().over(window=df_window)) - self.df_save = self.df.filter("rk = 1") - self.df_save = self.df_save.drop("flag").drop("rk").drop("updated_at") - - -if __name__ == "__main__": - handle_obj = AsinState() - handle_obj.run() diff --git a/Pyspark_job/sqoop_ct/z_de_asin_image.py b/Pyspark_job/sqoop_ct/z_de_asin_image.py deleted file mode 100644 index 237033c..0000000 --- a/Pyspark_job/sqoop_ct/z_de_asin_image.py +++ /dev/null @@ -1,61 +0,0 @@ -import os -import sys - -from pyspark.storagelevel import StorageLevel -sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 -from utils.templates import Templates -# from ..utils.templates import Templates -# from AmazonSpider.pyspark_job.utils.templates import Templates -from pyspark.sql.types import StringType -# 分组排序的udf窗口函数 -from pyspark.sql.window import Window -from pyspark.sql import functions as F - - -class DeAsinImage(Templates): - - def __init__(self): - super().__init__() - self.site_name = "de" - self.db_save = f"tmp_asin_image_copy" - self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}") - self.df_save = self.spark.sql(f"select 1+1;") - self.df1 = self.spark.sql(f"select 1+1;") - self.df2 = self.spark.sql(f"select 1+1;") - self.df = self.spark.sql(f"select 1+1;") - self.partitions_by = ['site_name'] - self.reset_partitions(partitions_num=1) - - def read_data(self): - sql1 = f""" - select - *, - 1 as flag - from - tmp_asin_image - where - site_name = 'de1'; - """ - - sql2 = f""" - select - *, - 2 as flag - from - tmp_asin_image - where - site_name = 'de'; - """ - self.df1 = self.spark.sql(sqlQuery=sql1).cache() - self.df2 = self.spark.sql(sqlQuery=sql2).cache() - - def handle_data(self): - self.df = self.df1.unionAll(self.df2) - df_window = Window.partitionBy(["asin"]).orderBy(self.df.flag.asc()) - self.df = self.df.withColumn("rk",F.dense_rank().over(window=df_window)) - self.df_save = self.df.filter("rk = 1") - self.df_save = self.df_save.drop("flag").drop("rk") - -if __name__ == "__main__": - handle_obj = DeAsinImage() - handle_obj.run() \ No newline at end of file diff --git a/Pyspark_job/sqoop_ct/z_us_asin_image.py b/Pyspark_job/sqoop_ct/z_us_asin_image.py deleted file mode 100644 index f3bf7a2..0000000 --- a/Pyspark_job/sqoop_ct/z_us_asin_image.py +++ /dev/null @@ -1,64 +0,0 @@ -import os -import sys - -from pyspark.storagelevel import StorageLevel -sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 -from utils.templates import Templates -# from ..utils.templates import Templates -# from AmazonSpider.pyspark_job.utils.templates import Templates -from pyspark.sql.types import StringType -# 分组排序的udf窗口函数 -from pyspark.sql.window import Window -from pyspark.sql import functions as F - -class UsAsinImage(Templates): - - def __init__(self): - super().__init__() - self.site_name = "uk" - self.db_save = f"tmp_asin_image_lzo" - self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}") - self.df_save = self.spark.sql(f"select 1+1;") - self.df1 = self.spark.sql(f"select 1+1;") - self.df2 = self.spark.sql(f"select 1+1;") - self.df = self.spark.sql(f"select 1+1;") - self.partitions_by = ['site_name'] - self.reset_partitions(partitions_num=1) - - def read_data(self): - sql1 = f""" - select - *, - 1 as flag - from - tmp_asin_image_copy - where - site_name = 'us14' - limit 10; - """ - - 
sql2 = f""" - select - *, - 2 as flag - from - tmp_asin_image_copy - where - site_name = 'us6' - limit 10; - """ - self.df1 = self.spark.sql(sqlQuery=sql1).cache() - self.df2 = self.spark.sql(sqlQuery=sql2).cache() - - self.df = self.df1.unionAll(self.df2) - df_window = Window.partitionBy(["asin"]).orderBy(self.df.flag.desc()) - self.df = self.df.withColumn("rk",F.dense_rank().over(window=df_window)) - self.df_save = self.df.filter("rk = 1") - self.df_save.withColumn("site_name",F.lit("us")) - self.df_save = self.df_save.drop("flag").drop("rk").show() - - - -if __name__ == "__main__": - obj = UsAsinImage() - obj.read_data() diff --git a/Pyspark_job/sqoop_ct/z_us_asin_image_mysql_to_pg.py b/Pyspark_job/sqoop_ct/z_us_asin_image_mysql_to_pg.py deleted file mode 100644 index 1f5aa9d..0000000 --- a/Pyspark_job/sqoop_ct/z_us_asin_image_mysql_to_pg.py +++ /dev/null @@ -1,89 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil -from utils.hdfs_utils import HdfsUtils -from utils.db_util import DBUtil - - -if __name__ == '__main__': - site_name = "us" - - hive_tb = f"tmp_asin_image" - - partition_dict = { - "site_name": "us6", - } - hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict) - print(f"hdfs_path is {hdfs_path}") - - query = f""" - select - asin, - img_url, - img_order_by, - created_at, - updated_at, - data_type - from {site_name}_asin_image - where 1 = 1 - and \$CONDITIONS - """ - print(query) - db_type = "postgresql" - empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type, - site_name=site_name, - query=query, - hive_tb_name=hive_tb, - msg_usr=['chenyuanjie'] - ) - - if not empty_flag: - sh = CommonUtil.build_import_sh(site_name=site_name, - db_type=db_type, - query=query, - hdfs_path=hdfs_path) - - client = SSHUtil.get_ssh_client() - SSHUtil.exec_command_async(client, sh, ignore_err=False) - CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb) - client.close() - - #导入后检测--检测数据一致性 - CommonUtil.check_import_sync_num(db_type=db_type, - partition_dict=partition_dict, - import_query=query, - hive_tb_name=hive_tb, - msg_usr=['chenyuanjie'] - ) - - - # # 导出到pg数据库 - # db_type = "postgresql" - # export_tb = f"{site_name}_asin_image_copy" - # - # # 导出表名 - # sh = CommonUtil.build_export_sh( - # site_name=site_name, - # db_type=db_type, - # hive_tb="tmp_asin_image_copy", - # export_tb=export_tb, - # col=[ - # "asin", - # "img_url", - # "img_order_by", - # "created_at", - # "updated_at", - # "data_type" - # ], - # partition_dict={ - # "site_name": site_name - # } - # ) - # client = SSHUtil.get_ssh_client() - # SSHUtil.exec_command_async(client, sh, ignore_err=False) - # client.close() - - pass diff --git a/Pyspark_job/sqoop_ct/z_us_bs_category_asin_mysql_to_pg.py b/Pyspark_job/sqoop_ct/z_us_bs_category_asin_mysql_to_pg.py deleted file mode 100644 index cf916cc..0000000 --- a/Pyspark_job/sqoop_ct/z_us_bs_category_asin_mysql_to_pg.py +++ /dev/null @@ -1,87 +0,0 @@ -import os -import sys - -sys.path.append(os.path.dirname(sys.path[0])) -from utils.ssh_util import SSHUtil -from utils.common_util import CommonUtil -from utils.hdfs_utils import HdfsUtils -from utils.db_util import DBUtil - - -if __name__ == '__main__': - - site_name = "us" - # - # hive_tb = "tmp_bs_category_asin" - # - # hdfs_path = CommonUtil.build_hdfs_path(hive_tb) - # print(f"hdfs_path is {hdfs_path}") - # - # query = """ - # select - # asin, - # cate_1_id, - # 
cate_current_id,
-    #     week,
-    #     `year_month`,
-    #     created_at,
-    #     updated_at
-    #     from us_bs_category_asin
-    #     where 1 = 1
-    #     and \$CONDITIONS
-    # """
-    # print(query)
-    # db_type = "mysql"
-    # empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
-    #                                                                site_name=site_name,
-    #                                                                query=query,
-    #                                                                hive_tb_name=hive_tb,
-    #                                                                msg_usr=['chenyuanjie']
-    #                                                                )
-    #
-    # if not empty_flag:
-    #     sh = CommonUtil.build_import_sh(site_name=site_name,
-    #                                     db_type=db_type,
-    #                                     query=query,
-    #                                     hdfs_path=hdfs_path)
-    #     # 导入前先删除
-    #     HdfsUtils.delete_hdfs_file(hdfs_path)
-    #     client = SSHUtil.get_ssh_client()
-    #     SSHUtil.exec_command_async(client, sh, ignore_err=False)
-    #     CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
-    #     client.close()
-    #
-    #     # 导入后检测--检测数据一致性
-    #     CommonUtil.check_import_sync_num(db_type=db_type,
-    #                                      import_query=query,
-    #                                      hive_tb_name=hive_tb,
-    #                                      msg_usr=['chenyuanjie']
-    #                                      )
-
-    # 导出到pg数据库
-    db_type = "postgresql"
-    export_tb = "us_bs_category_asin"
-
-    sh = CommonUtil.build_export_sh(
-        site_name=site_name,
-        db_type=db_type,
-        hive_tb="tmp_bs_category_asin",
-        export_tb=export_tb,
-        col=[
-            "asin",
-            "cate_1_id",
-            "cate_current_id",
-            "week",
-            "year_month",
-            "created_at",
-            "updated_at"
-        ],
-        partition_dict={
-            "site_name": site_name,
-        }
-    )
-    client = SSHUtil.get_ssh_client()
-    SSHUtil.exec_command_async(client, sh, ignore_err=False)
-    client.close()
-
-    pass
\ No newline at end of file
diff --git a/Pyspark_job/sqoop_ct/z_us_st_year_week.py b/Pyspark_job/sqoop_ct/z_us_st_year_week.py
deleted file mode 100644
index 266374b..0000000
--- a/Pyspark_job/sqoop_ct/z_us_st_year_week.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import os
-import sys
-
-sys.path.append(os.path.dirname(sys.path[0]))
-from utils.ssh_util import SSHUtil
-from utils.common_util import CommonUtil
-from utils.hdfs_utils import HdfsUtils
-from utils.db_util import DBUtil
-
-
-if __name__ == '__main__':
-
-    # 导出到pg数据库
-    db_type = "postgresql"
-    export_tb = f"us_st_year_week"
-
-    sh = CommonUtil.build_export_sh(
-        site_name="us",
-        db_type=db_type,
-        hive_tb="dim_st_year_week",
-        export_tb=export_tb,
-        col=[
-            "search_term",
-            "st_key",
-            "year_week"
-        ],
-        partition_dict={
-            "site_name": "us",
-        }
-    )
-    client = SSHUtil.get_ssh_client()
-    SSHUtil.exec_command_async(client, sh, ignore_err=False)
-    client.close()
-
-    pass
\ No newline at end of file
-- 
libgit2 0.26.0
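
For reference, the scripts removed by this patch all drive the same Hive-to-PostgreSQL export through the in-repo helpers, differing only in the target table, column list, and partition values. Below is a minimal sketch of that shared flow, assuming utils.common_util.CommonUtil and utils.ssh_util.SSHUtil still provide the calls used in the deleted code (build_export_sh, get_ssh_client, exec_command_async); the table and column names here are illustrative placeholders, not values taken from this patch.

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil

if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    assert site_name is not None, "site_name must not be empty"

    # Build the export shell command for one Hive partition -> one PG table.
    db_type = "postgresql"
    export_tb = f"{site_name}_example_export"        # hypothetical target table
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="tmp_example_hive_table",            # hypothetical source Hive table
        export_tb=export_tb,
        col=["asin", "created_at", "updated_at"],    # placeholder column list
        partition_dict={"site_name": site_name}
    )

    # Run the generated export command on the gateway host over SSH and wait for it.
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()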