Commit 31c1321b by chenyuanjie

Traffic product selection - update the data with abnormal sales volume in Hive

parent 7e007607
@@ -2,32 +2,74 @@ import os
 import sys
 sys.path.append(os.path.dirname(sys.path[0]))
-from utils.DolphinschedulerHelper import DolphinschedulerHelper
 from utils.common_util import CommonUtil
 from utils.spark_util import SparkUtil
+from utils.db_util import DBUtil
+from pyspark.sql import functions as F
+from utils.hdfs_utils import HdfsUtils
-if __name__ == '__main__':
-    spark_session = SparkUtil.get_spark_session("rerun-dwt—flow-asin")
-    date_list = ["2024-02", "2024-01", "2023-12"]
-    for date_info in date_list:
-        startParams = {
-            "site_name": "us",
-            "date_type": "month",
-            "date_info": date_info
-        }
+class RerunDemo(object):
+    def __init__(self, site_name, date_type, date_info):
+        self.site_name = site_name
+        self.date_type = date_type
+        self.date_info = date_info
+        app_name = f"{self.__class__.__name__}:{self.site_name}:{self.date_type}:{self.date_info}"
+        self.spark = SparkUtil.get_spark_session(app_name)
+        self.db_save = "dwt_flow_asin"
+        self.partitions_num = 200
+        # partition columns
+        self.partitions_by = ['site_name', 'date_type', 'date_info']
+        # values used to build the partition where clause
+        self.partitions_dict = {
+            'site_name': self.site_name,
+            'date_type': self.date_type,
+            'date_info': self.date_info
+        }
-        print(startParams)
-        DolphinschedulerHelper.start_and_watch_process_instance(
-            "big_data_selection",
-            process_df_name='rerun_dwt_flow_asin_api',
-            startParams={
-                "site_name": "us",
-                "date_type": "month",
-                "date_info": date_info
-            }
-        )
+    def run(self):
+        # pull the existing Hive partition that is about to be rewritten
+        sql = f"""
+        select * from {self.db_save} where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}';
+        """
+        df_history_data = self.spark.sql(sql).repartition(self.partitions_num).cache()
+        print("History data before the rerun:")
+        df_history_data.show(10, True)
+        # asins whose monthly sales were flagged as abnormal by the crawler
+        sql = f"""
+        select asin, 1 as flag from {self.site_name}_asin_detail_2025_buysales_err where date_info = '{self.date_info}'
+        """
+        pg_con_info = DBUtil.get_connection_info("postgresql_14", self.site_name)
+        df_asin = SparkUtil.read_jdbc_query(
+            session=self.spark,
+            url=pg_con_info['url'],
+            username=pg_con_info['username'],
+            pwd=pg_con_info['pwd'],
+            query=sql
+        )
+        df_asin = df_asin.dropDuplicates(['asin']).cache()
+        print("Row count of the crawler table:", df_asin.count())
+        # null out asin_bought_month for the flagged asins, keep every other row unchanged
+        df_save = df_history_data.join(
+            df_asin, 'asin', 'left'
+        ).withColumn(
+            'asin_bought_month', F.when(F.col('flag') == 1, F.lit(None)).otherwise(F.col('asin_bought_month'))
+        ).drop('flag').cache()
+        CommonUtil.send_wx_msg(["chenyuanjie"], "[rerun_dwt_flow_asin_api] rerun finished", "")
+        if df_history_data.count() == df_save.count():
+            print(f"Target table: {self.db_save}, partitioned by {self.partitions_by}")
+            # drop the old partition files, then append the corrected data
+            hdfs_path = CommonUtil.build_hdfs_path(self.db_save, self.partitions_dict)
+            HdfsUtils.delete_file_in_folder(hdfs_path)
+            df_save.repartition(self.partitions_num).write.saveAsTable(name=self.db_save, format='hive', mode='append', partitionBy=self.partitions_by)
+            print("Rerun succeeded!")
+        else:
+            print("Rerun failed!")
+            exit()
+        pass
+
+
+if __name__ == '__main__':
+    site_name = CommonUtil.get_sys_arg(1, None)
+    date_type = CommonUtil.get_sys_arg(2, None)
+    date_info = CommonUtil.get_sys_arg(3, None)
+    obj = RerunDemo(site_name=site_name, date_type=date_type, date_info=date_info)
+    obj.run()
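
The heart of the new run() method is a left join against the crawler error table followed by a conditional overwrite: rows whose asin appears in the error table get asin_bought_month set to null, every other row passes through untouched, so the total row count must not change (which is exactly the check done before the partition is rewritten). A minimal, self-contained sketch of that pattern; the column names follow the script, the rows and the local SparkSession are made up for illustration:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("null_out_by_flag_demo").getOrCreate()

# toy stand-ins for dwt_flow_asin and the buysales_err table
history = spark.createDataFrame(
    [("B01", 120), ("B02", 999999), ("B03", 80)],
    ["asin", "asin_bought_month"])
bad_asins = spark.createDataFrame([("B02", 1)], ["asin", "flag"])

fixed = (history.join(bad_asins, "asin", "left")
         .withColumn("asin_bought_month",
                     F.when(F.col("flag") == 1, F.lit(None))
                     .otherwise(F.col("asin_bought_month")))
         .drop("flag"))

fixed.show()   # B02's asin_bought_month is now null; B01 and B03 are unchanged
assert fixed.count() == history.count()   # same invariant the script checks before rewriting
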
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
if __name__ == '__main__':
    # input parameters
    site_name = 'us'
    date_type = 'month'
    date_info = '2023-12'
    db_type = 'postgresql_cluster'
    engine = DBUtil.get_db_engine(db_type, site_name)
    # export table - base columns
    export_base_cols = [
"search_term_id",
"date_info",
"search_term",
"bsr_orders",
"orders",
"top100_asin",
"top100_orders",
"top100_market_share",
"top100_is_new",
"aadd_bsr_orders",
"aadd_video_num",
"aadd_no_video_num",
"no_aadd_no_video_num",
"no_aadd_video_num",
"ao_range_val1",
"ao_range_val2",
"ao_range_val3",
"ao_range_val4",
"ao_range_val5",
"ao_range_val6",
"ao_range_val7",
"ao_range_val8",
"ao_range_val9",
"ao_range_val10",
"ao_range_val11",
"ao_range_val12",
"ao_range_market_share1",
"ao_range_market_share2",
"ao_range_market_share3",
"ao_range_market_share4",
"ao_range_market_share5",
"ao_range_market_share6",
"ao_range_market_share7",
"ao_range_market_share8",
"ao_range_market_share9",
"ao_range_market_share10",
"ao_range_market_share11",
"ao_range_market_share12",
"launch_time_num1",
"launch_time_num2",
"launch_time_num3",
"launch_time_num4",
"launch_time_num5",
"launch_time_num6",
"launch_time_num7",
"launch_time_num8",
"launch_time_market_share1",
"launch_time_market_share2",
"launch_time_market_share3",
"launch_time_market_share4",
"launch_time_market_share5",
"launch_time_market_share6",
"launch_time_market_share7",
"launch_time_market_share8",
"top20_asin",
"top20_orders",
"top20_brand",
"top20_brand_new_num_proportion",
"top20_brand_bsr_oders",
"top20_brand_market_share",
"comments_num1",
"comments_num2",
"comments_num3",
"comments_num4",
"comments_num5",
"comments_num6",
"comments_num7",
"comments_num8",
"comments_num9",
"comments_num10",
"comments_num11",
"comments_num12",
"comments_num13",
"comments_num_market_share1",
"comments_num_market_share2",
"comments_num_market_share3",
"comments_num_market_share4",
"comments_num_market_share5",
"comments_num_market_share6",
"comments_num_market_share7",
"comments_num_market_share8",
"comments_num_market_share9",
"comments_num_market_share10",
"comments_num_market_share11",
"comments_num_market_share12",
"comments_num_market_share13",
"buy_box_name",
"buy_box_num",
"seller_name",
"seller_num",
"seller_bsr_orders",
"color_name",
"color_num",
"total_asin_num",
"new_asin_num"
]
    export_table = "us_aba_last_month_report_2023_12_copy"
    export_cols = export_base_cols + [
        "new_asin_bsr_orders",
        "rating_section",
        "rating_market_share_section",
        "size_name",
        "size_num",
        "package_num_trend",
        "package_num_trend_market_share",
        "package_num_corresponding_asin",
        "price_interval",
        "price_interval_asin_count",
        "price_interval_asin_market_share",
        "color_bsr_orders_percent",
        "size_bsr_orders_percent"
    ]
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }
    # build the export shell command for the selected Hive partition and run it on the remote host
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwt_aba_st_analytics_report",
        export_tb=export_table,
        col=export_cols,
        partition_dict=partition_dict
    )
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()
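
If DBUtil.get_db_engine returns a SQLAlchemy engine (the return type is not shown in this diff, so treat that as an assumption), a quick row-count sanity check can be appended after the SSH call to confirm the export actually landed in the copy table:

from sqlalchemy import text

# count the rows that reached the Postgres copy table after the export command finished;
# the figure should match the size of the exported Hive partition
with engine.connect() as conn:
    exported_rows = conn.execute(text(f"select count(*) from {export_table}")).scalar()
print(f"{export_table} row count after export: {exported_rows}")
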