Skip to content
This repository was archived by the owner on May 20, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/gin-gonic/gin v1.7.0
github.com/go-resty/resty/v2 v2.6.0
github.com/gorilla/websocket v1.5.0
github.com/iotaledger/hive.go v0.0.0-20220607150119-1be29e962175
github.com/iotaledger/hive.go v0.0.0-20220620133504-13cc326a3d17
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0
github.com/libp2p/go-libp2p v0.15.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/iotaledger/hive.go v0.0.0-20220607150119-1be29e962175 h1:IgXxiPx51WJglOL5EtIurlMbujnrLP4vLYQqyfmR0zg=
github.com/iotaledger/hive.go v0.0.0-20220607150119-1be29e962175/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI=
github.com/iotaledger/hive.go v0.0.0-20220620133504-13cc326a3d17 h1:VH6ZeNKdnuQ/TrRgZRscyL2jXQeI/pN4RfhNMUXHFL0=
github.com/iotaledger/hive.go v0.0.0-20220620133504-13cc326a3d17/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
Expand Down
31 changes: 31 additions & 0 deletions packages/manaverse/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package manaverse

import (
"github.com/iotaledger/hive.go/generics/priorityqueue"

"github.com/iotaledger/goshimmer/packages/tangle"
)

type Bucket struct {
mana int64

*priorityqueue.PriorityQueue[*tangle.Message]
}

func newManaBucket(mana int64) *Bucket {
return &Bucket{
mana: mana,
PriorityQueue: priorityqueue.New[*tangle.Message](),
}
}

func (b *Bucket) Compare(other *Bucket) int {
switch true {
case b.mana < other.mana:
return -1
case b.mana > other.mana:
return 1
default:
return 0
}
}
65 changes: 65 additions & 0 deletions packages/manaverse/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package manaverse

import (
"time"

"github.com/iotaledger/hive.go/generics/event"

"github.com/iotaledger/goshimmer/packages/tangle"
)

// region SchedulerEvents //////////////////////////////////////////////////////////////////////////////////////////////

type SchedulerEvents struct {
BlockQueued *event.Event[*SchedulerBlockEvent]
BlockScheduled *event.Event[*SchedulerBlockEvent]
BlockDropped *event.Event[*SchedulerBlockEvent]
BucketProcessingStarted *event.Event[*SchedulerBucketEvent]
BucketProcessingFinished *event.Event[*SchedulerBucketEvent]
}

func newSchedulerEvents() (newInstance *SchedulerEvents) {
return &SchedulerEvents{
BlockQueued: event.New[*SchedulerBlockEvent](),
BlockScheduled: event.New[*SchedulerBlockEvent](),
BlockDropped: event.New[*SchedulerBlockEvent](),
BucketProcessingStarted: event.New[*SchedulerBucketEvent](),
BucketProcessingFinished: event.New[*SchedulerBucketEvent](),
}
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////

// region SchedulerBlockEvent //////////////////////////////////////////////////////////////////////////////////////////

type SchedulerBlockEvent struct {
Block *tangle.Message
Bucket int64
Time time.Time
}

func newSchedulerBlockEvent(block *tangle.Message, bucket int64, time time.Time) (newInstance *SchedulerBlockEvent) {
return &SchedulerBlockEvent{
Block: block,
Bucket: bucket,
Time: time,
}
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////

// region SchedulerBucketEvent /////////////////////////////////////////////////////////////////////////////////////////

type SchedulerBucketEvent struct {
Bucket int64
Time time.Time
}

func newSchedulerBucketEvent(bucket int64, time time.Time) (newInstance *SchedulerBucketEvent) {
return &SchedulerBucketEvent{
Bucket: bucket,
Time: time,
}
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
10 changes: 10 additions & 0 deletions packages/manaverse/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package manaverse

import (
"github.com/iotaledger/hive.go/identity"
)

type ManaLedger interface {
IncreaseMana(id identity.ID, mana int64) (newBalance int64)
DecreaseMana(id identity.ID, mana int64) (newBalance int64)
}
46 changes: 46 additions & 0 deletions packages/manaverse/manaverse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package manaverse

import (
"fmt"
"testing"
"time"

"github.com/iotaledger/hive.go/crypto/ed25519"
"github.com/iotaledger/hive.go/generics/event"
"github.com/iotaledger/hive.go/identity"

"github.com/iotaledger/goshimmer/packages/tangle"
)

func Test(t *testing.T) {
identity1KeyPair := ed25519.GenerateKeyPair()
identity1 := identity.New(identity1KeyPair.PublicKey)

manaLedger := NewMockedManaLedger()
manaLedger.IncreaseMana(identity1.ID(), 100)

testTangle := tangle.NewTestTangle()
testFramework := tangle.NewMessageTestFramework(testTangle)
testFramework.CreateMessage("A", tangle.WithStrongParents("Genesis"), tangle.WithIssuer(identity1.PublicKey()))
testFramework.CreateMessage("B", tangle.WithStrongParents("A"), tangle.WithIssuer(identity1.PublicKey()))
testFramework.CreateMessage("C", tangle.WithStrongParents("A"), tangle.WithIssuer(identity1.PublicKey()))

scheduler := NewScheduler(testTangle.ConfirmationOracle, manaLedger)
scheduler.Events.BlockScheduled.Hook(event.NewClosure(func(event *SchedulerBlockEvent) {
fmt.Println(event.Time, "BlockScheduled", event.Block.ID(), event.Bucket)
}))

scheduler.Events.BucketProcessingStarted.Hook(event.NewClosure(func(event *SchedulerBucketEvent) {
fmt.Println(event.Time, "BucketProcessingStarted", event.Bucket)
}))

scheduler.Events.BucketProcessingFinished.Hook(event.NewClosure(func(event *SchedulerBucketEvent) {
fmt.Println(event.Time, "BucketProcessingFinished", event.Bucket)
}))

scheduler.Push(testFramework.Message("A"))
scheduler.Push(testFramework.Message("B"))
scheduler.Push(testFramework.Message("C"))

time.Sleep(2 * time.Second)
}
193 changes: 193 additions & 0 deletions packages/manaverse/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package manaverse

import (
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/iotaledger/hive.go/generics/event"
"github.com/iotaledger/hive.go/generics/lo"
"github.com/iotaledger/hive.go/generics/priorityqueue"
"github.com/iotaledger/hive.go/identity"
"github.com/iotaledger/hive.go/timeutil"

"github.com/iotaledger/goshimmer/packages/tangle"
)

type Scheduler struct {
Events *SchedulerEvents

confirmationOracle tangle.ConfirmationOracle
manaLedger ManaLedger
priorityQueue *priorityqueue.PriorityQueue[*Bucket]
bucketsByMana map[int64]*Bucket
currentBucket int64
ticker *timeutil.PrecisionTicker
mutex sync.Mutex

// unqueuedBlocks
unqueuedBlocks map[tangle.MessageID]int
unscheduledBlocks map[tangle.MessageID][]*tangle.Message
}

func NewScheduler(confirmationOracle tangle.ConfirmationOracle, manaLedger ManaLedger) (newScheduler *Scheduler) {
newScheduler = &Scheduler{
Events: newSchedulerEvents(),
confirmationOracle: confirmationOracle,
manaLedger: manaLedger,
priorityQueue: priorityqueue.New[*Bucket](),
bucketsByMana: make(map[int64]*Bucket, 0),
currentBucket: -1,
unqueuedBlocks: make(map[tangle.MessageID]int),
unscheduledBlocks: make(map[tangle.MessageID][]*tangle.Message),
}
newScheduler.ticker = timeutil.NewPrecisionTicker(newScheduler.ScheduleBlock, 500*time.Millisecond)

return newScheduler
}

func (s *Scheduler) Setup() {
s.confirmationOracle.Events().MessageConfirmed.Attach(event.NewClosure(s.onMessageConfirmed))
}

func (s *Scheduler) onMessageConfirmed(event *tangle.MessageConfirmedEvent) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.markBlockQueued(event.Message)
s.markBlockScheduled(event.Message, time.Now())
}

func (s *Scheduler) Push(block *tangle.Message) {
s.mutex.Lock()
defer s.mutex.Unlock()

if !s.hasUnscheduledParents(block) {
s.queueBlock(block, time.Now())
}
}

func (s *Scheduler) ScheduleBlock() {
s.mutex.Lock()
defer s.mutex.Unlock()

s.scheduleNextBlock(time.Now())
}

func (s *Scheduler) queueBlock(block *tangle.Message, now time.Time) {
bucket := s.bucket(block.BurnedMana())
bucket.Push(block)

s.markBlockQueued(block)

s.Events.BlockQueued.Trigger(newSchedulerBlockEvent(block, bucket.mana, now))
}

func (s *Scheduler) markBlockQueued(block *tangle.Message) {
delete(s.unqueuedBlocks, block.ID())
}

func (s *Scheduler) markBlockScheduled(block *tangle.Message, now time.Time) {
s.decreaseUnscheduledParentCountersOfChildren(block.ID(), now)

delete(s.unscheduledBlocks, block.ID())
}

func (s *Scheduler) dropBucket() {
bucket, exists := s.priorityQueue.Pop()
if !exists {
panic(errors.New("bucket should never be empty"))
}

delete(s.bucketsByMana, bucket.mana)
s.currentBucket = -1
}

func (s *Scheduler) scheduleNextBlock(now time.Time) {
blockToSchedule, bucket := s.nextBlock(now)
for ; blockToSchedule != nil && !s.issuerHasEnoughMana(blockToSchedule); blockToSchedule, bucket = s.nextBlock(now) {
s.Events.BlockDropped.Trigger(newSchedulerBlockEvent(blockToSchedule, bucket, now))

}

if blockToSchedule == nil {
return
}

s.markBlockScheduled(blockToSchedule, now)

s.Events.BlockScheduled.Trigger(newSchedulerBlockEvent(blockToSchedule, bucket, now))
}

func (s *Scheduler) nextBlock(now time.Time) (block *tangle.Message, bucket int64) {
firstBucket, success := s.priorityQueue.Peek()
if !success {
return nil, 0
}

if s.currentBucket != firstBucket.mana {
s.currentBucket = firstBucket.mana
s.Events.BucketProcessingStarted.Trigger(newSchedulerBucketEvent(firstBucket.mana, now))
}

if block, success = firstBucket.Pop(); !success {
panic(errors.Errorf("bucket %v should never be empty", firstBucket))
}

if firstBucket.IsEmpty() {
s.Events.BucketProcessingFinished.Trigger(newSchedulerBucketEvent(firstBucket.mana, now))

s.dropBucket()
}

return block, firstBucket.mana
}

func (s *Scheduler) issuerHasEnoughMana(block *tangle.Message) (canSchedule bool) {
return s.manaLedger.DecreaseMana(identity.NewID(block.IssuerPublicKey()), block.BurnedMana()) >= 0
}

func (s *Scheduler) hasUnscheduledParents(block *tangle.Message) (hasUnscheduledParents bool) {
s.unscheduledBlocks[block.ID()] = make([]*tangle.Message, 0)

if unscheduledParents := s.unscheduledParents(block); unscheduledParents > 0 {
s.unqueuedBlocks[block.ID()] = unscheduledParents

return true
}

return false
}

func (s *Scheduler) unscheduledParents(block *tangle.Message) (unscheduledParents int) {
for it := lo.Unique(block.Parents()).Iterator(); it.HasNext(); {
parentID := it.Next()

if children, isUnscheduled := s.unscheduledBlocks[parentID]; isUnscheduled {
s.unscheduledBlocks[parentID] = append(children, block)

unscheduledParents++
}
}

return unscheduledParents
}

func (s *Scheduler) bucket(mana int64) (bucket *Bucket) {
bucket, exists := s.bucketsByMana[mana]
if !exists {
bucket = newManaBucket(mana)
s.bucketsByMana[mana] = bucket
s.priorityQueue.Push(bucket)
}

return bucket
}

func (s *Scheduler) decreaseUnscheduledParentCountersOfChildren(blockID tangle.MessageID, now time.Time) {
for _, child := range s.unscheduledBlocks[blockID] {
if s.unqueuedBlocks[child.ID()]--; s.unqueuedBlocks[child.ID()] == 0 {
s.queueBlock(child, now)
}
}
}
Loading