Commit 6dd7a66e by fangxingjun

修复以图搜图导入doris数据不一致问题

parent 79a384c1
import ast
import datetime
import logging
import os
import re
import sys
import threading
import time
import traceback
import pandas as pd
import redis
from pyspark.sql.types import ArrayType, FloatType
sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates_mysql import TemplatesMysql
from utils.templates import Templates
# from ..utils.templates import Templates
from py4j.java_gateway import java_import
from sqlalchemy import text
from pyspark.sql import functions as F
import pyarrow as pa
import pyarrow.parquet as pq
from multiprocessing import Process
from multiprocessing import Pool
import multiprocessing
from utils.secure_db_client import get_remote_engine
from utils.db_util import DbTypes, DBUtil
from utils.StarRocksHelper import StarRocksHelper
from utils.DorisHelper import DorisHelper
class ImgIdIndexToDoris(Templates):
......@@ -36,10 +16,12 @@ class ImgIdIndexToDoris(Templates):
self.site_name = site_name
self.img_type = img_type
self.engine_doris = DBUtil.get_db_engine(db_type=DbTypes.doris.name, site_name=self.site_name)
self.table_name = "img_dwd_id_index"
self.doris_table = "img_id_index_copy"
self.db_save = self.table_name
self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
self.df_id_index = self.spark.sql(f"select 1+1;")
self.table_name = "img_dwd_id_index"
self.table_save = "img_id_index_copy"
self.doris_db = "selection"
def read_data(self):
sql = f"select id, index, img_unique, site_name, img_type from {self.table_name} where site_name='{self.site_name}' and img_type = '{self.img_type}';"
......@@ -48,24 +30,50 @@ class ImgIdIndexToDoris(Templates):
self.df_id_index.show(10)
print(f"self.df_id_index.count(): {self.df_id_index.count()}")
def handle_data(self):
pass
def save_data(self):
# starrocks_url = "jdbc:mysql://192.168.10.151:19030/selection"
# properties = {
# "user": "fangxingjun",
# "password": "fangxingjun12345",
# "driver": "com.mysql.cj.jdbc.Driver",
# def save_data(self):
# # starrocks_url = "jdbc:mysql://192.168.10.151:19030/selection"
# # properties = {
# # "user": "fangxingjun",
# # "password": "fangxingjun12345",
# # "driver": "com.mysql.cj.jdbc.Driver",
# }
# self.df_id_index.write.jdbc(url=starrocks_url, table="image_id_index", mode="overwrite", properties=properties)
# self.df_id_index = self.df_id_index.withColumn('created_time', F.lit(datetime.datetime.now()))
# self.df_id_index = self.df_id_index.withColumn("img_type", F.col("img_type").cast("int"))
# StarRocksHelper.spark_export(df_save=self.df_id_index, db_name='selection', table_name='image_id_index')
df_save = self.df_id_index.toPandas()
# # # "driver": "com.mysql.cj.jdbc.Driver",
# # }
# # self.df_id_index.write.jdbc(url=starrocks_url, table="image_id_index", mode="overwrite", properties=properties)
# # self.df_id_index = self.df_id_index.withColumn('created_time', F.lit(datetime.datetime.now()))
# # self.df_id_index = self.df_id_index.withColumn("img_type", F.col("img_type").cast("int"))
# # StarRocksHelper.spark_export(df_save=self.df_id_index, db_name='selection', table_name='image_id_index')
# df_save = self.df_id_index.toPandas()
#
# df_save.to_sql(self.table_save, con=self.engine_doris, if_exists="append", index=False, chunksize=10000)
def truncate_data(self):
engine = get_remote_engine(
site_name='us', # -> database "selection"
db_type="doris", # -> 服务端 alias "mysql"
)
sql_truncate = f"truncate table {self.doris_table};"
print(f"清空最新导入之前的数据, sql_truncate: {sql_truncate}")
engine.execute(sql_truncate)
def run(self):
self.read_data()
self.truncate_data()
df = self.df_id_index
count = df.count()
print(f"读取完成,数据量:{count}")
df.show(10, truncate=False)
df_save.to_sql(self.table_save, con=self.engine_doris, if_exists="append", index=False, chunksize=10000)
TABLE_COLUMNS = "img_unique,site_name,index,id,img_type"
# ===== Step 2:写入 Doris selection.sys_edit_log =====
print(f"[2/2] 写入 Doris {self.doris_db}.{self.doris_table}")
DorisHelper.spark_export_with_columns(
df_save=df,
db_name=self.doris_db,
table_name=self.doris_table,
table_columns=TABLE_COLUMNS,
use_type='selection',
)
print("success")
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment