TimescaleWriter.cs 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading.Tasks;
  5. using NLog;
  6. using Npgsql;
  7. using NpgsqlTypes;
  8. using OpenHardwareMonitor.Hardware;
  9. namespace OhmGraphite
  10. {
  /// <summary>
  /// Writes reported sensor values to the <c>ohm_stats</c> table of a PostgreSQL /
  /// TimescaleDB server, using one batched and prepared INSERT statement per report.
  /// </summary>
  public class TimescaleWriter : IWriteMetrics
  {
      private static readonly Logger Logger = LogManager.GetCurrentClassLogger();

      private readonly string _connStr;
      private readonly string _localHost;
      private readonly bool _setupTable;

      // Starts out true so that the very first report runs the same recovery path
      // (pool clear, unprepare, optional schema setup) as a report following a failure.
      private bool _failure = true;

      /// <summary>Creates a writer; no connection is opened until the first report.</summary>
      /// <param name="connStr">Npgsql connection string used for every connection.</param>
      /// <param name="setupTable">
      /// When true, the embedded <c>schema.sql</c> resource is executed on the first
      /// report and again on the first report after any failure.
      /// </param>
      /// <param name="localHost">Value written to the <c>host</c> column of every row.</param>
      public TimescaleWriter(string connStr, bool setupTable, string localHost)
      {
          _connStr = connStr;
          _localHost = localHost;
          _setupTable = setupTable;
      }

      /// <summary>
      /// Inserts all <paramref name="sensors"/> in a single statement, stamped with
      /// <paramref name="reportTime"/>. An empty sequence inserts nothing.
      /// </summary>
      /// <param name="reportTime">Timestamp stored in the <c>time</c> column of every row.</param>
      /// <param name="sensors">Sensor readings to persist; enumerated exactly once.</param>
      /// <returns>An already completed task; all work is done synchronously.</returns>
      /// <remarks>
      /// Any exception is rethrown synchronously (not through the returned task) after
      /// the writer has been flagged as failed, so the next call performs recovery.
      /// </remarks>
      public Task ReportMetrics(DateTime reportTime, IEnumerable<ReportedValue> sensors)
      {
          try
          {
              if (_failure)
              {
                  Logger.Debug("Clearing connection pool");

                  // This connection is never opened; it only identifies which pool to
                  // clear. It is still IDisposable, so release it deterministically.
                  using (var poolHandle = new NpgsqlConnection(_connStr))
                  {
                      NpgsqlConnection.ClearPool(poolHandle);
                  }
              }

              using (var conn = new NpgsqlConnection(_connStr))
              {
                  conn.Open();

                  if (_failure)
                  {
                      ResetConnectionState(conn);
                  }

                  var values = sensors.ToList();

                  // With zero rows the generated statement would end in a bare
                  // "VALUES", which is not valid SQL. There is nothing to write, so skip
                  // the insert rather than fail and trigger a needless recovery cycle.
                  if (values.Count > 0)
                  {
                      InsertValues(conn, reportTime, values);
                  }

                  _failure = false;

                  // The synchronous versions of npgsql are more battle tested than asynchronous:
                  // https://github.com/npgsql/npgsql/issues/2266
                  return Task.CompletedTask;
              }
          }
          catch (Exception)
          {
              _failure = true;
              throw;
          }
      }

      /// <summary>
      /// Drops every prepared statement known to <paramref name="conn"/> and, when the
      /// writer was configured with table setup, executes the <c>schema.sql</c> resource.
      /// </summary>
      private void ResetConnectionState(NpgsqlConnection conn)
      {
          // The reason behind unpreparing is a doozy.
          //
          // Npgsql persists prepared statements across connections, reason: "This allows
          // you to benefit from statements prepared in previous lifetimes, providing all
          // the performance benefits to applications using connection pools" -
          // http://www.roji.org/prepared-statements-in-npgsql-3-2. I have found this to
          // be the correct behavior in 99% situations when either client or server is
          // restarted, as the normal flow of exceptions reported on the client when the
          // server restarts seems to be:
          //
          // - System.IO.EndOfStreamException: Attempted to read past the end of the stream
          // - 57P03: the database system is starting up
          // - Back to normal
          //
          // However, on 2018-11-29 while upgrading timescale db (0.12.1 to 1.0.0) I
          // encountered a bizarre sequence of events
          //
          // - <start upgrade by restarting server>
          // - System.IO.EndOfStreamException: Attempted to read past the end of the stream
          // - 57P03: the database system is starting up
          // - 58P01: could not access file "timescaledb-0.12.1": No such file or directory
          // - <finished with: "ALTER EXTENSION timescaledb UPDATE;">
          // - 26000: prepared statement "_p1" does not exist
          //
          // OhmGraphite could never recover because Npgsql seemed adamant that the
          // prepared statement existed. And since Npgsql persists prepared statements in
          // its connection pool all future connections are "poisoned" with this
          // prepared statement. The best solution appears to be unpreparing everything on
          // db failure. For our use case, recreating these prepared statements is a small
          // price to pay even if preparation is redundant.
          conn.UnprepareAll();

          if (!_setupTable)
          {
              return;
          }

          var setupSql = Resourcer.Resource.AsString("schema.sql");
          using (var cmd = new NpgsqlCommand(setupSql, conn))
          {
              cmd.ExecuteNonQuery();
          }
      }

      /// <summary>
      /// Builds, prepares and executes the batched INSERT for a non-empty list of values.
      /// </summary>
      private void InsertValues(NpgsqlConnection conn, DateTime reportTime, List<ReportedValue> values)
      {
          using (var cmd = new NpgsqlCommand(BatchedInsertSql(values), conn))
          {
              // Note that all parameters must be set before calling Prepare():
              // they are part of the information transmitted to PostgreSQL
              // and used to effectively plan the statement. You must also set
              // the DbType or NpgsqlDbType on your parameters to unambiguously
              // specify the data type (setting only the value isn't supported).
              for (int i = 0; i < values.Count; i++)
              {
                  cmd.Parameters.Add($"time{i}", NpgsqlDbType.TimestampTz);
                  cmd.Parameters.Add($"host{i}", NpgsqlDbType.Text);
                  cmd.Parameters.Add($"hardware{i}", NpgsqlDbType.Text);
                  cmd.Parameters.Add($"hardware_type{i}", NpgsqlDbType.Text);
                  cmd.Parameters.Add($"identifier{i}", NpgsqlDbType.Text);
                  cmd.Parameters.Add($"sensor{i}", NpgsqlDbType.Text);
                  cmd.Parameters.Add($"sensor_type{i}", NpgsqlDbType.Text);
                  cmd.Parameters.Add($"value{i}", NpgsqlDbType.Real);
                  cmd.Parameters.Add($"sensor_index{i}", NpgsqlDbType.Integer);
              }

              // A majority of the time, the same number of sensors will be
              // reported on, so it's important to prepare the statement
              cmd.Prepare();

              for (int i = 0; i < values.Count; i++)
              {
                  var sensor = values[i];
                  cmd.Parameters[$"time{i}"].Value = reportTime;
                  cmd.Parameters[$"host{i}"].Value = _localHost;
                  cmd.Parameters[$"hardware{i}"].Value = sensor.Hardware;
                  cmd.Parameters[$"hardware_type{i}"].Value = Enum.GetName(typeof(HardwareType), sensor.HardwareType);
                  cmd.Parameters[$"identifier{i}"].Value = sensor.Identifier;
                  cmd.Parameters[$"sensor{i}"].Value = sensor.Sensor;
                  cmd.Parameters[$"sensor_type{i}"].Value = Enum.GetName(typeof(SensorType), sensor.SensorType);
                  cmd.Parameters[$"value{i}"].Value = sensor.Value;
                  cmd.Parameters[$"sensor_index{i}"].Value = sensor.SensorIndex;
              }

              cmd.ExecuteNonQuery();
          }
      }

      /// <summary>
      /// Returns a SQL INSERT statement that will insert all the reported values in one go.
      /// </summary>
      /// <remarks>
      /// Since there is no batched insert API that is part of Npgsql, we simulate one by
      /// creating a unique set of sql parameters for each reported value by its index.
      /// Sending one insert of 70 values was nearly 10x faster than 70 inserts of 1 value,
      /// so this circumnavigation around a lack of native batched insert statements is
      /// worth it. Only the number of values matters here; for an empty sequence the
      /// result has no row tuples and is not executable, so callers must not run it.
      /// </remarks>
      private static string BatchedInsertSql(IEnumerable<ReportedValue> values)
      {
          // One "(...)" tuple per value; placeholder order must match the column list below.
          var rowTuples = values.Select((x, i) =>
              $"(@time{i}, @host{i}, @hardware{i}, @hardware_type{i}, @identifier{i}, @sensor{i}, @sensor_type{i}, @sensor_index{i}, @value{i})");

          return "INSERT INTO ohm_stats " +
                 "(time, host, hardware, hardware_type, identifier, sensor, sensor_type, sensor_index, value) VALUES " +
                 string.Join(", ", rowTuples);
      }
  }
  144. }