Selaa lähdekoodia

Add Influx 2 Support

Due to influx 2 causing backwards incompatibility, a new configuration
type was necessary:

```xml
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <add key="type" value="influx2" />
    <add key="influx2_address" value="http://localhost:8086" />
    <add key="influx2_org" value="myorg" />
    <add key="influx2_bucket" value="mydb" />
    <add key="influx2_token" value="thisistheinfluxdbtoken" />
    <add key="interval" value="5" />
  </appSettings>
</configuration>
```
Nick Babcock 4 vuotta sitten
vanhempi
commit
e200c218bf

+ 104 - 0
OhmGraphite.Test/InfluxTest.cs

@@ -1,9 +1,12 @@
 using System;
+using System.Configuration;
+using System.Linq;
 using System.Net.Http;
 using System.Threading;
 using DotNet.Testcontainers.Containers.Builders;
 using DotNet.Testcontainers.Containers.Modules;
 using DotNet.Testcontainers.Containers.WaitStrategies;
+using InfluxDB.Client;
 using Xunit;
 
 namespace OhmGraphite.Test
@@ -93,5 +96,106 @@ namespace OhmGraphite.Test
                 }
             }
         }
+
+        [Fact, Trait("Category", "integration")]
+        public async void CanInsertIntoInflux2()
+        {
+            var testContainersBuilder = new TestcontainersBuilder<TestcontainersContainer>()
+                .WithImage("influxdb:2.0-alpine")
+                .WithEnvironment("DOCKER_INFLUXDB_INIT_MODE", "setup")
+                .WithEnvironment("DOCKER_INFLUXDB_INIT_USERNAME", "my-user")
+                .WithEnvironment("DOCKER_INFLUXDB_INIT_PASSWORD", "my-password")
+                .WithEnvironment("DOCKER_INFLUXDB_INIT_BUCKET", "mydb")
+                .WithEnvironment("DOCKER_INFLUXDB_INIT_ORG", "myorg")
+                .WithPortBinding(8086, assignRandomHostPort: true)
+                .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(8086));
+
+            await using var container = testContainersBuilder.Build();
+            await container.StartAsync();
+            var baseUrl = $"http://{container.Hostname}:{container.GetMappedPublicPort(8086)}";
+            var options = new InfluxDBClientOptions.Builder()
+                .Url(baseUrl)
+                .Authenticate("my-user", "my-password".ToCharArray())
+                .Bucket("mydb")
+                .Org("myorg")
+                .Build();
+
+            var config = new Influx2Config(options);
+
+            using var writer = new Influx2Writer(config, "my-pc");
+            for (int attempts = 0; ; attempts++)
+            {
+                try
+                {
+                    await writer.ReportMetrics(DateTime.Now, TestSensorCreator.Values());
+                    var influxDBClient = InfluxDBClientFactory.Create(options);
+                    var flux = "from(bucket:\"mydb\") |> range(start: -1h)";
+                    var queryApi = influxDBClient.GetQueryApi();
+                    var tables = await queryApi.QueryAsync(flux, "myorg");
+                    var fields = tables.SelectMany(x => x.Records).Select(x => x.GetValueByKey("identifier"));
+                    Assert.Contains("/intelcpu/0/temperature/0", fields);
+                    break;
+                }
+                catch (Exception)
+                {
+                    if (attempts >= 10)
+                    {
+                        throw;
+                    }
+
+                    Thread.Sleep(TimeSpan.FromSeconds(1));
+                }
+            }
+        }
+
+         [Fact, Trait("Category", "integration")]
+        public async void CanInsertIntoInflux2Token()
+        {
+            var testContainersBuilder = new TestcontainersBuilder<TestcontainersContainer>()
+                .WithImage("influxdb:2.0-alpine")
+                .WithEnvironment("DOCKER_INFLUXDB_INIT_MODE", "setup")
+                .WithEnvironment("DOCKER_INFLUXDB_INIT_USERNAME", "my-user")
+                .WithEnvironment("DOCKER_INFLUXDB_INIT_PASSWORD", "my-password")
+                .WithEnvironment("DOCKER_INFLUXDB_INIT_BUCKET", "mydb")
+                .WithEnvironment("DOCKER_INFLUXDB_INIT_ORG", "myorg")
+                .WithEnvironment("DOCKER_INFLUXDB_INIT_ADMIN_TOKEN", "thisistheinfluxdbtoken")
+                .WithPortBinding(8086, assignRandomHostPort: true)
+                .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(8086));
+
+            await using var container = testContainersBuilder.Build();
+            await container.StartAsync();
+
+            var baseUrl = $"http://{container.Hostname}:{container.GetMappedPublicPort(8086)}";
+            var configMap = new ExeConfigurationFileMap {ExeConfigFilename = "assets/influx2.config"};
+            var config = ConfigurationManager.OpenMappedExeConfiguration(configMap, ConfigurationUserLevel.None);
+            config.AppSettings.Settings["influx2_address"].Value = baseUrl;
+            var customConfig = new CustomConfig(config);
+            var results = MetricConfig.ParseAppSettings(customConfig);
+
+            using var writer = new Influx2Writer(results.Influx2, "my-pc");
+            for (int attempts = 0; ; attempts++)
+            {
+                try
+                {
+                    await writer.ReportMetrics(DateTime.Now, TestSensorCreator.Values());
+                    var influxDBClient = InfluxDBClientFactory.Create(results.Influx2.Options);
+                    var flux = "from(bucket:\"mydb\") |> range(start: -1h)";
+                    var queryApi = influxDBClient.GetQueryApi();
+                    var tables = await queryApi.QueryAsync(flux, "myorg");
+                    var fields = tables.SelectMany(x => x.Records).Select(x => x.GetValueByKey("identifier"));
+                    Assert.Contains("/intelcpu/0/temperature/0", fields);
+                    break;
+                }
+                catch (Exception)
+                {
+                    if (attempts >= 10)
+                    {
+                        throw;
+                    }
+
+                    Thread.Sleep(TimeSpan.FromSeconds(1));
+                }
+            }
+        }
     }
 }

