Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
16f825c9
Commit
16f825c9
authored
Jun 02, 2026
by
fangxingjun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
78306465
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
122 additions
and
0 deletions
+122
-0
export_bs_ns_to_doris.py
Pyspark_job/sqoop_export/export_bs_ns_to_doris.py
+122
-0
No files found.
Pyspark_job/sqoop_export/export_bs_ns_to_doris.py
0 → 100644
View file @
16f825c9
import
os
import
re
import
sys
sys
.
path
.
append
(
"/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/"
)
sys
.
path
.
append
(
os
.
path
.
dirname
(
sys
.
path
[
0
]))
# 上级目录
from
utils.templates
import
Templates
from
utils.secure_db_client
import
get_remote_engine
from
utils.DorisHelper
import
DorisHelper
from
pyspark.sql
import
functions
as
F
class
ExportBsNs
(
Templates
):
def
__init__
(
self
,
site_name
=
'us'
,
date_type
=
"day"
,
date_info
=
'2022-10-01'
,
consumer_type
=
'lastest'
,
topic_name
=
"us_asin_detail"
,
batch_size
=
100000
):
super
()
.
__init__
()
self
.
site_name
=
site_name
self
.
date_type
=
date_type
self
.
date_info
=
date_info
self
.
db_save
=
f
'ods_bs_top100_asin'
self
.
spark
=
self
.
create_spark_object
(
app_name
=
f
"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}"
)
self
.
df_bs_top100
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
doris_db
=
"selection"
self
.
doris_table
=
f
"{site_name}_bsr_rank_latest"
def
read_data
(
self
):
print
(
"1. 读取ods_bs_top100_asin"
)
sql
=
f
"""
select date_info,id,asin,cate_1_id,cate_current_id,category_id,bsr_rank,price,rating,total_comments,
to_timestamp(created_at, 'yyyy-MM-dd HH:mm:ss.SSSSSS') AS created_at,
to_timestamp(updated_at, 'yyyy-MM-dd HH:mm:ss.SSSSSS') AS updated_at
from ods_bs_top100_asin where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
"""
print
(
"sql="
,
sql
)
self
.
df_bs_top100
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
self
.
df_bs_top100
=
self
.
df_bs_top100
.
repartition
(
5
)
.
cache
()
self
.
df_bs_top100
.
show
(
10
,
truncate
=
False
)
# self.df_bs_top100 = self.df_bs_top100.withColumn(
# "created_at",
# F.when(
# (F.col("created_at").isNull()) | (F.col("created_at") == ""),
# None
# ).otherwise(F.to_timestamp("created_at", "yyyy-MM-dd HH:mm:ss"))
# ).withColumn(
# "updated_at",
# F.when(
# (F.col("updated_at").isNull()) | (F.col("updated_at") == ""),
# None
# ).otherwise(F.to_timestamp("updated_at", "yyyy-MM-dd HH:mm:ss"))
# )
def
delete_data
(
self
):
engine
=
get_remote_engine
(
site_name
=
'us'
,
# -> database "selection"
db_type
=
"doris"
,
# -> 服务端 alias "mysql"
)
sql_delete
=
f
"delete from {self.doris_table} where date_info<'{self.date_info}'"
print
(
f
"删除最新导入之前的数据, sql_delete: {sql_delete}"
)
engine
.
execute
(
sql_delete
)
def
run
(
self
):
self
.
read_data
()
df
=
self
.
df_bs_top100
count
=
df
.
count
()
print
(
f
"读取完成,数据量:{count}"
)
df
.
show
(
10
,
truncate
=
False
)
TABLE_COLUMNS
=
"date_info,id,asin,cate_1_id,cate_current_id,category_id,bsr_rank,price,rating,total_comments,created_at,updated_at"
# ===== Step 2:写入 Doris selection.sys_edit_log =====
print
(
f
"[2/2] 写入 Doris {self.doris_db}.{self.doris_table}"
)
DorisHelper
.
spark_export_with_columns
(
df_save
=
df
,
db_name
=
self
.
doris_db
,
table_name
=
self
.
doris_table
,
table_columns
=
TABLE_COLUMNS
,
use_type
=
'selection'
,
)
print
(
"success"
)
self
.
delete_data
()
def
export_data
(
site_name
,
date_type
,
date_info
):
engine
=
get_remote_engine
(
site_name
=
'us'
,
# -> database "selection"
db_type
=
"doris"
,
# -> 服务端 alias "mysql"
# user="fangxingjun", # -> 服务端 alias "mysql"
# user_token="5f1b2e9c3a4d7f60" # 可不传,走默认
)
partitions
=
{
'site_name'
:
site_name
,
'date_type'
:
date_type
,
'date_info'
:
date_info
,
}
cols_list
=
[
'date_info'
,
'id'
,
'asin'
,
'cate_1_id'
,
'cate_current_id'
,
'category_id'
,
'bsr_rank'
,
'price'
,
'rating'
,
'total_comments'
,
'created_at'
,
'updated_at'
]
import_table
=
f
'{site_name}_bsr_rank_lastest'
hive_table
=
'ods_bs_top100_asin'
print
(
f
"import_table: {import_table}, hive_table: {hive_table}"
)
print
(
f
"partitions: {partitions}"
)
engine
.
sqoop_raw_export
(
hive_table
=
hive_table
,
import_table
=
import_table
,
partitions
=
partitions
,
m
=
1
,
cols
=
','
.
join
(
cols_list
)
)
if
__name__
==
'__main__'
:
# site_name = 'us'
# date_type = 'day'
# date_info = '2026-06-02'
site_name
=
sys
.
argv
[
1
]
# 参数1:站点
date_type
=
sys
.
argv
[
2
]
# 参数2:类型:week/4_week/month/quarter/day
date_info
=
sys
.
argv
[
3
]
# 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1
# export_data(site_name, date_type, date_info)
handle_obj
=
ExportBsNs
(
site_name
=
site_name
,
date_type
=
date_type
,
date_info
=
date_info
)
handle_obj
.
run
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment