abel_cjy / Amazon-Selection-Data · Commits

Commit 1f501233
Authored Jun 17, 2025 by fangxingjun
Parents: 0ad41221, 13f4a99e

Merge branch 'developer' of 47.106.101.75:abel_cjy/Amazon-Selection-Data into developer
Showing 5 changed files, with 297 additions and 68 deletions:

Pyspark_job/dim/dim_asin_related_traffic.py   (+73, -46)
Pyspark_job/dwt/dwt_asin_related_traffic.py   (+7, -0)
Pyspark_job/script/update_syn_pg14.py   (+120, -0)
Pyspark_job/sqoop_export/export_dwt_asin_related_traffic.py   (+45, -22)
Pyspark_job/sqoop_import/ods_self_asin_related_traffic.py   (+52, -0)
Pyspark_job/dim/dim_asin_related_traffic.py

import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
...
@@ -43,15 +44,42 @@ class DimAsinRelatedTraffic(object):
         self.u_categorize_flow = F.udf(self.categorize_flow, StringType())
         self.u_merge_df = F.udf(self.merge_df, StringType())
+        self.u_repair_json = F.udf(self.repair_json, StringType())
+
+    @staticmethod
+    def repair_json(json_str):
+        """Repair the array format of the targeted field."""
+        if not json_str:
+            return json_str
+        # Match three cases: 1) an already-formatted array 2) a quoted string 3) a bare, unquoted value
+        pattern = re.compile(
+            r'("Brand in this category on Amazon"\s*:\s*)(\[[^\]]+\]|"([^"]+)"|([^,{}"]+))'
+        )
+
+        def replace_func(m):
+            # Already an array (group(2) starts with '['): return the whole match unchanged
+            if m.group(2).startswith('['):
+                return m.group(0)
+            # Handle quoted or unquoted values
+            raw_value = m.group(3) or m.group(4)
+            items = [v.strip() for v in raw_value.split(",") if v.strip()]
+            return f'{m.group(1)}["{'","'.join(items)}"]'
+
+        return pattern.sub(replace_func, json_str)
+
     @staticmethod
     def merge_df(col1, col2):
-        if col1 is None:
-            return col2
-        if col2 is None:
-            return col1
-        combined = set(col1.split(",") + col2.split(","))
-        return ",".join([x for x in combined if x])
+        if not col1 or col1.strip() == "":
+            return col2 if (col2 and col2.strip()) else None
+        if not col2 or col2.strip() == "":
+            return col1 if (col1 and col1.strip()) else None
+        list1 = list(set(x.strip() for x in col1.split(",") if x.strip()))
+        list2 = list(set(x.strip() for x in col2.split(",") if x.strip()))
+        combined = list(set(list1 + list2))
+        return ",".join(combined) if combined else None

     @staticmethod
     def categorize_flow(key):
...
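As a standalone illustration (not part of the commit), the intent of the new repair_json UDF can be exercised outside Spark. The regex is copied from the hunk above; the sample input, and the assumption that each comma-separated value becomes its own array element, are mine:

import json
import re

# Same pattern as in repair_json: an already-formatted array, a quoted string, or a bare value.
pattern = re.compile(
    r'("Brand in this category on Amazon"\s*:\s*)(\[[^\]]+\]|"([^"]+)"|([^,{}"]+))'
)

def repair(json_str):
    def replace_func(m):
        if m.group(2).startswith('['):        # already an array: leave it untouched
            return m.group(0)
        raw_value = m.group(3) or m.group(4)  # quoted or bare value
        items = [v.strip() for v in raw_value.split(",") if v.strip()]
        joined = '","'.join(items)
        return f'{m.group(1)}["{joined}"]'
    return pattern.sub(replace_func, json_str)

broken = '[{"Brand in this category on Amazon": "BrandA, BrandB"}]'
print(json.loads(repair(broken)))
# -> [{'Brand in this category on Amazon': ['BrandA', 'BrandB']}]

After the repair, from_json can parse the field against ArrayType(MapType(StringType(), ArrayType(StringType()))) instead of returning null for malformed rows.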
@@ -106,7 +134,7 @@ class DimAsinRelatedTraffic(object):
             DataFrame containing the de-duplicated values (a single column)
         """
         return df.withColumn(
-            'json_array',
-            F.from_json(F.coalesce(F.col(json_column), F.lit("[]")), ArrayType(MapType(StringType(), StringType())))
+            'json_array',
+            F.from_json(F.col(json_column), ArrayType(MapType(StringType(), StringType())))
         ).withColumn(
             "exploded_item",
             F.explode("json_array")
         ).withColumn(
...
@@ -130,7 +158,8 @@ class DimAsinRelatedTraffic(object):
             result_list_json,
             bundles_this_asins_json,
             updated_at
-            from ods_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
+            from ods_asin_related_traffic
+            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and asin is not null;
         """
         self.df_asin_detail = self.spark.sql(sqlQuery=sql)
...
@@ -146,7 +175,7 @@ class DimAsinRelatedTraffic(object):
             result_list_json,
             null as bundles_this_asins_json,
             updated_at
-            from ods_self_asin_related_traffic where site_name='{self.site_name}';
+            from ods_self_asin_related_traffic where site_name='{self.site_name}' and asin is not null;
         """
         self.df_self_asin_detail = self.spark.sql(sqlQuery=sql)
...
@@ -162,42 +191,30 @@ class DimAsinRelatedTraffic(object):
     # Handle the result_list_json field
     def handle_result_list_json(self):
-        self.df_result_list_json = self.df_asin_detail.select('asin', 'result_list_json')
-        # Unpack the complex JSON
-        self.df_result_list_json = self.df_result_list_json.withColumn(
-            'result_list_json',
-            F.from_json(F.coalesce(F.col('result_list_json'), F.lit('[]')), ArrayType(MapType(StringType(), ArrayType(StringType()))))
-        ).withColumn(
-            "exploded_map",
-            F.explode("result_list_json")
-        ).withColumn(
-            "key_value_pair",
-            F.explode(F.map_entries("exploded_map"))
-        ).withColumn(
-            "original_key",
-            F.col("key_value_pair.key")
-        ).withColumn(
-            "value_array",
-            F.col("key_value_pair.value")
-        ).withColumn(
-            "distinct_values",
-            F.array_distinct("value_array")
-        ).withColumn(
-            # Filter out dirty data where the asin length is not 10
-            "filtered_values",
-            F.filter("distinct_values", lambda x: (x.isNotNull() & (x != "") & (F.length(F.trim(x)) == 10)))
-        ).withColumn(
-            "values",
-            F.concat_ws(",", "filtered_values")
-        ).filter(
-            F.col("values").isNotNull() & (F.trim(F.col('values')) != "")
-        ).select("asin", "original_key", "values")
-        # Categorize the traffic types
-        self.df_result_list_json = self.df_result_list_json.withColumn(
-            "key",
-            self.u_categorize_flow("original_key")
-        ).filter(
-            F.col('key') != 'other'
-        ).groupBy("asin", "key").agg(
-            F.concat_ws(",", F.array_distinct(F.flatten(F.collect_list(F.split("values", ","))))).alias("values")
-        ).select("asin", "key", "values")
-        # Pivot rows to columns
-        self.df_result_list_json = self.df_result_list_json.groupBy("asin") \
-            .pivot("key") \
-            .agg(F.first("values")) \
+        json_schema = ArrayType(MapType(StringType(), ArrayType(StringType())))
+        self.df_result_list_json = self.df_asin_detail.filter(
+            F.col('result_list_json').isNotNull()
+        ).select(
+            'asin',
+            F.from_json(self.u_repair_json(F.col('result_list_json')), json_schema).alias('parsed_json')
+        ).withColumn(
+            "kv",
+            F.explode("parsed_json")
+        ).select(
+            "asin",
+            F.explode("kv").alias("key", "value")
+        ).withColumn(
+            "category",
+            self.u_categorize_flow(F.col("key"))
+        ).filter(
+            F.col("category") != "other"
+        ).withColumn(
+            "distinct_values",
+            F.array_distinct("value")
+        ).filter(
+            F.expr("size(distinct_values) > 0")
+        ).select('asin', 'category', 'distinct_values') \
+            .groupBy(["asin", "category"]) \
+            .agg(
+                F.concat_ws(",", F.array_distinct(F.flatten(F.collect_list("distinct_values")))).alias("values")
+            ) \
+            .groupBy("asin") \
+            .pivot("category") \
+            .agg(F.first("values")) \
             .cache()
         print("处理result_list_json字段结果如下:")
         self.df_result_list_json.show(10, True)
...
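To see the new explode-and-pivot shape in isolation, here is a minimal local-mode sketch (not from the repo). It assumes a local PySpark installation; the sample row is made up and F.lit('related_products') stands in for the u_categorize_flow UDF:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, MapType, StringType

spark = SparkSession.builder.master("local[1]").appName("pivot_sketch").getOrCreate()

schema = ArrayType(MapType(StringType(), ArrayType(StringType())))
rows = [("B0TEST00001", '[{"Products related to this item": ["B0000000AA", "B0000000BB"]}]')]
df = spark.createDataFrame(rows, ["asin", "result_list_json"])

result = (df.select("asin", F.from_json("result_list_json", schema).alias("parsed_json"))
            .withColumn("kv", F.explode("parsed_json"))
            .select("asin", F.explode("kv").alias("key", "value"))
            .withColumn("category", F.lit("related_products"))      # stand-in for u_categorize_flow
            .groupBy("asin").pivot("category")
            .agg(F.first(F.concat_ws(",", F.array_distinct("value")))))
result.show(truncate=False)   # one row per asin, one column per traffic category
spark.stop()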
...
@@ -205,7 +222,9 @@ class DimAsinRelatedTraffic(object):
     # Handle the other traffic fields
     def handle_other_field(self):
         # Handle the sp_initial_seen_asins_json field
-        self.df_sp_initial_seen_asins_json = self.df_asin_detail.select('asin', 'sp_initial_seen_asins_json')
+        self.df_sp_initial_seen_asins_json = self.df_asin_detail \
+            .select('asin', 'sp_initial_seen_asins_json') \
+            .filter(F.col('sp_initial_seen_asins_json').isNotNull())
         self.df_sp_initial_seen_asins_json = self.other_json_handle(df=self.df_sp_initial_seen_asins_json, json_column='sp_initial_seen_asins_json',
...
@@ -216,7 +235,9 @@ class DimAsinRelatedTraffic(object):
         self.df_sp_initial_seen_asins_json.show(10, True)
         # Handle the sp_4stars_initial_seen_asins_json field
-        self.df_sp_4stars_initial_seen_asins_json = self.df_asin_detail.select('asin', 'sp_4stars_initial_seen_asins_json')
+        self.df_sp_4stars_initial_seen_asins_json = self.df_asin_detail \
+            .select('asin', 'sp_4stars_initial_seen_asins_json') \
+            .filter(F.col('sp_4stars_initial_seen_asins_json').isNotNull())
         self.df_sp_4stars_initial_seen_asins_json = self.other_json_handle(df=self.df_sp_4stars_initial_seen_asins_json, json_column='sp_4stars_initial_seen_asins_json',
...
@@ -227,7 +248,9 @@ class DimAsinRelatedTraffic(object):
         self.df_sp_4stars_initial_seen_asins_json.show(10, True)
         # Handle the sp_delivery_initial_seen_asins_json field
-        self.df_sp_delivery_initial_seen_asins_json = self.df_asin_detail.select('asin', 'sp_delivery_initial_seen_asins_json')
+        self.df_sp_delivery_initial_seen_asins_json = self.df_asin_detail \
+            .select('asin', 'sp_delivery_initial_seen_asins_json') \
+            .filter(F.col('sp_delivery_initial_seen_asins_json').isNotNull())
         self.df_sp_delivery_initial_seen_asins_json = self.other_json_handle(df=self.df_sp_delivery_initial_seen_asins_json, json_column='sp_delivery_initial_seen_asins_json',
...
@@ -238,7 +261,9 @@ class DimAsinRelatedTraffic(object):
         self.df_sp_delivery_initial_seen_asins_json.show(10, True)
         # Handle the compare_similar_asin_json field
-        self.df_compare_similar_asin_json = self.df_asin_detail.select('asin', 'compare_similar_asin_json')
+        self.df_compare_similar_asin_json = self.df_asin_detail \
+            .select('asin', 'compare_similar_asin_json') \
+            .filter(F.col('compare_similar_asin_json').isNotNull())
         self.df_compare_similar_asin_json = self.other_json_handle(df=self.df_compare_similar_asin_json, json_column='compare_similar_asin_json',
...
@@ -249,7 +274,9 @@ class DimAsinRelatedTraffic(object):
         self.df_compare_similar_asin_json.show(10, True)
         # Handle the bundles_this_asins_json field
-        self.df_bundles_this_asins_json = self.df_asin_detail.select('asin', 'bundles_this_asins_json')
+        self.df_bundles_this_asins_json = self.df_asin_detail \
+            .select('asin', 'bundles_this_asins_json') \
+            .filter(F.col('bundles_this_asins_json').isNotNull())
         self.df_bundles_this_asins_json = self.other_json_handle(df=self.df_bundles_this_asins_json, json_column='bundles_this_asins_json',
...
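Because merge_df is registered as a string UDF, its edge-case handling matters downstream. A standalone comparison (not from the repo) of the two bodies quoted in the first hunk, with made-up inputs:

def merge_df_old(col1, col2):
    if col1 is None:
        return col2
    if col2 is None:
        return col1
    combined = set(col1.split(",") + col2.split(","))
    return ",".join([x for x in combined if x])

def merge_df_new(col1, col2):
    if not col1 or col1.strip() == "":
        return col2 if (col2 and col2.strip()) else None
    if not col2 or col2.strip() == "":
        return col1 if (col1 and col1.strip()) else None
    list1 = list(set(x.strip() for x in col1.split(",") if x.strip()))
    list2 = list(set(x.strip() for x in col2.split(",") if x.strip()))
    combined = list(set(list1 + list2))
    return ",".join(combined) if combined else None

print(merge_df_old(" B0000000AA ,", "B0000000AA"))  # untrimmed duplicate survives (element order varies)
print(merge_df_new(" B0000000AA ,", "B0000000AA"))  # "B0000000AA": trimmed and de-duplicated
print(merge_df_new("", "  "))                       # None instead of an empty string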
Pyspark_job/dwt/dwt_asin_related_traffic.py

...
@@ -72,6 +72,13 @@ class DwtAsinRelatedTraffic(object):
     def handle_data(self):
         cols = [col for col in self.df_dim_asin_related_traffic.columns if col != 'asin']
         for col in cols:
             self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
                 col,
                 F.concat_ws(",", F.filter(F.split(F.col(col), ","), lambda x: (F.length(F.trim(x)) == 10)))
             ).withColumn(
                 col,
                 F.when(F.col(col) == "", None).otherwise(F.col(col))
             )
         # Concatenate the related-traffic asins of all types
         self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn("related_asin", F.concat_ws(",", *[F.col(col) for col in cols])
...
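A minimal local-mode sketch (not from the repo) of the cleanup loop added above, applied to a single hypothetical column; it assumes Spark 3.1+, where pyspark.sql.functions.filter accepts a Python lambda:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("cleanup_sketch").getOrCreate()

df = spark.createDataFrame([("B0TEST00001", "B0000000AA,bad,B0000000BB"),
                            ("B0TEST00002", "bad")],
                           ["asin", "related_products"])

col = "related_products"
df = (df.withColumn(col, F.concat_ws(",", F.filter(F.split(F.col(col), ","),
                                                   lambda x: F.length(F.trim(x)) == 10)))
        .withColumn(col, F.when(F.col(col) == "", None).otherwise(F.col(col))))
df.show(truncate=False)
# B0TEST00001 keeps "B0000000AA,B0000000BB"; B0TEST00002's value becomes null
spark.stop()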
Pyspark_job/script/update_syn_pg14.py  (new file, mode 100644)

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil


class UpdateSynPG14(Templates):

    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.df_existing_asin = self.spark.sql(f"select 1+1;")
        self.df_related_asin = self.spark.sql(f"select 1+1;")
        self.df_asin_variation = self.spark.sql(f"select 1+1;")
        self.df_asin_stable = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")

    def read_data(self):
        print("读取ods_asin_detail表,获取所有已抓asin")
        sql = f"""
        select asin from ods_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        self.df_existing_asin = self.spark.sql(sqlQuery=sql).drop_duplicates(['asin']).cache()
        print("本月已抓asin如下:")
        self.df_existing_asin.show(10, True)
        print("从dwt_asin_related_traffic表中读取所有关联asin")
        sql = f"""
        select related_asin from dwt_asin_related_traffic where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        self.df_related_asin = self.spark.sql(sqlQuery=sql).cache()
        print("关联asin数据如下:")
        self.df_related_asin.show(10, True)
        print("读取dim_asin_variation_info表")
        sql = f"""
        select asin, 1 as asin_is_variation from dim_asin_variation_info where site_name='{self.site_name}';
        """
        self.df_asin_variation = self.spark.sql(sqlQuery=sql).drop_duplicates(['asin']).cache()
        print("asin_variation数据如下:")
        self.df_asin_variation.show(10, True)
        print("读取dim_asin_stable_info表")
        sql = f"""
        select asin, asin_volume as volume, asin_weight_str as weight_str from dim_asin_stable_info where site_name='{self.site_name}';
        """
        self.df_asin_stable = self.spark.sql(sqlQuery=sql).drop_duplicates(['asin']).cache()
        print("asin重量体积数据如下:")
        self.df_asin_stable.show(10, True)

    def handle_data(self):
        # Split the related_asin field into individual asins
        self.df_related_asin = self.df_related_asin.withColumn(
            'asin', F.explode(F.split(F.col('related_asin'), ','))
        ).select('asin').drop_duplicates(['asin'])
        # Find the asins that still need to be crawled and join the detail data
        self.df_save = self.df_related_asin.join(
            self.df_existing_asin, on='asin', how='anti'
        ).join(
            self.df_asin_variation, on='asin', how='left'
        ).join(
            self.df_asin_stable, on='asin', how='left'
        ).fillna({'asin_is_variation': 0})
        # Pre-write processing
        self.df_save = self.df_save.filter(
            F.length(F.col('asin')) == 10
        ).withColumn(
            'state', F.lit(1)
        ).withColumn(
            'data_type', F.lit(2)
        ).withColumn(
            'date_info', F.lit(self.date_info)
        ).select(
            'asin', 'state', 'asin_is_variation', 'date_info', 'data_type', 'volume', 'weight_str'
        ).cache()
        # print("最终结果如下:")
        # self.df_save.show(10, True)
        # print(f"需要补抓的asin数据量为:{self.df_save.count()}")

    def save_data(self):
        # Crawler database connection
        con_info = DBUtil.get_connection_info('postgresql_14', self.site_name)
        year_month = str(self.date_info).replace("-", "_")
        table_name = f'{self.site_name}_all_syn_st_month_{year_month}'
        self.df_save.write.format("jdbc") \
            .option("url", con_info["url"]) \
            .option("dbtable", table_name) \
            .option("user", con_info["username"]) \
            .option("password", con_info["pwd"]) \
            .mode("append") \
            .save()
        users = ["chenyuanjie", "pengyanbing"]
        title = f"关联流量:{self.site_name},{self.date_info}"
        content = f"关联流量需补抓的asin已导出到syn表,补抓量:{self.df_save.count()}"
        CommonUtil.send_wx_msg(users=users, title=title, content=content)
        pass


if __name__ == "__main__":
    site_name = sys.argv[1]    # argument 1: site
    date_type = sys.argv[2]    # argument 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]    # argument 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = UpdateSynPG14(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
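The core of update_syn_pg14.py is the anti join that keeps only related asins that have not been crawled yet. A standalone local-mode sketch with made-up data (not from the repo):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("anti_join_sketch").getOrCreate()

df_related = spark.createDataFrame([("B0000000AA,B0000000BB",), ("B0000000CC",)], ["related_asin"])
df_existing = spark.createDataFrame([("B0000000AA",)], ["asin"])

df_todo = (df_related
           .withColumn("asin", F.explode(F.split(F.col("related_asin"), ",")))
           .select("asin").drop_duplicates(["asin"])
           .join(df_existing, on="asin", how="anti"))
df_todo.show()   # B0000000BB and B0000000CC remain; these are the rows pushed to the syn table
spark.stop()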
Pyspark_job/sqoop_export/export_dwt_asin_related_traffic.py

...
@@ -6,6 +6,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
 from utils.ssh_util import SSHUtil
 from utils.common_util import CommonUtil
 from utils.db_util import DBUtil
+from datetime import date

 if __name__ == '__main__':
     site_name = CommonUtil.get_sys_arg(1, None)
...
@@ -13,32 +14,39 @@ if __name__ == '__main__':
     date_info = CommonUtil.get_sys_arg(3, None)
     print(f"执行参数为{sys.argv}")
     # CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
     #                                principal='chenyuanjie',
     #                                priority=2, export_tools_type=1, belonging_to_process=f'新ABA流程_{date_type}')
     CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                    principal='chenyuanjie',
                                    priority=3, export_tools_type=1, belonging_to_process='关联流量')
     db_type = 'postgresql_cluster'
     engine = DBUtil.get_db_engine(db_type, site_name)
-    dt = str(date_info).replace("-", "_")
-    export_tb = f"{site_name}_asin_related_{dt}"
     export_cols = [
         'asin',
         'related_asin',
         'related_type',
         'related_type_num',
         'related_type_rate',
         'free_cnt',
         'free_rat',
         'pay_cnt',
         'pay_rat',
         'total_cnt'
     ]
+    if date_type == 'month':
+        dt = str(date_info).replace("-", "_")
+        export_tb = f"{site_name}_asin_related_{dt}"
+        report_type = date_type
+        report_date = date_info
+    elif date_type == 'month_week':
+        export_tb = f"{site_name}_asin_related_30_day"
+        report_type = '30_day'
+        report_date = date.today()
+    else:
+        raise Exception('date_type有误!')
     sql = f"""
     DROP TABLE IF EXISTS {export_tb};
     CREATE TABLE {export_tb} (LIKE us_asin_related_template INCLUDING ALL);
     ALTER TABLE {export_tb} ALTER COLUMN related_asin TYPE text;
     ALTER TABLE {export_tb} ALTER COLUMN related_type TYPE text;
     ALTER TABLE {export_tb} ALTER COLUMN related_type_num TYPE text;
     ALTER TABLE {export_tb} ALTER COLUMN related_type_rate TYPE text;
     """
     DBUtil.engine_exec_sql(engine, sql)
     partition_dict = {
...
@@ -57,22 +65,37 @@ if __name__ == '__main__':
     client = SSHUtil.get_ssh_client()
     SSHUtil.exec_command_async(client, sh, ignore_err=False)
     client.close()
     print("数据导出完成,准备修改数据类型!")
     sql = f"""
     ALTER TABLE {export_tb}
-        ALTER COLUMN related_asin TYPE VARCHAR(20)[]
+        ALTER COLUMN related_asin TYPE VARCHAR(10)[]
         USING string_to_array(related_asin, ',');
     ALTER TABLE {export_tb}
         ALTER COLUMN related_type TYPE INTEGER[]
         USING string_to_array(related_type, ',')::int[];
+    ALTER TABLE {export_tb}
+        ALTER COLUMN related_type_num TYPE INTEGER[]
+        USING string_to_array(related_type_num, ',')::int[];
+    ALTER TABLE {export_tb}
+        ALTER COLUMN related_type_rate TYPE numeric(10,4)[]
+        USING string_to_array(related_type_rate, ',')::numeric(10,4)[];
     """
     DBUtil.engine_exec_sql(engine, sql)
     print("数据类型修改完成,准备创建索引!")
     sql = f"""
     CREATE INDEX {export_tb}_asin_idx ON {export_tb} USING btree (
         "asin" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
     );
     CREATE INDEX {export_tb}_related_asin_idx ON {export_tb} USING gin (
         "related_asin" COLLATE "pg_catalog"."default" "pg_catalog"."array_ops"
     );
     """
     DBUtil.engine_exec_sql(engine, sql)
     print("索引创建完成,准备插入流程记录表!")
     sql = f"""
     REPLACE INTO selection.workflow_everyday
         (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type)
     VALUES
         ('{site_name}', '{report_date}', '导出PG集群完成', 14, '{export_tb}', '{report_type}', '关联流量', '是', '关联流量模块数据', 'postgresql_cluster');
     """
     DBUtil.engine_exec_sql(DBUtil.get_db_engine('mysql', 'us'), sql)
     print("success!")
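For clarity, the date_type branching that chooses the export table and report date can be read as a small pure function. This is an illustrative extraction, not code from the repo; its behaviour mirrors the if/elif above:

from datetime import date

def resolve_export_target(site_name, date_type, date_info):
    if date_type == 'month':
        dt = str(date_info).replace("-", "_")
        return f"{site_name}_asin_related_{dt}", date_type, date_info
    if date_type == 'month_week':
        return f"{site_name}_asin_related_30_day", '30_day', date.today()
    raise ValueError("unsupported date_type")

print(resolve_export_target("us", "month", "2025-06"))
# ('us_asin_related_2025_06', 'month', '2025-06')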
Pyspark_job/sqoop_import/ods_self_asin_related_traffic.py  (new file, mode 100644)

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils

if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    assert site_name is not None, "site_name 不能为空!"
    hive_table = "ods_self_asin_related_traffic"
    partition_dict = {
        "site_name": site_name
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")
    db_type = 'mysql'
    import_table = f"{site_name}_self_asin_detail"
    sql_query = f"""
    select
        id,
        asin,
        together_asin,
        sp_initial_seen_asins_json,
        sp_4stars_initial_seen_asins_json,
        sp_delivery_initial_seen_asins_json,
        compare_similar_asin_json,
        result_list_json,
        updated_at
    from {import_table}
    where site = '{site_name}'
    and DATE(updated_at) >= DATE_SUB(CURDATE(), INTERVAL 7 DAY)
    and \$CONDITIONS
    """
    # Generate the sqoop import script
    import_sh = CommonUtil.build_import_sh(site_name=site_name,
                                           db_type=db_type,
                                           query=sql_query,
                                           hdfs_path=hdfs_path,
                                           map_num=25,
                                           key='id')
    # Delete the existing HDFS data before importing
    HdfsUtils.delete_hdfs_file(hdfs_path)
    # Create an SSH client object for executing shell commands
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
    # Build the lzo index and repair the Hive metadata
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
    # Close the connection
    client.close()