Commit da83d3ac by chenyuanjie

keepa数据过滤脏数据

parent 25966c8b
...@@ -48,6 +48,8 @@ class DimKeepaAsinInfo(object): ...@@ -48,6 +48,8 @@ class DimKeepaAsinInfo(object):
F.get_json_object("last_detail", "$.listedSince").cast("int").alias("listed_since"), F.get_json_object("last_detail", "$.listedSince").cast("int").alias("listed_since"),
F.get_json_object("last_detail", "$.releaseDate").cast("int").alias("release_date"), F.get_json_object("last_detail", "$.releaseDate").cast("int").alias("release_date"),
F.get_json_object("last_detail", "$.trackingSince").cast("int").alias("tracking_since"), F.get_json_object("last_detail", "$.trackingSince").cast("int").alias("tracking_since"),
F.get_json_object("last_detail", "$.productType").cast("int").alias("product_type"),
F.get_json_object("last_detail", "$.title").alias("title"),
F.col('updated_time') F.col('updated_time')
).withColumn( ).withColumn(
'weight', F.greatest(F.col("package_weight"), F.col("item_weight")) 'weight', F.greatest(F.col("package_weight"), F.col("item_weight"))
...@@ -75,9 +77,12 @@ class DimKeepaAsinInfo(object): ...@@ -75,9 +77,12 @@ class DimKeepaAsinInfo(object):
F.col('updated_time'), F.col('updated_time'),
F.col('listed_since'), F.col('listed_since'),
F.col('release_date'), F.col('release_date'),
F.col('tracking_since') F.col('tracking_since'),
F.col('product_type'),
F.col('title')
).filter( ).filter(
(F.col("package_length").isNotNull()) & (F.col("package_width").isNotNull()) & (F.col("package_height").isNotNull()) & (F.col("weight").isNotNull()) # 过滤脏数据:productType in (3,4,5) 且 title 为空的异常数据不做保留
~(F.col("product_type").isin(3, 4, 5) & F.col("title").isNull())
).cache() ).cache()
self.df_to_doris = self.df_keepa_asin.select( self.df_to_doris = self.df_keepa_asin.select(
'asin', 'package_length', 'package_width', 'package_height', 'package_weight', 'item_weight', 'weight', 'asin', 'package_length', 'package_width', 'package_height', 'package_weight', 'item_weight', 'weight',
...@@ -86,7 +91,7 @@ class DimKeepaAsinInfo(object): ...@@ -86,7 +91,7 @@ class DimKeepaAsinInfo(object):
# 读取历史数据 # 读取历史数据
sql = f""" sql = f"""
select asin, package_length, package_width, package_height, package_weight, item_weight, weight, keepa_launch_time, updated_time, listed_since, release_date, tracking_since select asin, package_length, package_width, package_height, package_weight, item_weight, weight, keepa_launch_time, updated_time, listed_since, release_date, tracking_since, product_type, title
from dim_keepa_asin_info where site_name = '{self.site_name}'; from dim_keepa_asin_info where site_name = '{self.site_name}';
""" """
self.df_keepa_asin_history = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache() self.df_keepa_asin_history = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment