Commit 814141c3 by chenyuanjie

流量选品-卖家所在地增加HK和TW类型

parent 332d6eb2
...@@ -202,8 +202,8 @@ class DwtFlowAsin(Templates): ...@@ -202,8 +202,8 @@ class DwtFlowAsin(Templates):
print("5.获取dim_fd_asin_info,得到卖家相关信息") print("5.获取dim_fd_asin_info,得到卖家相关信息")
if (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or (self.date_type == '4_week' and self.date_info >= '2024-21'): if (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or (self.date_type == '4_week' and self.date_info >= '2024-21'):
sql = f""" sql = f"""
select fd_unique as account_id, upper(fd_country_name) as seller_country_name from dim_fd_asin_info select fd_unique as account_id, fd_account_name as account_name, upper(fd_country_name) as seller_country_name, asin, updated_at
where site_name='{self.site_name}' and fd_unique is not null group by fd_unique, fd_country_name""" from dim_fd_asin_info where site_name='{self.site_name}' and fd_unique is not null"""
else: else:
sql = f""" sql = f"""
select account_id, account_name, seller_country_name, asin select account_id, account_name, seller_country_name, asin
...@@ -384,14 +384,40 @@ class DwtFlowAsin(Templates): ...@@ -384,14 +384,40 @@ class DwtFlowAsin(Templates):
# 处理配送方式、卖家所在地以及卖家所在地类型 # 处理配送方式、卖家所在地以及卖家所在地类型
def handle_seller_country(self): def handle_seller_country(self):
if (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or (self.date_type == '4_week' and self.date_info >= '2024-21'): if (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or (self.date_type == '4_week' and self.date_info >= '2024-21'):
self.df_asin_detail = self.df_asin_detail.join(self.df_fd_asin_info, on=['account_id'], how='left') # df1: account_id + seller_country_name 去重
df_seller_country = self.df_fd_asin_info.select('account_id', 'seller_country_name').dropDuplicates(['account_id'])
# df2: asin 去重,保留最新的 account_id + account_name
window = Window.partitionBy('asin').orderBy(F.col('updated_at').desc())
df_asin_account = self.df_fd_asin_info.select('asin', 'account_id', 'account_name', 'updated_at') \
.withColumn('rank', F.row_number().over(window)) \
.filter(F.col('rank') == 1) \
.drop('rank', 'updated_at') \
.withColumnRenamed('account_id', 'fd_account_id') \
.withColumnRenamed('account_name', 'fd_account_name')
# 1. 关联df2,用于填充dim表中account_id和account_name为空的情况
self.df_asin_detail = self.df_asin_detail.join(df_asin_account, on=['asin'], how='left')
# 2. 优先使用dim表中的数据(已从seller_json提取),为空则使用df2的数据
self.df_asin_detail = self.df_asin_detail.withColumn(
"account_id", F.coalesce(F.col("account_id"), F.col("fd_account_id"))
).withColumn(
"account_name", F.coalesce(F.col("account_name"), F.col("fd_account_name"))
).drop("fd_account_id", "fd_account_name")
# 3. 关联df1获取seller_country_name
self.df_asin_detail = self.df_asin_detail.join(df_seller_country, on=['account_id'], how='left')
else: else:
self.df_asin_detail = self.df_asin_detail.drop("account_id", "account_name") self.df_asin_detail = self.df_asin_detail.drop("account_id", "account_name")
self.df_asin_detail = self.df_asin_detail.join(self.df_fd_asin_info, on=['asin'], how='left') self.df_asin_detail = self.df_asin_detail.join(self.df_fd_asin_info, on=['asin'], how='left')
self.df_asin_detail = self.df_asin_detail.withColumn("asin_site_name_type", F.expr(""" self.df_asin_detail = self.df_asin_detail.withColumn("asin_site_name_type", F.expr("""
CASE WHEN asin_buy_box_seller_type = 1 THEN 4 CASE WHEN asin_buy_box_seller_type = 1 THEN 4
WHEN asin_buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%US%' THEN 1 WHEN asin_buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%US%' THEN 1
WHEN asin_buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%CN%' THEN 2 ELSE 3 END""")) WHEN asin_buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%CN%' THEN 2
WHEN asin_buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%HK%' THEN 5
WHEN asin_buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%TW%' THEN 6
ELSE 3 END"""))
self.df_fd_asin_info.unpersist() self.df_fd_asin_info.unpersist()
# 处理asin的lqs评分 # 处理asin的lqs评分
......
...@@ -87,6 +87,8 @@ class KafkaFlowAsinDetail(Templates): ...@@ -87,6 +87,8 @@ class KafkaFlowAsinDetail(Templates):
# DataFrame初始化 # DataFrame初始化
self.df_previous_flow_asin = self.spark.sql("select 1+1;") self.df_previous_flow_asin = self.spark.sql("select 1+1;")
self.df_seller_info = self.spark.sql("select 1+1;") self.df_seller_info = self.spark.sql("select 1+1;")
self.df_seller_country = self.spark.sql("select 1+1;")
self.df_asin_seller = self.spark.sql("select 1+1;")
self.df_self_asin_info = self.spark.sql("select 1+1;") self.df_self_asin_info = self.spark.sql("select 1+1;")
self.df_alarm_brand_info = self.spark.sql("select 1+1;") self.df_alarm_brand_info = self.spark.sql("select 1+1;")
self.df_asin_label_info = self.spark.sql("select 1+1;") self.df_asin_label_info = self.spark.sql("select 1+1;")
...@@ -302,11 +304,26 @@ class KafkaFlowAsinDetail(Templates): ...@@ -302,11 +304,26 @@ class KafkaFlowAsinDetail(Templates):
df = df.withColumn("seller_json_parsed", self.u_parse_seller_info(df.seller_json)) df = df.withColumn("seller_json_parsed", self.u_parse_seller_info(df.seller_json))
df = df.withColumn("buy_box_seller_type", df.seller_json_parsed.buy_box_seller_type).withColumn( df = df.withColumn("buy_box_seller_type", df.seller_json_parsed.buy_box_seller_type).withColumn(
"account_name", df.seller_json_parsed.account_name).drop("seller_json_parsed") "account_name", df.seller_json_parsed.account_name).drop("seller_json_parsed")
df = df.join(self.df_seller_info, on=['seller_id'], how='left')
# 1. 关联全局缓存的df_asin_seller,用于填充seller_id为空的情况
df = df.join(self.df_asin_seller, on=['asin'], how='left')
# 2. 优先使用kafka消息中的seller_id,为空则使用df_asin_seller的数据
df = df.withColumn(
"seller_id", F.coalesce(F.col("seller_id"), F.col("fd_seller_id"))
).withColumn(
"account_name", F.coalesce(F.col("account_name"), F.col("fd_account_name"))
).drop("fd_seller_id", "fd_account_name")
# 3. 关联全局缓存的df_seller_country获取seller_country_name
df = df.join(self.df_seller_country, on=['seller_id'], how='left')
df = df.withColumn("site_name_type", F.expr(""" df = df.withColumn("site_name_type", F.expr("""
CASE WHEN buy_box_seller_type = 1 THEN 4 CASE WHEN buy_box_seller_type = 1 THEN 4
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%US%' THEN 1 WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%US%' THEN 1
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%CN%' THEN 2 WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%CN%' THEN 2
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%HK%' THEN 5
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%TW%' THEN 6
ELSE 3 END""")) ELSE 3 END"""))
return df return df
...@@ -701,12 +718,23 @@ class KafkaFlowAsinDetail(Templates): ...@@ -701,12 +718,23 @@ class KafkaFlowAsinDetail(Templates):
self.df_previous_flow_asin.show(10, truncate=False) self.df_previous_flow_asin.show(10, truncate=False)
print("2. 获取卖家相关信息") print("2. 获取卖家相关信息")
sql = f""" sql = f"""
select fd_unique as seller_id, upper(fd_country_name) as seller_country_name from dim_fd_asin_info_30day select fd_unique as seller_id, fd_account_name as account_name, upper(fd_country_name) as seller_country_name, asin, updated_at
where site_name='{self.site_name}' and date_type = '30day' and fd_unique is not null group by fd_unique, fd_country_name""" from dim_fd_asin_info_30day where site_name='{self.site_name}' and date_type = '30day' and fd_unique is not null"""
print("sql=", sql) print("sql=", sql)
self.df_seller_info = self.spark.sql(sqlQuery=sql) self.df_seller_info = self.spark.sql(sqlQuery=sql)
self.df_seller_info = self.df_seller_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_seller_info = self.df_seller_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_seller_info.show(10, truncate=False) self.df_seller_info.show(10, truncate=False)
# df_seller_country: seller_id + seller_country_name 去重(全局缓存)
self.df_seller_country = self.df_seller_info.select('seller_id', 'seller_country_name').dropDuplicates(['seller_id']).persist(StorageLevel.DISK_ONLY)
# df_asin_seller: asin 去重,保留最新的 seller_id + account_name(全局缓存)
window = Window.partitionBy('asin').orderBy(F.col('updated_at').desc())
self.df_asin_seller = self.df_seller_info.select('asin', 'seller_id', 'account_name', 'updated_at') \
.withColumn('rank', F.row_number().over(window)) \
.filter(F.col('rank') == 1) \
.drop('rank', 'updated_at') \
.withColumnRenamed('seller_id', 'fd_seller_id') \
.withColumnRenamed('account_name', 'fd_account_name') \
.persist(StorageLevel.DISK_ONLY)
print("3. 读取内部asin信息") print("3. 读取内部asin信息")
sql = f"""select asin, 1 as is_self_asin from {self.site_name}_self_asin group by asin""" sql = f"""select asin, 1 as is_self_asin from {self.site_name}_self_asin group by asin"""
print("sql=", sql) print("sql=", sql)
......
...@@ -86,6 +86,8 @@ class KafkaRankAsinDetail(Templates): ...@@ -86,6 +86,8 @@ class KafkaRankAsinDetail(Templates):
# DataFrame初始化 # DataFrame初始化
self.df_previous_flow_asin = self.spark.sql("select 1+1;") self.df_previous_flow_asin = self.spark.sql("select 1+1;")
self.df_seller_info = self.spark.sql("select 1+1;") self.df_seller_info = self.spark.sql("select 1+1;")
self.df_seller_country = self.spark.sql("select 1+1;")
self.df_asin_seller = self.spark.sql("select 1+1;")
self.df_self_asin_info = self.spark.sql("select 1+1;") self.df_self_asin_info = self.spark.sql("select 1+1;")
self.df_alarm_brand_info = self.spark.sql("select 1+1;") self.df_alarm_brand_info = self.spark.sql("select 1+1;")
self.df_asin_label_info = self.spark.sql("select 1+1;") self.df_asin_label_info = self.spark.sql("select 1+1;")
...@@ -301,11 +303,26 @@ class KafkaRankAsinDetail(Templates): ...@@ -301,11 +303,26 @@ class KafkaRankAsinDetail(Templates):
df = df.withColumn("seller_json_parsed", self.u_parse_seller_info(df.seller_json)) df = df.withColumn("seller_json_parsed", self.u_parse_seller_info(df.seller_json))
df = df.withColumn("buy_box_seller_type", df.seller_json_parsed.buy_box_seller_type).withColumn( df = df.withColumn("buy_box_seller_type", df.seller_json_parsed.buy_box_seller_type).withColumn(
"account_name", df.seller_json_parsed.account_name).drop("seller_json_parsed") "account_name", df.seller_json_parsed.account_name).drop("seller_json_parsed")
df = df.join(self.df_seller_info, on=['seller_id'], how='left')
# 1. 关联全局缓存的df_asin_seller,用于填充seller_id为空的情况
df = df.join(self.df_asin_seller, on=['asin'], how='left')
# 2. 优先使用kafka消息中的seller_id,为空则使用df_asin_seller的数据
df = df.withColumn(
"seller_id", F.coalesce(F.col("seller_id"), F.col("fd_seller_id"))
).withColumn(
"account_name", F.coalesce(F.col("account_name"), F.col("fd_account_name"))
).drop("fd_seller_id", "fd_account_name")
# 3. 关联全局缓存的df_seller_country获取seller_country_name
df = df.join(self.df_seller_country, on=['seller_id'], how='left')
df = df.withColumn("site_name_type", F.expr(""" df = df.withColumn("site_name_type", F.expr("""
CASE WHEN buy_box_seller_type = 1 THEN 4 CASE WHEN buy_box_seller_type = 1 THEN 4
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%US%' THEN 1 WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%US%' THEN 1
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%CN%' THEN 2 WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%CN%' THEN 2
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%HK%' THEN 5
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%TW%' THEN 6
ELSE 3 END""")) ELSE 3 END"""))
return df return df
...@@ -700,12 +717,23 @@ class KafkaRankAsinDetail(Templates): ...@@ -700,12 +717,23 @@ class KafkaRankAsinDetail(Templates):
self.df_previous_flow_asin.show(10, truncate=False) self.df_previous_flow_asin.show(10, truncate=False)
print("2. 获取卖家相关信息") print("2. 获取卖家相关信息")
sql = f""" sql = f"""
select fd_unique as seller_id, upper(fd_country_name) as seller_country_name from dim_fd_asin_info_30day select fd_unique as seller_id, fd_account_name as account_name, upper(fd_country_name) as seller_country_name, asin, updated_at
where site_name='{self.site_name}' and date_type = '30day' and fd_unique is not null group by fd_unique, fd_country_name""" from dim_fd_asin_info_30day where site_name='{self.site_name}' and date_type = '30day' and fd_unique is not null"""
print("sql=", sql) print("sql=", sql)
self.df_seller_info = self.spark.sql(sqlQuery=sql) self.df_seller_info = self.spark.sql(sqlQuery=sql)
self.df_seller_info = self.df_seller_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_seller_info = self.df_seller_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_seller_info.show(10, truncate=False) self.df_seller_info.show(10, truncate=False)
# df_seller_country: seller_id + seller_country_name 去重(全局缓存)
self.df_seller_country = self.df_seller_info.select('seller_id', 'seller_country_name').dropDuplicates(['seller_id']).persist(StorageLevel.DISK_ONLY)
# df_asin_seller: asin 去重,保留最新的 seller_id + account_name(全局缓存)
window = Window.partitionBy('asin').orderBy(F.col('updated_at').desc())
self.df_asin_seller = self.df_seller_info.select('asin', 'seller_id', 'account_name', 'updated_at') \
.withColumn('rank', F.row_number().over(window)) \
.filter(F.col('rank') == 1) \
.drop('rank', 'updated_at') \
.withColumnRenamed('seller_id', 'fd_seller_id') \
.withColumnRenamed('account_name', 'fd_account_name') \
.persist(StorageLevel.DISK_ONLY)
print("3. 读取内部asin信息") print("3. 读取内部asin信息")
sql = f"""select asin, 1 as is_self_asin from {self.site_name}_self_asin group by asin""" sql = f"""select asin, 1 as is_self_asin from {self.site_name}_self_asin group by asin"""
print("sql=", sql) print("sql=", sql)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment