大数据Kafka进行Avro序列化(高级版)
一、优化点对序列化共同进行封装成工具类,解决代码冗余性,提高代码复用性。schema文件和topic等信息书写在配置文件中,下次使用,直接修改配置文件信息即可,就能实现代码共用性。二、pom文件<?xml version="1.0" encoding="UTF-8"?> BigData2021 org.example 1.0-SNAPSHOT 4.0.0 Avro 8 8 5.6.3 org.checkerframework checker-qual 2.5.2 compile ch.qos.logback logback-classic 1.2.3 junit junit 4.13 test com.jcraft jsch 0.1.52 org.bouncycastle bcprov-jdk15to18 1.68 com.typesafe config 1.3.3 cn.hutool hutool-log ${hutool.version} cn.hutool hutool-json ${hutool.version} cn.hutool hutool-db ${hutool.version} cn.hutool hutool-poi ${hutool.version} cn.hutool hutool-crypto ${hutool.version} cn.hutool hutool-cache ${hutool.version} cn.hutool hutool-cron ${hutool.version} org.apache.avro avro 1.8.2 com.facebook.presto presto-jdbc 0.254.1 org.apache.poi poi-ooxml 4.1.2 net.lingala.zip4j zip4j 2.8.0 org.apache.commons commons-compress 1.20 io.confluent kafka-avro-serializer 5.3.0 org.apache.kafka kafka-clients 1.0.2 org.apache.httpcomponents httpclient 4.5.13 confluent http://packages.confluent.io/maven/ org.apache.maven.plugins maven-assembly-plugin 2.6 com.iie.quality.SaveToDM jar-with-dependencies make-assembly package single 二、配置文件
LocalToKafka.conf#kafka地址 kafka_ip:"bigdata:9092" #本地路径 local_path="E:test" #topic的名称 topic_name="h_test" #分隔符 split_flag="^" #分隔符数量 split_num=4 #新文件名称拼接符 new_file_flag="_" #平均文件大小(5M=5*1024*1024) eachSize=5242880 #scheme文件路径 scheme_name="D:BigData2021Avrosrcmainresourcesscheme.json" #输出路径 out_path="E:out_test"三、scheme文件{ "type": "record", "name": "h_test", "fields": [ { "name": "field1", "type": [ "string", "null" ] }, { "name": "field2", "type": [ "string", "null" ] }, { "name": "field3", "type": [ "string", "null" ] }, { "name": "field4", "type": [ "string", "null" ] } ] }四、文件工具类package utils; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import java.io.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; /** * @program: bigData_learn * @description: 文件工具类 * @author: Mr.逗 * @create: 2021-11-15 11:44 **/ public class FileUtil { private static final Log log = LogFactory.get(); //1、删除指定目录下文件指定关键字的文件 public static void deleteNameFile(File f,String keyWord) { //数组指向文件夹中的文件和文件夹 File[] fi=f.listFiles(); //遍历文件和文件夹 for(File file:fi) { //如果是文件夹,递归查找 if(file.isDirectory()) deleteNameFile(file,keyWord); else if(file.isFile()) { //如果是"csv.error"后缀文件,则进行删除 if(file.getName().contains(keyWord)) { file.delete(); log.info("成功删除::"+file.getName()); } } } } //删除单个文件 public static void deleteFile(File file) { // 如果文件路径所对应的文件存在,并且是一个文件,则直接删除 if(file.exists()&&file.isFile()) { if(file.delete()) { log.info(file.getName()+"删除成功!"); }else { log.warn(file.getName()+"删除失败!"); } }else log.warn(file.getName()+"删除成功!"); } //2、把数据写到本地 public static void writeLocalFile(String source, String out_path){ FileOutputStream fileOutputStream = null; try { File tmpFile = new File(out_path); if (!tmpFile.exists()){ tmpFile.createNewFile(); } fileOutputStream = new FileOutputStream(tmpFile, true); fileOutputStream.write(source.getBytes("UTF-8")); //fileOutputStream.write(" ".getBytes("UTF-8")); // System.out.println(source); } catch (IOException e) { e.printStackTrace(); }finally { if (fileOutputStream != null) { try { fileOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } } //3、按行写入到文本 public static void writeFileByLine(String source, String out_path){ try { FileWriter fw = new FileWriter(out_path,true); BufferedWriter bw = new BufferedWriter(fw); bw.write(source); bw.newLine(); bw.flush(); bw.close(); fw.close(); } catch (IOException e) { e.printStackTrace(); } } //4、读取单个本地文件 public static String readSingleFile(String path) { File file = new File(path); StringBuilder result = new StringBuilder(); try{ BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));//构造一个BufferedReader类来读取文件 String s = null; while((s = br.readLine())!=null){//使用readLine方法,一次读一行 result.append( System.lineSeparator() + s); } br.close(); }catch(Exception e){ e.printStackTrace(); } return result.toString(); } //5、遍历本指定目录下所有文件 public static List readAllFile(File file) { List files = new ArrayList(); File[] fs = file.listFiles(); for(File f:fs) { if (!f.isDirectory()) { files.add(f.getName()); } } return files; } //6、指定文件大小进行大文件的拆分 public static void splitFile(File srcFile,int eachSize,String new_file_flag){ //判断文件是否符合拆分要求 if(srcFile.length()==0){ throw new RuntimeException("文件不符合拆分要求"); } byte[] fileContent= new byte[(int) srcFile.length()]; try { //将文件内容读取到内存中 FileInputStream fis=new FileInputStream(srcFile); fis.read(fileContent); fis.close(); } catch (Exception e) { e.printStackTrace(); } //计算要次要拆分为多少份 int fileNumber; if(fileContent.length%eachSize==0){ fileNumber = fileContent.length/eachSize; } else{ fileNumber = fileContent.length/eachSize+1; } for (int i=0;i arrayList = new ArrayList<>(); try { File file = new File(name); RandomAccessFile fileR = new RandomAccessFile(file,"r"); // 按行读取字符串 String str = null; while ((str = fileR.readLine())!= null) { arrayList.add(str); } fileR.close(); } catch (IOException e) { e.printStackTrace(); } // 对ArrayList中存储的字符串进行处理 int length = arrayList.size(); String[] array = new String[length]; for (int i = 0; i < length; i++) { array[i] = arrayList.get(i); } // 返回数组 return array; } public static void main(String[] args) { //指定5M大小进行拆分 int eachSize=5*1024*1024; File srcFile =new File("E:/test/"); File[] files = srcFile.listFiles(); for (File f:files) { splitFile(f,eachSize,"_"); } List file = readAllFile(srcFile); Iterator it = file.iterator(); while (it.hasNext()) { System.out.println(it.next()); } } } 五、Avro工具类package utils; import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericData; import java.util.ArrayList; import java.util.List; import static utils.FileUtil.toArrayByRandomAccessFile; public class AvroUtil { //1、通过scheme字段映射内容字段 public static Object buildValueByField(Field field, String s) { if(s == null) { return null; } List types = field.schema().getTypes(); for (Schema schema : types) { Type type = schema.getType(); switch (type) { case STRING: return s; case BYTES: return s.getBytes(); case INT: return Integer.getInteger(s); case LONG: return Long.getLong(s); case FLOAT: Float f = null; try { f = Float.valueOf(s); }catch (Exception e) { } return f; case DOUBLE: Double d = null; try { d = Double.valueOf(s); }catch (Exception e) { } return d; case BOOLEAN: return Boolean.getBoolean(s); //case NULL: return new NullSchema(); default: throw new AvroRuntimeException("Can"t create a: "+type); } } return null; } //2、通过scheme和数据目录下获得record(列分隔符和标准列数) public static List buildRecord(Schema schema, String in_path,String split_flag,int split_num) { List results = new ArrayList(); String[] fileds = toArrayByRandomAccessFile(in_path); for(String value:fileds) { String[] strs = value.split(split_flag); if(strs.length==split_num) { GenericData.Record record = new GenericData.Record(schema); List fields = schema.getFields(); int index=0; for(String s:strs) { Field field = fields.get(index); record.put(field.name(),buildValueByField(field,s)); index++; } results.add(record); } } return results; } }六、生产信息package avro.kafka; import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.util.List; import java.util.Properties; import static utils.AvroUtil.buildRecord; import static utils.FileUtil.*; public class KafkaByWrite { private static final Log log = LogFactory.get(); private static String configDir = System.getProperty("config"); public static void main(String[] args) throws IOException { String kafka_ip = null; String local_path = null; String topic_name = null; String scheme_name=null; String split_flag=null; int split_num=0; int eachSize=0; String new_file_flag=null; //String local_file=null; if(StrUtil.isNotBlank(configDir)) { File configFile = new File(configDir); Config config = ConfigFactory.parseFile(configFile); kafka_ip = config.getString("kafka_ip"); local_path=config.getString("local_path"); topic_name=config.getString("topic_name"); scheme_name=config.getString("scheme_name"); split_flag=config.getString("split_flag"); split_num=config.getInt("split_num"); eachSize=config.getInt("eachSize"); new_file_flag=config.getString("new_file_flag"); //local_file=config.getString("local_file"); }else { log.error("config为空,程序退出"); } Properties properties = new Properties(); properties.setProperty("bootstrap.servers", kafka_ip); //leader 需要等待所有备份都成功写入日志 //properties.put("acks", "all"); //重试次数 properties.put("retries", 10); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); //kafka授权 properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username="yahong_test" password="yahong_test";"); // properties.setProperty("security.protocol", "SASL_PLAINTEXT"); // properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); // properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username="alitest" password="alitest@2021";"); KafkaProducer kafkaProducer = new KafkaProducer<>(properties); String scheme=readSingleFile(scheme_name); Schema schema = new Schema.Parser().parse(scheme); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null); GenericDatumWriter genericDataGenericDatumWriter = new GenericDatumWriter<>(schema); File srcFile =new File(local_path); File[] files = srcFile.listFiles(); for (File f:files) { splitFile(f,eachSize,new_file_flag); } List allFile = readAllFile(srcFile); for(String fileName:allFile) { List recordList = buildRecord(schema,local_path+"/"+fileName,split_flag,split_num); for(GenericData.Record record:recordList) { genericDataGenericDatumWriter.write(record,binaryEncoder); byteArrayOutputStream.flush(); binaryEncoder.flush(); byte[] values = byteArrayOutputStream.toByteArray(); ProducerRecord producerRecord = new ProducerRecord<>(topic_name, fileName, values); kafkaProducer.send(producerRecord); } //消费完删除文件 deleteFile(new File(local_path+"/"+fileName)); } kafkaProducer.close(); } } 七、消费信息package avro.kafka; import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import utils.FileUtil; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Properties; import static utils.FileUtil.readSingleFile; public class KafkaByRead { private static final Log log = LogFactory.get(); private static String configDir = System.getProperty("config"); public static void main(String[] args) throws IOException { String kafka_ip = null; String topic_name = null; String scheme_name=null; String out_path=null; if(StrUtil.isNotBlank(configDir)) { File configFile = new File(configDir); Config config = ConfigFactory.parseFile(configFile); kafka_ip = config.getString("kafka_ip"); topic_name=config.getString("topic_name"); scheme_name=config.getString("scheme_name"); out_path=config.getString("out_path"); }else { log.error("config为空,程序退出"); } Properties properties=new Properties(); //主机信息 properties.put("bootstrap.servers",kafka_ip); //群组id properties.put("group.id", topic_name); /** *消费者是否自动提交偏移量,默认是true * 为了经量避免重复数据和数据丢失,可以把它设为true, * 由自己控制核实提交偏移量。 * 如果设置为true,可以通过auto.commit.interval.ms属性来设置提交频率 */ properties.put("enable.auto.commit", "true"); /** * 自动提交偏移量的提交频率 */ properties.put("auto.commit.interval.ms", "1000"); /** * 默认值latest. * latest:在偏移量无效的情况下,消费者将从最新的记录开始读取数据 * erliest:偏移量无效的情况下,消费者将从起始位置读取分区的记录。 */ properties.put("auto.offset.reset", "earliest"); /** * 消费者在指定的时间内没有发送心跳给群组协调器,就被认为已经死亡, * 协调器就会触发再均衡,把它的分区分配给其他消费者。 */ properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); //kafka授权 properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username="yahong_test" password="yahong_test";"); // properties.setProperty("security.protocol", "SASL_PLAINTEXT"); // properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); // properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username="alitest" password="alitest@2021";"); KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); /** * 订阅主题,这个地方只传了一个主题:gys. * 这个地方也可以有正则表达式。 */ kafkaConsumer.subscribe(Arrays.asList(topic_name)); String scheme=readSingleFile(scheme_name); Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(scheme); kafkaConsumer.subscribe(Arrays.asList(topic_name)); //无限循环轮询 while (true) { /** * 消费者必须持续对Kafka进行轮询,否则会被认为已经死亡,他的分区会被移交给群组里的其他消费者。 * poll返回一个记录列表,每个记录包含了记录所属主题的信息, * 记录所在分区的信息,记录在分区里的偏移量,以及键值对。 * poll需要一个指定的超时参数,指定了方法在多久后可以返回。 * 发送心跳的频率,告诉群组协调器自己还活着。 */ ConsumerRecords records = kafkaConsumer.poll(100); for (ConsumerRecord record : records) { //Thread.sleep(1000); byte[] arr_value = record.value(); //反序列化 ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr_value); GenericDatumReader recordGenericDatumReader = new GenericDatumReader<>(schema); BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null); try { while (!binaryDecoder.isEnd()) { String key = record.key(); GenericData.Record value = recordGenericDatumReader.read(null, binaryDecoder); String out_file=out_path+"/"+key.split("_")[0]; FileUtil.writeFileByLine(value.toString(),out_file); System.out.println(key+"------"+value); } }catch (Exception e) { log.error("反序列化错误!",e); } //System.out.printf("offset = %d, value = %s", record.offset(), record.value()); //System.out.println(); } } } }
只有看懂基金赚钱的逻辑,才能掌握基金投资前言最近新关注的粉丝数量比较多,问的问题也比较简单,很多人还没有正确理解基金到底该怎么操作。恰巧这段时间我也对个人基金投资一些困惑的地方进行了深度思考,本文就从底层逻辑出发,将我近
A股恐慌性下跌,千万别急着抄底今日A股恐慌性大跌,私信爆了。有跟着我清仓表示感谢的更多的是问我能不能加仓抄底的。今天我就把A股的盘面情况和下跌逻辑给大家复盘一下,再讲讲我对后市的看法。一今日行情今天造成情绪崩盘
阿迪达斯太大意,竟被这款小众跑鞋超越?Nike心慌,小米坐等收钱说起跑鞋,大家想到的肯定是adidasNike安踏等大牌。确实,在质量做工等方面,它们有着许多小众品牌难以比拟的优势,用户忠诚度很高,品牌名声也非常响。然而,这些小众品牌不包括小米
秦pro低首付秦pro低首付惠州惠迪比亚迪4S店4008681785110万级纯电精品焕新升级。2颜值先行时尚外观。3DiLink2。0智能网联系统。3舒适空间安全随行。43H高强度车身结构惠州
众口难调?不存在的!众口难调?不存在的!审核状态已通过类别企业新闻发布人100137055发布时间202110060917惠州惠迪比亚迪4S店低首付,首付5800元起,免居住证上牌,24期0息,至高6
比亚迪F5谍照曝光自比亚迪的DMi超级混动问世后,搭载这项技术的车型销量普遍走高,甚至目前已经到了一车难求的境地。在一个月之前,比亚迪对全新车型F5完成了申报工作,这台车将搭载与秦PLUSDMi相同
比亚迪F5谍照曝光自比亚迪的DMi超级混动问世后,搭载这项技术的车型销量普遍走高,甚至目前已经到了一车难求的境地。在一个月之前,比亚迪对全新车型F5完成了申报工作,这台车将搭载与秦PLUSDMi相同
续航301km补贴后售8。98万元续航301km补贴后售8。98万元比亚迪e2标准型上市今天是国庆节假期,据了解,比亚迪e2新增一款标准型车型,补贴后售价为8。98万元。比亚迪e2标准型配备了比亚迪引以为傲的磷酸铁
我今天看到的10张好照片(154)AlistairTaylorYoung这是这一系列的第154次推送。逐相君每次会推送10张我今天看到的好照片。在你参观照片之前,我不愿意给照片加任何文字阐释。这可能会是一种干扰。废
我今天看到的10张好照片(157)AshleyMarkle这是这一系列的第157次推送。逐相君每次会推送10张我今天看到的好照片。在你参观照片之前,我不愿意给照片加任何文字阐释。这可能会是一种干扰。废话不说,一起逐
又一厂商的多屏协同来了!绿厂上线跨界互联,ColorOS12有望搭载当手机和我们生活的方方面面都建立联系后,手机与其他设备进行互联互通的需求也被不断扩大。针对这一问题,OPPO近日上线了跨界互联官网,并开放了Windows客户端下载,这倒是给了不少