Commit a063aaef by chenyuanjie

利润率更新抓取时间

parent ab236510
......@@ -53,6 +53,10 @@ class EsAsinProfitRate(object):
"update_time": {
"type": "date",
"format": "yyyy-MM-dd"
},
"asin_crawl_date": {
"type": "date",
"format": "yyyy-MM-dd"
}
}
}
......@@ -76,10 +80,11 @@ class EsAsinProfitRate(object):
}
def run(self):
self.save_profit_rate_add()
self.read_profit_rate_add()
self.update_history_index()
self.save_profit_rate_to_es() # 遍历完成后统一更新利润率索引
def save_profit_rate_add(self):
def read_profit_rate_add(self):
# 读取利润率整合数据(增量数据)
sql = f"""
select asin, price, ocean_profit, air_profit, updated_time from dim_asin_profit_rate_info
......@@ -94,13 +99,22 @@ class EsAsinProfitRate(object):
F.col("updated_time").isNotNull(),
F.substring(F.col("updated_time"), 1, 10)
).otherwise(F.lit("1970-01-01"))
).withColumn(
"asin_crawl_date", F.lit(None).cast("string") # 初始化为 null
).select(
'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time'
'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time', 'asin_crawl_date'
).cache()
print(f"增量利润率数据如下:")
self.df_asin_profit_rate.show(10, False)
print(f"创建利润率索引:{self.es_profit_rate_index}!")
def save_profit_rate_to_es(self):
"""遍历完成后,统一更新利润率索引"""
print(f"\n{'='*60}")
print(f"开始更新利润率索引:{self.es_profit_rate_index}")
print(f"{'='*60}")
print(f"最终利润率数据如下:")
self.df_asin_profit_rate.show(10, False)
EsUtils.create_index(self.es_profit_rate_index, self.es_client, self.es_profit_rate_body)
try:
self.df_asin_profit_rate.write.format("org.elasticsearch.spark.sql") \
......@@ -145,10 +159,10 @@ class EsAsinProfitRate(object):
current_date = datetime(year, month + 1, 1)
def update_single_history_index(self, index_name, month_str):
"""更新单个历史索引"""
"""更新单个历史索引,同时更新 asin_crawl_date"""
hive_sql = f"""
select asin, asin_price as price from dwt_flow_asin where site_name = '{self.site_name}' and date_type = 'month'
and date_info = '{month_str}' and asin_price is not null
SELECT asin, asin_price as price, asin_crawl_date FROM dwt_flow_asin
WHERE site_name = '{self.site_name}' AND date_type = 'month' AND date_info = '{month_str}' AND asin_price IS NOT NULL
"""
df_hive = self.spark.sql(hive_sql)
......@@ -162,6 +176,16 @@ class EsAsinProfitRate(object):
)
).select("asin", "profit_rate_extra")
# 更新 df_asin_profit_rate 的 asin_crawl_date 字段(用当前分区的值覆盖)
df_crawl_date = df_hive.select(
'asin', 'price', F.substring(F.col('asin_crawl_date'), 1, 10).alias('new_crawl_date')
)
self.df_asin_profit_rate = self.df_asin_profit_rate.join(
df_crawl_date, on=['asin', 'price'], how='left'
).withColumn(
"asin_crawl_date", F.coalesce(F.col("new_crawl_date"), F.col("asin_crawl_date"))
).drop("new_crawl_date").cache()
es_options = {
"es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment