Commit 6d8c9baa by fangxingjun

no message

parent f966e87d
...@@ -18,6 +18,27 @@ from utils.db_util import DbTypes, DBUtil ...@@ -18,6 +18,27 @@ from utils.db_util import DbTypes, DBUtil
from utils.common_util import CommonUtil from utils.common_util import CommonUtil
from datetime import datetime, timedelta from datetime import datetime, timedelta
from utils.hdfs_utils import HdfsUtils from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
# Digital/virtual category exclusion list — used in filter conditions.
EXCLUDE_CATEGORIES = (
    "audible", "books", "digital-text", "dmusic",
    "mobile-apps", "movies-tv", "music", "software", "videogames"
)
# First-level categories filtered out when deciding asin_is_need
# (kept in sync with dwt.handle_asin_is_hide).
NEED_FILTER_CATEGORIES = (
    'mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text',
    'magazines', 'movies-tv', 'software', 'videogames',
    'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed'
)
# category_id-level hidden nodes used by asin_is_hide.
HIDE_CATEGORY_IDS = (
    '21393128011', '21377129011', '21377127011',
    '21377130011', '21388218011', '21377132011'
)
class DwdNsrBsrKeepaAsin(Templates): class DwdNsrBsrKeepaAsin(Templates):
...@@ -37,6 +58,10 @@ class DwdNsrBsrKeepaAsin(Templates): ...@@ -37,6 +58,10 @@ class DwdNsrBsrKeepaAsin(Templates):
self.partitions_by = ['site_name', 'date_type', 'date_info'] self.partitions_by = ['site_name', 'date_type', 'date_info']
self.reset_partitions(partitions_num=5) self.reset_partitions(partitions_num=5)
self.get_date_info_tuple() self.get_date_info_tuple()
self.engine_mysql = get_remote_engine(
site_name=self.site_name,
db_type='mysql'
)
@staticmethod @staticmethod
def get_date_30_days_ago(date_str: str, date_format: str = "%Y-%m-%d") -> str: def get_date_30_days_ago(date_str: str, date_format: str = "%Y-%m-%d") -> str:
...@@ -70,28 +95,89 @@ class DwdNsrBsrKeepaAsin(Templates): ...@@ -70,28 +95,89 @@ class DwdNsrBsrKeepaAsin(Templates):
else: else:
thirty_days_ago = '' thirty_days_ago = ''
print(f"1.1 读取最近30天的bsr的asin") print(f"1.1 读取最近30天的bsr的asin")
sql_bsr = f"select asin, date_info, 1 as asin_cate_flag from dim_bsr_asin_rank_history where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'" sql_bsr = f"select asin, date_info, 1 as asin_cate_flag, category_id from dim_bsr_asin_rank_history where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'"
if self.date_type == 'month': if self.date_type == 'month':
sql_bsr = f"select asin, date_info, 1 as asin_cate_flag from dim_bsr_asin_rank_history where site_name='{self.site_name}' and date_info in {self.date_info_tuple}" sql_bsr = f"select asin, date_info, 1 as asin_cate_flag, category_id from dim_bsr_asin_rank_history where site_name='{self.site_name}' and date_info in {self.date_info_tuple}"
print("sql_bsr:", sql_bsr) print("sql_bsr:", sql_bsr)
self.df_asin_bsr = self.spark.sql(sqlQuery=sql_bsr).cache() self.df_asin_bsr = self.spark.sql(sqlQuery=sql_bsr).cache()
self.df_asin_bsr.show(10, truncate=False) self.df_asin_bsr.show(10, truncate=False)
print(f"1.2 读取最近30天的nsr的asin") print(f"1.2 读取最近30天的nsr的asin")
sql_nsr = f"select asin, date_info, 2 as asin_cate_flag from dim_nsr_asin_rank_history where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'" sql_nsr = f"select asin, date_info, 2 as asin_cate_flag, category_id from dim_nsr_asin_rank_history where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'"
if self.date_type == 'month': if self.date_type == 'month':
sql_nsr = f"select asin, date_info, 2 as asin_cate_flag from dim_nsr_asin_rank_history where site_name='{self.site_name}' and date_info in {self.date_info_tuple}" sql_nsr = f"select asin, date_info, 2 as asin_cate_flag, category_id from dim_nsr_asin_rank_history where site_name='{self.site_name}' and date_info in {self.date_info_tuple}"
print("sql_nsr:", sql_nsr) print("sql_nsr:", sql_nsr)
self.df_asin_nsr = self.spark.sql(sqlQuery=sql_nsr).cache() self.df_asin_nsr = self.spark.sql(sqlQuery=sql_nsr).cache()
self.df_asin_nsr.show(10, truncate=False) self.df_asin_nsr.show(10, truncate=False)
print(f"1.3 读取最近30天的keepa的asin") print(f"1.3 读取最近30天的keepa的asin")
sql_keepa = f"select distinct(asin), date_info, 3 as asin_cate_flag from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'" sql_keepa = f"select distinct(asin), date_info, 3 as asin_cate_flag, null as category_id from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'"
if self.date_type == 'month': if self.date_type == 'month':
sql_keepa = f"select asin, date_info, 3 as asin_cate_flag from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info in {self.date_info_tuple}" sql_keepa = f"select asin, date_info, 3 as asin_cate_flag, null as category_id from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info in {self.date_info_tuple}"
print("sql_keepa:", sql_keepa) print("sql_keepa:", sql_keepa)
self.df_asin_keepa = self.spark.sql(sqlQuery=sql_keepa).cache() self.df_asin_keepa = self.spark.sql(sqlQuery=sql_keepa).cache()
self.df_asin_keepa.show(10, truncate=False) self.df_asin_keepa.show(10, truncate=False)
print(f"2.1 读取最近bsr分类树")
sql_bsr = f"select category_id, category_first_id, lower(en_name) as en_name from dim_bsr_category_tree WHERE site_name ='{self.site_name}';"
self.df_bsr_tree = self.spark.sql(sqlQuery=sql_bsr).cache()
self.df_bsr_tree.show(10, truncate=False)
print(f"2.2 读取最近nsr分类树")
sql_nsr = f"select category_id, category_first_id, lower(en_name) as en_name from dim_nsr_category_tree WHERE site_name ='{self.site_name}';"
self.df_nsr_tree = self.spark.sql(sqlQuery=sql_nsr).cache()
self.df_nsr_tree.show(10, truncate=False)
print(f"2.3 从mysql读取隐藏分类")
sql_hide = "select category_id_base as category_id, 1 as hide_flag from us_bs_category_hide group by category_id_base"
pdf_hide_cate = self.engine_mysql.read_sql(sql_hide)
if pdf_hide_cate.shape[0]:
schema = StructType([
StructField('category_id', StringType(), True),
StructField('hide_flag', IntegerType(), True),
])
self.df_hide_cate = self.spark.createDataFrame(pdf_hide_cate, schema=schema).cache()
print(f"self.df_hide_cate: {self.df_hide_cate.count()}")
self.df_hide_cate.show(10, truncate=False)
def handle_data_hide_category(self):
df_bsr_cate = self.df_asin_bsr.select("asin", "asin_cate_flag", "category_id", "date_info")
df_bsr_cate = df_bsr_cate.join(self.df_bsr_tree, on='category_id', how='left').join(self.df_hide_cate, on='category_id', how='left')
df_bsr_cate.show(10, truncate=False)
df_nsr_cate = self.df_asin_nsr.select("asin", "asin_cate_flag", "category_id", "date_info")
df_nsr_cate = df_nsr_cate.join(self.df_nsr_tree, on='category_id', how='left').join(self.df_hide_cate, on='category_id', how='left')
df_nsr_cate.show(10, truncate=False)
df_keepa_cate = self.df_asin_keepa.select("asin", "asin_cate_flag", "category_id", "date_info")
df_union_cate = df_bsr_cate.unionByName(df_nsr_cate).unionByName(df_keepa_cate, allowMissingColumns=True)
df_union_cate.show(10, truncate=False)
print(f"df_union_cate-去重前: {df_union_cate.count()}")
window = Window.partitionBy(['asin']).orderBy(F.asc_nulls_last("asin_cate_flag"))
df_union_cate = df_union_cate.withColumn(
"rk", F.row_number().over(window=window)
).filter("rk=1").drop("rk").cache()
df_union_cate.show(10, truncate=False)
print(f"df_union_cate-去重后: {df_union_cate.count()}")
df_union_cate = df_union_cate.withColumn(
"hide_first",
F.when(F.col("category_first_id").isin(*NEED_FILTER_CATEGORIES), 1).otherwise(0)
)
df_union_cate = df_union_cate.withColumn(
"hide_flag",
F.coalesce(F.col("hide_flag"), F.lit(0))
)
df_union_cate = df_union_cate.withColumn(
"asin_type",
F.when(F.col("hide_first")==1, 1).when(F.col('hide_first')==1, 2).otherwise(0)
)
df_union_cate.show(10, truncate=False)
df_union_cate.groupby(["asin_type"]).agg(
F.count("asin")
).show(10)
print(f"最终满足要导出的数量为: {df_union_cate.filter('asin_type=0').count()}")
df_union_cate = df_union_cate.select("asin", "category_first_id", "asin_type")
return df_union_cate
def handle_data(self): def handle_data(self):
df_union_cate = self.handle_data_hide_category()
df_bsr = self.df_asin_bsr.select("asin", "asin_cate_flag") df_bsr = self.df_asin_bsr.select("asin", "asin_cate_flag")
df_nsr = self.df_asin_nsr.select("asin", "asin_cate_flag") df_nsr = self.df_asin_nsr.select("asin", "asin_cate_flag")
df_keepa = self.df_asin_keepa.select("asin", "asin_cate_flag") df_keepa = self.df_asin_keepa.select("asin", "asin_cate_flag")
...@@ -159,6 +245,8 @@ class DwdNsrBsrKeepaAsin(Templates): ...@@ -159,6 +245,8 @@ class DwdNsrBsrKeepaAsin(Templates):
self.df_save = self.df_save.join( self.df_save = self.df_save.join(
df_result, on=['asin'], how='left' df_result, on=['asin'], how='left'
).join(
df_union_cate, on=['asin'], how='left'
) )
self.df_save.show(10, truncate=False) self.df_save.show(10, truncate=False)
# print(self.df_save.count()) # print(self.df_save.count())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment