TimescaleWriter.cs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Reflection;
  6. using System.Threading.Tasks;
  7. using NLog;
  8. using Npgsql;
  9. using NpgsqlTypes;
  10. namespace OhmGraphite
  11. {
  12. public class TimescaleWriter : IWriteMetrics
  13. {
  14. private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
  15. private readonly string _connStr;
  16. private readonly string _localHost;
  17. private readonly bool _setupTable;
  18. private bool _failure = true;
  19. public TimescaleWriter(string connStr, bool setupTable, string localHost)
  20. {
  21. _connStr = connStr;
  22. _localHost = localHost;
  23. _setupTable = setupTable;
  24. }
  25. public Task ReportMetrics(DateTime reportTime, IEnumerable<ReportedValue> sensors)
  26. {
  27. try
  28. {
  29. if (_failure)
  30. {
  31. Logger.Debug("Clearing connection pool");
  32. NpgsqlConnection.ClearPool(new NpgsqlConnection(_connStr));
  33. }
  34. using (var conn = new NpgsqlConnection(_connStr))
  35. {
  36. conn.Open();
  37. if (_failure)
  38. {
  39. // The reason behind unpreparing is a doozy.
  40. //
  41. // Npgsql persists prepared statements across connections, reason: "This allows
  42. // you to benefit from statements prepared in previous lifetimes, providing all
  43. // the performance benefits to applications using connection pools" -
  44. // http://www.roji.org/prepared-statements-in-npgsql-3-2. I have found this to
  45. // be the correct behavior in 99% situations when either client or server is
  46. // restarted, as the normal flow of exceptions reported on the client when the
  47. // server restarts seems to be:
  48. //
  49. // - System.IO.EndOfStreamException: Attempted to read past the end of the stream
  50. // - 57P03: the database system is starting up
  51. // - Back to normal
  52. //
  53. // However, on 2018-11-29 while upgrading timescale db (0.12.1 to 1.0.0) I
  54. // encountered a bizarre sequence of events
  55. //
  56. // - <start upgrade by restarting server>
  57. // - System.IO.EndOfStreamException: Attempted to read past the end of the stream
  58. // - 57P03: the database system is starting up
  59. // - 58P01: could not access file "timescaledb-0.12.1": No such file or directory
  60. // - <finished with: "ALTER EXTENSION timescaledb UPDATE;">
  61. // - 26000: prepared statement "_p1" does not exist
  62. //
  63. // OhmGraphite could never recover because Npgsql seemed adamant that the
  64. // prepared statement existed. And since Npgsql persists prepared statements in
  65. // it's connection pool all future connections are "poisoned" with this
  66. // prepared statement. The best solution appears to be unpreparing everything on
  67. // db failure. For our use case, recreating these prepared statements is a small
  68. // price to pay even if preparation is redundant.
  69. conn.UnprepareAll();
  70. if (_setupTable)
  71. {
  72. var assembly = Assembly.GetExecutingAssembly();
  73. var path = assembly.GetManifestResourceNames()
  74. .Single(str => str.EndsWith("schema.sql"));
  75. using (var stream = assembly.GetManifestResourceStream(path))
  76. using (var reader = new StreamReader(stream))
  77. {
  78. var setupSql = reader.ReadToEnd();
  79. using (var cmd = new NpgsqlCommand(setupSql, conn))
  80. {
  81. cmd.ExecuteNonQuery();
  82. }
  83. }
  84. }
  85. }
  86. var values = sensors.ToList();
  87. using (var cmd = new NpgsqlCommand(BatchedInsertSql(values), conn))
  88. {
  89. // Note that all parameters must be set before calling Prepare()
  90. // they are part of the information transmitted to PostgreSQL
  91. // and used to effectively plan the statement. You must also set
  92. // the DbType or NpgsqlDbType on your parameters to unambiguously
  93. // specify the data type (setting the value isn't support)
  94. for (int i = 0; i < values.Count; i++)
  95. {
  96. cmd.Parameters.Add($"time{i}", NpgsqlDbType.TimestampTz);
  97. cmd.Parameters.Add($"host{i}", NpgsqlDbType.Text);
  98. cmd.Parameters.Add($"hardware{i}", NpgsqlDbType.Text);
  99. cmd.Parameters.Add($"hardware_type{i}", NpgsqlDbType.Text);
  100. cmd.Parameters.Add($"identifier{i}", NpgsqlDbType.Text);
  101. cmd.Parameters.Add($"sensor{i}", NpgsqlDbType.Text);
  102. cmd.Parameters.Add($"sensor_type{i}", NpgsqlDbType.Text);
  103. cmd.Parameters.Add($"value{i}", NpgsqlDbType.Real);
  104. cmd.Parameters.Add($"sensor_index{i}", NpgsqlDbType.Integer);
  105. }
  106. // A majority of the time, the same number of sensors will be
  107. // reported on, so it's important to prepare the statement
  108. cmd.Prepare();
  109. for (int i = 0; i < values.Count; i++)
  110. {
  111. var sensor = values[i];
  112. cmd.Parameters[$"time{i}"].Value = reportTime;
  113. cmd.Parameters[$"host{i}"].Value = _localHost;
  114. cmd.Parameters[$"hardware{i}"].Value = sensor.Hardware;
  115. cmd.Parameters[$"hardware_type{i}"].Value = Enum.GetName(typeof(HardwareType), sensor.HardwareType);
  116. cmd.Parameters[$"identifier{i}"].Value = sensor.Identifier;
  117. cmd.Parameters[$"sensor{i}"].Value = sensor.Sensor;
  118. cmd.Parameters[$"sensor_type{i}"].Value = Enum.GetName(typeof(SensorType), sensor.SensorType);
  119. cmd.Parameters[$"value{i}"].Value = sensor.Value;
  120. cmd.Parameters[$"sensor_index{i}"].Value = sensor.SensorIndex;
  121. }
  122. cmd.ExecuteNonQuery();
  123. }
  124. _failure = false;
  125. // The synchronous versions of npgsql are more battle tested than asynchronous:
  126. // https://github.com/npgsql/npgsql/issues/2266
  127. return Task.CompletedTask;
  128. }
  129. }
  130. catch (Exception)
  131. {
  132. _failure = true;
  133. throw;
  134. }
  135. }
  136. // Returns a SQL INSERT statement that will insert all the reported values in one go.
  137. // Since there is no batched insert API that is part of Npgsql, we simulate one by
  138. // creating a unique set of sql parameters for each reported value by it's index.
  139. // Sending one insert of 70 values was nearly 10x faster than 70 inserts of 1 value,
  140. // so this circumnavigation around a lack of native batched insert statements is
  141. // worth it.
  142. private static string BatchedInsertSql(IEnumerable<ReportedValue> values)
  143. {
  144. var sqlColumns = values.Select((x, i) =>
  145. $"(@time{i}, @host{i}, @hardware{i}, @hardware_type{i}, @identifier{i}, @sensor{i}, @sensor_type{i}, @sensor_index{i}, @value{i})");
  146. var columns = string.Join(", ", sqlColumns);
  147. return "INSERT INTO ohm_stats " +
  148. "(time, host, hardware, hardware_type, identifier, sensor, sensor_type, sensor_index, value) VALUES " +
  149. columns;
  150. }
  151. public void Dispose()
  152. {
  153. NpgsqlConnection.ClearPool(new NpgsqlConnection(_connStr));
  154. }
  155. }
  156. }