Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Elasticsearch: How to find unassigned shards and assign them?

I have a 1-node, 1-shard, 1-replica architecture running on machines with limited hardware resources. I have to keep the Elasticsearch heap size at 20% of total memory, and I index between 1k and 1m documents into Elasticsearch depending on the hardware configuration. The machines range from 2GB to 16GB of RAM, but since they use a 32-bit architecture I can only use between 300MB and 1.5GB of memory for the heap.

For reasons I do not understand, Elasticsearch creates some indices with unassigned shards, which turns the cluster health red. I tried to recover and assign the shards without creating a new node and transferring the data into it, because that is not an option for me. I also tried to re-enable allocation for the indices with this command:

# Send a PUT to the _settings endpoint of the node at localhost:9200,
# setting index.routing.allocation.disable_allocation to false.
curl -XPUT 'localhost:9200/_settings' -d '{
  "index.routing.allocation.disable_allocation": false
}'

Here is my node info:

{
  name: mynode
  transport_address: inet[/192.168.1.4:9300]
  host: myhost
  ip: 127.0.0.1
  version: 1.0.0
  build: a46900e
  http_address: inet[/192.168.1.4:9200]
  thrift_address: /192.168.1.4:9500
  attributes: {
    master: true
  }
  settings: {
    threadpool: {
      search: {
        type: fixed
        size: 600
        queue_size: 10000
      }
      bulk: {
        type: fixed
        queue_size: 10000
        size: 600
      }
      index: {
        type: fixed
        queue_size: 10000
        size: 600
      }
    }
    node: {
      data: true
      master: true
      name: mynode
    }
    index: {
      mapper: {
        dynamic: false
      }
      routing: {
        allocation: {
          disable_allocation: false
        }
      }
      store: {
        fs: {
          lock: none
        }
        compress: {
          stored: true
        }
      }
      number_of_replicas: 0
      analysis: {
        analyzer: {
          string_lowercase: {
            filter: lowercase
            tokenizer: keyword
          }
        }
      }
      cache: {
        field: {
          type: soft
          expire: 24h
          max_size: 50000
        }
      }
      number_of_shards: 1
    }
    bootstrap: {
      mlockall: true
    }
    gateway: {
      expected_nodes: 1
    }
    transport: {
      tcp: {
        compress: true
      }
    }
    name: mynode
    pidfile: /var/run/elasticsearch.pid
    path: {
      data: /var/lib/es/data
      work: /tmp/es
      home: /opt/elasticsearch
      logs: /var/log/elasticsearch
    }
    indices: {
      memory: {
        index_buffer_size: 80%
      }
    }
    cluster: {
      routing: {
        allocation: {
          node_initial_primaries_recoveries: 1
          node_concurrent_recoveries: 1
        }
      }
      name: my-elasticsearch
    }
    max_open_files: false
    discovery: {
      zen: {
        ping: {
          multicast: {
            enabled: false
          }
        }
      }
    }
  }
  os: {
    refresh_interval: 1000
    available_processors: 4
    cpu: {
      vendor: Intel
      model: Core(TM) i3-3220 CPU @ 3.30GHz
      mhz: 3292
      total_cores: 4
      total_sockets: 4
      cores_per_socket: 16
      cache_size_in_bytes: 3072
    }
    mem: {
      total_in_bytes: 4131237888
    }
    swap: {
      total_in_bytes: 4293591040
    }
  }
  process: {
    refresh_interval: 1000
    id: 24577
    max_file_descriptors: 65535
    mlockall: true
  }
  jvm: {
    pid: 24577
    version: 1.7.0_55
    vm_name: Java HotSpot(TM) Server VM
    vm_version: 24.55-b03
    vm_vendor: Oracle Corporation
    start_time: 1405942239741
    mem: {
      heap_init_in_bytes: 845152256
      heap_max_in_bytes: 818348032
      non_heap_init_in_bytes: 19136512
      non_heap_max_in_bytes: 117440512
      direct_max_in_bytes: 818348032
    }
    gc_collectors: [
      ParNew
      ConcurrentMarkSweep
    ]
    memory_pools: [
      Code Cache
      Par Eden Space
      Par Survivor Space
      CMS Old Gen
      CMS Perm Gen
    ]
  }
  thread_pool: {
    generic: {
      type: cached
      keep_alive: 30s
    }
    index: {
      type: fixed
      min: 600
      max: 600
      queue_size: 10k
    }
    get: {
      type: fixed
      min: 4
      max: 4
      queue_size: 1k
    }
    snapshot: {
      type: scaling
      min: 1
      max: 2
      keep_alive: 5m
    }
    merge: {
      type: scaling
      min: 1
      max: 2
      keep_alive: 5m
    }
    suggest: {
      type: fixed
      min: 4
      max: 4
      queue_size: 1k
    }
    bulk: {
      type: fixed
      min: 600
      max: 600
      queue_size: 10k
    }
    optimize: {
      type: fixed
      min: 1
      max: 1
    }
    warmer: {
      type: scaling
      min: 1
      max: 2
      keep_alive: 5m
    }
    flush: {
      type: scaling
      min: 1
      max: 2
      keep_alive: 5m
    }
    search: {
      type: fixed
      min: 600
      max: 600
      queue_size: 10k
    }
    percolate: {
      type: fixed
      min: 4
      max: 4
      queue_size: 1k
    }
    management: {
      type: scaling
      min: 1
      max: 5
      keep_alive: 5m
    }
    refresh: {
      type: scaling
      min: 1
      max: 2
      keep_alive: 5m
    }
  }
  network: {
    refresh_interval: 5000
    primary_interface: {
      address: 192.168.1.2
      name: eth0
      mac_address: 00:90:0B:2F:A9:08
    }
  }
  transport: {
    bound_address: inet[/0:0:0:0:0:0:0:0:9300]
    publish_address: inet[/192.168.1.4:9300]
  }
  http: {
    bound_address: inet[/0:0:0:0:0:0:0:0:9200]
    publish_address: inet[/192.168.1.4:9200]
    max_content_length_in_bytes: 104857600
  }
  plugins: [
    {
      name: transport-thrift
      version: NA
      description: Exports elasticsearch REST APIs over thrift
      jvm: true
      site: false
    }
  ]
}

The worst-case scenario is to find the unassigned shards and delete the indices they belong to, but I would rather prevent unassigned shards from being created in the first place.

Any idea?

like image 730
Fatih Karatana Avatar asked Sep 30 '22 07:09

Fatih Karatana


1 Answer

I found a workable solution; here is how I applied it in Python. Please see the comments in the code. Any improvements will be appreciated:

type_pattern = re.compile(r"""
        (?P<type>\w*?)$ # Capture doc_type from index name
        """, re.UNICODE|re.VERBOSE)
# Get mapping content from mapping file
mapping_file = utilities.system_config_path + "mapping.json"
server_mapping = None

try:
    with open(mapping_file, "r") as mapper:
        mapping = json.loads(unicode(mapper.read()))
    # Loop all indices to get and find mapping
    all_indices = [index for index in self.__conn.indices.get_aliases().iterkeys()]
    for index in all_indices:
        # Gather doc_type from index name
        doc_type = type_pattern.search(index).groupdict("type")['type']

        index_mapping = self.__conn.indices.get_mapping(index=index)
        default_mapping = [key for key in [key for key in mapping[doc_type].itervalues()][0]["properties"].iterkeys()]

        if len(index_mapping) > 0:
            # Create lists by iter values to get columns and compare them either they are different or not
             server_mapping = [key for key in [key for key in index_mapping[index]["mappings"].itervalues()][0]["properties"].iterkeys()]

            # Check if index' status is red then delete it
            if self.__conn.cluster.health(index=index)["status"] == "red":
                # Then delete index
                self.__conn.indices.delete(index)
                print "%s has been deleted because of it was in status RED" % index

                self.__conn.indices.create(
                    index=index,
                    body={
                        'settings': {
                        # just one shard, no replicas for testing
                        'number_of_shards': 1,
                        'number_of_replicas': 0,
                           }
                        },
                        # ignore already existing index
                        ignore=400
                    )
                print "%s has been created." % index

                self.__conn.indices.put_mapping(
                        index=index,
                        doc_type=doc_type,
                        body=mapping[doc_type]
                    )
                print "%s mapping has been inserted." % index

                # Check if server mapping is different than what it is supposed to be
            elif server_mapping and len(set(server_mapping) - set(default_mapping)) > 0:
                # Delete recent mapping from server regarding index
                self.__conn.indices.delete_mapping(index=index, doc_type=doc_type)
                print "%s mapping has been deleted." % index

                # Put default mapping in order to match data store columns
                self.__conn.indices.put_mapping(
                    index=index,
                    doc_type=doc_type,
                    body=mapping[doc_type])
                print "%s mapping has been inserted." % index
                # Check if index is healthy but has no mapping then put mapping into
            elif len(index_mapping) == 0:
                print "%s has no mapping. Thus the default mapping will be pushed into it." % index

                self.__conn.indices.put_mapping(
                    index=index,
                    doc_type=doc_type,
                    body=mapping[doc_type])
                print "%s mapping has been inserted." % index
        return "Database has been successfully repaired."
 except:
     # Any exception you would like here
like image 112
Fatih Karatana Avatar answered Oct 04 '22 19:10

Fatih Karatana