Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
13cba59c
Commit
13cba59c
authored
Nov 25, 2025
by
fangxingjun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
新增redis-存储报错-消息推送
parent
c84a984f
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
40 additions
and
30 deletions
+40
-30
kafka_asin_detail.py
Pyspark_job/my_kafka/kafka_asin_detail.py
+40
-30
No files found.
Pyspark_job/my_kafka/kafka_asin_detail.py
View file @
13cba59c
...
@@ -53,7 +53,7 @@ class DimStAsinInfo(Templates):
...
@@ -53,7 +53,7 @@ class DimStAsinInfo(Templates):
"fr"
:
4
,
"fr"
:
4
,
"it"
:
5
,
"it"
:
5
,
}
}
self
.
client
=
redis
.
Redis
(
host
=
'192.168.10.224'
,
port
=
6379
,
db
=
self
.
redis_db
[
self
.
site_name
],
password
=
'
yswg202
3'
)
self
.
client
=
redis
.
Redis
(
host
=
'192.168.10.224'
,
port
=
6379
,
db
=
self
.
redis_db
[
self
.
site_name
],
password
=
'
N8#rTp2Xz!Lk6@Vw9qHs4&Yb1Fm0Cj
3'
)
self
.
db_save
=
f
'kafka_asin_detail'
self
.
db_save
=
f
'kafka_asin_detail'
self
.
app_name
=
self
.
get_app_name
()
self
.
app_name
=
self
.
get_app_name
()
self
.
spark
=
self
.
create_spark_object
(
self
.
spark
=
self
.
create_spark_object
(
...
@@ -535,35 +535,45 @@ class DimStAsinInfo(Templates):
...
@@ -535,35 +535,45 @@ class DimStAsinInfo(Templates):
# 将Spark DataFrame转换为Pandas DataFrame
# 将Spark DataFrame转换为Pandas DataFrame
pdf
=
df
.
toPandas
()
pdf
=
df
.
toPandas
()
print
(
f
"开始存储数据: {pdf.shape}"
)
print
(
f
"开始存储数据: {pdf.shape}"
)
# 遍历Pandas DataFrame并将数据插入到Redis
while
True
:
for
index
,
row
in
pdf
.
iterrows
():
try
:
# 创建一个复合键,或者根据你的需要选择适当的键
# 遍历Pandas DataFrame并将数据插入到Redis
# 1. 外层key为10197, 内层可以为10197:15931
for
index
,
row
in
pdf
.
iterrows
():
# redis_key = f"{row['key_outer']}:{row['key_inner']}" #
# 创建一个复合键,或者根据你的需要选择适当的键
# # 插入值到Redis - 在这里我仅仅存储了一个值,你可以存储一个字典来存储多个值
# 1. 外层key为10197, 内层可以为10197:15931
# self.client.set(redis_key, row['value'])
# redis_key = f"{row['key_outer']}:{row['key_inner']}" #
# row_json = row.to_json(orient='split')
# # 插入值到Redis - 在这里我仅仅存储了一个值,你可以存储一个字典来存储多个值
# self.client.set(redis_key, row_json)
# self.client.set(redis_key, row['value'])
# 2. 外层key为10197, 内层可以为15931
# row_json = row.to_json(orient='split')
# redis_key = row['key_outer']
# self.client.set(redis_key, row_json)
# redis_field = row['key_inner']
# 2. 外层key为10197, 内层可以为15931
# row_json = row.to_json(orient='split')
# redis_key = row['key_outer']
# self.client.hset(redis_key, redis_field, row_json)
# redis_field = row['key_inner']
# row_json = row.to_json(orient='split')
# 3. hashmap + 外层key为10197, 内层可以为15931
# self.client.hset(redis_key, redis_field, row_json)
redis_key
=
row
[
'key_outer'
]
# redis_field = row['key_inner']
# 3. hashmap + 外层key为10197, 内层可以为15931
redis_field
=
row
[
'asin'
]
redis_key
=
row
[
'key_outer'
]
row_dict
=
row
.
to_dict
()
# redis_field = row['key_inner']
# row_dict = {k: str(v).lower().replace("none", "").replace("nan", "") for k, v in row_dict.items()} # 确保所有的值都是字符串
redis_field
=
row
[
'asin'
]
row_dict
=
{
k
:
str
(
v
)
.
replace
(
"None"
,
""
)
.
replace
(
"none"
,
""
)
.
replace
(
"NaN"
,
""
)
.
replace
(
"nan"
,
""
)
for
k
,
v
in
row_dict
.
items
()}
# 确保所有的值都是字符串
row_dict
=
row
.
to_dict
()
row_dict
=
{
k
:
format
(
v
,
".2f"
)
if
isinstance
(
v
,
(
int
,
float
))
else
str
(
v
)
.
replace
(
"None"
,
""
)
.
replace
(
# row_dict = {k: str(v).lower().replace("none", "").replace("nan", "") for k, v in row_dict.items()} # 确保所有的值都是字符串
"nan"
,
""
)
for
k
,
v
in
row_dict
.
items
()}
row_dict
=
{
k
:
str
(
v
)
.
replace
(
"None"
,
""
)
.
replace
(
"none"
,
""
)
.
replace
(
"NaN"
,
""
)
.
replace
(
"nan"
,
""
)
for
k
,
v
in
row_dict
.
items
()}
# 确保所有的值都是字符串
row_dict
=
{
k
:
format
(
v
,
".2f"
)
if
isinstance
(
v
,
(
int
,
float
))
else
str
(
v
)
.
replace
(
"None"
,
""
)
.
replace
(
del
row_dict
[
"key_outer"
]
"nan"
,
""
)
for
k
,
v
in
row_dict
.
items
()}
del
row_dict
[
"key_inner"
]
row_json
=
json
.
dumps
(
row_dict
)
del
row_dict
[
"key_outer"
]
self
.
client
.
hset
(
redis_key
,
redis_field
,
row_json
)
del
row_dict
[
"key_inner"
]
row_json
=
json
.
dumps
(
row_dict
)
self
.
client
.
hset
(
redis_key
,
redis_field
,
row_json
)
break
except
Exception
as
e
:
print
(
f
"存储redis报错--数据库账号密码不对: {e, traceback.format_exc()}"
)
time
.
sleep
(
10
)
self
.
client
=
redis
.
Redis
(
host
=
'192.168.10.224'
,
port
=
6379
,
db
=
self
.
redis_db
[
self
.
site_name
],
password
=
'N8#rTp2Xz!Lk6@Vw9qHs4&Yb1Fm0Cj3'
)
continue
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment