import os import random import sys import time import traceback import pandas as pd from pyspark.storagelevel import StorageLevel sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # from ..utils.templates import Templates # from AmazonSpider.pyspark_job.utils.templates import Templates from pyspark.sql.types import StructType, StructField, IntegerType, StringType # 分组排序的udf窗口函数 from pyspark.sql.window import Window from pyspark.sql import functions as F from utils.db_util import DbTypes, DBUtil from utils.common_util import CommonUtil class DwdAsinToPg(Templates): def __init__(self, site_name="us", date_type="week", date_info="2022-1"): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.db_save = f"dwd_asin_to_pg" self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}") self.df_save = self.spark.sql(f"select 1+1;") self.df_st_asin_today = self.spark.sql(f"select 1+1;") self.df_st_asin_last_5_day = self.spark.sql(f"select 1+1;") self.df_asin_variation = self.spark.sql(f"select 1+1;") self.df_asin_stable = self.spark.sql(f"select 1+1;") self.partitions_by = ['site_name', 'date_type', 'date_info'] self.date_info_tuple = "('2022-11-02')" self.date_info_tuple_last_5_day = "('2022-11-02')" self.reset_partitions(partitions_num=1) # self.date_info_tuple = ('2022-11-01', '2022-11-02', '2022-11-03', '2022-11-04', '2022-11-05', '2022-11-06') self.date_today = '' self.date_last_5_day_tuple = tuple() self.get_date_info_tuple() self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name, site_name=self.site_name) self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name=self.site_name) def truncate_or_update_table_syn(self): table = f"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}" year, month = self.date_info.split("-") sql = f"select count(*) as st_count from {self.site_name}_brand_analytics_month_{year} where year={year} and month={int(month)} ;" df = pd.read_sql(sql, con=self.engine_mysql) print("sql:", sql, df.shape) if list(df.st_count)[0] >= 1_0000: # sql = f"select asin from {self.site_name}_all_syn_st_month_{year} where date_info='{self.date_info}'" sql = f"select asin from {table} where date_info='{self.date_info}';" print("sql:", sql, df.shape) pdf_asin = pd.read_sql(sql, con=self.engine_pg14) schema = StructType([ StructField('asin', StringType(), True), ]) df_asin = self.spark.createDataFrame(pdf_asin, schema=schema) # self.df_save = self.df_save.join(self.df_save, df_asin.asin == self.df_save.asin, "left_anti") df_save_alias = self.df_save.alias("df_save") df_asin_alias = df_asin.alias("df_asin") self.df_save = df_save_alias.join(df_asin_alias, df_asin_alias.asin == df_save_alias.asin, "left_anti") self.df_save.show(10, truncate=False) print(f"df_asin: {df_asin.count()}, self.df_save: {self.df_save.count()}") else: while True: try: with self.engine_pg14.begin() as conn: sql_truncate = f"truncate {table};" print("月搜索词没有导入进来, 需要先清空表, sql_truncate:", sql_truncate) conn.execute(sql_truncate) break except Exception as e: print(e, traceback.format_exc()) time.sleep(random.randint(3, 10)) self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name, site_name=self.site_name) continue def truncate_or_update_table_syn_old(self): table = f"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}" if self.site_name == 'us' else f"{self.site_name}_all_syn_st_{self.date_info.replace('-', '_')}" if site_name == 'us': year, month = self.date_info.split("-") sql = f"select count(*) as st_count from {self.site_name}_brand_analytics_month_{year} where year={year} and month={int(month)} ;" # else: # year, week = self.date_info.split("-") # sql = f"select count(*) from {self.site_name}_brand_analytics_{year} where week={week};" df = pd.read_sql(sql, con=self.engine_mysql) print("sql:", sql, df.shape) if list(df.st_count)[0] >= 100_0000: sql = f"select asin from us_all_syn_st_month_{year} where date_info='{self.date_info}'" pdf_asin = pd.read_sql(sql, con=self.engine_pg14) schema = StructType([ StructField('asin', StringType(), True), ]) df_asin = self.spark.createDataFrame(pdf_asin, schema=schema) # self.df_save = self.df_save.join(self.df_save, df_asin.asin == self.df_save.asin, "left_anti") df_save_alias = self.df_save.alias("df_save") df_asin_alias = df_asin.alias("df_asin") self.df_save = df_save_alias.join(df_asin_alias, df_asin_alias.asin == df_save_alias.asin, "left_anti") self.df_save.show(10, truncate=False) print(f"df_asin: {df_asin.count()}, self.df_save: {self.df_save.count()}") else: while True: try: with self.engine_pg14.begin() as conn: sql_truncate = f"truncate {table};" print("sql_truncate:", sql_truncate) conn.execute(sql_truncate) break except Exception as e: print(e, traceback.format_exc()) time.sleep(random.randint(3, 10)) self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name, site_name=self.site_name) continue else: while True: try: with self.engine_pg14.begin() as conn: sql_truncate = f"truncate {table};" print("sql_truncate:", sql_truncate) conn.execute(sql_truncate) break except Exception as e: print(e, traceback.format_exc()) time.sleep(random.randint(3, 10)) self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name, site_name=self.site_name) continue def get_date_info_tuple(self): self.df_date = self.spark.sql(f"select * from dim_date_20_to_30;") df = self.df_date.toPandas() if self.date_type == 'day': df_today = df.loc[df.date == f'{self.date_info}'] id_today = list(df_today.id)[0] id_last_5_day = id_today - 4 print("id_today, id_last_5_day:", id_today, id_last_5_day) df_last_5_day = df.loc[(df.id < id_today) & (df.id >= id_last_5_day)] self.date_last_5_day_tuple = tuple(df_last_5_day.date) def read_data(self): # 测试月流程用 # sql = f"select asin from us_all_syn_st_month_{2024} where date_info='{2023-12}' limit 100000" # print("sql===:", sql) # pdf_asin = pd.read_sql(sql, con=self.engine_pg14) # schema = StructType([ # StructField('asin', StringType(), True), # ]) # df_asin = self.spark.createDataFrame(pdf_asin, schema=schema) # df_asin.show(10, truncate=False) if self.date_type == 'day': print("1.1 读取dim_st_asin_info表(当前日)") sql = f"select asin, site_name from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';" print("sql:", sql) self.df_st_asin_today = self.spark.sql(sqlQuery=sql).cache() self.df_st_asin_today.show(10, truncate=False) self.df_st_asin_today = self.df_st_asin_today.drop_duplicates(["asin"]) print("self.df_st_asin_today:", self.df_st_asin_today.count()) print("1.2 读取dim_st_asin_info表(当前日的前6天)") sql = f"select asin, 1 as asin_isin_flag from {self.db_save} where site_name='{self.site_name}' and date_type='{self.date_type}' " \ f"and date_info in {self.date_last_5_day_tuple} and date_info >= '2022-11-02';" print("sql:", sql) self.df_st_asin_last_5_day = self.spark.sql(sqlQuery=sql).cache() self.df_st_asin_last_5_day.show(10, truncate=False) print("self.df_st_asin_last_5_day去重前:", self.df_st_asin_last_5_day.count()) self.df_st_asin_last_5_day = self.df_st_asin_last_5_day.drop_duplicates(["asin"]) print("self.df_st_asin_last_5_day去重后:", self.df_st_asin_last_5_day.count()) else: sql = f"select asin, site_name from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';" print("sql:", sql) self.df_st_asin = self.spark.sql(sqlQuery=sql).cache() self.df_st_asin = self.df_st_asin.drop_duplicates(["asin"]) print("self.df_st_asin.count:", self.df_st_asin.count()) print("2. 读取dim_asin_variation_info表") sql = f"""select asin, 1 as asin_is_variation from dim_asin_variation_info where site_name="{self.site_name}";""" self.df_asin_variation = self.spark.sql(sqlQuery=sql).cache() self.df_asin_variation.show(10, truncate=False) self.df_asin_variation = self.df_asin_variation.drop_duplicates(["asin"]) print("3. 读取dim_asin_stable_info表") sql = f"""select asin, asin_volume as volume, asin_weight_str as weight_str from dim_asin_stable_info where site_name="{self.site_name}";""" self.df_asin_stable = self.spark.sql(sqlQuery=sql).cache() self.df_asin_stable.show(10, truncate=False) self.df_asin_stable = self.df_asin_stable.drop_duplicates(["asin"]) def handle_data(self): if self.date_type == 'day': if self.date_info >= '2022-11-02': self.df_st_asin_today = self.df_st_asin_today.join( self.df_st_asin_last_5_day, on='asin', how='left' ) print("self.df_st_asin_today合并:", self.df_st_asin_today.count()) self.df_st_asin_today = self.df_st_asin_today.filter("asin_isin_flag is null") print("self.df_st_asin_today新出现:", self.df_st_asin_today.count()) self.df_save = self.df_st_asin_today.join( self.df_asin_variation, on='asin', how='left' ) else: self.df_save = self.df_st_asin.join( self.df_asin_variation, on='asin', how='left' ).join( self.df_asin_stable, on='asin', how='left' ) self.df_save = self.df_save.drop("asin_isin_flag") self.handle_temp() self.truncate_or_update_table_syn() # 清空表/更新表數據 self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name)) self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type)) self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info)) self.df_save = self.df_save.fillna({"asin_is_variation": 0}) self.df_save.show(10, truncate=False) print("self.df_save.count:", self.df_save.count()) users = ["fangxingjun", "chenyuanjie", "pengyanbing"] title = f"dwd_asin_to_pg: {self.site_name}, {self.date_type}, {self.date_info}" content = f"整合asin完成--等待导出到pg提供爬虫使用--self.df_save.count: {self.df_save.count()}" CommonUtil().send_wx_msg(users=users, title=title, content=content) # quit() def handle_temp(self): if self.site_name == 'us' and self.date_type == 'week' and self.date_info == '2023-44': sql = f"select asin from dwd_asin_to_pg where site_name='{self.site_name}' and date_type='month' and date_info='2023-11';" print("sql:", sql) self.df_asin_month = self.spark.sql(sqlQuery=sql).cache() self.df_asin_month = self.df_asin_month.drop_duplicates(["asin"]) self.df_save = self.df_save.withColumn("data_type", F.lit(100)) # self.df_save = self.df_save.join( # self.df_asin_month, on='asin' # ) result_df = self.df_asin_month.join(self.df_save, self.df_asin_month.asin == self.df_save.asin, "left_anti") print("result_df.count:", result_df.count()) # 确保两个 DataFrame 有相同的列 columns1 = self.df_save.columns columns2 = result_df.columns print(f"columns1:{columns1}, columns2:{columns2}") # 为 df1 添加在 df2 中存在但 df1 中缺失的列 for col in set(columns2) - set(columns1): self.df_save = self.df_save.withColumn(col, F.lit(None)) # 为 df2 添加在 df1 中存在但 df2 中缺失的列 for col in set(columns1) - set(columns2): result_df = result_df.withColumn(col, F.lit(None)) # self.df_save = self.df_save.join( # result_df # ) print("self.df_save.count11:", self.df_save.count()) self.df_save = self.df_save.unionByName(result_df) print("self.df_save.count22:", self.df_save.count()) self.site_name = "us" self.date_type = "month" self.date_info = "2023-11" else: self.df_save = self.df_save.withColumn("data_type", F.lit(1)) if __name__ == "__main__": site_name = sys.argv[1] # 参数1:站点 date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter date_info = sys.argv[3] # 参数3:年-周/年-月/年-季, 比如: 2022-1 handle_obj = DwdAsinToPg(site_name=site_name, date_type=date_type, date_info=date_info) handle_obj.run()