import pandas as pd
from sqlalchemy import create_engine


def export_data(pg14_con, starRocks_con):
    """Copy ``us_asin_image_copy`` from Postgres into StarRocks in batches.

    Reads rows in chunks of ``batch_size`` via OFFSET pagination and appends
    each chunk to ``us_asin_image_copy2`` on the StarRocks side.

    Args:
        pg14_con: SQLAlchemy engine/connection for the source Postgres DB.
        starRocks_con: SQLAlchemy engine/connection for the target StarRocks DB.
    """
    batch_size = 5000000
    offset = 0
    while True:
        # ORDER BY makes OFFSET pagination deterministic; without it Postgres
        # is free to return rows in a different order on each query, which can
        # skip or duplicate rows across batches.
        # NOTE(review): assumes (asin, img_order_by) orders rows stably —
        # confirm against the table's unique key before relying on this.
        sql = (
            "SELECT asin, img_url, img_order_by, created_at, updated_at, data_type "
            "FROM us_asin_image_copy "
            f"ORDER BY asin, img_order_by OFFSET {offset} LIMIT {batch_size}"
        )
        print(sql)
        df = pd.read_sql(sql, con=pg14_con)
        # Check for exhaustion BEFORE writing: the original appended the final
        # empty DataFrame to StarRocks before breaking out of the loop.
        if df.empty:
            break
        df.to_sql('us_asin_image_copy2', con=starRocks_con,
                  if_exists='append', index=False)
        # Advance to the next batch.
        offset += batch_size


def get_pg14_con():
    """Create the SQLAlchemy engine for the source Postgres database."""
    # SECURITY: credentials are hardcoded in source — move them to environment
    # variables or a secrets manager; this password is now in version control.
    db_username = 'postgres'
    db_password = 'fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS'
    db_host = '192.168.10.223'
    db_port = '5432'
    db_name = 'selection'
    pg14_con = create_engine(
        f'postgresql://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}'
    )
    return pg14_con


def get_starRocks_con():
    """Create the SQLAlchemy engine for the target StarRocks database.

    StarRocks speaks the MySQL wire protocol, hence the mysql+mysqlconnector
    driver. NOTE(review): no password is supplied — confirm the root account
    is intentionally passwordless on this host.
    """
    db_username = 'root'
    db_host = '192.168.10.221'
    db_port = '9030'
    db_name = 'selection'
    starRocks_con = create_engine(
        f'mysql+mysqlconnector://{db_username}@{db_host}:{db_port}/{db_name}'
    )
    return starRocks_con


if __name__ == '__main__':
    pg14_con = get_pg14_con()
    starRocks_con = get_starRocks_con()
    export_data(pg14_con, starRocks_con)