Closed ShydowLi closed 4 years ago
提供下详细信息,包括你的代码,查询的SQL,以及结果截图
`public class PageSinkToOdps extends RichSinkFunction<List
private Properties props;
private Connection conn;
private PreparedStatement ps;
public PageSinkToOdps(Properties props) {
this.props = props;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
conn = OdpsUtils.getConnection(props);
String sql = "INSERT INTO ods_flink_jdbc_test VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
ps = this.conn.prepareStatement(sql);
}
@Override
public void close() throws Exception {
super.close();
if (conn != null) {
conn.close();
}
if (ps != null) {
ps.close();
}
}
@Override
public void invoke(List<PageEvent> value, Context context) throws Exception {
try {
for (PageEvent pageEvent : value) {
ps.setString(1, pageEvent.getTp());
ps.setString(2, pageEvent.getUid());
ps.setString(3, pageEvent.getPco());
ps.setString(4, pageEvent.getNid());
ps.setString(5, pageEvent.getSid());
ps.setString(6, pageEvent.getE());
ps.setString(7, pageEvent.getBr());
ps.setString(8, pageEvent.getOs());
ps.setString(9, pageEvent.getDv());
ps.setString(10, pageEvent.getCh());
ps.setString(11, pageEvent.getVer());
ps.setString(12, pageEvent.getDw());
ps.setString(13, pageEvent.getDh());
ps.setString(14, pageEvent.getNt());
ps.setString(15, pageEvent.getSrc());
ps.setString(16, pageEvent.getIp());
ps.setString(17, pageEvent.getCid());
ps.setString(18, pageEvent.getPid());
ps.setString(19, pageEvent.getPu());
ps.setString(20, String.valueOf(pageEvent.getT()));
ps.setString(21, pageEvent.getTp());
ps.setString(22, String.valueOf(pageEvent.getTm()));
ps.setString(23, pageEvent.getFid());
ps.setString(24, toDate(pageEvent.getT()));
ps.addBatch();
}
int[] count = ps.executeBatch();
System.out.println("成功插入" + count.length + "行数据---------------");
} catch (Exception e) {
e.printStackTrace();
}
}
private static String toDate(Long time) {
SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHH");
if (time == null) {
return formatter.format(new Date());
}
return formatter.format(time);
}
}`
`public class OdpsUtils { private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";
public static Connection getConnection(Properties props) throws SQLException {
String url = props.getProperty("custom.dataSource.url");
String accessId = props.getProperty("accessId");
String accessKey = props.getProperty("accessKey");
Connection conn = null;
try {
Class.forName(DRIVER_NAME);
conn = DriverManager.getConnection(url, accessId, accessKey);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
return conn;
}`
以上是代码,通过flink往maxcomputer上写数据,查询是在dataworks上,在log日志输入成功插入,谢谢啊
这是部分运行结果截图,是不是使用这个上传数据也会有一个缓冲区,只有达到缓冲区的最大限度,才会把数据写入maxcomputer里?
@cornmonster
这是部分运行结果截图,是不是使用这个上传数据也会有一个缓冲区,只有达到缓冲区的最大限度,才会把数据写入maxcomputer里?
没有
我查了下code,现在的实现是prepared statement close的时候,数据才可见
另外如果不是一定要用jdbc,可以直接用maxcompute sdk,jdbc其实也是调用maxcompute sdk。
code: https://github.com/aliyun/aliyun-odps-java-sdk
example: https://github.com/aliyun/aliyun-odps-java-sdk/tree/master/odps-examples/tunnel-examples
六月 04, 2020 5:40:21 下午 com.aliyun.odps.jdbc.utils.OdpsLogger info 信息: It took me 113 ms to insert 200 records [54], 0.59 MiB/s 17:40:21,775 INFO com.aliyun.odps.jdbc.OdpsConnection - It took me 113 ms to insert 200 records [54], 0.59 MiB/s 成功插入200行数据---------------
你好,当我使用jdbc时插入数据,log显示写入成功,但去查询表时没有数据,请问这个是什么问题?