import os
import sys

from pyspark.sql import functions as F

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # add parent directory to the import path

from utils.templates import Templates
from utils.db_util import DbTypes, DBUtil


class ImageIdIndexToSrs(Templates):
    """Export the id/index mapping from the Hive table image_dwd_id_index into StarRocks (srs)."""

    def __init__(self, site_name='us', asin_type=1):
        super(ImageIdIndexToSrs, self).__init__()
        self.site_name = site_name
        self.asin_type = asin_type
        # SQLAlchemy engine pointing at the StarRocks (srs) database for this site
        self.engine_srs = DBUtil.get_db_engine(db_type=DbTypes.srs.name, site_name=self.site_name)
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
        # Placeholder DataFrame; replaced by the real query result in read_data()
        self.df_id_index = self.spark.sql("select 1+1;")

    def read_data(self):
        sql = (
            f"select id, indice, asin_type, asin, site_name "
            f"from image_dwd_id_index "
            f"where site_name='{self.site_name}' and asin_type={self.asin_type};"
        )
        print("sql:", sql)
        self.df_id_index = self.spark.sql(sql).cache()
        self.df_id_index.show(10)

    def handle_data(self):
        pass

    def save_data(self):
        # Earlier revisions exported via a direct JDBC write to StarRocks or via
        # StarRocksHelper.spark_export; the current path goes through pandas + SQLAlchemy.
        self.df_id_index = self.df_id_index.withColumn("asin_type", F.col("asin_type").cast("int"))
        df_save = self.df_id_index.toPandas()
        df_save.to_sql("image_id_index", con=self.engine_srs, if_exists="append", index=False, chunksize=10000)


if __name__ == '__main__':
    handle_obj = ImageIdIndexToSrs()
    handle_obj.run()