Commit b0597344 by chenyuanjie

asin历史月数据上架时间fix

parent 971fa3ce
......@@ -2,27 +2,26 @@
@Author : HuangJian
@Description : 上架日期补充表
@SourceTable :
①dim_asin_launchtime_info
@SinkTable : dim_asin_label
① ods_asin_detail / ods_self_asin_detail(当期抓取上架时间)
② dim_keepa_asin_info(us) / ods_asin_keep_date(其他站点)(keepa上架时间兜底)
③ dim_asin_launchtime_info(历史数据)
@SinkTable : dim_asin_launchtime_info
@CreateTime : 2023/12/12 11:20
@UpdateTime : 2022/12/12 11:20
@UpdateTime : 2026/06/12
"""
import os
import sys
import re
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType,DoubleType
from pyspark.sql.types import StringType
from yswg_utils.common_udf import udf_handle_string_null_value as NullUDF
from utils.redis_utils import RedisUtils
class DimAsinLaunchtimeInfo(object):
def __init__(self, site_name, date_type, date_info):
......@@ -32,63 +31,30 @@ class DimAsinLaunchtimeInfo(object):
app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
self.spark = SparkUtil.get_spark_session(app_name)
self.hive_table = "dim_asin_launchtime_info"
self.partition_dict = {
"site_name": site_name
}
# 落表路径校验
self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict=self.partition_dict)
self.partition_dict = {"site_name": site_name}
self.partitions_num = CommonUtil.reset_partitions(site_name, 50)
self.spark.udf.register('u_handle_string_num_value', NullUDF, StringType())
# 自定义全局df
self.df_asin_detail = self.spark.sql(f"select 1+1;")
self.df_history_launchtime = self.spark.sql(f"select 1+1;")
self.df_asin_keepa_date = self.spark.sql(f"select 1+1;")
self.df_asin_handle_launchtime = self.spark.sql(f"select 1+1;")
# udf函数
self.handle_string_num_value = F.udf(NullUDF, StringType())
self.spark.udf.register('u_handle_string_num_value', NullUDF,
StringType())
def read_data(self):
# 读取asin_detail数据
def _read_current_asin(self):
"""Step1: 读取当期 asin 及抓取上架时间"""
if self.date_type == 'all':
# 说明全局数据量执行
print("==================执行全量数据整合=================")
sql = f"""
with st_asin as(
select
asin,
select asin,
u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
date_format(updated_at, '{CommonUtil._date_time_format}') as updated_time,
concat(site_name,',',date_type,',',date_info) as period_label
from ods_asin_detail
where site_name = '{self.site_name}'
),
bsr_asin as(
select
asin,
from ods_asin_detail where site_name = '{self.site_name}'
union
select asin,
u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
date_format(created_at, '{CommonUtil._date_time_format}') as updated_time,
concat(site_name,',',date_type,',',date_info) as period_label
from ods_self_asin_detail
where site_name = '{self.site_name}'
)
select asin, crawl_asin_launch_time,updated_time,period_label from st_asin
union
select asin, crawl_asin_launch_time,updated_time,period_label from bsr_asin
from ods_self_asin_detail where site_name = '{self.site_name}'
"""
print(sql)
self.df_asin_detail = self.spark.sql(sql)
else:
print("==================执行分区数据整合=================")
# 按分区检测是否有新增的asin
if self.date_type in (DateTypes.week.name,DateTypes.month.name,DateTypes.month_week.name, 'month_aba_me'):
# 取st维度的st下的asin数据
elif self.date_type in (DateTypes.week.name, DateTypes.month.name, DateTypes.month_week.name, 'month_aba_me'):
sql = f"""
select
asin,
select asin,
u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
date_format(updated_at, '{CommonUtil._date_time_format}') as updated_time,
concat(site_name,',',date_type,',',date_info) as period_label
......@@ -97,11 +63,9 @@ class DimAsinLaunchtimeInfo(object):
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
"""
elif self.date_type in (DateTypes.day.name):
# 取bsr日维度的asin
elif self.date_type == DateTypes.day.name:
sql = f"""
select
asin,
select asin,
u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
date_format(created_at, '{CommonUtil._date_time_format}') as updated_time,
concat(site_name,',',date_type,',',date_info) as period_label
......@@ -111,18 +75,32 @@ class DimAsinLaunchtimeInfo(object):
and date_info = '{self.date_info}'
"""
print(sql)
# 如果是分区执行可进行缓存数据
self.df_asin_detail = self.spark.sql(sql).cache()
return self.spark.sql(sql).cache()
def _read_history(self):
    """Step 2 (helper): load the launch dates already consolidated for this site.

    Queries ``dim_asin_launchtime_info`` filtered on ``self.site_name``. Every
    returned row carries the constant ``updated_time`` '9999-12-31 23:59:59',
    and ``appear_period_label`` is exposed under the name ``period_label``.

    Returns:
        The DataFrame produced by ``self.spark.sql`` for that query.
    """
    # NOTE(review): the far-future timestamp presumably lets history rows sort
    # ahead of freshly crawled rows when ordering by updated_time descending.
    history_query = f"""
select asin,
crawl_asin_launch_time,
'9999-12-31 23:59:59' as updated_time,
appear_period_label as period_label
from dim_asin_launchtime_info
where site_name='{self.site_name}'
"""
    # Echo the statement so it shows up in the job log.
    print(history_query)
    df_history = self.spark.sql(history_query)
    return df_history
# 读取keepa数据
def _read_keepa(self):
"""Step3(辅助): 读取 keepa 上架时间"""
if self.site_name == 'us':
sql = f"""select asin,
launch_time as keepa_asin_launch_time,
updated_time as updated_time,
keepa_launch_time as keepa_asin_launch_time,
updated_time,
1 as keepa_crawl_flag
from ods_asin_keepa_date_tmp
where site_name='{self.site_name}'
and state = 1 """
from dim_keepa_asin_info
where site_name='{self.site_name}'"""
print(sql)
return self.spark.sql(sql).cache()
else:
sql = f"""select asin,
launch_time as keepa_asin_launch_time,
......@@ -130,99 +108,73 @@ class DimAsinLaunchtimeInfo(object):
1 as keepa_crawl_flag
from ods_asin_keep_date
where site_name='{self.site_name}'
and state = 3 """
print(sql)
self.df_asin_keepa_date = self.spark.sql(sqlQuery=sql).cache()
self.df_asin_keepa_date = self.df_asin_keepa_date.orderBy(self.df_asin_keepa_date.updated_time.desc_nulls_last())
self.df_asin_keepa_date = self.df_asin_keepa_date.drop_duplicates(['asin'])
# 读取历史已经整合好的上架日期数据
sql = f"""
select asin,
crawl_asin_launch_time,
'9999-12-31 23:59:59' as updated_time,
appear_period_label as period_label
from dim_asin_launchtime_info
where site_name='{self.site_name}'
"""
and state = 3"""
print(sql)
self.df_history_launchtime = self.spark.sql(sqlQuery=sql)
w = Window.partitionBy('asin').orderBy(F.col('updated_time').desc_nulls_last())
return (
self.spark.sql(sql)
.withColumn('_rn', F.row_number().over(w))
.filter(F.col('_rn') == 1)
.drop('_rn')
.cache()
)
def run(self):
# Step1: 当期 asin 抓取上架时间
df_current = self._read_current_asin()
def handle_launchtime_data(self):
# Step2: 与历史合并去重(all 模式已含全量,无需合并历史)
if self.date_type != 'all':
# 如果不是走all逻辑,则说明走日常调度逻辑;可以将我们已有的历史上架日期放入逻辑中去重
self.df_asin_detail = self.df_asin_detail.unionByName(self.df_history_launchtime).cache()
df_all_asin = self.df_asin_detail.orderBy(self.df_asin_detail.updated_time.desc_nulls_last())
# 去重,保留所有的asin
df_all_asin = df_all_asin.drop_duplicates(['asin'])
df_all_asin = df_all_asin.select(
F.col("asin"),F.col("crawl_asin_launch_time").alias("crawl_asin_launch_time_left"), 'period_label')
df_current = df_current.unionByName(self._read_history()).cache()
# 过滤找出抓取的上架日期不为空的数据
df_not_null_asin = self.df_asin_detail.filter(" crawl_asin_launch_time is not null")
df_not_null_asin = df_not_null_asin.orderBy(df_not_null_asin.updated_time.desc_nulls_last())
df_not_null_asin = df_not_null_asin.drop_duplicates(['asin'])
df_not_null_asin = df_not_null_asin.select(
F.col("asin"),
F.col("crawl_asin_launch_time").alias("crawl_asin_launch_time_right"),
F.col("period_label").alias("period_label_right")
# 去重:优先保留非空 crawl_asin_launch_time,再取最新 updated_time
w = Window.partitionBy('asin').orderBy(
F.col('crawl_asin_launch_time').isNull().cast('int').asc(),
F.col('updated_time').desc_nulls_last()
)
self.df_asin_handle_launchtime = df_all_asin.join(
df_not_null_asin, on='asin', how='left'
df_merged = (
df_current
.withColumn('_rn', F.row_number().over(w))
.filter(F.col('_rn') == 1)
.drop('_rn')
)
self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.select(
F.col("asin"),
F.when((F.col("crawl_asin_launch_time_left").isNull()) & (F.col("crawl_asin_launch_time_right").isNotNull()), F.col("crawl_asin_launch_time_right"))
.otherwise(F.col("crawl_asin_launch_time_left")).alias("crawl_asin_launch_time"),
F.when((F.col("crawl_asin_launch_time_left").isNull()) & (F.col("crawl_asin_launch_time_right").isNotNull()), F.col("period_label_right"))
.otherwise(F.col("period_label")).alias("period_label")
# Step3: 关联 keepa,填充 crawl_asin_launch_time 为空的 asin
df_keepa = self._read_keepa()
df_result = df_merged.join(
df_keepa.select('asin', 'keepa_asin_launch_time', 'keepa_crawl_flag'),
on='asin', how='left'
)
# 跟keepa_date进行关联,补充launch_time数据
self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.join(
self.df_asin_keepa_date, on='asin', how='left'
df_save = df_result.select(
F.col('asin'),
F.date_format(F.to_date(F.coalesce(F.col('crawl_asin_launch_time'), F.col('keepa_asin_launch_time'))), 'yyyy-MM-dd').alias('asin_launch_time'),
F.date_format(F.to_date(F.col('crawl_asin_launch_time')), 'yyyy-MM-dd').alias('crawl_asin_launch_time'),
F.date_format(F.to_date(F.col('keepa_asin_launch_time')), 'yyyy-MM-dd').alias('keepa_asin_launch_time'),
F.col('period_label').alias('appear_period_label'),
F.when(F.col('keepa_crawl_flag') == 1, F.lit(1)).otherwise(F.lit(0)).alias('keepa_crawl_flag'),
F.lit(self.site_name).alias('site_name')
).repartition(self.partitions_num)
CommonUtil.save_or_update_table(
spark_session=self.spark,
hive_tb_name=self.hive_table,
partition_dict=self.partition_dict,
df_save=df_save,
drop_exist_tmp_flag=False
)
self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.select(
F.col("asin"),
F.when(F.col("crawl_asin_launch_time").isNull(), F.col("keepa_asin_launch_time"))
.otherwise(F.col("crawl_asin_launch_time")).alias("asin_launch_time"),
F.col("crawl_asin_launch_time"),
F.col("keepa_asin_launch_time"),
F.col("period_label").alias("appear_period_label"),
F.when(F.col("keepa_crawl_flag") == 1, F.lit(1)).otherwise(F.lit(0)).alias("keepa_crawl_flag"),
F.lit(self.site_name).alias("site_name")
)
def save_data(self):
    """Repartition the assembled launch-time DataFrame and write it to the Hive table.

    Reads ``self.df_asin_handle_launchtime``, repartitions it into
    ``self.partitions_num`` partitions and hands it to
    ``CommonUtil.save_or_update_table`` for ``self.hive_table`` /
    ``self.partition_dict``. Prints "success" once the call returns.
    """
    # The HDFS folder cleanup that used to precede the write is disabled.
    repartitioned = self.df_asin_handle_launchtime.repartition(self.partitions_num)
    CommonUtil.save_or_update_table(
        spark_session=self.spark,
        hive_tb_name=self.hive_table,
        partition_dict=self.partition_dict,
        df_save=repartitioned,
        drop_exist_tmp_flag=False,
    )
    print("success")
def run(self):
    """Drive the job end to end: read inputs, derive launch times, persist."""
    # The stages run strictly in this order; each one relies on state the
    # previous stage stored on the instance.
    pipeline = (self.read_data, self.handle_launchtime_data, self.save_data)
    for stage in pipeline:
        stage()
if __name__ == '__main__':
    # Positional CLI arguments: site name, date type and date info.
    # NOTE(review): the second argument to get_sys_arg is presumably the
    # fallback used when the argument is missing — verify against CommonUtil.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)

    lock_name = "dim_asin_launchtime_info"
    # A full ("all") rebuild may hold the table lock for up to 2 hours if it is
    # not released normally; every other run is capped at 30 minutes.
    expire = 120 * 60 if date_type == "all" else 30 * 60

    # Acquire the lock exactly once, and run the job exactly once. Keeping the
    # statements of both revisions would request the lock twice and execute the
    # whole pipeline twice.
    lock_flag = RedisUtils.acquire_redis_lock(
        lock_name, expire_time=expire, retry_flag=True, retry_time=10 * 60
    )
    # NOTE(review): when the lock cannot be obtained the script ends without
    # running and without any message — confirm that this is intended.
    if lock_flag:
        try:
            DimAsinLaunchtimeInfo(site_name, date_type, date_info).run()
        finally:
            # Always release the lock once the run has finished or failed.
            RedisUtils.release_redis_lock(lock_name)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment