[go: up one dir, main page]

Skip to content

Commit

Permalink
Merge pull request #39 from rizalgowandy/arwego/feat/pgx
Browse files Browse the repository at this point in the history
Add Writer and Reader Concept
  • Loading branch information
rizalgowandy committed May 9, 2022
2 parents fff8f43 + ec033ae commit 571a98c
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 210 deletions.
31 changes: 31 additions & 0 deletions pkg/balancer/round_robin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package balancer

import (
"sync"

"github.com/rizalgowandy/gdk/pkg/errorx/v1"
)

// NewRoundRobin builds a RoundRobin that cycles through items in order,
// starting from the first element.
//
// It returns an error when items is empty. The slice is copied, so changes the
// caller makes to items afterwards do not affect the returned balancer.
func NewRoundRobin(items []interface{}) (*RoundRobin, error) {
	if len(items) == 0 {
		return nil, errorx.E("no items passed")
	}

	// Take a private copy: Next reads the slice under the balancer's mutex,
	// which cannot protect against writes made through the caller's slice.
	owned := make([]interface{}, len(items))
	copy(owned, items)

	return &RoundRobin{
		items: owned,
	}, nil
}

// RoundRobin hands out its items one at a time, in order, wrapping back to the
// first item after the last one has been returned.
type RoundRobin struct {
	// mux serializes access to next so concurrent Next calls do not collide.
	mux sync.Mutex
	// next is the index into items of the value the following Next call returns.
	next int
	// items holds the values to cycle through.
	items []interface{}
}

// Next returns the current item and advances the cursor by one, wrapping to
// the start once the end of the list is reached.
//
// It returns nil when the balancer holds no items (for example a zero-value
// RoundRobin) instead of panicking with an out-of-range index.
func (b *RoundRobin) Next() interface{} {
	b.mux.Lock()
	// Deferred so the lock is released on every exit path, including a panic.
	defer b.mux.Unlock()

	if len(b.items) == 0 {
		return nil
	}

	r := b.items[b.next]
	b.next = (b.next + 1) % len(b.items)
	return r
}
10 changes: 4 additions & 6 deletions pkg/queue/nsqx/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,21 @@ func NewConsumerController(interceptors ...ConsumerInterceptor) *ConsumerControl
}

func (c *ConsumerController) AddConsumers(params []ConsumerParam) error {
const op errorx.Op = "nsqx/ConsumerController.AddConsumers"

for _, param := range params {
if param.Config == nil {
param.Config = &ConsumerConfiguration{}
}
if param.Consumer == nil {
return errorx.E("invalid consumer", op)
return errorx.E("invalid consumer")
}

if err := param.Config.Validate(); err != nil {
return errorx.E(err, op)
return errorx.E(err)
}

consumer, err := nsq.NewConsumer(param.Topic, param.Channel, param.Config.NSQ)
if err != nil {
return errorx.E(err, op)
return errorx.E(err)
}

consumer.AddConcurrentHandlers(
Expand All @@ -64,7 +62,7 @@ func (c *ConsumerController) AddConsumers(params []ConsumerParam) error {

err = consumer.ConnectToNSQLookupds(param.Config.LookupAddress)
if err != nil {
return errorx.E(err, op)
return errorx.E(err)
}

c.consumers = append(c.consumers, consumer)
Expand Down
22 changes: 8 additions & 14 deletions pkg/queue/nsqx/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@ type Producer struct {
// NewProducer creates a client to publish message to nsq.
func NewProducer(config *ProducerConfiguration) (*Producer, error) {
onceNewProducer.Do(func() {
const op errorx.Op = "nsqx.NewProducer"

if err := config.Validate(); err != nil {
onceNewProducerErr = errorx.E(err, op)
onceNewProducerErr = errorx.E(err)
return
}

client, err := nsq.NewProducer(config.DaemonAddress, config.NSQ)
if err != nil {
onceNewProducerErr = errorx.E(err, op)
onceNewProducerErr = errorx.E(err)
return
}

Expand All @@ -50,24 +48,22 @@ func NewProducer(config *ProducerConfiguration) (*Producer, error) {

// Publish sends data to nsq.
func (p *Producer) Publish(_ context.Context, topic string, data interface{}) error {
const op errorx.Op = "nsqx/Producer.Publish"

if topic == "" {
return errorx.E("topic cannot be empty", op)
return errorx.E("topic cannot be empty")
}

var err error
dataByte, ok := (data).([]byte)
if !ok {
dataByte, err = jsonx.Marshal(data)
if err != nil {
return errorx.E(err, op)
return errorx.E(err)
}
}

err = p.publish(topic, dataByte)
if err != nil {
return errorx.E(err, op)
return errorx.E(err)
}

return nil
Expand All @@ -80,24 +76,22 @@ func (p *Producer) DeferredPublish(
delay time.Duration,
data interface{},
) error {
const op errorx.Op = "nsqx/Producer.DeferredPublish"

if topic == "" {
return errorx.E("topic cannot be empty", op)
return errorx.E("topic cannot be empty")
}

var err error
dataByte, ok := (data).([]byte)
if !ok {
dataByte, err = jsonx.Marshal(data)
if err != nil {
return errorx.E(err, op)
return errorx.E(err)
}
}

err = p.deferredPublish(topic, delay, dataByte)
if err != nil {
return errorx.E(err, op)
return errorx.E(err)
}

return nil
Expand Down
61 changes: 21 additions & 40 deletions pkg/storage/cache/go_redis_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/go-redis/redis/v8"
"github.com/rizalgowandy/gdk/pkg/errorx/v2"
"github.com/rizalgowandy/gdk/pkg/fn"
"github.com/rizalgowandy/gdk/pkg/logx"
"github.com/rizalgowandy/gdk/pkg/syncx"
"github.com/rizalgowandy/gdk/pkg/tags"
Expand Down Expand Up @@ -64,17 +65,15 @@ type GoRedisCluster struct {

// Get gets the value from redis in []byte form.
func (r *GoRedisCluster) Get(ctx context.Context, key string) ([]byte, error) {
const op errorx.Op = "cache/GoRedisCluster.Get"

res, err := r.client.Get(ctx, key).Result()
if err != nil {
return nil, errorx.E(err, op)
return nil, errorx.E(err)
}

logx.DBG(ctx, logx.KV{
tags.Key: key,
tags.Detail: res,
}, string(op)+" success")
}, fn.Name()+" success")
return []byte(res), nil
}

Expand All @@ -85,14 +84,12 @@ func (r *GoRedisCluster) SetEX(
seconds int64,
value string,
) error {
const op errorx.Op = "cache/GoRedisCluster.SetEX"

_, err := r.client.SetEX(ctx, key, value, time.Duration(seconds)*time.Second).Result()
if err != nil {
return errorx.E(err, op)
return errorx.E(err)
}

logx.DBG(ctx, logx.KV{tags.Key: key}, string(op)+" success")
logx.DBG(ctx, logx.KV{tags.Key: key}, fn.Name()+" success")
return nil
}

Expand All @@ -104,33 +101,29 @@ func (r *GoRedisCluster) SetNX(
seconds int64,
value string,
) (bool, error) {
const op errorx.Op = "cache/GoRedisCluster.SetNX"

res, err := r.client.SetNX(ctx, key, value, time.Duration(seconds)*time.Second).Result()
if err != nil {
return false, errorx.E(err, op)
return false, errorx.E(err)
}

logx.DBG(ctx, logx.KV{
tags.Key: key,
tags.Detail: res,
}, string(op)+" success")
}, fn.Name()+" success")
return res, nil
}

// Exists checks whether the key exists in redis.
func (r *GoRedisCluster) Exists(ctx context.Context, key string) (bool, error) {
const op errorx.Op = "cache/GoRedisCluster.Exists"

res, err := r.client.Exists(ctx, key).Result()
if err != nil {
return false, errorx.E(err, op)
return false, errorx.E(err)
}

logx.DBG(ctx, logx.KV{
tags.Key: key,
tags.Detail: res,
}, string(op)+" success")
}, fn.Name()+" success")
return res > 0, nil
}

Expand All @@ -140,88 +133,76 @@ func (r *GoRedisCluster) Expire(
key string,
seconds int64,
) (bool, error) {
const op errorx.Op = "cache/GoRedisCluster.Expire"

res, err := r.client.Expire(ctx, key, time.Duration(seconds)*time.Second).Result()
if err != nil {
return false, errorx.E(err, op)
return false, errorx.E(err)
}

logx.DBG(ctx, logx.KV{
tags.Key: key,
tags.Detail: res,
}, string(op)+" success")
}, fn.Name()+" success")
return res, nil
}

// TTL gets the time to live of a key / expiry time.
func (r *GoRedisCluster) TTL(ctx context.Context, key string) (int64, error) {
const op errorx.Op = "cache/GoRedisCluster.TTL"

res, err := r.client.TTL(ctx, key).Result()
if err != nil {
return 0, errorx.E(err, op)
return 0, errorx.E(err)
}

logx.DBG(ctx, logx.KV{
tags.Key: key,
tags.Detail: res,
}, string(op)+" success")
}, fn.Name()+" success")
return int64(res), nil
}

// HGet gets the value of a hash field.
func (r *GoRedisCluster) HGet(ctx context.Context, key, field string) ([]byte, error) {
const op errorx.Op = "cache/GoRedisCluster.HGet"

res, err := r.client.HGet(ctx, key, field).Result()
if err != nil {
return nil, errorx.E(err, op)
return nil, errorx.E(err)
}

logx.DBG(ctx, logx.KV{
tags.Key: key,
tags.Detail: res,
}, string(op)+" success")
}, fn.Name()+" success")
return []byte(res), nil
}

// HExists determines if a hash field exists.
func (r *GoRedisCluster) HExists(ctx context.Context, key, field string) (bool, error) {
const op errorx.Op = "cache/GoRedisCluster.HExists"

res, err := r.client.HExists(ctx, key, field).Result()
if err != nil {
return false, errorx.E(err, op)
return false, errorx.E(err)
}

logx.DBG(ctx, logx.KV{
tags.Key: key,
tags.Detail: res,
}, string(op)+" success")
}, fn.Name()+" success")
return res, nil
}

// HSet sets the string value of a hash field.
func (r *GoRedisCluster) HSet(ctx context.Context, key, field, value string) (bool, error) {
const op errorx.Op = "cache/GoRedisCluster.HSet"

res, err := r.client.HSet(ctx, key, field, value).Result()
if err != nil {
return false, errorx.E(err, op)
return false, errorx.E(err)
}

logx.DBG(ctx, logx.KV{
tags.Key: key,
tags.Detail: res,
}, string(op)+" success")
}, fn.Name()+" success")
return res > 0, nil
}

// Del deletes a key.
func (r *GoRedisCluster) Del(ctx context.Context, key ...interface{}) (int64, error) {
const op errorx.Op = "cache/GoRedisCluster.Del"

stdKeys := make([]string, len(key))
for i, v := range key {
stdKey, ok := v.(string)
Expand All @@ -232,13 +213,13 @@ func (r *GoRedisCluster) Del(ctx context.Context, key ...interface{}) (int64, er

res, err := r.client.Del(ctx, stdKeys...).Result()
if err != nil {
return 0, errorx.E(err, op)
return 0, errorx.E(err)
}

logx.DBG(ctx, logx.KV{
tags.Key: key,
tags.Detail: res,
}, string(op)+" success")
}, fn.Name()+" success")
return res, nil
}

Expand Down
Loading

0 comments on commit 571a98c

Please sign in to comment.