-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerate_sensor.py
More file actions
185 lines (151 loc) · 7.08 KB
/
Copy pathgenerate_sensor.py
File metadata and controls
185 lines (151 loc) · 7.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""
generate_sensor.py — Project 04: Time Series Databases
Fullstack Data — Kenneth
Responsibility:
Generate a synthetic IoT sensor dataset and persist it to data/sensors.parquet.
This is the single source of truth for both ingest scripts — both TimescaleDB
and InfluxDB load from this file so the comparison is against identical data.
Dataset spec:
Devices: 30 sensors (sensor_001 … sensor_030)
Interval: 1 reading per second per device
Duration: 7 days
Total rows: 30 × 7 × 24 × 60 × 60 = 18,144,000 rows
Schema:
device_id string sensor_001 … sensor_030
ts datetime UTC, 1-second intervals, last 7 days
temperature float Gaussian, mean per device ± noise
humidity float Gaussian, mean per device ± noise
pressure float Gaussian, mean per device ± noise
battery_pct float slow linear drain 100 → ~70% over 7 days + noise
is_anomaly bool 1% of readings flagged as anomalous
Design decisions:
- Each device has its own baseline temperature/humidity/pressure so readings
are realistic and distinguishable — not all devices return identical values.
- battery_pct drains linearly across the 7 days + small noise. This makes
the rolling average and range scan queries produce meaningful trends.
- is_anomaly at 1% gives gap detection and anomaly queries real signal to find.
- Parquet output — columnar, compressed, fast to read for both ingest scripts.
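- Seeded with random.Random(42), so sensor values are reproducible run to run.
Timestamps are anchored to the current UTC midnight, so the time window itself
shifts with the date the script is run.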
RAM note:
Holding all 18.1M rows × 7 columns × ~8 bytes at once would take ≈ 1GB.
Instead, data is generated and written in per-device chunks (604,800 rows each),
so memory scales with one device's chunk rather than the full dataset — well inside 15GB.
Final Parquet write is Snappy compressed — expect ~150–200MB on disk.
Run:
python generate_sensor.py
python generate_sensor.py --devices 5 --days 1 # fast smoke test (~432k rows)
"""
import argparse
import os
import random
import time
from datetime import datetime, timezone, timedelta
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# ── Constants ─────────────────────────────────────────────────────────────────
SEED = 42                             # seeds the single random.Random shared by every device
OUTPUT_PATH = "data/sensors.parquet"  # the Parquet file both ingest scripts load
DEFAULT_DEVICES = 30                  # default for --devices
DEFAULT_DAYS = 7                      # default for --days
# NOTE: these dicts are never filled in by this script. Each device's baseline
# is drawn locally inside generate_device_chunk and is not recorded here.
# They are kept (empty) only so any existing `from generate_sensor import ...`
# keeps working. TODO(review): remove or populate if nothing depends on them.
TEMP_BASELINES = {}
HUM_BASELINES = {}
PRES_BASELINES = {}
# ── Generator ─────────────────────────────────────────────────────────────────
def generate_device_chunk(
    device_id: str,
    start_ts: datetime,
    n_seconds: int,
    rng: random.Random,
) -> pd.DataFrame:
    """
    Build the complete one-reading-per-second history for a single sensor.

    A fixed operating point (temperature, humidity, pressure) is drawn from
    ``rng`` first. Then one reading per second is produced starting at
    ``start_ts``. All randomness comes from ``rng`` in a fixed order, so the
    same generator state always yields the same frame.

    Args:
        device_id: Value stored in every row's ``device_id`` column.
        start_ts: Timestamp of the first reading; each later row is 1 s later.
        n_seconds: Number of readings (rows) to produce.
        rng: Shared random source; this call advances it.

    Returns:
        DataFrame with columns device_id, ts, temperature, humidity, pressure,
        battery_pct, is_anomaly and ``n_seconds`` rows.
    """
    # Per-device operating point — drawn before any per-reading noise.
    temp_mean = rng.uniform(15.0, 35.0)
    hum_mean = rng.uniform(30.0, 80.0)
    pres_mean = rng.uniform(980.0, 1040.0)

    ticks = range(n_seconds)
    gauss = rng.gauss

    frame = {"device_id": device_id}
    frame["ts"] = [start_ts + timedelta(seconds=k) for k in ticks]
    frame["temperature"] = [round(temp_mean + gauss(0, 0.5), 3) for _ in ticks]
    # Humidity is clamped to 0–100 before rounding.
    frame["humidity"] = [
        round(max(0, min(100, hum_mean + gauss(0, 1.0))), 3) for _ in ticks
    ]
    frame["pressure"] = [round(pres_mean + gauss(0, 0.3), 3) for _ in ticks]
    # Linear drain of 30 percentage points across the window, plus small jitter.
    frame["battery_pct"] = [
        round(100.0 - 30.0 * k / n_seconds + gauss(0, 0.1), 2) for k in ticks
    ]
    # Each reading independently has a 1% chance of being flagged.
    frame["is_anomaly"] = [rng.random() < 0.01 for _ in ticks]
    return pd.DataFrame(frame)
def positive_int(value: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 1.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is not an integer or is < 1.
            argparse then prints a usage error instead of the script silently
            writing an empty (0) or nonsensical (negative) dataset.
    """
    try:
        parsed = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"expected an integer, got {value!r}") from None
    if parsed < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1, got {parsed}")
    return parsed


def main():
    """
    CLI entry point: generate the synthetic dataset and write it to OUTPUT_PATH.

    Each device's readings come from generate_device_chunk and are appended to
    a single Snappy-compressed Parquet file through a ParquetWriter. The full
    dataset is therefore never materialised at once. Progress and a summary
    are printed to stdout.

    Command-line options:
        --devices  number of sensors to simulate (integer >= 1)
        --days     length of the history window in days (integer >= 1)
    """
    parser = argparse.ArgumentParser(description="Generate IoT sensor dataset")
    parser.add_argument("--devices", type=positive_int, default=DEFAULT_DEVICES,
                        help=f"Number of devices (default: {DEFAULT_DEVICES})")
    parser.add_argument("--days", type=positive_int, default=DEFAULT_DAYS,
                        help=f"Days of history (default: {DEFAULT_DAYS})")
    args = parser.parse_args()
    n_devices = args.devices
    n_days = args.days
    n_seconds = n_days * 24 * 60 * 60
    total_rows = n_devices * n_seconds

    print("\n" + "═" * 60)
    print(" PROJECT 04 — TIME SERIES | generate_sensor.py")
    print(" Fullstack Data — Kenneth")
    print("═" * 60)
    print(f"\n Devices: {n_devices}")
    print(f" Duration: {n_days} days ({n_seconds:,} seconds per device)")
    print(f" Total rows: {total_rows:,}")
    print(f" Output: {OUTPUT_PATH}\n")

    rng = random.Random(SEED)
    # The window ends at today's 00:00 UTC and reaches n_days back. The seeded
    # rng makes sensor values reproducible, but these timestamps shift with
    # the date the script is run.
    window_end = datetime.now(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)
    start = window_end - timedelta(days=n_days)

    # Derive the directory from OUTPUT_PATH so the two cannot drift apart.
    out_dir = os.path.dirname(OUTPUT_PATH)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # Write device chunks incrementally to a single Parquet file.
    # ParquetWriter keeps the file open across chunks, which avoids holding
    # every row in memory at once.
    schema = pa.schema([
        pa.field("device_id", pa.string()),
        pa.field("ts", pa.timestamp("us", tz="UTC")),
        pa.field("temperature", pa.float32()),
        pa.field("humidity", pa.float32()),
        pa.field("pressure", pa.float32()),
        pa.field("battery_pct", pa.float32()),
        pa.field("is_anomaly", pa.bool_()),
    ])

    # perf_counter is monotonic, so elapsed times can't go negative/jump.
    t_start = time.perf_counter()
    rows_written = 0
    with pq.ParquetWriter(OUTPUT_PATH, schema, compression="snappy") as writer:
        for i in range(n_devices):
            device_id = f"sensor_{i+1:03d}"
            chunk = generate_device_chunk(device_id, start, n_seconds, rng)
            # An explicit schema makes from_pandas cast the pandas columns to
            # the float32 / microsecond-timestamp types declared above.
            table = pa.Table.from_pandas(chunk, schema=schema, preserve_index=False)
            writer.write_table(table)
            chunk_rows = table.num_rows
            rows_written += chunk_rows
            # Release this device's DataFrame/Table so they are not still
            # referenced while the next device's chunk is being built.
            del chunk, table
            elapsed = time.perf_counter() - t_start
            rate = rows_written / elapsed if elapsed > 0 else 0
            print(f" [{i+1:02d}/{n_devices}] {device_id} "
                  f"{chunk_rows:>10,} rows | "
                  f"total: {rows_written:>12,} | "
                  f"{rate:>10,.0f} rows/s")

    elapsed_total = time.perf_counter() - t_start
    throughput = rows_written / elapsed_total if elapsed_total > 0 else 0
    file_size_mb = os.path.getsize(OUTPUT_PATH) / (1024 * 1024)
    print("\n ✓ Generation complete")
    print(f" ✓ Rows written: {rows_written:,}")
    print(f" ✓ File size: {file_size_mb:.1f} MB (Snappy compressed)")
    print(f" ✓ Duration: {elapsed_total:.1f}s")
    print(f" ✓ Throughput: {throughput:,.0f} rows/s")
    print("\n Next steps:")
    print(" python ingest_timescale.py")
    print(" python ingest_influx.py\n")
if __name__ == "__main__":
    # main() returns None, so the process exits with status 0 on success.
    raise SystemExit(main())