79 | 79 |
parser_state = eredis_parser:init(),
|
80 | 80 |
queue = queue:new()},
|
81 | 81 |
|
82 | |
case connect(State) of
|
83 | |
{ok, NewState} ->
|
84 | |
{ok, NewState};
|
85 | |
{error, Reason} ->
|
86 | |
{stop, Reason}
|
|
82 |
case ReconnectSleep of
|
|
83 |
no_reconnect ->
|
|
84 |
case connect(State) of
|
|
85 |
{ok, _NewState} = Res -> Res;
|
|
86 |
{error, Reason} -> {stop, Reason}
|
|
87 |
end;
|
|
88 |
T when is_integer(T) ->
|
|
89 |
self() ! initiate_connection,
|
|
90 |
{ok, State}
|
87 | 91 |
end.
|
88 | 92 |
|
89 | 93 |
handle_call({request, Req}, From, State) ->
|
|
142 | 146 |
%% clients. If desired, spawn of a new process which will try to reconnect and
|
143 | 147 |
%% notify us when Redis is ready. In the meantime, we can respond with
|
144 | 148 |
%% an error message to all our clients.
|
145 | |
handle_info({tcp_closed, _Socket}, #state{reconnect_sleep = no_reconnect,
|
146 | |
queue = Queue} = State) ->
|
147 | |
reply_all({error, tcp_closed}, Queue),
|
148 | |
%% If we aren't going to reconnect, then there is nothing else for
|
149 | |
%% this process to do.
|
150 | |
{stop, normal, State#state{socket = undefined}};
|
151 | |
|
152 | |
handle_info({tcp_closed, _Socket}, #state{queue = Queue} = State) ->
|
153 | |
Self = self(),
|
154 | |
spawn(fun() -> reconnect_loop(Self, State) end),
|
155 | |
|
156 | |
%% tell all of our clients what has happened.
|
157 | |
reply_all({error, tcp_closed}, Queue),
|
158 | |
|
159 | |
%% Throw away the socket and the queue, as we will never get a
|
160 | |
%% response to the requests sent on the old socket. The absence of
|
161 | |
%% a socket is used to signal we are "down"
|
162 | |
{noreply, State#state{socket = undefined, queue = queue:new()}};
|
|
149 |
handle_info({tcp_closed, _Socket}, State) ->
|
|
150 |
maybe_reconnect(tcp_closed, State);
|
163 | 151 |
|
164 | 152 |
%% Redis is ready to accept requests, the given Socket is a socket
|
165 | 153 |
%% already connected and authenticated.
|
|
170 | 158 |
%% that Poolboy uses to manage the connections.
|
171 | 159 |
handle_info(stop, State) ->
|
172 | 160 |
{stop, shutdown, State};
|
|
161 |
|
|
162 |
handle_info(initiate_connection, #state{socket = undefined} = State) ->
|
|
163 |
case connect(State) of
|
|
164 |
{ok, NewState} ->
|
|
165 |
{noreply, NewState};
|
|
166 |
{error, Reason} ->
|
|
167 |
maybe_reconnect(Reason, State)
|
|
168 |
end;
|
173 | 169 |
|
174 | 170 |
handle_info(_Info, State) ->
|
175 | 171 |
{stop, {unhandled_message, _Info}, State}.
|
|
264 | 260 |
queue:in_r({N - 1, From, [Value | Replies]}, NewQueue);
|
265 | 261 |
{empty, Queue} ->
|
266 | 262 |
%% Oops
|
267 | |
error_logger:info_msg("Nothing in queue, but got value from parser~n"),
|
268 | |
throw(empty_queue)
|
|
263 |
error_logger:info_msg("eredis: Nothing in queue, but got value from parser~n"),
|
|
264 |
exit(empty_queue)
|
269 | 265 |
end.
|
270 | 266 |
|
271 | 267 |
%% @doc Send `Value' to each client in queue. Only useful for sending
|
|
296 | 292 |
try erlang:send(Pid, Value)
|
297 | 293 |
catch
|
298 | 294 |
Err:Reason ->
|
299 | |
error_logger:info_msg("Failed to send message to ~p with reason ~p~n", [Pid, {Err, Reason}])
|
|
295 |
error_logger:info_msg("eredis: Failed to send message to ~p with reason ~p~n", [Pid, {Err, Reason}])
|
300 | 296 |
end.
|
301 | 297 |
|
302 | 298 |
%% @doc: Helper for connecting to Redis, authenticating and selecting
|
|
305 | 301 |
%% {SomeError, Reason}.
|
306 | 302 |
connect(State) ->
|
307 | 303 |
{ok, {AFamily, Addr}} = get_addr(State#state.host),
|
308 | |
case gen_tcp:connect(Addr, State#state.port,
|
|
304 |
Port = case AFamily of
|
|
305 |
local -> 0;
|
|
306 |
_ -> State#state.port
|
|
307 |
end,
|
|
308 |
case gen_tcp:connect(Addr, Port,
|
309 | 309 |
[AFamily | ?SOCKET_OPTS], State#state.connect_timeout) of
|
310 | 310 |
{ok, Socket} ->
|
311 | 311 |
case authenticate(Socket, State#state.password) of
|
|
323 | 323 |
{error, {connection_error, Reason}}
|
324 | 324 |
end.
|
325 | 325 |
|
|
326 |
get_addr({local, Path}) ->
|
|
327 |
{ok, {local, {local, Path}}};
|
326 | 328 |
get_addr(Hostname) ->
|
327 | 329 |
case inet:parse_address(Hostname) of
|
328 | 330 |
{ok, {_,_,_,_} = Addr} -> {ok, {inet, Addr}};
|
|
367 | 369 |
{error, Reason} ->
|
368 | 370 |
{error, Reason}
|
369 | 371 |
end.
|
|
372 |
|
|
373 |
maybe_reconnect(Reason, #state{reconnect_sleep = no_reconnect, queue = Queue} = State) ->
|
|
374 |
reply_all({error, Reason}, Queue),
|
|
375 |
%% If we aren't going to reconnect, then there is nothing else for
|
|
376 |
%% this process to do.
|
|
377 |
{stop, normal, State#state{socket = undefined}};
|
|
378 |
maybe_reconnect(Reason, #state{queue = Queue} = State) ->
|
|
379 |
error_logger:error_msg("eredis: Re-establishing connection to ~p:~p due to ~p",
|
|
380 |
[State#state.host, State#state.port, Reason]),
|
|
381 |
Self = self(),
|
|
382 |
spawn_link(fun() -> reconnect_loop(Self, State) end),
|
|
383 |
|
|
384 |
%% tell all of our clients what has happened.
|
|
385 |
reply_all({error, Reason}, Queue),
|
|
386 |
|
|
387 |
%% Throw away the socket and the queue, as we will never get a
|
|
388 |
%% response to the requests sent on the old socket. The absence of
|
|
389 |
%% a socket is used to signal we are "down"
|
|
390 |
{noreply, State#state{socket = undefined, queue = queue:new()}}.
|
370 | 391 |
|
371 | 392 |
%% @doc: Loop until a connection can be established, this includes
|
372 | 393 |
%% successfully issuing the auth and select calls. When we have a
|