InfluxData是一个开源的时间序列数据库平台。下面介绍了它是如何被用于边缘应用案例的。 收集到的随时间变化的数据称为时间序列数据。今天,它已经成为每个行业和生态系统的一部分。它是不断增长的物联网行业的一大组成部分,将成为人们日常生活的重要部分。但时间序列数据及其需求很难处理。这是因为没有专门为处理时间序列数据而构建的工具。在这篇文章中,我将详细介绍这些问题,以及过去10年来InfluxData如何解决这些问题。 InfluxData InfluxData是一个开源的时间序列数据库平台。你可能通过InfluxDB了解该公司,但你可能不知道它专门从事时间序列数据库开发。这很重要,因为在管理时间序列数据时,你要处理两个问题:存储生命周期和查询。 在存储生命周期中,开发人员通常首先收集和分析非常详细的数据。但开发人员希望存储较小的、降低采样率的数据集,以描述其趋势,而不占用太多的存储空间。 查询数据库时,你不希望基于ID查询数据,而是希望基于时间范围进行查询。使用时间序列数据最常见的一件事是在一段时间内对其进行汇总。在典型的关系型数据库中存储数据时,这种查询是很慢的,这种数据库使用行和列来描述不同数据点的关系。专门为处理时间序列数据而设计的数据库可以更快地处理这类查询。InfluxDB有自己的内置查询语言:Flux,这是专门为查询时间序列数据集而构建的。 数据采集 数据采集和数据处理都有一些很棒的工具。InfluxData有12个以上的客户端库,允许你使用自己选择的编程语言编写和查询数据。这是自定义用法的一个很好的工具。开源摄取代理Telegraf包括300多个输入和输出插件。如果你是一个开发者,你也可以贡献自己的插件。 InfluxDB还可以接受上传小体积历史数据集的CSV文件,以及大数据集的批量导入。importmathbicycles3from(bucket:smartcity)range(start:20210301T00:00:00z,stop:20210401T00:00:00z)filter(fn:(r)r。measurementcityIoT)filter(fn:(r)r。fieldcounter)filter(fn:(r)r。sourcebicycle)filter(fn:(r)r。neighborhoodid3)aggregateWindow(every:1h,fn:mean,createEmpty:false)bicycles4from(bucket:smartcity)range(start:20210301T00:00:00z,stop:20210401T00:00:00z)filter(fn:(r)r。measurementcityIoT)filter(fn:(r)r。fieldcounter)filter(fn:(r)r。sourcebicycle)filter(fn:(r)r。neighborhoodid4)aggregateWindow(every:1h,fn:mean,createEmpty:false)join(tables:{neighborhood3:bicycles3,neighborhood4:bicycles4},on〔time〕,method:inner)keep(columns:〔time,valueneighborhood3,valueneighborhood4〕)map(fn:(r)({rwithdifferencevalue:math。abs(x:(r。valueneighborhood3r。valueneighborhood4))})) Flux Flux是我们的内部查询语言,从零开始建立,用于处理时间序列数据。它也是我们一些工具的基础动力,包括任务task、警报alert和通知notification。要剖析上面的Flux查询,需要定义一些东西。首先,桶bucket就是我们所说的数据库。你可以配置存储桶,然后将数据流添加到其中。查询会调用smartcity存储桶,其范围为特定的一天(准确地说是24小时)。你可以从存储桶中获取所有数据,但大多数用户都包含一个数据范围。这是你能做的最基本的Flux查询。 接下来,我添加过滤器,将数据过滤到更精确、更易于管理的地方。例如,我过滤分配给id为3的社区中的自行车数量。从那里,我使用aggregateWindow获取每小时的平均值。这意味着我希望收到一个包含24列的表,每小时一列。我也对id为4的社区进行同样的查询。最后,我将这两张表相叠加,得出这两个社区自行车使用量的差异。 如果你想知道什么时候是交通高峰,这是不错的选择。显然,这只是Flux查询功能的一个小例子。但它提供了一个很好的例子,使用了Flux附带的一些工具。我还有很多的数据分析和统计功能。但对于这一点,我建议查看Flux文档。importinfluxdatainfluxdbtasksoptiontask{name:PBdownsample,every:1h,offset:10s}from(bucket:plantbuddy)range(start:tasks。lastSuccess(orTime:task。every))filter(fn:(r)r〔measurement〕sensordata)aggregateWindow(every:10m,fn:last,createEmpty:false)yield(name:last)to(bucket:downsampled) 任务 InfluxDB任务task是一个定时Flux脚本,它接收输入数据流并以某种方式修改或分析它。然后,它将修改后的数据存储在新的存储桶中或执行其他操作。将较小的数据集存储到新的存储桶中,称为降采样downsampling,这是数据库的核心功能,也是时间序列数据生命周期的核心部分。 你可以在当前任务示例中看到,我已经对数据进行了降采样。我得到每10分钟增量的最后一个值,并将该值存储在降采样桶中。原始数据集在这10分钟内可能有数千个数据点,但现在降采样桶只有60个新值。需要注意的一点是,我还使用了范围内的lastSuccess函数。这会告诉InfluxDB从上次成功运行的时间开始运行此任务,以防它在过去2小时内失败,在这种情况下,它可以追溯3个小时内的最后一次成功运行。这对于内置错误处理非常有用。 检查和警报 InfluxDB包含一个警报Alert或检查Check和通知notification系统。这个系统非常简单直白。首先进行检查,定期查看数据以查找你定义的异常。通常,这是用阈值定义的。例如,任何低于32F的温度值都被指定为WARN值,高于32F都被分配为OK值,低于0F都被赋予CRITICAL值。从那开始,你的检查可以按你认为必要的频率运行。你的检查以及每个检查的当前状态都有历史记录。在不需要的时候,你不需要设置通知。你可以根据需要参考你的警报历史记录。 许多人选择设置通知。为此,你需要定义一个通知端点notificationendpoint。例如,聊天应用程序可以进行HTTP调用以接收通知。然后你定义何时接收通知,例如,你可以每小时运行一次检查。你可以每24小时运行一次通知。你可以让通知响应值更改,例如,WARN更改为CRITICAL,或者当值为CRITICAL时,无论如何都从OK更改为WARN。这是一个高度可定制的系统。从这个系统创建的Flux代码也可以编辑。 边缘 最后,我想把所有的核心功能放在一起,包括最近发布的一个非常特别的新功能。Edgetocloud是一个非常强大的工具,允许你运行开源InfluxDB,并在出现连接问题时在本地存储数据。连接修复后,它会将数据流传输到InfluxData云平台。 这对于边缘设备和重要数据非常重要,因为任何数据丢失都是有害的。你定义一个要复制到云的存储桶,然后该存储桶有一个磁盘支持的队列来本地存储数据。然后定义云存储桶应该复制到的内容。在连接到云端之前,数据都存储在本地。 InfluxDB和物联网边缘 假设你有一个项目,你想使用连接到植物上的物联网传感器监测家里植物的健康状况。该项目是使用你的笔记本电脑作为边缘设备设置的。当你的笔记本电脑合上或关闭时,它会在本地存储数据,然后在重新连接时将数据流传到我的云存储桶。 需要注意的一点是,在将数据存储到复制桶之前,这会对本地设备上的数据进行降采样。你的植物传感器每秒提供一个数据点。但它将数据压缩为一分钟的平均数,因此存储的数据更少了。在云账户中,你可以添加一些警报和通知,让你知道植物的水分何时低于某个水平,需要浇水。也可以在网站上使用视觉效果来告诉用户植物的健康状况。 数据库是许多应用程序的主干。在像InfluxDB的时间序列数据库平台中使用带有时间戳的数据可以节省开发人员的时间,并使他们能够访问各种工具和服务。InfluxDB的维护者喜欢看到人们在我们的开源社区中构建什么,所以请与我们联系,并与其他人共享你的项目和代码! via:https:opensource。comarticle231timeseriesdataedgeopensourcetools 作者:ZoeSteinkamp选题:lkxed译者:ZhangZhanhaoxiang校对:wxy 本文由LCTT原创编译,Linux中国荣誉推出