测试数据准备
为了方便测试查询,我们首先准备一个静态的测试数据源:var now = DateTime.Parse("09/12/2011 8:57:00 PM"); var input = new[] { new { Time = now + TimeSpan.FromSeconds(1), Value = 20}, new { Time = now + TimeSpan.FromSeconds(2), Value = 30}, new { Time = now + TimeSpan.FromSeconds(3), Value = 120}, new { Time = now + TimeSpan.FromSeconds(4), Value = 200}, new { Time = now + TimeSpan.FromSeconds(5), Value = 20}, new { Time = now + TimeSpan.FromSeconds(6), Value = 110}, new { Time = now + TimeSpan.FromSeconds(7), Value = 110}, new { Time = now + TimeSpan.FromSeconds(8), Value = 210}, new { Time = now + TimeSpan.FromSeconds(9), Value = 120}, new { Time = now + TimeSpan.FromSeconds(10), Value = 130}, new { Time = now + TimeSpan.FromSeconds(11), Value = 20}, new { Time = now + TimeSpan.FromSeconds(12), Value = 30}, };
接下去将上述数据源转变为点类型复杂事件流:
var inputStream = input.ToPointStream(Application, t => PointEvent.CreateInsert(t.Time.ToLocalTime(), new { Value = t.Value }), AdvanceTimeSettings.IncreasingStartTime);
异常检测
问题:怎样每秒1次的计算过去5秒内Value字段值超过阈值100的事件数超过事件总数目80%的异常事件?首先我们定义一下结果事件流中的负载类型SpikeEvent如下:
struct SpikeEvent { public double Ratio { get; set; } }
我们最终希望调用查询的方式如下:
int threshold = 100; double ratio = 0.8; var resultStream = DetectSpikes( inputStream, threshold, // 指定阈值(超过该阈值的事件被认为是“特殊事件”) ratio, // “特殊事件”占事件总数的百分比 TimeSpan.FromSeconds(5), // 窗口大小 TimeSpan.FromSeconds(1), // 跳跃大小 e => e.Value); // 指定的比较字段
因此最关键的部分就是如何实现DetectSpikes。阅读过
《StreamInsight查询系列(十五)——查询模式之窗口比率》文章的读者应该对此类查询并不陌生。
这里不加过多描述地给出DetectSpikes的实现:
///
/// 异常定义阈值
/// 异常事件占事件总数的百分比
/// 衡量事件数目的窗口大小
/// 跳跃大小
/// 选择输入事件中的某个字段来检测事件类型
///
输出结果如下:
下一篇将介绍StreamInsight查询模式中如何检测空隙事件。
最新评论