[or-cvs] [metrics-db/master] Move materialized view update logic from the database to Java.
karsten at torproject.org
karsten at torproject.org
Tue Jan 18 10:15:26 UTC 2011
commit 57fdd87876d72d4ba895bbd4ce8e14ee170ee2c7
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date: Tue Jan 18 11:04:03 2011 +0100
Move materialized view update logic from the database to Java.
So far, we used row-level triggers to memorize which dates had new or
updated data and needed to be included in the next materialized view
refresh run. The row-level triggers wrote these dates to a table which
was read by refresh functions. After a successful refresh run, the table
was emptied. There were two problems with this approach:
First, the refresh run is executed in a single long-running transaction.
If someone adds new data to the database while a refresh run is active,
the changed dates won't be included in the next refresh run.
Second, row-level triggers have a negative impact on performance. It
turns out that removing them reduces import time by roughly 18 %.
The new approach uses Java to memorize which dates need to be updated.
---
db/tordir.sql | 145 ++++----------------
.../ernie/db/RelayDescriptorDatabaseImporter.java | 56 ++++++++
2 files changed, 81 insertions(+), 120 deletions(-)
diff --git a/db/tordir.sql b/db/tordir.sql
index a69ddba..689a4b4 100644
--- a/db/tordir.sql
+++ b/db/tordir.sql
@@ -228,127 +228,31 @@ CREATE TABLE relay_statuses_per_day (
CONSTRAINT relay_statuses_per_day_pkey PRIMARY KEY(date)
);
--- TABLE updates
--- A helper table which is used to keep track of what tables and where
--- need to be updated upon refreshes.
+-- Dates to be included in the next refresh run.
+CREATE TABLE scheduled_updates (
+ id SERIAL,
+ date DATE NOT NULL
+);
+
+-- Dates in the current refresh run. When starting a refresh run, we copy
+-- the rows from scheduled_updates here in order to delete just those
+-- rows after the refresh run. Otherwise we might forget scheduled dates
+-- that have been added during a refresh run. Such dates remain scheduled
+-- and are updated in the next refresh run.
CREATE TABLE updates (
- "date" date NOT NULL,
- CONSTRAINT updates_pkey PRIMARY KEY(date)
+ id INTEGER,
+ date DATE
);
CREATE LANGUAGE plpgsql;
--- FUNCTION update_status
--- This keeps the updates table up to date for the time graphs.
-CREATE OR REPLACE FUNCTION update_status() RETURNS TRIGGER AS $$
- BEGIN
- IF (TG_OP='INSERT' OR TG_OP='UPDATE') THEN
- IF (SELECT COUNT(*) FROM updates
- WHERE DATE=DATE(new.validafter)) = 0 THEN
- INSERT INTO updates
- VALUES (DATE(NEW.validafter));
- END IF;
- END IF;
- IF (TG_OP='DELETE' OR TG_OP='UPDATE') THEN
- IF (SELECT COUNT(*) FROM updates
- WHERE DATE=DATE(old.validafter)) = 0 THEN
- INSERT INTO updates
- VALUES (DATE(OLD.validafter));
- END IF;
- END IF;
- RETURN NULL; -- result is ignored since this is an AFTER trigger
-END;
-$$ LANGUAGE plpgsql;
-
--- TRIGGER update_status
--- This calls the function update_status() each time a row is inserted,
--- updated, or deleted from the statusentry table.
-CREATE TRIGGER update_status
-AFTER INSERT OR UPDATE OR DELETE
-ON statusentry
- FOR EACH ROW EXECUTE PROCEDURE update_status();
-
--- FUNCTION update_desc
--- This keeps the updates table up to date for the time graphs.
-CREATE OR REPLACE FUNCTION update_desc() RETURNS TRIGGER AS $$
- BEGIN
- IF (TG_OP='INSERT' OR TG_OP='UPDATE') THEN
- BEGIN
- IF (SELECT COUNT(*) FROM updates
- WHERE DATE=DATE(new.published)) = 0 THEN
- INSERT INTO updates
- VALUES (DATE(NEW.published));
- END IF;
- IF (SELECT COUNT(*) FROM updates
- WHERE DATE=DATE(new.published)+1) = 0 THEN
- INSERT INTO updates
- VALUES (DATE(NEW.published)+1);
- END IF;
- END;
- END IF;
- IF (TG_OP='DELETE' OR TG_OP='UPDATE') THEN
- BEGIN
- IF (SELECT COUNT(*) FROM updates
- WHERE DATE=DATE(old.published)) = 0 THEN
- INSERT INTO updates
- VALUES (DATE(OLD.published));
- END IF;
- IF (SELECT COUNT(*) FROM updates
- WHERE DATE=DATE(old.published)+1) = 0 THEN
- INSERT INTO updates
- VALUES (DATE(OLD.published)+1);
- END IF;
- END;
- END IF;
- RETURN NULL; -- result is ignored since this is an AFTER trigger
-END;
-$$ LANGUAGE plpgsql;
-
--- TRIGGER update_desc
--- This calls the function update_desc() each time a row is inserted,
--- updated, or deleted from the descriptors table.
-CREATE TRIGGER update_desc
-AFTER INSERT OR UPDATE OR DELETE
-ON descriptor
- FOR EACH ROW EXECUTE PROCEDURE update_desc();
-
--- FUNCTION update_bwhist
--- This keeps the updates table up to date for the time graphs.
-CREATE OR REPLACE FUNCTION update_bwhist() RETURNS TRIGGER AS $$
- BEGIN
- IF (TG_OP='INSERT' OR TG_OP='UPDATE') THEN
- IF (SELECT COUNT(*) FROM updates
- WHERE DATE = DATE(NEW.intervalend)) = 0 THEN
- INSERT INTO updates
- VALUES (DATE(NEW.intervalend));
- END IF;
- END IF;
- IF (TG_OP='DELETE' OR TG_OP='UPDATE') THEN
- IF (SELECT COUNT(*) FROM updates
- WHERE DATE = DATE(OLD.intervalend)) = 0 THEN
- INSERT INTO updates
- VALUES (DATE(OLD.intervalend));
- END IF;
- END IF;
- RETURN NULL; -- result is ignored since this is an AFTER trigger
-END;
-$$ LANGUAGE plpgsql;
-
--- TRIGGER update_desc
--- This calls the function update_desc() each time a row is inserted,
--- updated, or deleted from the descriptors table.
-CREATE TRIGGER update_bwhist
-AFTER INSERT OR UPDATE OR DELETE
-ON bwhist
- FOR EACH ROW EXECUTE PROCEDURE update_bwhist();
-
-- FUNCTION refresh_relay_statuses_per_day()
-- Updates helper table which is used to refresh the aggregate tables.
CREATE OR REPLACE FUNCTION refresh_relay_statuses_per_day()
RETURNS INTEGER AS $$
BEGIN
DELETE FROM relay_statuses_per_day
- WHERE date IN (SELECT * FROM updates);
+ WHERE date IN (SELECT date FROM updates);
INSERT INTO relay_statuses_per_day (date, count)
SELECT DATE(validafter) AS date, COUNT(*) AS count
FROM (SELECT DISTINCT validafter
@@ -374,7 +278,7 @@ CREATE OR REPLACE FUNCTION refresh_network_size() RETURNS INTEGER AS $$
BEGIN
DELETE FROM network_size
- WHERE date IN (SELECT * FROM updates);
+ WHERE date IN (SELECT date FROM updates);
INSERT INTO network_size
(date, avg_running, avg_exit, avg_guard, avg_fast, avg_stable)
@@ -407,7 +311,7 @@ CREATE OR REPLACE FUNCTION refresh_network_size_hour() RETURNS INTEGER AS $$
BEGIN
DELETE FROM network_size_hour
- WHERE DATE(validafter) IN (SELECT * FROM updates);
+ WHERE DATE(validafter) IN (SELECT date FROM updates);
INSERT INTO network_size_hour
(validafter, avg_running, avg_exit, avg_guard, avg_fast, avg_stable)
@@ -432,7 +336,7 @@ CREATE OR REPLACE FUNCTION refresh_relay_platforms() RETURNS INTEGER AS $$
BEGIN
DELETE FROM relay_platforms
- WHERE date IN (SELECT * FROM updates);
+ WHERE date IN (SELECT date FROM updates);
INSERT INTO relay_platforms
(date, avg_linux, avg_darwin, avg_bsd, avg_windows, avg_other)
@@ -475,7 +379,7 @@ CREATE OR REPLACE FUNCTION refresh_relay_versions() RETURNS INTEGER AS $$
BEGIN
DELETE FROM relay_versions
- WHERE date IN (SELECT * FROM updates);
+ WHERE date IN (SELECT date FROM updates);
INSERT INTO relay_versions
(date, version, relays)
@@ -508,7 +412,7 @@ CREATE OR REPLACE FUNCTION refresh_total_bandwidth() RETURNS INTEGER AS $$
BEGIN
DELETE FROM total_bandwidth
- WHERE date IN (SELECT * FROM updates);
+ WHERE date IN (SELECT date FROM updates);
INSERT INTO total_bandwidth
(bwavg, bwburst, bwobserved, bwadvertised, date)
@@ -544,7 +448,7 @@ $$ LANGUAGE plpgsql;
-- FUNCTION refresh_total_bwhist()
CREATE OR REPLACE FUNCTION refresh_total_bwhist() RETURNS INTEGER AS $$
BEGIN
- DELETE FROM total_bwhist WHERE date IN (SELECT * FROM updates);
+ DELETE FROM total_bwhist WHERE date IN (SELECT date FROM updates);
INSERT INTO total_bwhist (date, read, written, dirread, dirwritten)
SELECT date,
SUM(read) AS read,
@@ -591,7 +495,7 @@ CREATE OR REPLACE FUNCTION refresh_user_stats() RETURNS INTEGER AS $$
BEGIN
-- Start by deleting user statistics of the dates we're about to
-- regenerate.
- DELETE FROM user_stats WHERE date IN (SELECT * FROM updates);
+ DELETE FROM user_stats WHERE date IN (SELECT date FROM updates);
-- Now insert new user statistics.
INSERT INTO user_stats (date, country, r, dw, dr, drw, drr, bw, br, bwd,
brd, bwr, brr, bwdr, brdr, bwp, brp, bwn, brn)
@@ -806,10 +710,11 @@ CREATE TABLE gettor_stats (
CONSTRAINT gettor_stats_pkey PRIMARY KEY("date", bundle)
);
--- FUNCTION refresh_all()
--- This function refreshes all statistics in the database.
+-- Refresh all statistics in the database.
CREATE OR REPLACE FUNCTION refresh_all() RETURNS INTEGER AS $$
BEGIN
+ DELETE FROM updates;
+ INSERT INTO updates SELECT * FROM scheduled_updates;
PERFORM refresh_relay_statuses_per_day();
PERFORM refresh_network_size();
PERFORM refresh_network_size_hour();
@@ -818,7 +723,7 @@ CREATE OR REPLACE FUNCTION refresh_all() RETURNS INTEGER AS $$
PERFORM refresh_total_bandwidth();
PERFORM refresh_total_bwhist();
PERFORM refresh_user_stats();
- DELETE FROM updates;
+ DELETE FROM scheduled_updates WHERE id IN (SELECT id FROM updates);
RETURN 1;
END;
$$ LANGUAGE plpgsql;
diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index b023227..e423ef9 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -93,6 +93,18 @@ public final class RelayDescriptorDatabaseImporter {
private PreparedStatement psQs;
/**
+ * Set of dates that have been inserted into the database for being
+ * included in the next refresh run.
+ */
+ private Set<Long> scheduledUpdates;
+
+ /**
+ * Prepared statement to insert a date into the database that shall be
+ * included in the next refresh run.
+ */
+ private PreparedStatement psU;
+
+ /**
* Prepared statement to insert a network status consensus entry into
* the database.
*/
@@ -280,6 +292,9 @@ public final class RelayDescriptorDatabaseImporter {
this.psQ = conn.prepareStatement("INSERT INTO dirreq_stats "
+ "(source, statsend, seconds, country, requests) VALUES "
+ "(?, ?, ?, ?, ?)");
+ this.psU = conn.prepareStatement("INSERT INTO scheduled_updates "
+ + "(date) VALUES (?)");
+ this.scheduledUpdates = new HashSet<Long>();
} catch (SQLException e) {
this.logger.log(Level.WARNING, "Could not connect to database or "
+ "prepare statements.", e);
@@ -298,6 +313,24 @@ public final class RelayDescriptorDatabaseImporter {
this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
}
+ private void addDateToScheduledUpdates(long timestamp)
+ throws SQLException {
+ long dateMillis = 0L;
+ try {
+ dateMillis = this.dateTimeFormat.parse(
+ this.dateTimeFormat.format(timestamp).substring(0, 10)
+ + " 00:00:00").getTime();
+ } catch (ParseException e) {
+ this.logger.log(Level.WARNING, "Internal parsing error.", e);
+ return;
+ }
+ if (!this.scheduledUpdates.contains(dateMillis)) {
+ this.psU.setDate(1, new java.sql.Date(dateMillis));
+ this.psU.execute();
+ this.scheduledUpdates.add(dateMillis);
+ }
+ }
+
/**
* Insert network status consensus entry into database.
*/
@@ -308,6 +341,7 @@ public final class RelayDescriptorDatabaseImporter {
String ports, byte[] rawDescriptor) {
try {
if (this.psSs != null && this.psRs != null && this.psR != null) {
+ this.addDateToScheduledUpdates(validAfter);
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
Timestamp validAfterTimestamp = new Timestamp(validAfter);
if (lastCheckedStatusEntries != validAfter) {
@@ -430,6 +464,9 @@ public final class RelayDescriptorDatabaseImporter {
String extraInfoDigest, byte[] rawDescriptor) {
try {
if (this.psDs != null && this.psD != null) {
+ this.addDateToScheduledUpdates(published);
+ this.addDateToScheduledUpdates(
+ published + 24L * 60L * 60L * 1000L);
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
this.psDs.setString(1, descriptor);
ResultSet rs = psDs.executeQuery();
@@ -652,6 +689,7 @@ public final class RelayDescriptorDatabaseImporter {
public void addConsensus(long validAfter, byte[] rawDescriptor) {
try {
if (this.psCs != null && this.psC != null) {
+ this.addDateToScheduledUpdates(validAfter);
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
Timestamp validAfterTimestamp = new Timestamp(validAfter);
this.psCs.setTimestamp(1, validAfterTimestamp, cal);
@@ -752,6 +790,7 @@ public final class RelayDescriptorDatabaseImporter {
}
if (this.psBs != null && this.psB != null) {
try {
+ this.addDateToScheduledUpdates(statsEndTime);
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
Timestamp statsEndTimestamp = new Timestamp(statsEndTime);
this.psBs.setString(1, source);
@@ -814,6 +853,7 @@ public final class RelayDescriptorDatabaseImporter {
}
if (this.psQs != null && this.psQ != null) {
try {
+ this.addDateToScheduledUpdates(statsEndTime);
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
Timestamp statsEndTimestamp = new Timestamp(statsEndTime);
this.psQs.setString(1, source);
@@ -874,6 +914,22 @@ public final class RelayDescriptorDatabaseImporter {
+ "conn-bi-direct stats lines", rcsCount, rrsCount, rvsCount,
rdsCount, resCount, rhsCount, rqsCount, rbsCount));
+ /* Insert scheduled updates a second time, just in case the refresh
+ * run has started since inserting them the first time, in which case
+ * it will miss the data inserted afterwards. We cannot, however,
+ * insert them only now, because if the Java execution fails at a random
+ * point, we might have added data, but not the corresponding dates to
+ * update statistics. */
+ try {
+ for (long dateMillis : this.scheduledUpdates) {
+ this.psU.setDate(1, new java.sql.Date(dateMillis));
+ this.psU.execute();
+ }
+ } catch (SQLException e) {
+ this.logger.log(Level.WARNING, "Could not add scheduled dates for "
+ + "the next refresh run.", e);
+ }
+
/* Commit any stragglers before closing. */
if (this.conn != null) {
try {
More information about the tor-commits mailing list