toptal / chewy

High-level Elasticsearch Ruby framework based on the official elasticsearch-ruby client
MIT License
1.89k stars 368 forks source link

reindex documents from Mysql #346

Closed dang-hoang-hieu closed 7 years ago

dang-hoang-hieu commented 8 years ago

I have a problem when reindex nearly 400k documents from Mysql to ES. When chewy send request to ES is extremely fast! but when query documents from Mysql, it took more than 10 hours. When I examine query log, it seems that my model have lots of belongs_to connection, so for each belongs_to it query to Mysql again.

Then I think if I can prepare reindex data first in json (by optimizing Mysql query) then send manually to Chewy for reindex in ES or send directly to ES!

is possible to send data through Chewy to reindex in ES? Or could you suggest me any better ideas? thanks a lot

pyromaniac commented 8 years ago

To optimize reindexing and solve n+1 problem you have to preload associations:

define_type YourModel.includes(:assoc1, :assoc2) do
  ...
end

So exacly the same way as usually. Also you can use https://github.com/toptal/chewy#crutches-technology if preloading will be slow for you.

dang-hoang-hieu commented 8 years ago

sorry for late reply ;( @pyromaniac thanks for your help, I can include assocs to avoid N + 1 :) But it still take hours to reindex so I need a better solution! My app situation exactly like (here](https://www.elastic.co/blog/changing-mapping-with-zero-downtime) So can I use chewy for solution mentioned above link? there are cases:

  1. Add fields to Mysql and need to update mapping, values to ES
  2. Change field types and need to update to ES
  3. rename fields to ES can I use chewy rake chewy:update to automatically update? or other supported methods of Chewy? thanks a lot!
pyromaniac commented 8 years ago

rake chewy:reset does exactly the thing described in those link.

dang-hoang-hieu commented 8 years ago

rake chewy:reset take too long for 400k documents as in the issue above. I add associations but chewy limit to 61 references and it still takes hours.

pyromaniac commented 8 years ago

What do you mean by limit to 61 references ?

dang-hoang-hieu commented 8 years ago

ah it Mysql

Mysql2::Error: Too many tables; MySQL can only use 61 tables in a join

I have more than 90 tables to join

pyromaniac commented 8 years ago

Have no idea why all the includes are converted to joins, they supposed to be simply additional queries. Anyway, you are able to use https://github.com/toptal/chewy#crutches-technology instead, it will probably do the trick

pyromaniac commented 8 years ago

Or probably you are doing something wrong, could you show your type definition please?

dang-hoang-hieu commented 8 years ago
PARAMS_CANDIDATE_MULTI = Candidate::PARAMS_CANDIDATE_MULTI.first.map do |param|
    param.first.to_s.sub("_ids", "").pluralize.to_sym
  end
  PARAMS_FITTING_MULTI = [:desire_business_types, :desire_occupational_types, :desire_areas]
  PARAMS_CANDIDATE_MULTI_NAME = (PARAMS_CANDIDATE_MULTI + PARAMS_FITTING_MULTI)
    .map{|param| "#{param}_names"}
  PARAMS_CANDIDATE_REFERENCE_NAME = Candidate::PARAMS_CANDIDATE_REFERENCE.map do |param|
    param.to_s.sub("_id", "_name").to_sym
  end
  FULL_PARAMS_SEARCH_CANDIDATE = Candidate::PARAMS_CANDIDATE_STRING + PARAMS_CANDIDATE_MULTI_NAME +
    PARAMS_CANDIDATE_REFERENCE_NAME + [:id]
  INCLUDED_ASSOCIATIONS = PARAMS_CANDIDATE_MULTI + [:fitting_condition]
  INCLUDED_ASSOCIATIONS += [:gender, :rank, :phase, :counseling_type, :progresses, :prefecture, :release_reason, :close_reason,
    :employment_status, :marriage, :adviser, :desire, :move, :residence_type, :interview_time, :excel, :english,
    :axis1, :axis2, :axis3, :work_procedure1, :work_procedure2, :work_procedure3, :social_disposition1, :social_disposition2,
    :two_week, :one_month, :two_month, :six_month, :one_year, :one_year_six_month, :two_year, :two_year_six_month, :three_year,
    :en_judgment, :njnr_flag, :assistant, :call_center_charge, :call_center_action, :matriculant_type, :cc_result, :resources_type,
    :interview_phase, :nikkei_career_phase, :mainabi_agent_phase, :astamuse_type, :astamuse_status, :en_leave_flag, :axis_introduction_phase,
    :natural_inflow_phase, :attached_file, :know_reason, :hp_inflow_wicket, :created_by, :updated_by, :cc_assigned_to, :counseling_date_assigned_to,
    :turnover_period, :possible_duplicate
  ]
  INCLUDED_ASSOCIATIONS += [
    :candidates_m_shuffle_box_teams, :shuffle_box_teams, :shuffle_box_teams, :shuffle_box_divisions, :candidates_m_business_categories,
    :experience_business_categories, :candidates_m_occupational_categories, :experience_occupational_categories, :candidates_m_pational_categories,
    :main_pational_categories, :candidates_m_areas, :areas, :candidates_m_matriculant_types, :matriculant_statuses, :candidates_m_academic_backgrounds,
    :academic_background_judments, :candidates_m_resource_ranks, :precious_resources, :candidates_m_human_resource_types, :resource_groups,
    :candidates_m_resources_type_searches, :resources_type_searches, :progresses, :work_flows, :fitting_results, :action_logs, :history_calls,
    :possible_duplicated_candidates, :possible_duplicates, :candidates_m_distribution_teams, :candidates_m_distribution_divisions,
    :fitting_condition, :form_requirement, :sender, :receiver, :cc_receiver, :bcc_receiver, :candidate_test, :import_failure_data, :origin_candidate,
    :jobs_have_fitting_score, :jobs_in_progress
  ]

define_type Candidate.includes(*INCLUDED_ASSOCIATIONS) do
    field *Candidate::PARAMS_CANDIDATE_STRING, analyzer: "custom_analyzer"
    field *Candidate::PARAMS_CANDIDATE_REFERENCE, :id, :created_by_id, :updated_by_id
    field *Candidate::PARAMS_CANDIDATE_NUMBER, type: "long"
    field *Candidate::PARAMS_CANDIDATE_DATE, :created_at, :updated_at, type: "date"
    PARAMS_CANDIDATE_MULTI.each do |param|
      field param, index: "not_analyzed", value: ->{send(param).map &:id}
      field "#{param}_names", analyzer: "custom_analyzer", value: ->{send(param).map &:name}
    end
    field *PARAMS_CANDIDATE_REFERENCE_NAME, analyzer: "custom_analyzer"
    PARAMS_FITTING_MULTI.each do |param|
      case param
      when :desire_areas
        master_model, middle_model = M::Location, FittingConditionsMArea
      when :desire_occupational_types
        master_model, middle_model = M::JobName, FittingConditionsMOccupationalCategory
      when :desire_business_types
        master_model, middle_model = M::BusinessCategory, FittingConditionsMBusinessCategory
      end
      crutch param do |candidates|
        candidate_ids = candidates.map &:id
        fitting_condition_map = FittingCondition.by_owner_ids(candidate_ids)
          .inject({}) do |hash, fitting_condition|
          hash[fitting_condition.id] = fitting_condition.owner_id
          hash
        end
        data = middle_model.desire_by_candidates(fitting_condition_map.values)
          .inject({}) do |hash, item|
          candidate_id = fitting_condition_map[item.fitting_condition_id]
          master_item = master_model.find_by id: item.master_data_id
          hash[candidate_id] ||= []
          hash[candidate_id] << master_item
          hash
        end
      end
      field param, index: "not_analyzed", value: ->(candidate, crutches) do
        if crutches.send(param)[candidate.id]
          crutches.send(param)[candidate.id].map &:id
        end
      end
      field "#{param}_names", analyzer: "custom_analyzer", value: ->(candidate, crutches) do
        if crutches.send(param)[candidate.id]
          crutches.send(param)[candidate.id].map &:name
        end
      end
    end
    field :desire_income, type: "long", value: ->{fitting_condition_desire_income}
    # field :number_interview_jobs, value: ->{progresses.first_interviewed.count}
    field :number_interview_jobs, value: ->{progresses.to_a.select{|p| p.date_pass_first_interview.present?}.size}
    # field :demo2, type: "string"
  end

I think because of belongs_to references so I have to join

pyromaniac commented 8 years ago

Wow! Unexpectable! Cruches might help in this case, I believe. Also do you really need all this association for this particular type? I don't see enough fields, did you omit them?

pyromaniac commented 8 years ago

Also I feel you pain, bro

pyromaniac commented 8 years ago

I'm actually experiencing a desire to hug you. Are you able to switch to PG or reduce your schema maybe?

dang-hoang-hieu commented 8 years ago

Many fields are hidden in Candidate model :) which have more than 90 reference including has_many and belongs_to. My colleague has added Cruches recently, as you can see above, but it seem not help much :( thanks bro, I really need a hug now =)) I dont think we can switch to postgresql or reduce the schema :( I think about a way to do partial update the index

pyromaniac commented 8 years ago

Yeah, probably chewy doesn't fit your requirements currently, or you have to figure out a way to make it fit.

You can use crutches for those belongs_to associations preload and pass them somehow optionally to methods to replace associations usage while indexing, it should actually help.

pyromaniac commented 8 years ago

Also how are you going to optimize your SQL query you've mentioned? You are actually able to pass any query to chewy type definition, so if you have optimized query which works fast - you are simply able to use it.

dang-hoang-hieu commented 8 years ago

I think about 2 options :)

  1. as you said, I try crutches to preload belongs_to and optimizing SQL query to avoid N + 1 query
  2. manually, for each migrations, I can know which field is add, edit or delete from mysql. Then query only those fields, pull old index from ES and manually change mapping and value. But I dont have much exp to work with ES API so this seem overwhelming and take time!
pyromaniac commented 8 years ago

The point is that you are unable to update ES mapping. It is impossible. I mean, you can add fields probably, but change existing ones - nope. So you are really limited using the second solution. Also chewy doesn't support partial index updates specifically because of the reason above

dang-hoang-hieu commented 8 years ago

Could you please explain more for me, I tested to add new fields to ES, then POST /candidates/candidate/id/_update and then I can GET /candidates/candidate/_search, so is it index updated partially? I am confused in this case many thanks :)

dang-hoang-hieu commented 8 years ago

ah, mistake, it's adding new fields! about changing existing ones, I can pull old index, and edit mapping in place? then push back to ES

pyromaniac commented 8 years ago

Have no idea, you have to experiment on it, but probably it is possible, not sure how reliable is it however.

dang-hoang-hieu commented 8 years ago

ok thanks a lot, I will try :)

pyromaniac commented 8 years ago

Welcome, feel free to ask more question if you have

dang-hoang-hieu commented 8 years ago

Can I change ES _id to use mysql id in chewy?

pyromaniac commented 8 years ago

Sure you can, it is actually used by default

dang-hoang-hieu commented 8 years ago

really? because I dont see _id use id from mysql? I have to set it in settings or define_type?

pyromaniac commented 8 years ago

if you want to redefine it:

define_type ... do
  root _id: -> { custom_id } do
    field ...
    field ...
  end
end
dang-hoang-hieu commented 8 years ago

thanks I will try I refactoring sql query and use crutch, but I have error like can not call Candidate model inside define_type Candidate inside crutch:

ArgumentError: wrong number of arguments (1 for 0) #when call candidates.select().joins....
NoMethodError: private method `select' called for nil:NilClass # when call Candidate.where().select...
dang-hoang-hieu commented 8 years ago

It seems that the problem lie in buliding json for ES I optimize to reduce query, but time for chewy handling index still slow

dang-hoang-hieu commented 8 years ago

around 100 fields: 8 field for date, 60 for custom_analyzer, the rest are string type :(

dang-hoang-hieu commented 8 years ago

it's possible to use multi-threading for chewy?

pyromaniac commented 8 years ago

https://github.com/toptal/chewy/pull/321

dang-hoang-hieu commented 8 years ago

wow, awesome, I will give it a try! :hug:

pyromaniac commented 8 years ago

Hey, could you try new Witchcraft™ feature? You have to add witchcraft! call right after define_type line like this:

define_type YourType do
  witchcraft!

  field :first
  field :second
end

It should work or not :) But if it will work, it will probably speed-up your import up to 20% Don't forget to get the master master though!

dang-hoang-hieu commented 8 years ago

@pyromaniac thanks I will try it :D

pyromaniac commented 8 years ago

https://github.com/toptal/chewy/#witchcraft-technology Docs added btw

dang-hoang-hieu commented 8 years ago

@pyromaniac I have this error

(eval):13:in `block in class_eval': undefined local variable or method `param' for #<Class:#<Chewy::Type::Witchcraft::Cauldron:0x007faf7d0de998>> (NameError)
pyromaniac commented 8 years ago

Seems like something with your type definition could you post it?

pyromaniac commented 8 years ago

Most probably it is a variable from binding, you may have to get rid of it somehow

dang-hoang-hieu commented 8 years ago
define_type Candidate.includes(*INCLUDED_ASSOCIATIONS) do
    witchcraft!
    field *Candidate::PARAMS_CANDIDATE_STRING, analyzer: "custom_analyzer"
    field *Candidate::PARAMS_CANDIDATE_REFERENCE, :id, :created_by_id, :updated_by_id
    field *Candidate::PARAMS_CANDIDATE_NUMBER, type: "long"
    field *Candidate::PARAMS_CANDIDATE_DATE, :created_at, :updated_at, type: "date"
    PARAMS_CANDIDATE_MULTI.each do |param|
      field param, index: "not_analyzed", value: ->{send(param).map &:id}
      field "#{param}_names", analyzer: "custom_analyzer", value: ->{send(param).map &:name}
    end
    PARAMS_CANDIDATE_REFERENCE_NAME.each do |param|
      if masters[param]
        field param, analyzer: "custom_analyzer", value: ->(candidate) do
          if candidate.send(param.to_s.sub("_name", "_id"))
            masters[param][candidate.send(param.to_s.sub("_name", "_id"))]
          else
            ""
          end
        end
      else
        field param, analyzer: "custom_analyzer"
      end
    end

    PARAMS_FITTING_MULTI.each do |param|
      case param
      when :desire_areas
        master_model, middle_model = M::Location, FittingConditionsMArea
      when :desire_occupational_types
        master_model, middle_model = M::JobName, FittingConditionsMOccupationalCategory
      when :desire_business_types
        master_model, middle_model = M::BusinessCategory, FittingConditionsMBusinessCategory
      end
      crutch param do |candidates|
        candidate_ids = candidates.map &:id

        fitting_condition_map = FittingCondition.where(owner_id: candidate_ids).pluck(:id, :owner_id)
          .inject({}) do |hash, fitting_condition|
          hash[fitting_condition[0]] = fitting_condition[1]
          hash
        end 
        data = middle_model.desire_by_candidates(fitting_condition_map.values)
          .inject({}) do |hash, item|
          candidate_id = fitting_condition_map[item.fitting_condition_id]
          master_item = master_model.find_by id: item.master_data_id
          hash[candidate_id] ||= []
          hash[candidate_id] << master_item

          hash
        end

      end
      field param, index: "not_analyzed", value: ->(candidate, crutches) do
        if crutches.send(param)[candidate.id]
          crutches.send(param)[candidate.id].compact!
          crutches.send(param)[candidate.id].map &:id
        end
      end
      field "#{param}_names", analyzer: "custom_analyzer", value: ->(candidate, crutches) do
        if crutches.send(param)[candidate.id]
          crutches.send(param)[candidate.id].map &:name
        end
      end
    end

    field :desire_income, type: "long", value: ->{fitting_condition_desire_income}

    field :number_interview_jobs, value: ->(candidate) do
      interview_job_masters[candidate.id]
    end
  end

I dont have any binding in application

pyromaniac commented 8 years ago

Ah, I see it now in your previous post. You probably have to get rid of those external iterator in type definition and do everything in copy-paste manner

dang-hoang-hieu commented 8 years ago

okay I will try it now!

pyromaniac commented 8 years ago

Yeah, and I'll try to fix Witchcraft to support such a cases

pyromaniac commented 8 years ago

Not sure if it is possible however

dang-hoang-hieu commented 8 years ago

yep :( copy and paste such hundreds of fields scare me a lot :-s

pyromaniac commented 8 years ago

Hey, try the latest master, but you have to fix some fields to use arguments in procs like this:

field param, index: "not_analyzed", value: ->(o) { o.send(param).map &:id }
dang-hoang-hieu commented 8 years ago

oki I will try it thanks!

dang-hoang-hieu commented 8 years ago

@pyromaniac :dancers: it worked, faster 40% with my application (y)

pyromaniac commented 8 years ago

Haha, fokken awesome! Witchcraft FTW. But check everything carefully. Like that documents are generated correctly. Also sacrifice a goat please.

dang-hoang-hieu commented 8 years ago

:beers: :+1: :laughing: