Closed BigInventor closed 5 years ago
分布式应用,建议一个节点一个ip2region对象,便于db文件的升级方便,可以做成启动先http下载,然后在创建的形式,缓存一段时间,再定期下载覆盖db文件即可。
好的,我这边先尝试一下,成功之后给大家分享一下。
我在做flink任务时,使用了一个方案,将db文件打包到jar里面,写一个工具类静态加载jar包内部的db文件,巧妙的避开了分布式问题
我在做flink任务时,使用了一个方案,将db文件打包到jar里面,写一个工具类静态加载jar包内部的db文件,巧妙的避开了分布式问题
更新db文件咋办?每次重新发布flink的jar?
我这边的解决方案是,在spark初始化context的时候,也就是提交spark job任务的时候,读取db文件成一个二进制数组,然后用spark的广播机制广播出去,直接用就行。
val ip2RegionDbFile = "hdfs://master:9000/ip2region.db"
val regionArray = sc.binaryFiles(ip2RegionDbFile).first()._2.toArray()
val regionBroadcast = sc.broadcast(regionArray)
def updateDataIp2Region(data: JSONObject, ip2RegionArr: Array[Byte]): JSONObject = {
if (data.containsKey("ip")) {
val ip = data.get("ip").toString
val locationRegion = Ip2RegionUtil.transform(ip, ip2RegionArr)
}
data
}
public static LocationRegion transform(String ip, byte[] ip2Regionbytes) {
if (Util.isIpAddress(ip) == false) {
LOGGER.info("Error: Invalid ip address");
return null;
}
LocationRegion locationRegion = null;
try {
DbConfig config = new DbConfig();
DbSearcher searcher = new DbSearcher(config, ip2Regionbytes);
DataBlock dataBlock = searcher.memorySearch(ip);
if (dataBlock != null) {
// DataBlock: 城市Id|国家|区域|省份|城市|ISP
String[] regionArr = dataBlock.getRegion().split("\\|");
String state = regionArr[0];
String province = regionArr[2];
String city = regionArr[3];
locationRegion = new LocationRegion(state, province, city);
}
} catch (IOException e) {
e.printStackTrace();
} catch (DbMakerConfigException e) {
e.printStackTrace();
} catch (SecurityException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
}
return locationRegion;
}
public class LocationRegion {
private String state; //洲、国家
private String province; //省
private String city; //市
}
ip2region.db文件通过http下载,然后定时更新也是可以的。
在用spark做数据清洗的时候,需要用到这个功能,大家有什么好的分布式的解决方案吗?