potassco / clingo

🤔 A grounder and solver for logic programs.
https://potassco.org/clingo
MIT License
599 stars 79 forks source link

Thread-safety of clingo_ground_callback #412

Closed RenatoGeh closed 1 year ago

RenatoGeh commented 1 year ago

Hi,

I was wondering whether calling clingo_control_ground with a clingo_ground_callback_t was supposed to be thread-safe. I'm running several clingo_control_t's in parallel, one for each thread, and grounding with a clingo_ground_callback_t with no memory sharing of programs between threads. I'm getting non-deterministic segfaults together with

clingo | error code 0: <block>:1:7-68: info: operation undefined
  function 'externalfunc' not found

(where externalfunc is the external function to be injected in clingo_ground_callback_t) errors.

To be more precise on this (seemingly) thread-safety problem itself, gdb points to clingo_ground_callback_t being called on the wrong thread. For instance, suppose I have two threads $T_1$ and $T_2$, each with its own clingo_control_t, and callbacks $f_1$ and $f_2$. When running in parallel, it seems clingo_control_ground sometimes calls $f_2$ for $T_1$ instead of $f_1$ (and vice-versa).

Adding mutex locks before and after clingo_control_ground calls seems to fix this issue.

I tried to write a minimal working example of this problem but I couldn't replicate the error, which is weird since it seemed to be thread-safe in the MWE but not in my use-case (even though the issue was fixed with mutex locks in my code). I've been debugging this issue for weeks now. Please help me keep my sanity. :)

Thanks

rkaminsk commented 1 year ago

A control object should not be accessed simultaneously from different threads. However, two threads can access two different control objects in parallel. There is shared state, the symbols, but access to them is locked and thread safe.

I do not see how it is possible that two different callbacks can be mixed up. They are not stored in global state.

RenatoGeh commented 1 year ago

Hi again.

I managed to reproduce the issue with the minimal working example below.

Essentially, what I'm doing is creating a separate clingo_control_t for each thread and running some mockup task on the program after grounding with a callback. There is no clingo_control_t sharing and there shouldn't be any memory sharing among threads.

I compiled the minimal working example with gcc.

What's interesting is, if one compiles with -D N_T=1 as a flag, which tells the program to run with a single thread sequentially, then everything works perfectly. If one compiles with -D N_T=n, where $n > 1$ ($n$ is the number of threads to be run in parallel), grounding breaks and error

clingo | error code 0: <block>:1:7-68: info: operation undefined
  function 'call' not found

appears again.

What am I doing wrong here?

Thanks


#include <clingo.h>
#include <stdio.h>
#include <stdbool.h>
#include <pthread.h>
#include <string.h>

/********************** Start of model printing functions *********************/
#include <stdlib.h>
#include <inttypes.h>

typedef struct model_buffer {
  clingo_symbol_t *symbols;
  size_t           symbols_n;
  char            *string;
  size_t           string_n;
} model_buffer_t;

void free_model_buffer(model_buffer_t *buf) {
  if (buf->symbols) {
    free(buf->symbols);
    buf->symbols   = NULL;
    buf->symbols_n = 0;
  }
  if (buf->string) {
    free(buf->string);
    buf->string   = NULL;
    buf->string_n = 0;
  }
}

bool print_symbol(clingo_symbol_t symbol, model_buffer_t *buf) {
  bool ret = true;
  char *string;
  size_t n;
  // determine size of the string representation of the next symbol in the model
  if (!clingo_symbol_to_string_size(symbol, &n)) { goto error; }
  if (buf->string_n < n) {
    // allocate required memory to hold the symbol's string
    if (!(string = (char*)realloc(buf->string, sizeof(*buf->string) * n))) {
      clingo_set_error(clingo_error_bad_alloc, "could not allocate memory for symbol's string");
      goto error;
    }
    buf->string   = string;
    buf->string_n = n;
  }
  // retrieve the symbol's string
  if (!clingo_symbol_to_string(symbol, buf->string, n)) { goto error; }
  printf("%s", buf->string);
  goto out;
error:
  ret = false;
out:
  return ret;
}

bool print_model(const clingo_model_t *model, model_buffer_t *buf, const char *label, clingo_show_type_bitset_t show) {
  bool ret = true;
  clingo_symbol_t *symbols;
  size_t n;
  const clingo_symbol_t *it, *ie;
  // determine the number of (shown) symbols in the model
  if (!clingo_model_symbols_size(model, show, &n)) { goto error; }
  // allocate required memory to hold all the symbols
  if (buf->symbols_n < n) {
    if (!(symbols = (clingo_symbol_t*)malloc(sizeof(*buf->symbols) * n))) {
      clingo_set_error(clingo_error_bad_alloc, "could not allocate memory for atoms");
      goto error;
    }
    buf->symbols   = symbols;
    buf->symbols_n = n;
  }
  // retrieve the symbols in the model
  if (!clingo_model_symbols(model, show, buf->symbols, n)) { goto error; }
  printf("%s:", label);
  for (it = buf->symbols, ie = buf->symbols + n; it != ie; ++it) {
    printf(" ");
    if (!print_symbol(*it, buf)) { goto error; }
  }
  printf("\n");
  goto out;
error:
  ret = false;
out:
  return ret;
}

bool print_solution(const clingo_model_t *model) {
  static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
  bool ret = true;
  uint64_t number;
  clingo_model_type_t type;
  const char *type_string = "";
  model_buffer_t data = {NULL, 0, NULL, 0};
  // get model type
  if (!clingo_model_type(model, &type)) { goto error; }
  switch ((enum clingo_model_type_e)type) {
    case clingo_model_type_stable_model:          { type_string = "Stable model"; break; }
    case clingo_model_type_brave_consequences:    { type_string = "Brave consequences"; break; }
    case clingo_model_type_cautious_consequences: { type_string = "Cautious consequences"; break; }
  }
  // get running number of model
  if (!clingo_model_number(model, &number)) { goto error; }
  pthread_mutex_lock(&mu);
  printf("%s %" PRIu64 ":\n", type_string, number);
  if (!print_model(model, &data, "  shown", clingo_show_type_shown)) { goto error; }
  if (!print_model(model, &data, "  atoms", clingo_show_type_atoms)) { goto error; }
  if (!print_model(model, &data, "  terms", clingo_show_type_terms)) { goto error; }
  if (!print_model(model, &data, " ~atoms", clingo_show_type_complement
                                         | clingo_show_type_atoms)) { goto error; }
  pthread_mutex_unlock(&mu);
  goto out;
error:
  ret = false;
out:
  free_model_buffer(&data);
  return ret;
}
/********************** End of model printing functions *********************/

#ifndef N_F
/* Number of facts a(1), ..., a(N_F). */
#define N_F 5
#endif
#ifndef N_T
/* Number of threads to be run in parallel. */
#define N_T 4
#endif
/* Max length of atoms a(i). */
#define ATOM_LEN 8
/* Default clingo ground parts. */
const clingo_part_t GROUND_DEFAULT_PARTS[] = {{"base", NULL, 0}};

/* Increment the choice vector by one. */
bool incr_choice(bool choice[N_F], int i) {
  if (i == N_F-1 || incr_choice(choice, i+1)) return !(choice[i] = !choice[i]);
  return 0;
}

/* This logger ignores clingo_warning_atom_undefined. */
void logger(clingo_warning_t code, const char *msg, void *_) {
  if (code == clingo_warning_atom_undefined) return;
  printf("clingo | error code %d: %s\n", code, msg);
}

/* This external function is a no-op. Return the argument as is to be injected. */
bool call(const clingo_location_t *loc, const char *name, const clingo_symbol_t *args,
    size_t argc, void *data, clingo_symbol_callback_t sym_cb, void *sym_data) {
  return sym_cb(args, argc, sym_data);
}

/* Create a new control, mock a grounding with callback. */
clingo_control_t* new_ctrl(bool choice[N_F], char F[N_F][ATOM_LEN]) {
  bool ok = false;
  clingo_control_t *C = NULL;
  if (!clingo_control_new(NULL, 0, logger, NULL, 20, &C)) return NULL;
  /* Add rules with variables with an external function named call. */
  if (!clingo_control_add(C, "base", NULL, 0,
        "f(@call(X)) :- a(X), X \\ 2 = 0.\n"
        "g(@call(X)) :- a(X), X \\ 2 = 1.\n")) goto cleanup;
  /* Add facts according to the current choice. */
  for (int i = 0; i < N_F; ++i)
    if (choice[i])
      if (!clingo_control_add(C, "base", NULL, 0, F[i])) goto cleanup;
  /* Call the grounder. */
  if (!clingo_control_ground(C, GROUND_DEFAULT_PARTS, 1, call, NULL)) goto cleanup;
  ok = true;
cleanup:
  if (!ok) clingo_control_free(C);
  return C;
}

/* Thread data. */
typedef struct {
  bool choice[N_F];        /* Array of each fact being chosen. */
  char F[N_F][ATOM_LEN];   /* Strings of facts a(i). */
  pthread_mutex_t *wakeup; /* Address to wakeup mutex. */
  pthread_cond_t *avail;   /* Address to avail condition variable. */
  bool *busy_procs;        /* Address to table of busy threads. */
  int pid;                 /* Thread ID. */
} data_t;

/* Mockup of a thread function running some task over the models of a program. */
void* routine(void *data) {
  data_t *D = (data_t*) data;
  clingo_control_t *C = new_ctrl(D->choice, D->F);
  int N;
  bool ok = false;
  if (!C) return NULL;

  /* Run solve and iterate models. */ {
    clingo_solve_handle_t *hdl;
    const clingo_model_t *M;

    if (!clingo_control_solve(C, clingo_solve_mode_yield, NULL, 0, NULL, NULL, &hdl)) goto error;

    for (N = 0; true; ++N) {
      if (!clingo_solve_handle_resume(hdl)) goto error;
      if (!clingo_solve_handle_model(hdl, &M)) goto error;
#ifdef PRINT
      if (M) print_solution(M);
      else break;
#else
      if (!M) break;
#endif
    }

    if (!clingo_solve_handle_close(hdl)) goto error;
  }

  ok = true;
  goto cleanup;
error:
  ok = false;
cleanup:
  clingo_control_free(C);
  if (!ok) puts("Error!");
  /* Thread is finished. Notify the busy_procs table and set it as available. */
  pthread_mutex_lock(D->wakeup);
  D->busy_procs[D->pid] = false;
  pthread_cond_signal(D->avail);
  pthread_mutex_unlock(D->wakeup);
  return NULL;
}

/* Retrieve a free thread's ID. */
int retr_free_proc(bool *busy_procs, pthread_mutex_t *wakeup, pthread_cond_t *avail) {
  int i, id = -1;
  pthread_mutex_lock(wakeup);
  /* Keep looking for an available ID. */
  while (true) {
    for (i = 0, id = -1; i < N_T; ++i) if (!busy_procs[i]) { id = i; break; }
    if (id != -1) break;
    pthread_cond_wait(avail, wakeup);
  }
  /* Found one. Set it as busy and return it. */
  busy_procs[id] = true;
  pthread_mutex_unlock(wakeup);
  return id;
}

int main() {
  char F[N_F][ATOM_LEN] = {0};                        /* Facts a(0), ..., a(N_F). */
  bool choice[N_F] = {0};                             /* Choice of picking each a(i) (or not). */
  pthread_t threads[N_T];                             /* Thread pool. */
  data_t D[N_T] = {0};                                /* Data for each thread. */
  bool busy_procs[N_T] = {0};                         /* Whether the i-th thread is busy. */
  pthread_mutex_t wakeup = PTHREAD_MUTEX_INITIALIZER; /* Mutex for accessing busy_procs. */
  pthread_cond_t avail = PTHREAD_COND_INITIALIZER;    /* Signal whether the thread is available. */

  /* Write out each a(i) fact. */
  for (int i = 0; i < N_F; ++i) sprintf(F[i], "a(%d).", i);

  /* Initialize thread data. */
  for (int i = 0; i < N_T; ++i) {
    D[i].avail = &avail; D[i].wakeup = &wakeup;
    D[i].busy_procs = busy_procs; D[i].pid = i;
  }

  /* Iterate through all possible combinations of N_F binary variables. */
  do {
    int id = retr_free_proc(busy_procs, &wakeup, &avail);
    /* Copy the current combination to the thread's data. */
    for (int i = 0; i < N_F; ++i) {
      D[id].choice[i] = choice[i];
      memcpy(D[id].F[i], F[i], ATOM_LEN*sizeof(char));
    }
    /* Assign thread id to this task. */
    pthread_create(&threads[id], NULL, routine, (void*) &D[id]);
  } while(!incr_choice(choice, 0));

  /* Wait for threads to finish. */
  for (int i = 0; i < N_T; ++i) pthread_join(threads[i], NULL);

  return 0;
}
rkaminsk commented 1 year ago

Thanks, I'll have a look.

Update: This is indeed a bug in clingo. There is a global struct involved that is changed when calling ground. It should be relatively easy to avoid changing this struct. I'll link a PR once I start working on this.

RenatoGeh commented 1 year ago

Update: This is indeed a bug in clingo. There is a global struct involved that is changed when calling ground. It should be relatively easy to avoid changing this struct. I'll link a PR once I start working on this.

Awesome, thanks!

rkaminsk commented 1 year ago

The problem should be fixed in the linked PR above. Thanks for providing the MWE. I would not have found this bug without an example.