/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/channel/child_channel.h"
#include "src/core/iomgr/iomgr.h"
#include <grpc/support/alloc.h>

/* Link back filter: passes up calls to the client channel, pushes down calls
   down */

static void maybe_destroy_channel(grpc_child_channel *channel);

typedef struct {
  gpr_mu mu;
  gpr_cv cv;
  grpc_channel_element *back;
  /* # of active calls on the channel */
  gpr_uint32 active_calls;
  /* has grpc_child_channel_destroy been called? */
  gpr_uint8 destroyed;
  /* has the transport reported itself disconnected? */
  gpr_uint8 disconnected;
  /* are we calling 'back' - our parent channel */
  gpr_uint8 calling_back;
  /* have we or our parent sent goaway yet? - dup suppression */
  gpr_uint8 sent_goaway;
  /* are we currently sending farewell (in this file: goaway + disconnect) */
  gpr_uint8 sending_farewell;
  /* have we sent farewell (goaway + disconnect) */
  gpr_uint8 sent_farewell;
} lb_channel_data;

typedef struct {
  grpc_call_element *back;
  grpc_child_channel *channel;
} lb_call_data;

static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
                       grpc_call_op *op) {
  lb_call_data *calld = elem->call_data;

  switch (op->dir) {
    case GRPC_CALL_UP:
      calld->back->filter->call_op(calld->back, elem, op);
      break;
    case GRPC_CALL_DOWN:
      grpc_call_next_op(elem, op);
      break;
  }
}

/* Currently we assume all channel operations should just be pushed up. */
static void lb_channel_op(grpc_channel_element *elem,
                          grpc_channel_element *from_elem,
                          grpc_channel_op *op) {
  lb_channel_data *chand = elem->channel_data;
  grpc_channel_element *back;
  int calling_back = 0;

  switch (op->dir) {
    case GRPC_CALL_UP:
      gpr_mu_lock(&chand->mu);
      back = chand->back;
      if (back) {
        chand->calling_back++;
        calling_back = 1;
      }
      gpr_mu_unlock(&chand->mu);
      if (back) {
        back->filter->channel_op(chand->back, elem, op);
      } else if (op->type == GRPC_TRANSPORT_GOAWAY) {
        gpr_slice_unref(op->data.goaway.message);
      }
      break;
    case GRPC_CALL_DOWN:
      grpc_channel_next_op(elem, op);
      break;
  }

  gpr_mu_lock(&chand->mu);
  switch (op->type) {
    case GRPC_TRANSPORT_CLOSED:
      chand->disconnected = 1;
      maybe_destroy_channel(grpc_channel_stack_from_top_element(elem));
      break;
    case GRPC_CHANNEL_GOAWAY:
      chand->sent_goaway = 1;
      break;
    case GRPC_CHANNEL_DISCONNECT:
    case GRPC_TRANSPORT_GOAWAY:
    case GRPC_ACCEPT_CALL:
      break;
  }

  if (calling_back) {
    chand->calling_back--;
    gpr_cv_signal(&chand->cv);
    maybe_destroy_channel(grpc_channel_stack_from_top_element(elem));
  }
  gpr_mu_unlock(&chand->mu);
}

/* Constructor for call_data */
static void lb_init_call_elem(grpc_call_element *elem,
                              const void *server_transport_data) {}

/* Destructor for call_data */
static void lb_destroy_call_elem(grpc_call_element *elem) {}

/* Constructor for channel_data */
static void lb_init_channel_elem(grpc_channel_element *elem,
                                 const grpc_channel_args *args,
                                 grpc_mdctx *metadata_context, int is_first,
                                 int is_last) {
  lb_channel_data *chand = elem->channel_data;
  GPR_ASSERT(is_first);
  GPR_ASSERT(!is_last);
  gpr_mu_init(&chand->mu);
  gpr_cv_init(&chand->cv);
  chand->back = NULL;
  chand->destroyed = 0;
  chand->disconnected = 0;
  chand->active_calls = 0;
  chand->sent_goaway = 0;
  chand->calling_back = 0;
  chand->sending_farewell = 0;
  chand->sent_farewell = 0;
}

/* Destructor for channel_data */
static void lb_destroy_channel_elem(grpc_channel_element *elem) {
  lb_channel_data *chand = elem->channel_data;
  gpr_mu_destroy(&chand->mu);
  gpr_cv_destroy(&chand->cv);
}

const grpc_channel_filter grpc_child_channel_top_filter = {
    lb_call_op,           lb_channel_op,           sizeof(lb_call_data),
    lb_init_call_elem,    lb_destroy_call_elem,    sizeof(lb_channel_data),
    lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", };

/* grpc_child_channel proper */

#define LINK_BACK_ELEM_FROM_CHANNEL(channel) \
  grpc_channel_stack_element((channel), 0)

#define LINK_BACK_ELEM_FROM_CALL(call) grpc_call_stack_element((call), 0)

static void finally_destroy_channel(void *c, int success) {
  /* ignore success or not... this is a destruction callback and will only
     happen once - the only purpose here is to release resources */
  grpc_child_channel *channel = c;
  lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
  /* wait for the initiator to leave the mutex */
  gpr_mu_lock(&chand->mu);
  gpr_mu_unlock(&chand->mu);
  grpc_channel_stack_destroy(channel);
  gpr_free(channel);
}

static void send_farewells(void *c, int success) {
  grpc_child_channel *channel = c;
  grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
  lb_channel_data *chand = lbelem->channel_data;
  int send_goaway;
  grpc_channel_op op;

  gpr_mu_lock(&chand->mu);
  send_goaway = !chand->sent_goaway;
  chand->sent_goaway = 1;
  gpr_mu_unlock(&chand->mu);

  if (send_goaway) {
    op.type = GRPC_CHANNEL_GOAWAY;
    op.dir = GRPC_CALL_DOWN;
    op.data.goaway.status = GRPC_STATUS_OK;
    op.data.goaway.message = gpr_slice_from_copied_string("Client disconnect");
    grpc_channel_next_op(lbelem, &op);
  }

  op.type = GRPC_CHANNEL_DISCONNECT;
  op.dir = GRPC_CALL_DOWN;
  grpc_channel_next_op(lbelem, &op);

  gpr_mu_lock(&chand->mu);
  chand->sending_farewell = 0;
  chand->sent_farewell = 1;
  maybe_destroy_channel(channel);
  gpr_mu_unlock(&chand->mu);
}

static void maybe_destroy_channel(grpc_child_channel *channel) {
  lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
  if (chand->destroyed && chand->disconnected && chand->active_calls == 0 &&
      !chand->sending_farewell && !chand->calling_back) {
    grpc_iomgr_add_callback(finally_destroy_channel, channel);
  } else if (chand->destroyed && !chand->disconnected &&
             chand->active_calls == 0 && !chand->sending_farewell &&
             !chand->sent_farewell) {
    chand->sending_farewell = 1;
    grpc_iomgr_add_callback(send_farewells, channel);
  }
}

grpc_child_channel *grpc_child_channel_create(
    grpc_channel_element *parent, const grpc_channel_filter **filters,
    size_t filter_count, const grpc_channel_args *args,
    grpc_mdctx *metadata_context) {
  grpc_channel_stack *stk =
      gpr_malloc(grpc_channel_stack_size(filters, filter_count));
  lb_channel_data *lb;

  grpc_channel_stack_init(filters, filter_count, args, metadata_context, stk);

  lb = LINK_BACK_ELEM_FROM_CHANNEL(stk)->channel_data;
  gpr_mu_lock(&lb->mu);
  lb->back = parent;
  gpr_mu_unlock(&lb->mu);

  return stk;
}

void grpc_child_channel_destroy(grpc_child_channel *channel,
                                int wait_for_callbacks) {
  grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
  lb_channel_data *chand = lbelem->channel_data;

  gpr_mu_lock(&chand->mu);
  while (wait_for_callbacks && chand->calling_back) {
    gpr_cv_wait(&chand->cv, &chand->mu, gpr_inf_future);
  }

  chand->back = NULL;
  chand->destroyed = 1;
  maybe_destroy_channel(channel);
  gpr_mu_unlock(&chand->mu);
}

void grpc_child_channel_handle_op(grpc_child_channel *channel,
                                  grpc_channel_op *op) {
  grpc_channel_next_op(LINK_BACK_ELEM_FROM_CHANNEL(channel), op);
}

grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
                                                grpc_call_element *parent) {
  grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size);
  grpc_call_element *lbelem;
  lb_call_data *lbcalld;
  lb_channel_data *lbchand;

  grpc_call_stack_init(channel, NULL, stk);
  lbelem = LINK_BACK_ELEM_FROM_CALL(stk);
  lbchand = lbelem->channel_data;
  lbcalld = lbelem->call_data;
  lbcalld->back = parent;
  lbcalld->channel = channel;

  gpr_mu_lock(&lbchand->mu);
  lbchand->active_calls++;
  gpr_mu_unlock(&lbchand->mu);

  return stk;
}

void grpc_child_call_destroy(grpc_child_call *call) {
  grpc_call_element *lbelem = LINK_BACK_ELEM_FROM_CALL(call);
  lb_call_data *calld = lbelem->call_data;
  lb_channel_data *chand = lbelem->channel_data;
  grpc_child_channel *channel = calld->channel;
  grpc_call_stack_destroy(call);
  gpr_free(call);
  gpr_mu_lock(&chand->mu);
  chand->active_calls--;
  maybe_destroy_channel(channel);
  gpr_mu_unlock(&chand->mu);
}

grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call) {
  return LINK_BACK_ELEM_FROM_CALL(call);
}
