package AnyEvent::Twitter::Stream;
use strict;
use 5.008_001;
our $VERSION = '0.28';
use AnyEvent;
use AnyEvent::HTTP;
use AnyEvent::Util;
use MIME::Base64;
use URI;
use URI::Escape;
use Carp;
use Compress::Raw::Zlib;
our $STREAMING_SERVER = 'stream.twitter.com';
our $USERSTREAM_SERVER = 'userstream.twitter.com';
our $SITESTREAM_SERVER = 'sitestream.twitter.com';
our $PROTOCOL = 'https';
our $US_PROTOCOL = 'https'; # for testing
my %methods = (
filter => [ POST => sub { "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/filter.json" } ],
sample => [ GET => sub { "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/sample.json" } ],
firehose => [ GET => sub { "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/firehose.json" } ],
userstream => [ GET => sub { "$US_PROTOCOL://$USERSTREAM_SERVER/1.1/user.json" } ],
sitestream => [ GET => sub { "$PROTOCOL://$SITESTREAM_SERVER/1.1/site.json" } ],
# DEPRECATED
links => [ GET => sub { "$PROTOCOL://$STREAMING_SERVER/1/statuses/links.json" } ],
retweet => [ GET => sub { "$PROTOCOL://$STREAMING_SERVER/1/statuses/retweet.json" } ],
);
sub new {
my $class = shift;
my %args = @_;
my $username = delete $args{username};
my $password = delete $args{password};
my $consumer_key = delete $args{consumer_key};
my $consumer_secret = delete $args{consumer_secret};
my $token = delete $args{token};
my $token_secret = delete $args{token_secret};
my $method = delete $args{method};
my $on_connect = delete $args{on_connect} || sub { };
my $on_tweet = delete $args{on_tweet};
my $on_error = delete $args{on_error} || sub { die @_ };
my $on_eof = delete $args{on_eof} || sub { };
my $on_keepalive = delete $args{on_keepalive} || sub { };
my $on_delete = delete $args{on_delete};
my $on_friends = delete $args{on_friends};
my $on_direct_message = delete $args{on_direct_message};
my $on_event = delete $args{on_event};
my $timeout = delete $args{timeout};
my $decode_json;
unless (delete $args{no_decode_json}) {
require JSON;
$decode_json = 1;
}
my ($zlib, $_zstatus);
if (delete $args{use_compression}){
($zlib, $_zstatus) = Compress::Raw::Zlib::Inflate->new(
-LimitOutput => 1,
-AppendOutput => 1,
-WindowBits => WANT_GZIP_OR_ZLIB,
);
die "Can't make inflator: $_zstatus" unless $zlib;
}
unless ($methods{$method} || exists $args{api_url} ) {
$on_error->("Method $method not available.");
return;
}
my $uri = URI->new(delete $args{api_url} || $methods{$method}[1]());
my $request_body;
my $request_method = delete $args{request_method} || $methods{$method}[0] || 'GET';
if ( $request_method eq 'POST' ) {
$request_body = join '&', map "$_=" . URI::Escape::uri_escape_utf8($args{$_}), keys %args;
}else{
$uri->query_form(%args);
}
my $auth;
if ($consumer_key) {
eval {require Net::OAuth;};
die $@ if $@;
my $request = Net::OAuth->request('protected resource')->new(
version => '1.0',
consumer_key => $consumer_key,
consumer_secret => $consumer_secret,
token => $token,
token_secret => $token_secret,
request_method => $request_method,
signature_method => 'HMAC-SHA1',
timestamp => time,
nonce => MIME::Base64::encode( time . $$ . rand ),
request_url => $uri,
$request_method eq 'POST' ? (extra_params => \%args) : (),
);
$request->sign;
$auth = $request->to_authorization_header;
}else{
$auth = "Basic ".MIME::Base64::encode("$username:$password", '');
}
my $self = bless {}, $class;
{
Scalar::Util::weaken(my $self = $self);
my $set_timeout = $timeout
? sub { $self->{timeout} = AE::timer($timeout, 0, sub { $on_error->('timeout') }) }
: sub {};
my $on_json_message = sub {
my ($json) = @_;
# Twitter stream returns "\x0a\x0d\x0a" if there's no matched tweets in ~30s.
$set_timeout->();
if ($json !~ /^\s*$/) {
my $tweet = $decode_json ? JSON::decode_json($json) : $json;
if ($on_delete && $tweet->{delete} && $tweet->{delete}->{status}) {
$on_delete->($tweet->{delete}->{status}->{id}, $tweet->{delete}->{status}->{user_id});
}elsif($on_friends && $tweet->{friends}) {
$on_friends->($tweet->{friends});
}elsif($on_direct_message && $tweet->{direct_message}) {
$on_direct_message->($tweet->{direct_message});
}elsif($on_event && $tweet->{event}) {
$on_event->($tweet);
}else{
$on_tweet->($tweet);
}
}
else {
$on_keepalive->();
}
};
$set_timeout->();
$self->{connection_guard} = http_request($request_method, $uri,
headers => {
Accept => '*/*',
( defined $zlib ? ('Accept-Encoding' => 'deflate, gzip') : ()),
Authorization => $auth,
($request_method eq 'POST'
? ('Content-Type' => 'application/x-www-form-urlencoded')
: ()
),
},
body => $request_body,
on_header => sub {
my($headers) = @_;
if ($headers->{Status} ne '200') {
$on_error->("$headers->{Status}: $headers->{Reason}");
return;
}
return 1;
},
want_body_handle => 1, # for some reason on_body => sub {} doesn't work :/
sub {
my ($handle, $headers) = @_;
return unless $handle;
my $input;
my $chunk_reader;
$chunk_reader = sub {
my ($handle, $line) = @_;
$line =~ /^([0-9a-fA-F]+)/ or die 'bad chunk (incorrect length)';
my $len = hex $1;
my $chunk_part_reader;
$chunk_part_reader = sub {
my ($handle, $chunk_raw) = @_;
my $chunk = $chunk_raw;
$chunk =~ s/\r\n$//;
$input .= $chunk;
unless ($headers->{'content-encoding'}) {
while ($input =~ s/^(.*?)\r\n//) {
my ($json_raw) = $1;
$on_json_message->($json_raw);
}
} elsif ($headers->{'content-encoding'} =~ 'deflate|gzip') {
my ($message);
do {
$_zstatus = $zlib->inflate(\$input, \$message);
return unless $_zstatus == Z_OK || $_zstatus == Z_BUF_ERROR;
} while ( $_zstatus == Z_OK && length $input );
$on_json_message->($message);
} else {
die "Don't know how to decode $headers->{'content-encoding'}"
}
$handle->push_read(line => $chunk_reader);
}; # chunk_part_reader
$handle->push_read(chunk => $len + 2, $chunk_part_reader);
};
my $line_reader = sub {
my ($handle, $line) = @_;
$on_json_message->($line);
};
$handle->on_error(sub {
undef $handle;
$on_error->($_[2]);
});
$handle->on_eof(sub {
undef $handle;
$on_eof->(@_);
});
if (($headers->{'transfer-encoding'} || '') =~ /\bchunked\b/i) {
$handle->on_read(sub {
my ($handle) = @_;
$handle->push_read(line => $chunk_reader);
});
} else {
$handle->on_read(sub {
my ($handle) = @_;
$handle->push_read(line => $line_reader);
});
}
$self->{guard} = AnyEvent::Util::guard {
$handle->destroy if $handle;
};
$on_connect->();
}
);
}
return $self;
}
1;
__END__
=encoding utf-8
=for stopwords
API AnyEvent
=for test_synopsis
my($user, $password, @following_ids, $consumer_key, $consumer_secret, $token, $token_secret);
=head1 NAME
AnyEvent::Twitter::Stream - Receive Twitter streaming API in an event loop
=head1 SYNOPSIS
use AnyEvent::Twitter::Stream;
my $done = AE::cv;
# receive updates from @following_ids
my $listener = AnyEvent::Twitter::Stream->new(
username => $user,
password => $password,
method => "filter", # "firehose" for everything, "sample" for sample timeline
follow => join(",", @following_ids), # numeric IDs
on_tweet => sub {
my $tweet = shift;
warn "$tweet->{user}{screen_name}: $tweet->{text}\n";
},
on_keepalive => sub {
warn "ping\n";
},
on_delete => sub {
my ($tweet_id, $user_id) = @_; # callback executed when twitter send a delete notification
...
},
timeout => 45,
);
# track keywords
my $guard = AnyEvent::Twitter::Stream->new(
username => $user,
password => $password,
method => "filter",
track => "Perl,Test,Music",
on_tweet => sub { },
);
# to use OAuth authentication
my $listener = AnyEvent::Twitter::Stream->new(
consumer_key => $consumer_key,
consumer_secret => $consumer_secret,
token => $token,
token_secret => $token_secret,
method => "filter",
track => "...",
on_tweet => sub { ... },
);
$done->recv;
=head1 DESCRIPTION
AnyEvent::Twitter::Stream is an AnyEvent user to receive Twitter streaming
API, available at L<http://dev.twitter.com/pages/streaming_api> and
L<http://dev.twitter.com/pages/user_streams>.
See L<eg/track.pl> for more client code example.
=head1 METHODS
=head2 my $streamer = AnyEvent::Twitter::Stream->new(%args);
=over 4
=item B<username> B<password>
These arguments are used for basic authentication.
=item B<consumer_key> B<consumer_secret> B<token> B<token_secret>
If you want to use the OAuth authentication mechanism, you need to set these arguments
=item B<method>
The name of the method you want to use on the stream. Currently, any one of :
=over 2
=item B<firehose>
=item B<sample>
=item B<userstream>
To use this method, you need to use the OAuth mechanism.
=item B<filter>
With this method you can specify what you want to filter amongst B<track>, B<follow> and B<locations>.
See L<https://dev.twitter.com/docs/api/1.1/post/statuses/filter> for the details of the parameters.
=back
=item B<api_url>
Pass this to override the default URL for the API endpoint.
=item B<request_method>
Pass this to override the default HTTP request method.
=item B<timeout>
Set the timeout value.
=item B<on_connect>
Callback to execute when a stream is connected.
=item B<on_tweet>
Callback to execute when a new tweet is received. The argument is the tweet, a hashref documented at
L<https://dev.twitter.com/docs/api/1/get/statuses/show/%3Aid>.
=item B<on_error>
=item B<on_eof>
=item B<on_keepalive>
=item B<on_delete>
Callback to execute when the stream send a delete notification.
=item B<on_friends>
B<Only with the usertream method>. Callback to execute when the stream send a list of friends.
=item B<on_direct_message>
B<Only with the usertream method>. Callback to execute when a direct message is received in the stream.
=item B<on_event>
B<Only with the userstream method>. Callback to execute when the stream send an event notification (follow, ...).
=item B<additional agruments>
Any additional arguments are assumed to be parameters to the underlying API method and are passed to Twitter.
=back
=head1 NOTES
To use the B<userstream> method, Twitter recommend using the HTTPS protocol. For this, you need to set the B<ANYEVENT_TWITTER_STREAM_SSL> environment variable, and install the L<Net::SSLeay> module.
=head1 AUTHOR
Tatsuhiko Miyagawa E<lt>miyagawa@bulknews.netE<gt>
=head1 LICENSE
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
=head1 SEE ALSO
L<AnyEvent::Twitter>, L<Net::Twitter::Stream>
=cut