Commit 1a13d614 by chenyuanjie

ASIN info library: push monthly data to ES

parent c59539d4
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from utils.es_util import EsUtils
from utils.db_util import DBUtil
from datetime import datetime, timedelta
from pyspark.sql import functions as F

class EsAiAsinAdd(object):

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}")
        # The US site uses the unprefixed PG table; other sites prefix it with the site name
        if self.site_name == 'us':
            self.pg_tb = "ai_asin_analyze_detail"
        else:
            self.pg_tb = f"{self.site_name}_ai_asin_analyze_detail"
        # Use the last calendar day of the target month as the base date for the launch-time buckets
        launch_time_base_date = self.spark.sql(
            f"""SELECT max(`date`) AS last_day FROM dim_date_20_to_30 WHERE year_month = '{self.date_info}'"""
        ).collect()[0]['last_day']
        self.launch_time_interval_dict = self.get_launch_time_interval_dict(launch_time_base_date)
        self.es_client = EsUtils.get_es_client()
        # e.g. us_ai_asin_analyze_detail_2024_06 for site 'us' and date_info '2024-06'
        self.es_index = f"{self.site_name}_ai_asin_analyze_detail_{self.date_info.replace('-', '_')}"
        self.es_pipeline = f"{self.site_name}_ai_analyze_pipeline"
        self.es_options = self.get_es_options(self.es_index, self.es_pipeline)
        # Placeholder DataFrames, populated in read_data()/save_data()
        self.df_ai_asin_detail = self.spark.sql("select 1+1;")
        self.df_ai_asin_analyze = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    @staticmethod
    def get_launch_time_interval_dict(base_date):
        """Return the cut-off dates for the 1/3/6/12/24/36-month launch-time buckets."""
        base_date = datetime.strptime(base_date, '%Y-%m-%d')
        return {
            "one_month": (base_date + timedelta(days=-30)).strftime('%Y-%m-%d'),
            "three_month": (base_date + timedelta(days=-90)).strftime('%Y-%m-%d'),
            "six_month": (base_date + timedelta(days=-180)).strftime('%Y-%m-%d'),
            "twelve_month": (base_date + timedelta(days=-360)).strftime('%Y-%m-%d'),
            "twenty_four_month": (base_date + timedelta(days=-720)).strftime('%Y-%m-%d'),
            "thirty_six_month": (base_date + timedelta(days=-1080)).strftime('%Y-%m-%d')
        }
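    # Illustrative example (not from the source): with base_date = '2024-06-30',
    # the buckets resolve to roughly
    #   one_month='2024-05-31', three_month='2024-04-01', six_month='2024-01-02',
    #   twelve_month='2023-07-06', twenty_four_month='2022-07-11', thirty_six_month='2021-07-16'.
    # Note the buckets use 30-day months (360-day years), not calendar months.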

    @staticmethod
    def get_es_options(index_name, pipeline_id):
        """Build the elasticsearch-hadoop connector options for the Spark write."""
        return {
            "es.nodes": EsUtils.__es_ip__,
            "es.port": EsUtils.__es_port__,
            "es.net.http.auth.user": EsUtils.__es_user__,
            "es.net.http.auth.pass": EsUtils.__es_passwd__,
            "es.mapping.id": "asin",  # use asin as the document _id
            "es.resource": f"{index_name}/_doc",
            "es.batch.write.refresh": "false",
            "es.batch.write.retry.wait": "60s",
            "es.batch.size.entries": "5000",
            "es.nodes.wan.only": "false",
            "es.batch.write.concurrency": "40",
            "es.write.operation": "index",
            "es.ingest.pipeline": f"{pipeline_id}"
        }
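    # These are standard elasticsearch-hadoop settings: "es.mapping.id" keys each
    # document by asin, so re-running the job for the same month overwrites rather
    # than duplicates documents, and "es.ingest.pipeline" routes every document
    # through the named ingest pipeline on the ES side before indexing.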

    def run(self):
        self.read_data()
        self.handle_data()
        self.save_data()

    def read_data(self):
        # Read the monthly ASIN info-library data from Hive
        sql1 = f"""
        select
            site_name,
            asin,
            weight,
            bought_month,
            category,
            img,
            title,
            brand,
            account_name,
            account_addr,
            buy_box_seller_type,
            launch_time,
            img_num,
            variation_flag,
            variation_num,
            ao_val,
            category_id,
            category_current_id,
            parent_asin,
            bsr_rank,
            price,
            rating,
            total_comments,
            seller_id,
            fb_country_name,
            bought_month_mom,
            bought_month_yoy,
            is_new_flag,
            is_ascending_flag
        from dwt_ai_asin_add
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}'
        """
        self.df_ai_asin_detail = self.spark.sql(sqlQuery=sql1).repartition(40, 'asin').cache()
        print("ASIN info-library data:")
        self.df_ai_asin_detail.show(10, True)
        # Read the AI analysis results from PostgreSQL
        sql2 = f"""
        select
            asin,
            id as analyze_id,
            package_quantity,
            material,
            color,
            appearance,
            size,
            function,
            shape,
            scene_title,
            scene_comment,
            uses,
            theme,
            crowd,
            short_desc,
            title_pic_flag,
            title_word_flag,
            title_pic_content,
            title_word_content,
            array_to_string(package_quantity_arr, ',') as package_quantity_arr,
            package_quantity_flag,
            label_content
        from {self.pg_tb}
        """
        conn_info = DBUtil.get_connection_info("postgresql", "us")
        # The PG array arrives over JDBC as a comma-joined string (e.g. '1,2');
        # split it back into an array and cast the elements to int
        self.df_ai_asin_analyze = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql2
        ).withColumn(
            'package_quantity_arr', F.split(F.col('package_quantity_arr'), ',')
        ).withColumn(
            'package_quantity_arr', F.expr('transform(package_quantity_arr, x -> cast(x as int))')
        ).repartition(40, 'asin').cache()
        print("AI analysis data:")
        self.df_ai_asin_analyze.show(10, True)

    def handle_data(self):
        # Derive launch_time_type: bucket each ASIN by how long ago it launched
        # (1 = within 1 month ... 7 = over 36 months, 0 = launch_time unknown)
        one_month = self.launch_time_interval_dict['one_month']
        three_month = self.launch_time_interval_dict['three_month']
        six_month = self.launch_time_interval_dict['six_month']
        twelve_month = self.launch_time_interval_dict['twelve_month']
        twenty_four_month = self.launch_time_interval_dict['twenty_four_month']
        thirty_six_month = self.launch_time_interval_dict['thirty_six_month']
        expr_str = f"""
        CASE WHEN launch_time >= '{one_month}' THEN 1
             WHEN launch_time >= '{three_month}' AND launch_time < '{one_month}' THEN 2
             WHEN launch_time >= '{six_month}' AND launch_time < '{three_month}' THEN 3
             WHEN launch_time >= '{twelve_month}' AND launch_time < '{six_month}' THEN 4
             WHEN launch_time >= '{twenty_four_month}' AND launch_time < '{twelve_month}' THEN 5
             WHEN launch_time >= '{thirty_six_month}' AND launch_time < '{twenty_four_month}' THEN 6
             WHEN launch_time < '{thirty_six_month}' THEN 7
             ELSE 0 END
        """
        self.df_ai_asin_detail = self.df_ai_asin_detail.withColumn('launch_time_type', F.expr(expr_str))

    def save_data(self):
        # Inner-join the info-library detail with the AI analysis on asin, then write to ES
        self.df_save = self.df_ai_asin_detail.join(
            self.df_ai_asin_analyze, 'asin', 'inner'
        ).select(
            'account_addr',
            'account_name',
            'analyze_id',
            'ao_val',
            'appearance',
            'asin',
            'bought_month',
            'bought_month_mom',
            'bought_month_yoy',
            'brand',
            'bsr_rank',
            'buy_box_seller_type',
            'category',
            'category_current_id',
            'category_id',
            'color',
            'crowd',
            'fb_country_name',
            'function',
            'img',
            'img_num',
            'is_ascending_flag',
            'is_new_flag',
            'label_content',
            'launch_time',
            'launch_time_type',
            'material',
            'package_quantity',
            'package_quantity_arr',
            'package_quantity_flag',
            'parent_asin',
            'price',
            'rating',
            'scene_comment',
            'scene_title',
            'seller_id',
            'shape',
            'short_desc',
            'site_name',
            'size',
            'theme',
            'title',
            'title_pic_content',
            'title_pic_flag',
            'title_word_content',
            'title_word_flag',
            'total_comments',
            'uses',
            'variation_flag',
            'variation_num',
            'weight'
        ).cache()
        try:
            self.df_save.write.format("org.elasticsearch.spark.sql") \
                .options(**self.es_options) \
                .mode("append") \
                .save()
            print(f"ES index {self.es_index} updated successfully!")
        except Exception as e:
            print("An error occurred while writing to Elasticsearch:", str(e))
            # Notify the owner via WeChat Work on failure
            CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES data update failed', f'Failed index: {self.es_index}')

if __name__ == "__main__":
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    print("Start time:", datetime.now().strftime("%Y-%m-%d %H:%M"))
    handle_obj = EsAiAsinAdd(site_name, date_type, date_info)
    handle_obj.run()
    print("End time:", datetime.now().strftime("%Y-%m-%d %H:%M"))
    print("success!!!")