Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Load data from reading files during startup and then process new files and clear old state from the map

I am working on a project where during startup I need to read certain files and store it in memory in a map and then periodically look for new files if there are any and then replace whatever I had in memory in the map earlier during startup with this new data. Basically every time if there is a new file which is a full state then I want to refresh my in memory map objects to this new one instead of appending to it.

Below method loadAtStartupAndProcessNewChanges is called during server startup which reads the file and store data in memory. Also it starts a go-routine detectNewFiles which periodically checks if there are any new files and store it on a deltaChan channel which is later accessed by another go-routine processNewFiles to read that new file again and store data in the same map. If there is any error then we store it on err channel. loadFiles is the function which will read files in memory and store it in map.

type customerConfig struct {
  deltaChan   chan string
  err         chan error
  wg          sync.WaitGroup
  data        *cmap.ConcurrentMap
}

// this is called during server startup.
func (r *customerConfig) loadAtStartupAndProcessNewChanges() error {
  path, err := r.GetPath("...", "....")
  if err != nil {
    return err
  }

  r.wg.Add(1)
  go r.detectNewFiles(path)
  err = r.loadFiles(4, path)
  if err != nil {
    return err
  }
  r.wg.Add(1)
  go r.processNewFiles()
  return nil
}

This method basically figures out if there are any new files that needs to be consumed and if there is any then it will put it on the deltaChan channel which will be later on consumed by processNewFiles go-routine and read the file in memory. If there is any error then it will add error to the error channel.

func (r *customerConfig) detectNewFiles(rootPath string) {

}

This will read all s3 files and store it in memory and return error. In this method I clear previous state of my map so that it can have fresh state from new files. This method is called during server startup and also called whenever we need to process new files from processNewFiles go-routine.

func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  r.data.Clear()
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
  for _, file := range files {
    select {
    case <-ctx.Done():
      break
    case sem <- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { <-sem }()
      return r.read(spn, file, bucket)
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }
  return nil
}

This method read the files and add in the data concurrent map.

func (r *customerConfig) read(file string, bucket string) error {
  // read file and store it in "data" concurrent map 
  // and if there is any error then return the error
  var err error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return errs.Wrap(err)
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return errs.Wrap(err)
  }

  if pr.GetNumRows() == 0 {
    spn.Infof("Skipping %s due to 0 rows", file)
    return nil
  }

  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return errs.Wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return errs.Wrap(err)
    }
    var invMods []CompModel
    err = json.Unmarshal(byteSlice, &invMods)
    if err != nil {
      return errs.Wrap(err)
    }

    for i := range invMods {
      key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
      hasInventory := false
      if invMods[i].Available > 0 {
        hasInventory = true
      }
      r.data.Set(key, hasInventory)
    }
  }
  return nil
}

This method will pick what is there on the delta channel and if there are any new files then it will start reading that new file by calling loadFiles method. If there is any error then it will add error to the error channel.

// processNewFiles - load new files found by detectNewFiles
func (r *customerConfig) processNewFiles() {
  // find new files on delta channel
  // and call "loadFiles" method to read it
  // if there is any error, then it will add it to the error channel.
}

If there is any error on the error channel then it will log those errors from below method -

func (r *customerConfig) handleError() {
  // read error from error channel if there is any
  // then log it
}

Problem Statement

Above logic works for me without any issues but there is one small bug in my code which I am not able to figure out on how to solve it. As you can see I have a concurrent map which I am populating in my read method and also clearing that whole map in loadFiles method. Because whenever there is a new file on delta channel I don't want to keep previous state in the map so that's why I am removing everything from the map and then adding new state from new files to it.

Now if there is any error in read method then the bug happens bcoz I have already cleared all the data in my data map which will have empty map which is not what I want. Basically if there is any error then I would like to preserve previous state in the data map. How can I resolve this issue in my above current design.

Note: I am using golang concurrent map

like image 644
dragons Avatar asked Oct 18 '25 06:10

dragons


1 Answers

I think your design is over complicated. It can be solved much simpler, which gives all the benefits you desire:

  • safe for concurrent access
  • detected changes are reloaded
  • accessing the config gives you the most recent, successfully loaded config
  • the most recent config is always, immediately accessible, even if loading a new config due to detected changes takes long
  • if loading new config fails, the previous "snapshot" is kept and remains the current
  • as a bonus, it's much simpler and doesn't even use 3rd party libs

Let's see how to achieve this:


Have a CustomerConfig struct holding everything you want to cache (this is the "snapshot"):

type CustomerConfig struct {
    Data map[string]bool

    // Add other props if you need:
    LoadedAt time.Time
}

Provide a function that loads the config you wish to cache. Note: this function is stateless, it does not access / operate on package level variables:

func loadConfig() (*CustomerConfig, error) {
    cfg := &CustomerConfig{
        Data:     map[string]bool{},
        LoadedAt: time.Now(),
    }

    // Logic to load files, and populate cfg.Data
    // If an error occurs, return it

    // If loading succeeds, return the config
    return cfg, nil
}

Now let's create our "cache manager". The cache manager stores the actual / current config (the snapshot), and provides access to it. For safe concurrent access (and update), we use a sync.RWMutex. Also has means to stop the manager (to stop the concurrent refreshing):

type ConfigCache struct {
    configMu sync.RWMutex
    config   *CustomerConfig
    closeCh  chan struct{}
}

Creating a cache loads the initial config. Also launches a goroutine that will be responsible to periodically check for changes.

func NewConfigCache() (*ConfigCache, error) {
    cfg, err := loadConfig()
    if err != nil {
        return nil, fmt.Errorf("loading initial config failed: %w", err)
    }

    cc := &ConfigCache{
        config:  cfg,
        closeCh: make(chan struct{}),
    }

    // launch goroutine to periodically check for changes, and load new configs
    go cc.refresher()

    return cc, nil
}

The refresher() periodically checks for changes, and if changes are detected, calls loadConfig() to load new data to be cached, and stores it as the current / actual config (while locking configMu). It also monitors closeCh to stop if that is requested:

func (cc *ConfigCache) refresher() {
    ticker := time.NewTicker(1 * time.Minute) // Every minute
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // Check if there are changes
            changes := false // logic to detect changes
            if !changes {
                continue // No changes, continue
            }

            // Changes! load new config:
            cfg, err := loadConfig()
            if err != nil {
                log.Printf("Failed to load config: %v", err)
                continue // Keep the previous config
            }

            // Apply / store new config
            cc.configMu.Lock()
            cc.config = cfg
            cc.configMu.Unlock()

        case <-cc.closeCh:
            return
        }
    }
}

Closing the cache manager (the refresher goroutine) is as easy as:

func (cc *ConfigCache) Stop() {
    close(cc.closeCh)
}

The last missing piece is how you access the current config. That's a simple GetConfig() method (that also uses configMu, but in read-only mode):

func (cc *ConfigCache) GetConfig() *CustomerConfig {
    cc.configMu.RLock()
    defer cc.configMu.RUnlock()
    return cc.config
}

This is how you can use this:

cc, err := NewConfigCache()
if err != nil {
    // Decide what to do: retry, terminate etc.
}

// Where ever, whenever you need the actual (most recent) config in your app:

cfg := cc.GetConfig()
// Use cfg

Before you shut down your app (or you want to stop the refreshing), you may call cc.Stop().

like image 185
icza Avatar answered Oct 20 '25 06:10

icza



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!