Commit d32c0830 by chenyuanjie

fix

parent 3a0decd0
......@@ -8,7 +8,7 @@ import pandas as pd
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from utils.db_util import DbTypes, DBUtil
from pyspark.sql import functions as F
class EsStDetail(TemplatesMysql):
......@@ -51,7 +51,7 @@ class EsStDetail(TemplatesMysql):
# Enrich-policy configuration, used to update the usr_mask_type field
self.policy_name1 = "user_mask_asin_policy"
self.policy_name2 = "user_mask_category_policy"
self.pipeline_id = f"{self.site_name}_user_mask_and_profit_rate_pipeline"
self.pipeline_id = "user_asin_mask_enrich_pipeline"
self.es_options = EsUtils.get_es_options(self.es_index_name, self.pipeline_id)
self.es_body = EsUtils.get_es_body()
......@@ -97,12 +97,17 @@ class EsStDetail(TemplatesMysql):
current_category_rank, asin_weight_ratio, asin_bought_month, asin_lqs_rating, asin_lqs_rating_detail,
title_matching_degree, asin_lob_info, is_contains_lob_info, is_package_quantity_abnormal, zr_flow_proportion,
matrix_flow_proportion, matrix_ao_val, customer_reviews_json as product_features, img_info,
coalesce(parent_asin, asin) as collapse_asin, follow_sellers_count, asin_describe, asin_fbm_price as fbm_price
coalesce(parent_asin, asin) as collapse_asin, follow_sellers_count, asin_describe, asin_fbm_price as fbm_price,
describe_len, asin_bought_mom as bought_month_mom, asin_bought_yoy as bought_month_yoy, tracking_since, tracking_since_type
from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
"""
print("sql:", sql)
self.df_synchronize = self.spark.sql(sqlQuery=sql)
self.df_synchronize = self.df_synchronize.repartition(40).cache()
self.df_synchronize = self.df_synchronize.repartition(40).withColumn(
"img_type", F.split(F.col("img_type"), ",")
).withColumn(
"img_type", F.expr("transform(img_type, x -> cast(x as int))")
).cache()
self.df_synchronize.show(10, truncate=False)
# Preparation work before synchronizing the data
......@@ -142,12 +147,12 @@ class EsStDetail(TemplatesMysql):
VALUES ('{self.site_name}', '{self.cur_date}', '流量选品计算完毕', 14, '{self.record_table_name_field}', '{self.record_type}', '流量选品', '是', '流量选品计算完毕', 'elasticsearch')
"""
DBUtil.exec_sql('mysql', 'us', replace_sql)
if EsUtils.exist_index_alias(self.alias_name, self.client):
print("切换最近三十天索引别名链接到月数据")
EsUtils.delete_index_alias(self.alias_name, self.client)
EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
else:
EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
# if EsUtils.exist_index_alias(self.alias_name, self.client):
# print("切换最近三十天索引别名链接到月数据")
# EsUtils.delete_index_alias(self.alias_name, self.client)
# EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
# else:
# EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
else:
pass
......
......@@ -473,6 +473,18 @@ class EsUtils(object):
},
"describe_len": {
"type": "integer"
},
"bought_month_mom": {
"type": "float"
},
"bought_month_yoy": {
"type": "float"
},
"tracking_since": {
"type": "date"
},
"tracking_since_type": {
"type": "short"
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment