Commit a9d0da5f by chenyuanjie

流量选品30day-增加删除时间字段

parent 728c54df
...@@ -986,6 +986,7 @@ class KafkaFlowAsinDetail(Templates): ...@@ -986,6 +986,7 @@ class KafkaFlowAsinDetail(Templates):
F.col("describe_len").alias("asin_describe_len") F.col("describe_len").alias("asin_describe_len")
) )
df = df.drop("category", "seller_json") df = df.drop("category", "seller_json")
df = df.withColumn("date_info_del", F.lit(self.date_info))
df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save() df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save()
end_time = time.time() end_time = time.time()
elapsed_time = end_time - start_time elapsed_time = end_time - start_time
......
...@@ -982,6 +982,7 @@ class KafkaRankAsinDetail(Templates): ...@@ -982,6 +982,7 @@ class KafkaRankAsinDetail(Templates):
F.col("describe_len").alias("asin_describe_len") F.col("describe_len").alias("asin_describe_len")
) )
df = df.drop("category", "seller_json") df = df.drop("category", "seller_json")
df = df.withColumn("date_info_del", F.lit("1970-01"))
df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save() df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save()
end_time = time.time() end_time = time.time()
elapsed_time = end_time - start_time elapsed_time = end_time - start_time
......
...@@ -1044,6 +1044,9 @@ class EsUtils(object): ...@@ -1044,6 +1044,9 @@ class EsUtils(object):
}, },
"img_type_arr": { "img_type_arr": {
"type": "integer" "type": "integer"
},
"date_info_del": {
"type": "keyword"
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment