Commit 3dd677b9 by chenyuanjie

fix

parent a2c1c284
...@@ -52,7 +52,10 @@ class KafkaFlowAsinDetail(Templates): ...@@ -52,7 +52,10 @@ class KafkaFlowAsinDetail(Templates):
# 富集策略相关配置,用于更新 usr_mask_type 字段 # 富集策略相关配置,用于更新 usr_mask_type 字段
self.policy_name1 = "user_mask_asin_policy" self.policy_name1 = "user_mask_asin_policy"
self.policy_name2 = "user_mask_category_policy" self.policy_name2 = "user_mask_category_policy"
self.pipeline_id = f"{self.site_name}_user_mask_and_profit_rate_pipeline" if self.site_name == 'us':
self.pipeline_id = f"{self.site_name}_user_mask_and_profit_rate_pipeline"
else:
self.pipeline_id = ""
self.es_options = EsUtils.get_es_options(self.es_index_name, self.pipeline_id) self.es_options = EsUtils.get_es_options(self.es_index_name, self.pipeline_id)
self.db_save = 'kafka_flow_asin_detail' self.db_save = 'kafka_flow_asin_detail'
self.app_name = self.get_app_name() self.app_name = self.get_app_name()
...@@ -353,7 +356,7 @@ class KafkaFlowAsinDetail(Templates): ...@@ -353,7 +356,7 @@ class KafkaFlowAsinDetail(Templates):
WHEN asin_length > 150 AND asin_length + asin_length + (asin_width + asin_height) > 300 THEN 7 ELSE 0 END""" WHEN asin_length > 150 AND asin_length + asin_length + (asin_width + asin_height) > 300 THEN 7 ELSE 0 END"""
df = df.withColumn("size_type", F.expr(expr_str)).drop("asin_length", "asin_width", "asin_height") df = df.withColumn("size_type", F.expr(expr_str)).drop("asin_length", "asin_width", "asin_height")
# 6.处理五点描述长度 # 6.处理五点描述长度
df = df.withColumn("describe_len", F.length(F.col("describe").replace("|-|", ""))) df = df.withColumn("describe_len", F.length(F.regexp_replace(F.col("describe"), "\\|-\\|", "")))
return df return df
# 7. 处理asin图片信息 # 7. 处理asin图片信息
...@@ -834,8 +837,11 @@ class KafkaFlowAsinDetail(Templates): ...@@ -834,8 +837,11 @@ class KafkaFlowAsinDetail(Templates):
EsUtils.create_index(self.es_index_name, self.client, self.es_index_body) EsUtils.create_index(self.es_index_name, self.client, self.es_index_body)
print("索引名称为:", self.es_index_name) print("索引名称为:", self.es_index_name)
# 执行富集策略 # 执行富集策略
self.client.enrich.execute_policy(name=self.policy_name1) if self.site_name == 'us':
self.client.enrich.execute_policy(name=self.policy_name2) self.client.enrich.execute_policy(name=self.policy_name1)
self.client.enrich.execute_policy(name=self.policy_name2)
else:
pass
# EsUtils.user_enrich_pipeline(self.client, self.pipeline_id, self.policy_name1, self.policy_name2) # EsUtils.user_enrich_pipeline(self.client, self.pipeline_id, self.policy_name1, self.policy_name2)
# if not EsUtils.exist_index_alias(self.es_index_alias_name, self.client): # if not EsUtils.exist_index_alias(self.es_index_alias_name, self.client):
# EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client) # EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client)
......
...@@ -51,7 +51,10 @@ class KafkaRankAsinDetail(Templates): ...@@ -51,7 +51,10 @@ class KafkaRankAsinDetail(Templates):
# 富集策略相关配置,用于更新 usr_mask_type 字段 # 富集策略相关配置,用于更新 usr_mask_type 字段
self.policy_name1 = "user_mask_asin_policy" self.policy_name1 = "user_mask_asin_policy"
self.policy_name2 = "user_mask_category_policy" self.policy_name2 = "user_mask_category_policy"
self.pipeline_id = f"{self.site_name}_user_mask_and_profit_rate_pipeline" if self.site_name == 'us':
self.pipeline_id = f"{self.site_name}_user_mask_and_profit_rate_pipeline"
else:
self.pipeline_id = ""
self.es_options = EsUtils.get_es_options(self.es_index_name, self.pipeline_id) self.es_options = EsUtils.get_es_options(self.es_index_name, self.pipeline_id)
self.db_save = 'kafka_rank_asin_detail' self.db_save = 'kafka_rank_asin_detail'
self.app_name = self.get_app_name() self.app_name = self.get_app_name()
...@@ -352,7 +355,7 @@ class KafkaRankAsinDetail(Templates): ...@@ -352,7 +355,7 @@ class KafkaRankAsinDetail(Templates):
WHEN asin_length > 150 AND asin_length + asin_length + (asin_width + asin_height) > 300 THEN 7 ELSE 0 END""" WHEN asin_length > 150 AND asin_length + asin_length + (asin_width + asin_height) > 300 THEN 7 ELSE 0 END"""
df = df.withColumn("size_type", F.expr(expr_str)).drop("asin_length", "asin_width", "asin_height") df = df.withColumn("size_type", F.expr(expr_str)).drop("asin_length", "asin_width", "asin_height")
# 6.处理五点描述长度 # 6.处理五点描述长度
df = df.withColumn("describe_len", F.length(F.col("describe").replace("|-|", ""))) df = df.withColumn("describe_len", F.length(F.regexp_replace(F.col("describe"), "\\|-\\|", "")))
return df return df
# 7. 处理asin图片信息 # 7. 处理asin图片信息
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment