发布网友
共1个回答
热心网友
public class File2ES {
/** Class-wide logger; declared final so the shared reference cannot be reassigned. */
private static final Logger LOG = LoggerFactory.getLogger(File2ES.class);
/** Shared JSON parser, configured with the date pattern {@code yyyy-MM-dd HH:mm:ss}. */
private static final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
/**
 * Reads a file containing one JSON object per line and indexes every object as a
 * document into Elasticsearch through a transport client.
 *
 * <p>Each line is parsed with Gson into a map. Lines that parse to {@code null}
 * (for example blank lines) or to an empty object are skipped. If the map carries a
 * non-null {@code _id} entry, that entry is removed from the document and its string
 * form becomes the document id; otherwise the id comes from
 * {@code IDGenerator.generateByMapValues}. Note that Gson parses JSON numbers as
 * {@code Double}, so a numeric {@code _id} of {@code 1} becomes the id {@code "1.0"}.
 *
 * <p>Documents are sent in bulk requests of a fixed batch size. The reader and the
 * client are always closed before the method returns. I/O failures (unresolvable
 * host, missing file, read errors) are logged and not rethrown.
 *
 * @param index       name of the target index
 * @param type        mapping type under which the documents are indexed
 * @param clusterName value used for the {@code cluster.name} client setting
 * @param IP          host name or IP address of the node to connect to
 * @param inputPath   path of the input file, one JSON object per line
 */
public static void readFile2elasticsearch(String index, String type, String clusterName, String IP,
        String inputPath) {
    // Number of documents sent per bulk request.
    final int batchSize = 1000;
    Client client = null;
    try {
        // Resolve the host before building the client so that an unknown host
        // cannot leave a half-configured client behind.
        InetAddress address = InetAddress.getByName(IP);
        Settings settings = Settings.settingsBuilder().put("cluster.name", clusterName).build();
        // NOTE(review): port 9030 is hard-coded; Elasticsearch's default transport
        // port is 9300 -- confirm this value is intentional for the target cluster.
        client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(address, 9030));

        // NOTE(review): FileReader decodes with the platform default charset --
        // confirm the input files match it.
        try (BufferedReader reader = new BufferedReader(new FileReader(new File(inputPath)))) {
            BulkRequestBuilder bulk = client.prepareBulk();
            String line;
            while ((line = reader.readLine()) != null) {
                LOG.debug(line);
                // Gson returns a raw Map for Map.class; JSON object keys are always strings.
                @SuppressWarnings("unchecked")
                Map<String, Object> record = gson.fromJson(line, Map.class);
                if (record == null || record.isEmpty()) {
                    continue;
                }
                // The id must not stay inside the document source, so it is always removed.
                Object rawId = record.remove("_id");
                String id = rawId != null
                        ? String.valueOf(rawId)
                        : IDGenerator.generateByMapValues(record);
                bulk.add(client.prepareIndex(index, type, id).setSource(record));
                if (bulk.numberOfActions() >= batchSize) {
                    flushBulk(bulk);
                    // A builder keeps its actions after execution; start a new one so
                    // already-sent documents are not submitted again.
                    bulk = client.prepareBulk();
                }
            }
            // Executing an empty bulk request fails validation, so only send a remainder.
            if (bulk.numberOfActions() > 0) {
                flushBulk(bulk);
            }
        }
    } catch (IOException e) {
        // Also covers UnknownHostException and FileNotFoundException (both IOExceptions).
        LOG.error("Failed to load " + inputPath + " into " + index + "/" + type + ": " + e.getMessage(), e);
    } finally {
        if (client != null) {
            client.close();
        }
    }
}

/**
 * Executes one bulk request synchronously and logs an error if the response
 * reports that any item failed.
 *
 * @param bulk the populated bulk request to send
 */
private static void flushBulk(BulkRequestBuilder bulk) {
    if (bulk.execute().actionGet().hasFailures()) {
        LOG.error("Bulk request with " + bulk.numberOfActions() + " actions completed with failures");
    }
}