import ( "container/heap" ) type Event struct { Raw string Number uint64 MsgType int FromUserID uint64 ToUserID uint64 } type EventsMinHeap []*Event func (h EventsMinHeap) Len() int { return len(h) } func (h EventsMinHeap) Less(i, j int) bool { return h[i].Number < h[j].Number } func (h EventsMinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h *EventsMinHeap) Push(x interface{}) { *h = append(*h, x.(*Event)) } func (h *EventsMinHeap) Pop() interface{} { old := *h tailIndex := old.Len() - 1 tail := old[tailIndex] old[tailIndex] = nil *h = old[:tailIndex] return tail } type Node struct { Val interface{} Next *Node Prev *Node } type BoundedFifoQueue struct { mx sync.RWMutex Head *Node Tail *Node MaxSize int count int } func NewBoundedQueue(maxSize int) *BoundedFifoQueue { return &BoundedFifoQueue{ MaxSize: maxSize, } } func (b *BoundedFifoQueue) PushBack(val interface{}) { b.mx.Lock() defer b.mx.Unlock() if b.count == b.MaxSize { oldHead := b.Head b.Head = oldHead.Next oldHead = nil b.count-- } newTail := &Node{Val: val, Prev: b.Tail} if b.count > 0 { oldTail := b.Tail oldTail.Next = newTail b.Tail = newTail } else { b.Head = newTail } b.Tail = newTail b.count++ } func (b *BoundedFifoQueue) PushFront(val interface{}) { b.mx.Lock() defer b.mx.Unlock() if b.count == b.MaxSize { oldTail := b.Tail b.Tail = oldTail.Prev oldTail = nil b.count-- } newHead := &Node{Val: val, Next: b.Head} if b.count > 0 { oldHead := b.Head oldHead.Prev = newHead } else { b.Tail = newHead } b.Head = newHead b.count++ } func (b *BoundedFifoQueue) Pop() interface{} { b.mx.Lock() defer b.mx.Unlock() if b.count > 0 { head := b.Head b.Head = head.Next val := head.Val head = nil b.count-- return val } return nil } func (b *BoundedFifoQueue) Front() interface{} { b.mx.RLock() defer b.mx.RUnlock() return b.Head.Val } func (b *BoundedFifoQueue) Back() interface{} { b.mx.RLock() defer b.mx.RUnlock() return b.Tail.Val } func (b *BoundedFifoQueue) Len() int { b.mx.RLock() defer b.mx.RUnlock() return b.count } type PriorityQueue struct { mx sync.RWMutex events *EventsMinHeap maxSize int } func NewPriorityQueue(maxSize int) *PriorityQueue { return &PriorityQueue{ events: new(EventsMinHeap), maxSize: maxSize, } } func (p *PriorityQueue) Push(event *Event) { p.mx.Lock() defer p.mx.Unlock() size := p.events.Len() if size == p.maxSize { heap.Pop(p.events) } heap.Push( p.events, event, ) } func (p *PriorityQueue) Pop() *Event { p.mx.Lock() defer p.mx.Unlock() if p.events.Len() == 0 { return nil } return heap.Pop(p.events).(*Event) } func (p *PriorityQueue) Len() int { p.mx.RLock() defer p.mx.RUnlock() return p.events.Len() }