From 8bd85d7dcc93e9424c00c06156963694ddc2feff Mon Sep 17 00:00:00 2001 From: elecbug Date: Fri, 19 Jun 2026 00:54:09 +0900 Subject: [PATCH 1/5] feat: enhance Gossip protocol with additional gossip_node parameter for flexible target selection --- v2/p2p/protocol.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/v2/p2p/protocol.go b/v2/p2p/protocol.go index 85e805b..8ffa8bd 100644 --- a/v2/p2p/protocol.go +++ b/v2/p2p/protocol.go @@ -52,9 +52,12 @@ var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentP targets = append(targets, neighbor) } - gossipFactor, ok := broadcastParams["gossip_factor"].(float64) - if !ok { - gossipFactor = 0.5 // default gossip factor + gossipFactor, ok1 := broadcastParams["gossip_factor"].(float64) + gossipNode, ok2 := broadcastParams["gossip_node"].(int) + + if !ok1 && !ok2 { + ok1 = true + gossipFactor = 0.5 } if len(targets) > 0 { @@ -62,8 +65,13 @@ var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentP targets[i], targets[j] = targets[j], targets[i] }) - k := int(float64(len(targets)) * gossipFactor) - targets = targets[:k] + if ok1 { + k := int(float64(len(targets)) * gossipFactor) + targets = targets[:k] + } else if ok2 { + k := gossipNode + targets = targets[:k] + } } return &targets From 325d49e71f410c138013049488caabd61d6b2894 Mon Sep 17 00:00:00 2001 From: elecbug Date: Fri, 19 Jun 2026 10:35:32 +0900 Subject: [PATCH 2/5] feat: modify eachPublish to accept start time for improved latency handling --- v2/p2p/peer.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/v2/p2p/peer.go b/v2/p2p/peer.go index 3dee3bf..9686959 100644 --- a/v2/p2p/peer.go +++ b/v2/p2p/peer.go @@ -75,8 +75,8 @@ func (p *peer) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) { if first { go func(msg Message) { - time.Sleep(time.Duration(p.processingLatency) * time.Millisecond) - p.eachPublish(network, msg) + currentTime := time.Now() + p.eachPublish(network, msg, currentTime) }(msg) } } @@ -85,7 +85,7 @@ func (p *peer) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) { } // eachPublish sends the message to neighbors, excluding 'exclude' and already-sent targets. -func (p *peer) eachPublish(network *P2P, msg Message) { +func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) { content := msg.Content protocol := msg.Protocol hopCount := msg.HopCount @@ -128,6 +128,10 @@ func (p *peer) eachPublish(network *P2P, msg Message) { } } + for time.Since(start) < time.Duration(p.processingLatency)*time.Millisecond { + time.Sleep(1 * time.Millisecond) + } + for _, e := range willSendEdges { edgeCopy := e p.sentTo[content][e.targetID] = struct{}{} From bb19cfad575bf09f0835085c6cea1b1028276783 Mon Sep 17 00:00:00 2001 From: elecbug Date: Wed, 1 Jul 2026 12:37:02 +0900 Subject: [PATCH 3/5] feat: implement logging for message flow in P2P peers with PeerLog function --- v2/p2p/p2p.go | 19 ++++++++++++++ v2/p2p/peer.go | 68 ++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/v2/p2p/p2p.go b/v2/p2p/p2p.go index 2c04371..4ea582e 100644 --- a/v2/p2p/p2p.go +++ b/v2/p2p/p2p.go @@ -266,3 +266,22 @@ func (p *P2P) MessageInfo(peerID PeerID, content string) (map[string]any, error) return info, nil } + +// PeerLog returns a copy of the log entries for the specified peer, allowing for inspection of message flow and events. +func (p * P2P) PeerLog(peerID PeerID, content string) (map[string][]logEntry, error) { + peer := p.peers[peerID] + + if peer == nil { + return nil, fmt.Errorf("peer %s not found", peerID) + } + + peer.mu.Lock() + defer peer.mu.Unlock() + + logCopy := make(map[string][]logEntry) + for k, v := range peer.log { + logCopy[k] = append([]logEntry(nil), v...) + } + + return logCopy, nil +} \ No newline at end of file diff --git a/v2/p2p/peer.go b/v2/p2p/peer.go index 9686959..1ce52ce 100644 --- a/v2/p2p/peer.go +++ b/v2/p2p/peer.go @@ -21,6 +21,8 @@ type peer struct { mu sync.Mutex // mutex to protect access to the peer's state alive bool // indicates whether the peer is active in the network + + log map[string][]logEntry } // edge represents a connection from one node to another in the P2P network. @@ -29,6 +31,15 @@ type edge struct { networkLatency float64 // latency for a message sent from this peer to the target peer, in milliseconds } +type logEntry struct { + ID PeerID `json:"id"` // ID of the peer + Timestamp string `json:"timestamp"` // timestamp of the log entry + Type string `json:"type"` // type of log entry (e.g., "recv", "send") + From PeerID `json:"from"` // ID of the sender peer + To PeerID `json:"to"` // ID of the target peer + First bool `json:"first"` // indicates if this is the first time the message is seen +} + // newPeer creates a new Node with the given ID and node latency. func newPeer(id PeerID, nodeLatency float64) *peer { return &peer{ @@ -43,18 +54,25 @@ func newPeer(id PeerID, nodeLatency float64) *peer { msgQueue: make(chan Message, 1000), mu: sync.Mutex{}, + + log: make(map[string][]logEntry), } } // eachRun starts the message handling routine for the peer. func (p *peer) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) { go func(ctx context.Context, wg *sync.WaitGroup) { + p.mu.Lock() p.alive = true + p.mu.Unlock() + wg.Done() select { case <-ctx.Done(): + p.mu.Lock() p.alive = false + p.mu.Unlock() return default: for msg := range p.msgQueue { @@ -66,10 +84,32 @@ func (p *peer) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) { } p.recvFrom[msg.Content][msg.From] = struct{}{} + if _, ok := p.log[msg.Content]; !ok { + p.log[msg.Content] = make([]logEntry, 0) + } + if _, ok := p.seenAt[msg.Content]; !ok { p.seenAt[msg.Content] = time.Now() p.firstFrom[msg.Content] = msg.From first = true + + p.log[msg.Content] = append(p.log[msg.Content], logEntry{ + ID: p.id, + Timestamp: timestamp(), + Type: "recv", + From: msg.From, + To: p.id, + First: true, + }) + } else { + p.log[msg.Content] = append(p.log[msg.Content], logEntry{ + ID: p.id, + Timestamp: timestamp(), + Type: "recv", + From: msg.From, + To: p.id, + First: false, + }) } p.mu.Unlock() @@ -128,17 +168,36 @@ func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) { } } - for time.Since(start) < time.Duration(p.processingLatency)*time.Millisecond { - time.Sleep(1 * time.Millisecond) + delay := time.Duration(p.processingLatency * float64(time.Millisecond)) + if remain := delay - time.Since(start); remain > 0 { + time.Sleep(remain) } for _, e := range willSendEdges { edgeCopy := e p.sentTo[content][e.targetID] = struct{}{} + + if _, ok := p.log[msg.Content]; !ok { + p.log[msg.Content] = make([]logEntry, 0) + } + + p.log[content] = append(p.log[content], logEntry{ + ID: p.id, + Timestamp: timestamp(), + Type: "send", + From: p.id, + To: e.targetID, + First: false, + }) go func(e edge) { time.Sleep(time.Duration(e.networkLatency) * time.Millisecond) + targetPeer, ok := network.peers[e.targetID] + if !ok || targetPeer == nil { + return + } + network.peers[e.targetID].msgQueue <- Message{ Publisher: msg.Publisher, From: p.id, @@ -159,3 +218,8 @@ func (p *peer) eachStop() { p.alive = false close(p.msgQueue) } + +// timestamp returns the current time formatted as a string for logging purposes. +func timestamp() string { + return time.Now().Format("2006-01-02 15:04:05.000") +} \ No newline at end of file From 6b520bf8808d939061598aafdfca49bbb756c605 Mon Sep 17 00:00:00 2001 From: elecbug Date: Wed, 1 Jul 2026 12:38:42 +0900 Subject: [PATCH 4/5] fix: correct formatting and alignment in logEntry struct and related functions --- v2/p2p/p2p.go | 4 ++-- v2/p2p/peer.go | 50 +++++++++++++++++++++++++------------------------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/v2/p2p/p2p.go b/v2/p2p/p2p.go index 4ea582e..93d3d8d 100644 --- a/v2/p2p/p2p.go +++ b/v2/p2p/p2p.go @@ -268,7 +268,7 @@ func (p *P2P) MessageInfo(peerID PeerID, content string) (map[string]any, error) } // PeerLog returns a copy of the log entries for the specified peer, allowing for inspection of message flow and events. -func (p * P2P) PeerLog(peerID PeerID, content string) (map[string][]logEntry, error) { +func (p *P2P) PeerLog(peerID PeerID, content string) (map[string][]logEntry, error) { peer := p.peers[peerID] if peer == nil { @@ -284,4 +284,4 @@ func (p * P2P) PeerLog(peerID PeerID, content string) (map[string][]logEntry, er } return logCopy, nil -} \ No newline at end of file +} diff --git a/v2/p2p/peer.go b/v2/p2p/peer.go index 1ce52ce..a69102d 100644 --- a/v2/p2p/peer.go +++ b/v2/p2p/peer.go @@ -32,12 +32,12 @@ type edge struct { } type logEntry struct { - ID PeerID `json:"id"` // ID of the peer - Timestamp string `json:"timestamp"` // timestamp of the log entry - Type string `json:"type"` // type of log entry (e.g., "recv", "send") - From PeerID `json:"from"` // ID of the sender peer - To PeerID `json:"to"` // ID of the target peer - First bool `json:"first"` // indicates if this is the first time the message is seen + ID PeerID `json:"id"` // ID of the peer + Timestamp string `json:"timestamp"` // timestamp of the log entry + Type string `json:"type"` // type of log entry (e.g., "recv", "send") + From PeerID `json:"from"` // ID of the sender peer + To PeerID `json:"to"` // ID of the target peer + First bool `json:"first"` // indicates if this is the first time the message is seen } // newPeer creates a new Node with the given ID and node latency. @@ -87,28 +87,28 @@ func (p *peer) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) { if _, ok := p.log[msg.Content]; !ok { p.log[msg.Content] = make([]logEntry, 0) } - + if _, ok := p.seenAt[msg.Content]; !ok { p.seenAt[msg.Content] = time.Now() p.firstFrom[msg.Content] = msg.From first = true - + p.log[msg.Content] = append(p.log[msg.Content], logEntry{ - ID: p.id, + ID: p.id, Timestamp: timestamp(), - Type: "recv", - From: msg.From, - To: p.id, - First: true, + Type: "recv", + From: msg.From, + To: p.id, + First: true, }) } else { p.log[msg.Content] = append(p.log[msg.Content], logEntry{ - ID: p.id, + ID: p.id, Timestamp: timestamp(), - Type: "recv", - From: msg.From, - To: p.id, - First: false, + Type: "recv", + From: msg.From, + To: p.id, + First: false, }) } p.mu.Unlock() @@ -176,18 +176,18 @@ func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) { for _, e := range willSendEdges { edgeCopy := e p.sentTo[content][e.targetID] = struct{}{} - + if _, ok := p.log[msg.Content]; !ok { p.log[msg.Content] = make([]logEntry, 0) } p.log[content] = append(p.log[content], logEntry{ - ID: p.id, + ID: p.id, Timestamp: timestamp(), - Type: "send", - From: p.id, - To: e.targetID, - First: false, + Type: "send", + From: p.id, + To: e.targetID, + First: false, }) go func(e edge) { @@ -222,4 +222,4 @@ func (p *peer) eachStop() { // timestamp returns the current time formatted as a string for logging purposes. func timestamp() string { return time.Now().Format("2006-01-02 15:04:05.000") -} \ No newline at end of file +} From d9b96fb09c7e1320d71713ad1bb11a36a0e5e0db Mon Sep 17 00:00:00 2001 From: elecbug Date: Wed, 1 Jul 2026 12:48:11 +0900 Subject: [PATCH 5/5] feat: sort PeerIDs slice for consistent order in P2P network --- v2/p2p/p2p.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/v2/p2p/p2p.go b/v2/p2p/p2p.go index 93d3d8d..a362a09 100644 --- a/v2/p2p/p2p.go +++ b/v2/p2p/p2p.go @@ -3,6 +3,7 @@ package p2p import ( "context" "fmt" + "slices" "sync" "time" @@ -131,6 +132,8 @@ func (p *P2P) PeerIDs() []PeerID { ids = append(ids, id) } + slices.Sort(ids) + return ids }