Commit 25966c8b by fangxingjun

no message

parent 76fe262e
@@ -37,6 +37,7 @@ class DwtAsinSync(Templates):
         super().__init__()
         self.site_name = site_name
         self.date_type = date_type
+        self.date_type = self.judge_date_type()
         self.date_info = date_info
         self.db_save = f"dwt_asin_sync"
         self.spark = self.create_spark_object(
@@ -45,60 +46,67 @@ class DwtAsinSync(Templates):
         self.df_asin_from_st = self.spark.sql(f"select 1+1;")
         self.df_asin_from_adv = self.spark.sql(f"select 1+1;")
         self.df_asin_from_day = self.spark.sql(f"select 1+1;")
-        self.engine_pg = get_remote_engine(
+        self.df_asin_variation = self.spark.sql(f"select 1+1;")
+        self.df_asin_stable = self.spark.sql(f"select 1+1;")
+        self.df_asin_syn = self.spark.sql(f"select 1+1;")
+        self.engine_pg14 = get_remote_engine(
             site_name=self.site_name,
             db_type='postgresql_14'
         )
+        self.engine_mysql = get_remote_engine(
+            site_name=self.site_name,
+            db_type='mysql'
+        )
         self.date_info_tuple = self.get_date_info_tuple()
+        self.table_syn = f"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}" if self.site_name == 'us' else f"{self.site_name}_all_syn_st_{self.date_info.replace('-', '_')}"

     def judge_date_type(self):
         if self.date_type in ["month", "month_week"]:
-            sql_read = 1
+            year, month = self.date_info.split("-")
+            sql = f"select count(*) as st_count from {self.site_name}_brand_analytics_month_{year} where year={year} and month={int(month)};"
+            df = pd.read_sql(sql, con=self.engine_mysql)
+            print("sql:", sql, df.shape)
+            if list(df.st_count)[0] >= 100_0000:
+                self.date_type = "month"  # append to the syn crawler table
+            else:
+                self.date_type = "month_week"  # truncate the syn crawler table
+        return self.date_type
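The added judge_date_type() switches the run between "month" and "month_week" based on how many brand-analytics search-term rows exist for the given month. A minimal standalone sketch of that rule, assuming a SQLAlchemy MySQL engine and the {site_name}_brand_analytics_month_{year} table used above; the function signature here is illustrative, not part of the commit:

import pandas as pd

def judge_date_type(engine_mysql, site_name, date_type, date_info):
    # Only month / month_week runs are re-judged; any other date_type passes through unchanged.
    if date_type not in ("month", "month_week"):
        return date_type
    year, month = date_info.split("-")
    sql = (f"select count(*) as st_count from {site_name}_brand_analytics_month_{year} "
           f"where year={year} and month={int(month)};")
    st_count = int(pd.read_sql(sql, con=engine_mysql)["st_count"].iloc[0])
    # 1,000,000 (100_0000) or more search-term rows: treat the month as complete and
    # append to the syn crawler table; otherwise fall back to month_week and rebuild it.
    return "month" if st_count >= 1_000_000 else "month_week"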

     def read_data_common(self, sql, content):
+        print(f"sql: {sql}, content: {content}")
         df = self.spark.sql(sqlQuery=sql).drop_duplicates(["asin"]).cache()
         df.show(10, truncate=False)
+        print(f"sql: {sql}, content: {content}, count: {df.count()}")
         return df

     def read_data(self):
+        # asin data sources
         sql_asin_from_st = f"select asin, 1 as asin_flag from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
-        print(f"sql_asin_from_st: {sql_asin_from_st}")
-        self.df_asin_from_st = self.spark.sql(sqlQuery=sql_asin_from_st).cache()
-        self.df_asin_from_st = self.df_asin_from_st.drop_duplicates(["asin"])
-        self.df_asin_from_st.show(10, truncate=False)
+        self.df_asin_from_st = self.read_data_common(sql=sql_asin_from_st, content="1.1. read st-asin data from dim_st_asin_info")
         sql_asin_from_adv = f"select related_asin from dwt_asin_related_traffic where site_name='{self.site_name}' and date_type='month_week' and date_info='{self.date_info}'"
-        print(f"sql_asin_from_adv: {sql_asin_from_adv}")
-        self.df_asin_from_adv = self.spark.sql(sqlQuery=sql_asin_from_adv).cache()
+        self.df_asin_from_adv = self.read_data_common(sql=sql_asin_from_adv, content="1.2. read asin ad placement data from dwt_asin_related_traffic")
+        # parse the related_asin field
         self.df_asin_from_adv = self.df_asin_from_adv.withColumn(
             'asin', F.explode(F.split(F.col('related_asin'), ','))
         ).select('asin').drop_duplicates(['asin'])
         self.df_asin_from_adv = self.df_asin_from_adv.withColumn('asin_flag', F.lit(2))
-        self.df_asin_from_adv.show(10, truncate=False)
         if self.date_type == 'day':
             sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
         else:
             sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='month_week' and date_info in {self.date_info_tuple}"
-        print(f"sql_asin_from_day: {sql_asin_from_day}")
-        self.df_asin_from_day = self.spark.sql(sqlQuery=sql_asin_from_day).cache()
-        self.df_asin_from_day = self.df_asin_from_day.drop_duplicates(["asin"])
-        self.df_asin_from_day.show(10, truncate=False)
-
-        self.df_asin_from_st = self.read_data_common(sql=sql_asin_from_st, content="1. read st-asin data from dim_st_asin_info")
-        self.df_asin_from_adv = self.read_data_common(sql=sql_asin_from_adv, content="2. read asin ad placement data from dwt_asin_related_traffic")
-        self.df_asin_from_adv = self.df_asin_from_adv.withColumn(
-            'asin', F.explode(F.split(F.col('related_asin'), ','))
-        ).select('asin').drop_duplicates(['asin'])
-        self.df_asin_from_adv = self.df_asin_from_adv.withColumn('asin_flag', F.lit(2))
-        if self.date_type == 'day':
-            sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
-        else:
-            sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='month_week' and date_info in {self.date_info_tuple}"
-        self.df_asin_from_day = self.read_data_common(sql=sql_asin_from_day, content="3. read new-release and BSR ranking asins from dwd_nsr_bsr_keepa_asin")
+        self.df_asin_from_day = self.read_data_common(sql=sql_asin_from_day, content="1.3. read new-release and BSR ranking asins from dwd_nsr_bsr_keepa_asin")
+        # read asin attribute values
+        sql_asin_variation = f"""select asin, 1 as asin_is_variation from dim_asin_variation_info where site_name="{self.site_name}";"""
+        self.df_asin_variation = self.read_data_common(sql=sql_asin_variation, content="2.1 read asin variation attribute from dim_asin_variation_info")
+        sql_asin_stable = f"""select asin, asin_volume as volume, asin_weight_str as weight_str from dim_asin_stable_info where site_name="{self.site_name}";"""
+        self.df_asin_stable = self.read_data_common(sql=sql_asin_stable, content="2.2 read asin weight and volume attributes from dim_asin_stable_info")
+        # read the syn crawler table
+        year, month = self.date_info.split("-")
+        sql_asin_syn = f"select asin from us_all_syn_st_month_{year} where date_info='{self.date_info}'"
+        pdf_asin = pd.read_sql(sql_asin_syn, con=self.engine_pg14)
+        schema = StructType([
+            StructField('asin', StringType(), True),
+        ])
+        self.df_asin_syn = self.spark.createDataFrame(pdf_asin, schema=schema)
+        self.df_asin_syn.show(10, truncate=False)
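The new tail of read_data() pulls the syn crawler table through pandas and then converts it to a Spark DataFrame with an explicit StructType instead of relying on schema inference. A minimal sketch of that conversion, assuming a local SparkSession; the sample ASIN values are made up:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("asin_syn_schema_demo").getOrCreate()

# An explicit schema keeps the asin column as a nullable string even when the
# pandas result set is empty or pandas has inferred a different dtype.
pdf_asin = pd.DataFrame({"asin": ["B0EXAMPLE01", "B0EXAMPLE02"]})
schema = StructType([StructField("asin", StringType(), True)])
df_asin_syn = spark.createDataFrame(pdf_asin, schema=schema)
df_asin_syn.show(truncate=False)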

     def handle_data(self):
         if self.date_type in ['month']:
@@ -114,6 +122,22 @@ class DwtAsinSync(Templates):
         if self.date_type == 'day':
             self.df_save = self.df_asin_from_day
+        window = Window.partitionBy(['asin']).orderBy(F.desc("asin_flag"))
+        self.df_save = self.df_save.withColumn("asin_flag_row_number", F.row_number().over(window)) \
+            .filter('asin_flag_row_number = 1')
+        self.df_save.groupby("asin_flag").agg(
+            F.count("asin").alias("asin_flag_count")
+        ).show()
+        self.df_save = self.df_save.drop("asin_flag_row_number")
+        self.df_save = self.df_save.join(
+            self.df_asin_variation, on='asin', how='left'
+        ).join(
+            self.df_asin_stable, on='asin', how='left'
+        )
         self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
         self.df_save = self.df_save.withColumn('date_type', F.lit(self.date_type))
         self.df_save = self.df_save.withColumn('date_info', F.lit(self.date_info))
......
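The handle_data() additions union the asin sources and then keep, for each asin, only the row with the highest asin_flag via a row_number window before joining the attribute frames. A self-contained sketch of that dedup pattern, assuming a local SparkSession and made-up rows:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("asin_flag_dedup_demo").getOrCreate()

df = spark.createDataFrame(
    [("B001", 1), ("B001", 3), ("B002", 2)],
    ["asin", "asin_flag"],
)

# Rank each asin's rows by asin_flag (descending) and keep the top one, so an asin
# seen by several sources retains its highest-priority flag.
window = Window.partitionBy("asin").orderBy(F.desc("asin_flag"))
df_dedup = (
    df.withColumn("asin_flag_row_number", F.row_number().over(window))
      .filter("asin_flag_row_number = 1")
      .drop("asin_flag_row_number")
)

# Distribution check, mirroring the groupby/agg in the diff above.
df_dedup.groupby("asin_flag").agg(F.count("asin").alias("asin_flag_count")).show()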
@@ -21,6 +21,7 @@ def export_data(site_name, date_type, date_info):
     target_table = f'{site_name}_self_rating_180day'
     sql_drop = f"drop table if exists {import_table};"
     sql_create = f"create table {import_table} like {target_table};"
+    sql_create = f"""CREATE TABLE {import_table} (LIKE {target_table} INCLUDING ALL);"""  # pg
     engine.execute(sql_drop)
     engine.execute(sql_create)
     print(f"sql_drop: {sql_drop}")
......
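The export change reflects the move to PostgreSQL: MySQL accepts CREATE TABLE new LIKE old, while PostgreSQL needs the parenthesised LIKE clause, and INCLUDING ALL copies defaults, constraints, indexes and comments rather than just the column list. A hedged sketch of the PostgreSQL form via SQLAlchemy; the connection string and table names are placeholders, not values from this repo:

from sqlalchemy import create_engine, text

# Placeholder connection details for illustration only.
engine_pg = create_engine("postgresql+psycopg2://user:password@localhost:5432/selection")

import_table = "us_self_rating_180day_copy"
target_table = "us_self_rating_180day"

with engine_pg.begin() as conn:
    conn.execute(text(f"drop table if exists {import_table};"))
    # MySQL form:      create table {import_table} like {target_table};
    # PostgreSQL form: LIKE must be parenthesised; INCLUDING ALL also copies
    # defaults, constraints, indexes and comments along with the columns.
    conn.execute(text(f"CREATE TABLE {import_table} (LIKE {target_table} INCLUDING ALL);"))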