文章摘要
这篇文章描述了一个Go语言实现的“服务注册中心”(Registry Service)框架,主要用于管理服务注册、更新和删除操作。该框架支持客户端向注册中心提交服务变更请求,并通过心跳检测确保服务的存活状态。文章的核心内容包括:
1. **服务注册与更新**:通过`add`函数将服务添加到注册中心,`sendRequiredServices`函数把新服务所依赖的、已注册的服务列表以更新请求的形式发送给该服务。
2. **心跳检测与服务存活管理**:通过`heartbeat`函数定期请求各服务的心跳地址;心跳失败时将该服务从注册中心移除,在后续重试中心跳恢复后再重新注册。
3. **互斥机制**:使用`sync.RWMutex`确保并发安全,防止多个客户端或服务同时修改注册中心的数据。
4. **服务通知机制**:通过`notify`函数将服务变更通知客户端,并通过HTTP POST提交更新请求。
文章通过代码示例展示了服务注册中心的完整实现,强调了其在分布式系统中维护服务状态和确保服务可用性方面的应用价值。
package registry
import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"sync"
	"time"
)
const (
	// ServerPort is the ":port" portion of the registry address.
	ServerPort = ":3000"

	// ServicesURL is the address of the registry's services endpoint.
	ServicesURL = "http://localhost" + ServerPort + "/services"
)
// registry holds the collection of registered services.
type registry struct {
// registrations is the list of currently registered services.
registrations []Registration
// mutex is locked around appends to registrations (see add).
mutex *sync.RWMutex
}
// add registers reg, sends reg the services it requires that are already
// registered, and then announces reg to the other registered services.
//
// The returned error is the one reported by sendRequiredServices; the
// announcement is made regardless of that result.
func (r *registry) add(reg Registration) error {
	func() {
		r.mutex.Lock()
		defer r.mutex.Unlock()
		r.registrations = append(r.registrations, reg)
	}()

	sendErr := r.sendRequiredServices(reg)

	announcement := patch{
		Added: []patchEntry{
			{Name: reg.ServiceName, URL: reg.ServiceURL},
		},
	}
	r.notify(announcement)

	return sendErr
}
// notify tells registered services about the changes in fullPatch.
//
// For every registered service a goroutine is started. For each name in that
// service's RequiredServices it collects the Added and Removed entries of
// fullPatch with a matching Name and, if there are any, sends them to the
// service's ServiceUpdateURL via sendPatch. A send error is logged and ends
// the goroutine for that service.
//
// The receiver is a pointer so that r.registrations is read while the read
// lock is held; a value receiver would copy the slice header before the lock
// is taken and could iterate over a stale snapshot.
func (r *registry) notify(fullPatch patch) {
	r.mutex.RLock()
	defer r.mutex.RUnlock()

	for _, reg := range r.registrations {
		go func(reg Registration) {
			for _, reqService := range reg.RequiredServices {
				// Non-nil empty slices are kept so the JSON shape of the
				// patch matches what was sent before.
				p := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
				for _, added := range fullPatch.Added {
					if added.Name == reqService {
						p.Added = append(p.Added, added)
					}
				}
				for _, removed := range fullPatch.Removed {
					if removed.Name == reqService {
						p.Removed = append(p.Removed, removed)
					}
				}
				if len(p.Added) == 0 && len(p.Removed) == 0 {
					continue
				}
				if err := r.sendPatch(p, reg.ServiceUpdateURL); err != nil {
					log.Println(err)
					return
				}
			}
		}(reg)
	}
}
// sendRequiredServices sends reg a patch listing every registered service
// whose ServiceName appears in reg.RequiredServices. The patch is posted to
// reg.ServiceUpdateURL via sendPatch, even when it contains no entries.
//
// The patch is assembled under the read lock, but the lock is released before
// the HTTP request is made so that a slow or unresponsive service cannot
// block writers of the registry. The receiver is a pointer so that
// r.registrations is read while the lock is held.
func (r *registry) sendRequiredServices(reg Registration) error {
	var p patch

	r.mutex.RLock()
	for _, serviceReg := range r.registrations {
		for _, reqService := range reg.RequiredServices {
			if serviceReg.ServiceName == reqService {
				p.Added = append(p.Added, patchEntry{
					Name: serviceReg.ServiceName,
					URL:  serviceReg.ServiceURL,
				})
			}
		}
	}
	r.mutex.RUnlock()

	return r.sendPatch(p, reg.ServiceUpdateURL)
}
// 告诉客户端更新,最新的服务列表是这个
func (r registry) sendPatch(p patch, url string) error {
d, err :=json.Marshal(p)
if err !=nil {
return err
}
_, err=http.Post(url, “application/json”, bytes.NewBuffer(d))
if err !=nil {
return err
}
return nil
}
// 注册中心删除服务对象
func (r *registry) remove(url string) error {
for i :=range reg.registrations {
if reg.registrations[i].ServiceURL==url {
// 通知客户端更新对象信息
r.notify(patch{
Removed: []patchEntry{
{
Name: r.registrations[i].ServiceName,
URL: r.registrations[i].ServiceURL,
},
},
})
r.mutex.Lock()
reg.registrations=append(reg.registrations[:i], reg.registrations[i+1:]…)
r.mutex.Unlock()
return nil
}
}
return fmt.Errorf(“Service at URL %s not found”, url)
}
// 心跳检测
func (r *registry) heartbeat(freq time.Duration) {
for {
var wg sync.WaitGroup
for _, reg :=range r.registrations {
wg.Add(1)
go func(reg Registration) {
defer wg.Done()
success :=true
for attemps :=0; attemps < 3; attemps++ {
res, err :=http.Get(reg.HeartbeatURL)
if err !=nil {
log.Println(err)
} else if res.StatusCode==http.StatusOK {
log.Printf(“Heartbeat check passed for %v”, reg.ServiceName)
// 如果心跳恢复了,把服务重新注册回来
if !success {
r.add(reg)
}
break;
}
// 如果执行到这就代表着心跳没有响应,那就代表着需要回收注销该服务了
log.Printf(“Heartbeat check failed for %v”, reg.ServiceName)
if success {
success=false
r.remove(reg.ServiceURL)
}
time.Sleep(1 * time.Second)
}
}(reg)
wg.Wait()
time.Sleep(freq)
}
}
}
// once guards the start of the heartbeat loop.
var once sync.Once

// SetupRegistryService starts the registry heartbeat loop in a background
// goroutine with a three second interval. Only the first call starts the
// loop; later calls do nothing.
func SetupRegistryService() {
	startHeartbeat := func() {
		go reg.heartbeat(3 * time.Second)
	}
	once.Do(startHeartbeat)
}
// reg is the package-level registry instance used by the HTTP handler and
// the heartbeat loop. It starts with an empty, non-nil registration list.
var reg = registry{
	registrations: []Registration{},
	mutex:         &sync.RWMutex{},
}
type RegistryService struct{}
func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println(“Request received”)
switch r.Method {
case http.MethodPost:
dec :=json.NewDecoder(r.Body)
var r Registration
err :=dec.Decode(&r)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
log.Printf(“Adding service: %v with URL: %s\n”, r.ServiceName,
r.ServiceURL)
err=reg.add(r)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
case http.MethodDelete:
payload, err :=ioutil.ReadAll(r.Body)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
url :=string(payload)
log.Printf(“Removing service at URL: %s”, url)
err=reg.remove(url)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}
import (
“bytes”
“encoding/json”
“fmt”
“io/ioutil”
“log”
“net/http”
“sync”
“time”
)
const (
	// ServerPort is the ":port" portion of the registry address.
	ServerPort = ":3000"

	// ServicesURL is the address of the registry's services endpoint.
	ServicesURL = "http://localhost" + ServerPort + "/services"
)
// registry holds the collection of registered services.
type registry struct {
// registrations is the list of currently registered services.
registrations []Registration
// mutex is locked around appends to registrations (see add).
mutex *sync.RWMutex
}
// add registers reg, sends reg the services it requires that are already
// registered, and then announces reg to the other registered services.
//
// The returned error is the one reported by sendRequiredServices; the
// announcement is made regardless of that result.
func (r *registry) add(reg Registration) error {
	func() {
		r.mutex.Lock()
		defer r.mutex.Unlock()
		r.registrations = append(r.registrations, reg)
	}()

	sendErr := r.sendRequiredServices(reg)

	announcement := patch{
		Added: []patchEntry{
			{Name: reg.ServiceName, URL: reg.ServiceURL},
		},
	}
	r.notify(announcement)

	return sendErr
}
// notify tells registered services about the changes in fullPatch.
//
// For every registered service a goroutine is started. For each name in that
// service's RequiredServices it collects the Added and Removed entries of
// fullPatch with a matching Name and, if there are any, sends them to the
// service's ServiceUpdateURL via sendPatch. A send error is logged and ends
// the goroutine for that service.
//
// The receiver is a pointer so that r.registrations is read while the read
// lock is held; a value receiver would copy the slice header before the lock
// is taken and could iterate over a stale snapshot.
func (r *registry) notify(fullPatch patch) {
	r.mutex.RLock()
	defer r.mutex.RUnlock()

	for _, reg := range r.registrations {
		go func(reg Registration) {
			for _, reqService := range reg.RequiredServices {
				// Non-nil empty slices are kept so the JSON shape of the
				// patch matches what was sent before.
				p := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
				for _, added := range fullPatch.Added {
					if added.Name == reqService {
						p.Added = append(p.Added, added)
					}
				}
				for _, removed := range fullPatch.Removed {
					if removed.Name == reqService {
						p.Removed = append(p.Removed, removed)
					}
				}
				if len(p.Added) == 0 && len(p.Removed) == 0 {
					continue
				}
				if err := r.sendPatch(p, reg.ServiceUpdateURL); err != nil {
					log.Println(err)
					return
				}
			}
		}(reg)
	}
}
// sendRequiredServices sends reg a patch listing every registered service
// whose ServiceName appears in reg.RequiredServices. The patch is posted to
// reg.ServiceUpdateURL via sendPatch, even when it contains no entries.
//
// The patch is assembled under the read lock, but the lock is released before
// the HTTP request is made so that a slow or unresponsive service cannot
// block writers of the registry. The receiver is a pointer so that
// r.registrations is read while the lock is held.
func (r *registry) sendRequiredServices(reg Registration) error {
	var p patch

	r.mutex.RLock()
	for _, serviceReg := range r.registrations {
		for _, reqService := range reg.RequiredServices {
			if serviceReg.ServiceName == reqService {
				p.Added = append(p.Added, patchEntry{
					Name: serviceReg.ServiceName,
					URL:  serviceReg.ServiceURL,
				})
			}
		}
	}
	r.mutex.RUnlock()

	return r.sendPatch(p, reg.ServiceUpdateURL)
}
// 告诉客户端更新,最新的服务列表是这个
func (r registry) sendPatch(p patch, url string) error {
d, err :=json.Marshal(p)
if err !=nil {
return err
}
_, err=http.Post(url, “application/json”, bytes.NewBuffer(d))
if err !=nil {
return err
}
return nil
}
// 注册中心删除服务对象
func (r *registry) remove(url string) error {
for i :=range reg.registrations {
if reg.registrations[i].ServiceURL==url {
// 通知客户端更新对象信息
r.notify(patch{
Removed: []patchEntry{
{
Name: r.registrations[i].ServiceName,
URL: r.registrations[i].ServiceURL,
},
},
})
r.mutex.Lock()
reg.registrations=append(reg.registrations[:i], reg.registrations[i+1:]…)
r.mutex.Unlock()
return nil
}
}
return fmt.Errorf(“Service at URL %s not found”, url)
}
// 心跳检测
func (r *registry) heartbeat(freq time.Duration) {
for {
var wg sync.WaitGroup
for _, reg :=range r.registrations {
wg.Add(1)
go func(reg Registration) {
defer wg.Done()
success :=true
for attemps :=0; attemps < 3; attemps++ {
res, err :=http.Get(reg.HeartbeatURL)
if err !=nil {
log.Println(err)
} else if res.StatusCode==http.StatusOK {
log.Printf(“Heartbeat check passed for %v”, reg.ServiceName)
// 如果心跳恢复了,把服务重新注册回来
if !success {
r.add(reg)
}
break;
}
// 如果执行到这就代表着心跳没有响应,那就代表着需要回收注销该服务了
log.Printf(“Heartbeat check failed for %v”, reg.ServiceName)
if success {
success=false
r.remove(reg.ServiceURL)
}
time.Sleep(1 * time.Second)
}
}(reg)
wg.Wait()
time.Sleep(freq)
}
}
}
// once guards the start of the heartbeat loop.
var once sync.Once

// SetupRegistryService starts the registry heartbeat loop in a background
// goroutine with a three second interval. Only the first call starts the
// loop; later calls do nothing.
func SetupRegistryService() {
	startHeartbeat := func() {
		go reg.heartbeat(3 * time.Second)
	}
	once.Do(startHeartbeat)
}
// reg is the package-level registry instance used by the HTTP handler and
// the heartbeat loop. It starts with an empty, non-nil registration list.
var reg = registry{
	registrations: []Registration{},
	mutex:         &sync.RWMutex{},
}
type RegistryService struct{}
func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println(“Request received”)
switch r.Method {
case http.MethodPost:
dec :=json.NewDecoder(r.Body)
var r Registration
err :=dec.Decode(&r)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
log.Printf(“Adding service: %v with URL: %s\n”, r.ServiceName,
r.ServiceURL)
err=reg.add(r)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
case http.MethodDelete:
payload, err :=ioutil.ReadAll(r.Body)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
url :=string(payload)
log.Printf(“Removing service at URL: %s”, url)
err=reg.remove(url)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}
© 版权声明
文章版权归作者所有,未经允许请勿转载。