Skip to content

Commit

Permalink
Add the sync dir
Browse files Browse the repository at this point in the history
  • Loading branch information
asim committed Jun 7, 2019
1 parent 8cd4d67 commit 3362364
Show file tree
Hide file tree
Showing 7 changed files with 736 additions and 0 deletions.
93 changes: 93 additions & 0 deletions sync/data/etcd/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Package etcd is an etcd v3 implementation of kv
package etcd

import (
"context"
"log"

"github.com/micro/go-micro/sync/data"
client "go.etcd.io/etcd/clientv3"
)

type ekv struct {
kv client.KV
}

func (e *ekv) Read(key string) (*data.Record, error) {
keyval, err := e.kv.Get(context.Background(), key)
if err != nil {
return nil, err
}

if keyval == nil || len(keyval.Kvs) == 0 {
return nil, data.ErrNotFound
}

return &data.Record{
Key: string(keyval.Kvs[0].Key),
Value: keyval.Kvs[0].Value,
}, nil
}

func (e *ekv) Delete(key string) error {
_, err := e.kv.Delete(context.Background(), key)
return err
}

func (e *ekv) Write(record *data.Record) error {
_, err := e.kv.Put(context.Background(), record.Key, string(record.Value))
return err
}

func (e *ekv) Dump() ([]*data.Record, error) {
keyval, err := e.kv.Get(context.Background(), "/", client.WithPrefix())
if err != nil {
return nil, err
}
var vals []*data.Record
if keyval == nil || len(keyval.Kvs) == 0 {
return vals, nil
}
for _, keyv := range keyval.Kvs {
vals = append(vals, &data.Record{
Key: string(keyv.Key),
Value: keyv.Value,
})
}
return vals, nil
}

func (e *ekv) String() string {
return "etcd"
}

func NewData(opts ...data.Option) data.Data {
var options data.Options
for _, o := range opts {
o(&options)
}

var endpoints []string

for _, addr := range options.Nodes {
if len(addr) > 0 {
endpoints = append(endpoints, addr)
}
}

if len(endpoints) == 0 {
endpoints = []string{"http://127.0.0.1:2379"}
}

// TODO: parse addresses
c, err := client.New(client.Config{
Endpoints: endpoints,
})
if err != nil {
log.Fatal(err)
}

return &ekv{
kv: client.NewKV(c),
}
}
178 changes: 178 additions & 0 deletions sync/data/memcached/memcached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package memcached

import (
"bufio"
"bytes"
"fmt"
"io"
"net"
"strings"
"time"

mc "github.com/bradfitz/gomemcache/memcache"
"github.com/micro/go-micro/sync/data"
)

type mkv struct {
Server *mc.ServerList
Client *mc.Client
}

func (m *mkv) Read(key string) (*data.Record, error) {
keyval, err := m.Client.Get(key)
if err != nil && err == mc.ErrCacheMiss {
return nil, data.ErrNotFound
} else if err != nil {
return nil, err
}

if keyval == nil {
return nil, data.ErrNotFound
}

return &data.Record{
Key: keyval.Key,
Value: keyval.Value,
Expiration: time.Second * time.Duration(keyval.Expiration),
}, nil
}

func (m *mkv) Delete(key string) error {
return m.Client.Delete(key)
}

func (m *mkv) Write(record *data.Record) error {
return m.Client.Set(&mc.Item{
Key: record.Key,
Value: record.Value,
Expiration: int32(record.Expiration.Seconds()),
})
}

func (m *mkv) Dump() ([]*data.Record, error) {
// stats
// cachedump
// get keys

var keys []string

//data := make(map[string]string)
if err := m.Server.Each(func(c net.Addr) error {
cc, err := net.Dial("tcp", c.String())
if err != nil {
return err
}
defer cc.Close()

b := bufio.NewReadWriter(bufio.NewReader(cc), bufio.NewWriter(cc))

// get records
if _, err := fmt.Fprintf(b, "stats records\r\n"); err != nil {
return err
}

b.Flush()

v, err := b.ReadSlice('\n')
if err != nil {
return err
}

parts := bytes.Split(v, []byte("\n"))
if len(parts) < 1 {
return nil
}
vals := strings.Split(string(parts[0]), ":")
records := vals[1]

// drain
for {
buf, err := b.ReadSlice('\n')
if err == io.EOF {
break
}
if err != nil {
return err
}
if strings.HasPrefix(string(buf), "END") {
break
}
}

b.Writer.Reset(cc)
b.Reader.Reset(cc)

if _, err := fmt.Fprintf(b, "lru_crawler metadump %s\r\n", records); err != nil {
return err
}
b.Flush()

for {
v, err := b.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
return err
}
if strings.HasPrefix(v, "END") {
break
}
key := strings.Split(v, " ")[0]
keys = append(keys, strings.TrimPrefix(key, "key="))
}

return nil
}); err != nil {
return nil, err
}

var vals []*data.Record

// concurrent op
ch := make(chan *data.Record, len(keys))

for _, k := range keys {
go func(key string) {
i, _ := m.Read(key)
ch <- i
}(k)
}

for i := 0; i < len(keys); i++ {
record := <-ch

if record == nil {
continue
}

vals = append(vals, record)
}

close(ch)

return vals, nil
}

func (m *mkv) String() string {
return "memcached"
}

func NewData(opts ...data.Option) data.Data {
var options data.Options
for _, o := range opts {
o(&options)
}

if len(options.Nodes) == 0 {
options.Nodes = []string{"127.0.0.1:11211"}
}

ss := new(mc.ServerList)
ss.SetServers(options.Nodes...)

return &mkv{
Server: ss,
Client: mc.New(options.Nodes...),
}
}
82 changes: 82 additions & 0 deletions sync/data/redis/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package redis

import (
"github.com/micro/go-micro/sync/data"
redis "gopkg.in/redis.v3"
)

type rkv struct {
Client *redis.Client
}

func (r *rkv) Read(key string) (*data.Record, error) {
val, err := r.Client.Get(key).Bytes()

if err != nil && err == redis.Nil {
return nil, data.ErrNotFound
} else if err != nil {
return nil, err
}

if val == nil {
return nil, data.ErrNotFound
}

d, err := r.Client.TTL(key).Result()
if err != nil {
return nil, err
}

return &data.Record{
Key: key,
Value: val,
Expiration: d,
}, nil
}

func (r *rkv) Delete(key string) error {
return r.Client.Del(key).Err()
}

func (r *rkv) Write(record *data.Record) error {
return r.Client.Set(record.Key, record.Value, record.Expiration).Err()
}

func (r *rkv) Dump() ([]*data.Record, error) {
keys, err := r.Client.Keys("*").Result()
if err != nil {
return nil, err
}
var vals []*data.Record
for _, k := range keys {
i, err := r.Read(k)
if err != nil {
return nil, err
}
vals = append(vals, i)
}
return vals, nil
}

func (r *rkv) String() string {
return "redis"
}

func NewData(opts ...data.Option) data.Data {
var options data.Options
for _, o := range opts {
o(&options)
}

if len(options.Nodes) == 0 {
options.Nodes = []string{"127.0.0.1:6379"}
}

return &rkv{
Client: redis.NewClient(&redis.Options{
Addr: options.Nodes[0],
Password: "", // no password set
DB: 0, // use default DB
}),
}
}
Loading

0 comments on commit 3362364

Please sign in to comment.