+ 1 - 0
OhmGraphite.Test/OhmGraphite.Test.csproj

@@ -21,6 +21,7 @@
     <None Include="..\assets\timescale.config" Link="assets/timescale.config" CopyToOutputDirectory="PreserveNewest" />
     <None Include="..\assets\prometheus.config" Link="assets/prometheus.config" CopyToOutputDirectory="PreserveNewest" />
     <None Include="..\assets\influx.config" Link="assets/influx.config" CopyToOutputDirectory="PreserveNewest" />
+    <None Include="..\assets\influx2.config" Link="assets/influx2.config" CopyToOutputDirectory="PreserveNewest" />
     <None Include="..\assets\default.config" Link="assets/default.config" CopyToOutputDirectory="PreserveNewest" />
     <None Include="..\assets\graphite.config" Link="assets/graphite.config" CopyToOutputDirectory="PreserveNewest" />
     <None Include="..\assets\static-name.config" Link="assets/static-name.config" CopyToOutputDirectory="PreserveNewest" />

+ 57 - 0
OhmGraphite/Influx2Config.cs

@@ -0,0 +1,57 @@
+using System;
+using InfluxDB.Client;
+
+namespace OhmGraphite
+{
+    public class Influx2Config
+    {
+        public InfluxDBClientOptions Options { get; }
+
+        public Influx2Config(InfluxDBClientOptions options)
+        {
+            Options = options;
+        }
+
+        public static Influx2Config ParseAppSettings(IAppConfig config)
+        {
+            var builder = new InfluxDBClientOptions.Builder();
+
+            string influxAddress = config["influx2_address"];
+            if (!Uri.TryCreate(influxAddress, UriKind.Absolute, out var addr))
+            {
+                throw new ApplicationException($"Unable to parse {influxAddress} into a Uri");
+            }
+
+            builder.Url(influxAddress);
+
+            var token = config["influx2_token"];
+            var user = config["influx2_user"];
+            var password = config["influx2_password"];
+
+            if (!string.IsNullOrEmpty(token))
+            {
+                builder.AuthenticateToken(token);
+            }
+            else
+            {
+                builder.Authenticate(user, password.ToCharArray());
+            }
+
+            var bucket = config["influx2_bucket"];
+            if (string.IsNullOrEmpty(bucket))
+            {
+                throw new ApplicationException($"influx2 needs a bucket to be configured");
+            }
+            builder.Bucket(bucket);
+
+            var org = config["influx2_org"];
+            if (string.IsNullOrEmpty(org))
+            {
+                throw new ApplicationException($"influx2 needs an org to be configured");
+            }
+            builder.Org(org);
+
+            return new Influx2Config(builder.Build());
+        }
+    }
+}

+ 52 - 0
OhmGraphite/Influx2Writer.cs

@@ -0,0 +1,52 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using InfluxDB.Client;
+using InfluxDB.Client.Api.Domain;
+using InfluxDB.Client.Writes;
+using NLog;
+
+namespace OhmGraphite
+{
+    public class Influx2Writer : IWriteMetrics
+    {
+        private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
+
+        private readonly Influx2Config _config;
+        private readonly string _localHost;
+
+        public Influx2Writer(Influx2Config config, string localHost)
+        {
+            _config = config;
+            _localHost = localHost;
+        }
+
+        public async Task ReportMetrics(DateTime reportTime, IEnumerable<ReportedValue> sensors)
+        {
+            var influxDbClient = InfluxDBClientFactory.Create(_config.Options);
+            var writeApi = influxDbClient.GetWriteApiAsync();
+            var points = sensors.Select(x => NewPoint(reportTime, x)).ToList();
+            await writeApi.WritePointsAsync(points);
+        }
+
+        private PointData NewPoint(DateTime reportTime, ReportedValue sensor)
+        {
+            var sensorType = Enum.GetName(typeof(SensorType), sensor.SensorType);
+            return PointData.Measurement(sensorType)
+                .Tag("host", _localHost)
+                .Tag("app", "ohm")
+                .Tag("hardware", sensor.Hardware)
+                .Tag("hardware_type", Enum.GetName(typeof(HardwareType), sensor.HardwareType))
+                .Tag("identifier", sensor.Identifier)
+                .Tag("sensor", sensor.Sensor)
+                .Field("value", sensor.Value)
+                .Field("sensor_index", sensor.SensorIndex)
+                .Timestamp(reportTime.ToUniversalTime(), WritePrecision.S);
+        }
+
+        public void Dispose()
+        {
+        }
+    }
+}

+ 8 - 2
OhmGraphite/MetricConfig.cs

@@ -9,7 +9,7 @@ namespace OhmGraphite
         private readonly INameResolution _nameLookup;
 
         public MetricConfig(TimeSpan interval, INameResolution nameLookup, GraphiteConfig graphite, InfluxConfig influx,
-            PrometheusConfig prometheus, TimescaleConfig timescale, Dictionary<string, string> aliases, ISet<string> hiddenSensors)
+            PrometheusConfig prometheus, TimescaleConfig timescale, Dictionary<string, string> aliases, ISet<string> hiddenSensors, Influx2Config influx2)
         {
             _nameLookup = nameLookup;
             Interval = interval;
@@ -19,12 +19,14 @@ namespace OhmGraphite
             Timescale = timescale;
             Aliases = aliases;
             HiddenSensors = hiddenSensors;
+            Influx2 = influx2;
         }
 
         public string LookupName() => _nameLookup.LookupName();
         public TimeSpan Interval { get; }
         public GraphiteConfig Graphite { get; }
         public InfluxConfig Influx { get; }
+        public Influx2Config Influx2 { get; }
         public PrometheusConfig Prometheus { get; }
         public TimescaleConfig Timescale { get; }
         public Dictionary<string, string> Aliases { get; }
@@ -46,6 +48,7 @@ namespace OhmGraphite
             InfluxConfig iconfig = null;
             PrometheusConfig pconfig = null;
             TimescaleConfig timescale = null;
+            Influx2Config influx2 = null;
 
             switch (type.ToLowerInvariant())
             {
@@ -56,6 +59,9 @@ namespace OhmGraphite
                 case "influx":
                     iconfig = InfluxConfig.ParseAppSettings(config);
                     break;
+                case "influx2":
+                    influx2 = Influx2Config.ParseAppSettings(config);
+                    break;
                 case "prometheus":
                     pconfig = PrometheusConfig.ParseAppSettings(config);
                     break;
@@ -77,7 +83,7 @@ namespace OhmGraphite
                 .Select(x => x.Remove(x.LastIndexOf("/hidden", StringComparison.Ordinal)));
             var hiddenSensors = new HashSet<string>(hidden);
 
-            return new MetricConfig(interval, nameLookup, gconfig, iconfig, pconfig, timescale, aliases, hiddenSensors);
+            return new MetricConfig(interval, nameLookup, gconfig, iconfig, pconfig, timescale, aliases, hiddenSensors, influx2);
         }
 
         private static INameResolution NameLookup(string lookup)

+ 1 - 0
OhmGraphite/OhmGraphite.csproj

@@ -35,6 +35,7 @@
   </ItemGroup>
 
   <ItemGroup>
+    <PackageReference Include="InfluxDB.Client" Version="1.18.0" />
     <PackageReference Include="Npgsql" Version="5.0.4" />
     <PackageReference Include="prometheus-net" Version="4.1.1" />
     <PackageReference Include="System.Configuration.ConfigurationManager" Version="5.0.0" />

+ 7 - 1
OhmGraphite/Program.cs

@@ -108,12 +108,18 @@ namespace OhmGraphite
                 var writer = new TimescaleWriter(config.Timescale.Connection, config.Timescale.SetupTable, hostname);
                 return new MetricTimer(config.Interval, collector, writer);
             }
-            else
+            else if (config.Influx != null)
             {
                 Logger.Info($"Influxdb address: {config.Influx.Address} db: {config.Influx.Db}");
                 var writer = new InfluxWriter(config.Influx, hostname);
                 return new MetricTimer(config.Interval, collector, writer);
             }
+            else
+            {
+                Logger.Info($"Influx2 address: {config.Influx2.Options.Url}");
+                var writer = new Influx2Writer(config.Influx2, hostname);
+                return new MetricTimer(config.Interval, collector, writer);
+            }
         }
     }
 }

+ 11 - 0
assets/influx2.config

@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<configuration>
+  <appSettings>
+    <add key="type" value="influx2" />
+    <add key="influx2_address" value="http://localhost:8086" />
+    <add key="influx2_org" value="myorg" />
+    <add key="influx2_bucket" value="mydb" />
+    <add key="influx2_token" value="thisistheinfluxdbtoken" />
+    <add key="interval" value="5" />
+  </appSettings>
+</configuration>