范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

我用Java几分钟处理完30亿个数据

  来源: https://c1n.cn/GM8hb
  目录  场景说明 模拟数据 场景分析 读取数据 处理数据 遇到的问题
  场景说明
  现有一个 10G 文件的数据,里面包含了什么 18-70 之间的整数,分别表示 18-70 岁的人群数量统计,假设年龄范围分布均匀,分别表示系统中所有用户的年龄数,找出重复次数最多的那个数,现有一台内存为 4G、2 核 CPU 的电脑,请写一个算法实现。
  23,31,42,19,60,30,36,........
  模拟数据
  Java 中一个整数占 4 个字节,模拟 10G 为 30 亿左右个数据, 采用追加模式写入 10G 数据到硬盘里。每 100 万个记录写一行,大概 4M 一行,10G 大概 2500 行数据。
  package   bigdata;
  import   java.io.*;
  import   java.util.Random;
  /**
  * @Desc:
  * @Author: bingbing
  * @Date: 2022/5/4 0004 19:05
  */
  public   class   GenerateData   {
  private   static   Random random = new   Random();
  public static int   generateRandomData  (  int   start, int   end) {
  return   random.nextInt(end - start + 1  ) + start;
  }
  /**
  * 产生10G的 1-1000的数据在D盘
  */
  public void   generateData  ()   throws IOException {
  File file = new   File("D: User.dat"  );
  if   (!file.exists()) {
  try   {
  file.createNewFile();
  } catch   (IOException e) {
  e.printStackTrace();
  }
  }
  int   start = 18  ;
  int   end = 70  ;
  long   startTime = System.currentTimeMillis();
  BufferedWriter bos = new   BufferedWriter(new   OutputStreamWriter(new   FileOutputStream(file, true  )));
  for   (long   i = 1  ; i < Integer.MAX_VALUE * 1.7  ; i++) {
  String data = generateRandomData(start, end) + ","  ;
  bos.write(data);
  // 每100万条记录成一行,100万条数据大概4M
  if   (i % 1000000   == 0  ) {
  bos.write(" "  );
  }
  }
  System.out.println("写入完成! 共花费时间:"   + (System.currentTimeMillis() - startTime) / 1000   + " s"  );
  bos.close();
  }
  public static void   main  (String[] args)   {
  GenerateData generateData = new   GenerateData();
  try   {
  generateData.generateData();
  } catch   (IOException e) {
  e.printStackTrace();
  }
  }
  }
  上述代码调整参数执行 2 次,凑 10 个 G 的数据在 D 盘的 User.dat 文件里。
  准备好 10G 数据后,接着写如何处理这些数据。
  场景分析
  10G 的数据比当前拥有的运行内存大得多,不能全量加载到内存中读取,如果采用全量加载,那么内存会直接爆掉,只能按行读取,Java 中的 bufferedReader 的 readLine() 按行读取文件里的内容。
  读取数据
  首先我们写一个方法单线程读完这 30E 数据需要多少时间,每读 100 新打印一次:
  private static void   readData  () throws IOException {
  BufferedReader br = new   BufferedReader(new   InputStreamReader(new   FileInputStream(FILE_NAME), "utf-8"  ));
  String line;
  long   start = System.currentTimeMillis();
  int   count = 1  ;
  while   ((line = br.readLine()) != null  ) {
  // 按行读取
  // SplitData.splitLine(line);
  if   (count % 100   == 0  ) {
  System.out  .println("读取100行,总耗时间: "   + (System.currentTimeMillis() - start) / 1000   + " s"  );
  System.gc();
  }
  count++;
  }
  running = false  ;
  br.close();
  }
  按行读完 10G 的数据大概 20 秒,基本每 100 行,1E 多数据化 1S,速度还挺快:
  处理数据
  | 思路一:通过单线程处理
  通过单线程处理,初始化一个 countMap,key 为年龄,value 为出现的次数,将每行读取到的数据按照 "," 进行分割,然后获取到的每一项进行保存到 countMap 里,如果存在,那么值 key 的 value+1。
  for   (int   i = start; i <= end  ; i++) {
  try {
  File subFile = new   File(dir + ""   + i + ".dat"  );
  if   (!file.exists()) {
  subFile.createNewFile();
  }
  countMap.computeIfAbsent(i + ""  , integer -> new   AtomicInteger(0  ));
  } catch (FileNotFoundException e) {
  e.printStackTrace();
  } catch (IOException e) {
  e.printStackTrace();
  }
  }
  单线程读取并统计 countMap:
  public   static   void   splitLine(String   lineData) {
  String  [] arr = lineData.split(","  );
  for   (String   str : arr) {
  if   (StringUtils.isEmpty(str)) {
  continue  ;
  }
  countMap.computeIfAbsent(str, s -> new   AtomicInteger(0  )).getAndIncrement();
  }
  }
  通过比较找出年龄数最多的年龄并打印出来:
  private   static   void   findMostAge() {
  Integer targetValue = 0  ;
  String   targetKey = null  ;
  Iterator  <Map .Entry<String , AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();
  while   (entrySetIterator.hasNext()) {
  Map  .Entry<String , AtomicInteger> entry = entrySetIterator.next();
  Integer value = entry.getValue().get  ();
  String   key = entry.getKey();
  if   (value > targetValue) {
  targetValue = value;
  targetKey = key;
  }
  }
  System.out.println("数量最多的年龄为:"   + targetKey + "数量为:"   + targetValue);
  }
  完整代码:
  package bigdata;
  import org.apache.commons.lang3.StringUtils;
  import java.io.*;
  import java.util.*;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.atomic.AtomicInteger;
  /**
  * @Desc:
  * @Author: bingbing
  * @Date: 2022/5/4 0004 19:19
  * 单线程处理
  */
  public   class   HandleMaxRepeatProblem_v0   {
  public   static   final   int start = 18  ;
  public   static   final   int end = 70  ;
  public   static   final   String dir = "D:dataDir"  ;
  public   static   final   String FILE_NAME = "D: User.dat"  ;
  /**
  * 统计数量
  */
  private   static   Map countMap = new   ConcurrentHashMap<>();
  /**
  * 开启消费的标志
  */
  private   static   volatile boolean startConsumer = false  ;
  /**
  * 消费者运行保证
  */
  private   static   volatile boolean consumerRunning = true  ;
  /**
  * 按照 "," 分割数据,并写入到countMap里
  */
  static   class   SplitData   {
  public   static   void splitLine(String lineData) {
  String[] arr = lineData.split(","  );
  for   (String str : arr) {
  if   (StringUtils.isEmpty(str)) {
  continue  ;
  }
  countMap.computeIfAbsent(str, s -> new AtomicInteger(0  )).getAndIncrement();
  }
  }
  }
  /**
  * init map
  */
  static   {
  File file = new   File(dir);
  if   (!file.exists()) {
  file.mkdir();
  }
  for   (int i = start; i <= end; i++) {
  try   {
  File subFile = new   File(dir + "" + i + "  .dat");
  if (!file.exists()) {
  subFile.createNewFile();
  }
  countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));
  } catch (FileNotFoundException e) {
  e.printStackTrace();
  } catch (IOException e) {
  e.printStackTrace();
  }
  }
  }
  public static void main(String[] args) {
  new Thread(() -> {
  try {
  readData();
  } catch (IOException e) {
  e.printStackTrace();
  }
  }).start();
  }
  private static void readData() throws IOException {
  BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "  utf-8  "));
  String line;
  long start = System.currentTimeMillis();
  int count = 1;
  while ((line = br.readLine()) != null) {
  // 按行读取,并向map里写入数据
  SplitData.splitLine(line);
  if (count % 100 == 0) {
  System.out.println("  读取100  行,总耗时间: " + (System.currentTimeMillis() - start) / 1000 + "   s");
  try {
  Thread.sleep(1000L);
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  }
  count++;
  }
  findMostAge();
  br.close();
  }
  private static void findMostAge() {
  Integer targetValue = 0;
  String targetKey = null;
  Iterator> entrySetIterator = countMap.entrySet().iterator();
  while (entrySetIterator.hasNext()) {
  Map.Entry entry = entrySetIterator.next();
  Integer value = entry.getValue().get();
  String key = entry.getKey();
  if (value > targetValue) {
  targetValue = value;
  targetKey = key;
  }
  }
  System.out.println("  数量最多的年龄为:" + targetKey + "  数量为:" + targetValue);
  }
  private static void clearTask() {
  // 清理,同时找出出现的字符最大的数
  findMostAge();
  System.exit(-1);
  }
  }
  测试结果:总共花了 3 分钟读取完并统计完所有数据。
  内存消耗为 2G-2.5G,CPU 利用率太低,只向上浮动了 20%-25% 之间:
  要想提高 CPU 的利用率,那么可以使用多线程去处理。下面我们使用多线程去解决这个问题 CPU 利用率低的问题。
  | 思路二:分治法
  使用多线程去消费读取到的数据。采用生产者、消费者模式去消费数据,因为在读取的时候是比较快的,单线程的数据处理能力比较差,因此思路一的性能阻塞在取数据方,又是同步的,所以导致整个链路的性能会变的很差。
  所谓分治法就是分而治之,也就是说将海量数据分割处理。根据 CPU 的能力初始化 n 个线程,每一个线程去消费一个队列,这样线程在消费的时候不会出现抢占队列的问题。
  同时为了保证线程安全和生产者消费者模式的完整,采用阻塞队列,Java 中提供了 LinkedBlockingQueue 就是一个阻塞队列。
  ①初始化阻塞队列
  使用 linkedList 创建一个阻塞队列列表:
  private   static   List  > blockQueueLists = new   LinkedList<>();
  在 static 块里初始化阻塞队列的数量和单个阻塞队列的容量为 256,上面讲到了 30E 数据大概 2500 行,按行塞到队列里,20 个队列,那么每个队列 125 个,因此可以容量可以设计为 256 即可:
  //每个队列容量为256
  for   (int   i = 0  ; i < threadNums; i++) {
  blockQueueLists.add  (new   LinkedBlockingQueue<>(256  ));
  }
  ②生产者
  为了实现负载的功能, 首先定义一个 count 计数器,用来记录行数:
  private   static   AtomicLong count = new   AtomicLong(0  );
  按照行数来计算队列的下标:long index=count.get()%threadNums。
  下面算法就实现了对队列列表中的队列进行轮询的投放:
  static   class   SplitData   {
  public static void   splitLine  (String lineData  ) {
  // System.out.println(lineData.length());
  String[] arr = lineData.split(" "  );
  for   (String str : arr) {
  if   (StringUtils.isEmpty(str)) {
  continue  ;
  }
  long   index = count.get  () % threadNums;
  try   {
  // 如果满了就阻塞
  blockQueueLists.get  ((int  ) index).put(str);
  } catch   (InterruptedException e) {
  e.printStackTrace();
  }
  count.getAndIncrement();
  }
  }
  ③消费者
  队列线程私有化:消费方在启动线程的时候根据 index 去获取到指定的队列,这样就实现了队列的线程私有化。
  private static void   startConsumer  () throws FileNotFoundException, UnsupportedEncodingException {
  //如果共用一个队列,那么线程不宜过多,容易出现抢占现象
  System.out  .println("开始消费..."  );
  for   (int   i = 0  ; i < threadNums; i++) {
  final int   index = i;
  // 每一个线程负责一个queue,这样不会出现线程抢占队列的情况。
  new   Thread(() -> {
  while   (consumerRunning) {
  startConsumer = true  ;
  try   {
  String str = blockQueueLists.get  (index).take();
  countNum(str);
  } catch   (InterruptedException e) {
  e.printStackTrace();
  }
  }
  }).start();
  }
  }
  多子线程分割字符串:由于从队列中多到的字符串非常的庞大,如果又是用单线程调用 split(",") 去分割,那么性能同样会阻塞在这个地方。
  // 按照arr的大小,运用多线程分割字符串
  private static   void   countNum(String   str) {
  int  [] arr = new   int  [2  ];
  arr[1  ] = str.length() / 3  ;
  // System.out.println("分割的字符串为start位置为:" + arr[0] + ",end位置为:" + arr[1]);
  for   (int   i = 0  ; i < 3  ; i++) {
  final   String   innerStr = SplitData.splitStr(str, arr);
  // System.out.println("分割的字符串为start位置为:" + arr[0] + ",end位置为:" + arr[1]);
  new   Thread(() -> {
  String  [] strArray = innerStr.split(","  );
  for   (String   s : strArray) {
  countMap.computeIfAbsent(s, s1 -> new   AtomicInteger(0  )).getAndIncrement();
  }
  }).start();
  }
  }
  分割字符串算法:分割时从 0 开始,按照等分的原则,将字符串 n 等份,每一个线程分到一份。
  用一个 arr 数组的 arr[0] 记录每次的分割开始位置,arr[1] 记录每次分割的结束位置,如果遇到的开始的字符不为 ",",那么就 startIndex-1,如果结束的位置不为 ",",那么将 endIndex 向后移一位。
  如果 endIndex 超过了字符串的最大长度,那么就把最后一个字符赋值给 arr[1]。
  /**
  * 按照 x坐标 来分割 字符串,如果切到的字符不为",", 那么把坐标向前或者向后移动一位。
  *
  * @param line
  * @param arr 存放x1,x2坐标
  * @return
  */
  public static String   splitStr  (String line,   int  [] arr) {
  int   startIndex = arr[0  ];
  int   endIndex = arr[1  ];
  char   start = line.charAt(startIndex);
  char   end = line.charAt(endIndex);
  if   ((startIndex == 0   || start == ","  ) && end == ","  ) {
  arr[0  ] = endIndex + 1  ;
  arr[1  ] = arr[0  ] + line.length() / 3  ;
  if   (arr[1  ] >= line.length()) {
  arr[1  ] = line.length() - 1  ;
  }
  return   line.substring(startIndex, endIndex);
  }
  if   (startIndex != 0   && start != ","  ) {
  startIndex = startIndex - 1  ;
  }
  if   (end != ","  ) {
  endIndex = endIndex + 1  ;
  }
  arr[0  ] = startIndex;
  arr[1  ] = endIndex;
  if   (arr[1  ] >= line.length()) {
  arr[1  ] = line.length() - 1  ;
  }
  return   splitStr(line, arr);
  }
  完整代码:
  package bigdata;
  import cn.hutool.core.collection.CollectionUtil;
  import org.apache.commons.lang3.StringUtils;
  import java.io.*;
  import java.util.*;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.LinkedBlockingQueue;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicLong;
  import java.util.concurrent.locks.ReentrantLock;
  /**
  * @Desc:
  * @Author: bingbing
  * @Date: 2022/5/4 0004 19:19
  * 多线程处理
  */
  public   class   HandleMaxRepeatProblem   {
  public   static   final   int start = 18  ;
  public   static   final   int end = 70  ;
  public   static   final   String dir = "D:dataDir"  ;
  public   static   final   String FILE_NAME = "D: User.dat"  ;
  private   static   final   int threadNums = 20  ;
  /**
  * key 为年龄, value为所有的行列表,使用队列
  */
  private   static   Map valueMap = new   ConcurrentHashMap<>();
  /**
  * 存放数据的队列
  */
  private   static   List  > blockQueueLists = new   LinkedList<>();
  /**
  * 统计数量
  */
  private   static   Map countMap = new   ConcurrentHashMap<>();
  private   static   Map lockMap = new   ConcurrentHashMap<>();
  // 队列负载均衡
  private   static   AtomicLong count = new   AtomicLong(0  );
  /**
  * 开启消费的标志
  */
  private   static   volatile boolean startConsumer = false  ;
  /**
  * 消费者运行保证
  */
  private   static   volatile boolean consumerRunning = true  ;
  /**
  * 按照 "," 分割数据,并写入到文件里
  */
  static   class   SplitData   {
  public   static   void splitLine(String lineData) {
  // System.out.println(lineData.length());
  String[] arr = lineData.split(" "  );
  for   (String str : arr) {
  if   (StringUtils.isEmpty(str)) {
  continue  ;
  }
  long index = count.get() % threadNums;
  try   {
  // 如果满了就阻塞
  blockQueueLists.get((int) index).put(str);
  } catch   (InterruptedException e) {
  e.printStackTrace();
  }
  count.getAndIncrement();
  }
  }
  /**
  * 按照 x坐标 来分割 字符串,如果切到的字符不为",", 那么把坐标向前或者向后移动一位。
  *
  * @param line
  * @param arr 存放x1,x2坐标
  * @return
  */
  public   static   String splitStr(String line, int[] arr) {
  int startIndex = arr[0  ];
  int endIndex = arr[1  ];
  char start = line.charAt(startIndex);
  char end = line.charAt(endIndex);
  if   ((startIndex == 0   || start == ","  ) && end == ","  ) {
  arr[0  ] = endIndex + 1  ;
  arr[1  ] = arr[0  ] + line.length() / 3  ;
  if   (arr[1  ] >= line.length()) {
  arr[1  ] = line.length() - 1  ;
  }
  return   line.substring(startIndex, endIndex);
  }
  if   (startIndex != 0   && start != ","  ) {
  startIndex = startIndex - 1  ;
  }
  if   (end != ","  ) {
  endIndex = endIndex + 1  ;
  }
  arr[0  ] = startIndex;
  arr[1  ] = endIndex;
  if   (arr[1  ] >= line.length()) {
  arr[1  ] = line.length() - 1  ;
  }
  return   splitStr(line, arr);
  }
  public   static   void splitLine0(String lineData) {
  String[] arr = lineData.split(","  );
  for   (String str : arr) {
  if   (StringUtils.isEmpty(str)) {
  continue  ;
  }
  int keyIndex = Integer.parseInt(str);
  ReentrantLock lock = lockMap.computeIfAbsent(keyIndex, lockMap -> new ReentrantLock());
  lock.lock();
  try   {
  valueMap.get(keyIndex).add(str);
  } finally   {
  lock.unlock();
  }
  // boolean wait = true;
  // for (; ; ) {
  // if (!lockMap.get(Integer.parseInt(str)).isLocked()) {
  // wait = false;
  // valueMap.computeIfAbsent(Integer.parseInt(str), integer -> new Vector<>()).add(str);
  // }
  // // 当前阻塞,直到释放锁
  // if (!wait) {
  // break;
  // }
  // }
  }
  }
  }
  /**
  * init map
  */
  static   {
  File file = new   File(dir);
  if   (!file.exists()) {
  file.mkdir();
  }
  //每个队列容量为256
  for   (int i = 0  ; i < threadNums; i++) {
  blockQueueLists.add(new   LinkedBlockingQueue<>(256  ));
  }
  for   (int i = start; i <= end; i++) {
  try   {
  File subFile = new   File(dir + "" + i + "  .dat");
  if (!file.exists()) {
  subFile.createNewFile();
  }
  countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));
  // lockMap.computeIfAbsent(i, lock -> new ReentrantLock());
  } catch (FileNotFoundException e) {
  e.printStackTrace();
  } catch (IOException e) {
  e.printStackTrace();
  }
  }
  }
  public static void main(String[] args) {
  new Thread(() -> {
  try {
  // 读取数据
  readData();
  } catch (IOException e) {
  e.printStackTrace();
  }
  }).start();
  new Thread(() -> {
  try {
  // 开始消费
  startConsumer();
  } catch (FileNotFoundException e) {
  e.printStackTrace();
  } catch (UnsupportedEncodingException e) {
  e.printStackTrace();
  }
  }).start();
  new Thread(() -> {
  // 监控
  monitor();
  }).start();
  }
  /**
  * 每隔60s去检查栈是否为空
  */
  private static void monitor() {
  AtomicInteger emptyNum = new AtomicInteger(0);
  while (consumerRunning) {
  try {
  Thread.sleep(10 * 1000);
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  if (startConsumer) {
  // 如果所有栈的大小都为0,那么终止进程
  AtomicInteger emptyCount = new AtomicInteger(0);
  for (int i = 0; i < threadNums; i++) {
  if (blockQueueLists.get(i).size() == 0) {
  emptyCount.getAndIncrement();
  }
  }
  if (emptyCount.get() == threadNums) {
  emptyNum.getAndIncrement();
  // 如果连续检查指定次数都为空,那么就停止消费
  if (emptyNum.get() > 12) {
  consumerRunning = false;
  System.out.println("  消费结束...");
  try {
  clearTask();
  } catch (Exception e) {
  System.out.println(e.getCause());
  } finally {
  System.exit(-1);
  }
  }
  }
  }
  }
  }
  private static void readData() throws IOException {
  BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "  utf-8  "));
  String line;
  long start = System.currentTimeMillis();
  int count = 1;
  while ((line = br.readLine()) != null) {
  // 按行读取,并向队列写入数据
  SplitData.splitLine(line);
  if (count % 100 == 0) {
  System.out.println("  读取100  行,总耗时间: " + (System.currentTimeMillis() - start) / 1000 + "   s");
  try {
  Thread.sleep(1000L);
  System.gc();
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  }
  count++;
  }
  br.close();
  }
  private static void clearTask() {
  // 清理,同时找出出现字符最大的数
  Integer targetValue = 0;
  String targetKey = null;
  Iterator> entrySetIterator = countMap.entrySet().iterator();
  while (entrySetIterator.hasNext()) {
  Map.Entry entry = entrySetIterator.next();
  Integer value = entry.getValue().get();
  String key = entry.getKey();
  if (value > targetValue) {
  targetValue = value;
  targetKey = key;
  }
  }
  System.out.println("  数量最多的年龄为:" + targetKey + "  数量为:" + targetValue);
  System.exit(-1);
  }
  /**
  * 使用linkedBlockQueue
  *
  * @throws FileNotFoundException
  * @throws UnsupportedEncodingException
  */
  private static void startConsumer() throws FileNotFoundException, UnsupportedEncodingException {
  //如果共用一个队列,那么线程不宜过多,容易出现抢占现象
  System.out.println("  开始消费...");
  for (int i = 0; i < threadNums; i++) {
  final int index = i;
  // 每一个线程负责一个queue,这样不会出现线程抢占队列的情况。
  new Thread(() -> {
  while (consumerRunning) {
  startConsumer = true;
  try {
  String str = blockQueueLists.get(index).take();
  countNum(str);
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  }
  }).start();
  }
  }
  // 按照arr的大小,运用多线程分割字符串
  private static void countNum(String str) {
  int[] arr = new int[2];
  arr[1] = str.length() / 3;
  // System.out.println("  分割的字符串为start位置为:" + arr[0] + "  ,end位置为:" + arr[1]);
  for (int i = 0; i < 3; i++) {
  final String innerStr = SplitData.splitStr(str, arr);
  // System.out.println("  分割的字符串为start位置为:" + arr[0] + "  ,end位置为:" + arr[1]);
  new Thread(() -> {
  String[] strArray = innerStr.split("  ,");
  for (String s : strArray) {
  countMap.computeIfAbsent(s, s1 -> new AtomicInteger(0)).getAndIncrement();
  }
  }).start();
  }
  }
  /**
  * 后台线程去消费map里数据写入到各个文件里, 如果不消费,那么会将内存程爆
  */
  private static void startConsumer0() throws FileNotFoundException, UnsupportedEncodingException {
  for (int i = start; i <= end; i++) {
  final int index = i;
  BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(dir + "  " + i + "  .dat", false), "  utf-8  "));
  new Thread(() -> {
  int miss = 0;
  int countIndex = 0;
  while (true) {
  // 每隔100万打印一次
  int count = countMap.get(index).get();
  if (count > 1000000 * countIndex) {
  System.out.println(index + "  岁年龄的个数为:" + countMap.get(index).get());
  countIndex += 1;
  }
  if (miss > 1000) {
  // 终止线程
  try {
  Thread.currentThread().interrupt();
  bw.close();
  } catch (IOException e) {
  }
  }
  if (Thread.currentThread().isInterrupted()) {
  break;
  }
  Vector lines = valueMap.computeIfAbsent(index, vector -> new Vector<>());
  // 写入到文件里
  try {
  if (CollectionUtil.isEmpty(lines)) {
  miss++;
  Thread.sleep(1000);
  } else {
  // 100个一批
  if (lines.size() < 1000) {
  Thread.sleep(1000);
  continue;
  }
  // 1000个的时候开始处理
  ReentrantLock lock = lockMap.computeIfAbsent(index, lockIndex -> new ReentrantLock());
  lock.lock();
  try {
  Iterator iterator = lines.iterator();
  StringBuilder sb = new StringBuilder();
  while (iterator.hasNext()) {
  sb.append(iterator.next());
  countMap.get(index).addAndGet(1);
  }
  try {
  bw.write(sb.toString());
  bw.flush();
  } catch (IOException e) {
  e.printStackTrace();
  }
  // 清除掉vector
  valueMap.put(index, new Vector<>());
  } finally {
  lock.unlock();
  }
  }
  } catch (InterruptedException e) {
  }
  }
  }).start();
  }
  }
  }
  测试结果:
  内存和 CPU 初始占用大小:
  启动后,运行时稳定在 11.7,CPU 稳定利用在 90% 以上。
  总耗时由 180S 缩减到 103S,效率提升 75%,得到的结果也与单线程处理的一致!
  遇到的问题
  如果在运行了的时候,发现 GC 突然罢工了,开始不工作了,有可能是 JVM 的堆中存在的垃圾太多,没回收导致内存的突增。
  解决方法:在读取一定数量后,可以让主线程暂停几秒,手动调用 GC。
  提示:本 demo 的线程创建都是手动创建的,实际开发中使用的是线程池!

