go 文件队列缓存处理系统

 : jank    :   : 2164    : 2018-07-28 01:44  go

闲话:很长时间没更新博客了,也许是换了新工作太忙的原因,但最主要还是自己没有花更多的业余时间来充电,后期会更努力驱动自己。

     一、 文件队列缓存处理系统,服务名字有点长,听起来绕耳,但这东西做起来也是有点恶心。

     二、 假如有个文件送检服务,流程是本地(外界访问不了)文件上传至云端服务处理,由于这个检测过程时间比较长,快则几分钟,慢则一小时。所以中间需要 本地不停地请求云端处理结果。给到这个项目,也许我们最先能想到的就是使用golang的map来处理,确实很方便。但是有个问题,假设之前送检的文件还未从云端取回检测结果,本地服务挂了,或是本地服务需重启,那么这些任务都将丢失,所以map肯定是不适合来做这项事情的。也考虑使用mysql和redis来处理,但是考虑到需要频繁的增删改,如果量大就更不合适(加入条件是只能跟服务一起部署单机版),所以选择使用文件来做件事情。

      三、下面先来看看整个文件处理系统的架构图:

jann.png


           

                                    架构图


        四、代码实现:

package file

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

const (
	FILE_CACHE_LATEST_FILE_COUNT = 100 //开机启动时,只读缓存目录下中的100个缓存文件
)

var (
	FILE_CHAN_SPILLED_ERROR = errors.New("file cache,file chan spilled")
	PRO_CHAN_SPILLED_ERROR  = errors.New("file cache,pro chan spilled")
)

type FileCache struct {
	CloudFileSuccessCount uint64
	FilePrefix            string
	FileDir               string
	FileName              string
	MaxTimes              int   //each task performs the maximum number of times, if MaxTimes = 0, performs forever.
	TimeInterval          int64 //intervals of each execution
	FileChan              chan string
	ProChan               chan string
	File                  *os.File
	FileMutex             sync.RWMutex
	Func                  func(interface{}, int) bool
	GetStruct             func() interface{}
}

func NewFileCache(dir string, prefix string, mt int, ti int64, Operation func(interface{}, int) bool, Structs func() interface{}) (*FileCache, error) {
	if len(dir) < 1 {
		LogInfo("dir is null, next use default dir: /data/filecache/ ")
		dir = "/data/filecache/"
	}
	if len(prefix) < 1 {
		return nil, errors.New("prefix can`t be null")
	}
	if ti < 1 {
		return nil, errors.New("ti value must be greater than zero")
	}
	if Operation == nil {
		return nil, errors.New("Operation can`t be nil")
	}
	if Structs == nil {
		return nil, errors.New("Stucts cant`t be nil")
	}

	if _, err := os.Stat(dir); err != nil {
		LogInfo("NewFileCache: dir: %s not exists, next create", dir)
		if err = os.MkdirAll(dir, os.ModePerm); err != nil {
			return nil, err
		}

	}

	if dir[len(dir)-1:] != "/" {
		dir = dir + "/"
	}

	fc := &FileCache{
		FileDir:      dir,
		FilePrefix:   prefix,
		MaxTimes:     mt,
		TimeInterval: ti,
		FileChan:     make(chan string, 10000),
		ProChan:      make(chan string, 10000),
		Func:         Operation,
		GetStruct:    Structs,
	}
	go fc.Process()
	LogInfo("NewFileCache: Start fc.Process() success!")
	go fc.TransFileTask()
	LogInfo("NewFileCache: Start fc.TransFileTask() success!")
	go fc.FileTask()
	LogInfo("NewFileCache: Start fc.FileTask() success!")
	time.Sleep(2 * time.Second) //sleep 2 second, wait for goroutine start
	//file before processing
	fc.StartProcessCache()
	LogInfo("NewFileCache: start fc.StartProcessCache() success!")
	return fc, nil
}

func (fc *FileCache) StartProcessCache() error {
	fdir, err := ioutil.ReadDir(fc.FileDir)
	if err != nil {
		LogError("FileCache.StartProcessCache: ioutil.ReadDir failed(%v)", err)
		return err
	}

	var fileCaches []string
	for _, f := range fdir {
		if !f.IsDir() && strings.HasPrefix(f.Name(), fc.FilePrefix) {
			path := fc.FileDir + f.Name()
			LogDebug("FileCache.StartProcessCache: add the existing file: %s", path)
			fileCaches = append(fileCaches, path)
		}
	}
	lenCs := len(fileCaches)
	if lenCs > FILE_CACHE_LATEST_FILE_COUNT { //超出指定个数,则取最后*个
		LogDebug("FileCache.StartProcessCache: more than %v", FILE_CACHE_LATEST_FILE_COUNT)
		for i := 0; i < lenCs; i++ {
			if lenCs-i <= FILE_CACHE_LATEST_FILE_COUNT {
				if fc.FileChan != nil {
					LogInfo("FileCache.StartProcessCache: read cache file: %s", fileCaches[i])
					//	fc.FileChan <- fileCaches[i]
					fc.PushFileChan(fileCaches[i])
				}
			} else { //剩下的都删掉
				err = os.Remove(fileCaches[i])
				if err != nil {
					LogWarn("FileCache.StartProcessCache: os.Remove(%s) failed(%s)", fileCaches[i], err)
				}
				LogInfo("FileCache.StartProcessCache: remove cache file: %s success!", fileCaches[i])
			}
		}
	} else { //未超出个数则取全部
		for _, v := range fileCaches {
			if fc.FileChan != nil {
				LogInfo("FileCache.StartProcessCache: read cache file: %s", v)1
				fc.PushFileChan(v)
			}
		}
	}
	return nil

}

func (fc *FileCache) PushFileChan(data string) error {

	if fc.FileChan == nil {
		LogError("FileCache.PushFileChan: file chan is nil")
		return errors.New("file chan is nil")
	}

	select {
	case fc.FileChan <- data:
	default:
		LogWarn("FileCache.PushFileChan: file chan spilled")
		return FILE_CHAN_SPILLED_ERROR
	}
	return nil
}

func (fc *FileCache) PushProChan(data string) error {
	if fc.ProChan == nil {
		LogError("FileCache.PushProChan: pro chan is nil")
		return errors.New("pro chan is nil")
	}

	select {
	case fc.ProChan <- data:
	default:
		LogWarn("FileCache.PushFileChan: pro chan spilled")
		return PRO_CHAN_SPILLED_ERROR
	}
	return nil
}

//add data to open files
func (fc *FileCache) AddFileData(jsonStr string) error {
	fc.FileMutex.Lock()
	defer fc.FileMutex.Unlock()
	if fc.File == nil {
		fc.File, _ = os.OpenFile(fc.FileName, os.O_CREATE|os.O_APPEND|os.O_RDWR, os.ModePerm|os.ModeTemporary)
	}
	fc.File.Write([]byte(jsonStr + "
"))
	return nil
}

/*
	the file name in the cache is updated every second. If the content of the cache is not empty, it will be closed and sent to the file
*/
func (fc *FileCache) FileTask() {
	for {
		if fc.FileChan == nil {
			LogInfo("FileCache.FileTask: FileChan is nil, return")
			return
		}
		FileName := fmt.Sprintf("%s%s-%d-%d", fc.FileDir, fc.FilePrefix, time.Now().Unix(), 0)
		fc.FileMutex.Lock()
		if strings.Compare(FileName, fc.FileName) > 0 {
			if fc.File != nil {
				fc.File.Close()
				fc.PushFileChan(fc.FileName)
				fc.File = nil
			}
			fc.FileName = FileName
		}
		fc.FileMutex.Unlock()
		time.Sleep(1 * time.Second)
	}
}

/**
  transfer the file name to be processed
*/
func (fc *FileCache) TransFileTask() {
	count := 0
	process := 0
	for {
		LogDebug("FileCache.TransFileTask: start TransFileTask, now filechan count: %d, process: %d", len(fc.FileChan), process)
		if fc.FileChan == nil {
			LogError("FileCache.TransFileTash: filechan is nil")
			return
		}
		filename := <-fc.FileChan
		LogDebug("FileCache.TransFileTash: filename is %v", filename)
		if len(filename) < 1 {
			LogError("FileCache.TransFileTask: FileName is null ")
			continue
		}
		process += 1
		basename := path.Base(filename)
		splitArr := strings.Split(basename, "-")
		if len(splitArr) != 3 || splitArr[0] != fc.FilePrefix {
			LogError("FileCache.TransFileTask: filename: %s not valid", filename)
			continue
		}

		nums, _ := strconv.Atoi(splitArr[2])
		//if MaxTimes = 0 that caching forever
		if fc.MaxTimes != 0 {
			if nums >= fc.MaxTimes {
				err := os.Remove(filename)
				if err != nil {
					LogError("FileCache.TransFileTask: os.Remove(%s), failed(%s)", filename, err)
				}
				continue
			}
		}

		runTime, err := strconv.ParseInt(splitArr[1], 10, 64)
		if err != nil {
			LogError("FileCache.TransFileTask: time.ParseInt(%s) failed(%s)", splitArr[1], err)
			continue
		}

		// if need to process
		if time.Now().Unix() > runTime {
			outputFileName := fmt.Sprintf("%s%s-%d-%d", fc.FileDir, fc.FilePrefix, time.Now().Unix()+fc.TimeInterval, nums+1)
			fc.PushProChan(filename + "===" + outputFileName)

		} else {
			if process > count {
				process = 0
				time.Sleep(3 * time.Second)
				count = len(fc.FileChan)
			}
			fc.PushFileChan(filename)
		}
	}
}

/**
traversal files and Processing each row of data
*/

func (fc *FileCache) Process() {
	for {
		if fc.ProChan == nil {
			LogError("FileCache.Process: ProChan close, return")
			return
		}
		LogDebug("FileCache.Process.start Handle, now ProChan count: %d", len(fc.ProChan))
		fileNames := <-fc.ProChan
		if len(fileNames) < 1 {
			LogWarn("FileCache.Process: filename is null")
			continue
		}
		fileArr := strings.Split(fileNames, "===")
		if len(fileArr) != 2 {
			LogError("FileCache.Process: fileNames: %s error", fileNames)
			continue
		}
		inputFilename := fileArr[0]
		outputFilename := fileArr[1]
		arrs := strings.Split(fileArr[0], "-")
		if len(arrs) != 3 {
			LogError("FileCache.Process: fileName: %s error", fileArr[0])
			continue
		}
		nums, err := strconv.Atoi(arrs[2]) //已检测次数
		if err != nil {
			LogError("FileCache.Process: fileName: %s, strconv.Atoi failed(%s)", fileArr[0], err)
			continue
		}

		var outputFile *os.File = nil
		// processing inputfile
		pfile, _ := os.OpenFile(inputFilename, os.O_CREATE|os.O_APPEND|os.O_RDWR, os.ModePerm|os.ModeTemporary)
		buffer := bufio.NewReader(pfile)
		for {
			tmpLine, err := buffer.ReadString('
')

			if (io.EOF == err) || (err != nil) {
				break
			}

			cd := fc.GetStruct()
			err = json.Unmarshal([]byte(tmpLine), cd)
			if err != nil {
				LogError("FileCache.Process: json.Unmarshal() failed(%s)", err)
				continue
			}

			//processing and obtaining results, if not true,add to a new file
			if !fc.Func(cd, nums) {
				if outputFilename != "" {
					if outputFile == nil {
						outputFile, _ = os.OpenFile(outputFilename, os.O_CREATE|os.O_APPEND|os.O_RDWR, os.ModePerm|os.ModeTemporary)
					}

					jsonStr, _ := json.Marshal(cd)
					outputFile.Write([]byte(string(jsonStr) + "
"))
				}
			} else {
				atomic.AddUint64(&fc.CloudFileSuccessCount, 1)
			}
		}

		if outputFile != nil {
			outputFile.Close()
			if fc.FileChan != nil {
				//	fc.FileChan <- outputFilename
				fc.PushFileChan(outputFilename)
			}
		}

		pfile.Close()
		if err = os.Remove(inputFilename); err != nil {
			LogError("FileCache.Process: file: %s delete error: %s", inputFilename, err)
		}
	}
}

func (fc *FileCache) Close() {
	fc.FileMutex.Lock()
	defer fc.FileMutex.Unlock()
	if fc.FileChan != nil {
		close(fc.FileChan)
		fc.FileChan = nil
		LogInfo("FileCache.Close: FileChan close Success!")
	}

	if fc.ProChan != nil {
		close(fc.ProChan)
		fc.ProChan = nil
		LogInfo("FileCache.Close: ProChan close Success!")
	}

}

func (fc *FileCache) Status() interface{} {
	return fc
}

        

          五、test

package file

import (
	//	"encoding/json"
	"fmt"
	"math/rand"
	//	"runtime"
	//	"sync"
	"encoding/json"
	. "tdp/util"
	"testing"
	"time"
)

var test_FileCache *FileCache

func TestNewFileCache(t *testing.T) {
	var err error
	test_FileCache, err = NewFileCache("/data/filecache", "check_file_test", 6, 10, requestCloud, newCheckFileCloudData)
	if err != nil {
		t.Error(err)
	}
	fmt.Printf("%#v", test_FileCache)
	data := &checkFileCloudData{
		UploadKey: "jank.com",
	}
	d, _ := json.Marshal(data)
	test_FileCache.AddFileData(string(d))
}

//自定义处理函数
func requestCloud(data interface{}, nums int) bool {
	if data == nil {
		return false
	}
	d := data.(*checkFileCloudData)
	a := rand.Intn(100)
	if a%2 == 0 {
		fmt.Println("true, numes: ", nums)
		return true
	} else {
		d.UpdateTime = time.Now().Format("2006/01/02 15:04:05")
		LogDebug("false, numes: %d ", nums)
		return false
	}

	return false
}

func BenchmarkAddFileData(b *testing.B) {
	LogInfo("BenchmarkAddFileData: start now!")
	for i := 0; i < b.N; i++ {
		data := &checkFileCloudData{
			UploadKey: fmt.Sprintf("jank%d.com", i*2),
		}
		d, _ := json.Marshal(data)
		test_FileCache.AddFileData(string(d))
	}

}

//自定义文件内容结构体
type checkFileCloudData struct {
	UploadKey  string `json:"upload_key"`
	UpdateTime string `json:"update_time"`
}

func newCheckFileCloudData() interface{} {
	return &checkFileCloudData{}
}

        最后,文件缓存队列处理系统也不是只有以上方法,只是针对场景不一,后期会新增另一种文件缓存队列。

      githup地址: https://github.com/jank1369/filecache

   

备案编号:赣ICP备15011386号

联系方式:qq:1150662577    邮箱:1150662577@qq.com