Commit 80aaf611 by chenyuanjie

Parse Keepa detail data and load it into the warehouse

parent 04c1dd3a
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from utils.common_util import CommonUtil
from utils.DorisHelper import DorisHelper
from utils.hdfs_utils import HdfsUtils
from datetime import datetime, timedelta
class DimKeepaAsinInfo(object):
    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        self.last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
        self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}: {self.site_name} {self.date_info}")
        # Placeholder DataFrames, populated in read_data/handle_data
        self.df_keepa_asin = self.spark.sql(f"select 1+1;")
        self.df_keepa_asin_history = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        # Doris-related configuration
        self.doris_db = "selection"
        self.doris_table = f"{self.site_name}_keepa_asin_latest_detail"
        self.df_to_doris = self.spark.sql(f"select 1+1;")

    def run(self):
        self.read_data()
        self.handle_data()
        self.save_data()

    def read_data(self):
        sql = f"""
        select asin, last_detail, update_at as updated_time from ods_keepa_asin_detail
        where site_name = '{self.site_name}' and date_info = '{self.date_info}' and last_detail is not null;
        """
        self.df_keepa_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').select(
            F.col('asin'),
            F.get_json_object("last_detail", "$.packageLength").cast("int").alias("package_length"),
            F.get_json_object("last_detail", "$.packageWidth").cast("int").alias("package_width"),
            F.get_json_object("last_detail", "$.packageHeight").cast("int").alias("package_height"),
            F.get_json_object("last_detail", "$.packageWeight").cast("int").alias("package_weight"),
            F.get_json_object("last_detail", "$.itemWeight").cast("int").alias("item_weight"),
            F.get_json_object("last_detail", "$.listedSince").cast("int").alias("listed_since"),
            F.get_json_object("last_detail", "$.releaseDate").cast("int").alias("release_date"),
            F.get_json_object("last_detail", "$.trackingSince").cast("int").alias("tracking_since"),
            F.col('updated_time')
        ).withColumn(
            # Take the larger of package weight and item weight
            'weight', F.greatest(F.col("package_weight"), F.col("item_weight"))
        ).withColumn(
            # Earliest valid Keepa timestamp (minutes since the Keepa epoch); invalid values become null
            "min_time", F.least(
                F.when(F.col("listed_since") != 0, F.col("listed_since")),
                F.when(F.col("tracking_since") > 0, F.col("tracking_since"))
            )
        ).withColumn(
            # Keepa minutes -> Unix seconds: add the 21564000-minute offset (1970-01-01 to 2011-01-01), then * 60
            "keepa_launch_time",
            F.when(
                F.col("min_time").isNull(), F.lit(None)
            ).otherwise(
                F.date_format(F.from_unixtime((F.col("min_time") + F.lit(21564000)) * 60), "yyyy-MM-dd")
            )
        ).select(
            F.col('asin'),
            F.col('package_length'),
            F.col('package_width'),
            F.col('package_height'),
            F.col('package_weight'),
            F.col('item_weight'),
            F.col('weight'),
            F.col('keepa_launch_time'),
            F.col('updated_time'),
            F.col('listed_since'),
            F.col('release_date'),
            F.col('tracking_since')
        ).filter(
            (F.col("package_length").isNotNull()) & (F.col("package_width").isNotNull())
            & (F.col("package_height").isNotNull()) & (F.col("weight").isNotNull())
        ).cache()
        self.df_to_doris = self.df_keepa_asin.select(
            'asin', 'package_length', 'package_width', 'package_height', 'package_weight', 'item_weight', 'weight',
            'listed_since', 'release_date', 'tracking_since', 'keepa_launch_time', 'updated_time'
        )
        # Read the historical dimension data
        sql = f"""
        select asin, package_length, package_width, package_height, package_weight, item_weight, weight, keepa_launch_time, updated_time, listed_since, release_date, tracking_since
        from dim_keepa_asin_info where site_name = '{self.site_name}';
        """
        self.df_keepa_asin_history = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()

    def handle_data(self):
        # Deduplicate: keep the most recently updated row per asin across the new batch and history
        self.df_save = self.df_keepa_asin.unionByName(self.df_keepa_asin_history)
        window = Window.partitionBy(['asin']).orderBy(
            self.df_save.updated_time.desc_nulls_last()
        )
        self.df_save = self.df_save.withColumn(
            'rank', F.row_number().over(window=window)
        ).filter('rank = 1').drop('rank')

    def save_data(self):
        self.df_save = self.df_save.withColumn(
            "site_name", F.lit(self.site_name)
        ).withColumn(
            "date_info", F.lit(self.date_info)
        ).repartition(50)
        hive_tb = "dim_keepa_asin_info"
        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info
        }
        partition_by = list(partition_dict.keys())
        hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
        print(f"Saving data to table {hive_tb}, HDFS path: {hdfs_path}")
        self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success!")
        # Only drop yesterday's partition once today's partition holds at least as many rows
        if self.df_save.count() >= self.df_keepa_asin_history.count():
            print("Dropping the previous partition's data")
            self.spark.sql(f"""
                ALTER TABLE {hive_tb} DROP IF EXISTS PARTITION (site_name='{self.site_name}', date_info='{self.last_date_info}')
            """)
            HdfsUtils.delete_hdfs_file(
                CommonUtil.build_hdfs_path(hive_tb, partition_dict={"site_name": self.site_name, "date_info": self.last_date_info})
            )
            print("success!")
        # Write the latest Keepa detail data to the Doris table
        print("Exporting the latest Keepa detail data to Doris:")
        doris_table_columns = """
            asin, package_length, package_width, package_height, package_weight, item_weight, weight,
            listed_since, release_date, tracking_since, keepa_launch_time, updated_time
        """
        DorisHelper.spark_export_with_columns(
            df_save=self.df_to_doris,
            db_name=self.doris_db,
            table_name=self.doris_table,
            table_columns=doris_table_columns
        )
        print("success!")

if __name__ == "__main__":
    site_name = sys.argv[1]
    date_info = sys.argv[2]
    handle_obj = DimKeepaAsinInfo(site_name, date_info)
    handle_obj.run()
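
The keepa_launch_time column above converts Keepa's native timestamps, which count minutes since Keepa's 2011-01-01 epoch; the constant 21564000 is exactly the number of minutes between the Unix epoch and 2011-01-01, so adding it and multiplying by 60 yields Unix seconds. A minimal standalone sketch of that conversion, mirroring the Spark expression in plain Python (evaluated in UTC rather than the Spark session time zone; the sample value is illustrative only):

from datetime import datetime, timezone

KEEPA_EPOCH_OFFSET_MIN = 21564000  # minutes from 1970-01-01 to Keepa's 2011-01-01 epoch

def keepa_minutes_to_date(keepa_minutes: int) -> str:
    # Same arithmetic as from_unixtime((min_time + 21564000) * 60) in read_data above
    unix_seconds = (keepa_minutes + KEEPA_EPOCH_OFFSET_MIN) * 60
    return datetime.fromtimestamp(unix_seconds, tz=timezone.utc).strftime("%Y-%m-%d")

print(keepa_minutes_to_date(0))  # -> 2011-01-01, the Keepa epoch itself
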
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
from datetime import datetime, timedelta
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    assert site_name is not None, "site_name must not be empty!"
    assert date_info is not None, "date_info must not be empty!"
    # date_info = date.today().strftime("%Y-%m-%d")
    # last_date_info = (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
    last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
    db_type = 'postgresql_cluster'
    import_table = f"{site_name}_keepa_last_detail"
    hive_table = "ods_keepa_asin_detail"
    partition_dict = {
        "site_name": site_name,
        "date_info": date_info
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    engine = get_remote_engine(
        site_name=site_name,
        db_type=db_type
    )
    # Pull rows updated in the [last_date_info, date_info) window; Sqoop free-form imports require the $CONDITIONS token
    query = f"""
    SELECT asin, last_detail::text as last_detail, update_at FROM {import_table}
    WHERE update_at >= '{last_date_info}' AND update_at < '{date_info}' AND last_detail is not null AND \$CONDITIONS
    """
    engine.sqoop_raw_import(
        query=query,
        hive_table=hive_table,
        hdfs_path=hdfs_path,
        partitions=partition_dict
    )
    pass
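
For reference, the deduplication step in DimKeepaAsinInfo.handle_data keeps, per asin, the row with the latest updated_time across the freshly parsed batch and the existing dimension table. A small local sketch of that window pattern, using toy data with hypothetical asin values and assuming a local PySpark installation:

from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.master("local[1]").appName("dedup_sketch").getOrCreate()

new_batch = spark.createDataFrame(
    [("B000TEST01", 120, "2024-01-02")], ["asin", "weight", "updated_time"])
history = spark.createDataFrame(
    [("B000TEST01", 100, "2023-12-30"), ("B000TEST02", 80, "2023-12-30")],
    ["asin", "weight", "updated_time"])

merged = new_batch.unionByName(history)
window = Window.partitionBy("asin").orderBy(F.col("updated_time").desc_nulls_last())
latest = merged.withColumn("rn", F.row_number().over(window)).filter("rn = 1").drop("rn")
latest.show()  # keeps the 2024-01-02 row for B000TEST01 and the only row for B000TEST02
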