Commit fd30a28c by fangxingjun

Merge branch 'developer' of 47.106.101.75:abel_cjy/Amazon-Selection-Data into developer

parents c007b3d1 aa8471e3
......@@ -98,6 +98,10 @@ class DwtStDetailWeek(object):
'dt_rank', F.row_number().over(window=window)
).filter('dt_rank=1').drop('dt_rank', 'updated_time').cache()
# 对数据列清洗 有些是\0的数据
for col in ['search_term', 'asin1', 'asin2', 'asin3', 'product_title1', 'product_title2', 'product_title3', 'brand1', 'brand2', 'brand3', 'category1', 'category2', 'category3']:
self.df_st_detail = self.df_st_detail.withColumn(col, F.regexp_replace(F.col(col), '\x00', ''))
self.df_st_detail_last_week = self.df_st_detail.filter(f"date_info = '{self.date_info_last_week}'")
for col in self.cols:
self.df_st_detail_last_week = self.df_st_detail_last_week.withColumnRenamed(
......@@ -105,7 +109,7 @@ class DwtStDetailWeek(object):
)
self.df_st_detail_last_week.cache()
print("1周前数据如下:")
self.df_st_detail_last_week.show(10, True)
# self.df_st_detail_last_week.show(10, True)
self.df_st_detail_last_4_week = self.df_st_detail.filter(f"date_info > '{self.date_info_4_week_ago}'")
for col in self.cols:
......@@ -114,7 +118,7 @@ class DwtStDetailWeek(object):
)
self.df_st_detail_last_4_week.cache()
print("近4周数据如下:")
self.df_st_detail_last_4_week.show(10, True)
# self.df_st_detail_last_4_week.show(10, True)
self.df_st_detail_4_week_ago = self.df_st_detail.filter(f"date_info = '{self.date_info_4_week_ago}'")
for col in self.cols:
......@@ -123,7 +127,7 @@ class DwtStDetailWeek(object):
)
self.df_st_detail_4_week_ago.cache()
print("4周前数据如下:")
self.df_st_detail_4_week_ago.show(10, True)
# self.df_st_detail_4_week_ago.show(10, True)
self.df_st_detail_12_week_ago = self.df_st_detail.filter(f"date_info = '{self.date_info_12_week_ago}'")
for col in self.cols:
......@@ -132,11 +136,11 @@ class DwtStDetailWeek(object):
)
self.df_st_detail_12_week_ago.cache()
print("12周前数据如下:")
self.df_st_detail_12_week_ago.show(10, True)
# self.df_st_detail_12_week_ago.show(10, True)
self.df_st_detail = self.df_st_detail.filter(f"date_info = '{self.date_info}'").cache()
print("本周数据如下:")
self.df_st_detail.show(10, True)
# self.df_st_detail.show(10, True)
sql = f"""
select rank, search_num as search_volume, rate as st_search_rate, date_info from ods_rank_search_rate_repeat where site_name = '{self.site_name}';
......@@ -147,14 +151,14 @@ class DwtStDetailWeek(object):
'date_info_rank', F.row_number().over(window=window)
).filter('date_info_rank=1').drop('date_info_rank', 'date_info').cache()
print("搜索词排名+搜索量+转化率如下:")
self.df_st_rank.show(10, True)
# self.df_st_rank.show(10, True)
sql = f"""
select st_key, search_term from ods_st_key where site_name = '{self.site_name}';
"""
self.df_st_key = self.spark.sql(sql).cache()
print("搜索词key如下:")
self.df_st_key.show(10, True)
# self.df_st_key.show(10, True)
sql = f"""
select search_term, st_bsr_cate_1_id_new as category_id, st_bsr_cate_current_id_new as category_current_id, market_cycle_type, date_info from dwt_aba_st_analytics where site_name = '{self.site_name}' and date_type = 'month' and date_info <= '{self.year_month}';
......@@ -165,7 +169,7 @@ class DwtStDetailWeek(object):
'date_info_rank', F.row_number().over(window=window)
).filter('date_info_rank=1').drop('date_info_rank', 'date_info').cache()
print("分类、市场周期月数据如下:")
self.df_st_month.show(10, True)
# self.df_st_month.show(10, True)
# 从pgsql获取特殊字符匹配字典表:match_character_dict
pg_sql = f"""
......@@ -196,7 +200,7 @@ class DwtStDetailWeek(object):
'search_term', 'rank_change_1_week_ago', 'rank_rate_1_week_ago'
).cache()
print("1周前排名变化数据如下:")
self.df_st_detail_1_week_ago.show(10, True)
# self.df_st_detail_1_week_ago.show(10, True)
self.df_st_detail_2_week_ago = self.df_st_detail_week.filter(F.col('date_info') == self.date_info_2_week_ago).withColumnRenamed(
'rank_change_last_1_week', 'rank_change_2_week_ago'
......@@ -206,7 +210,7 @@ class DwtStDetailWeek(object):
'search_term', 'rank_change_2_week_ago', 'rank_rate_2_week_ago'
).cache()
print("2周前排名变化数据如下:")
self.df_st_detail_2_week_ago.show(10, True)
# self.df_st_detail_2_week_ago.show(10, True)
self.df_st_detail_3_week_ago = self.df_st_detail_week.filter(F.col('date_info') == self.date_info_3_week_ago).withColumnRenamed(
'rank_change_last_1_week', 'rank_change_3_week_ago'
......@@ -216,7 +220,7 @@ class DwtStDetailWeek(object):
'search_term', 'rank_change_3_week_ago', 'rank_rate_3_week_ago'
).cache()
print("3周前排名变化数据如下:")
self.df_st_detail_3_week_ago.show(10, True)
# self.df_st_detail_3_week_ago.show(10, True)
self.df_st_detail_week.unpersist()
def handle_st_flag(self):
......@@ -229,7 +233,7 @@ class DwtStDetailWeek(object):
'is_search_text', F.lit(1)
).select('search_term', 'is_search_text').cache()
print("热搜词如下:")
df_hot_search_term.show(10, True)
# df_hot_search_term.show(10, True)
# 上升词:本周环比上周排名增长50%的搜索词
df_rising_search_term = self.df_st_detail.join(
......@@ -238,7 +242,7 @@ class DwtStDetailWeek(object):
"is_ascending_text", (((F.col('rank_last_week') - F.col('rank')) / F.col('rank_last_week')) >= 0.5).cast('int')
).select('search_term', 'is_ascending_text').cache()
print("上升词如下:")
df_rising_search_term.show(10, True)
# df_rising_search_term.show(10, True)
# 新增词:本周环比上周新出现的搜索词
df_first_search_term = self.df_st_detail.join(
......@@ -247,7 +251,7 @@ class DwtStDetailWeek(object):
'is_first_text', F.lit(1)
).select('search_term', 'is_first_text').cache()
print("新增词如下:")
df_first_search_term.show(10, True)
# df_first_search_term.show(10, True)
# 高回报词:最近4周都出现且点击占比(总)>转化占比(总)
df_high_return_search_term = self.df_st_detail_last_4_week.groupBy(['search_term', 'date_info']).agg(
......@@ -263,7 +267,7 @@ class DwtStDetailWeek(object):
'is_high_return_text', F.lit(1)
).select('search_term', 'is_high_return_text').cache()
print("高回报词如下:")
df_high_return_search_term.show(10, True)
# df_high_return_search_term.show(10, True)
self.df_st_detail = self.df_st_detail.join(
df_hot_search_term, 'search_term', 'left'
......
......@@ -107,7 +107,7 @@ class EsAsinProfitRate(object):
"es.batch.write.retry.wait": "60s",
"es.batch.size.entries": "5000",
"es.nodes.wan.only": "false",
"es.batch.write.concurrency": "40",
"es.batch.write.concurrency": "10",
"es.write.operation": "index"
}
......@@ -215,7 +215,7 @@ class EsAsinProfitRate(object):
print(f"{'='*60}")
EsUtils.create_index(self.es_profit_rate_index, self.es_client, self.es_profit_rate_body)
try:
self.df_asin_profit_rate.write.format("org.elasticsearch.spark.sql") \
self.df_asin_profit_rate.repartition(10).write.format("org.elasticsearch.spark.sql") \
.options(**self.es_profit_rate_options) \
.mode("append") \
.save()
......@@ -315,8 +315,8 @@ class EsAsinProfitRate(object):
"es.batch.write.abort.on.failure": "false",
"es.update.retry.on.conflict": "3",
"es.batch.write.refresh": "false",
"es.batch.size.entries": "5000",
"es.batch.write.concurrency": "20",
"es.batch.size.entries": "2000",
"es.batch.write.concurrency": "10",
"es.batch.write.retry.count": "3",
"es.batch.write.retry.wait": "60s",
"es.nodes.wan.only": "false"
......@@ -452,21 +452,19 @@ class EsAsinProfitRate(object):
# 8. 30day 索引额外更新 cate_flag 相关字段(partial update,不影响其他字段)
# inner join df_es 确保只更新索引中已存在的 asin,避免 doc missing 报错
# asin_source_flag:Hive 存为字符串,ES mapping 为 integer[],需转为 int 数组
if base_date is None:
print(f"[30day] 开始更新 cate_flag 字段:asin_source_flag / bsr / nsr")
df_cate_update = self.df_cate_flag.join(
df_es.select('asin'), on='asin', how='inner'
).select(
'asin', 'asin_source_flag',
'bsr_last_seen_at', 'bsr_seen_count_30d',
'nsr_last_seen_at', 'nsr_seen_count_30d'
).na.fill({
'asin_source_flag': '0',
'bsr_last_seen_at': '1970-01-01',
'bsr_seen_count_30d': 0,
'nsr_last_seen_at': '1970-01-01',
'nsr_seen_count_30d': 0
})
'asin',
F.transform(F.split(F.col('asin_source_flag'), ','), lambda x: x.cast('int')).alias('asin_source_flag'),
F.coalesce(F.col('bsr_last_seen_at'), F.lit('1970-01-01')).alias('bsr_last_seen_at'),
F.coalesce(F.col('bsr_seen_count_30d').cast('int'), F.lit(0)).alias('bsr_seen_count_30d'),
F.coalesce(F.col('nsr_last_seen_at'), F.lit('1970-01-01')).alias('nsr_last_seen_at'),
F.coalesce(F.col('nsr_seen_count_30d').cast('int'), F.lit(0)).alias('nsr_seen_count_30d')
)
self.write_combined_update(df_cate_update, index_name)
df_es.unpersist()
......
......@@ -101,7 +101,7 @@ class ExportAsinWithoutKeepa(object):
print("1. [month_week] 读取 dim_asin_detail")
sql = f"""
select asin, asin_price, asin_bought_month,
asin_is_self,
asin_is_self, asin_category_desc,
category_id as top_category_id,
category_first_id as top_category_first_id
from dim_asin_detail
......@@ -137,8 +137,20 @@ class ExportAsinWithoutKeepa(object):
"""
df_measure = self.spark.sql(sql).repartition(40, 'asin')
# ④ us_bs_category_hide → 隐藏分类(用于 asin_type 计算)
print("4. 读取 us_bs_category_hide (隐藏分类)")
# ④ dim_bsr_category_tree → desc_category_first_id(用于 asin_is_need 双重校验)
print("4. 读取 dim_bsr_category_tree (分类名称→ID 映射)")
sql = f"""
select lower(trim(en_name)) as desc_category_first_name,
category_first_id as desc_category_first_id
from dim_bsr_category_tree
where site_name = '{self.site_name}'
and category_parent_id = 0
and leaf_node = 2
"""
df_bsr_category = F.broadcast(self.spark.sql(sqlQuery=sql))
# ⑤ us_bs_category_hide → 隐藏分类(用于 asin_type 计算)
print("5. 读取 us_bs_category_hide (隐藏分类)")
mysql_con = DBUtil.get_connection_info("mysql", self.site_name)
sql = "select category_id_base as category_id, 1 as hide_flag from us_bs_category_hide group by category_id_base"
df_hide = SparkUtil.read_jdbc_query(
......@@ -146,8 +158,8 @@ class ExportAsinWithoutKeepa(object):
pwd=mysql_con['pwd'], username=mysql_con['username'], query=sql
)
# 组装
print("5. 组装主DataFrame")
# 组装
print("6. 组装主DataFrame")
df = df_dim \
.join(df_bsr, on='asin', how='left') \
.join(df_measure, on='asin', how='left')
......@@ -169,7 +181,15 @@ class ExportAsinWithoutKeepa(object):
).drop("asin_amazon_orders")
# asin_type 计算(对齐 dwt.handle_asin_is_hide)
# desc_category_first_name:解析 asin_category_desc,取 › 分隔的第一段
df = df.withColumn(
"desc_category_first_name",
F.lower(F.trim(F.split(F.col("asin_category_desc"), "›").getItem(0)))
).join(df_bsr_category, on='desc_category_first_name', how='left') \
.drop("desc_category_first_name", "asin_category_desc")
df = df.join(F.broadcast(df_hide), on='category_id', how='left')
need_categories = NEED_FILTER_CATEGORIES
df = df.withColumn(
"asin_is_hide",
F.expr(f"""
......@@ -181,11 +201,13 @@ class ExportAsinWithoutKeepa(object):
).withColumn(
"asin_is_need",
F.expr(f"""
CASE WHEN category_first_id IN {NEED_FILTER_CATEGORIES} THEN 1
CASE WHEN category_first_id IN {need_categories}
AND desc_category_first_id IN {need_categories} THEN 1
WHEN asin NOT LIKE 'B0%' THEN 1
ELSE 0 END
""")
).withColumn(
).drop("desc_category_first_id") \
.withColumn(
"asin_type",
F.expr("""
CASE WHEN asin_is_self = 1 THEN 1
......@@ -207,7 +229,7 @@ class ExportAsinWithoutKeepa(object):
# month_week:字段在 Python 中计算,需在此处做条件过滤
# month:SQL 中已完成过滤,直接跳过此步
if self.date_type == 'month_week':
print("6. [month_week] 筛选目标ASIN")
print("7. [month_week] 筛选目标ASIN")
df = df.filter(
F.col("asin_type").isin(0, 1, 3)
).filter(
......@@ -223,7 +245,7 @@ class ExportAsinWithoutKeepa(object):
# 排除 dim_keepa_asin_info 中已有有效keepa数据的ASIN
# 若 package_length/width/height/weight 任意一个 < 0,视为数据异常,不排除(需重新抓取)
print("7. 排除已有keepa数据的ASIN (dim_keepa_asin_info)")
print("8. 排除已有keepa数据的ASIN (dim_keepa_asin_info)")
df_keepa = self.spark.sql(f"""
select asin from dim_keepa_asin_info
where site_name = '{self.site_name}'
......@@ -236,7 +258,7 @@ class ExportAsinWithoutKeepa(object):
print(f"排除keepa后数据量: {df.count()}")
# 排除 {pg_table} 中已导出的ASIN
print(f"8. 排除已导出的ASIN ({self.pg_table})")
print(f"9. 排除已导出的ASIN ({self.pg_table})")
pg_con_info = DBUtil.get_connection_info("postgresql_cluster", self.site_name)
df_exported = SparkUtil.read_jdbc_query(
session=self.spark,
......@@ -258,7 +280,7 @@ class ExportAsinWithoutKeepa(object):
# ------------------------------------------------------------------ #
def save_data(self):
total = self.df_save.count()
print(f"9. 写入 PostgreSQL 表 {self.pg_table},共 {total} 条")
print(f"10. 写入 PostgreSQL 表 {self.pg_table},共 {total} 条")
con_info = DBUtil.get_connection_info('postgresql_cluster', self.site_name)
self.df_save.write.format("jdbc") \
.option("url", con_info["url"]) \
......
......@@ -14,7 +14,7 @@ if __name__ == '__main__':
CommonUtil.judge_is_work_hours(
site_name=site_name, date_type=date_type, date_info=date_info,
principal='chenyuanjie', priority=1, export_tools_type=1, belonging_to_process='ABA周增长'
principal='hejiangming', priority=1, export_tools_type=1, belonging_to_process='ABA周增长'
)
db_type = DbTypes.postgresql_cluster.name
......@@ -103,7 +103,7 @@ if __name__ == '__main__':
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
},num_mappers=2
)
client = SSHUtil.get_ssh_client()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment