#!/usr/bin/env python3
"""
DDH Reference Implementation (Python)
-------------------------------------
Samples a real machine's local state vector (LSV_t), applies a stable, semantically tunable
feature map F (Signed Random Projections + LSH, SimHash-like) to produce X_t, then updates
the Dynamic Device Hash (DDH_t) with a volatile per-epoch salt and domain separation, and
maintains an append-only Commit and a rolling Anchor.
It also computes basic stability/sensitivity diagnostics (Hamming distance distribution
of successive X_t) to help tune (d, r) tradeoffs.
This is a minimal research prototype for discussion; it is NOT production code.
Requirements:
- Python 3.9+
- numpy
- psutil
Optional (for quick plots):
- matplotlib
Usage examples:
python ddh_reference.py --epochs 60 --interval 1.0 --d 256 --r 8 --role "edge.sensor"
python ddh_reference.py --epochs 120 --interval 0.5 --d 512 --r 16 --role "gateway"
Outputs:
- Prints per-epoch DDH_t (hex, truncated), Commit_t (hex, truncated), and Anchor_t (hex, truncated)
- Prints Hamming distance stats and acceptance rate for chosen r
- Optionally writes CSV of distances and saves a histogram (if matplotlib installed)
Privacy note:
Raw LSV values are NOT printed by default. Use --debug to log anonymized summaries only.
In a real deployment, the raw LSV NEVER leaves the device.
"""
from __future__ import annotations
import argparse
import hashlib
import os
import secrets
import time
from dataclasses import dataclass
from typing import Dict, List, Tuple
import numpy as np
import psutil
# ------------------------- Utility: hashing & encoding -------------------------
def sha3_256(data: bytes) -> bytes:
return hashlib.sha3_256(data).digest()
def sha3_512(data: bytes) -> bytes:
return hashlib.sha3_512(data).digest()
def hex_trunc(b: bytes, n: int = 12) -> str:
return b.hex()[:n]
# Domain separators to avoid cross-protocol collisions
TAG_DDH = b"DDH.v1"
TAG_COMMIT = b"COMMIT.v1"
TAG_ANCHOR = b"ANCHOR.v1"
# Per-epoch volatile salt (non-repeating, device-local; never transmitted off-box)
# Mirrors the specification's "volatile salt" used in the successor rule.
def next_salt() -> bytes:
# 16 bytes is sufficient; the spec uses 256–512-bit extract/hash digests;
# the salt need only be non-repeating per epoch on the device.
return secrets.token_bytes(16)
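# Illustrative sketch (not part of the main flow): domain separation means the same input
# material hashed under different tags yields unrelated digests, so a DDH value can never be
# confused with a Commit or Anchor value even when the other inputs coincide.
def _example_domain_separation() -> None:
    material = b"same-bytes"
    d1 = sha3_256(material + TAG_DDH)
    d2 = sha3_256(material + TAG_COMMIT)
    d3 = sha3_256(material + TAG_ANCHOR)
    assert len({d1, d2, d3}) == 3  # three distinct digests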
# ------------------------- Local State Vector (LSV_t) -------------------------
@dataclass
class LSVState:
last_perf: float
last_disk: psutil._common.sdiskio
last_net: psutil._common.snetio
def collect_lsv(prev: LSVState, role: str) -> Tuple[np.ndarray, LSVState]:
"""Collect a fixed-order numeric feature vector from local, observable signals.
Returns vector in R^n and updated state for deltas.
"""
# Timers and jitter
perf_now = time.perf_counter() # high-resolution monotonic
delta_perf = perf_now - prev.last_perf if prev else 0.0
wall_ns = time.time_ns()
wall_mod_1ms = (wall_ns % 1_000_000) / 1_000_000.0 # [0,1) within 1ms
# CPU & memory
    cpu_percent = psutil.cpu_percent(interval=None)  # non-blocking; the per-epoch sleep provides the sampling window
cpu_count_logical = psutil.cpu_count(logical=True) or 1
vm = psutil.virtual_memory()
mem_used_frac = (vm.total - vm.available) / max(vm.total, 1)
# Load average (POSIX); on non-POSIX, approximate via cpu_percent
try:
la1, la5, la15 = os.getloadavg()
except (AttributeError, OSError):
la1 = la5 = la15 = cpu_percent / 100.0 * cpu_count_logical
# Disk and network deltas
disk = psutil.disk_io_counters()
net = psutil.net_io_counters()
if prev:
d_reads = max(disk.read_count - prev.last_disk.read_count, 0)
d_writes = max(disk.write_count - prev.last_disk.write_count, 0)
d_rbytes = max(disk.read_bytes - prev.last_disk.read_bytes, 0)
d_wbytes = max(disk.write_bytes - prev.last_disk.write_bytes, 0)
d_pkts_sent = max(net.packets_sent - prev.last_net.packets_sent, 0)
d_pkts_recv = max(net.packets_recv - prev.last_net.packets_recv, 0)
d_b_sent = max(net.bytes_sent - prev.last_net.bytes_sent, 0)
d_b_recv = max(net.bytes_recv - prev.last_net.bytes_recv, 0)
else:
d_reads = d_writes = d_rbytes = d_wbytes = 0
d_pkts_sent = d_pkts_recv = d_b_sent = d_b_recv = 0
# Process-level summary: count and light histogram over nice values
proc_count = len(psutil.pids())
    try:
        # Use the pre-fetched info dict; skip processes whose niceness is unavailable.
        niceness = [min(max(p.info["nice"], -20), 19)
                    for p in psutil.process_iter(["pid", "nice"]) if p.info.get("nice") is not None]
    except Exception:
        niceness = []
# 4-bin histogram of niceness: [-20..-1], [0..5], [6..10], [11..19]
bins = [0, 0, 0, 0]
for n in niceness:
if n < 0:
bins[0] += 1
elif n <= 5:
bins[1] += 1
elif n <= 10:
bins[2] += 1
else:
bins[3] += 1
# Role/context hashed to a small numeric footprint
role_hash = int.from_bytes(sha3_256(role.encode()), "big")
role_mod = (role_hash % 1_000_000) / 1_000_000.0 # [0,1)
# Assemble vector (order is fixed; scalings kept simple/robust)
feats = np.array([
delta_perf, # seconds since last sample
wall_mod_1ms, # coarse jitter position
cpu_percent / 100.0, # [0,1]
float(cpu_count_logical),
mem_used_frac, # [0,1]
la1, la5, la15,
d_reads / 1e3, d_writes / 1e3,
d_rbytes / 1e6, d_wbytes / 1e6,
d_pkts_sent / 1e3, d_pkts_recv / 1e3,
d_b_sent / 1e6, d_b_recv / 1e6,
proc_count / 1e3,
*[b / 1e3 for b in bins],
role_mod,
], dtype=float)
new_state = LSVState(perf_now, disk, net)
return feats, new_state
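# Minimal usage sketch (assumes psutil exposes disk and network counters on this platform):
# collect two successive LSVs so that the second sample carries real disk/network deltas.
def _example_collect_lsv() -> None:
    state = LSVState(time.perf_counter(), psutil.disk_io_counters(), psutil.net_io_counters())
    vec1, state = collect_lsv(state, "edge.sensor")
    time.sleep(0.1)
    vec2, state = collect_lsv(state, "edge.sensor")
    print(f"LSV dimension: {vec1.shape[0]}, sample-to-sample L1 change: {np.abs(vec2 - vec1).sum():.3f}")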
# ------------------------- Feature map F: SRP + LSH ---------------------------
class FeatureMap:
"""Signed Random Projection -> binary LSH (SimHash-like), with public seed.
Produces X_t as a bitvector of length d.
"""
def __init__(self, d: int = 256, seed: int = 0xC0FFEE):
self.d = int(d)
self.seed = int(seed)
self._R = None # projection matrix
self._mu = None # running mean for simple normalization
        self._sigma = None  # running scale (EMA of absolute deviation)
        self._n = 0
        self._rng = np.random.RandomState(self.seed)  # local RNG; keeps numpy's global state untouched
def _ensure_proj(self, n_features: int):
if self._R is None or self._R.shape[1] != n_features:
# Gaussian projections (public)
            self._R = self._rng.randn(self.d, n_features).astype(np.float64)
def _normalize(self, x: np.ndarray) -> np.ndarray:
        # Running mean and absolute-deviation scale (EMA) to reduce drift; guard against zero scale
if self._mu is None:
self._mu = np.zeros_like(x)
self._sigma = np.ones_like(x)
self._n = 0
self._n += 1
alpha = 1.0 / min(self._n, 1000) # EMA horizon ~1000 samples
self._mu = (1 - alpha) * self._mu + alpha * x
dev = x - self._mu
self._sigma = (1 - alpha) * self._sigma + alpha * np.abs(dev)
return dev / np.maximum(self._sigma, 1e-6)
def to_bits(self, lsv_vec: np.ndarray, role: str) -> np.ndarray:
x = self._normalize(lsv_vec)
self._ensure_proj(x.shape[0])
y = self._R @ x # project to R^d
bits = (y >= 0).astype(np.uint8) # sign -> {0,1}
        # Mix in context bits derived from the role hash for semantic sensitivity
role_hash = sha3_256(role.encode())
role_bits = np.unpackbits(np.frombuffer(role_hash[:8], dtype=np.uint8)) # 64 bits
        # XOR the last min(64, d) bits with the tail of the role bits
for i in range(min(64, self.d)):
bits[-1 - i] ^= role_bits[-1 - i]
return bits # shape (d,)
@staticmethod
def hamming(a: np.ndarray, b: np.ndarray) -> int:
return int(np.count_nonzero(a != b))
@staticmethod
def bits_to_bytes(bits: np.ndarray) -> bytes:
# Pack bits into bytes (big-endian)
pad = (-len(bits)) % 8
if pad:
bits = np.concatenate([bits, np.zeros(pad, dtype=np.uint8)])
by = np.packbits(bits, bitorder="big").tobytes()
return by
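# Illustrative sketch of the SRP/LSH locality property the FeatureMap relies on: for Gaussian
# signed random projections, each output bit differs between two inputs with probability
# theta/pi (theta = angle between them), so nearby LSVs yield small Hamming distances.
# Standalone example with its own projection matrix; it bypasses the class's stateful
# normalization, and all names here are purely illustrative.
def _example_srp_locality(d: int = 256, n: int = 22) -> None:
    rng = np.random.RandomState(1)
    R = rng.randn(d, n)
    x1 = rng.rand(n)
    x2 = x1 + 0.01 * rng.randn(n)  # a small perturbation of x1
    b1 = (R @ x1 >= 0).astype(np.uint8)
    b2 = (R @ x2 >= 0).astype(np.uint8)
    cos = x1 @ x2 / (np.linalg.norm(x1) * np.linalg.norm(x2))
    theta = np.arccos(np.clip(cos, -1.0, 1.0))
    print(f"observed dH = {FeatureMap.hamming(b1, b2)}, expected ~ {d * theta / np.pi:.1f}")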
# ------------------------- Slope, Commit, Anchor ------------------------------
@dataclass
class SlopeState:
ddh: bytes
anchor: bytes
def update_ddh(prev_ddh: bytes, x_bits: np.ndarray, salt_t: bytes) -> bytes:
# Ext(X_t): extractor output per spec (naming aligns with § "Identity Generation and Trust Slope")
y = sha3_512(FeatureMap.bits_to_bytes(x_bits)) # Ext(X_t)
# Successor rule per spec:
# DDH_t = H( DDH_{t-1} || Ext(X_t) || salt_t || TAG_DDH )
# where salt_t is a volatile per-epoch salt and TAG_DDH provides domain separation.
    return sha3_256(prev_ddh + y + salt_t + TAG_DDH)
def commit(ddh: bytes, x_bits: np.ndarray, meta: bytes, salt_t: bytes) -> bytes:
    # Commit over successor materials (including salt_t) to enable bounded proofs that disclose y_i and salt_i.
    return sha3_256(ddh + FeatureMap.bits_to_bytes(x_bits) + salt_t + meta + TAG_COMMIT)
def update_anchor(prev_anchor: bytes, commit_t: bytes) -> bytes:
    return sha3_256(prev_anchor + commit_t + TAG_ANCHOR)
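# Illustrative sketch (not part of the main flow): chain a few epochs and show that a verifier
# holding only the per-epoch Commits can recompute the rolling Anchor without ever seeing the
# X_t bitvectors or the salts themselves. The random bitvectors stand in for real X_t values.
def _example_chain_verification(epochs: int = 3) -> None:
    ddh, anchor = sha3_256(b"genesis"), sha3_256(b"anchor0")
    commits = []
    for _ in range(epochs):
        x_bits = np.random.randint(0, 2, size=256).astype(np.uint8)  # stand-in for a real X_t
        salt_t = next_salt()
        ddh = update_ddh(ddh, x_bits, salt_t)
        c = commit(ddh, x_bits, b"meta", salt_t)
        commits.append(c)
        anchor = update_anchor(anchor, c)
    # Verifier side: replay the Commit sequence to reproduce the Anchor.
    replay = sha3_256(b"anchor0")
    for c in commits:
        replay = update_anchor(replay, c)
    assert replay == anchor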
# ------------------------- Runner & Diagnostics -------------------------------
def run(epochs: int, interval: float, d: int, r: int, role: str, debug: bool, plot: bool, csv_path: str | None):
fm = FeatureMap(d=d)
# Initialize state
lsv_prev = LSVState(time.perf_counter(), psutil.disk_io_counters(), psutil.net_io_counters())
ddh_prev = sha3_256(b"genesis")
anchor_prev = sha3_256(b"anchor0")
x_prev = None
dists: List[int] = []
print(f"role={role} d={d} r={r} epochs={epochs} interval={interval}s")
print("epoch ddh commit anchor dH(X_t,X_{t-1}) accept? salt")
rows = []
for t in range(1, epochs + 1):
lsv_vec, lsv_prev = collect_lsv(lsv_prev, role)
x_bits = fm.to_bits(lsv_vec, role)
salt_t = next_salt()
ddh_t = update_ddh(ddh_prev, x_bits, salt_t)
# Meta: timestamp (ns) and role (hash)
meta = int(time.time_ns()).to_bytes(8, "big") + sha3_256(role.encode())[:8]
commit_t = commit(ddh_t, x_bits, meta, salt_t)
anchor_t = update_anchor(anchor_prev, commit_t)
dH = FeatureMap.hamming(x_bits, x_prev) if x_prev is not None else 0
accept = (dH <= r) or (x_prev is None)
if x_prev is not None:
dists.append(dH)
print(f"{t:5d} {hex_trunc(ddh_t)} {hex_trunc(commit_t)} {hex_trunc(anchor_t)} {dH:5d} {'Y' if accept else 'N'}")
if debug:
print(f"debug: |X_t|={x_bits.sum()} ones, dH={dH} (Ext(X_t)=SHA3-512(bits), salt_t present)")
rows.append((t, hex_trunc(ddh_t,24), hex_trunc(commit_t,24), hex_trunc(anchor_t,24), dH, int(accept)))
# Advance
x_prev = x_bits
ddh_prev = ddh_t
anchor_prev = anchor_t
time.sleep(max(0.0, interval))
# Diagnostics
if dists:
arr = np.array(dists)
acc_rate = float(np.mean(arr <= r))
print("\nStability diagnostics:")
print(f" mean dH = {arr.mean():.2f}")
print(f" 50th pct = {np.percentile(arr,50):.1f}")
print(f" 90th pct = {np.percentile(arr,90):.1f}")
print(f" 99th pct = {np.percentile(arr,99):.1f}")
print(f" accept rate at r={r}: {acc_rate*100:.1f}%\n")
if csv_path:
with open(csv_path, "w") as f:
f.write("epoch,ddh,commit,anchor,dH,accept\n")
for row in rows:
f.write(",".join(map(str,row)) + "\n")
print(f"Wrote CSV: {csv_path}")
if plot:
try:
import matplotlib.pyplot as plt
plt.figure()
plt.hist(arr, bins=min(50, max(10, len(arr)//2)))
plt.title("Hamming distance of successive X_t")
plt.xlabel("dH(X_t, X_{t-1})")
plt.ylabel("count")
plt.tight_layout()
out_png = "ddh_hamming_hist.png"
plt.savefig(out_png)
print(f"Saved histogram: {out_png}")
except Exception as e:
print(f"Plotting skipped: {e}")
# ------------------------- Main CLI -------------------------------------------
if __name__ == "__main__":
ap = argparse.ArgumentParser(description="DDH reference: LSV_t -> SRP/LSH -> DDH/Commit/Anchor")
ap.add_argument("--epochs", type=int, default=60, help="number of epochs/samples")
ap.add_argument("--interval", type=float, default=1.0, help="seconds between samples")
ap.add_argument("--d", type=int, default=256, help="SRP/LSH bit-length for X_t")
ap.add_argument("--r", type=int, default=8, help="acceptance radius in Hamming space")
ap.add_argument("--role", type=str, default="default", help="semantic role/context label")
ap.add_argument("--debug", action="store_true", help="print extra diagnostics")
ap.add_argument("--plot", action="store_true", help="save a histogram of dH distances")
ap.add_argument("--csv", type=str, default=None, help="write CSV of per-epoch outputs")
args = ap.parse_args()
run(
epochs=args.epochs,
interval=args.interval,
d=args.d,
r=args.r,
role=args.role,
debug=args.debug,
plot=args.plot,
csv_path=args.csv,
)