黄晓明杨颖公然互怼,婚姻即将走到尽头,为何网友们说早就料到?黄晓明和杨颖一向是粉丝和娱乐圈羡慕的伴侣。无论走到哪,黄晓明都会记着杨颖的喜好每去一个地方,他总是记着要给杨颖买什么吃的。这样的宠溺,让杨颖成为了无数少女羡慕的对象。事实上一开始杨孙艺珍40岁生日晒零修图被嫌老,小S怒批网友很变态孙艺珍(左)因晒出零修图照被嫌老,小S(右)见此也忍不住开枪酸民。(图)南韩女星孙艺珍气质出众,自演出爱的迫降后事业攀向高峰,即使认爱同剧的玄彬人气丝毫不减。她日前在40岁生日时晒从安卓手机换成苹果手机是一种什么体验?网友再也不想换回去了大家都知道安卓和苹果是两大阵营,使用体验也是完全不一样的。那从安卓手机换成苹果手机是一种什么体验?1。终于不用担心内存小而闪退了2。微信接受消息从无延迟变成有延迟,总被媳妇说不回消真女神转生V保留传统的同时,又带给人惊喜的传统硬派RPG对不少Atlus作品的忠实玩家来说,最期待的新作永远不是衍生出的外传作品,而是道道地地的真女神转生正统续作。于双11发售的真女神转生V正是许多硬派玩家等待已久的新作,也因此在发售后王冰冰塌房了但我更爱她了王冰冰塌房了???看到王冰冰塌房这个词条挂在热搜时,小天和你们一样困惑。。仔细点进去一看,发现事情不简单事情大概是这样,有疯狂粉丝通过人肉王冰冰。将疑似王冰冰结婚的照片四级英语考试世嘉新Mini街机公布竖屏内置大量飞行射击游戏世嘉玩具公司近日公布了新的游戏机AstroCityMiniV,这是继2020年12月发布原版AstroCityMini之后,其街机经典系列的第二款产品。该产品将于2022年夏天上市斗鱼直播CEO陈少杰今年38岁大一时爱玩游戏导致辍学只有高中学历运营商财经网实习生林红艳文不日前,斗鱼宣布重大人事变动,董事兼联席首席执行官张文明因个人原因辞职,自此斗鱼两个CEO成为历史。而作为另一个CEO,陈少杰也再次引发人们关注,运营商也再见戴老板!山东泰山队最有血性的汉子?泰山队在经过了11年的坚持和努力后,终于迎来了自己的第五个联赛冠军和又一个双冠王。在举队狂欢的时刻,也有人会暗自落寞,他就是戴老板,戴琳,因为球队已没有他的位置,他的明天还不知道在视频无损放大,低画质修复!TopazVideoEnhanceAI最新版来了TopazEnhanceAI是一款非常好用的视频分辨率放大软件,用户可以通过这款软件,将视频的分辨率进行自定义调节,可对模糊的画质进行AI智能修复。现在TopazVideoEnha柳传志还能扬眉吐气吗?继央媒视频表态后,最新销量数据也出来了终于,柳传志可以扬眉吐气一把了。众所周知在12月份,司马南等人为民请命的联想事件,让联想的声誉受到了重挫。之后在12月底,联想集团申请科创板上市,证监会查证发现中金公司在保荐联想时回顾逐渐没落的三国群英传系列,你最喜欢的是哪一部?细细回想童年时期,感觉我们能接触到的三国题材游戏很多,我们也很乐于玩这些三国题材游戏像是横版过关的吞食天地,战棋类的三国志曹操传即时战略的三国霸业还有对以上游戏降维打击最有代入感的
全球第91个取消所有COVID19旅行限制的国家地区自8月24日起,英属维尔京群岛(BVI)取消了所有对国际旅行或国内活动任何与COVID19相关的限制。旅客可以通过TerranceB。Lettsome国际机场(EIS)海港码头或西什么时候可以来一次说走就走的旅行我最希望的旅行那就是坐着大游轮出到大海去。我本想着在2021年的时候来一次游轮旅行,连路程我都规划好了!没想到突然的疫情爆发,所有航线全部停航,所以就没去成。我原本打算坐的船是歌诗60岁的华仔,颜值依然就像30岁时闪闪发光的他曾经年少爱追梦一心只想往前飞行遍千山和万水一路走来不能回蓦然回首情已远身不由已在天边才明白爱恨情仇最伤最痛是后悔如果你不曾心碎你不会懂得我伤悲当我眼中有泪别问我是为谁就首批吃螃蟹电动车主电池迎退役潮将被重组再利用给外卖小哥续上!据最新统计,截至2022年6月底,全国新能源汽车达到1001万辆。有报告表明,我国第一批新能源汽车上的动力电池已经迎来了退役潮,这也是第一批吃螃蟹的车主面临的问题。那马新能源车报废宁交违约费1。1亿美元也要砍单!美系射频巨头Qorvo砍单联电2022年下半年消费电子产品库存调整已成定局,其中4G5G智能手机特别是Android阵营需求大打折扣,又以中低端机种受伤最深,这已经连带影响高通(Qualcomm)联发科等手机应血亏3亿港元!酷派财报上半年营业额1。405亿港元CNMO新闻中国手机市场中,很多人都听说过酷派这个品牌,酷派集团自1993年成立以来,曾走在手机圈的前沿,但如今却连年亏损,失去了大部分的市场份额。8月24日,酷派集团发布截至20管家婆云财贸ERP业财税供应链一体化高效协同管家婆云财贸管家婆云财贸ERP一体化解决方案打通企业供应链管理,实现产供销业财税一体化。通过数据可视化,管理销售和库存,提升企业效率和效益,为商贸企业量身打造。云财贸ERP一体化管隋东亮国安找回了赢球心态和气质以后点球还会让张玉宁主罚直播吧8月25日讯据记者袁野报道,8月26日,北京国安队将在中超联赛第15轮比赛主场对阵上海申花队。今天,国安临时主帅隋东亮出席了赛前新闻发布会。介绍备战这几天球队在身体和心情上都王者之路风云版怎么进行觉醒?王者之路风云版觉醒攻略介绍各位看客老爷大家好,这里是多多传奇手游站,今天给大家来介绍王者之路风云版怎么进行觉醒,王者之路风云版觉醒系统介绍。希望能给大家带来帮助!在我们王者之路风云版中,有一个特殊的系统,叫猛男杯夏季赛来袭,对局赛制迎来全面升级!豪华观赛福利曝光要说起目前国内手游圈当中知名度最高的一款游戏,大部分网友第一时间想到的肯定非和平精英莫属。作为圈内唯一一款正版大逃杀类型吃鸡手游,依靠其独特的玩法一经上线便受到无数网友们的追捧,在科学家发现超光速方法,将人类极限速度提升1000倍以上,可能吗?科学家发现超越光速方法,将人类极限速度提升1000倍以上,可能吗?近日,科学家们对物体的极限速度有了一个令人惊讶的发现。按照阿尔伯特爱因斯坦的狭义相对论,在宇宙中,没有一种物质可以别吹2亿像素1英寸了,救救潜望镜头吧现在手机想拍远处景物,潜望镜头就是必备,但2022年都过一大半了,用潜望镜头的手机,一只手就能数过来。丢两年前可不是这样的,一大批手机厂商排着队,都要用潜望镜头,就连小米10青春版Iphome14将于9月8日凌晨发布,四款机型,比Mate50晚一天发布Iphome14系列将于美国当地时间2022年9月7日(北京时间9月8日凌晨)发布。图片源于网络可能推出四款机型Iphome146。1英寸的刘海屏A15处理器(iphome13处理摩托罗拉连发三款新机,旧旗舰加速清仓,12GB512GB跌至2299元声明原创不易,禁止搬运,违者必究!要给别人留下深刻的印象,首先必须具有突出的特点。这在手机行业也是同理所当然的事情。除了苹果公司这样的自有高热度的移动品牌外,其他手机品牌的快速被消传苹果已开始研发iPhone15两个新变化据HTTech报道,苹果已开始研发iPhone15系列,新品将会在2024年下半年登场。根据传闻,iPhone15系列将有两个新的变化,一个是取消Lightning接口,转而使用U她是日乒女神,与马龙合照被赞郎才女貌,儿时与刘诗雯合照太可爱世界乒坛,总有那么几位颜值让人记忆犹新的选手,比如韩国女乒的徐孝元,以及日本女乒的石川佳纯,这两人都未退役,属于现役选手。其中,石川佳纯比较受中国球迷的喜爱,除了石川佳纯颜值好看,协议已签,中美达成审计共识,在美上市中国公司可能不用退市了阿里京东等中概股看来不用退市了,根据证监会的最新消息,中美于今日签署了审计监管合作协议。为何说中概股不用退市了呢?简单解释下这个事情。不知道大家是否还记得阿里被纳入美股预摘牌名单吗高端市场逆势增长vivo跃居第二X80系列成关键推手曾经,提到高端智能手机市场人们总是会将其与苹果华为三星品牌联系在一起,但随着硬件技术成熟迭代,行业整体向前发展如今国内高端市场格局已然发生变化,厚积薄发的vivo成功接过华为大旗,角逐消费市场,国产AR眼镜的关键一战作者山竹出品锌产业2021年12月14日,当刘畅戴着酷似龙珠里战斗力探测器的AR眼镜站在发布会现场演讲台上时,很多人都表示难以置信,这就是OPPO面向消费市场发布的首款AR眼镜。尽凶悍的黑贝是开心了,敢问湖人队置猛男威少于何地?经历了本赛季的惨痛失利,洛杉矶湖人队休赛期势必是要有所作为的。于是,相继签下了沃克布朗安德森布莱恩特等作为补充,但说实在的,要想进入季后赛,冲出西部,闯进总决赛,这些补强根本不值一女排吊打伊朗!全员开心庆祝,央媒发文祝贺,淘汰赛首轮打弱旅今天女排和伊朗的比赛,在开局因为戴口罩造成全队状态非常低迷,毕竟这对于自身状态是巨大的影响。幸好女排摘下口罩及时调整,最终连续拿下三局的情况下,轻轻松松以31淘汰伊朗,从而晋级到了iQOOz6确定在8月25日发布,使用80瓦快充,并且支持ois光学防抖iQOOz6确定在8月25日发布,主打续航?这次依然是主打续航,有三个配色,有80瓦的快充,并且是摄像头部分有ois光学防抖了,这个设计还是不错的。摄像头排列是有变化的,依然是后置