Commit 281b9289 by chenyuanjie

asin信息库流程调整

parent d32c0830
...@@ -7,7 +7,7 @@ from utils.common_util import CommonUtil ...@@ -7,7 +7,7 @@ from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil from utils.spark_util import SparkUtil
from utils.es_util import EsUtils from utils.es_util import EsUtils
from utils.db_util import DBUtil from utils.db_util import DBUtil
from datetime import datetime, timedelta from datetime import datetime
from pyspark.sql import functions as F from pyspark.sql import functions as F
...@@ -19,15 +19,8 @@ class EsAiAsinAdd(object): ...@@ -19,15 +19,8 @@ class EsAiAsinAdd(object):
self.date_info = date_info self.date_info = date_info
self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}") self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}")
if self.site_name == 'us': self.pg_conn = DBUtil.get_connection_info("postgresql", "us")
self.pg_tb = "ai_asin_analyze_detail" self.export_pg_tb = f"{self.site_name}_ai_asin_detail_month_{self.date_info.replace('-', '_')}"
else:
self.pg_tb = f"{self.site_name}_ai_asin_analyze_detail"
launch_time_base_date = self.spark.sql(
f"""SELECT max(`date`) AS last_day FROM dim_date_20_to_30 WHERE year_month = '{self.date_info}'"""
).collect()[0]['last_day']
self.launch_time_interval_dict = self.get_launch_time_interval_dict(launch_time_base_date)
self.es_client = EsUtils.get_es_client() self.es_client = EsUtils.get_es_client()
self.es_index = f"{self.site_name}_ai_asin_analyze_detail_{self.date_info.replace('-', '_')}" self.es_index = f"{self.site_name}_ai_asin_analyze_detail_{self.date_info.replace('-', '_')}"
...@@ -36,19 +29,8 @@ class EsAiAsinAdd(object): ...@@ -36,19 +29,8 @@ class EsAiAsinAdd(object):
self.df_ai_asin_detail = self.spark.sql(f"select 1+1;") self.df_ai_asin_detail = self.spark.sql(f"select 1+1;")
self.df_ai_asin_analyze = self.spark.sql(f"select 1+1;") self.df_ai_asin_analyze = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;") self.df_save_pg = self.spark.sql(f"select 1+1;")
self.df_save_es = self.spark.sql(f"select 1+1;")
@staticmethod
def get_launch_time_interval_dict(base_date):
base_date = datetime.strptime(base_date, '%Y-%m-%d')
return {
"one_month": (base_date + timedelta(days=-30)).strftime('%Y-%m-%d'),
"three_month": (base_date + timedelta(days=-90)).strftime('%Y-%m-%d'),
"six_month": (base_date + timedelta(days=-180)).strftime('%Y-%m-%d'),
"twelve_month": (base_date + timedelta(days=-360)).strftime('%Y-%m-%d'),
"twenty_four_month": (base_date + timedelta(days=-720)).strftime('%Y-%m-%d'),
"thirty_six_month": (base_date + timedelta(days=-1080)).strftime('%Y-%m-%d')
}
@staticmethod @staticmethod
def get_es_options(index_name, pipeline_id): def get_es_options(index_name, pipeline_id):
...@@ -105,7 +87,12 @@ class EsAiAsinAdd(object): ...@@ -105,7 +87,12 @@ class EsAiAsinAdd(object):
bought_month_mom, bought_month_mom,
bought_month_yoy, bought_month_yoy,
is_new_flag, is_new_flag,
is_ascending_flag is_ascending_flag,
review_json_list,
launch_time_type,
describe,
product_json,
product_detail_json
from dwt_ai_asin_add from dwt_ai_asin_add
where site_name = '{self.site_name}' where site_name = '{self.site_name}'
and date_type = '{self.date_type}' and date_type = '{self.date_type}'
...@@ -140,14 +127,13 @@ class EsAiAsinAdd(object): ...@@ -140,14 +127,13 @@ class EsAiAsinAdd(object):
array_to_string(package_quantity_arr, ',') as package_quantity_arr, array_to_string(package_quantity_arr, ',') as package_quantity_arr,
package_quantity_flag, package_quantity_flag,
label_content label_content
from {self.pg_tb} from {self.site_name}_ai_asin_analyze_detail
""" """
conn_info = DBUtil.get_connection_info("postgresql", "us")
self.df_ai_asin_analyze = SparkUtil.read_jdbc_query( self.df_ai_asin_analyze = SparkUtil.read_jdbc_query(
session=self.spark, session=self.spark,
url=conn_info["url"], url=self.pg_conn["url"],
pwd=conn_info["pwd"], pwd=self.pg_conn["pwd"],
username=conn_info["username"], username=self.pg_conn["username"],
query=sql2 query=sql2
).withColumn( ).withColumn(
'package_quantity_arr', F.split(F.col('package_quantity_arr'), ',') 'package_quantity_arr', F.split(F.col('package_quantity_arr'), ',')
...@@ -158,83 +144,48 @@ class EsAiAsinAdd(object): ...@@ -158,83 +144,48 @@ class EsAiAsinAdd(object):
self.df_ai_asin_analyze.show(10, True) self.df_ai_asin_analyze.show(10, True)
def handle_data(self): def handle_data(self):
# 补充launch_time_type字段 self.df_save_pg = self.df_ai_asin_detail.join(
one_month = self.launch_time_interval_dict['one_month'] self.df_ai_asin_analyze, 'asin', 'left_anti'
three_month = self.launch_time_interval_dict['three_month'] ).select(
six_month = self.launch_time_interval_dict['six_month'] 'site_name', 'asin', 'weight', 'bought_month', 'category', 'img', 'title', 'brand', 'account_name',
twelve_month = self.launch_time_interval_dict['twelve_month'] 'account_addr', 'buy_box_seller_type', 'launch_time', 'img_num', 'variation_flag', 'variation_num',
twenty_four_month = self.launch_time_interval_dict['twenty_four_month'] 'ao_val', 'category_id', 'category_current_id', 'parent_asin', 'bsr_rank', 'price', 'rating',
thirty_six_month = self.launch_time_interval_dict['thirty_six_month'] 'total_comments', 'seller_id', 'fb_country_name', 'review_json_list', 'launch_time_type', 'describe',
expr_str = f""" 'product_json', 'product_detail_json', 'bought_month_mom', 'bought_month_yoy', 'is_new_flag',
CASE WHEN launch_time >= '{one_month}' THEN 1 'is_ascending_flag'
WHEN launch_time >= '{three_month}' AND launch_time < '{one_month}' THEN 2 )
WHEN launch_time >= '{six_month}' AND launch_time < '{three_month}' THEN 3
WHEN launch_time >= '{twelve_month}' AND launch_time < '{six_month}' THEN 4 self.df_save_es = self.df_ai_asin_detail.join(
WHEN launch_time >= '{twenty_four_month}' AND launch_time < '{twelve_month}' THEN 5
WHEN launch_time >= '{thirty_six_month}' AND launch_time < '{twenty_four_month}' THEN 6
WHEN launch_time < '{thirty_six_month}' THEN 7
ELSE 0 END
"""
self.df_ai_asin_detail = self.df_ai_asin_detail.withColumn('launch_time_type', F.expr(expr_str))
def save_data(self):
self.df_save = self.df_ai_asin_detail.join(
self.df_ai_asin_analyze, 'asin', 'inner' self.df_ai_asin_analyze, 'asin', 'inner'
).select( ).select(
'account_addr', 'account_addr', 'account_name', 'analyze_id', 'ao_val', 'appearance', 'asin', 'bought_month',
'account_name', 'bought_month_mom', 'bought_month_yoy', 'brand', 'bsr_rank', 'buy_box_seller_type', 'category',
'analyze_id', 'category_current_id', 'category_id', 'color', 'crowd', 'fb_country_name', 'function', 'img',
'ao_val', 'img_num', 'is_ascending_flag', 'is_new_flag', 'label_content', 'launch_time', 'launch_time_type',
'appearance', 'material', 'package_quantity', 'package_quantity_arr', 'package_quantity_flag', 'parent_asin',
'asin', 'price', 'rating', 'scene_comment', 'scene_title', 'seller_id', 'shape', 'short_desc', 'site_name',
'bought_month', 'size', 'theme', 'title', 'title_pic_content', 'title_pic_flag', 'title_word_content',
'bought_month_mom', 'title_word_flag', 'total_comments', 'uses', 'variation_flag', 'variation_num', 'weight'
'bought_month_yoy', )
'brand',
'bsr_rank', def save_data(self):
'buy_box_seller_type', # 将新增asin导出给济苍
'category', try:
'category_current_id', self.df_save_pg.write.format("jdbc") \
'category_id', .option("url", self.pg_conn["url"]) \
'color', .option("dbtable", f"{self.export_pg_tb}") \
'crowd', .option("user", self.pg_conn["username"]) \
'fb_country_name', .option("password", self.pg_conn["pwd"]) \
'function', .mode("append") \
'img', .save()
'img_num', CommonUtil.send_wx_msg(['wujicang', 'chenyuanjie'], 'ASIN信息库增量数据导出', f'详情:{self.export_pg_tb} {self.site_name} {self.date_type} {self.date_info}')
'is_ascending_flag', except Exception as e:
'is_new_flag', print("An error occurred while writing to Elasticsearch:", str(e))
'label_content', CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ASIN信息库增量数据导出失败', f'详情:{self.export_pg_tb} {self.site_name} {self.date_type} {self.date_info}')
'launch_time',
'launch_time_type', # 将增量asin导出到es
'material',
'package_quantity',
'package_quantity_arr',
'package_quantity_flag',
'parent_asin',
'price',
'rating',
'scene_comment',
'scene_title',
'seller_id',
'shape',
'short_desc',
'site_name',
'size',
'theme',
'title',
'title_pic_content',
'title_pic_flag',
'title_word_content',
'title_word_flag',
'total_comments',
'uses',
'variation_flag',
'variation_num',
'weight'
).cache()
try: try:
self.df_save.write.format("org.elasticsearch.spark.sql") \ self.df_save_es.write.format("org.elasticsearch.spark.sql") \
.options(**self.es_options) \ .options(**self.es_options) \
.mode("append") \ .mode("append") \
.save() .save()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment