# image_id_index_to_srs.py
import ast
import datetime
import logging
import os
import re
import sys
import threading
import time
import traceback

import pandas as pd
import redis
from pyspark.sql.types import ArrayType, FloatType

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates_mysql import TemplatesMysql
from utils.templates import Templates
# from ..utils.templates import Templates
from py4j.java_gateway import java_import
from sqlalchemy import text
from pyspark.sql import functions as F
import pyarrow as pa
import pyarrow.parquet as pq
from multiprocessing import Process
from multiprocessing import Pool
import multiprocessing
from utils.db_util import DbTypes, DBUtil
from utils.StarRocksHelper import StarRocksHelper


class ImageIdIndexToSrs(Templates):
    """Copy ``image_dwd_id_index`` rows for one site/asin_type into ``image_id_index``.

    Rows are read with Spark SQL and appended to the ``image_id_index`` table
    through the engine returned by ``DBUtil.get_db_engine`` for ``DbTypes.srs``.
    The ``read_data`` / ``handle_data`` / ``save_data`` hooks are presumably
    driven by the inherited ``Templates.run()`` — NOTE(review): confirm the
    call order in ``utils.templates.Templates``.
    """

    def __init__(self, site_name='us', asin_type=1):
        """Create the srs engine and the Spark session.

        :param site_name: value matched against the ``site_name`` column
            (embedded in the SQL as a quoted string literal).
        :param asin_type: value matched against the ``asin_type`` column
            (embedded in the SQL unquoted, so it must render as a number).
        """
        super(ImageIdIndexToSrs, self).__init__()
        self.site_name = site_name
        self.asin_type = asin_type
        self.engine_srs = DBUtil.get_db_engine(db_type=DbTypes.srs.name, site_name=self.site_name)
        # NOTE(review): ``self.db_save`` is not assigned in this class; it is
        # presumably initialised by ``Templates.__init__`` — confirm.
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
        # Placeholder DataFrame; replaced by read_data().
        self.df_id_index = self.spark.sql("select 1+1;")

    def read_data(self):
        """Load the id/indice rows for ``site_name`` and ``asin_type`` and cache them."""
        sql = f"select id, indice, asin_type, asin, site_name from image_dwd_id_index where site_name='{self.site_name}' and asin_type = {self.asin_type};"
        print("sql:", sql)
        # Cached because the result is consumed twice: by show() below and by save_data().
        self.df_id_index = self.spark.sql(sql).cache()
        self.df_id_index.show(10)

    def handle_data(self):
        """No transformation step; rows are exported as read."""
        pass

    def save_data(self):
        """Append the loaded rows to ``image_id_index`` through the srs engine.

        ``asin_type`` is cast to int first. The DataFrame is then collected to
        the driver with ``toPandas()`` and written by pandas ``to_sql`` in
        chunks of 10,000 rows, appending to any existing data.
        """
        # Connection details come from DBUtil; never hard-code credentials here.
        self.df_id_index = self.df_id_index.withColumn("asin_type", F.col("asin_type").cast("int"))
        # NOTE(review): toPandas() materialises the full result on the driver,
        # so very large sites may exhaust driver memory.
        df_save = self.df_id_index.toPandas()
        df_save.to_sql("image_id_index", con=self.engine_srs, if_exists="append", index=False, chunksize=10000)


if __name__ == '__main__':
    # Export with the constructor defaults: site_name='us', asin_type=1.
    ImageIdIndexToSrs().run()