Commit d13750ba by chenyuanjie

Traffic product selection - optimize the ASIN monthly sales trend chart

parent 5c1cd6b6
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.templates import Templates
from pyspark.sql import functions as F


class DwdAmazonReport(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2021-10'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = f'dwd_amazon_report'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=5)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # Placeholder DataFrames; the real data is assigned in read_data()/handle_data()
        self.df_buy_data = self.spark.sql(f"select 1+1;")
        self.df_st_count = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
    def read_data(self):
        # Read the per-asin monthly sales data
        sql1 = f"""
        select
            asin,
            asin_amazon_orders as monthly_sales
        from
            dim_asin_amorders_info
        where
            site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}';
        """
        print(sql1)
        self.df_buy_data = self.spark.sql(sqlQuery=sql1).repartition(15, 'asin').cache()
        self.df_buy_data.show(10, truncate=True)
        # Read the per-asin zr / sp / total counts
        sql2 = f"""
        select
            asin,
            asin_zr_counts as zr_count,
            asin_sp_counts as sp_count,
            asin_st_counts as total_count
        from
            dwd_asin_measure
        where
            site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}';
        """
        print(sql2)
        self.df_st_count = self.spark.sql(sqlQuery=sql2).repartition(15, 'asin').cache()
        self.df_st_count.show(10, truncate=True)
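        # Editor's note (not in the original code): both inputs are repartitioned on 'asin'
        # with the same partition count, so the full outer join in handle_data() can generally
        # reuse this partitioning instead of shuffling both sides again.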
    def handle_data(self):
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwd/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"Clearing HDFS directory: {hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        # Full outer join so an asin is kept even when it appears in only one of the two sources
        self.df_save = self.df_buy_data.join(
            self.df_st_count, on='asin', how='full'
        )
        # Cast every column to string, then mark the gaps left by the outer join with '-1'
        columns = self.df_save.columns
        for col_name in columns:
            self.df_save = self.df_save.withColumn(
                col_name, self.df_save[col_name].cast('string')
            )
        self.df_save = self.df_save.fillna('-1')
        # Null placeholders for metrics not produced here, plus the partition columns
        self.df_save = self.df_save.withColumn(
            "weekly_sales", F.lit(None)
        ).withColumn(
            "weekly_views", F.lit(None)
        ).withColumn(
            "monthly_views", F.lit(None)
        ).withColumn(
            "site_name", F.lit(self.site_name)
        ).withColumn(
            "date_type", F.lit(self.date_type)
        ).withColumn(
            "date_info", F.lit(self.date_info)
        )
if __name__ == '__main__':
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    if (site_name in ['us', 'uk', 'de']) and (date_type == 'month') and (date_info >= '2024-04'):
        handle_obj = DwdAmazonReport(site_name=site_name, date_type=date_type, date_info=date_info)
        handle_obj.run()
    else:
        print("Data for this dimension is not computed yet!")
        quit()
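For reference, the core of handle_data above is a full outer join followed by a cast-and-fill step. Below is a minimal standalone sketch of that pattern on toy data; it assumes only a local SparkSession, and the names buy, cnt and out are illustrative stand-ins, not part of the job.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("join_fill_sketch").getOrCreate()

# Toy stand-ins for df_buy_data and df_st_count
buy = spark.createDataFrame([("B01", 120)], ["asin", "monthly_sales"])
cnt = spark.createDataFrame([("B01", 5, 2, 7), ("B02", 1, 0, 1)],
                            ["asin", "zr_count", "sp_count", "total_count"])

# Full outer join keeps B02 even though it has no sales row
out = buy.join(cnt, on="asin", how="full")

# Cast everything to string, then mark the gaps with '-1', as the DWD job does
out = out.select([F.col(c).cast("string").alias(c) for c in out.columns]).fillna("-1")
out.show()  # B02 comes out with monthly_sales = '-1'; B01 keeps its real values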
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
@@ -17,67 +18,58 @@ class DwtAmazonReport(Templates):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = f'dwt_amazon_report'
        self.date_info_pre = CommonUtil.get_month_offset(self.date_info, -1)
        self.db_save = 'dwt_amazon_report'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=120)
        self.reset_partitions(partitions_num=200)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.df_dwd_new = self.spark.sql(f"select 1+1;")
        self.df_dwd_old = self.spark.sql(f"select 1+1;")
        self.df_asin_detail_new = self.spark.sql(f"select 1+1;")
        self.df_asin_detail_old = self.spark.sql(f"select 1+1;")
        self.df_joined = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")

    def read_data(self):
        # Read this month's data from the dwd layer
        # Read this month's monthly sales from the traffic ASIN selection data
        sql1 = f"""
        select
            asin,
            monthly_sales as new_monthly_sales,
            zr_count as new_zr_count,
            sp_count as new_sp_count,
            total_count as new_total_count,
            asin,
            asin_bought_month as new_monthly_sales,
            asin_zr_counts as new_zr_count,
            asin_sp_counts as new_sp_count,
            asin_st_counts as new_total_count,
            date_info as new_date_info_list
        from
            dwd_amazon_report
        where
            site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}';
        from dwt_flow_asin
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}';
        """
        print(sql1)
        self.df_dwd_new = self.spark.sql(sqlQuery=sql1).repartition(15, 'asin').cache()
        self.df_dwd_new.show(10, truncate=True)
        self.df_asin_detail_new = self.spark.sql(sqlQuery=sql1).repartition(15, 'asin').fillna('-1').cache()
        self.df_asin_detail_new.show(10, truncate=True)
        # Read last month's data from the dwt layer
        date_info_pre = CommonUtil.get_month_offset(self.date_info, -1)
        # Read the accumulated historical data
        sql2 = f"""
        select
            asin,
            monthly_sales as old_monthly_sales,
            zr_count as old_zr_count,
            sp_count as old_sp_count,
            total_count as old_total_count,
            date_info_list as old_date_info_list
        from
            dwt_amazon_report
        where
            site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{date_info_pre}';
        from dwt_amazon_report
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info_pre}';
        """
        print(sql2)
        self.df_dwd_old = self.spark.sql(sqlQuery=sql2).repartition(15, 'asin').cache()
        self.df_dwd_old.show(10, truncate=True)
        self.df_asin_detail_old = self.spark.sql(sqlQuery=sql2).repartition(15, 'asin').cache()
        self.df_asin_detail_old.show(10, truncate=True)
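        # At this point df_asin_detail_new holds this month's metrics (read from dwt_flow_asin)
        # and df_asin_detail_old holds the history accumulated up to the previous month's
        # dwt_amazon_report partition; handle_data() stitches the two together per asin.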
    def handle_data(self):
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"Clearing HDFS directory: {hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        # Column names after the join
        join_columns = ['monthly_sales', 'zr_count', 'sp_count', 'total_count', 'date_info_list']
        # Get the number of date_info entries in the historical dataframe; it determines how many
        # '-1' placeholders to fill for asins that cannot be matched against the history
        old_date_info_first = self.df_dwd_old.select('old_date_info_list').distinct().first()
        old_date_info_first = self.df_asin_detail_old.select('old_date_info_list').distinct().first()
        if old_date_info_first is None:
            old_date_info_list = None
            old_date_info_list_len = 0
@@ -88,9 +80,9 @@ class DwtAmazonReport(Templates):
        # If this month's data cannot be matched, fill a single '-1'
        fillna_new = '-1'
        # Join the dataframes and fill the null values
        self.df_joined = self.df_dwd_new.join(
            self.df_dwd_old, on='asin', how='full'
        )
        self.df_joined = self.df_asin_detail_new.join(
            self.df_asin_detail_old, on='asin', how='full'
        ).cache()
        for col in join_columns:
            self.df_joined = self.df_joined.fillna({'old_' + col: fillna_old})
            self.df_joined = self.df_joined.fillna({'new_' + col: fillna_new})
@@ -104,25 +96,17 @@ class DwtAmazonReport(Templates):
        if old_date_info_first is None:
            for col in join_columns:
                self.df_joined = self.df_joined.withColumn(
                    col,
                    self.df_joined['new_' + col]
                    col, self.df_joined['new_' + col]
                )
        else:
            for col in join_columns:
                self.df_joined = self.df_joined.withColumn(
                    col,
                    concat_ws(',', self.df_joined['old_' + col], self.df_joined['new_' + col])
                    col, concat_ws(',', self.df_joined['old_' + col], self.df_joined['new_' + col])
                )
        # Select the required columns
        selected_columns = ['asin'] + join_columns
        self.df_save = self.df_joined.select(selected_columns)
        self.df_save = self.df_save.withColumn(
            "weekly_sales", F.lit(None)
        ).withColumn(
            "weekly_views", F.lit(None)
        ).withColumn(
            "monthly_views", F.lit(None)
        ).withColumn(
            "site_name", F.lit(self.site_name)
        ).withColumn(
            "date_type", F.lit(self.date_type)
@@ -130,14 +114,14 @@ class DwtAmazonReport(Templates):
"date_info", F.lit(self.date_info)
)
hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
print(f"清除hdfs目录中.....{hdfs_path}")
HdfsUtils.delete_hdfs_file(hdfs_path)
if __name__ == '__main__':
site_name = sys.argv[1]
date_type = sys.argv[2]
date_info = sys.argv[3]
if (site_name in ['us', 'uk', 'de']) and (date_type == 'month') and (date_info >= '2024-04'):
handle_obj = DwtAmazonReport(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj.run()
else:
print("暂不计算该维度数据!")
quit()
handle_obj = DwtAmazonReport(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj.run()
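For reference, a minimal standalone sketch of the accumulation step performed in handle_data above, on toy data. It assumes only a local SparkSession; the padding length of 2 historical months is hard-coded here for illustration, whereas the job derives it from old_date_info_list, and the names old, new and joined are illustrative stand-ins.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("accumulate_sketch").getOrCreate()

# History: B01 already has two months of data; B02 is new this month
old = spark.createDataFrame([("B01", "100,110", "2024-04,2024-05")],
                            ["asin", "old_monthly_sales", "old_date_info_list"])
new = spark.createDataFrame([("B01", "120", "2024-06"), ("B02", "30", "2024-06")],
                            ["asin", "new_monthly_sales", "new_date_info_list"])

joined = new.join(old, on="asin", how="full")

# Pad missing history with as many '-1' entries as there are historical months (2 here),
# pad a missing current month with a single '-1', then append the new month to the list
old_pad = ",".join(["-1"] * 2)
for col in ["monthly_sales", "date_info_list"]:
    joined = joined.fillna({"old_" + col: old_pad, "new_" + col: "-1"})
    joined = joined.withColumn(col, F.concat_ws(",", "old_" + col, "new_" + col))

joined.select("asin", "monthly_sales", "date_info_list").show(truncate=False)
# B01 -> "100,110,120" with "2024-04,2024-05,2024-06"
# B02 -> "-1,-1,30"    with "-1,-1,2024-06"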
@@ -11,10 +11,6 @@ if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    if not ((site_name in ['us', 'uk', 'de']) and (date_type == 'month') and (date_info >= '2024-04')):
        print("Data for this dimension is not computed yet!")
        quit()
    CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                   principal='chenyuanjie', priority=1, export_tools_type=1)
@@ -33,7 +29,9 @@ if __name__ == '__main__':
        (
            like {master_tb} including ALL
        );
        SELECT create_distributed_table('{export_tb}', 'asin');
        DO $$
        DECLARE
            index_record RECORD;
@@ -45,8 +43,6 @@ if __name__ == '__main__':
        END;
        $$;
    """
print("================================执行sql================================")
print(sql)
connection.execute(sql)
# 导出脚本
@@ -57,10 +53,7 @@ if __name__ == '__main__':
        export_tb=export_tb,
        col=[
            "asin",
            "weekly_sales",
            "weekly_views",
            "monthly_sales",
            "monthly_views",
            "zr_count",
            "sp_count",
            "total_count",
@@ -84,4 +77,3 @@ if __name__ == '__main__':
print("success")
pass