Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
30d321fd
Commit
30d321fd
authored
Jul 16, 2025
by
fangxingjun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
f7c24024
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
56 additions
and
15 deletions
+56
-15
__init__.py
Pyspark_job/img_search/__init__.py
+37
-0
img_download.py
Pyspark_job/img_search/img_download.py
+8
-8
img_dwd_id_index.py
Pyspark_job/img_search/img_dwd_id_index.py
+0
-1
img_extract_features.py
Pyspark_job/img_search/img_extract_features.py
+6
-6
db_util.py
Pyspark_job/utils/db_util.py
+5
-0
No files found.
Pyspark_job/img_search/__init__.py
View file @
30d321fd
"""
Image search (search-by-image): order in which the pipeline scripts are run.

NOTE(review): steps 8-13 reference ${site_name} and ${img_type}, which are not
defined anywhere in this runbook. They presumably correspond to the `us` and
`amazon_inv` arguments used in steps 1-7 and must be set in the shell before
running those commands -- confirm.

# 1. Refresh the image list and download the images to local disk -- run on h7 and h5
/mnt/opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/img_search/img_download.py us amazon_inv 200 1
# 2. Register newly added images -- by default the most recent 7 days are selected
/mnt/opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/img_search/img_local_path.py us amazon_inv
# 3. Extract image features -- h5/h6/h7, several machines may run this at the same time (currently placed on h5)
/opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/img_search/img_extract_features.py us amazon_inv 1000 5
# 4. Import the image feature data into the ODS layer
/mnt/run_shell/sqoop_shell/import/img_features.sh us amazon_inv
# 5. Slice the features (DIM layer)
/mnt/run_shell/spark_shell/dim/img_dim_features_slice.sh us amazon_inv
# 6.1 Build the index mapping -- Doris img_hdfs_index; load into the copy table first
/opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/img_search/img_hdfs_index.py us amazon_inv
# 6.2 Build the index mapping -- Hive img_dwd_id_index
/opt/module/spark/bin/spark-submit --master yarn --driver-memory 2g --executor-memory 4g --executor-cores 1 --num-executors 1 --queue spark /opt/module/spark/demo/py_demo/img_search/img_dwd_id_index_multiprocess.py us amazon_inv 3
# 7. Export the id-to-index mapping to Doris (copy table)
/opt/module/spark/bin/spark-submit --master yarn --driver-memory 20g --executor-memory 20g --executor-cores 4 --num-executors 2 --queue spark /opt/module/spark/demo/py_demo/img_search/img_id_index_to_doris.py us amazon_inv
# 8. Delete the files under the index-related HDFS paths
#    (errors are discarded and ignored, so missing paths do not abort the run)
hdfs dfs -rm -r /home/img_search/img_parquet/${site_name}/${img_type}/* 2>/dev/null || true
hdfs dfs -rm -r /home/img_search/img_tmp/${site_name}/${img_type}/* 2>/dev/null || true
hdfs dfs -rm -r /home/img_search/img_index/${site_name}/${img_type}/* 2>/dev/null || true
# 9. Upload the parquet files to HDFS
hdfs dfs -put /mnt/data/img_data/img_parquet/${site_name}/${img_type}/*/*.parquet /home/img_search/img_parquet/${site_name}/${img_type}/
# 10. Create the index
/mnt/opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark/demo/py_demo/img_search/img_create_index.py
# 11. Pull the index file from HDFS to local disk (the local index directory is emptied first)
rm -rf /mnt/data/img_data/img_index/${site_name}/${img_type}/* 2>/dev/null || true
hdfs dfs -get /home/img_search/img_index/${site_name}/${img_type}/knn.index /mnt/data/img_data/img_index/${site_name}/${img_type}/
# 12. Bring up the search API (restarts img_search.service on hadoop7)
ssh hadoop7 systemctl restart img_search.service
# 13. Swap the table names
/opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark/demo/py_demo/img_search/img_alter_table_name.py ${site_name} ${img_type}
"""
\ No newline at end of file
Pyspark_job/img_search/img_download.py
View file @
30d321fd
...
@@ -16,7 +16,7 @@ logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s', lev
...
@@ -16,7 +16,7 @@ logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s', lev
os
.
environ
[
"PYARROW_IGNORE_TIMEZONE"
]
=
"1"
os
.
environ
[
"PYARROW_IGNORE_TIMEZONE"
]
=
"1"
sys
.
path
.
append
(
os
.
path
.
dirname
(
sys
.
path
[
0
]))
# 上级目录
sys
.
path
.
append
(
os
.
path
.
dirname
(
sys
.
path
[
0
]))
# 上级目录
from
utils.db_util
import
DbTypes
,
DBUtil
from
utils.db_util
import
DbTypes
,
DBUtil
,
get_redis_h14
class
ImgDownload
(
object
):
class
ImgDownload
(
object
):
...
@@ -27,7 +27,7 @@ class ImgDownload(object):
...
@@ -27,7 +27,7 @@ class ImgDownload(object):
self
.
thread_num
=
thread_num
self
.
thread_num
=
thread_num
self
.
limit
=
limit
self
.
limit
=
limit
self
.
engine_mysql
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
mysql
.
name
,
site_name
=
self
.
site_name
)
self
.
engine_mysql
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
mysql
.
name
,
site_name
=
self
.
site_name
)
self
.
client
=
redis
.
Redis
(
host
=
'192.168.10.224'
,
port
=
6379
,
db
=
9
,
password
=
'xA9!wL3pZ@q2'
)
self
.
client
_redis
=
get_redis_h14
(
)
self
.
hostname
=
socket
.
gethostname
()
self
.
hostname
=
socket
.
gethostname
()
self
.
first_local_dir
,
self
.
read_table
=
self
.
get_first_local_dir
()
self
.
first_local_dir
,
self
.
read_table
=
self
.
get_first_local_dir
()
# self.read_table = f"{self.site_name}_inv_img_info"
# self.read_table = f"{self.site_name}_inv_img_info"
...
@@ -54,8 +54,8 @@ class ImgDownload(object):
...
@@ -54,8 +54,8 @@ class ImgDownload(object):
lock_name: 锁的key, 建议和任务名称保持一致
lock_name: 锁的key, 建议和任务名称保持一致
"""
"""
lock_value
=
str
(
uuid
.
uuid4
())
lock_value
=
str
(
uuid
.
uuid4
())
lock_acquired
=
self
.
client
.
set
(
lock_name
,
lock_value
,
nx
=
True
,
ex
=
timeout
)
# 可以不设置超时时间
lock_acquired
=
self
.
client
_redis
.
set
(
lock_name
,
lock_value
,
nx
=
True
,
ex
=
timeout
)
# 可以不设置超时时间
# lock_acquired = self.client.set(lock_name, lock_value, nx=True)
# lock_acquired = self.client
_redis
.set(lock_name, lock_value, nx=True)
return
lock_acquired
,
lock_value
return
lock_acquired
,
lock_value
def
release_lock
(
self
,
lock_name
,
lock_value
):
def
release_lock
(
self
,
lock_name
,
lock_value
):
...
@@ -67,7 +67,7 @@ class ImgDownload(object):
...
@@ -67,7 +67,7 @@ class ImgDownload(object):
return 0
return 0
end
end
"""
"""
result
=
self
.
client
.
eval
(
script
,
1
,
lock_name
,
lock_value
)
result
=
self
.
client
_redis
.
eval
(
script
,
1
,
lock_name
,
lock_value
)
return
result
return
result
@staticmethod
@staticmethod
...
@@ -109,7 +109,7 @@ class ImgDownload(object):
...
@@ -109,7 +109,7 @@ class ImgDownload(object):
print
(
f
"读取数据错误: {e}"
,
traceback
.
format_exc
())
print
(
f
"读取数据错误: {e}"
,
traceback
.
format_exc
())
time
.
sleep
(
20
)
time
.
sleep
(
20
)
self
.
engine_mysql
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
mysql
.
name
,
site_name
=
self
.
site_name
)
self
.
engine_mysql
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
mysql
.
name
,
site_name
=
self
.
site_name
)
self
.
client
=
redis
.
Redis
(
host
=
'192.168.10.224'
,
port
=
6379
,
db
=
9
,
password
=
'xA9!wL3pZ@q2'
)
self
.
client
_redis
=
get_redis_h14
(
)
continue
continue
def
read_data
(
self
):
def
read_data
(
self
):
...
@@ -138,7 +138,7 @@ class ImgDownload(object):
...
@@ -138,7 +138,7 @@ class ImgDownload(object):
print
(
f
"读取数据错误: {e}"
,
traceback
.
format_exc
())
print
(
f
"读取数据错误: {e}"
,
traceback
.
format_exc
())
time
.
sleep
(
20
)
time
.
sleep
(
20
)
self
.
engine_mysql
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
mysql
.
name
,
site_name
=
self
.
site_name
)
self
.
engine_mysql
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
mysql
.
name
,
site_name
=
self
.
site_name
)
self
.
client
=
redis
.
Redis
(
host
=
'192.168.10.224'
,
port
=
6379
,
db
=
9
,
password
=
'xA9!wL3pZ@q2'
)
self
.
client
_redis
=
get_redis_h14
(
)
continue
continue
def
handle_data
(
self
,
df
,
thread_id
):
def
handle_data
(
self
,
df
,
thread_id
):
...
@@ -180,7 +180,7 @@ class ImgDownload(object):
...
@@ -180,7 +180,7 @@ class ImgDownload(object):
except
Exception
as
e
:
except
Exception
as
e
:
print
(
e
,
traceback
.
format_exc
())
print
(
e
,
traceback
.
format_exc
())
self
.
engine_mysql
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
mysql
.
name
,
site_name
=
self
.
site_name
)
self
.
engine_mysql
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
mysql
.
name
,
site_name
=
self
.
site_name
)
self
.
client
=
redis
.
Redis
(
host
=
'192.168.10.224'
,
port
=
6379
,
db
=
9
,
password
=
'xA9!wL3pZ@q2'
)
self
.
client
_redis
=
get_redis_h14
(
)
time
.
sleep
(
20
)
time
.
sleep
(
20
)
continue
continue
...
...
Pyspark_job/img_search/img_dwd_id_index.py
View file @
30d321fd
...
@@ -93,7 +93,6 @@ class PicturesIdIndex(Templates):
...
@@ -93,7 +93,6 @@ class PicturesIdIndex(Templates):
print
(
f
"读取数据错误: {e}"
,
traceback
.
format_exc
())
print
(
f
"读取数据错误: {e}"
,
traceback
.
format_exc
())
time
.
sleep
(
5
)
time
.
sleep
(
5
)
self
.
engine_doris
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
doris
.
name
,
site_name
=
self
.
site_name
)
self
.
engine_doris
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
doris
.
name
,
site_name
=
self
.
site_name
)
self
.
client
=
redis
.
Redis
(
host
=
'192.168.10.224'
,
port
=
6379
,
db
=
9
,
password
=
'xA9!wL3pZ@q2'
)
continue
continue
def
handle_data
(
self
):
def
handle_data
(
self
):
...
...
Pyspark_job/img_search/img_extract_features.py
View file @
30d321fd
...
@@ -16,7 +16,7 @@ sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
...
@@ -16,7 +16,7 @@ sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
# from utils.templates import Templates
# from utils.templates import Templates
from
sqlalchemy
import
text
from
sqlalchemy
import
text
from
vgg_model
import
VGGNet
from
vgg_model
import
VGGNet
from
utils.db_util
import
DbTypes
,
DBUtil
from
utils.db_util
import
DbTypes
,
DBUtil
,
get_redis_h14
logging
.
basicConfig
(
format
=
'
%(asctime)
s
%(name)
s
%(levelname)
s
%(message)
s'
,
level
=
logging
.
INFO
)
logging
.
basicConfig
(
format
=
'
%(asctime)
s
%(name)
s
%(levelname)
s
%(message)
s'
,
level
=
logging
.
INFO
)
...
@@ -30,7 +30,7 @@ class ImgExtractFeatures(object):
...
@@ -30,7 +30,7 @@ class ImgExtractFeatures(object):
self
.
thread_num
=
thread_num
self
.
thread_num
=
thread_num
self
.
limit
=
limit
self
.
limit
=
limit
self
.
engine_doris
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
doris
.
name
,
site_name
=
self
.
site_name
)
self
.
engine_doris
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
doris
.
name
,
site_name
=
self
.
site_name
)
self
.
client
=
redis
.
Redis
(
host
=
'192.168.10.224'
,
port
=
6379
,
db
=
9
,
password
=
'xA9!wL3pZ@q2'
)
self
.
client
_redis
=
get_redis_h14
(
)
self
.
local_name
=
f
"{self.site_name}_img_features"
self
.
local_name
=
f
"{self.site_name}_img_features"
self
.
vgg_model
=
VGGNet
()
self
.
vgg_model
=
VGGNet
()
self
.
hostname
=
socket
.
gethostname
()
self
.
hostname
=
socket
.
gethostname
()
...
@@ -43,8 +43,8 @@ class ImgExtractFeatures(object):
...
@@ -43,8 +43,8 @@ class ImgExtractFeatures(object):
lock_name: 锁的key, 建议和任务名称保持一致
lock_name: 锁的key, 建议和任务名称保持一致
"""
"""
lock_value
=
str
(
uuid
.
uuid4
())
lock_value
=
str
(
uuid
.
uuid4
())
lock_acquired
=
self
.
client
.
set
(
lock_name
,
lock_value
,
nx
=
True
,
ex
=
timeout
)
# 可以不设置超时时间
lock_acquired
=
self
.
client
_redis
.
set
(
lock_name
,
lock_value
,
nx
=
True
,
ex
=
timeout
)
# 可以不设置超时时间
# lock_acquired = self.client.set(lock_name, lock_value, nx=True)
# lock_acquired = self.client
_redis
.set(lock_name, lock_value, nx=True)
return
lock_acquired
,
lock_value
return
lock_acquired
,
lock_value
def
release_lock
(
self
,
lock_name
,
lock_value
):
def
release_lock
(
self
,
lock_name
,
lock_value
):
...
@@ -56,7 +56,7 @@ class ImgExtractFeatures(object):
...
@@ -56,7 +56,7 @@ class ImgExtractFeatures(object):
return 0
return 0
end
end
"""
"""
result
=
self
.
client
.
eval
(
script
,
1
,
lock_name
,
lock_value
)
result
=
self
.
client
_redis
.
eval
(
script
,
1
,
lock_name
,
lock_value
)
return
result
return
result
def
read_data
(
self
):
def
read_data
(
self
):
...
@@ -133,7 +133,7 @@ class ImgExtractFeatures(object):
...
@@ -133,7 +133,7 @@ class ImgExtractFeatures(object):
except
Exception
as
e
:
except
Exception
as
e
:
print
(
e
,
traceback
.
format_exc
())
print
(
e
,
traceback
.
format_exc
())
self
.
engine_doris
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
doris
.
name
,
site_name
=
self
.
site_name
)
self
.
engine_doris
=
DBUtil
.
get_db_engine
(
db_type
=
DbTypes
.
doris
.
name
,
site_name
=
self
.
site_name
)
self
.
client
=
redis
.
Redis
(
host
=
'192.168.10.224'
,
port
=
6379
,
db
=
9
,
password
=
'xA9!wL3pZ@q2'
)
self
.
client
_redis
=
get_redis_h14
(
)
self
.
vgg_model
=
VGGNet
()
self
.
vgg_model
=
VGGNet
()
time
.
sleep
(
20
)
time
.
sleep
(
20
)
continue
continue
...
...
Pyspark_job/utils/db_util.py
View file @
30d321fd
import
os
import
os
import
sys
import
sys
import
redis
from
enum
import
Enum
from
enum
import
Enum
from
sqlalchemy
import
create_engine
,
text
from
sqlalchemy
import
create_engine
,
text
from
datetime
import
datetime
,
timedelta
from
datetime
import
datetime
,
timedelta
...
@@ -10,6 +11,10 @@ from urllib.parse import quote_plus as urlquote
...
@@ -10,6 +11,10 @@ from urllib.parse import quote_plus as urlquote
sys
.
path
.
append
(
os
.
path
.
dirname
(
sys
.
path
[
0
]))
sys
.
path
.
append
(
os
.
path
.
dirname
(
sys
.
path
[
0
]))
def get_redis_h14():
    """Build a Redis client for the instance this project calls "h14".

    A new ``redis.Redis`` object is constructed on every call. Connection
    settings fall back to the values that used to be hard-coded here
    (host ``192.168.10.224``, port ``6379``, db ``9``) and can be overridden
    without a code change through these environment variables:

    * ``REDIS_H14_HOST``
    * ``REDIS_H14_PORT``
    * ``REDIS_H14_DB``
    * ``REDIS_H14_PASSWORD``

    An unset or empty variable means "use the built-in default".

    Returns:
        redis.Redis: client configured with the resolved settings.

    Raises:
        ValueError: if ``REDIS_H14_PORT`` or ``REDIS_H14_DB`` is set to a
            value that is not an integer.
    """
    env = os.environ
    # NOTE(review): the fallback password is still committed to the repository.
    # Prefer supplying REDIS_H14_PASSWORD from the deployment environment and
    # rotating this value. The inline redis.Redis(...) calls this helper
    # replaced appear to have used a different password literal -- confirm
    # which one is current.
    return redis.Redis(
        host=env.get("REDIS_H14_HOST") or '192.168.10.224',
        port=int(env.get("REDIS_H14_PORT") or 6379),
        db=int(env.get("REDIS_H14_DB") or 9),
        password=env.get("REDIS_H14_PASSWORD") or 'fG7#vT6kQ1pX',
    )
class
DbTypes
(
Enum
):
class
DbTypes
(
Enum
):
"""
"""
导出数据库类型
导出数据库类型
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment