1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import os

import pandas as pd
from sqlalchemy import create_engine
def export_data(pg14_con, starRocks_con):
    """Copy ``us_asin_image_copy`` from PostgreSQL into StarRocks in batches.

    Reads fixed-size pages with OFFSET/LIMIT from the source engine and
    appends each non-empty page to ``us_asin_image_copy2`` on the target.

    Args:
        pg14_con: SQLAlchemy engine for the source PostgreSQL database.
        starRocks_con: SQLAlchemy engine for the target StarRocks database.
    """
    batch_size = 5000000
    offset = 0
    # NOTE(review): OFFSET/LIMIT without an ORDER BY gives PostgreSQL no
    # guaranteed row order, so pages may overlap or skip rows if the plan
    # or table contents change mid-export — confirm and add ORDER BY on a
    # unique key (e.g. asin) if exact-once transfer is required.
    while True:
        sql = (
            "SELECT asin, img_url, img_order_by, created_at, updated_at, data_type "
            f"FROM us_asin_image_copy OFFSET {offset} LIMIT {batch_size}"
        )
        print(sql)
        df = pd.read_sql(sql, con=pg14_con)
        # Check BEFORE writing: the original appended the empty final page
        # to the target table as well, which is a wasted round-trip.
        if df.empty:
            break
        df.to_sql('us_asin_image_copy2', con=starRocks_con,
                  if_exists='append', index=False)
        # A short page already proves the source is exhausted; stop now
        # instead of issuing one more query that returns nothing.
        if len(df) < batch_size:
            break
        # Advance to the next page.
        offset += batch_size
def get_pg14_con():
    """Build a SQLAlchemy engine for the PostgreSQL 14 'selection' database.

    Connection settings may be overridden through the environment variables
    PG14_USER, PG14_PASSWORD, PG14_HOST, PG14_PORT and PG14_DB; the
    historical hard-coded values remain the defaults for compatibility.

    Returns:
        A SQLAlchemy Engine connected to PostgreSQL.
    """
    # SECURITY: the password below was committed in source control — it
    # should be rotated and supplied only via PG14_PASSWORD going forward.
    db_username = os.environ.get('PG14_USER', 'postgres')
    db_password = os.environ.get('PG14_PASSWORD',
                                 'fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS')
    db_host = os.environ.get('PG14_HOST', '192.168.10.223')
    db_port = os.environ.get('PG14_PORT', '5432')
    db_name = os.environ.get('PG14_DB', 'selection')
    # Create the PostgreSQL engine.
    pg14_con = create_engine(
        f'postgresql://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}'
    )
    return pg14_con
def get_starRocks_con():
    """Build a SQLAlchemy engine for the StarRocks 'selection' database.

    Connection settings may be overridden through the environment variables
    SR_USER, SR_HOST, SR_PORT and SR_DB; the historical hard-coded values
    remain the defaults for compatibility.

    Returns:
        A SQLAlchemy Engine connected to StarRocks.
    """
    db_username = os.environ.get('SR_USER', 'root')
    db_host = os.environ.get('SR_HOST', '192.168.10.221')
    db_port = os.environ.get('SR_PORT', '9030')
    db_name = os.environ.get('SR_DB', 'selection')
    # StarRocks speaks the MySQL wire protocol, hence the mysql+mysqlconnector
    # URL (the original comment incorrectly said "PostgreSQL").
    starRocks_con = create_engine(
        f'mysql+mysqlconnector://{db_username}@{db_host}:{db_port}/{db_name}'
    )
    return starRocks_con
if __name__ == '__main__':
    # Build one engine per side, then stream the table across.
    source_engine = get_pg14_con()
    target_engine = get_starRocks_con()
    export_data(source_engine, target_engine)