-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathingest_postgres.py
More file actions
282 lines (228 loc) · 11.4 KB
/
Copy pathingest_postgres.py
File metadata and controls
282 lines (228 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
"""
ingest_postgres.py — Project 04: Time Series Databases
Fullstack Data — Kenneth
Responsibility:
Load data/sensors.parquet into vanilla PostgreSQL as the benchmark baseline.
No extensions. No hypertables. No continuous aggregates.
This is what time series data looks like in a general purpose database.
What this script does:
1. Connect to postgres_vanilla (port 5435)
2. Drop and recreate sensor_readings table — identical schema to TimescaleDB
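3. Create a standard composite index on (device_id, ts DESC), plus a
ts-only index on (ts DESC) for queries that span all devices.
This is the best a vanilla PG can do for time series queries —
a btree index on the two most common filter columns.
Both indexes are built after the bulk load in step 4, because building
a btree once over loaded data is faster than maintaining it per row.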
4. Bulk load data/sensors.parquet via COPY
5. Run ANALYZE so the query planner has fresh statistics
6. Report ingest throughput and index size
Why this matters:
PostgreSQL is not a bad database. It has btree indexes, window functions,
CTEs, and COPY. A well-indexed PG table handles moderate time series workloads.
The benchmark will show exactly WHERE the TSDB optimisations start to matter —
and at what data volume the vanilla approach starts to break down.
Expected weakness:
Q4 (hourly downsample) — no continuous aggregate means PG aggregates 18M
raw rows live on every query. TimescaleDB reads a pre-computed view.
Q5 (range scan) — no chunk pruning means PG scans the full index even for
a 24-hour window. TimescaleDB skips all chunks outside the range.
Run after: python generate_sensor.py
Run before: python benchmark_queries.py
"""
import io
import os
import time
import pandas as pd
import psycopg2
from tabulate import tabulate
# ── Config ────────────────────────────────────────────────────────────────────
# Connection parameters; main() hands this dict to get_conn(), which unpacks
# it as keyword arguments to psycopg2.connect().
# NOTE(review): the module docstring says postgres_vanilla is on port 5435,
# but the port configured here is 5432 — confirm which one is correct.
POSTGRES = {
    "host": "localhost",
    "port": 5432,
    "dbname": "sensors",
    "user": "engineer",
    "password": "engineer",
    "connect_timeout": 10,  # presumably seconds (libpq setting) — confirm
}
# Parquet file read by main(); main() exits with status 1 if it is missing.
INPUT_PATH = "data/sensors.parquet"
# Number of DataFrame rows sent to PostgreSQL per COPY call in main().
BATCH_SIZE = 100_000
# ── SQL ───────────────────────────────────────────────────────────────────────
# Executed first in main() so that every run starts from a fresh, empty table.
DROP_TABLE = "DROP TABLE IF EXISTS sensor_readings;"
# Plain table: seven NOT NULL columns, no primary key, no partitioning.
# Column names and order match the COPY column list used in copy_batch().
CREATE_TABLE = """
CREATE TABLE sensor_readings (
device_id TEXT NOT NULL,
ts TIMESTAMPTZ NOT NULL,
temperature FLOAT NOT NULL,
humidity FLOAT NOT NULL,
pressure FLOAT NOT NULL,
battery_pct FLOAT NOT NULL,
is_anomaly BOOLEAN NOT NULL
);
"""
# Composite btree index — the best a vanilla PG can do for this workload.
# (device_id, ts DESC) satisfies:
# - Q1: DISTINCT ON (device_id) ORDER BY device_id, ts DESC
# - Q2: WHERE device_id = X AND ts >= Y
# - Q5: WHERE device_id = X AND ts >= Y ORDER BY ts
# Q3 and Q4 will still require full or large partial scans — no chunk pruning.
# main() creates this index only after the bulk load has finished.
CREATE_INDEX = """
CREATE INDEX idx_sensor_device_ts
ON sensor_readings (device_id, ts DESC);
"""
# A separate ts-only index helps Q3 (gap detection over all devices, last 7 days)
# Like the composite index, it is built after the bulk load.
CREATE_TS_INDEX = """
CREATE INDEX idx_sensor_ts
ON sensor_readings (ts DESC);
"""
# Run by main() after both indexes are built, to refresh planner statistics.
ANALYZE = "ANALYZE sensor_readings;"
# ── Helpers ───────────────────────────────────────────────────────────────────
def section(title: str):
    """Print *title* framed above and below by a 60-character rule.

    A blank line precedes the top rule and another follows the bottom rule.
    """
    rule = "─" * 60
    # One print call; the pieces are newline-joined, so the emitted text is
    # the same as three consecutive prints.
    print("\n" + rule, f" {title}", rule + "\n", sep="\n")
def get_conn(config: dict):
    """Open a psycopg2 connection from *config* and switch it to autocommit.

    Args:
        config: Keyword arguments for ``psycopg2.connect`` (host, port, …).

    Returns:
        The open connection, with ``autocommit`` set to True.
    """
    connection = psycopg2.connect(**config)
    # With autocommit on, statements run on this connection are not grouped
    # into a transaction that needs an explicit commit().
    connection.autocommit = True
    return connection
def copy_batch(conn, df_batch: pd.DataFrame):
    """Bulk-load one DataFrame batch into ``sensor_readings`` via COPY.

    The batch is serialised to an in-memory CSV (no header, no index) and
    streamed to the server with ``COPY ... FROM STDIN WITH CSV``.

    The CSV is written with exactly the columns named in the COPY statement,
    in that order, so the load does not depend on the column order of the
    incoming DataFrame and extra columns are ignored.

    Args:
        conn: Open psycopg2 connection (any object whose ``cursor()`` returns
            a context manager yielding a cursor with ``copy_expert``).
        df_batch: Rows to load; must contain every target column.

    Raises:
        KeyError: If ``df_batch`` is missing one of the target columns.
    """
    # Single source of truth for both the CSV projection and the COPY column
    # list, so the two can never drift apart.
    columns = (
        "device_id",
        "ts",
        "temperature",
        "humidity",
        "pressure",
        "battery_pct",
        "is_anomaly",
    )
    copy_sql = f"COPY sensor_readings ({', '.join(columns)}) FROM STDIN WITH CSV"

    buf = io.StringIO()
    df_batch.to_csv(buf, columns=list(columns), index=False, header=False)
    buf.seek(0)  # rewind so COPY reads from the start of the buffer

    with conn.cursor() as cur:
        cur.copy_expert(copy_sql, buf)
def index_sizes(conn) -> list[dict]:
    """Return one dict per index on ``sensor_readings``.

    Each dict maps the result column names (``index_name``, ``index_size``)
    to that row's values; sizes are formatted by ``pg_size_pretty``. Rows are
    ordered by index name.

    Args:
        conn: Open database connection.
    """
    query = (
        "\n"
        "SELECT\n"
        "indexrelname AS index_name,\n"
        "pg_size_pretty(pg_relation_size(indexrelid)) AS index_size\n"
        "FROM pg_stat_user_indexes\n"
        "WHERE relname = 'sensor_readings'\n"
        "ORDER BY indexrelname;\n"
    )
    with conn.cursor() as cursor:
        cursor.execute(query)
        # The first field of each description entry is the column name.
        column_names = [column[0] for column in cursor.description]
        rows = cursor.fetchall()
    return [dict(zip(column_names, row)) for row in rows]
def table_size(conn) -> str:
    """Return the pretty-printed total size of ``sensor_readings``.

    The value comes from ``pg_total_relation_size`` formatted by
    ``pg_size_pretty``; main() labels it as data plus indexes.

    Args:
        conn: Open database connection.
    """
    query = "\nSELECT pg_size_pretty(pg_total_relation_size('sensor_readings'));\n"
    with conn.cursor() as cursor:
        cursor.execute(query)
        first_row = cursor.fetchone()
    return first_row[0]
# ── Main ──────────────────────────────────────────────────────────────────────
# Explanatory note printed by main() after the storage report.
_INDEX_STRATEGY_NOTE = """
NOTE ON INDEX STRATEGY:
idx_sensor_device_ts (device_id, ts DESC)
→ Used by Q1, Q2, Q5. Covers the most selective filters.
→ Does NOT help Q4 — no pre-computed aggregates exist.
idx_sensor_ts (ts DESC)
→ Used by Q3 (gap detection across all devices in last 7 days).
→ A TimescaleDB hypertable skips chunks entirely for this.
Vanilla PG still does a large index scan.
This is the ceiling of what vanilla PG can do without extensions.
"""


def _rows_per_second(rows: int, seconds: float) -> float:
    """Return rows / seconds, or 0.0 when no measurable time has elapsed."""
    return rows / seconds if seconds > 0 else 0.0


def _timed_execute(conn, sql: str) -> float:
    """Execute *sql* on a fresh cursor and return the elapsed seconds."""
    started = time.perf_counter()
    with conn.cursor() as cur:
        cur.execute(sql)
    return time.perf_counter() - started


def main():
    """Run the vanilla-PostgreSQL ingest end to end and print a report.

    Steps: check that INPUT_PATH exists, connect, drop and recreate
    ``sensor_readings``, COPY the parquet data in BATCH_SIZE slices, compare
    the database row count with the DataFrame, build both indexes, ANALYZE,
    then print index/table sizes and a timing summary.

    The connection is closed even if a step raises. A row-count mismatch is
    reported in the output but does not abort the run.

    Raises:
        SystemExit: With status 1 if INPUT_PATH does not exist.
    """
    print("\n" + "═" * 60)
    print(" PROJECT 04 — TIME SERIES | ingest_postgres.py")
    print(" Vanilla PostgreSQL baseline — no extensions")
    print(" Fullstack Data — Kenneth")
    print("═" * 60)

    if not os.path.exists(INPUT_PATH):
        print(f"\n ✗ {INPUT_PATH} not found. Run generate_sensor.py first.\n")
        raise SystemExit(1)

    # ── Connect ──────────────────────────────────────────────────────────
    section("STEP 1 — CONNECT")
    print(f" Connecting to postgres_vanilla on port {POSTGRES['port']} …")
    conn = get_conn(POSTGRES)
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT version();")
            pg_version = cur.fetchone()[0].split(",")[0]
        print(f" ✓ Connected — {pg_version}")
        print(" ✓ No extensions loaded — this is vanilla PostgreSQL")

        # ── Schema ───────────────────────────────────────────────────────
        section("STEP 2 — SCHEMA SETUP")
        print(" Dropping existing table …")
        with conn.cursor() as cur:
            cur.execute(DROP_TABLE)
        print(" ✓ Dropped")
        print(" Creating sensor_readings table (identical schema to TimescaleDB) …")
        with conn.cursor() as cur:
            cur.execute(CREATE_TABLE)
        print(" ✓ Table created")

        # ── Ingest first, index after ────────────────────────────────────
        # Indexing after bulk load is significantly faster than indexing
        # during. PostgreSQL builds the btree in a single pass rather than
        # updating it for every inserted row.
        section("STEP 3 — INGEST (index after load)")
        print(f" Loading {INPUT_PATH} …")
        df = pd.read_parquet(INPUT_PATH)
        total_rows = len(df)
        total_batches = (total_rows + BATCH_SIZE - 1) // BATCH_SIZE
        print(f" Rows to load: {total_rows:,}")
        print(f" Batch size: {BATCH_SIZE:,}")
        print(f" Batches: {total_batches}\n")

        # perf_counter is monotonic, so durations cannot be skewed by
        # wall-clock adjustments during a long load.
        t_ingest_start = time.perf_counter()
        rows_loaded = 0
        batch_starts = range(0, total_rows, BATCH_SIZE)
        for batch_num, start_idx in enumerate(batch_starts, start=1):
            # iloc clips the slice at the end of the frame for the last batch.
            batch = df.iloc[start_idx:start_idx + BATCH_SIZE]
            copy_batch(conn, batch)
            rows_loaded += len(batch)
            elapsed = time.perf_counter() - t_ingest_start
            rate = _rows_per_second(rows_loaded, elapsed)
            pct = rows_loaded / total_rows * 100
            print(f" Batch {batch_num:>4}/{total_batches} | "
                  f"{rows_loaded:>12,} rows ({pct:5.1f}%) | "
                  f"{rate:>10,.0f} rows/s")
        ingest_duration = time.perf_counter() - t_ingest_start
        # Guarded like the per-batch rate, so a zero duration cannot raise
        # ZeroDivisionError.
        throughput = _rows_per_second(total_rows, ingest_duration)
        print(f"\n ✓ Ingest complete in {ingest_duration:.1f}s")
        print(f" ✓ Throughput: {throughput:,.0f} rows/s")

        # ── Verify ───────────────────────────────────────────────────────
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM sensor_readings;")
            db_count = cur.fetchone()[0]
        print(f" ✓ Rows in DB: {db_count:,} "
              f"{'✓ match' if db_count == total_rows else '✗ MISMATCH'}")

        # ── Build indexes ────────────────────────────────────────────────
        section("STEP 4 — BUILD INDEXES")
        print(" Building composite index (device_id, ts DESC) …")
        idx1_duration = _timed_execute(conn, CREATE_INDEX)
        print(f" ✓ idx_sensor_device_ts built in {idx1_duration:.1f}s")
        print(" Building ts-only index (ts DESC) …")
        idx2_duration = _timed_execute(conn, CREATE_TS_INDEX)
        print(f" ✓ idx_sensor_ts built in {idx2_duration:.1f}s")
        print("\n Running ANALYZE for fresh planner statistics …")
        with conn.cursor() as cur:
            cur.execute(ANALYZE)
        print(" ✓ ANALYZE complete")

        # ── Storage report ───────────────────────────────────────────────
        section("STEP 5 — STORAGE REPORT")
        idx_report = index_sizes(conn)
        tbl_size = table_size(conn)
        print(tabulate(idx_report, headers="keys", tablefmt="rounded_outline"))
        print(f"\n Total table size (data + indexes): {tbl_size}")
        print(_INDEX_STRATEGY_NOTE)

        # ── Summary ──────────────────────────────────────────────────────
        section("INGEST SUMMARY")
        summary = [
            {"metric": "Rows loaded", "value": f"{db_count:,}"},
            {"metric": "Ingest duration", "value": f"{ingest_duration:.1f}s"},
            {"metric": "Ingest throughput", "value": f"{throughput:,.0f} rows/s"},
            {"metric": "Index build (comp.)", "value": f"{idx1_duration:.1f}s"},
            {"metric": "Index build (ts)", "value": f"{idx2_duration:.1f}s"},
            {"metric": "Table total size", "value": tbl_size},
            {"metric": "Extensions loaded", "value": "none"},
        ]
        print(tabulate(summary, headers="keys", tablefmt="rounded_outline"))
        print("\n Next step: python benchmark_queries.py\n")
    finally:
        # Release the connection on success and on any failure above.
        conn.close()
# Run the ingest only when executed as a script, not when imported.
if __name__ == "__main__":
    main()