Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
9b140e43
Commit
9b140e43
authored
Apr 01, 2026
by
fangxingjun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
0cf27bb1
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
49 additions
and
4 deletions
+49
-4
dwt_asin_sync.py
Pyspark_job/dwt/dwt_asin_sync.py
+19
-2
post_to_dolphin.py
Pyspark_job/my_kafka/post_to_dolphin.py
+30
-2
No files found.
Pyspark_job/dwt/dwt_asin_sync.py
View file @
9b140e43
...
@@ -14,6 +14,7 @@ import random
...
@@ -14,6 +14,7 @@ import random
import
sys
import
sys
import
time
import
time
import
traceback
import
traceback
from
datetime
import
datetime
import
pandas
as
pd
import
pandas
as
pd
from
pyspark.storagelevel
import
StorageLevel
from
pyspark.storagelevel
import
StorageLevel
...
@@ -64,7 +65,16 @@ class DwtAsinSync(Templates):
...
@@ -64,7 +65,16 @@ class DwtAsinSync(Templates):
self
.
reset_partitions
(
partitions_num
=
5
)
self
.
reset_partitions
(
partitions_num
=
5
)
self
.
date_type
=
self
.
judge_date_type
()
self
.
date_type
=
self
.
judge_date_type
()
@staticmethod
def judge_today(day=None, start_day=10, end_day=25):
    """Tell whether a day of the month falls inside the inclusive sync window.

    :param day: day of the month to test; when omitted, the current local
        day (``datetime.now().day``) is used, matching the original behavior.
    :param start_day: first day of the window (inclusive), default 10.
    :param end_day: last day of the window (inclusive), default 25.
    :return: True when ``start_day <= day <= end_day``, otherwise False.
    """
    if day is None:
        day = datetime.now().day
    # The chained comparison already yields a bool; no if/else needed.
    return start_day <= day <= end_day
def
judge_date_type
(
self
):
def
judge_date_type
(
self
):
day_flag
=
self
.
judge_today
()
print
(
f
"site_name: {self.site_name}, date_type: {self.date_type}, date_info: {self.date_info}"
)
print
(
f
"site_name: {self.site_name}, date_type: {self.date_type}, date_info: {self.date_info}"
)
if
self
.
date_type
in
[
"month"
,
"month_week"
]:
if
self
.
date_type
in
[
"month"
,
"month_week"
]:
year
,
month
=
self
.
date_info
.
split
(
"-"
)
year
,
month
=
self
.
date_info
.
split
(
"-"
)
...
@@ -77,9 +87,11 @@ class DwtAsinSync(Templates):
...
@@ -77,9 +87,11 @@ class DwtAsinSync(Templates):
# self.date_type = "month_week" # 清空syn爬虫表
# self.date_type = "month_week" # 清空syn爬虫表
while
True
:
while
True
:
try
:
try
:
if
day_flag
:
# 1. 只有属于10-25号期间执行才清空syn爬虫表; 2. 月搜索词没有导入进来
sql_truncate
=
f
"truncate {self.table_syn};"
sql_truncate
=
f
"truncate {self.table_syn};"
print
(
f
"月搜索词没有导入进来, 需要先清空表, sql_truncate: {sql_truncate}"
)
print
(
f
"月搜索词没有导入进来, 需要先清空表, sql_truncate: {sql_truncate}"
)
self
.
engine_pg14
.
execute
(
sql_truncate
)
#
self.engine_pg14.execute(sql_truncate)
break
break
except
Exception
as
e
:
except
Exception
as
e
:
print
(
e
,
traceback
.
format_exc
())
print
(
e
,
traceback
.
format_exc
())
...
@@ -140,6 +152,9 @@ class DwtAsinSync(Templates):
...
@@ -140,6 +152,9 @@ class DwtAsinSync(Templates):
)
.
unionByName
(
)
.
unionByName
(
self
.
df_asin_from_day
,
allowMissingColumns
=
True
self
.
df_asin_from_day
,
allowMissingColumns
=
True
)
)
# self.df_save = self.df_asin_from_adv.unionByName(
# self.df_asin_from_day, allowMissingColumns=True
# )
if
self
.
date_type
==
'month_week'
:
if
self
.
date_type
==
'month_week'
:
self
.
df_save
=
self
.
df_asin_from_st
.
unionByName
(
self
.
df_save
=
self
.
df_asin_from_st
.
unionByName
(
self
.
df_asin_from_adv
,
allowMissingColumns
=
True
self
.
df_asin_from_adv
,
allowMissingColumns
=
True
...
@@ -165,7 +180,9 @@ class DwtAsinSync(Templates):
...
@@ -165,7 +180,9 @@ class DwtAsinSync(Templates):
self
.
df_asin_stable
,
on
=
'asin'
,
how
=
'left'
self
.
df_asin_stable
,
on
=
'asin'
,
how
=
'left'
)
)
# 处理同步逻辑
# 处理同步逻辑
if
self
.
date_type
!=
'day'
:
print
(
"==="
*
20
)
print
(
f
"{type(self.df_asin_syn)}: {self.df_asin_syn.count()}"
)
if
self
.
date_type
!=
'day'
and
self
.
df_asin_syn
.
count
()
>
0
:
self
.
df_save
=
self
.
df_save
.
join
(
self
.
df_asin_syn
,
on
=
[
'asin'
],
how
=
"left_anti"
)
self
.
df_save
=
self
.
df_save
.
join
(
self
.
df_asin_syn
,
on
=
[
'asin'
],
how
=
"left_anti"
)
self
.
df_save
=
self
.
df_save
.
withColumn
(
'site_name'
,
F
.
lit
(
self
.
site_name
))
self
.
df_save
=
self
.
df_save
.
withColumn
(
'site_name'
,
F
.
lit
(
self
.
site_name
))
self
.
df_save
=
self
.
df_save
.
withColumn
(
'date_type'
,
F
.
lit
(
self
.
date_type
))
self
.
df_save
=
self
.
df_save
.
withColumn
(
'date_type'
,
F
.
lit
(
self
.
date_type
))
...
...
Pyspark_job/my_kafka/post_to_dolphin.py
View file @
9b140e43
...
@@ -125,6 +125,8 @@ class DolphinschedulerHelper(object):
...
@@ -125,6 +125,8 @@ class DolphinschedulerHelper(object):
)
)
resp_json
=
json
.
loads
(
resp
.
content
.
decode
(
"utf-8"
))
resp_json
=
json
.
loads
(
resp
.
content
.
decode
(
"utf-8"
))
resp_state
=
bool
(
resp_json
[
'success'
])
resp_state
=
bool
(
resp_json
[
'success'
])
title
=
f
"【海豚调度】调度api触发提示"
if
resp_state
:
if
resp_state
:
DolphinschedulerHelper
.
send_startup_state_to_oa
(
project_name
,
process_df_name
,
resp_state
)
DolphinschedulerHelper
.
send_startup_state_to_oa
(
project_name
,
process_df_name
,
resp_state
)
return
True
return
True
...
@@ -133,6 +135,33 @@ class DolphinschedulerHelper(object):
...
@@ -133,6 +135,33 @@ class DolphinschedulerHelper(object):
raise
Exception
(
f
"任务【{project_name}/{process_df_name}】调度失败!"
)
raise
Exception
(
f
"任务【{project_name}/{process_df_name}】调度失败!"
)
@classmethod
@classmethod
def send_wx_msg(cls, users: list, title: str, content: str, msgtype: str = "textcard"):
    """Push a message to OA through the selection-platform WeChat endpoint.

    Delivery is best-effort: a failed HTTP call is reported on stdout and
    never raised to the caller.

    :param users: list of WeChat user names to notify; ``None``, an empty
        list, or a list that joins to an empty string (e.g. ``['']``) means
        nothing is sent.
    :param title: message title (per the original note, not attached when
        msgtype is markdown).
    :param content: main body of the message.
    :param msgtype: message type; "textcard" (default card layout) or
        "markdown".
    :return: always True, whether or not a message was actually sent.
    """
    if users is None:
        return True

    accounts = ",".join(users)
    # Skip recipients such as [''] that collapse to an empty account string.
    if not accounts:
        return True

    host = "http://120.79.147.190:8080"
    url = f'{host}/soundasia_selection/dolphinScheduler/sendMessage'
    data = {
        'account': accounts,
        'title': title,
        'content': content,
        'msgtype': msgtype,
    }
    try:
        requests.post(url=url, data=data, timeout=15)
    except Exception as e:
        # Keep the notification best-effort, but do not hide the failure and
        # do not swallow KeyboardInterrupt/SystemExit like a bare except did.
        print(f"send_wx_msg failed, url: {url}, accounts: {accounts}, error: {e}")
    return True
@classmethod
def
send_startup_state_to_oa
(
cls
,
project_name
:
str
,
process_df_name
:
str
,
resp_state
:
bool
):
def
send_startup_state_to_oa
(
cls
,
project_name
:
str
,
process_df_name
:
str
,
resp_state
:
bool
):
"""
"""
根据api触发海豚oa消息推送(推送人由维护在海豚调度任务中的wx_user决定)
根据api触发海豚oa消息推送(推送人由维护在海豚调度任务中的wx_user决定)
...
@@ -141,7 +170,6 @@ class DolphinschedulerHelper(object):
...
@@ -141,7 +170,6 @@ class DolphinschedulerHelper(object):
:param resp_state:任务调度启动状态
:param resp_state:任务调度启动状态
:return
:return
"""
"""
from
utils.common_util
import
CommonUtil
wx_user_list
=
DolphinschedulerHelper
.
get_process_df_manger
(
project_name
,
process_df_name
)
wx_user_list
=
DolphinschedulerHelper
.
get_process_df_manger
(
project_name
,
process_df_name
)
title
=
f
"【海豚调度】调度api触发提示"
title
=
f
"【海豚调度】调度api触发提示"
if
resp_state
:
if
resp_state
:
...
@@ -149,7 +177,7 @@ class DolphinschedulerHelper(object):
...
@@ -149,7 +177,7 @@ class DolphinschedulerHelper(object):
else
:
else
:
msg
=
f
"项目【{project_name}】,流程【{process_df_name}】api任务触发异常,请查看日志!"
msg
=
f
"项目【{project_name}】,流程【{process_df_name}】api任务触发异常,请查看日志!"
if
bool
(
wx_user_list
):
if
bool
(
wx_user_list
):
CommonUtil
.
send_wx_msg
(
wx_user_list
,
title
,
msg
)
DolphinschedulerHelper
.
send_wx_msg
(
wx_user_list
,
title
,
msg
)
@classmethod
@classmethod
def
get_process_df_manger
(
cls
,
project_name
:
str
,
process_df_name
:
str
):
def
get_process_df_manger
(
cls
,
project_name
:
str
,
process_df_name
:
str
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment