PostgreSQL 迁移数据到 MySQL

最近在搞 TiDB,想找个方法通过 MySQL 协议快速写入数据。PostgreSQL 下有 COPY,找了一下,MySQL 下也有 LOAD DATA,其本质也是 insert。看到这里 https://blog.csdn.net/seven_33… 有一个思路是不走落地文件,先挖坑待填。

===========

2019-09-04 回来填坑了,具体的方法如下:

/**
 * Streams the output of a PostgreSQL {@code COPY (...) TO STDOUT} command without
 * writing an intermediate file.
 *
 * <p>A background thread opens the PostgreSQL connection, runs the COPY command and
 * writes its output into a pipe; the returned stream is the read end of that pipe.
 * The pipe is fully connected before the thread starts, so the producer can never
 * write to (or close) an unconnected pipe.
 *
 * <p>NOTE(review): a failure inside the producer thread is only printed; the pipe is
 * then closed, so the reader sees a normal end-of-stream and cannot tell a truncated
 * export from a complete one.
 *
 * @param ds numeric prefix spliced into the query as {@code ds between <ds>01 and <ds>31};
 *           presumably a {@code yyyyMM} month key - confirm against the table's schema
 * @return the read end of the pipe carrying the COPY output
 * @throws IllegalArgumentException if {@code ds} is null or contains anything but digits
 * @throws ClassNotFoundException   if the PostgreSQL JDBC driver is not on the classpath
 * @throws IOException              if the pipe cannot be created
 * @throws SQLException             declared for interface compatibility
 */
public static InputStream getPostgresqlInputStream(final String ds) throws SQLException,
        ClassNotFoundException, IOException {
    // ds is concatenated into the SQL text below (COPY cannot take bind parameters
    // here), so only plain digits are accepted.
    if (ds == null || !ds.matches("\\d+")) {
        throw new IllegalArgumentException("ds must be a non-empty string of digits, got: " + ds);
    }
    Class.forName("org.postgresql.Driver");

    // Connect both pipe ends BEFORE starting the producer thread.
    final PipedOutputStream os = new PipedOutputStream();
    final PipedInputStream in = new PipedInputStream(os);

    Thread producer = new Thread("pg-copy-out") {
        @Override
        public void run() {
            Connection connection = null;
            try {
                connection = DriverManager.getConnection(
                        "jdbc:postgresql://...",
                        "...", "...");
                CopyManager cm = new CopyManager((BaseConnection) connection);
                String copyCmd = "COPY (select ... from ... where ds between " + ds + "01 and " + ds + "31 ) to STDOUT";
                cm.copyOut(copyCmd, os);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                // Always close the write end, even on unexpected errors, so the
                // reader is released instead of blocking forever.
                try {
                    os.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    };
    producer.start();
    return in;
}

/**
 * Bulk-loads data into TiDB over the MySQL protocol with {@code LOAD DATA LOCAL INFILE},
 * feeding the statement from an in-memory stream instead of the named file.
 *
 * <p>The data source is either a live PostgreSQL COPY stream or a local CSV file.
 * All JDBC resources and the input stream are released before returning, also on
 * failure. Closing the stream on failure additionally unblocks the PostgreSQL
 * producer thread, which would otherwise wait on a full pipe.
 *
 * @param fromTpgOrFromFile {@code true} to stream from PostgreSQL via
 *                          {@code getPostgresqlInputStream(ds)}; {@code false} to read
 *                          the CSV file named by {@code csvFile}
 * @param ds                value passed to {@code getPostgresqlInputStream}; only used
 *                          when {@code fromTpgOrFromFile} is {@code true}
 * @param csvFile           file name (without the {@code .csv} extension) under the data
 *                          directory; only used when {@code fromTpgOrFromFile} is {@code false}
 * @throws SQLException           if connecting to TiDB or executing a statement fails
 * @throws ClassNotFoundException if a required JDBC driver is not on the classpath
 * @throws IOException            if the input stream cannot be opened or closed
 */
public static void threadTransfer(boolean fromTpgOrFromFile, String ds, String csvFile) throws SQLException, ClassNotFoundException, IOException {
    Class.forName("com.mysql.jdbc.Driver");

    // Pick one TiDB host at random from the pool.
    Random randomizer = new Random();
    java.util.List<String[]> hostPool = new ArrayList<String[]>();
    hostPool.add(new String[]{"..."});
    String[] chosenHost = hostPool.get(randomizer.nextInt(hostPool.size()));
    System.out.println(chosenHost[0]);

    String tidbConnectionUrl = "jdbc:mysql://" + chosenHost[0] + ":4000/...?useSSL=false&characterEncoding=UTF-8";
    Connection conn = DriverManager.getConnection(tidbConnectionUrl, "root", "");
    try {
        // Session switches set before the load.
        java.sql.Statement session = conn.createStatement();
        try {
            session.execute("set @@tidb_batch_delete = ON;");
            session.execute("set @@tidb_batch_insert = ON;");
        } finally {
            session.close();
        }

        // 'sql.csv' is only a placeholder name: the data actually comes from the
        // stream handed to setLocalInfileInputStream below.
        String loadDataSql;
        if (fromTpgOrFromFile) {
            // No FIELDS/LINES clause, so the server defaults apply to the COPY stream.
            loadDataSql = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE INTO TABLE ...";
        } else {
            loadDataSql = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE INTO TABLE ... FIELDS ENCLOSED BY '\"' TERMINATED BY ',' LINES TERMINATED BY '\n' IGNORE 1 LINES ;";
        }

        PreparedStatement statement = (PreparedStatement) conn
                .prepareStatement(loadDataSql);
        InputStream data = null;
        try {
            if (fromTpgOrFromFile) {
                data = getPostgresqlInputStream(ds);
            } else {
                data = new FileInputStream("/data/my_database/...." + csvFile + ".csv");
            }
            statement.setLocalInfileInputStream(data);
            int result = statement.executeUpdate();
            System.out.println(result);
        } finally {
            try {
                if (data != null) {
                    // A second close (if the driver already closed it) is harmless.
                    data.close();
                }
            } finally {
                statement.close();
            }
        }
    } finally {
        conn.close();
    }
    System.out.println(System.currentTimeMillis());
}

Leave a Reply

Your email address will not be published. Required fields are marked *