abel_cjy / Amazon-Selection-Data · Commit 1f63468d
Authored Apr 15, 2025 by 吴济苍
[fix] Old code omitted from an earlier commit
Parent: 85267ca6
Showing 2 changed files with 146 additions and 2 deletions (+146, -2)
Pyspark_job/dwt/dwt_st_pcp_current.py (+78, -1)
Pyspark_job/sqoop_export/dwt_st_pcp_current.py (+68, -1)
Pyspark_job/dwt/dwt_st_pcp_current.py
@@ -81,5 +81,82 @@ def handle_calc():
    print("success")


def handle_calc_new():
    day_end = CommonUtil.format_now("%Y-%m-%d")
    CommonUtil.orctable_concatenate(
        hive_table="dim_st_pcp_history",
        partition_dict={"date_info": CommonUtil.get_day_offset(day_end, -1)},
        innerFlag=False,
        min_part_num=10,
        max_retry_time=5
    )
    spark = SparkUtil.get_spark_session("dwt_st_pcp_current")
    day_start = CommonUtil.get_day_offset(day_end, -90)
    df_all = spark.sql(f"""
        select site_id,
               group_id,
               keyword_id,
               keyword,
               match_type,
               created_at,
               min_bid,
               max_bid,
               suggested_bid,
               date_info
        from dim_st_pcp_history
        where date_info >= '{day_start}'
          and date_info <= '{day_end}'
    """)
    window = Window.partitionBy(['site_id', 'match_type', 'keyword'])
    df_all = df_all.where("site_id is not null and created_at is not null")
    # deduplicate: one row per keyword per day
    df_all = df_all.dropDuplicates(['site_id', 'match_type', 'keyword', 'date_info'])
    # keep each keyword's most recent day
    df_save = df_all.withColumn("day_row_number", F.row_number().over(window.orderBy(F.col("date_info").desc())))
    df_save = df_save.where("day_row_number == 1")
    # within the most recent day, keep the row with the lowest suggested bid as the filter value
    df_save = df_save.withColumn("min_row_number", F.row_number().over(window.orderBy(F.col("suggested_bid").asc())))
    df_save = df_save.where("min_row_number == 1")
    df_history = df_all.groupby([F.col("site_id"), F.col("keyword"), F.col("match_type")]).agg(
        F.collect_list(
            F.struct(F.col("min_bid"), F.col("max_bid"), F.col("suggested_bid"), F.col("created_at"))
        ).alias("list")
    )
    df_history = df_history.withColumn(
        "history_json",
        F.when(F.size(F.col("list")) <= 1, F.lit(None)).otherwise(F.to_json(F.col("list")))
    )
    df_save = df_save.join(df_history, on=['site_id', 'keyword', 'match_type'], how='left').select(
        df_save['site_id'],
        F.col('group_id'),
        F.col('keyword_id'),
        df_save['keyword'],
        df_save['match_type'],
        F.col('created_at'),
        F.col('min_bid'),
        F.col('max_bid'),
        F.col('suggested_bid'),
        F.col('history_json'),
        F.lit("90").alias("day")
    )
    # update the target table
    CommonUtil.save_or_update_table(
        spark_session=spark,
        hive_tb_name="dwt_st_pcp_current_v2",
        partition_dict={"day": "90"},
        df_save=df_save
    )
    print("success")


if __name__ == '__main__':
    handle_calc()
    handle_calc_new()
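The core of handle_calc_new is a two-pass window filter over (site_id, match_type, keyword): first keep each keyword's most recent date_info, then the lowest suggested_bid within that day. Note the commit ranks both passes with row_number, which never ties, so after the first filter each group already holds exactly one row; the minimal sketch below substitutes dense_rank in the first pass so the lowest-bid tie-break is actually visible. The toy data, app name, and column subset are illustrative, not from the repo:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("pcp_window_demo").getOrCreate()

# toy bid history: three observations of one keyword
df = spark.createDataFrame(
    [
        ("us", "broad", "usb hub", "2025-04-13", 1.10),
        ("us", "broad", "usb hub", "2025-04-14", 0.70),  # latest day, lowest bid
        ("us", "broad", "usb hub", "2025-04-14", 1.25),  # latest day, higher bid
    ],
    ["site_id", "match_type", "keyword", "date_info", "suggested_bid"],
)

w = Window.partitionBy("site_id", "match_type", "keyword")

# pass 1: dense_rank (not row_number) so same-day rows all survive
df = df.withColumn("day_rk", F.dense_rank().over(w.orderBy(F.col("date_info").desc())))
df = df.where("day_rk == 1")

# pass 2: among the latest day's rows, keep the lowest suggested bid
df = df.withColumn("bid_rn", F.row_number().over(w.orderBy(F.col("suggested_bid").asc())))
df = df.where("bid_rn == 1")

df.select("keyword", "date_info", "suggested_bid").show()
# -> single row: usb hub | 2025-04-14 | 0.7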
Pyspark_job/sqoop_export/dwt_st_pcp_current.py
@@ -77,5 +77,72 @@ def handle_export():
    pass


def handle_export_v2():
    site_name = "us"
    db_type = DbTypes.postgresql_cluster.name
    export_tb = f"st_pcp_current_v2"
    export_tb_copy = f"{export_tb}_copy"
    hive_tb = "dwt_st_pcp_current_v2"
    assert CommonUtil.judge_not_working_hour(), "Working hours, please export with caution!!!!"
    sql = f"""
    drop table if exists {export_tb_copy};
    create table if not exists {export_tb_copy}
    (
        like {export_tb} including indexes including comments
    );
    """
    print("================================ execute sql ================================")
    print(sql)
    DBUtil.exec_sql(db_type, site_name, sql, True)
    # export into the copy table
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb=hive_tb,
        export_tb=export_tb_copy,
        col=[
            "site_id",
            "group_id",
            "keyword_id",
            "keyword",
            "match_type",
            "created_at",
            "min_bid",
            "max_bid",
            "suggested_bid",
            "history_json",
        ],
        partition_dict={"day": "90"}
    )
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()
    # swap table names
    DBUtil.exchange_tb(DBUtil.get_db_engine(db_type, site_name), export_tb, export_tb_copy, False)
    print("success")
    # # update the merchantwords_st_detail table
    # sql = f"""
    # update merchantwords_st_detail msd
    # set suggested_bid = spc.suggested_bid
    # from st_pcp_current spc
    # where spc.site_id = 4
    #   and spc.keyword = msd.keyword;
    # """
    # DBUtil.exec_sql(db_type, site_name, sql=sql, dispose_flag=True)
    #
    # # update the merchantwords_st_detail_v2_2024 table
    # sql = f"""
    # update merchantwords_st_detail_v2_2024 msd
    # set suggested_bid = spc.suggested_bid
    # from st_pcp_current spc
    # where spc.site_id = 4
    #   and spc.keyword = msd.keyword;
    # """
    # DBUtil.exec_sql(db_type, site_name, sql=sql, dispose_flag=True)
    pass


if __name__ == '__main__':
    handle_export()
    handle_export_v2()
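handle_export_v2 is a copy-and-swap export: clone the live table's structure (create table ... like ... including indexes including comments), bulk-load the clone from Hive via the generated Sqoop script, then swap names so readers never observe a half-loaded table. DBUtil.exchange_tb is repo-internal, so the sketch below is only a plausible reading of the swap it performs, written with psycopg2 and hypothetical connection details; PostgreSQL DDL is transactional, so the switch is atomic:

import psycopg2

# hypothetical connection parameters; the repo resolves these via DBUtil.get_db_engine
conn = psycopg2.connect(host="pg-cluster", dbname="selection", user="etl", password="***")

live, copy, stash = "st_pcp_current_v2", "st_pcp_current_v2_copy", "st_pcp_current_v2_swap"

with conn, conn.cursor() as cur:
    # one transaction: readers see either the old table or the new one, never neither
    cur.execute(f"alter table {live} rename to {stash};")
    cur.execute(f"alter table {copy} rename to {live};")
    # leave the previous live data under the _copy name for the next run to drop
    cur.execute(f"alter table {stash} rename to {copy};")

conn.close()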