# us_asin_image_copy2.py
import pandas as pd
from sqlalchemy import create_engine


def export_data(pg14_con, starRocks_con, batch_size=5000000):
    """Copy ``us_asin_image_copy`` from PostgreSQL into StarRocks in batches.

    Pages through the source table with LIMIT/OFFSET and appends each batch
    to the destination table ``us_asin_image_copy2``.

    :param pg14_con: source connection/engine usable by ``pandas.read_sql``.
    :param starRocks_con: destination connection/engine for ``DataFrame.to_sql``.
    :param batch_size: rows fetched per round trip (default 5,000,000).
    """
    offset = 0
    while True:
        # ORDER BY makes LIMIT/OFFSET pagination deterministic; without it
        # PostgreSQL returns rows in unspecified order, so successive batches
        # could skip or duplicate rows.
        sql = (
            "SELECT asin, img_url, img_order_by, created_at, updated_at, data_type "
            "FROM us_asin_image_copy "
            f"ORDER BY asin, img_order_by LIMIT {batch_size} OFFSET {offset}"
        )
        print(sql)
        df = pd.read_sql(sql, con=pg14_con)
        # Check for exhaustion BEFORE writing: avoids a pointless to_sql call
        # on the final empty batch.
        if df.empty:
            break
        df.to_sql('us_asin_image_copy2', con=starRocks_con, if_exists='append', index=False)
        # A short batch is necessarily the last one; skip the extra empty
        # round trip the naive loop would make.
        if len(df) < batch_size:
            break
        # Advance to the next page of the source table.
        offset += batch_size

def get_pg14_con():
    """Build a SQLAlchemy engine for the PG14 ``selection`` database."""
    # NOTE(review): credentials are hard-coded here; consider moving them to
    # environment variables or a config file.
    user = 'postgres'
    password = 'fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS'
    host = '192.168.10.223'
    port = '5432'
    database = 'selection'

    url = f'postgresql://{user}:{password}@{host}:{port}/{database}'
    return create_engine(url)

def get_starRocks_con():
    """Build a SQLAlchemy engine for the StarRocks ``selection`` database."""
    user = 'root'
    host = '192.168.10.221'
    port = '9030'
    database = 'selection'

    # StarRocks speaks the MySQL wire protocol, hence the mysql+mysqlconnector
    # dialect (the original comment said "PostgreSQL" by copy-paste mistake).
    return create_engine(f'mysql+mysqlconnector://{user}@{host}:{port}/{database}')

if __name__ == '__main__':
    # Build the source (PG14) and destination (StarRocks) engines, then
    # stream the table across in batches.
    source_engine = get_pg14_con()
    target_engine = get_starRocks_con()
    export_data(source_engine, target_engine)