最近在搞 tidb,想找个方法往 MySQL 协议中快速写入数据,在 postgresql 下有 copy,找了下,MySQL 下也有 load data,其本质也是一个 insert,看到这里 https://blog.csdn.net/seven_33… 有一个思路是不走落地文件,挖坑待填
===========
2019-9-4 回来填坑了,具体的方法如下
public static InputStream getPostgresqlInputStream(final String ds) throws SQLException,
ClassNotFoundException, IOException {
final PipedOutputStream os = new PipedOutputStream();
Class.forName("org.postgresql.Driver");
(new Thread() {
@Override
public void run() {
Connection connection = null;
try {
connection = DriverManager.getConnection(
"jdbc:postgresql://...",
"...", "...");
CopyManager cm = new CopyManager(
(BaseConnection) connection);
String COPY_CMD = "COPY (select ... from ... where ds between "+ds+"01 and "+ds+"31 ) to STDOUT";
@SuppressWarnings("unused")
long affectedRowCount = cm.copyOut(COPY_CMD, os);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
try {
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
PipedInputStream in = new PipedInputStream();
in.connect(os);
return in;
}
public static void threadTransfer(boolean fromTpgOrFromFile, String ds, String csvFile) throws SQLException, ClassNotFoundException, IOException {
Class.forName("com.mysql.jdbc.Driver");
String localConnectionUrl = "jdbc:mysql://...";
Random randomizer = new Random();
java.util.List<String[]> random_pool = new ArrayList<String[]>();
random_pool.add(new String[]{"..."});
String[] randomRow1 = random_pool.get(randomizer.nextInt(random_pool.size()));
System.out.println(randomRow1[0]);
String tidbConnectionUrl = "jdbc:mysql://" + randomRow1[0] + ":4000/...?useSSL=false&characterEncoding=UTF-8";
Connection conn = DriverManager.getConnection(tidbConnectionUrl, "root", "");
conn.prepareStatement("set @@tidb_batch_delete = ON;").execute();
conn.prepareStatement("set @@tidb_batch_insert = ON;").execute();
if (fromTpgOrFromFile) {
String loadDataSql = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE INTO TABLE ...";
PreparedStatement statement = (PreparedStatement) conn
.prepareStatement(loadDataSql);
statement.setLocalInfileInputStream(getPostgresqlInputStream(ds));
int result = statement.executeUpdate();
System.out.println(result);
} else {
String loadDataSql = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE INTO TABLE ... FIELDS ENCLOSED BY '\"' TERMINATED BY ',' LINES TERMINATED BY '\n' IGNORE 1 LINES ;";
PreparedStatement statement = (PreparedStatement) conn
.prepareStatement(loadDataSql);
statement.setLocalInfileInputStream(new FileInputStream("/data/my_database/...." + csvFile + ".csv"));
int result = statement.executeUpdate();
System.out.println(result);
}
System.out.println(System.currentTimeMillis());
}