Commit 3cf4294f by fangxingjun

no message

parent 16f825c9
......@@ -13,7 +13,7 @@ from pyspark.sql import functions as F
class ExportBsNs(Templates):
def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', consumer_type='lastest', topic_name="us_asin_detail", batch_size=100000):
def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', seller_type="bsr"):
super().__init__()
self.site_name = site_name
self.date_type = date_type
......@@ -23,6 +23,9 @@ class ExportBsNs(Templates):
self.df_bs_top100 = self.spark.sql(f"select 1+1;")
self.doris_db = "selection"
self.doris_table = f"{site_name}_bsr_rank_latest"
if seller_type == "nsr":
self.db_save = f'ods_nsr_top100_asin'
self.doris_table = f"{site_name}_nsr_rank_latest"
def read_data(self):
print("1. 读取ods_bs_top100_asin")
......@@ -30,25 +33,12 @@ class ExportBsNs(Templates):
select date_info,id,asin,cate_1_id,cate_current_id,category_id,bsr_rank,price,rating,total_comments,
to_timestamp(created_at, 'yyyy-MM-dd HH:mm:ss.SSSSSS') AS created_at,
to_timestamp(updated_at, 'yyyy-MM-dd HH:mm:ss.SSSSSS') AS updated_at
from ods_bs_top100_asin where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
from {self.db_save} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
"""
print("sql=", sql)
self.df_bs_top100 = self.spark.sql(sqlQuery=sql)
self.df_bs_top100 = self.df_bs_top100.repartition(5).cache()
self.df_bs_top100.show(10, truncate=False)
# self.df_bs_top100 = self.df_bs_top100.withColumn(
# "created_at",
# F.when(
# (F.col("created_at").isNull()) | (F.col("created_at") == ""),
# None
# ).otherwise(F.to_timestamp("created_at", "yyyy-MM-dd HH:mm:ss"))
# ).withColumn(
# "updated_at",
# F.when(
# (F.col("updated_at").isNull()) | (F.col("updated_at") == ""),
# None
# ).otherwise(F.to_timestamp("updated_at", "yyyy-MM-dd HH:mm:ss"))
# )
self.df_seller_top100 = self.spark.sql(sqlQuery=sql)
self.df_seller_top100 = self.df_seller_top100.repartition(5).cache()
self.df_seller_top100.show(10, truncate=False)
def delete_data(self):
engine = get_remote_engine(
......@@ -61,7 +51,7 @@ class ExportBsNs(Templates):
def run(self):
self.read_data()
df = self.df_bs_top100
df = self.df_seller_top100
count = df.count()
print(f"读取完成,数据量:{count}")
df.show(10, truncate=False)
......@@ -79,36 +69,6 @@ class ExportBsNs(Templates):
self.delete_data()
def export_data(site_name, date_type, date_info):
    """Push one partition of the Hive table ``ods_bs_top100_asin`` out via the remote engine.

    The destination table name is derived from ``site_name``; the partition to
    export is identified by the three arguments together.

    Args:
        site_name: Site code; used as a partition value and as the prefix of
            the destination table name.
        date_type: Partition value for ``date_type``.
        date_info: Partition value for ``date_info``.
    """
    # The engine is always requested with site_name='us', independent of the
    # site being exported. Original author's note: this selects the
    # "selection" database -- NOTE(review): confirm against get_remote_engine.
    remote = get_remote_engine(
        site_name='us',
        db_type="doris",
    )

    partition_spec = dict(
        site_name=site_name,
        date_type=date_type,
        date_info=date_info,
    )

    # Columns handed to the export, in this exact order.
    export_columns = (
        'date_info', 'id', 'asin', 'cate_1_id', 'cate_current_id', 'category_id',
        'bsr_rank', 'price', 'rating', 'total_comments', 'created_at', 'updated_at',
    )
    column_arg = ','.join(export_columns)

    target_table = f'{site_name}_bsr_rank_lastest'
    source_table = 'ods_bs_top100_asin'

    print(f"import_table: {target_table}, hive_table: {source_table}")
    print(f"partitions: {partition_spec}")

    remote.sqoop_raw_export(
        hive_table=source_table,
        import_table=target_table,
        partitions=partition_spec,
        m=1,
        cols=column_arg,
    )
if __name__ == '__main__':
# site_name = 'us'
# date_type = 'day'
......@@ -116,7 +76,8 @@ if __name__ == '__main__':
site_name = sys.argv[1] # 参数1:站点
date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter/day
date_info = sys.argv[3] # 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1
seller_type = sys.argv[4] # 参数4:榜单类型, 默认走bsr, 比如bsr, nsr
# export_data(site_name, date_type, date_info)
handle_obj = ExportBsNs(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj = ExportBsNs(site_name=site_name, date_type=date_type, date_info=date_info, seller_type=seller_type)
handle_obj.run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment