来源:https:c1n。cnGM8hb 目录场景说明模拟数据场景分析读取数据处理数据遇到的问题 场景说明 现有一个10G文件的数据,里面包含了什么1870之间的整数,分别表示1870岁的人群数量统计,假设年龄范围分布均匀,分别表示系统中所有用户的年龄数,找出重复次数最多的那个数,现有一台内存为4G、2核CPU的电脑,请写一个算法实现。 23,31,42,19,60,30,36,。。。。。。。。 模拟数据 Java中一个整数占4个字节,模拟10G为30亿左右个数据,采用追加模式写入10G数据到硬盘里。每100万个记录写一行,大概4M一行,10G大概2500行数据。 packagebigdata; importjava。io。; importjava。util。Random; Desc: Author:bingbing Date:202254000419:05 publicclassGenerateData{ privatestaticRandomrandomnewRandom(); publicstaticintgenerateRandomData(intstart,intend){ returnrandom。nextInt(endstart1)start; } 产生10G的11000的数据在D盘 publicvoidgenerateData()throwsIOException{ FilefilenewFile(D:User。dat); if(!file。exists()){ try{ file。createNewFile(); }catch(IOExceptione){ e。printStackTrace(); } } intstart18; intend70; longstartTimeSystem。currentTimeMillis(); BufferedWriterbosnewBufferedWriter(newOutputStreamWriter(newFileOutputStream(file,true))); for(longi1;iInteger。MAXVALUE1。7;i){ StringdatagenerateRandomData(start,end),; bos。write(data); 每100万条记录成一行,100万条数据大概4M if(i10000000){ bos。write(); } } System。out。println(写入完成!共花费时间:(System。currentTimeMillis()startTime)1000s); bos。close(); } publicstaticvoidmain(String〔〕args){ GenerateDatagenerateDatanewGenerateData(); try{ generateData。generateData(); }catch(IOExceptione){ e。printStackTrace(); } } } 上述代码调整参数执行2次,凑10个G的数据在D盘的User。dat文件里。 准备好10G数据后,接着写如何处理这些数据。 场景分析 10G的数据比当前拥有的运行内存大得多,不能全量加载到内存中读取,如果采用全量加载,那么内存会直接爆掉,只能按行读取,Java中的bufferedReader的readLine()按行读取文件里的内容。 读取数据 首先我们写一个方法单线程读完这30E数据需要多少时间,每读100新打印一次: privatestaticvoidreadData()throwsIOException{ BufferedReaderbrnewBufferedReader(newInputStreamReader(newFileInputStream(FILENAME),utf8)); Stringline; longstartSystem。currentTimeMillis(); intcount1; while((linebr。readLine())!null){ 按行读取 SplitData。splitLine(line); if(count1000){ System。out。println(读取100行,总耗时间:(System。currentTimeMillis()start)1000s); System。gc(); } count; } runningfalse; br。close(); } 按行读完10G的数据大概20秒,基本每100行,1E多数据化1S,速度还挺快: 处理数据 思路一:通过单线程处理 通过单线程处理,初始化一个countMap,key为年龄,value为出现的次数,将每行读取到的数据按照,进行分割,然后获取到的每一项进行保存到countMap里,如果存在,那么值key的value1。 for(intistart;iend;i){ try{ FilesubFilenewFile(diri。dat); if(!file。exists()){ subFile。createNewFile(); } countMap。computeIfAbsent(i,integernewAtomicInteger(0)); }catch(FileNotFoundExceptione){ e。printStackTrace(); }catch(IOExceptione){ e。printStackTrace(); } } 单线程读取并统计countMap: publicstaticvoidsplitLine(StringlineData){ String〔〕arrlineData。split(,); for(Stringstr:arr){ if(StringUtils。isEmpty(str)){ continue; } countMap。computeIfAbsent(str,snewAtomicInteger(0))。getAndIncrement(); } } 通过比较找出年龄数最多的年龄并打印出来: privatestaticvoidfindMostAge(){ IntegertargetValue0; StringtargetKeynull; Iteratorspanstylecolor:F82375;ttdarkmodecolor:FF3282;Mapspan。Entryspanstylecolor:F82375;ttdarkmodecolor:FF3282;Stringspan,AtomicIntegerentrySetIteratorcountMap。entrySet()。iterator(); while(entrySetIterator。hasNext()){ Map。Entryspanstylecolor:F82375;ttdarkmodecolor:FF3282;Stringspan,AtomicIntegerentryentrySetIterator。next(); Integervalueentry。getValue()。get(); Stringkeyentry。getKey(); if(valuetargetValue){ targetValuevalue; targetKeykey; } } System。out。println(数量最多的年龄为:targetKey数量为:targetValue); } 完整代码: packagebigdata; importorg。apache。commons。lang3。StringUtils; importjava。io。; importjava。util。; importjava。util。concurrent。ConcurrentHashMap; importjava。util。concurrent。atomic。AtomicInteger; Desc: Author:bingbing Date:202254000419:19 单线程处理 publicclassHandleMaxRepeatProblemv0{ publicstaticfinalintstart18; publicstaticfinalintend70; publicstaticfinalStringdirD:dataDir; publicstaticfinalStringFILENAMED:User。dat; 统计数量 privatestaticMapString,AtomicIntegercountMapnewConcurrentHashMap(); 开启消费的标志 privatestaticvolatilebooleanstartConsumerfalse; 消费者运行保证 privatestaticvolatilebooleanconsumerRunningtrue; 按照,分割数据,并写入到countMap里 staticclassSplitData{ publicstaticvoidsplitLine(StringlineData){ String〔〕arrlineData。split(,); for(Stringstr:arr){ if(StringUtils。isEmpty(str)){ continue; } countMap。computeIfAbsent(str,snewAtomicInteger(0))。getAndIncrement(); } } } initmap static{ FilefilenewFile(dir); if(!file。exists()){ file。mkdir(); } for(intistart;iend;i){ try{ FilesubFilenewFile(diri。dat); if(!file。exists()){ subFile。createNewFile(); } countMap。computeIfAbsent(i,integernewAtomicInteger(0)); }catch(FileNotFoundExceptione){ e。printStackTrace(); }catch(IOExceptione){ e。printStackTrace(); } } } publicstaticvoidmain(String〔〕args){ newThread((){ try{ readData(); }catch(IOExceptione){ e。printStackTrace(); } })。start(); } privatestaticvoidreadData()throwsIOException{ BufferedReaderbrnewBufferedReader(newInputStreamReader(newFileInputStream(FILENAME),utf8)); Stringline; longstartSystem。currentTimeMillis(); intcount1; while((linebr。readLine())!null){ 按行读取,并向map里写入数据 SplitData。splitLine(line); if(count1000){ System。out。println(读取100行,总耗时间:(System。currentTimeMillis()start)1000s); try{ Thread。sleep(1000L); }catch(InterruptedExceptione){ e。printStackTrace(); } } count; } findMostAge(); br。close(); } privatestaticvoidfindMostAge(){ IntegertargetValue0; StringtargetKeynull; IteratorMap。EntryString,AtomicIntegerentrySetIteratorcountMap。entrySet()。iterator(); while(entrySetIterator。hasNext()){ Map。EntryString,AtomicIntegerentryentrySetIterator。next(); Integervalueentry。getValue()。get(); Stringkeyentry。getKey(); if(valuetargetValue){ targetValuevalue; targetKeykey; } } System。out。println(数量最多的年龄为:targetKey数量为:targetValue); } privatestaticvoidclearTask(){ 清理,同时找出出现的字符最大的数 findMostAge(); System。exit(1); } } 测试结果:总共花了3分钟读取完并统计完所有数据。 内存消耗为2G2。5G,CPU利用率太低,只向上浮动了2025之间: 要想提高CPU的利用率,那么可以使用多线程去处理。下面我们使用多线程去解决这个问题CPU利用率低的问题。 思路二:分治法 使用多线程去消费读取到的数据。采用生产者、消费者模式去消费数据,因为在读取的时候是比较快的,单线程的数据处理能力比较差,因此思路一的性能阻塞在取数据方,又是同步的,所以导致整个链路的性能会变的很差。 所谓分治法就是分而治之,也就是说将海量数据分割处理。根据CPU的能力初始化n个线程,每一个线程去消费一个队列,这样线程在消费的时候不会出现抢占队列的问题。 同时为了保证线程安全和生产者消费者模式的完整,采用阻塞队列,Java中提供了LinkedBlockingQueue就是一个阻塞队列。 初始化阻塞队列 使用linkedList创建一个阻塞队列列表: privatestaticListLinkedBlockingQueueStringblockQueueListsnewLinkedList(); 在static块里初始化阻塞队列的数量和单个阻塞队列的容量为256,上面讲到了30E数据大概2500行,按行塞到队列里,20个队列,那么每个队列125个,因此可以容量可以设计为256即可: 每个队列容量为256 for(inti0;ithreadNums;i){ blockQueueLists。add(newLinkedBlockingQueue(256)); } 生产者 为了实现负载的功能,首先定义一个count计数器,用来记录行数: privatestaticAtomicLongcountnewAtomicLong(0); 按照行数来计算队列的下标:longindexcount。get()threadNums。 下面算法就实现了对队列列表中的队列进行轮询的投放: staticclassSplitData{ publicstaticvoidsplitLine(StringlineData){ System。out。println(lineData。length()); String〔〕arrlineData。split(); for(Stringstr:arr){ if(StringUtils。isEmpty(str)){ continue; } longindexcount。get()threadNums; try{ 如果满了就阻塞 blockQueueLists。get((int)index)。put(str); }catch(InterruptedExceptione){ e。printStackTrace(); } count。getAndIncrement(); } } 消费者 队列线程私有化:消费方在启动线程的时候根据index去获取到指定的队列,这样就实现了队列的线程私有化。 privatestaticvoidstartConsumer()throwsFileNotFoundException,UnsupportedEncodingException{ 如果共用一个队列,那么线程不宜过多,容易出现抢占现象 System。out。println(开始消费。。。); for(inti0;ithreadNums;i){ finalintindexi; 每一个线程负责一个queue,这样不会出现线程抢占队列的情况。 newThread((){ while(consumerRunning){ startConsumertrue; try{ StringstrblockQueueLists。get(index)。take(); countNum(str); }catch(InterruptedExceptione){ e。printStackTrace(); } } })。start(); } } 多子线程分割字符串:由于从队列中多到的字符串非常的庞大,如果又是用单线程调用split(,)去分割,那么性能同样会阻塞在这个地方。 按照arr的大小,运用多线程分割字符串 privatestaticvoidcountNum(Stringstr){ int〔〕arrnewint〔2〕; arr〔1〕str。length()3; System。out。println(分割的字符串为start位置为:arr〔0〕,end位置为:arr〔1〕); for(inti0;i3;i){ finalStringinnerStrSplitData。splitStr(str,arr); System。out。println(分割的字符串为start位置为:arr〔0〕,end位置为:arr〔1〕); newThread((){ String〔〕strArrayinnerStr。split(,); for(Strings:strArray){ countMap。computeIfAbsent(s,s1newAtomicInteger(0))。getAndIncrement(); } })。start(); } } 分割字符串算法:分割时从0开始,按照等分的原则,将字符串n等份,每一个线程分到一份。 用一个arr数组的arr〔0〕记录每次的分割开始位置,arr〔1〕记录每次分割的结束位置,如果遇到的开始的字符不为,,那么就startIndex1,如果结束的位置不为,,那么将endIndex向后移一位。 如果endIndex超过了字符串的最大长度,那么就把最后一个字符赋值给arr〔1〕。 按照x坐标来分割字符串,如果切到的字符不为,,那么把坐标向前或者向后移动一位。 paramline paramarr存放x1,x2坐标 return publicstaticStringsplitStr(Stringline,int〔〕arr){ intstartIndexarr〔0〕; intendIndexarr〔1〕; charstartline。charAt(startIndex); charendline。charAt(endIndex); if((startIndex0start,)end,){ arr〔0〕endIndex1; arr〔1〕arr〔0〕line。length()3; if(arr〔1〕line。length()){ arr〔1〕line。length()1; } returnline。substring(startIndex,endIndex); } if(startIndex!0start!,){ startIndexstartIndex1; } if(end!,){ endIndexendIndex1; } arr〔0〕startIndex; arr〔1〕endIndex; if(arr〔1〕line。length()){ arr〔1〕line。length()1; } returnsplitStr(line,arr); } 完整代码: packagebigdata; importcn。hutool。core。collection。CollectionUtil; importorg。apache。commons。lang3。StringUtils; importjava。io。; importjava。util。; importjava。util。concurrent。ConcurrentHashMap; importjava。util。concurrent。LinkedBlockingQueue; importjava。util。concurrent。atomic。AtomicInteger; importjava。util。concurrent。atomic。AtomicLong; importjava。util。concurrent。locks。ReentrantLock; Desc: Author:bingbing Date:202254000419:19 多线程处理 publicclassHandleMaxRepeatProblem{ publicstaticfinalintstart18; publicstaticfinalintend70; publicstaticfinalStringdirD:dataDir; publicstaticfinalStringFILENAMED:User。dat; privatestaticfinalintthreadNums20; key为年龄,value为所有的行列表,使用队列 privatestaticMapInteger,VectorvalueMapnewConcurrentHashMap(); 存放数据的队列 privatestaticListLinkedBlockingQueueStringblockQueueListsnewLinkedList(); 统计数量 privatestaticMapString,AtomicIntegercountMapnewConcurrentHashMap(); privatestaticMapInteger,ReentrantLocklockMapnewConcurrentHashMap(); 队列负载均衡 privatestaticAtomicLongcountnewAtomicLong(0); 开启消费的标志 privatestaticvolatilebooleanstartConsumerfalse; 消费者运行保证 privatestaticvolatilebooleanconsumerRunningtrue; 按照,分割数据,并写入到文件里 staticclassSplitData{ publicstaticvoidsplitLine(StringlineData){ System。out。println(lineData。length()); String〔〕arrlineData。split(); for(Stringstr:arr){ if(StringUtils。isEmpty(str)){ continue; } longindexcount。get()threadNums; try{ 如果满了就阻塞 blockQueueLists。get((int)index)。put(str); }catch(InterruptedExceptione){ e。printStackTrace(); } count。getAndIncrement(); } } 按照x坐标来分割字符串,如果切到的字符不为,,那么把坐标向前或者向后移动一位。 paramline paramarr存放x1,x2坐标 return publicstaticStringsplitStr(Stringline,int〔〕arr){ intstartIndexarr〔0〕; intendIndexarr〔1〕; charstartline。charAt(startIndex); charendline。charAt(endIndex); if((startIndex0start,)end,){ arr〔0〕endIndex1; arr〔1〕arr〔0〕line。length()3; if(arr〔1〕line。length()){ arr〔1〕line。length()1; } returnline。substring(startIndex,endIndex); } if(startIndex!0start!,){ startIndexstartIndex1; } if(end!,){ endIndexendIndex1; } arr〔0〕startIndex; arr〔1〕endIndex; if(arr〔1〕line。length()){ arr〔1〕line。length()1; } returnsplitStr(line,arr); } publicstaticvoidsplitLine0(StringlineData){ String〔〕arrlineData。split(,); for(Stringstr:arr){ if(StringUtils。isEmpty(str)){ continue; } intkeyIndexInteger。parseInt(str); ReentrantLocklocklockMap。computeIfAbsent(keyIndex,lockMapnewReentrantLock()); lock。lock(); try{ valueMap。get(keyIndex)。add(str); }finally{ lock。unlock(); } booleanwaittrue; for(;;){ if(!lockMap。get(Integer。parseInt(str))。isLocked()){ waitfalse; valueMap。computeIfAbsent(Integer。parseInt(str),integernewVector())。add(str); } 当前阻塞,直到释放锁 if(!wait){ break; } } } } } initmap static{ FilefilenewFile(dir); if(!file。exists()){ file。mkdir(); } 每个队列容量为256 for(inti0;ithreadNums;i){ blockQueueLists。add(newLinkedBlockingQueue(256)); } for(intistart;iend;i){ try{ FilesubFilenewFile(diri。dat); if(!file。exists()){ subFile。createNewFile(); } countMap。computeIfAbsent(i,integernewAtomicInteger(0)); lockMap。computeIfAbsent(i,locknewReentrantLock()); }catch(FileNotFoundExceptione){ e。printStackTrace(); }catch(IOExceptione){ e。printStackTrace(); } } } publicstaticvoidmain(String〔〕args){ newThread((){ try{ 读取数据 readData(); }catch(IOExceptione){ e。printStackTrace(); } })。start(); newThread((){ try{ 开始消费 startConsumer(); }catch(FileNotFoundExceptione){ e。printStackTrace(); }catch(UnsupportedEncodingExceptione){ e。printStackTrace(); } })。start(); newThread((){ 监控 monitor(); })。start(); } 每隔60s去检查栈是否为空 privatestaticvoidmonitor(){ AtomicIntegeremptyNumnewAtomicInteger(0); while(consumerRunning){ try{ Thread。sleep(101000); }catch(InterruptedExceptione){ e。printStackTrace(); } if(startConsumer){ 如果所有栈的大小都为0,那么终止进程 AtomicIntegeremptyCountnewAtomicInteger(0); for(inti0;ithreadNums;i){ if(blockQueueLists。get(i)。size()0){ emptyCount。getAndIncrement(); } } if(emptyCount。get()threadNums){ emptyNum。getAndIncrement(); 如果连续检查指定次数都为空,那么就停止消费 if(emptyNum。get()12){ consumerRunningfalse; System。out。println(消费结束。。。); try{ clearTask(); }catch(Exceptione){ System。out。println(e。getCause()); }finally{ System。exit(1); } } } } } } privatestaticvoidreadData()throwsIOException{ BufferedReaderbrnewBufferedReader(newInputStreamReader(newFileInputStream(FILENAME),utf8)); Stringline; longstartSystem。currentTimeMillis(); intcount1; while((linebr。readLine())!null){ 按行读取,并向队列写入数据 SplitData。splitLine(line); if(count1000){ System。out。println(读取100行,总耗时间:(System。currentTimeMillis()start)1000s); try{ Thread。sleep(1000L); System。gc(); }catch(InterruptedExceptione){ e。printStackTrace(); } } count; } br。close(); } privatestaticvoidclearTask(){ 清理,同时找出出现字符最大的数 IntegertargetValue0; StringtargetKeynull; IteratorMap。EntryString,AtomicIntegerentrySetIteratorcountMap。entrySet()。iterator(); while(entrySetIterator。hasNext()){ Map。EntryString,AtomicIntegerentryentrySetIterator。next(); Integervalueentry。getValue()。get(); Stringkeyentry。getKey(); if(valuetargetValue){ targetValuevalue; targetKeykey; } } System。out。println(数量最多的年龄为:targetKey数量为:targetValue); System。exit(1); } 使用linkedBlockQueue throwsFileNotFoundException throwsUnsupportedEncodingException privatestaticvoidstartConsumer()throwsFileNotFoundException,UnsupportedEncodingException{ 如果共用一个队列,那么线程不宜过多,容易出现抢占现象 System。out。println(开始消费。。。); for(inti0;ithreadNums;i){ finalintindexi; 每一个线程负责一个queue,这样不会出现线程抢占队列的情况。 newThread((){ while(consumerRunning){ startConsumertrue; try{ StringstrblockQueueLists。get(index)。take(); countNum(str); }catch(InterruptedExceptione){ e。printStackTrace(); } } })。start(); } } 按照arr的大小,运用多线程分割字符串 privatestaticvoidcountNum(Stringstr){ int〔〕arrnewint〔2〕; arr〔1〕str。length()3; System。out。println(分割的字符串为start位置为:arr〔0〕,end位置为:arr〔1〕); for(inti0;i3;i){ finalStringinnerStrSplitData。splitStr(str,arr); System。out。println(分割的字符串为start位置为:arr〔0〕,end位置为:arr〔1〕); newThread((){ String〔〕strArrayinnerStr。split(,); for(Strings:strArray){ countMap。computeIfAbsent(s,s1newAtomicInteger(0))。getAndIncrement(); } })。start(); } } 后台线程去消费map里数据写入到各个文件里,如果不消费,那么会将内存程爆 privatestaticvoidstartConsumer0()throwsFileNotFoundException,UnsupportedEncodingException{ for(intistart;iend;i){ finalintindexi; BufferedWriterbwnewBufferedWriter(newOutputStreamWriter(newFileOutputStream(diri。dat,false),utf8)); newThread((){ intmiss0; intcountIndex0; while(true){ 每隔100万打印一次 intcountcountMap。get(index)。get(); if(count1000000countIndex){ System。out。println(index岁年龄的个数为:countMap。get(index)。get()); countIndex1; } if(miss1000){ 终止线程 try{ Thread。currentThread()。interrupt(); bw。close(); }catch(IOExceptione){ } } if(Thread。currentThread()。isInterrupted()){ break; } VectorlinesvalueMap。computeIfAbsent(index,vectornewVector()); 写入到文件里 try{ if(CollectionUtil。isEmpty(lines)){ miss; Thread。sleep(1000); }else{ 100个一批 if(lines。size()1000){ Thread。sleep(1000); continue; } 1000个的时候开始处理 ReentrantLocklocklockMap。computeIfAbsent(index,lockIndexnewReentrantLock()); lock。lock(); try{ Iteratoriteratorlines。iterator(); StringBuildersbnewStringBuilder(); while(iterator。hasNext()){ sb。append(iterator。next()); countMap。get(index)。addAndGet(1); } try{ bw。write(sb。toString()); bw。flush(); }catch(IOExceptione){ e。printStackTrace(); } 清除掉vector valueMap。put(index,newVector()); }finally{ lock。unlock(); } } }catch(InterruptedExceptione){ } } })。start(); } } } 测试结果: 内存和CPU初始占用大小: 启动后,运行时稳定在11。7,CPU稳定利用在90以上。 总耗时由180S缩减到103S,效率提升75,得到的结果也与单线程处理的一致! 遇到的问题 如果在运行了的时候,发现GC突然罢工了,开始不工作了,有可能是JVM的堆中存在的垃圾太多,没回收导致内存的突增。 解决方法:在读取一定数量后,可以让主线程暂停几秒,手动调用GC。 提示:本demo的线程创建都是手动创建的,实际开发中使用的是线程池!