Commit b1363229 by chenyuanjie

关联流量-新增捆绑销售字段 bundle_asin_component_json

parent b4062a54
......@@ -40,6 +40,7 @@ class DimAsinRelatedTraffic(object):
self.df_sp_delivery_initial_seen_asins_json = self.spark.sql(f"select 1+1;")
self.df_compare_similar_asin_json = self.spark.sql(f"select 1+1;")
self.df_bundles_this_asins_json = self.spark.sql(f"select 1+1;")
self.df_bundle_asin_component_json = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
self.df_updated_at = self.spark.sql(f"select 1+1;")
......@@ -158,6 +159,7 @@ class DimAsinRelatedTraffic(object):
compare_similar_asin_json,
result_list_json,
bundles_this_asins_json,
bundle_asin_component_json,
updated_at
from ods_asin_detail
where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and asin is not null;
......@@ -175,6 +177,7 @@ class DimAsinRelatedTraffic(object):
compare_similar_asin_json,
result_list_json,
null as bundles_this_asins_json,
bundle_asin_component_json,
updated_at
from ods_self_asin_related_traffic where site_name='{self.site_name}' and asin is not null;
"""
......@@ -287,6 +290,19 @@ class DimAsinRelatedTraffic(object):
print("处理bundles_this_asins_json字段结果如下:")
self.df_bundles_this_asins_json.show(10, True)
# 处理bundle_asin_component_json字段
self.df_bundle_asin_component_json = self.df_asin_detail\
.select('asin', 'bundle_asin_component_json')\
.filter(F.col('bundle_asin_component_json').isNotNull())
self.df_bundle_asin_component_json = self.other_json_handle(
df=self.df_bundle_asin_component_json,
json_column='bundle_asin_component_json',
asin_key='bundle_component_asin',
output_column='bundle_bought'
).cache()
print("处理bundle_asin_component_json字段结果如下:")
self.df_bundle_asin_component_json.show(10, True)
# 处理together_asin字段
self.df_together_asin = self.df_asin_detail.select('asin', 'together_asin').filter(
F.col('together_asin').isNotNull()
......@@ -300,7 +316,7 @@ class DimAsinRelatedTraffic(object):
def handle_merge_df(self):
all_merge_df = [self.df_together_asin, self.df_sp_initial_seen_asins_json,
self.df_sp_4stars_initial_seen_asins_json, self.df_sp_delivery_initial_seen_asins_json,
self.df_compare_similar_asin_json, self.df_bundles_this_asins_json]
self.df_compare_similar_asin_json, self.df_bundles_this_asins_json, self.df_bundle_asin_component_json]
main_df = self.df_result_list_json
for df in all_merge_df:
for col in set(df.columns) - {"asin"}:
......@@ -333,6 +349,7 @@ class DimAsinRelatedTraffic(object):
self.df_sp_delivery_initial_seen_asins_json.unpersist()
self.df_compare_similar_asin_json.unpersist()
self.df_bundles_this_asins_json.unpersist()
self.df_bundle_asin_component_json.unpersist()
# 数据落盘
def save_data(self):
......
......@@ -87,7 +87,7 @@ if __name__ == '__main__':
new_col = "describe, weight_str, package_quantity, pattern_name, follow_sellers, product_description, buy_sales, image_view, spider_int, " \
"lob_asin_json, seller_json, customer_reviews_json, product_json, product_detail_json, review_ai_text, review_label_json, sp_initial_seen_asins_json, " \
"sp_4stars_initial_seen_asins_json, sp_delivery_initial_seen_asins_json, compare_similar_asin_json, together_asin_json, min_match_asin_json, " \
"variat_num, current_asin, img_list, variat_list, parent_asin, bundles_this_asins_json, video_m3u8_url, result_list_json"
"variat_num, current_asin, img_list, variat_list, parent_asin, bundles_this_asins_json, video_m3u8_url, result_list_json, bundle_asin_component_json"
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_table = f"{site_name}_asin_detail_month_{d1}_{d2}"
......
......@@ -29,7 +29,8 @@ if __name__ == '__main__':
sp_delivery_initial_seen_asins_json,
compare_similar_asin_json,
result_list_json,
updated_at
updated_at,
bundle_asin_component_json
from {import_table}
where site = '{site_name}'
and DATE(updated_at) >= DATE_SUB(CURDATE(), INTERVAL 7 DAY)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment